CDK-188: Add a Kite format backed by FileInputFormat.
This format relies on descriptor properties to configure the InputFormat class and
whether the key, the value, or both should be used as the record type. MapReduce works
like the other formats, but delegates to the underlying InputFormat to instantiate
record readers, which are then wrapped.
rdblue committed Jan 24, 2015
1 parent 273d985 commit ba25f2a
Showing 16 changed files with 1,136 additions and 40 deletions.
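
The description above configures everything through descriptor properties, but the property names themselves do not appear in this excerpt. A minimal sketch of what a descriptor for the new format might look like, assuming hypothetical keys kite.inputformat.class and kite.inputformat.record-type (the real keys are defined by the commit's InputFormatUtil, which is not shown here):

import org.apache.avro.Schema;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Formats;

public class InputFormatDescriptorSketch {
  public static DatasetDescriptor build() {
    return new DatasetDescriptor.Builder()
        .schema(Schema.create(Schema.Type.STRING))          // illustrative schema only
        .format(Formats.INPUTFORMAT)                         // the format added by this commit
        // assumed key: which InputFormat the format should delegate to
        .property("kite.inputformat.class", SequenceFileInputFormat.class.getName())
        // assumed key: use the key, the value, or both as the record type
        .property("kite.inputformat.record-type", "value")
        .build();
  }
}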
@@ -61,10 +61,18 @@ private Formats() {
new CompressionType[] { Uncompressed });

/**
* Return a {@link Format} for the format name specified. If
* {@code formatName} is not a valid name, an IllegalArgumentException is
* thrown. Currently the formats <q>avro</q>, <q>csv</q>, and <q>parquet</q>
* are supported. Format names are case sensitive.
* INPUTFORMAT: a mapreduce InputFormat (read-only).
*
* @since 0.18.0
*/
public static final Format INPUTFORMAT = new Format("inputformat", Uncompressed,
new CompressionType[] { Uncompressed });

/**
* Return a {@link Format} for the format name specified. If {@code formatName}
* is not a valid name, an IllegalArgumentException is thrown. Currently the
* formats <q>avro</q>, <q>csv</q>, and <q>parquet</q> are supported. Format names are
* case sensitive.
*
* @since 0.9.0
* @return an appropriate instance of Format
@@ -79,6 +87,8 @@ public static Format fromString(String formatName) {
return JSON;
} else if (formatName.equals("csv")) {
return CSV;
} else if (formatName.equals("inputformat")) {
return INPUTFORMAT;
} else {
throw new IllegalArgumentException("Unknown format type: " + formatName);
}
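
A short usage sketch of the lookup above: the new name resolves to the new constant, and the lookup is case sensitive.

import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;

public class FormatLookupExample {
  public static void main(String[] args) {
    Format format = Formats.fromString("inputformat");  // returns Formats.INPUTFORMAT
    System.out.println(format.getName());                // prints "inputformat"
    Formats.fromString("InputFormat");                   // throws IllegalArgumentException
  }
}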
@@ -16,6 +16,7 @@
package org.kitesdk.data.spi;

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,15 +32,21 @@
*/
public abstract class AbstractKeyRecordReaderWrapper<E, K, V> extends RecordReader<E, Void> {

protected RecordReader<K, V> delegate;
protected RecordReader<K, V> delegate = null;
private InputFormat<K, V> inputFormat = null;

public AbstractKeyRecordReaderWrapper(RecordReader<K, V> delegate) {
this.delegate = delegate;
public AbstractKeyRecordReaderWrapper(InputFormat<K, V> inputFormat) {
this.inputFormat = inputFormat;
}

@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
// clean up the current wrapped reader, if present
if (delegate != null) {
delegate.close();
}
this.delegate = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
delegate.initialize(inputSplit, taskAttemptContext);
}

@@ -60,6 +67,9 @@ public float getProgress() throws IOException, InterruptedException {

@Override
public void close() throws IOException {
delegate.close();
if (delegate != null) {
delegate.close();
delegate = null;
}
}
}
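
To illustrate the new constructor contract, here is a hypothetical subclass (not part of this commit) that wraps TextInputFormat and exposes each line's text as the record, mirroring the pattern of the Avro and Parquet wrappers below:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.kitesdk.data.spi.AbstractKeyRecordReaderWrapper;

public class TextValueReaderWrapper
    extends AbstractKeyRecordReaderWrapper<Text, LongWritable, Text> {

  public TextValueReaderWrapper(TextInputFormat inputFormat) {
    super(inputFormat);  // the wrapper now creates its reader from the InputFormat in initialize()
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return delegate.getCurrentValue();  // expose the line text (the value) as the record
  }
}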
@@ -101,6 +101,9 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
} else if (Formats.CSV.equals(format)) {
// this generates an unchecked cast exception?
return new CSVInputFormat().getSplits(jobContext);
} else if (Formats.INPUTFORMAT.equals(format)) {
return InputFormatUtil.newInputFormatInstance(dataset.getDescriptor())
.getSplits(jobContext);
} else {
throw new UnsupportedOperationException(
"Not a supported format: " + format);
@@ -146,13 +149,9 @@ private RecordReader<E, Void> createUnfilteredRecordReader(InputSplit inputSplit
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Format format = dataset.getDescriptor().getFormat();
if (Formats.AVRO.equals(format)) {
AvroKeyInputFormat<E> delegate = new AvroKeyInputFormat<E>();
return new KeyReaderWrapper(
delegate.createRecordReader(inputSplit, taskAttemptContext));
return new AvroKeyReaderWrapper(new AvroKeyInputFormat<E>());
} else if (Formats.PARQUET.equals(format)) {
AvroParquetInputFormat delegate = new AvroParquetInputFormat();
return new ValueReaderWrapper(
delegate.createRecordReader(inputSplit, taskAttemptContext));
return new ValueReaderWrapper(new AvroParquetInputFormat());
} else if (Formats.JSON.equals(format)) {
JSONInputFormat<E> delegate = new JSONInputFormat<E>();
delegate.setDescriptor(dataset.getDescriptor());
@@ -163,16 +162,18 @@
delegate.setDescriptor(dataset.getDescriptor());
delegate.setType(dataset.getType());
return delegate.createRecordReader(inputSplit, taskAttemptContext);
} else if (Formats.INPUTFORMAT.equals(format)) {
return InputFormatUtil.newRecordReader(dataset.getDescriptor());
} else {
throw new UnsupportedOperationException(
"Not a supported format: " + format);
}
}

private static class KeyReaderWrapper<E> extends
private static class AvroKeyReaderWrapper<E> extends
AbstractKeyRecordReaderWrapper<E, AvroKey<E>, NullWritable> {
public KeyReaderWrapper(RecordReader<AvroKey<E>, NullWritable> delegate) {
super(delegate);
public AvroKeyReaderWrapper(AvroKeyInputFormat<E> inputFormat) {
super(inputFormat);
}

@Override
@@ -181,16 +182,4 @@ public E getCurrentKey() throws IOException, InterruptedException {
}
}

private static class ValueReaderWrapper<E> extends
AbstractKeyRecordReaderWrapper<E, Void, E> {
public ValueReaderWrapper(RecordReader<Void, E> delegate) {
super(delegate);
}

@Override
public E getCurrentKey() throws IOException, InterruptedException {
return delegate.getCurrentValue();
}
}

}
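
InputFormatUtil is referenced above but is not among the files shown in this excerpt. A plausible sketch of newInputFormatInstance, assuming the InputFormat class name is carried in a descriptor property (the key below is an assumption, not confirmed by this excerpt):

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetException;

class InputFormatUtilSketch {
  // assumed property key; the real constant is defined by the commit's InputFormatUtil
  static final String INPUT_FORMAT_CLASS_PROP = "kite.inputformat.class";

  static FileInputFormat<?, ?> newInputFormatInstance(DatasetDescriptor descriptor) {
    String className = descriptor.getProperty(INPUT_FORMAT_CLASS_PROP);
    try {
      // instantiate the configured InputFormat reflectively from the descriptor property
      return (FileInputFormat<?, ?>) Class.forName(className).newInstance();
    } catch (ClassNotFoundException e) {
      throw new DatasetException("Cannot find InputFormat class: " + className, e);
    } catch (InstantiationException e) {
      throw new DatasetException("Cannot instantiate InputFormat: " + className, e);
    } catch (IllegalAccessException e) {
      throw new DatasetException("Cannot instantiate InputFormat: " + className, e);
    }
  }
}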
@@ -0,0 +1,201 @@
/*
* Copyright 2013 Cloudera Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kitesdk.data.spi.filesystem;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.kitesdk.compat.Hadoop;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetReaderException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.ReaderWriterState;

public class InputFormatReader<E> extends AbstractDatasetReader<E> {
private static final TaskAttemptID FAKE_ID =
new TaskAttemptID("", 0, false, 0, 0);

private final FileSystem fs;
private final Path path;
private final Configuration conf;
private final DatasetDescriptor descriptor;
private final TaskAttemptContext attemptContext;

// reader state
private ReaderWriterState state = ReaderWriterState.NEW;
private Iterator<InputSplit> splits;
private RecordReader<E, Void> currentReader = null;
private boolean hasNext = false;
private boolean shouldAdvance = false;

public InputFormatReader(FileSystem fs, Path path, DatasetDescriptor descriptor) {
this.fs = fs;
this.path = path;
this.descriptor = descriptor;
this.state = ReaderWriterState.NEW;

// set up the configuration from the descriptor properties
this.conf = new Configuration(fs.getConf());
for (String prop : descriptor.listProperties()) {
conf.set(prop, descriptor.getProperty(prop));
}

this.attemptContext = Hadoop.TaskAttemptContext.ctor.newInstance(conf, FAKE_ID);
}

@Override
public void initialize() {
Preconditions.checkState(ReaderWriterState.NEW.equals(state),
"A reader may not be opened more than once - current state:%s", state);

try {
FileInputFormat format = InputFormatUtil.newInputFormatInstance(descriptor);
Job job = Hadoop.Job.newInstance.invoke(conf);

FileInputFormat.addInputPath(job, path);
// attempt to minimize the number of InputSplits
FileStatus stat = fs.getFileStatus(path);
FileInputFormat.setMaxInputSplitSize(job, stat.getLen());

this.splits = format.getSplits(job).iterator();
this.shouldAdvance = true;
this.state = ReaderWriterState.OPEN;

} catch (RuntimeException e) {
this.state = ReaderWriterState.ERROR;
throw new DatasetReaderException("Cannot calculate splits", e);
} catch (IOException e) {
this.state = ReaderWriterState.ERROR;
throw new DatasetIOException("Cannot calculate splits", e);
}
}

@Override
public boolean hasNext() {
Preconditions.checkState(ReaderWriterState.OPEN.equals(state),
"Attempt to read from a file in state:%s", state);

// the Iterator contract requires that calls to hasNext() not change the
// iterator state. calling next() should advance the iterator. however,
// this wraps a RecordReader that reuses objects, so advancing in next
// after retrieving the key/value pair mutates the pair. this hack is a way
// to advance once per call to next(), but do it as late as possible.
if (shouldAdvance) {
this.hasNext = advance();
this.shouldAdvance = false;
}
return hasNext;
}

@Override
public E next() {
Preconditions.checkState(ReaderWriterState.OPEN.equals(state),
"Attempt to read from a file in state:%s", state);

if (!hasNext()) {
throw new NoSuchElementException();
}

try {
E record = currentReader.getCurrentKey();

this.shouldAdvance = true;

return record;
} catch (RuntimeException e) {
this.state = ReaderWriterState.ERROR;
throw new DatasetReaderException("Cannot get record", e);
} catch (IOException e) {
this.state = ReaderWriterState.ERROR;
throw new DatasetIOException("Cannot get record", e);
} catch (InterruptedException e) {
// don't swallow the interrupt
Thread.currentThread().interrupt();
// error: it is unclear whether the underlying reader is valid
this.state = ReaderWriterState.ERROR;
throw new DatasetReaderException("Interrupted", e);
}
}

private boolean advance() {
try {
if (currentReader != null && currentReader.nextKeyValue()) {
return true;
} else {
if (currentReader == null) {
this.currentReader = InputFormatUtil.newRecordReader(descriptor);
}
while (splits.hasNext()) {
// advance the reader and see if it has records
InputSplit nextSplit = splits.next();
currentReader.initialize(nextSplit, attemptContext);
if (currentReader.nextKeyValue()) {
return true;
}
}
// either no next split or all readers were empty
return false;
}
} catch (RuntimeException e) {
this.state = ReaderWriterState.ERROR;
throw new DatasetReaderException("Cannot advance reader", e);
} catch (IOException e) {
this.state = ReaderWriterState.ERROR;
throw new DatasetIOException("Cannot advance reader", e);
} catch (InterruptedException e) {
// error: it is unclear whether the underlying reader is valid
this.state = ReaderWriterState.ERROR;
throw new DatasetReaderException("Interrupted", e);
}
}

@Override
public void close() {
if (!state.equals(ReaderWriterState.OPEN)) {
return;
}

this.state = ReaderWriterState.CLOSED;

try {
if (currentReader != null) {
currentReader.close();
}
} catch (IOException e) {
throw new DatasetReaderException("Unable to close reader path:" + path, e);
}

this.hasNext = false;
}

@Override
public boolean isOpen() {
return (ReaderWriterState.OPEN == state);
}
}
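
A sketch of driving the new reader directly, assuming a descriptor configured as in the earlier sketch (the property keys and the Text record type remain assumptions); in normal use the reader is constructed by the filesystem dataset internals rather than by user code:

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Formats;
import org.kitesdk.data.spi.filesystem.InputFormatReader;

public class InputFormatReaderExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path dataFile = new Path("/tmp/lines.txt");             // hypothetical input file

    // descriptor for the new format; the property keys are assumed placeholders
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(Schema.create(Schema.Type.STRING))
        .format(Formats.INPUTFORMAT)
        .property("kite.inputformat.class", TextInputFormat.class.getName())
        .property("kite.inputformat.record-type", "value")
        .build();

    InputFormatReader<Text> reader = new InputFormatReader<Text>(fs, dataFile, descriptor);
    reader.initialize();
    try {
      while (reader.hasNext()) {
        System.out.println(reader.next());                  // each record is one line of text
      }
    } finally {
      reader.close();
    }
  }
}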
