Skip to content

Commit

Permalink
[HUDI-4959] Fixing Avro's Utf8 serialization in Kryo (apache#7024)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeykudinkin authored and satishkotha committed Dec 11, 2022
1 parent 40446b3 commit bcf6a73
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.hudi.common.util;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.util.Utf8;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayOutputStream;
Expand All @@ -36,9 +38,6 @@ public class SerializationUtils {
private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
ThreadLocal.withInitial(KryoSerializerInstance::new);

// Serialize
// -----------------------------------------------------------------------

/**
* <p>
* Serializes an {@code Object} to a byte array for storage/serialization.
Expand All @@ -52,9 +51,6 @@ public static byte[] serialize(final Object obj) throws IOException {
return SERIALIZER_REF.get().serialize(obj);
}

// Deserialize
// -----------------------------------------------------------------------

/**
* <p>
* Deserializes a single {@code Object} from an array of bytes.
Expand Down Expand Up @@ -112,17 +108,42 @@ Object deserialize(byte[] objectData) {
private static class KryoInstantiator implements Serializable {

public Kryo newKryo() {

Kryo kryo = new Kryo();
// ensure that kryo doesn't fail if classes are not registered with kryo.

// This instance of Kryo should not require prior registration of classes
kryo.setRegistrationRequired(false);
// This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());

// Register serializers
kryo.register(Utf8.class, new AvroUtf8Serializer());

return kryo;
}

}

/**
* NOTE: This {@link Serializer} could deserialize instance of {@link Utf8} serialized
* by implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer}
*/
private static class AvroUtf8Serializer extends Serializer<Utf8> {

@SuppressWarnings("unchecked")
@Override
public void write(Kryo kryo, Output output, Utf8 utf8String) {
Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
bytesSerializer.write(kryo, output, utf8String.getBytes());
}

@SuppressWarnings("unchecked")
@Override
public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
return new Utf8(bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@
package org.apache.hudi.common.util;

import org.apache.avro.util.Utf8;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Objects;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -52,12 +58,33 @@ public void testSerDeser() throws IOException {
verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
}

@Test
public void testAvroUtf8SerDe() throws IOException {
byte[] firstBytes = SerializationUtils.serialize(new Utf8("test"));
// 4 byte string + 3 bytes length (Kryo uses variable-length encoding)
assertEquals(7, firstBytes.length);
}

@Test
public void testClassFullyQualifiedNameSerialization() throws IOException {
DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition"));
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[]{deleteRecord}, Collections.emptyMap());

byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
byte[] secondBytes = SerializationUtils.serialize(deleteBlock);

assertNotSame(firstBytes, secondBytes);
// NOTE: Here we assert that Kryo doesn't optimize out the fully-qualified class-name
// and always writes it out
assertEquals(ByteBuffer.wrap(firstBytes), ByteBuffer.wrap(secondBytes));
}

private <T> void verifyObject(T expectedValue) throws IOException {
byte[] serializedObject = SerializationUtils.serialize(expectedValue);
assertNotNull(serializedObject);
assertTrue(serializedObject.length > 0);

final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject);
final T deserializedValue = SerializationUtils.deserialize(serializedObject);
if (expectedValue == null) {
assertNull(deserializedValue);
} else {
Expand Down

0 comments on commit bcf6a73

Please sign in to comment.