remove FileSplit restriction.
Raghu Angadi committed Mar 28, 2012
1 parent b1d48f4 commit 330c8cb
Showing 1 changed file with 63 additions and 30 deletions.
@@ -1,9 +1,12 @@
 package com.twitter.elephantbird.mapred.input;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -23,12 +26,11 @@
  * interface is required. </p>
  *
  * Current restrictions on InputFormat: <ul>
- *    <li> input split should be a FileSplit
  *    <li> the record reader should reuse key and value objects
  * </ul>
  *
- * While these restrictions are satisfied by most input formats,
- * they could be removed with a couple more configuration options.
+ * While this restriction is satisfied by most input formats,
+ * it could be removed with a configuration option.
  * <p>
  *
  * Usage: <pre>
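[Editor's note: the usage example that follows this javadoc line is collapsed in the diff view. As a sketch of how such a wrapper is typically wired into an old-API job: the enclosing class name (DeprecatedInputFormatWrapper) and its setInputFormat(Class, JobConf) helper are assumptions, neither is visible in the hunks shown here.]

// Editor's sketch, not part of the commit. Class and helper names assumed.
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class WrapperUsage {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Delegate split computation and record reading to a new-API input format:
    DeprecatedInputFormatWrapper.setInputFormat(
        org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class, jobConf);
    jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
    // ... set mapper, input/output paths, etc., then submit with the old API:
    JobClient.runJob(jobConf);
  }
}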
@@ -84,7 +86,6 @@ public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,

   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    // currently only FileSplit conversion is supported.
     initInputFormat(job);
 
     try {
@@ -95,24 +96,10 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
       return null;
     }
 
-    FileSplit[] resultSplits = new FileSplit[splits.size()];
+    InputSplit[] resultSplits = new InputSplit[splits.size()];
     int i = 0;
     for (org.apache.hadoop.mapreduce.InputSplit split : splits) {
-
-      // assert that this is a FileSplit. Later we could let user supply
-      // a converter from mapreuduce.split to mapred.split
-      if (split.getClass() != org.apache.hadoop.mapreduce.lib.input.FileSplit.class) {
-        throw new IOException("only FileSplit is supported in this wrapper. "
-            + "but got " + split.getClass());
-      }
-
-      org.apache.hadoop.mapreduce.lib.input.FileSplit fsplit =
-          (org.apache.hadoop.mapreduce.lib.input.FileSplit)split;
-
-      resultSplits[i++] = new FileSplit(fsplit.getPath(),
-                                        fsplit.getStart(),
-                                        fsplit.getLength(),
-                                        job);
+      resultSplits[i++] = new InputSplitWrapper(split);
     }
 
     return resultSplits;
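[Editor's note: after this change, getSplits no longer inspects the concrete split class; any new-API split that also implements Writable can pass through. A hypothetical custom split, for illustration only, not from the commit:]

// Editor's sketch: a minimal custom mapreduce split that the wrapper can now
// carry, where previously only mapreduce FileSplit was accepted.
public class RangeSplit extends org.apache.hadoop.mapreduce.InputSplit
    implements org.apache.hadoop.io.Writable {
  private long start, end;

  public RangeSplit() {} // no-arg constructor needed for reflective creation
  public RangeSplit(long start, long end) { this.start = start; this.end = end; }

  @Override public long getLength() { return end - start; }
  @Override public String[] getLocations() { return new String[0]; }

  @Override public void write(java.io.DataOutput out) throws java.io.IOException {
    out.writeLong(start); out.writeLong(end);
  }
  @Override public void readFields(java.io.DataInput in) throws java.io.IOException {
    start = in.readLong(); end = in.readLong();
  }
}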
@@ -139,16 +126,10 @@ public RecordReaderWrapper(InputFormat<K, V> newInputFormat,
                                InputSplit oldSplit,
                                JobConf oldJobConf) throws IOException {
 
-      // create newFileSplit from old FileSplit.
-      FileSplit ofs = (FileSplit) oldSplit; // FileSplit is enforced in getSplits().
-      org.apache.hadoop.mapreduce.lib.input.FileSplit split =
-          new org.apache.hadoop.mapreduce.lib.input.FileSplit(
-              ofs.getPath(),
-              ofs.getStart(),
-              ofs.getLength(),
-              ofs.getLocations());
+      splitLen = oldSplit.getLength();
 
-      splitLen = split.getLength();
+      org.apache.hadoop.mapreduce.InputSplit split =
+          ((InputSplitWrapper)oldSplit).realSplit;
 
       TaskAttemptContext taskContext =
         new TaskAttemptContext(oldJobConf, TaskAttemptID.forName(oldJobConf.get("mapred.task.id")));
@@ -237,4 +218,56 @@ public boolean next(K key, V value) throws IOException {
     }
   }
 
+  private static class InputSplitWrapper implements InputSplit {
+
+    org.apache.hadoop.mapreduce.InputSplit realSplit;
+
+
+    @SuppressWarnings("unused") // MapReduce instantiates this.
+    public InputSplitWrapper() {}
+
+    public InputSplitWrapper(org.apache.hadoop.mapreduce.InputSplit realSplit) {
+      this.realSplit = realSplit;
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      try {
+        return realSplit.getLength();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      try {
+        return realSplit.getLocations();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      String className = WritableUtils.readString(in);
+      Class<?> splitClass;
+
+      try {
+        splitClass = Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+
+      realSplit = (org.apache.hadoop.mapreduce.InputSplit)
+          ReflectionUtils.newInstance(splitClass, null);
+      ((Writable)realSplit).readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeString(out, realSplit.getClass().getName());
+      ((Writable)realSplit).write(out);
+    }
+  }
 }
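[Editor's note: the write/readFields pair above is what lifts the FileSplit restriction: the wrapper records the concrete class name, then delegates serialization to the split itself, which is why the wrapped split must implement Writable. A round-trip sketch, assuming the wrapper were reachable from a test (in the patch it is a private inner class); DataOutputBuffer/DataInputBuffer are Hadoop's in-memory streams.]

// Editor's sketch, not part of the commit.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class SplitRoundTrip {
  public static void main(String[] args) throws Exception {
    org.apache.hadoop.mapreduce.lib.input.FileSplit real =
        new org.apache.hadoop.mapreduce.lib.input.FileSplit(
            new Path("/data/part-00000"), 0L, 1024L, new String[0]);

    InputSplitWrapper original = new InputSplitWrapper(real);
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);               // class name first, then the split's own fields

    InputSplitWrapper copy = new InputSplitWrapper();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);               // re-instantiates FileSplit reflectively

    System.out.println(copy.getLength()); // prints 1024
  }
}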
