Skip to content

Commit

Permalink
Merge branch 'feature/cascading2' of github.com:nathanmarz/elephantdb-cascading into feature/cascading2
Browse files Browse the repository at this point in the history
  • Loading branch information
sritchie committed May 2, 2012
2 parents 9499c9f + 33196a4 commit d460b14
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 49 deletions.
8 changes: 4 additions & 4 deletions project.clj
@@ -1,16 +1,16 @@
(defproject elephantdb/elephantdb-cascading "0.3.1"
(defproject elephantdb/elephantdb-cascading "0.3.2-SNAPSHOT"
:source-path "src/clj"
:java-source-path "src/jvm"
:javac-options {:debug "true" :fork "true"}
:repositories {"conjars" "http://conjars.org/repo"}
:dependencies [[elephantdb "0.2.0-wip1"]
:dependencies [[elephantdb "0.2.0-wip2"]
[org.slf4j/slf4j-api "1.6.1"]
[cascading/cascading-hadoop "2.0.0-wip-226"
[cascading/cascading-hadoop "2.0.0-wip-281"
:exclusions [org.codehaus.janino/janino
org.apache.hadoop/hadoop-core]]]
:dev-dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]
[org.clojure/clojure "1.3.0"]
[hadoop-util "0.2.7"]
[hadoop-util "0.2.8"]
[jackknife "0.1.2"]
[org.apache.hadoop/hadoop-core "0.20.2-dev"]
[midje "1.3.0"]])
59 changes: 21 additions & 38 deletions src/jvm/elephantdb/cascading/ElephantDBTap.java
Expand Up @@ -2,7 +2,7 @@

import cascading.flow.Flow;
import cascading.flow.FlowListener;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.Hfs;
Expand All @@ -27,7 +27,7 @@
import java.util.UUID;


public class ElephantDBTap extends Hfs implements FlowListener {
public class ElephantDBTap extends Hfs {
public static final Logger LOG = Logger.getLogger(ElephantDBTap.class);

public static class Args implements Serializable {
Expand Down Expand Up @@ -69,7 +69,7 @@ public DomainSpec getSpec() {
return spec;
}

@Override public void sourceConfInit(HadoopFlowProcess process, JobConf conf) {
@Override public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
super.sourceConfInit( process, conf );

FileInputFormat.setInputPaths(conf, "/" + UUID.randomUUID().toString());
Expand All @@ -87,7 +87,7 @@ public DomainSpec getSpec() {
Utils.setObject(conf, ElephantInputFormat.ARGS_CONF, eargs);
}

@Override public void sinkConfInit(HadoopFlowProcess process, JobConf conf) {
@Override public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
super.sinkConfInit( process, conf );

ElephantOutputFormat.Args args = null;
Expand Down Expand Up @@ -147,48 +147,31 @@ public long getModifiedTime(JobConf jc) throws IOException {
return System.currentTimeMillis();
}

public void onStarting(Flow flow) {

}

public void onStopping(Flow flow) {

}

private boolean isSinkOf(Flow<JobConf> flow) {
for (Entry<String, Tap> e : flow.getSinks().entrySet()) {
if (e.getValue() == this)
return true;
}
return false;
}

public void onCompleted(Flow flow) {
@Override public boolean commitResource(JobConf conf) {
try {
if (isSinkOf(flow)) {
DomainStore dstore = getDomainStore();
if (flow.getFlowStats().isSuccessful()) {
dstore.getFileSystem().mkdirs(new Path(newVersionPath));

            // If the user wants to run an incremental, skip version synchronization.
if (args.incremental) {
dstore.synchronizeInProgressVersion(newVersionPath);
}

dstore.succeedVersion(newVersionPath);
} else {
dstore.failVersion(newVersionPath);
}
DomainStore dstore = getDomainStore();
dstore.getFileSystem().mkdirs(new Path(newVersionPath));

            // If the user wants to run an incremental, skip version synchronization.
if (args.incremental) {
dstore.synchronizeInProgressVersion(newVersionPath);
}

dstore.succeedVersion(newVersionPath);

return true;

} catch (IOException e) {
throw new TapException("Couldn't finalize new elephant domain version", e);
} finally {
newVersionPath = null; //working around cascading calling sinkinit twice
}
}

// TODO: Once failResource is implemented, move guts of onCompleted over to commitResource.
@Override public boolean commitResource(JobConf conf) {

@Override public boolean rollbackResource(JobConf conf) throws IOException {
DomainStore dstore = getDomainStore();
dstore.failVersion(newVersionPath);

return true;
}

Expand Down
16 changes: 9 additions & 7 deletions src/jvm/elephantdb/cascading/ElephantScheme.java
@@ -1,6 +1,6 @@
package elephantdb.cascading;

import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
Expand All @@ -21,7 +21,7 @@

import java.io.IOException;

public class ElephantScheme extends Scheme<HadoopFlowProcess, JobConf, RecordReader, OutputCollector, Object[], Object[]> {
public class ElephantScheme extends Scheme<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector, Object[], Object[]> {
Serializer serializer;
Gateway gateway;

Expand All @@ -37,17 +37,19 @@ public Serializer getSerializer() {
}

@Override
public void sourceConfInit(HadoopFlowProcess flowProcess, Tap tap, JobConf conf) {
public void sourceConfInit(FlowProcess<JobConf> flowProcess,
Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setInputFormat(ElephantInputFormat.class);
}

@Override public void sinkConfInit(HadoopFlowProcess flowProcess, Tap tap, JobConf conf) {
@Override public void sinkConfInit(FlowProcess<JobConf> flowProcess,
Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setOutputKeyClass(IntWritable.class); // be explicit
conf.setOutputValueClass( BytesWritable.class ); // be explicit
conf.setOutputFormat(ElephantOutputFormat.class);
}

@Override public void sourcePrepare(HadoopFlowProcess flowProcess,
@Override public void sourcePrepare(FlowProcess<JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) {

sourceCall.setContext(new Object[2]);
Expand All @@ -56,7 +58,7 @@ public void sourceConfInit(HadoopFlowProcess flowProcess, Tap tap, JobConf conf)
sourceCall.getContext()[1] = sourceCall.getInput().createValue();
}

@Override public boolean source(HadoopFlowProcess flowProcess,
@Override public boolean source(FlowProcess<JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) throws IOException {

NullWritable key = (NullWritable) sourceCall.getContext()[0];
Expand All @@ -74,7 +76,7 @@ public void sourceConfInit(HadoopFlowProcess flowProcess, Tap tap, JobConf conf)
return true;
}

@Override public void sink(HadoopFlowProcess flowProcess,
@Override public void sink(FlowProcess<JobConf> flowProcess,
SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
Tuple tuple = sinkCall.getOutgoingEntry().getTuple();

Expand Down

0 comments on commit d460b14

Please sign in to comment.