Skip to content

Commit

Permalink
Merge pull request #2 from sorenmacbeth/develop
Browse files Browse the repository at this point in the history
Pass a fully qualified path into OutputFormatArgs
  • Loading branch information
sorenmacbeth committed Aug 22, 2012
2 parents f119aa7 + cdca707 commit 988f99f
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
:repositories {"conjars" "http://conjars.org/repo"}
:dependencies [[elephantdb "0.2.0-wip3"]
[org.slf4j/slf4j-api "1.6.1"]
[cascading/cascading-hadoop "2.0.0-wip-281"
[cascading/cascading-hadoop "2.0.0"
:exclusions [org.codehaus.janino/janino
org.apache.hadoop/hadoop-core]]]
:dev-dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]
Expand Down
4 changes: 4 additions & 0 deletions src/jvm/elephantdb/cascading/ElephantDBTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import elephantdb.index.Indexer;
import elephantdb.store.DomainStore;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
Expand Down Expand Up @@ -106,9 +107,12 @@ public DomainSpec getSpec() {

public ElephantOutputFormat.Args outputArgs(JobConf conf) throws IOException {
DomainStore dstore = getDomainStore();
FileSystem fs = dstore.getFileSystem();

if (newVersionPath == null) { //working around cascading calling sinkinit twice
newVersionPath = dstore.createVersion();
// make the path qualified before serializing into the jobconf
newVersionPath = new Path(newVersionPath).makeQualified(fs).toString();
}
ElephantOutputFormat.Args eargs = new ElephantOutputFormat.Args(spec, newVersionPath);

Expand Down
6 changes: 3 additions & 3 deletions src/jvm/elephantdb/cascading/ElephantScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.io.IOException;

public class ElephantScheme extends Scheme<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector, Object[], Object[]> {
public class ElephantScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
Serializer serializer;
Gateway gateway;

Expand All @@ -38,12 +38,12 @@ public Serializer getSerializer() {

@Override
public void sourceConfInit(FlowProcess<JobConf> flowProcess,
Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setInputFormat(ElephantInputFormat.class);
}

@Override public void sinkConfInit(FlowProcess<JobConf> flowProcess,
Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setOutputKeyClass(IntWritable.class); // be explicit
conf.setOutputValueClass( BytesWritable.class ); // be explicit
conf.setOutputFormat(ElephantOutputFormat.class);
Expand Down

0 comments on commit 988f99f

Please sign in to comment.