Merge pull request nathanmarz#5 from sorenmacbeth/develop
Hack around fs.default.name issue w/ Consolidator.

When consolidating files in a filesystem other than the one running the Hadoop job, dfs-datastores was overriding the `fs.default.name` property in the JobConf. This caused the entire job to choke.
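
In other words, the previous code repointed the job's default filesystem at the consolidation target, which also redirected the job's own staging and working paths. The fix carries the target's URI inside the serialized job arguments and resolves a FileSystem handle from it explicitly. A minimal sketch of that pattern (the class name and URI below are illustrative, and this assumes the Hadoop 0.20-era mapred API the project uses):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;

public class FsResolutionSketch {
    public static void main(String[] argv) throws IOException {
        JobConf conf = new JobConf();

        // Problematic: overriding the job-wide default filesystem redirects
        // everything, including the job's own staging/system directories.
        //   conf.set("fs.default.name", "hdfs://other-namenode:8020");

        // Workaround used by this commit: leave the JobConf untouched and
        // resolve the target filesystem from an explicit URI where needed.
        String fsUri = "hdfs://other-namenode:8020";  // illustrative URI
        FileSystem target = FileSystem.get(URI.create(fsUri), conf);
        System.out.println("resolved: " + target.getUri());
    }
}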
sritchie committed Jan 21, 2012
2 parents 0d3c9fd + 44230ac commit 38e5a62
Showing 2 changed files with 13 additions and 10 deletions.
project.clj: 2 changes (1 addition, 1 deletion)
@@ -1,4 +1,4 @@
-(defproject backtype/dfs-datastores "1.1.0"
+(defproject backtype/dfs-datastores "1.1.1-SNAPSHOT"
   :source-path "src/clj"
   :test-path "test/clj"
   :java-source-path "src/jvm"
src/jvm/backtype/hadoop/Consolidator.java: 21 changes (12 additions, 9 deletions)
@@ -28,15 +28,17 @@ public class Consolidator {
     private static RunningJob job = null;

     public static class ConsolidatorArgs implements Serializable {
+        public String fsUri;
         public RecordStreamFactory streams;
         public PathLister pathLister;
         public List<String> dirs;
         public long targetSizeBytes;
         public String extension;


-        public ConsolidatorArgs(RecordStreamFactory streams, PathLister pathLister, List<String> dirs,
-                                long targetSizeBytes, String extension) {
+        public ConsolidatorArgs(String fsUri, RecordStreamFactory streams, PathLister pathLister,
+                                List<String> dirs, long targetSizeBytes, String extension) {
+            this.fsUri = fsUri;
             this.streams = streams;
             this.pathLister = pathLister;
             this.dirs = dirs;
@@ -81,8 +83,8 @@ private static String getDirsString(List<String> targetDirs) {
     public static void consolidate(FileSystem fs, RecordStreamFactory streams, PathLister lister, List<String> dirs,
             long targetSizeBytes, String extension) throws IOException {
         JobConf conf = new JobConf(Consolidator.class);
-        conf.set("fs.default.name", fs.getUri().toString());
-        ConsolidatorArgs args = new ConsolidatorArgs(streams, lister, dirs, targetSizeBytes, extension);
+        String fsUri = fs.getUri().toString();
+        ConsolidatorArgs args = new ConsolidatorArgs(fsUri, streams, lister, dirs, targetSizeBytes, extension);
         Utils.setObject(conf, ARGS, args);

         conf.setJobName("Consolidator: " + getDirsString(dirs));
@@ -146,8 +148,9 @@ public static class ConsolidatorMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, NullWritable, NullWritable> {
         ConsolidatorArgs args;

         public void map(ArrayWritable sourcesArr, Text target, OutputCollector<NullWritable, NullWritable> oc, Reporter rprtr) throws IOException {
+
             Path finalFile = new Path(target.toString());

             List<Path> sources = new ArrayList<Path>();
             for(int i=0; i<sourcesArr.get().length; i++) {
                 sources.add(new Path(((Text)sourcesArr.get()[i]).toString()));
@@ -199,10 +202,10 @@ public void map(ArrayWritable sourcesArr, Text target, OutputCollector<NullWritable, NullWritable> oc, Reporter rprtr) throws IOException {

         @Override
         public void configure(JobConf job) {
+            args = (ConsolidatorArgs) Utils.getObject(job, ARGS);
             try {
-                fs = FileSystem.get(job);
-                args = (ConsolidatorArgs) Utils.getObject(job, ARGS);
-            } catch (IOException e) {
+                fs = Utils.getFS(args.fsUri);
+            } catch(IOException e) {
                 throw new RuntimeException(e);
             }
         }
@@ -346,12 +349,12 @@ public int compare(InputSplit o1, InputSplit o2) {
         }

         public InputSplit[] getSplits(JobConf conf, int ignored) throws IOException {
-            FileSystem fs = FileSystem.get(conf);
             ConsolidatorArgs args = (ConsolidatorArgs) Utils.getObject(conf, ARGS);
             PathLister lister = args.pathLister;
             List<String> dirs = args.dirs;
             List<InputSplit> ret = new ArrayList<InputSplit>();
             for(String dir: dirs) {
+                FileSystem fs = Utils.getFS(dir);
                 ret.addAll(createSplits(fs, lister.getFiles(fs,dir),
                                         dir, args.targetSizeBytes, args.extension));
             }
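Note that the hunks above call Utils.getFS(...) with both a bare filesystem URI (args.fsUri) and a plain directory string (dir); the helper itself lives in backtype.hadoop.Utils and is not part of this diff. A plausible minimal sketch, assuming it resolves the filesystem from the string's scheme and authority:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical stand-in for backtype.hadoop.Utils.getFS; the real helper
// is not shown in this diff.
public class UtilsGetFSSketch {
    public static FileSystem getFS(String path) throws IOException {
        // Path.getFileSystem resolves the filesystem from the path's scheme
        // and authority (e.g. hdfs://nn:8020/...), falling back to the
        // configured default filesystem for scheme-less paths.
        return new Path(path).getFileSystem(new Configuration());
    }
}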
