Skip to content

Commit

Permalink
ISPN-6919 Improve non-tx writes (triangle)
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and danberindei committed Dec 2, 2016
1 parent 77d47a4 commit 49bb4c6
Show file tree
Hide file tree
Showing 58 changed files with 2,733 additions and 405 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class MarshallUtil {
*/
public static <K, V, T extends Map<K, V>> void marshallMap(T map, ObjectOutput out) throws IOException {
final int mapSize = map == null ? NULL_VALUE : map.size();
marshallInt(out, mapSize);
marshallSize(out, mapSize);
if (mapSize <= 0) return;

for (Map.Entry<K, V> me : map.entrySet()) {
Expand All @@ -56,10 +56,10 @@ public static <K, V, T extends Map<K, V>> void marshallMap(T map, ObjectOutput o
* @return The populated {@link Map} created by the {@link MapBuilder} or {@code null}.
* @throws IOException If any of the usual Input/Output related exceptions occur.
* @throws ClassNotFoundException If the class of a serialized object cannot be found.
* @see {@link #marshallMap(Map, ObjectOutput)}
* @see #marshallMap(Map, ObjectOutput)
*/
public static <K, V, T extends Map<K, V>> T unmarshallMap(ObjectInput in, MapBuilder<K, V, T> builder) throws IOException, ClassNotFoundException {
final int size = unmarshallInt(in);
final int size = unmarshallSize(in);
if (size == NULL_VALUE) {
return null;
}
Expand Down Expand Up @@ -98,7 +98,7 @@ public static void marshallUUID(UUID uuid, ObjectOutput out, boolean checkNull)
* @param checkNull If {@code true}, it checks if the {@link UUID} marshalled was {@link null}.
* @return {@link UUID} marshalled.
* @throws IOException If any of the usual Input/Output related exceptions occur.
* @see {@link #marshallUUID(UUID, ObjectOutput, boolean)}.
* @see #marshallUUID(UUID, ObjectOutput, boolean).
*/
public static UUID unmarshallUUID(ObjectInput in, boolean checkNull) throws IOException {
if (checkNull && in.readBoolean()) {
Expand All @@ -119,7 +119,7 @@ public static UUID unmarshallUUID(ObjectInput in, boolean checkNull) throws IOEx
*/
public static <E> void marshallArray(E[] array, ObjectOutput out) throws IOException {
final int size = array == null ? NULL_VALUE : array.length;
marshallInt(out, size);
marshallSize(out, size);
if (size <= 0) {
return;
}
Expand All @@ -137,10 +137,10 @@ public static <E> void marshallArray(E[] array, ObjectOutput out) throws IOExcep
* @return The populated array.
* @throws IOException If any of the usual Input/Output related exceptions occur.
* @throws ClassNotFoundException If the class of a serialized object cannot be found.
* @see {@link #marshallArray(Object[], ObjectOutput)}.
* @see #marshallArray(Object[], ObjectOutput).
*/
public static <E> E[] unmarshallArray(ObjectInput in, ArrayBuilder<E> builder) throws IOException, ClassNotFoundException {
final int size = unmarshallInt(in);
final int size = unmarshallSize(in);
if (size == NULL_VALUE) {
return null;
} else if (size == 0) {
Expand All @@ -167,7 +167,7 @@ public static <E> E[] unmarshallArray(ObjectInput in, ArrayBuilder<E> builder) t
*/
public static <E> void marshallCollection(Collection<E> collection, ObjectOutput out) throws IOException {
final int size = collection == null ? NULL_VALUE : collection.size();
marshallInt(out, size);
marshallSize(out, size);
if (size <= 0) {
return;
}
Expand All @@ -188,7 +188,7 @@ public static <E> void marshallCollection(Collection<E> collection, ObjectOutput
* @throws ClassNotFoundException If the class of a serialized object cannot be found.
*/
public static <E, T extends Collection<E>> T unmarshallCollection(ObjectInput in, CollectionBuilder<E, T> builder) throws IOException, ClassNotFoundException {
final int size = unmarshallInt(in);
final int size = unmarshallSize(in);
if (size == NULL_VALUE) {
return null;
}
Expand All @@ -205,10 +205,10 @@ public static <E, T extends Collection<E>> T unmarshallCollection(ObjectInput in
* <p>
* Used when the size of the {@link Collection} is not needed for it construction.
*
* @see {@link #unmarshallCollection(ObjectInput, CollectionBuilder)}.
* @see #unmarshallCollection(ObjectInput, CollectionBuilder).
*/
public static <E, T extends Collection<E>> T unmarshallCollectionUnbounded(ObjectInput in, UnboundedCollectionBuilder<E, T> builder) throws IOException, ClassNotFoundException {
final int size = unmarshallInt(in);
final int size = unmarshallSize(in);
if (size == NULL_VALUE) {
return null;
}
Expand Down Expand Up @@ -245,7 +245,7 @@ public static void marshallString(String string, ObjectOutput out) throws IOExce
* @param in {@link ObjectInput} to read.
* @return The {@link String} or {@code null}.
* @throws IOException If any of the usual Input/Output related exceptions occur.
* @see {@link #marshallString(String, ObjectOutput)}.
* @see #marshallString(String, ObjectOutput).
*/
public static String unmarshallString(ObjectInput in) throws IOException {
if (in.readBoolean()) {
Expand All @@ -257,11 +257,11 @@ public static String unmarshallString(ObjectInput in) throws IOException {
/**
* Same as {@link #marshallArray(Object[], ObjectOutput)} but specialized for byte arrays.
*
* @see {@link #marshallArray(Object[], ObjectOutput)}.
* @see #marshallArray(Object[], ObjectOutput).
*/
public static void marshallByteArray(byte[] array, ObjectOutput out) throws IOException {
final int size = array == null ? NULL_VALUE : array.length;
marshallInt(out, size);
marshallSize(out, size);
if (size <= 0) {
return;
}
Expand All @@ -273,10 +273,10 @@ public static void marshallByteArray(byte[] array, ObjectOutput out) throws IOEx
* <p>
* No {@link ArrayBuilder} is necessary.
*
* @see {@link #unmarshallArray(ObjectInput, ArrayBuilder)}.
* @see #unmarshallArray(ObjectInput, ArrayBuilder).
*/
public static byte[] unmarshallByteArray(ObjectInput in) throws IOException {
final int size = unmarshallInt(in);
final int size = unmarshallSize(in);
if (size == NULL_VALUE) {
return null;
} else if (size == 0) {
Expand All @@ -291,7 +291,7 @@ public static byte[] unmarshallByteArray(ObjectInput in) throws IOException {
* A special marshall implementation for integer.
* <p>
* This method supports negative values but they are handles as {@link #NULL_VALUE}. It means that the real value is
* lost and {@link #NULL_VALUE} is returned by {@link #unmarshallInt(ObjectInput)}.
* lost and {@link #NULL_VALUE} is returned by {@link #unmarshallSize(ObjectInput)}.
* <p>
* The integer is marshalled in a variable length from 1 to 5 bytes. Negatives values are always marshalled in 1
* byte.
Expand All @@ -300,7 +300,7 @@ public static byte[] unmarshallByteArray(ObjectInput in) throws IOException {
* @param value Integer value to marshall.
* @throws IOException If any of the usual Input/Output related exceptions occur.
*/
public static void marshallInt(ObjectOutput out, int value) throws IOException {
public static void marshallSize(ObjectOutput out, int value) throws IOException {
if (value < 0) {
out.writeByte(0x80); //meaning it is a negative value!
return;
Expand All @@ -325,9 +325,9 @@ public static void marshallInt(ObjectOutput out, int value) throws IOException {
* @param in {@link ObjectInput} to read.
* @return The integer value or {@link #NULL_VALUE} if the original value was negative.
* @throws IOException If any of the usual Input/Output related exceptions occur.
* @see {@link #marshallInt(ObjectOutput, int)}.
* @see #marshallSize(ObjectOutput, int).
*/
public static int unmarshallInt(ObjectInput in) throws IOException {
public static int unmarshallSize(ObjectInput in) throws IOException {
byte b = in.readByte();
if ((b & 0x80) != 0) {
return NULL_VALUE; //negative value
Expand Down Expand Up @@ -366,6 +366,45 @@ public static <E extends Enum<E>> E unmarshallEnum(ObjectInput input, EnumBuilde
}
}

/**
* Marshalls a collection of integers.
*
* @param collection the collection to marshall.
* @param out the {@link ObjectOutput} to write to.
* @throws IOException if an error occurs.
*/
public static void marshallIntCollection(Collection<Integer> collection, ObjectOutput out) throws IOException {
final int size = collection == null ? NULL_VALUE : collection.size();
marshallSize(out, size);
if (size <= 0) {
return;
}
for (Integer integer : collection) {
out.writeInt(integer);
}
}

/**
* Unmarshalls a collection of integers.
*
* @param in the {@link ObjectInput} to read from.
* @param builder the {@link CollectionBuilder} to build the collection of integer.
* @param <T> the concrete type of the collection.
* @return the collection.
* @throws IOException if an error occurs.
*/
public static <T extends Collection<Integer>> T unmarshallIntCollection(ObjectInput in, CollectionBuilder<Integer, T> builder) throws IOException {
final int size = unmarshallSize(in);
if (size == NULL_VALUE) {
return null;
}
T collection = Objects.requireNonNull(builder, "CollectionBuilder must be non-null").build(size);
for (int i = 0; i < size; ++i) {
collection.add(in.readInt());
}
return collection;
}

public interface ArrayBuilder<E> {
E[] build(int size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ private static void positiveRange(int min, int max, int bytesExpected, ObjectInp

private static void checkIntAndByteArray(int i, int bytesExpected, ObjectInputOutput io) throws IOException {
io.reset();
MarshallUtil.marshallInt(io, i);
MarshallUtil.marshallSize(io, i);
Assert.assertEquals("Error for i=" + i, bytesExpected, io.buffer.size());
Assert.assertEquals("Error for i=" + i, i, MarshallUtil.unmarshallInt(io));
Assert.assertEquals("Error for i=" + i, i, MarshallUtil.unmarshallSize(io));
Assert.assertEquals("Error for i=" + i, 0, io.buffer.size());
}

private static void checkNegativeInt(int i, ObjectInputOutput io) throws IOException {
io.reset();
MarshallUtil.marshallInt(io, i);
MarshallUtil.marshallSize(io, i);
Assert.assertEquals("Error for i=" + i, 1, io.buffer.size());
Assert.assertEquals("Error for i=" + i, -1, MarshallUtil.unmarshallInt(io));
Assert.assertEquals("Error for i=" + i, -1, MarshallUtil.unmarshallSize(io));
Assert.assertEquals("Error for i=" + i, 0, io.buffer.size());
}

Expand Down Expand Up @@ -80,8 +80,8 @@ public void testRandomPositiveInt() throws IOException {
v = -v;
}
io.reset();
MarshallUtil.marshallInt(io, v);
Assert.assertEquals("Error for v=" + v, v, MarshallUtil.unmarshallInt(io));
MarshallUtil.marshallSize(io, v);
Assert.assertEquals("Error for v=" + v, v, MarshallUtil.unmarshallSize(io));
}
}

Expand All @@ -98,8 +98,8 @@ public void testRandomNegativeInt() throws IOException {
continue;
}
io.reset();
MarshallUtil.marshallInt(io, v);
Assert.assertEquals("Error for v=" + v, -1, MarshallUtil.unmarshallInt(io));
MarshallUtil.marshallSize(io, v);
Assert.assertEquals("Error for v=" + v, -1, MarshallUtil.unmarshallSize(io));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public boolean equals(Object o) {

}

public Address getAddress() {
return address;
}

@Override
public int hashCode() {
int result = address != null ? address.hashCode() : 0;
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,16 @@
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
import org.infinispan.commands.write.ApplyDeltaCommand;
import org.infinispan.commands.write.BackupAckCommand;
import org.infinispan.commands.write.BackupMultiKeyAckCommand;
import org.infinispan.commands.write.BackupWriteRcpCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.EvictCommand;
import org.infinispan.commands.write.ExceptionAckCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PrimaryAckCommand;
import org.infinispan.commands.write.PrimaryMultiKeyAckCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
Expand Down Expand Up @@ -490,4 +497,15 @@ <K, V> WriteOnlyKeyValueCommand<K, V> buildWriteOnlyKeyValueCommand(

<K, V, R> ReadWriteManyEntriesCommand<K, V, R> buildReadWriteManyEntriesCommand(Map<? extends K, ? extends V> entries, BiFunction<V, ReadWriteEntryView<K, V>, R> f, Params params);

BackupAckCommand buildBackupAckCommand(CommandInvocationId id, int topologyId);

PrimaryAckCommand buildPrimaryAckCommand();

BackupMultiKeyAckCommand buildBackupMultiKeyAckCommand(CommandInvocationId id, Collection<Integer> segments, int topologyId);

PrimaryMultiKeyAckCommand buildPrimaryMultiKeyAckCommand(CommandInvocationId id, int topologyId);

ExceptionAckCommand buildExceptionAckCommand(CommandInvocationId id, Throwable throwable, int topologyId);

BackupWriteRcpCommand buildBackupWriteRcpCommand(DataWriteCommand command);
}
Loading

0 comments on commit 49bb4c6

Please sign in to comment.