From 330c8cbd6ba60c4322143cda0729449d0a40a161 Mon Sep 17 00:00:00 2001
From: Raghu Angadi
Date: Tue, 27 Mar 2012 23:44:43 -0700
Subject: [PATCH] remove FileSplit restriction.

---
 .../input/DeprecatedInputFormatWrapper.java | 93 +++++++++++++------
 1 file changed, 63 insertions(+), 30 deletions(-)

diff --git a/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java b/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java
index 9940876c9..7130994b7 100644
--- a/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java
+++ b/src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java
@@ -1,9 +1,12 @@
 package com.twitter.elephantbird.mapred.input;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -23,12 +26,11 @@ * interface is required.
  *
  * Current restrictions on InputFormat:
  *
- * While these restrictions are satisfied by most input formats,
- * they could be removed with a couple more configuration options.
+ * While this restriction is satisfied by most input formats,
+ * it could be removed with a configuration option.
  *
  * Usage:
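For orientation before the behavioral hunks, a minimal usage sketch of the wrapper (not part of the patch). The setInputFormat helper follows the class's documented usage pattern, but its exact signature is an assumption here:

    // Hypothetical driver snippet: expose a mapreduce-API input format to an
    // old-API (mapred) consumer. setInputFormat is assumed to record the real
    // format class in the JobConf and set DeprecatedInputFormatWrapper as the
    // job's old-API input format.
    JobConf jobConf = new JobConf();
    DeprecatedInputFormatWrapper.setInputFormat(
        org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class, jobConf);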
@@ -84,7 +86,6 @@ public RecordReader getRecordReader(InputSplit split, JobConf job,
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    // currently only FileSplit conversion is supported.
     initInputFormat(job);
 
     try {
@@ -95,24 +96,10 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
         return null;
       }
 
-      FileSplit[] resultSplits = new FileSplit[splits.size()];
+      InputSplit[] resultSplits = new InputSplit[splits.size()];
       int i = 0;
       for (org.apache.hadoop.mapreduce.InputSplit split : splits) {
-
-        // assert that this is a FileSplit. Later we could let user supply
-        // a converter from mapreuduce.split to mapred.split
-        if (split.getClass() != org.apache.hadoop.mapreduce.lib.input.FileSplit.class) {
-          throw new IOException("only FileSplit is supported in this wrapper. "
-                                + "but got " + split.getClass());
-        }
-
-        org.apache.hadoop.mapreduce.lib.input.FileSplit fsplit =
-          (org.apache.hadoop.mapreduce.lib.input.FileSplit)split;
-
-        resultSplits[i++] = new FileSplit(fsplit.getPath(),
-                                          fsplit.getStart(),
-                                          fsplit.getLength(),
-                                          job);
+        resultSplits[i++] = new InputSplitWrapper(split);
       }
 
       return resultSplits;
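With the FileSplit check gone, getSplits() accepts any split type, which is what lets input formats whose splits are not file based (for example, a row-range split over a database or HBase table) pass through this wrapper. A hedged sketch of such a split (a hypothetical class, shown only to illustrate the contract the wrapper now relies on: the split must be Writable and have a no-arg constructor so InputSplitWrapper, added at the end of this patch, can serialize it by class name):

    // Hypothetical non-file split that the old code rejected in getSplits().
    public class RowRangeSplit extends org.apache.hadoop.mapreduce.InputSplit
        implements org.apache.hadoop.io.Writable {
      private long startRow, endRow;

      public RowRangeSplit() {}  // no-arg constructor, needed for reflection
      public RowRangeSplit(long start, long end) { startRow = start; endRow = end; }

      @Override public long getLength() { return endRow - startRow; }
      @Override public String[] getLocations() { return new String[0]; }

      @Override
      public void write(java.io.DataOutput out) throws java.io.IOException {
        out.writeLong(startRow);
        out.writeLong(endRow);
      }

      @Override
      public void readFields(java.io.DataInput in) throws java.io.IOException {
        startRow = in.readLong();
        endRow = in.readLong();
      }
    }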
@@ -139,16 +126,10 @@ public RecordReaderWrapper(InputFormat newInputFormat,
                                InputSplit oldSplit,
                                JobConf oldJobConf) throws IOException {
 
-      // create newFileSplit from old FileSplit.
-      FileSplit ofs = (FileSplit) oldSplit; // FileSplit is enforced in getSplits().
-      org.apache.hadoop.mapreduce.lib.input.FileSplit split =
-        new org.apache.hadoop.mapreduce.lib.input.FileSplit(
-                                              ofs.getPath(),
-                                              ofs.getStart(),
-                                              ofs.getLength(),
-                                              ofs.getLocations());
+      splitLen = oldSplit.getLength();
 
-      splitLen = split.getLength();
+      org.apache.hadoop.mapreduce.InputSplit split =
+        ((InputSplitWrapper)oldSplit).realSplit;
 
       TaskAttemptContext taskContext =
         new TaskAttemptContext(oldJobConf, TaskAttemptID.forName(oldJobConf.get("mapred.task.id")));
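The read side mirrors this: instead of reconstructing a FileSplit, the constructor pulls the original mapreduce split back out of the wrapper, and takes the length through the old-API split, which delegates to the real one (see the wrapper below). A condensed sketch of the flow in this constructor; realInputFormat and realRecordReader name fields that sit outside the hunk and are assumptions here:

    // Unwrap the real mapreduce split, then open the new-API reader on it.
    org.apache.hadoop.mapreduce.InputSplit split =
        ((InputSplitWrapper) oldSplit).realSplit;
    TaskAttemptContext taskContext = new TaskAttemptContext(
        oldJobConf, TaskAttemptID.forName(oldJobConf.get("mapred.task.id")));
    realRecordReader = realInputFormat.createRecordReader(split, taskContext);
    realRecordReader.initialize(split, taskContext);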
@@ -237,4 +218,56 @@ public boolean next(K key, V value) throws IOException {
     }
   }
 
+  private static class InputSplitWrapper implements InputSplit {
+
+    org.apache.hadoop.mapreduce.InputSplit realSplit;
+
+
+    @SuppressWarnings("unused") // MapReduce instantiates this.
+    public InputSplitWrapper() {}
+
+    public InputSplitWrapper(org.apache.hadoop.mapreduce.InputSplit realSplit) {
+      this.realSplit = realSplit;
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      try {
+        return realSplit.getLength();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      try {
+        return realSplit.getLocations();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      String className = WritableUtils.readString(in);
+      Class<?> splitClass;
+
+      try {
+        splitClass = Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+
+      realSplit = (org.apache.hadoop.mapreduce.InputSplit)
+                  ReflectionUtils.newInstance(splitClass, null);
+      ((Writable)realSplit).readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeString(out, realSplit.getClass().getName());
+      ((Writable)realSplit).write(out);
+    }
+  }
 }
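The write()/readFields() pair is what makes arbitrary split types shippable across the old API: write() records the concrete class name ahead of the split's own bytes, and readFields() reflectively instantiates that class before delegating. A toy round trip, for illustration only; it assumes the wrapped split implements Writable and has a no-arg constructor, the same assumptions the casts and ReflectionUtils.newInstance() above make (FileSplit satisfies both):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.fs.Path;

    // Serialize a wrapper, then rebuild it from the bytes.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    InputSplitWrapper original = new InputSplitWrapper(
        new org.apache.hadoop.mapreduce.lib.input.FileSplit(
            new Path("/tmp/part-00000"), 0L, 1024L, new String[0]));
    original.write(new DataOutputStream(bytes));

    InputSplitWrapper copy = new InputSplitWrapper();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    // copy.realSplit is now a FileSplit rebuilt from the recorded class name
    // plus the split's own Writable payload.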