Skip to content

Commit

Permalink
KAFKA-12446: update change encoding to use varint (apache#13533)
Browse files Browse the repository at this point in the history
KIP-904 had the goal in mind to save space when encoding the size on a byte array. However, using UINT32 does not achieve this goal. This PR changes the encoding to VARINT instead.

Reviewers: Victoria Xia <victoria.xia@confluent.io>,  Farooq Qaiser <fqaiser94@gmail.com>, Walker Carlson <wcarlson@confluent.io>
  • Loading branch information
mjsax committed Apr 24, 2023
1 parent ab8f285 commit 2557a4b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 80 deletions.
Expand Up @@ -26,7 +26,7 @@

public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {

private static final int NEW_OLD_FLAG_SIZE = 1;
private static final int ENCODING_FLAG_SIZE = 1;
private static final int IS_LATEST_FLAG_SIZE = 1;

private Deserializer<T> inner;
Expand All @@ -50,64 +50,76 @@ public void setIfUnset(final SerdeGetter getter) {
@Override
public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
// The format we need to deserialize is:
// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
// {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
// {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
final ByteBuffer buffer = ByteBuffer.wrap(data);
final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE);
final byte encodingFlag = buffer.get(data.length - ENCODING_FLAG_SIZE);

final byte[] newData;
final byte[] oldData;
final boolean isLatest;
if (newOldFlag == (byte) 0) {
newData = null;
final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
isLatest = true;
} else if (newOldFlag == (byte) 1) {
oldData = null;
final int newDataLength = data.length - NEW_OLD_FLAG_SIZE;
newData = new byte[newDataLength];
buffer.get(newData);
isLatest = true;
} else if (newOldFlag == (byte) 2) {
final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
newData = new byte[newDataLength];

final int oldDataLength = data.length - Integer.BYTES - newDataLength - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];

buffer.get(newData);
buffer.get(oldData);
isLatest = true;
} else if (newOldFlag == (byte) 3) {
newData = null;
final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
isLatest = readIsLatestFlag(buffer);
} else if (newOldFlag == (byte) 4) {
oldData = null;
final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
newData = new byte[newDataLength];
buffer.get(newData);
isLatest = readIsLatestFlag(buffer);
} else if (newOldFlag == (byte) 5) {
final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
newData = new byte[newDataLength];

final int oldDataLength = data.length - Integer.BYTES - newDataLength - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];

buffer.get(newData);
buffer.get(oldData);
isLatest = readIsLatestFlag(buffer);
} else {
throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in ChangedDeserializer.");
switch (encodingFlag) {
case (byte) 0: {
newData = null;
final int oldDataLength = data.length - ENCODING_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
isLatest = true;
break;
}
case (byte) 1: {
oldData = null;
final int newDataLength = data.length - ENCODING_FLAG_SIZE;
newData = new byte[newDataLength];
buffer.get(newData);
isLatest = true;
break;
}
case (byte) 2: {
final int newDataLength = ByteUtils.readVarint(buffer);
newData = new byte[newDataLength];
buffer.get(newData);

final int oldDataLength = buffer.capacity() - buffer.position() - ENCODING_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
isLatest = true;
break;
}
case (byte) 3: {
newData = null;
final int oldDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
isLatest = readIsLatestFlag(buffer);
break;
}
case (byte) 4: {
oldData = null;
final int newDataLength = data.length - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
newData = new byte[newDataLength];
buffer.get(newData);
isLatest = readIsLatestFlag(buffer);
break;
}
case (byte) 5: {
final int newDataLength = ByteUtils.readVarint(buffer);
newData = new byte[newDataLength];
buffer.get(newData);

final int oldDataLength = buffer.capacity() - buffer.position() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);

isLatest = readIsLatestFlag(buffer);
break;
}
default:
throw new StreamsException("Encountered unknown byte value `" + encodingFlag + "` for encodingFlag in ChangedDeserializer.");
}

