Skip to content

Commit

Permalink
#32 at this point we have the first flow example running with one job…
Browse files Browse the repository at this point in the history
… in the cluster; next steps are adding more jobs, adding a flow runner for the EMR cluster, and fluentizing the Java/Sprin XML API
  • Loading branch information
Paul Houle authored and Paul Houle committed Oct 10, 2013
1 parent 738dc26 commit c2d27d8
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
4 changes: 4 additions & 0 deletions haruhi/src/main/java/com/ontology2/haruhi/FlowApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.ontology2.centipede.shell.CommandLineApplication;
import com.ontology2.haruhi.flows.Flow;

public class FlowApp extends CommandLineApplication {
private static Log logger = LogFactory.getLog(FlowApp.class);
Expand Down Expand Up @@ -45,8 +46,11 @@ protected void _run(String[] arguments) throws Exception {
usage();

String flowId=a.next();
Flow f=applicationContext.getBean(flowId,Flow.class);

List<String> flowArgs=Lists.newArrayList();
Iterators.addAll(flowArgs, a);
cluster.runFlow(jar, f, flowArgs);
}

private void usage() {
Expand Down
12 changes: 8 additions & 4 deletions haruhi/src/main/java/com/ontology2/haruhi/LocalCmdCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ public void setMavenRepoPath(String mavenRepoPath) {

@Override
public void runFlow(MavenManagedJar jar, Flow f,List<String> flowArgs) throws Exception {
List<FlowStep> steps=f.generateSteps(flowArgs);
if(steps instanceof JobStep) {
JobStep jobStep=(JobStep) steps;
for(FlowStep that:f.generateSteps(flowArgs))
runStep(jar, flowArgs, that);
}

private void runStep(MavenManagedJar jar, List<String> flowArgs, FlowStep that) throws Exception {
if(that instanceof JobStep) {
JobStep jobStep=(JobStep) that;
this.runJob(jar, jobStep.getStepArgs(flowArgs));
} else {
throw new RuntimeException("Could not process step of type "+steps.getClass());
throw new RuntimeException("Could not process step of type "+that.getClass());
}
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<bean class="com.ontology2.haruhi.flows.JobStep">
<constructor-arg>
<list>
<value>'run'</value>
<value>'freebaseRDFPrefilter'</value>
<value>pos[0]+'freebase-rdf-'+pos[1]+'/'</value>
<value>tmpDir+'preprocessed/'+pos[1]+'/'</value>
Expand Down

0 comments on commit c2d27d8

Please sign in to comment.