Skip to content

Commit

Permalink
#32 -- added runFlow() implementation for AWSCluster which passes int…
Browse files Browse the repository at this point in the history
…egration test
  • Loading branch information
Paul Houle authored and Paul Houle committed Oct 12, 2013
1 parent 0d96e1a commit 010fb62
Showing 1 changed file with 32 additions and 3 deletions.
35 changes: 32 additions & 3 deletions haruhi/src/main/java/com/ontology2/haruhi/AmazonEMRCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.simpleworkflow.model.ExecutionStatus;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.ontology2.centipede.shell.ExitCodeException;
import com.ontology2.haruhi.flows.Flow;
import com.ontology2.haruhi.flows.FlowStep;
import com.ontology2.haruhi.flows.JobStep;

public class AmazonEMRCluster implements Cluster {
private static Log logger = LogFactory.getLog(AmazonEMRCluster.class);
Expand Down Expand Up @@ -55,6 +58,11 @@ public void runJob(MavenManagedJar defaultJar,List<String> jarArgs)
.withLogUri(awsLogUri)
.withInstances(instances);
RunJobFlowResult result=emrClient.runJobFlow(that);
pollClusterForCompletion(result);
}

private void pollClusterForCompletion(RunJobFlowResult result)
throws Exception, InterruptedException, ExitCodeException {
String jobFlowId=result.getJobFlowId();
logger.info("Created job flow in AWS with id "+jobFlowId);

Expand Down Expand Up @@ -102,8 +110,29 @@ public void runJob(MavenManagedJar defaultJar,List<String> jarArgs)
}

@Override
public void runFlow(MavenManagedJar defaultJar, Flow f,List<String> flowArgs) throws Exception {
throw new Exception("runFlow() not implemented yet");
public void runFlow(MavenManagedJar jar, Flow f,List<String> flowArgs) throws Exception {
String jarLocation=jar.s3JarLocation(awsSoftwareBucket);
List<StepConfig> steps=Lists.newArrayList(debugStep);

for(FlowStep that:f.generateSteps(flowArgs))
if(that instanceof JobStep) {
JobStep j=(JobStep) that;
steps.add(new StepConfig(
"main"
,new HadoopJarStepConfig(jarLocation)
.withArgs(j.getStepArgs(flowArgs)))
);
} else {
throw new RuntimeException("Could not process step of type "+that.getClass());
}

RunJobFlowRequest that=new RunJobFlowRequest()
.withName("Haruhi submitted job")
.withSteps(steps)
.withLogUri(awsLogUri)
.withInstances(instances);

RunJobFlowResult result=emrClient.runJobFlow(that);
pollClusterForCompletion(result);
}

}

0 comments on commit 010fb62

Please sign in to comment.