Make Java IO etc. more usable #1013

Closed
nevillelyh opened this Issue Jan 17, 2018 · 6 comments

nevillelyh (Member) commented Jan 17, 2018

We should add some implicit converters, probably in com.spotify.scio.JavaConverters._, to make Java APIs more usable. It should include implicit conversions for common Java types like SerializableFunction, DynamicDestinations, etc.
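
For illustration, a minimal sketch of what one such converter could look like (the object layout and method name here are placeholders, not a settled API):

package com.spotify.scio

import scala.language.implicitConversions
import org.apache.beam.sdk.transforms.SerializableFunction

object JavaConverters {
  // Lets Java builder methods that expect SerializableFunction accept plain
  // Scala functions. Note: anything the function closes over must itself be
  // serializable for the runner to ship it.
  implicit def toSerializableFunction[T, U](f: T => U): SerializableFunction[T, U] =
    new SerializableFunction[T, U] {
      override def apply(input: T): U = f(input)
    }
}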

jpvelez (Contributor) commented Feb 9, 2018

I'll take a stab at this.

I'm going to start by looking at AvroIO, BigQueryIO, and FileIO: examine the builder methods and list out their argument types. For any non-primitive types, let's figure out how to make them more fluent.


jpvelez (Contributor) commented Mar 8, 2018

AvroIO

Static Transform Constructors

To make AvroIO.(Read | ReadAll | Parse | ParseAll | Write | TypedWrite) root transforms, you call one of the following AvroIO static constructor methods:

Read<T> read(Class<T> recordClass) 
ReadAll<T> readAll(Class<T> recordClass) 
Read<GenericRecord> readGenericRecords(Schema schema) 
ReadAll<GenericRecord> readAllGenericRecords(Schema schema) 
Read<GenericRecord> readGenericRecords(String schema) 
ReadAll<GenericRecord> readAllGenericRecords(String schema) 
Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) 
ParseAll<T> parseAllGenericRecords(SerializableFunction<GenericRecord, T> parseFn) 
Write<T> write(Class<T> recordClass) 
Write<GenericRecord> writeGenericRecords(Schema schema) 
TypedWrite<UserT, Void, OutputT> writeCustomType()
TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords()

Non-primitive argument types that may need Scala conversion here are:

  • org.apache.avro.Schema
  • org.apache.beam.sdk.transforms.SerializableFunction
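
To see the friction concretely: calling parseGenericRecords from Scala means spelling out the SAM type by hand (at least on Scala 2.11, which Scio still cross-builds), which is exactly what a Function1-to-SerializableFunction conversion would remove. A rough sketch:

import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.io.AvroIO
import org.apache.beam.sdk.transforms.SerializableFunction

// Today: explicit anonymous class for the SerializableFunction argument.
val parse: AvroIO.Parse[String] = AvroIO.parseGenericRecords(
  new SerializableFunction[GenericRecord, String] {
    override def apply(r: GenericRecord): String = r.get("name").toString
  })

// With the converter in scope this could shrink to:
//   AvroIO.parseGenericRecords((r: GenericRecord) => r.get("name").toString)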

AvroIO.Read PTransform

Each constructor method uses a Builder for its Transform type, e.g.:

  public static <T> Read<T> read(Class<T> recordClass) {
    return new AutoValue_AvroIO_Read.Builder<T>()
        .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
        .setRecordClass(recordClass)
        .setSchema(ReflectData.get().getSchema(recordClass))
        .setHintMatchesManyFiles(false)
        .build();
  }

Some fields are set by the builder here; others must be supplied by the user post-build. Most can be overridden by methods on the resulting Transform. Here are the ones for the AvroIO.Read PTransform:

public Read<T> from(ValueProvider<String> filepattern)
public Read<T> from(String filepattern) 
public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) 
public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) 
public Read<T> watchForNewFiles(Duration pollInterval, TerminationCondition<String, ?> terminationCondition) 
public Read<T> withHintMatchesManyFiles() 
public PCollection<T> expand(PBegin input) 
public void populateDisplayData(DisplayData.Builder builder) 

So you must supply the filename or filepattern to read from; you can also configure the file-pattern matching logic to allow empty matches or to wait for new files to show up, and hint to the runner that the filepattern will match many files, which may trigger an optimization.

expand seems to turn the PTransform into a PCollection of Avro types directly, bypassing apply, and populateDisplayData registers Read Transform internals - the filepattern and match config you supplied - as display data. Not sure if we want users to call these directly or not.

Non-primitive arg types for these Transform modifier methods:

  • org.apache.beam.sdk.options.ValueProvider
  • org.apache.beam.sdk.io.FileIO.MatchConfiguration
  • org.apache.beam.sdk.io.fs.EmptyMatchTreatment
  • org.joda.time.Duration
  • org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition
  • org.apache.beam.sdk.values.PDone
  • org.apache.beam.sdk.transforms.display.DisplayData.Builder
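
For context, here is roughly how those arguments show up when chaining a Read from Scala today (the bucket path and schema string below are made up):

import org.apache.beam.sdk.io.AvroIO
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment
import org.apache.beam.sdk.transforms.Watch
import org.joda.time.Duration

val schemaJson = """{"type": "record", "name": "R", "fields": []}"""

val read = AvroIO
  .readGenericRecords(schemaJson)
  .from("gs://bucket/path/*.avro")
  .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
  .watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never[String]())

// A ScioContext can already consume this via sc.customInput("ReadAvro", read);
// the question is how much of the argument plumbing above we can make more Scala-friendly.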

AvroIO.ReadAll PTransform

ReadAll has most of the methods above, plus this one:

ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes)

No non-primitive arg types here.

AvroIO.Parse and ParseAll PTransforms

Parse and ParseAll have most of the methods above, plus this one:

Parse<T> withCoder(Coder<T> coder)

Non-primitive arg types not covered above:

  • org.apache.beam.sdk.coders.Coder
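
Continuing the parseGenericRecords sketch above, the Coder argument itself is easy to pass from Scala; it is the SerializableFunction next to it that needs help:

import org.apache.beam.sdk.coders.StringUtf8Coder

// Parse produces an arbitrary T, so Beam needs an explicit Coder for the output.
val parseWithCoder = parse.withCoder(StringUtf8Coder.of())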

AvroIO.Write PTransform

Methods:

public Write<T> to(String outputPrefix) 
public Write<T> to(ResourceId outputPrefix) 
public Write<T> to(ValueProvider<String> outputPrefix) 
public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) 
public Write<T> to(FilenamePolicy filenamePolicy) 
public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) 
public Write<T> withSchema(Schema schema) 
public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) 
public Write<T> withTempDirectory(ResourceId tempDirectory) 
public Write<T> withShardNameTemplate(String shardTemplate) 
public Write<T> withSuffix(String filenameSuffix) 
public Write<T> withNumShards(int numShards) 
public Write<T> withoutSharding() 
public Write<T> withWindowedWrites() 
public Write<T> withCodec(CodecFactory codec) 
public TypedWrite<T, DestinationT, T> withOutputFilenames() 
public Write<T> withMetadata(Map<String, Object> metadata) 
public PDone expand(PCollection<T> input) 
public void populateDisplayData(DisplayData.Builder builder) 

Non-primitive arg types not covered above:

  • org.apache.beam.sdk.io.fs.ResourceId
  • org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
  • org.apache.beam.sdk.io.AvroIO.DynamicAvroDestinations
  • org.apache.avro.file.CodecFactory
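
Of these, ResourceId looks like a good candidate for a small wrapper so the toResource / withTempDirectory overloads can take plain path strings. A hedged sketch (the object, class, and method names are made up):

import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.io.fs.ResourceId

object ResourceIdConverters {
  implicit class RichPath(val path: String) extends AnyVal {
    // true = the path names a directory, which is what withTempDirectory expects
    def toResourceId: ResourceId = FileSystems.matchNewResource(path, true)
  }
}

// e.g. AvroIO.write(classOf[MyRecord]).to("gs://bucket/out").withTempDirectory("gs://bucket/tmp/".toResourceId)
// (MyRecord is a placeholder; the implicit above must be in scope.)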

AvroIO.TypedWrite PTransform

public TypedWrite<UserT, DestinationT, OutputT> to(String outputPrefix) 
public TypedWrite<UserT, DestinationT, OutputT> to(ResourceId outputPrefix) 
public TypedWrite<UserT, DestinationT, OutputT> to(ValueProvider<String> outputPrefix) 
public TypedWrite<UserT, DestinationT, OutputT> toResource(ValueProvider<ResourceId> outputPrefix) 
public TypedWrite<UserT, DestinationT, OutputT> to(FilenamePolicy filenamePolicy) 
public <NewDestinationT> TypedWrite<UserT, NewDestinationT, OutputT> to(DynamicAvroDestinations<UserT, NewDestinationT, OutputT> dynamicDestinations) 
public TypedWrite<UserT, DestinationT, OutputT> withSchema(Schema schema) 
public TypedWrite<UserT, DestinationT, OutputT> withFormatFunction(SerializableFunction<UserT, OutputT> formatFunction) 
public TypedWrite<UserT, DestinationT, OutputT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) 
public TypedWrite<UserT, DestinationT, OutputT> withTempDirectory(ResourceId tempDirectory) 
public TypedWrite<UserT, DestinationT, OutputT> withShardNameTemplate(String shardTemplate) 
public TypedWrite<UserT, DestinationT, OutputT> withSuffix(String filenameSuffix) 
public TypedWrite<UserT, DestinationT, OutputT> withNumShards(int numShards) 
public TypedWrite<UserT, DestinationT, OutputT> withoutSharding() 
public TypedWrite<UserT, DestinationT, OutputT> withWindowedWrites() 
public TypedWrite<UserT, DestinationT, OutputT> withCodec(CodecFactory codec) 
public TypedWrite<UserT, DestinationT, OutputT> withMetadata(Map<String, Object> metadata) 
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) 
public void populateDisplayData(DisplayData.Builder builder) 

All non-primitive arg types covered above.
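
One more arg type worth noting from the withMetadata methods above is java.util.Map<String, Object>. The standard collection converters cover it from Scala, though a thin helper could hide the boxing (AvroIO appears to restrict metadata values to String, Long, and byte[]):

import scala.collection.JavaConverters._

val metadata: java.util.Map[String, AnyRef] =
  Map[String, AnyRef]("owner" -> "data-team", "retention-days" -> Long.box(30L)).asJava

// e.g. AvroIO.write(classOf[MyRecord]).to("gs://bucket/out").withMetadata(metadata)
// (MyRecord is a placeholder.)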

TextIO

Transforms: Read, ReadAll, ReadFiles, Write, TypedWrite<UserT, DestinationT>

Constructors:

Read read()
ReadAll readAll()
ReadFiles readFiles()
Write write()
TypedWrite<UserT, Void> writeCustomType()

Non-primitive argument types across all Transform methods:

  • org.apache.beam.sdk.io.Compression: enum
  • TextIO.CompressionType: only on a deprecated method, may not need to cover; replaced by Compression above.
  • org.apache.beam.sdk.io.DefaultFilenamePolicy.Params
  • org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
  • byte[]: probably don't need to cover?
  • PCollection<FileIO.ReadableFile>: a few methods take PCollections
  • @Nullable String: not sure if it matters that the String arg type has a @Nullable annotation
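
Plain enums like Compression (and the disposition enums below) are arguably fine to use from Scala with no converter; for example, on the read side:

import org.apache.beam.sdk.io.{Compression, TextIO}

val readText = TextIO.read()
  .from("gs://bucket/logs/*.gz")
  .withCompression(Compression.GZIP)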

BigQueryIO

Transforms: Read (seems deprecated), TypedRead<T>, Write<T>

Constructors:

Read read()
TypedRead<TableRow> readTableRows()
TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> parseFn)
Write<T> write()

Non-primitive argument types across all Transform methods:

  • org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices
  • org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy
  • com.google.api.services.bigquery.model.TableSchema
  • com.google.api.services.bigquery.model.TableReference
  • com.google.api.services.bigquery.model.TimePartitioning
  • org.apache.beam.sdk.coders.CoderRegistry
  • org.apache.beam.sdk.coders.Coder<T>
  • org.apache.beam.sdk.options.PipelineOptions
  • org.apache.beam.sdk.values.PCollectionView<Map<String, String>>
  • BigQueryIO.Write.CreateDisposition: public enum
  • BigQueryIO.Write.WriteDisposition: public enum
  • BigQueryIO.Write.Method: public enum
  • BigQueryIO.TypedRead<T>
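
BigQueryIO's parseFn constructor is another spot where the SerializableFunction conversion would help, while the disposition enums need nothing special. A rough sketch of today's usage from Scala:

import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, SchemaAndRecord}
import org.apache.beam.sdk.transforms.SerializableFunction

// Today: explicit SAM instance; with a Function1 conversion in scope this could be a lambda.
val readUserIds = BigQueryIO
  .read(new SerializableFunction[SchemaAndRecord, String] {
    override def apply(sr: SchemaAndRecord): String = sr.getRecord.get("user_id").toString
  })
  .from("my-project:my_dataset.my_table")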
nevillelyh (Member) commented Mar 9, 2018

So we can start by wrapping non-primitive argument types, especially ones with natural Scala counterparts. We can do it with either implicit conversions, or Rich* classes with as* conversion methods (see the sketch below).

  • SerializableFunction
  • ValueProvider
  • ResourceId
  • FilenamePolicy
  • ...
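
A rough sketch of what the Rich* flavor could look like for a couple of these (all names hypothetical):

import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.SerializableFunction

object RichBeamConverters {
  implicit class RichFunction[T, U](private val f: T => U) {
    def asSerializableFn: SerializableFunction[T, U] =
      new SerializableFunction[T, U] { override def apply(t: T): U = f(t) }
  }

  implicit class RichValue[T](private val value: T) {
    // Wraps a plain value in Beam's ValueProvider via StaticValueProvider.
    def asValueProvider: ValueProvider[T] = StaticValueProvider.of(value)
  }
}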
jpvelez (Contributor) commented Apr 5, 2018

@nevillelyh gonna start working on SerializableFunction. Any others?


nevillelyh (Member) commented Apr 5, 2018

Can't think of any at the moment, but take a look at some other Beam IOs?


jbx (Member) commented Jul 17, 2018

Can't think of more IOs, closing for now. THANKS @jpvelez


jbx closed this Jul 17, 2018
