diff --git a/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java b/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java index 9940876c9..7130994b7 100644 --- a/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java +++ b/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java @@ -1,9 +1,12 @@ package com.twitter.elephantbird.mapred.input; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.List; -import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -23,12 +26,11 @@ * interface is required.
* * Current restrictions on InputFormat:* * Usage:
@@ -84,7 +86,6 @@ public RecordReadergetRecordReader(InputSplit split, JobConf job, @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - // currently only FileSplit conversion is supported. initInputFormat(job); try { @@ -95,24 +96,10 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { return null; } - FileSplit[] resultSplits = new FileSplit[splits.size()]; + InputSplit[] resultSplits = new InputSplit[splits.size()]; int i = 0; for (org.apache.hadoop.mapreduce.InputSplit split : splits) { - - // assert that this is a FileSplit. Later we could let user supply - // a converter from mapreuduce.split to mapred.split - if (split.getClass() != org.apache.hadoop.mapreduce.lib.input.FileSplit.class) { - throw new IOException("only FileSplit is supported in this wrapper. " - + "but got " + split.getClass()); - } - - org.apache.hadoop.mapreduce.lib.input.FileSplit fsplit = - (org.apache.hadoop.mapreduce.lib.input.FileSplit)split; - - resultSplits[i++] = new FileSplit(fsplit.getPath(), - fsplit.getStart(), - fsplit.getLength(), - job); + resultSplits[i++] = new InputSplitWrapper(split); } return resultSplits; @@ -139,16 +126,10 @@ public RecordReaderWrapper(InputFormat newInputFormat, InputSplit oldSplit, JobConf oldJobConf) throws IOException { - // create newFileSplit from old FileSplit. - FileSplit ofs = (FileSplit) oldSplit; // FileSplit is enforced in getSplits(). - org.apache.hadoop.mapreduce.lib.input.FileSplit split = - new org.apache.hadoop.mapreduce.lib.input.FileSplit( - ofs.getPath(), - ofs.getStart(), - ofs.getLength(), - ofs.getLocations()); + splitLen = oldSplit.getLength(); - splitLen = split.getLength(); + org.apache.hadoop.mapreduce.InputSplit split = + ((InputSplitWrapper)oldSplit).realSplit; TaskAttemptContext taskContext = new TaskAttemptContext(oldJobConf, TaskAttemptID.forName(oldJobConf.get("mapred.task.id"))); @@ -237,4 +218,56 @@ public boolean next(K key, V value) throws IOException { } } + private static class InputSplitWrapper implements InputSplit { + + org.apache.hadoop.mapreduce.InputSplit realSplit; + + + @SuppressWarnings("unused") // MapReduce instantiates this. + public InputSplitWrapper() {} + + public InputSplitWrapper(org.apache.hadoop.mapreduce.InputSplit realSplit) { + this.realSplit = realSplit; + } + + @Override + public long getLength() throws IOException { + try { + return realSplit.getLength(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public String[] getLocations() throws IOException { + try { + return realSplit.getLocations(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + String className = WritableUtils.readString(in); + Class> splitClass; + + try { + splitClass = Class.forName(className); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + + realSplit = (org.apache.hadoop.mapreduce.InputSplit) + ReflectionUtils.newInstance(splitClass, null); + ((Writable)realSplit).readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeString(out, realSplit.getClass().getName()); + ((Writable)realSplit).write(out); + } + } }