KITE-1025 Use CombineFileInputFormat
Wrap Avro and Parquet inputs with CombineFileInputFormat so that
multiple small files don't result in multiple input splits and
MapReduce tasks.
Gabriel Reid authored and rdblue committed Aug 16, 2015
1 parent 6ebb8ca commit e92bf71
Showing 5 changed files with 277 additions and 22 deletions.
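For context, here is a minimal sketch (not part of this commit) of a MapReduce job that reads a Kite dataset through DatasetKeyInputFormat. The dataset URI, mapper, and output path are invented, and the exact configure()/readFrom() call shape may vary by Kite version; the point is that, with this change, Avro and Parquet inputs read this way are combined behind the scenes, so a directory of many small files no longer fans out into one map task per file.

// Hypothetical driver class; everything below is illustrative only.
package org.kitesdk.examples;

import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;

public class ReadKiteDataset {

  /** DatasetKeyInputFormat hands each record to the mapper as the key; the value is always null. */
  static class RecordMapper extends Mapper<GenericData.Record, Void, Text, NullWritable> {
    @Override
    protected void map(GenericData.Record record, Void ignored, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(record.toString()), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "read-kite-dataset");
    job.setJarByClass(ReadKiteDataset.class);

    // Hypothetical dataset URI; with this commit, Avro and Parquet splits are combined.
    DatasetKeyInputFormat.configure(job).readFrom("dataset:hdfs:/data/ns/events");
    job.setInputFormatClass(DatasetKeyInputFormat.class);

    job.setMapperClass(RecordMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}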
@@ -0,0 +1,86 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kitesdk.data.spi.filesystem;

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Base class for wrapping file-based record readers with CombineFileInputFormat functionality. This allows multiple
 * files to be combined into a single InputSplit, with the main prerequisite being that the InputFormat being wrapped
 * must work with FileSplits.
 * <p/>
 * Subclasses need only implement {@link #getInputFormat()} to return an instance of the InputFormat being wrapped.
 */
abstract class AbstractCombineFileRecordReader<K, V> extends RecordReader<K, V> {

  private int idx;
  private RecordReader<K, V> delegate;

  /**
   * Constructor signature required by CombineFileRecordReader, which instantiates this class reflectively,
   * once per file in the combined split. Only the file index is retained here; the combine split and context
   * are handed back to {@link #initialize(InputSplit, TaskAttemptContext)}.
   */
  public AbstractCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) {
    this.idx = idx;
  }

  /**
   * Single extension point. Returns an instance of the underlying InputFormat being used.
   *
   * @return an instance of the underlying FileInputFormat
   */
  abstract FileInputFormat<K, V> getInputFormat();

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    if (delegate != null) {
      delegate.close();
    }
    CombineFileSplit combineSplit = (CombineFileSplit) split;
    FileSplit fileSplit = new FileSplit(combineSplit.getPath(idx), combineSplit.getOffset(idx),
        combineSplit.getLength(idx), combineSplit.getLocations());
    delegate = getInputFormat().createRecordReader(fileSplit, context);
    delegate.initialize(fileSplit, context);
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return delegate.nextKeyValue();
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return delegate.getCurrentKey();
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return delegate.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return delegate.getProgress();
  }

  @Override
  public void close() throws IOException {
    if (delegate != null) {
      delegate.close();
      delegate = null;
    }
  }
}
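To illustrate the extension point described in the Javadoc above, a hypothetical reader for plain-text files (not part of this commit) would only need to supply its InputFormat; the Avro and Parquet readers added further down follow exactly this shape.

// Hypothetical example, not part of this commit; it must live in the same package
// because AbstractCombineFileRecordReader is package-private.
package org.kitesdk.data.spi.filesystem;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

class TextCombineFileRecordReader extends AbstractCombineFileRecordReader<LongWritable, Text> {

  // This three-argument constructor is mandatory: CombineFileRecordReader creates one
  // instance per file in the combined split, passing the file's index within the split.
  public TextCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) {
    super(split, context, idx);
  }

  @Override
  FileInputFormat<LongWritable, Text> getInputFormat() {
    return new TextInputFormat();
  }
}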
@@ -0,0 +1,111 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kitesdk.data.spi.filesystem;

import com.google.common.collect.Lists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * A Kite-specific subclass of CombineFileInputFormat and CombineFileRecordReader, needed because Crunch
 * handles CombineFileSplits specially.
 * <p/>
 * The classes in this file add no functionality beyond CombineFileInputFormat and CombineFileRecordReader;
 * they only ensure that KiteCombineFileSplits are used (instead of CombineFileSplit) so that CrunchInputFormat
 * does not interpret the splits as coming from Crunch.
 */
abstract class AbstractKiteCombineFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {

  /**
   * A wrapper around CombineFileSplit that avoids exposing CombineFileSplit directly, since Crunch
   * treats CombineFileSplit specially.
   */
  static class KiteCombineFileSplit extends InputSplit implements Writable {

    private CombineFileSplit delegate;

    public KiteCombineFileSplit() {
      // Needed for Writable interface
    }

    public KiteCombineFileSplit(CombineFileSplit delegate) {
      this.delegate = delegate;
    }

    public void readFields(DataInput in) throws IOException {
      // Recreate the delegate before deserializing into it
      delegate = new CombineFileSplit();
      delegate.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    CombineFileSplit getCombineFileSplit() {
      return delegate;
    }
  }

  /**
   * Returns the concrete AbstractCombineFileRecordReader subclass that CombineFileRecordReader
   * should instantiate, once per file in a combined split.
   */
  abstract Class<? extends AbstractCombineFileRecordReader> getRecordReaderClass();

  private static class KiteCombineFileRecordReader extends CombineFileRecordReader {

    @SuppressWarnings("unchecked")
    public KiteCombineFileRecordReader(KiteCombineFileSplit kiteCombineSplit, TaskAttemptContext context,
        Class rrClass) throws IOException {
      super(kiteCombineSplit.getCombineFileSplit(), context, rrClass);
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
      KiteCombineFileSplit kiteCombineFileSplit = (KiteCombineFileSplit) split;
      super.initialize(kiteCombineFileSplit.getCombineFileSplit(), context);
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskContext)
      throws IOException {
    return new KiteCombineFileRecordReader((KiteCombineFileSplit) inputSplit, taskContext, getRecordReaderClass());
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    // Let CombineFileInputFormat do the combining, then wrap each split so Crunch leaves it alone.
    List<InputSplit> kiteCombineSplits = Lists.newArrayList();
    for (InputSplit inputSplit : super.getSplits(job)) {
      kiteCombineSplits.add(new KiteCombineFileSplit((CombineFileSplit) inputSplit));
    }
    return kiteCombineSplits;
  }
}
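Since KiteCombineFileSplit is what actually ships from the client to each map task, it has to survive Writable serialization, which it does by delegating to the wrapped CombineFileSplit. A small round-trip sketch (not part of the commit; the paths and sizes are invented, and it must sit in the same package because the classes are package-private):

package org.kitesdk.data.spi.filesystem;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.kitesdk.data.spi.filesystem.AbstractKiteCombineFileInputFormat.KiteCombineFileSplit;

public class KiteCombineFileSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Two invented small files combined into one split.
    CombineFileSplit original = new CombineFileSplit(
        new Path[] { new Path("/data/part-0.avro"), new Path("/data/part-1.avro") },
        new long[] { 0L, 0L },           // start offsets within each file
        new long[] { 1024L, 2048L },     // lengths to read from each file
        new String[] { "host1" });       // preferred locations

    KiteCombineFileSplit split = new KiteCombineFileSplit(original);

    // Serialize the way the framework would when shipping the split to a task...
    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);

    // ...and deserialize it back into a fresh instance.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    KiteCombineFileSplit copy = new KiteCombineFileSplit();
    copy.readFields(in);

    System.out.println(copy.getLength()); // prints 3072, the total of both file lengths
  }
}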
@@ -35,6 +35,9 @@
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.kitesdk.compat.DynMethods;
import org.kitesdk.compat.Hadoop;
@@ -114,10 +117,10 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
if (setInputPaths(jobContext, job)) {
if (Formats.AVRO.equals(format)) {
AvroJob.setInputKeySchema(job, dataset.getDescriptor().getSchema());
AvroKeyInputFormat<E> delegate = new AvroKeyInputFormat<E>();
AvroCombineInputFormat<E> delegate = new AvroCombineInputFormat<E>();
return delegate.getSplits(jobContext);
} else if (Formats.PARQUET.equals(format)) {
AvroParquetInputFormat delegate = new AvroParquetInputFormat();
AvroParquetCombineInputFormat delegate = new AvroParquetCombineInputFormat();
return delegate.getSplits(jobContext);
} else if (Formats.JSON.equals(format)) {
return new JSONInputFormat().getSplits(jobContext);
@@ -172,10 +175,10 @@ private RecordReader<E, Void> createUnfilteredRecordReader(InputSplit inputSplit
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Format format = dataset.getDescriptor().getFormat();
if (Formats.AVRO.equals(format)) {
return new AvroKeyReaderWrapper(new AvroKeyInputFormat<E>());
return new AvroKeyReaderWrapper(new AvroCombineInputFormat<E>());

} else if (Formats.PARQUET.equals(format)) {
return new ValueReaderWrapper(new AvroParquetInputFormat());
return new ValueReaderWrapper(new AvroParquetCombineInputFormat());

} else if (Formats.JSON.equals(format)) {
JSONInputFormat<E> delegate = new JSONInputFormat<E>();
@@ -198,7 +201,7 @@ private RecordReader<E, Void> createUnfilteredRecordReader(InputSplit inputSplit

private static class AvroKeyReaderWrapper<E> extends
AbstractKeyRecordReaderWrapper<E, AvroKey<E>, NullWritable> {
public AvroKeyReaderWrapper(AvroKeyInputFormat<E> inputFormat) {
public AvroKeyReaderWrapper(AvroCombineInputFormat<E> inputFormat) {
super(inputFormat);
}

@@ -208,4 +211,48 @@ public E getCurrentKey() throws IOException, InterruptedException {
}
}

  private static class AvroCombineFileRecordReader<E> extends AbstractCombineFileRecordReader<AvroKey<E>, NullWritable> {

    public AvroCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) {
      super(split, context, idx);
    }

    @Override
    FileInputFormat<AvroKey<E>, NullWritable> getInputFormat() {
      return new AvroKeyInputFormat<E>();
    }
  }

  /**
   * Combines multiple small Avro files into a single input split.
   */
  private static class AvroCombineInputFormat<E> extends AbstractKiteCombineFileInputFormat<AvroKey<E>, NullWritable> {

    @Override
    Class<? extends AbstractCombineFileRecordReader> getRecordReaderClass() {
      return AvroCombineFileRecordReader.class;
    }
  }

  private static class AvroParquetCombineFileRecordReader<E> extends AbstractCombineFileRecordReader<Void, E> {

    public AvroParquetCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) {
      super(split, context, idx);
    }

    @Override
    FileInputFormat<Void, E> getInputFormat() {
      return new AvroParquetInputFormat<E>();
    }
  }

  /**
   * Combines multiple small Parquet files into a single input split.
   */
  private static class AvroParquetCombineInputFormat<E> extends AbstractKiteCombineFileInputFormat<Void, E> {

    @Override
    Class<? extends AbstractCombineFileRecordReader> getRecordReaderClass() {
      return AvroParquetCombineFileRecordReader.class;
    }
  }
}
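One operational note, not covered by this commit: CombineFileInputFormat packs files into per-node and per-rack splits with no upper bound unless a maximum split size is configured, so a small dataset can collapse into a single map task. A hedged sketch of capping the combined split size through the standard Hadoop properties (property names assume Hadoop 2.x; the dataset URI is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;

public class SplitSizeTuning {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Upper bound on a combined split (128 MB); CombineFileInputFormat reads this key.
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
    // Optional per-node and per-rack lower bounds, also honored by CombineFileInputFormat.
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 32L * 1024 * 1024);
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 64L * 1024 * 1024);

    Job job = Job.getInstance(conf, "read-kite-dataset-with-split-cap");
    DatasetKeyInputFormat.configure(job).readFrom("dataset:hdfs:/data/ns/events");
    // ... remaining mapper/output configuration as in an ordinary MapReduce job
  }
}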
@@ -25,7 +25,6 @@
import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.impl.mr.run.CrunchMapper;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.CrunchInputs;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.ReadableSourceTarget;
@@ -37,15 +36,11 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.View;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import org.kitesdk.data.spi.LastModifiedAccessor;
import org.kitesdk.data.spi.SizeAccessor;
import org.kitesdk.data.spi.filesystem.FileSystemDataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -80,14 +75,6 @@ public DatasetSourceTarget(View<E> view, AvroType<E> avroType) {
Configuration temp = new Configuration(false /* use an empty conf */ );
DatasetKeyInputFormat.configure(temp).readFrom(view);
this.formatBundle = inputBundle(temp);

Dataset<E> dataset = view.getDataset();

// Disable CombineFileInputFormat in Crunch unless we're dealing with Avro or Parquet files
Format format = dataset.getDescriptor().getFormat();
boolean isAvroOrParquetFile = (dataset instanceof FileSystemDataset)
&& (Formats.AVRO.equals(format) || Formats.PARQUET.equals(format));
formatBundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.toString(!isAvroOrParquetFile));
}

public DatasetSourceTarget(URI uri, AvroType<E> avroType) {
@@ -299,7 +299,7 @@ public void testTargetViewProvidedPartition() throws IOException {
Assert.assertEquals(1, datasetSize(outputDataset));
}


@Test
public void testViewUris() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
@@ -317,7 +317,7 @@ public void testViewUris() throws IOException {
View<Record> inputView = Datasets.<Record, Dataset<Record>> load(sourceViewUri,
Record.class);
Assert.assertEquals(1, datasetSize(inputView));

Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
PCollection<GenericData.Record> data = pipeline.read(CrunchDatasets
.asSource(sourceViewUri, GenericData.Record.class));
@@ -329,7 +329,7 @@

Assert.assertEquals(1, datasetSize(outputDataset));
}

@Test
public void testDatasetUris() throws IOException {
PartitionStrategy partitionStrategy = new PartitionStrategy.Builder().hash(
@@ -482,7 +482,7 @@ public NewUserRecord map(NewUserRecord input) {
}

@Test
public void testUseReaderSchema() throws IOException {
public void testUseReaderSchema() throws IOException {

// Create a schema with only a username, so we can test reading it
// with an enhanced record structure.
@@ -630,4 +630,28 @@ private void runCheckpointPipeline(View<Record> inputView,
Target.WriteMode.CHECKPOINT);
pipeline.done();
}

  @Test
  public void testMultipleFileReadingFromCrunch() throws IOException {
    Dataset<Record> inputDatasetA = repo.create("ns", "inA", new DatasetDescriptor.Builder()
        .schema(USER_SCHEMA).build());
    Dataset<Record> inputDatasetB = repo.create("ns", "inB", new DatasetDescriptor.Builder()
        .schema(USER_SCHEMA).build());
    Dataset<Record> outputDataset = repo.create("ns", "out", new DatasetDescriptor.Builder()
        .schema(USER_SCHEMA).build());

    // write two files, each with 5 records
    writeTestUsers(inputDatasetA, 5, 0);
    writeTestUsers(inputDatasetB, 5, 5);

    Pipeline pipeline = new MRPipeline(TestCrunchDatasets.class);
    PCollection<GenericData.Record> dataA = pipeline.read(
        CrunchDatasets.asSource(inputDatasetA));
    PCollection<GenericData.Record> dataB = pipeline.read(
        CrunchDatasets.asSource(inputDatasetB));
    pipeline.write(dataA.union(dataB), CrunchDatasets.asTarget(outputDataset), Target.WriteMode.APPEND);
    pipeline.run();

    checkTestUsers(outputDataset, 10);
  }
}
