Skip to content

Commit

Permalink
[fast-avro] FastGenericDatumReader forwards setSchema() to coldDeseri…
Browse files Browse the repository at this point in the history
…alizer (#534)

* TDD approach - adding unit test which should pass but it fails.

* Some minor code cleanup.

* [fast-avro][bugfix] Delegating setSchema() call to coldDeserializer from FastGenericDatumReader.
It's needed to deserialize 1st record(s) from file using DataFileStream.
  • Loading branch information
krisso-rtb committed Jan 24, 2024
1 parent 58c9203 commit 9487a8b
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"type": "record",
"name": "SimpleTestRecord",
"namespace": "com.linkedin.avro.fastserde.generated.avro",
"doc": "Used in tests of fast-serde to verify writing records by DataFileWriter and reading by DataFileReader/DataFileStream",
"fields": [
{
"name": "text",
"type": "string",
"default": ""
},
{
"name": "fiveBytes",
"type": {
"name": "Fixed5",
"type": "fixed",
"size": 5
},
"default": "Fizyk"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.linkedin.avro.fastserde.file;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.linkedin.avro.fastserde.FastGenericDatumReader;
import com.linkedin.avro.fastserde.FastSpecificDatumReader;
import com.linkedin.avro.fastserde.FastSpecificDatumWriter;
import com.linkedin.avro.fastserde.generated.avro.Fixed5;
import com.linkedin.avro.fastserde.generated.avro.SimpleTestRecord;
import com.linkedin.avroutil1.compatibility.AvroRecordUtil;

/**
 * Round-trip test: {@link SimpleTestRecord}s are written into an in-memory Avro container file with
 * {@link DataFileWriter} and read back through {@link DataFileStream} backed by a fast-serde datum reader.
 * <p>
 * Both readers are created with a {@code null} writer schema, so the test only passes when the reader
 * accepts the schema handed to it later via {@link DatumReader#setSchema(Schema)}.
 */
public class FastSerdeWithDataFileStreamTest {

  /**
   * @return pairs of (number of records to write, datum reader under test); every reader is built with
   *         a {@code null} writer schema and {@link SimpleTestRecord#SCHEMA$} as reader schema
   */
  @DataProvider
  private Object[][] dataFileStreamDeserializationTestCases() {
    Schema readerSchema = SimpleTestRecord.SCHEMA$;
    return new Object[][]{
        new Object[]{11, new FastSpecificDatumReader<>(null, readerSchema)},
        new Object[]{12, new FastGenericDatumReader<GenericRecord>(null, readerSchema)},
    };
  }

  /**
   * Writes {@code recordsToWrite} records to an in-memory file and verifies that {@code datumReader},
   * driven by {@link DataFileStream}, returns exactly the same records in the same order.
   *
   * @param recordsToWrite number of records placed in the file
   * @param datumReader    reader under test (specific or generic, despite the historical method name)
   * @throws IOException if writing to or reading from the in-memory file fails
   */
  @Test(groups = "deserializationTest", dataProvider = "dataFileStreamDeserializationTestCases")
  <D extends IndexedRecord> void dataFileStreamShouldReadDataUsingSpecificReader(int recordsToWrite,
      DatumReader<D> datumReader) throws IOException {
    // given: records to be written to one file
    List<SimpleTestRecord> records = new ArrayList<>(recordsToWrite);
    // int counter: a byte counter would wrap around and never terminate for recordsToWrite > 127
    for (int i = 0; i < recordsToWrite; i++) {
      Fixed5 fiveBytes = new Fixed5();
      fiveBytes.bytes(new byte[]{'K', 'r', 'i', 's', (byte) i});

      SimpleTestRecord simpleTestRecord = new SimpleTestRecord();
      AvroRecordUtil.setField(simpleTestRecord, "fiveBytes", fiveBytes);
      AvroRecordUtil.setField(simpleTestRecord, "text", "text-" + i);

      records.add(simpleTestRecord);
    }

    // given: bytes array representing content of persistent file with schema and multiple records
    byte[] bytes = writeTestRecordsToFile(records);

    // when: pre-populated bytes array is consumed by DataFileStream (in tests more convenient than DataFileReader
    // because SeekableByteArrayInput is not available for older Avro versions)
    ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
    DataFileStream<D> dataFileStream = new DataFileStream<>(inputStream, datumReader);

    // then: records read from file are the same as records sent to file
    int idx = 0;
    // NOTE(review): explicit try/finally instead of try-with-resources on the assumption that
    // DataFileStream is not Closeable in the oldest supported Avro versions - confirm before simplifying.
    try {
      for (IndexedRecord recordReadFromFile : dataFileStream) {
        Assert.assertTrue(idx < records.size(), "File contains more records than were written");
        Assert.assertEquals(recordReadFromFile.toString(), records.get(idx).toString());
        idx++;
      }
    } finally {
      dataFileStream.close();
    }

    // Without this check the test would pass vacuously if the stream returned no (or too few) records.
    Assert.assertEquals(idx, records.size(), "Number of records read from file");
  }

  /**
   * Serializes the given records into an Avro container file kept in memory.
   *
   * @param records records to append to the file, in order
   * @return bytes array representing file content
   * @throws IOException if the writer fails to create the file or append a record
   */
  private static byte[] writeTestRecordsToFile(List<SimpleTestRecord> records) throws IOException {
    Schema schema = SimpleTestRecord.SCHEMA$;
    FastSpecificDatumWriter<SimpleTestRecord> datumWriter = new FastSpecificDatumWriter<>(schema);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    try (DataFileWriter<SimpleTestRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
      dataFileWriter.create(schema, outputStream);

      for (SimpleTestRecord record : records) {
        dataFileWriter.append(record);
      }

      dataFileWriter.flush();
    }

    return outputStream.toByteArray();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.FastGenericDeserializerGenerator;
import com.linkedin.avro.fastserde.FastGenericSerializerGenerator;
import com.linkedin.avro.fastserde.FastSerdeCache;
import com.linkedin.avro.fastserde.FastSerializer;
import com.linkedin.avro.fastserde.FastSpecificDeserializerGenerator;
import com.linkedin.avro.fastserde.FastSpecificSerializerGenerator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;

import static com.linkedin.avro.fastserde.customized.DatumReaderCustomization.*;
Expand All @@ -19,5 +19,15 @@ default T deserialize(T reuse, Decoder d) throws IOException {
return deserialize(reuse, d, DEFAULT_DATUM_READER_CUSTOMIZATION);
}

/**
 * Sets the writer's schema on this deserializer.
 * <p>
 * This default implementation rejects the call unconditionally; implementations that can
 * actually switch schema are expected to override it.
 *
 * @param writerSchema the writer's schema
 * @throws UnsupportedOperationException always, unless the method is overridden
 * @see org.apache.avro.io.DatumReader#setSchema(Schema)
 */
default void setSchema(Schema writerSchema) {
  // Only vanilla-avro-based classes (e.g. the fallback scenario) should override this method;
  // for generated deserializers changing the schema normally makes no sense.
  final String message = "Can't change schema for already generated class.";
  throw new UnsupportedOperationException(message);
}

T deserialize(T reuse, Decoder d, DatumReaderCustomization customization) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void setSchema(Schema schema) {
if (readerSchema == null) {
readerSchema = writerSchema;
}

coldDeserializer.setSchema(schema);
}

@Override
Expand All @@ -99,7 +101,7 @@ public T read(T reuse, Decoder in) throws IOException {
fastDeserializer = getFastDeserializerFromCache(cache, writerSchema, readerSchema, modelData, customization);
if (fastDeserializer.hasDynamicClassGenerationDone()) {
if (fastDeserializer.isBackedByGeneratedClass()) {
/**
/*
* Runtime class generation is done successfully, so cache it.
*/
cachedFastDeserializer.compareAndSet(null, fastDeserializer);
Expand All @@ -108,7 +110,7 @@ public T read(T reuse, Decoder in) throws IOException {
+ readerSchema + "], writer schema: [" + writerSchema + "]");
}
} else {
/**
/*
* Runtime class generation fails, so this class will cache a newly generated cold deserializer, which will
* honor {@link FastSerdeCache#isFailFast()}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ public void write(T data, Encoder out) throws IOException {

if (fastSerializer.hasDynamicClassGenerationDone()) {
if (fastSerializer.isBackedByGeneratedClass()) {
/**
/*
* Runtime class generation is done successfully, so cache it.
*/
cachedFastSerializer = fastSerializer;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("FastSerializer has been generated and cached for writer schema: [" + writerSchema + "]");
}
} else {
/**
/*
* Runtime class generation fails, so this class will cache a newly generated cold serializer, which will
* honor {@link FastSerdeCache#isFailFast()}.
*/
Expand All @@ -95,7 +95,7 @@ public void write(T data, Encoder out) throws IOException {
}
fastSerializer = cachedFastSerializer;
} else {
/**
/*
* Don't use the cached serializer since it may not support the passed customization.
*/
fastSerializer = coldSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static boolean isSupportedForFastSerializer(Schema.Type schemaType) {
Schema.Type.ARRAY);
}

public static boolean isFastDeserializer(FastDeserializer deserializer) {
public static boolean isFastDeserializer(FastDeserializer<?> deserializer) {
return deserializer.isBackedByGeneratedClass();
}

Expand Down Expand Up @@ -476,7 +476,7 @@ private FastDeserializer<?> buildSpecificDeserializer(Schema writerSchema, Schem
LOGGER.error("Deserializer class instantiation exception", e);
}

return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl(writerSchema, readerSchema, modelData, customization, failFast, true);
return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true);
}

/**
Expand Down Expand Up @@ -536,7 +536,7 @@ private FastDeserializer<?> buildGenericDeserializer(Schema writerSchema, Schema
LOGGER.error("Deserializer class instantiation exception:", e);
}

return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl(writerSchema, readerSchema, modelData, customization, failFast, true);
return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true);
}

public FastSerializer<?> buildFastSpecificSerializer(Schema schema, SpecificData modelData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@
import org.apache.avro.generic.CustomizedSpecificDatumWriter;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.customized.DatumWriterCustomization;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.ColdGenericDatumReader;
import org.apache.avro.generic.ColdSpecificDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificData;
Expand Down Expand Up @@ -50,12 +46,17 @@ public FastDeserializerWithAvroSpecificImpl(Schema writerSchema, Schema readerSc
SpecificData modelData, DatumReaderCustomization customization, boolean failFast, boolean runtimeClassGenerationDone) {
this.customization = customization == null ? DatumReaderCustomization.DEFAULT_DATUM_READER_CUSTOMIZATION : customization;
this.customizedDatumReader = Utils.isAvro14() ?
new CustomizedSpecificDatumReaderForAvro14(writerSchema, readerSchema, this.customization) :
new CustomizedSpecificDatumReaderForAvro14<>(writerSchema, readerSchema, this.customization) :
new CustomizedSpecificDatumReader<>(writerSchema, readerSchema, modelData, this.customization);
this.failFast = failFast;
this.runtimeClassGenerationDone = runtimeClassGenerationDone;
}

/**
 * Forwards the writer's schema to the wrapped {@code customizedDatumReader}.
 *
 * @param writerSchema the writer's schema to set on the underlying datum reader
 */
@Override
public void setSchema(Schema writerSchema) {
  customizedDatumReader.setSchema(writerSchema);
}

@Override
public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException {
if (failFast) {
Expand Down Expand Up @@ -100,13 +101,18 @@ public FastDeserializerWithAvroGenericImpl(Schema writerSchema, Schema readerSch
GenericData modelData, DatumReaderCustomization customization, boolean failFast, boolean runtimeClassGenerationDone) {
this.customization = customization == null ? DatumReaderCustomization.DEFAULT_DATUM_READER_CUSTOMIZATION : customization;
this.customizedDatumReader = Utils.isAvro14() ?
new CustomizedGenericDatumReaderForAvro14(writerSchema, readerSchema, this.customization) :
new CustomizedGenericDatumReaderForAvro14<>(writerSchema, readerSchema, this.customization) :
new CustomizedGenericDatumReader<>(writerSchema, readerSchema, modelData, this.customization);

this.failFast = failFast;
this.runtimeClassGenerationDone = runtimeClassGenerationDone;
}

/**
 * Forwards the writer's schema to the wrapped {@code customizedDatumReader}.
 *
 * @param writerSchema the writer's schema to set on the underlying datum reader
 */
@Override
public void setSchema(Schema writerSchema) {
  this.customizedDatumReader.setSchema(writerSchema);
}

@Override
public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException {
if (failFast) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private <T> T instantiate(Class<T> clazz, Schema of) {
//TODO - look for both old and new SchemaConstructable ctrs 1st
try {
Constructor<T> noArgCtr = clazz.getDeclaredConstructor(NO_ARGS);
return noArgCtr.newInstance(NO_ARGS);
return noArgCtr.newInstance();
} catch (Exception e) {
throw new IllegalStateException("while trying to instantiate a(n) " + clazz.getName(), e);
}
Expand Down

0 comments on commit 9487a8b

Please sign in to comment.