return new Change<>(
Expand All @@ -117,7 +129,7 @@ public Change<T> deserialize(final String topic, final Headers headers, final by
}

private boolean readIsLatestFlag(final ByteBuffer buffer) {
final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - NEW_OLD_FLAG_SIZE);
final byte isLatestFlag = buffer.get(buffer.capacity() - IS_LATEST_FLAG_SIZE - ENCODING_FLAG_SIZE);
if (isLatestFlag == (byte) 1) {
return true;
} else if (isLatestFlag == (byte) 0) {
Expand Down
Expand Up @@ -28,8 +28,8 @@

public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, Void, T> {

private static final int NEW_OLD_FLAG_SIZE = 1;
private static final int UINT32_SIZE = 4;
private static final int ENCODING_FLAG_SIZE = 1;
private static final int MAX_VARINT_LENGTH = 5;
private Serializer<T> inner;
private boolean isUpgrade;

Expand Down Expand Up @@ -104,33 +104,40 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;

// The serialization format is:
// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
final ByteBuffer buf;
// {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
// {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2}
if (newValueIsNotNull && oldValueIsNotNull) {
if (isUpgrade) {
throw new StreamsException("Both old and new values are not null (" + data.oldValue
+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
} else {
final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
ByteUtils.writeUnsignedInt(buf, newDataLength);
final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + ENCODING_FLAG_SIZE;
final ByteBuffer buf = ByteBuffer.allocate(capacity);
ByteUtils.writeVarint(newDataLength, buf);
buf.put(newData).put(oldData).put((byte) 2);

final byte[] serialized = new byte[buf.position()];
buf.position(0);
buf.get(serialized);

return serialized;
}
} else if (newValueIsNotNull) {
final int capacity = newDataLength + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
final int capacity = newDataLength + ENCODING_FLAG_SIZE;
final ByteBuffer buf = ByteBuffer.allocate(capacity);
buf.put(newData).put((byte) 1);

return buf.array();
} else if (oldValueIsNotNull) {
final int capacity = oldDataLength + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
final int capacity = oldDataLength + ENCODING_FLAG_SIZE;
final ByteBuffer buf = ByteBuffer.allocate(capacity);
buf.put(oldData).put((byte) 0);

return buf.array();
} else {
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
}

return buf.array();
}

@Override
Expand Down
Expand Up @@ -43,9 +43,9 @@ public class ChangedSerdeTest {
private static final ChangedDeserializer<String> CHANGED_STRING_DESERIALIZER =
new ChangedDeserializer<>(Serdes.String().deserializer());

private static final int NEW_OLD_FLAG_SIZE = 1;
private static final int ENCODING_FLAG_SIZE = 1;
private static final int IS_LATEST_FLAG_SIZE = 1;
private static final int UINT32_SIZE = 4;
private static final int MAX_VARINT_LENGTH = 5;

final String nonNullNewValue = "hello";
final String nonNullOldValue = "world";
Expand Down Expand Up @@ -141,29 +141,33 @@ private static byte[] serializeVersions3Through5(final String topic, final Chang
final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;

// The serialization format is:
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
// {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
// {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
// {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
final ByteBuffer buf;
final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
if (newValueIsNotNull && oldValueIsNotNull) {
final int capacity = UINT32_SIZE + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
ByteUtils.writeUnsignedInt(buf, newDataLength);
ByteUtils.writeVarint(newDataLength, buf);
buf.put(newData).put(oldData).put(isLatest).put((byte) 5);
} else if (newValueIsNotNull) {
final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
final int capacity = newDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
buf.put(newData).put(isLatest).put((byte) 4);
} else if (oldValueIsNotNull) {
final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
final int capacity = oldDataLength + IS_LATEST_FLAG_SIZE + ENCODING_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
buf.put(oldData).put(isLatest).put((byte) 3);
} else {
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
}

return buf.array();
final byte[] serialized = new byte[buf.position()];
buf.position(0);
buf.get(serialized);

return serialized;
}

private static void checkRoundTripForReservedVersion(final Change<String> data) {
Expand Down

0 comments on commit 2557a4b

Please sign in to comment.