Integrate datum-factory in smb-avro (#5181)
RustedBones committed Jan 18, 2024
1 parent 7f467e5 commit c2886ee
Showing 9 changed files with 109 additions and 197 deletions.
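In short, this commit swaps the Class<V>-based reflection plumbing in scio-smb's Avro support for Beam's AvroDatumFactory: the factory now decides how records are read and written (generic, specific, or reflect), and codec and metadata move from constructor arguments to builder-style setters. Below is a minimal usage sketch against the new surface; the schema string and the metadata entry are illustrative, and only AvroDatumFactory.generic() appears in the diff itself.

import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
import org.apache.beam.sdk.extensions.smb.AvroFileOperations;

public class DatumFactorySketch {
  public static void main(String[] args) {
    // Illustrative record schema; any Avro record schema works here.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                    + "[{\"name\":\"name\",\"type\":\"string\"}]}");

    // New entry point: the datum factory picks the read/write strategy,
    // while codec and metadata are optional builder-style settings.
    AvroFileOperations<GenericRecord> ops =
        AvroFileOperations.of(AvroDatumFactory.generic(), schema)
            .withCodec(CodecFactory.snappyCodec())
            .withMetadata(Map.<String, Object>of("smb.writer", "example"));
  }
}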
BigtableServiceHelper.java
@@ -25,7 +25,8 @@
 
 /** Wrap {@link BigtableServiceImpl} and expose package private methods. */
 public class BigtableServiceHelper extends BigtableServiceImpl {
-  private static final BigtableConfig EMPTY_CONFIG = BigtableConfig.builder().setValidate(true).build();
+  private static final BigtableConfig EMPTY_CONFIG =
+      BigtableConfig.builder().setValidate(true).build();
 
   public BigtableServiceHelper(BigtableOptions bigtableOptions, PipelineOptions pipelineOptions)
       throws IOException {
AvroBucketMetadata.java
@@ -52,31 +52,6 @@ public class AvroBucketMetadata<K1, K2, V extends IndexedRecord> extends BucketM
   @JsonIgnore private final AtomicReference<int[]> keyPath = new AtomicReference<>();
   @JsonIgnore private final AtomicReference<int[]> keyPathSecondary = new AtomicReference<>();
 
-  public AvroBucketMetadata(
-      int numBuckets,
-      int numShards,
-      Class<K1> keyClassPrimary,
-      String keyField,
-      Class<K2> keyClassSecondary,
-      String keyFieldSecondary,
-      HashType hashType,
-      String filenamePrefix,
-      Class<V> recordClass)
-      throws CannotProvideCoderException, NonDeterministicException {
-    this(
-        BucketMetadata.CURRENT_VERSION,
-        numBuckets,
-        numShards,
-        keyClassPrimary,
-        AvroUtils.validateKeyField(keyField, keyClassPrimary, recordClass),
-        keyClassSecondary,
-        keyFieldSecondary == null
-            ? null
-            : AvroUtils.validateKeyField(keyFieldSecondary, keyClassSecondary, recordClass),
-        hashType,
-        filenamePrefix);
-  }
-
   public AvroBucketMetadata(
       int numBuckets,
       int numShards,
AvroFileOperations.java
@@ -25,13 +25,8 @@
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
@@ -46,52 +41,36 @@
 
 /** {@link org.apache.beam.sdk.extensions.smb.FileOperations} implementation for Avro files. */
 public class AvroFileOperations<ValueT> extends FileOperations<ValueT> {
-  private final Class<ValueT> recordClass;
+
+  private final AvroDatumFactory<ValueT> datumFactory;
   private final SerializableSchemaSupplier schemaSupplier;
-  private final PatchedSerializableAvroCodecFactory codec;
-  private final Map<String, Object> metadata;
+  private PatchedSerializableAvroCodecFactory codec;
+  private Map<String, Object> metadata;
 
   static CodecFactory defaultCodec() {
     return CodecFactory.deflateCodec(6);
   }
 
-  private AvroFileOperations(
-      Class<ValueT> recordClass, Schema schema, CodecFactory codec, Map<String, Object> metadata) {
+  private AvroFileOperations(AvroDatumFactory<ValueT> datumFactory, Schema schema) {
     super(Compression.UNCOMPRESSED, MimeTypes.BINARY); // Avro has its own compression via codec
-    this.recordClass = recordClass;
     this.schemaSupplier = new SerializableSchemaSupplier(schema);
-    this.codec = new PatchedSerializableAvroCodecFactory(codec);
-    this.metadata = metadata;
-  }
-
-  public static <V extends IndexedRecord> AvroFileOperations<V> of(Schema schema) {
-    return of(schema, defaultCodec());
+    this.datumFactory = datumFactory;
+    this.codec = new PatchedSerializableAvroCodecFactory(defaultCodec());
   }
 
   public static <V extends IndexedRecord> AvroFileOperations<V> of(
-      Schema schema, CodecFactory codec) {
-    return of(schema, codec, null);
-  }
-
-  public static <V extends IndexedRecord> AvroFileOperations<V> of(
-      Schema schema, CodecFactory codec, Map<String, Object> metadata) {
-    return new AvroFileOperations<>(null, schema, codec, metadata);
-  }
-
-  public static <V extends IndexedRecord> AvroFileOperations<V> of(Class<V> recordClass) {
-    return of(recordClass, defaultCodec());
+      AvroDatumFactory<V> datumFactory, Schema schema) {
+    return new AvroFileOperations<>(datumFactory, schema);
   }
 
-  public static <V extends IndexedRecord> AvroFileOperations<V> of(
-      Class<V> recordClass, CodecFactory codec) {
-    return of(recordClass, codec, null);
+  public AvroFileOperations<ValueT> withCodec(CodecFactory codec) {
+    this.codec = new PatchedSerializableAvroCodecFactory(codec);
+    return this;
   }
 
-  public static <V extends IndexedRecord> AvroFileOperations<V> of(
-      Class<V> recordClass, CodecFactory codec, Map<String, Object> metadata) {
-    // Use reflection to get SR schema
-    final Schema schema = new ReflectData(recordClass.getClassLoader()).getSchema(recordClass);
-    return new AvroFileOperations<>(recordClass, schema, codec, metadata);
+  public AvroFileOperations<ValueT> withMetadata(Map<String, Object> metadata) {
+    this.metadata = metadata;
+    return this;
   }
 
   @Override
@@ -103,52 +82,34 @@ public void populateDisplayData(Builder builder) {
 
   @Override
   protected Reader<ValueT> createReader() {
-    return new AvroReader<>(recordClass, schemaSupplier);
+    return new AvroReader<>(datumFactory, schemaSupplier);
   }
 
   @SuppressWarnings("unchecked")
   @Override
   protected FileIO.Sink<ValueT> createSink() {
     final AvroIO.Sink<ValueT> sink =
-        recordClass == null
-            // https://github.com/spotify/scio/issues/2649
-            // force GenericDatumWriter instead of ReflectDatumWriter
-            ? (AvroIO.Sink<ValueT>)
-                AvroIO.<GenericRecord>sink(getSchema())
-                    .withDatumWriterFactory(AvroDatumFactory.generic())
-            : AvroIO.sink(recordClass)
-                .withDatumWriterFactory(
-                    (writer) -> {
-                      // same as SpecificRecordDatumFactory in scio-avro
-                      ReflectData data = new ReflectData(recordClass.getClassLoader());
-                      org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils
-                          .addLogicalTypeConversions(data);
-                      return new ReflectDatumWriter<>(writer, data);
-                    });
+        ((AvroIO.Sink<ValueT>) AvroIO.sink(getSchema()))
+            .withDatumWriterFactory(datumFactory)
+            .withCodec(codec.getCodec());
 
     if (metadata != null) {
       return sink.withMetadata(metadata);
+    } else {
+      return sink;
     }
-
-    return sink.withCodec(codec.getCodec());
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public Coder<ValueT> getCoder() {
-    return recordClass == null
-        ? (AvroCoder<ValueT>) AvroCoder.of(getSchema())
-        : AvroCoder.of(recordClass, true);
+    return AvroCoder.of(datumFactory, getSchema());
   }
 
   Schema getSchema() {
     return schemaSupplier.get();
   }
 
-  Class<ValueT> getRecordClass() {
-    return recordClass;
-  }
-
   private static class SerializableSchemaString implements Serializable {
     private final String schema;
 
@@ -183,29 +144,20 @@ public Schema get() {
   ////////////////////////////////////////
 
   private static class AvroReader<ValueT> extends FileOperations.Reader<ValueT> {
-    private Class<ValueT> recordClass;
+    private AvroDatumFactory<ValueT> datumFactory;
     private SerializableSchemaSupplier schemaSupplier;
     private transient DataFileStream<ValueT> reader;
 
-    AvroReader(Class<ValueT> recordClass, SerializableSchemaSupplier schemaSupplier) {
-      this.recordClass = recordClass;
+    AvroReader(AvroDatumFactory<ValueT> datumFactory, SerializableSchemaSupplier schemaSupplier) {
+      this.datumFactory = datumFactory;
       this.schemaSupplier = schemaSupplier;
     }
 
     @Override
    public void prepareRead(ReadableByteChannel channel) throws IOException {
       final Schema schema = schemaSupplier.get();
 
-      DatumReader<ValueT> datumReader;
-      if (recordClass == null) {
-        datumReader = new GenericDatumReader<>(schema);
-      } else {
-        // same as SpecificRecordDatumFactory in scio-avro
-        ReflectData data = new ReflectData(recordClass.getClassLoader());
-        org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils.addLogicalTypeConversions(data);
-        datumReader = new ReflectDatumReader<>(schema, schema, data);
-      }
-
+      DatumReader<ValueT> datumReader = datumFactory.apply(schema, schema);
       reader = new DataFileStream<>(Channels.newInputStream(channel), datumReader);
     }
 
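The read path mirrors the write path: prepareRead now delegates datum-reader creation entirely to the factory through its two-schema apply call, exactly as shown in the hunk above. Below is a self-contained sketch of that call shape outside Beam; the generic factory choice, the schema string, and the local file name are illustrative assumptions, not from the commit.

import java.io.FileInputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;

public class DatumFactoryReaderSketch {
  public static void main(String[] args) throws IOException {
    // Illustrative schema; it must match the schema the file was written with.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                    + "[{\"name\":\"name\",\"type\":\"string\"}]}");

    // Same call shape as the patched prepareRead: the factory builds a
    // DatumReader from the writer/reader schema pair.
    DatumReader<GenericRecord> datumReader = AvroDatumFactory.generic().apply(schema, schema);

    // DataFileStream then decodes the Avro container file with that reader.
    try (DataFileStream<GenericRecord> reader =
        new DataFileStream<>(new FileInputStream("users.avro"), datumReader)) {
      while (reader.hasNext()) {
        System.out.println(reader.next());
      }
    }
  }
}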