fix #1189, support Parquet compression #1318

Merged (1 commit, Aug 21, 2018)
Changes from all commits

@@ -28,26 +28,30 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import javax.annotation.Nullable;
 
 public class ParquetAvroSink<T> extends HadoopFileBasedSink<T, Void, T> {
 
   private final String schemaString;
   private final SerializableConfiguration conf;
+  private final CompressionCodecName compression;
 
   public ParquetAvroSink(ValueProvider<ResourceId> baseOutputFileName,
                          DynamicDestinations<T, Void, T> dynamicDestinations,
                          Schema schema,
-                         Configuration conf) {
+                         Configuration conf,
+                         CompressionCodecName compression) {
     super(baseOutputFileName, dynamicDestinations);
-    schemaString = schema.toString();
+    this.schemaString = schema.toString();
     this.conf = new SerializableConfiguration(conf);
+    this.compression = compression;
   }
 
   @Override
   public HadoopFileBasedSink.WriteOperation<Void, T> createWriteOperation() {
-    return new ParquetAvroWriteOperation<T>(this, schemaString, conf);
+    return new ParquetAvroWriteOperation<T>(this, schemaString, conf, compression);
   }
 
   @Override
@@ -75,18 +79,21 @@ public String getSuggestedFilenameSuffix() {
 
   private final String schemaString;
   private final SerializableConfiguration conf;
+  private final CompressionCodecName compression;
 
   public ParquetAvroWriteOperation(HadoopFileBasedSink<T, Void, T> sink,
                                    String schemaString,
-                                   SerializableConfiguration conf) {
+                                   SerializableConfiguration conf,
+                                   CompressionCodecName compression) {
     super(sink);
     this.schemaString = schemaString;
     this.conf = conf;
+    this.compression = compression;
   }
 
   @Override
   public Writer<Void, T> createWriter() throws Exception {
-    return new ParquetAvroWriter<>(this, new Schema.Parser().parse(schemaString), conf);
+    return new ParquetAvroWriter<>(this, new Schema.Parser().parse(schemaString), conf, compression);
   }
 }

@@ -98,21 +105,25 @@ public ParquetAvroWriteOperation(HadoopFileBasedSink<T, Void, T> sink,
 
   private final Schema schema;
   private final SerializableConfiguration conf;
+  private final CompressionCodecName compression;
   private ParquetWriter<T> writer;
 
   public ParquetAvroWriter(WriteOperation<Void, T> writeOperation,
                            Schema schema,
-                           SerializableConfiguration conf) {
+                           SerializableConfiguration conf,
+                           CompressionCodecName compression) {
     super(writeOperation);
     this.schema = schema;
     this.conf = conf;
+    this.compression = compression;
   }
 
   @Override
   protected void prepareWrite(Path path) throws Exception {
     writer = org.apache.parquet.avro.AvroParquetWriter.<T>builder(path)
         .withSchema(schema)
         .withConf(conf.get())
+        .withCompressionCodec(compression)
         .build();
   }
 
@@ -18,20 +18,18 @@
package com.spotify.scio.parquet.avro.nio

import com.spotify.scio.parquet.avro._

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import com.spotify.scio.io.Tap
import com.spotify.scio.nio.ScioIO
import com.spotify.scio.util.ScioUtil

import org.apache.hadoop.mapreduce.Job
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.avro.reflect.ReflectData

import org.apache.beam.sdk.io._
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.concurrent.Future

@@ -51,7 +49,7 @@ final case class ParquetAvroIO[T](path: String)
     throw new IllegalStateException("Can't create a Tap for parquet avro file")
 
   def write(sc: SCollection[T], params: WriteP): Future[Tap[T]] = params match {
-    case ParquetAvroIO.Parameters(numShards, schema, suffix) =>
+    case ParquetAvroIO.Parameters(numShards, schema, suffix, compression) =>
       val job = Job.getInstance()
       if (ScioUtil.isLocalRunner(sc.context.options.getRunner)) {
        GcsConnectorUtil.setCredentials(job)
@@ -66,9 +64,10 @@ final case class ParquetAvroIO[T](path: String)
       val resource = FileBasedSink.convertToFileResourceIfPossible(sc.pathWithShards(path))
       val prefix = StaticValueProvider.of(resource)
       val usedFilenamePolicy = DefaultFilenamePolicy.fromStandardParameters(
-        prefix, null, ".parquet", false)
+        prefix, null, "", false)
       val destinations = DynamicFileDestinations.constant[T](usedFilenamePolicy)
-      val sink = new ParquetAvroSink[T](prefix, destinations, writerSchema, job.getConfiguration)
+      val sink = new ParquetAvroSink[T](
+        prefix, destinations, writerSchema, job.getConfiguration, compression)
       val t = HadoopWriteFiles.to(sink).withNumShards(numShards)
       sc.applyInternal(t)
       Future.failed(new NotImplementedError("Parquet Avro future not implemented"))
@@ -79,6 +78,7 @@ object ParquetAvroIO {
   sealed trait WriteParam
   final case class Parameters(numShards: Int = 0,
                               schema: Schema = null,
-                              suffix: String = "")
+                              suffix: String = "",
+                              compression: CompressionCodecName = CompressionCodecName.SNAPPY)
     extends WriteParam
 }
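For context, a minimal sketch of how the extended Parameters case class can be constructed once this change is in; the values are illustrative only, and the schema keeps its null default here, as it would for specific records:

import com.spotify.scio.parquet.avro.nio.ParquetAvroIO
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Omitting `compression` picks up the new default, SNAPPY.
val defaultParams = ParquetAvroIO.Parameters(numShards = 1)

// Or select a codec explicitly.
val gzipParams = ParquetAvroIO.Parameters(numShards = 1, compression = CompressionCodecName.GZIP)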
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.avro.AvroParquetInputFormat
 import org.apache.parquet.filter2.predicate.FilterPredicate
 import org.apache.parquet.hadoop.ParquetInputFormat
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.Future
@@ -186,8 +187,10 @@ package object avro {
     def saveAsParquetAvroFile(path: String,
                               numShards: Int = 0,
                               schema: Schema = null,
-                              suffix: String = ""): Future[Tap[T]] = {
-      val params = ParquetAvroIO.Parameters(numShards, schema, suffix)
+                              suffix: String = "",
+                              compression: CompressionCodecName = CompressionCodecName.SNAPPY)
+    : Future[Tap[T]] = {
+      val params = ParquetAvroIO.Parameters(numShards, schema, suffix, compression)
       self.write(ParquetAvroIO[T](path))(params)
     }
   }
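A hedged end-to-end sketch of the extended saveAsParquetAvroFile API follows; the schema, records, and output path are made up for illustration, and only the compression parameter itself comes from this change:

import com.spotify.scio._
import com.spotify.scio.parquet.avro._
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Throwaway schema and records, just to have something to write.
val schema: Schema = SchemaBuilder.record("Example").fields()
  .requiredInt("id")
  .endRecord()
val records: Seq[GenericRecord] =
  (1 to 100).map(i => new GenericRecordBuilder(schema).set("id", i).build())

val sc = ScioContext()
sc.parallelize(records)
  .saveAsParquetAvroFile(
    "/tmp/parquet-out",                      // hypothetical output directory
    numShards = 1,
    schema = schema,                         // generic records need an explicit writer schema
    compression = CompressionCodecName.GZIP) // new parameter; defaults to SNAPPY
sc.close()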
@@ -20,7 +20,6 @@ package com.spotify.scio.parquet.avro
import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.io.TapSpec
import com.spotify.scio.testing._
import org.apache.avro.generic.GenericRecord
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
@@ -120,14 +119,15 @@ class ParquetAvroTest extends TapSpec with BeforeAndAfterAll {
   }
 
   it should "write generic records" in {
+    val dir = tmpDir
+
     val genericRecords = (1 to 100).map(AvroUtils.newGenericRecord)
     val sc = ScioContext()
-    val tmp = tmpDir
     sc.parallelize(genericRecords)
-      .saveAsParquetAvroFile(tmp.toString, numShards = 1, schema = AvroUtils.schema)
+      .saveAsParquetAvroFile(dir.toString, numShards = 1, schema = AvroUtils.schema)
     sc.close()
 
-    val files = tmp.listFiles()
+    val files = dir.listFiles()
     files.length shouldBe 1
 
     val reader = AvroParquetReader.builder[GenericRecord](new Path(files(0).toString)).build()
@@ -137,10 +137,10 @@ class ParquetAvroTest extends TapSpec with BeforeAndAfterAll {
       b += r
       r = reader.read()
     }
+    reader.close()
     b.result() should contain theSameElementsAs genericRecords
 
-    reader.close()
-    FileUtils.deleteDirectory(tmp)
+    FileUtils.deleteDirectory(dir)
   }
 
   it should "work with JobTest" in {
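If a test like the generic-records one above wanted to assert the codec that actually landed on disk, the Parquet footer records it per column chunk. A rough sketch, reusing the test's files(0) handle and assuming a single part file; note that this readFooter overload is deprecated in newer parquet-hadoop releases:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// `files(0)` is the single part file written by the test above.
val footer = ParquetFileReader.readFooter(new Configuration(), new Path(files(0).toString))
val codec = footer.getBlocks.get(0).getColumns.get(0).getCodec
codec shouldBe CompressionCodecName.SNAPPY // the default codec when none is passed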