diff --git a/haruhi/src/main/java/com/ontology2/haruhi/AmazonEMRCluster.java b/haruhi/src/main/java/com/ontology2/haruhi/AmazonEMRCluster.java index 5ff2d2e..99f9715 100644 --- a/haruhi/src/main/java/com/ontology2/haruhi/AmazonEMRCluster.java +++ b/haruhi/src/main/java/com/ontology2/haruhi/AmazonEMRCluster.java @@ -4,6 +4,7 @@ import static com.ontology2.centipede.shell.ExitCodeException.EX_UNAVAILABLE; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; @@ -22,6 +23,7 @@ import com.amazonaws.services.elasticmapreduce.model.StepConfig; import com.amazonaws.services.simpleworkflow.model.ExecutionStatus; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.ontology2.centipede.shell.ExitCodeException; import com.ontology2.haruhi.flows.Flow; @@ -120,7 +122,7 @@ public void runFlow(MavenManagedJar jar, Flow f,List flowArgs) throws Ex steps.add(new StepConfig( "main" ,new HadoopJarStepConfig(jarLocation) - .withArgs(j.getStepArgs(flowArgs))) + .withArgs(j.getStepArgs(emptyMap(),flowArgs))) ); } else { throw new RuntimeException("Could not process step of type "+that.getClass()); @@ -135,4 +137,8 @@ public void runFlow(MavenManagedJar jar, Flow f,List flowArgs) throws Ex RunJobFlowResult result=emrClient.runJobFlow(that); pollClusterForCompletion(result); } + + private Map emptyMap() { + return Maps.newHashMap(); + } } diff --git a/haruhi/src/main/java/com/ontology2/haruhi/LocalCmdCluster.java b/haruhi/src/main/java/com/ontology2/haruhi/LocalCmdCluster.java index 2fab2ff..150e467 100644 --- a/haruhi/src/main/java/com/ontology2/haruhi/LocalCmdCluster.java +++ b/haruhi/src/main/java/com/ontology2/haruhi/LocalCmdCluster.java @@ -6,6 +6,7 @@ import java.io.InputStreamReader; import java.lang.ProcessBuilder.Redirect; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -13,6 +14,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.io.CharStreams; import com.ontology2.centipede.shell.ExitCodeException; import com.ontology2.haruhi.flows.Flow; @@ -80,9 +82,13 @@ public void runFlow(MavenManagedJar jar, Flow f,List flowArgs) throws Ex private void runStep(MavenManagedJar jar, List flowArgs, FlowStep that) throws Exception { if(that instanceof JobStep) { JobStep jobStep=(JobStep) that; - this.runJob(jar, jobStep.getStepArgs(flowArgs)); + this.runJob(jar, jobStep.getStepArgs(emptyMap(),flowArgs)); } else { throw new RuntimeException("Could not process step of type "+that.getClass()); } } + + private Map emptyMap() { + return Maps.newHashMap(); + } }; diff --git a/haruhi/src/main/java/com/ontology2/haruhi/flows/SpringStep.java b/haruhi/src/main/java/com/ontology2/haruhi/flows/SpringStep.java index 68d9512..1f4df48 100644 --- a/haruhi/src/main/java/com/ontology2/haruhi/flows/SpringStep.java +++ b/haruhi/src/main/java/com/ontology2/haruhi/flows/SpringStep.java @@ -1,7 +1,9 @@ package com.ontology2.haruhi.flows; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -12,6 +14,7 @@ import org.springframework.expression.spel.support.StandardEvaluationContext; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public abstract class SpringStep extends FlowStep { private static final Log logger = LogFactory.getLog(SpringStep.class); @@ -21,8 +24,8 @@ public SpringStep(List argDefinitions) { this.argDefinitions=argDefinitions; } - public List getStepArgs(List flowArgs) { - SpringStepContext stepContext=new SpringStepContext(flowArgs); + public List getStepArgs(Map local,List flowArgs) { + SpringStepContext stepContext=new SpringStepContext(flowArgs,local); ExpressionParser parser = new SpelExpressionParser(); List stepArgs=Lists.newArrayList(); @@ -37,21 +40,27 @@ public List getStepArgs(List flowArgs) { }; public List getStepArgs(String... flowArgs) { - return getStepArgs(Arrays.asList(flowArgs)); + return getStepArgs(new HashMap(),Arrays.asList(flowArgs)); }; public class SpringStepContext { private final List pos; + private final Map local; - public SpringStepContext(List pos) { + public SpringStepContext(List pos, Map local) { super(); this.pos = pos; + this.local = local; } public List getPos() { return pos; } + public Map getLocal() { + return local; + } + // // right now hardcoded to the HDFS root // diff --git a/haruhi/src/test/java/com/ontology2/haruhi/flows/TestFlowBeans.java b/haruhi/src/test/java/com/ontology2/haruhi/flows/TestFlowBeans.java index f0a0e42..e4a4a2e 100644 --- a/haruhi/src/test/java/com/ontology2/haruhi/flows/TestFlowBeans.java +++ b/haruhi/src/test/java/com/ontology2/haruhi/flows/TestFlowBeans.java @@ -3,6 +3,7 @@ import static org.junit.Assert.*; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -13,6 +14,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.ontology2.haruhi.JobApp; @RunWith(SpringJUnit4ClassRunner.class) @@ -31,7 +33,7 @@ public class TestFlowBeans { { assertTrue(steps.get(0) instanceof SpringStep); SpringStep step0=(SpringStep) steps.get(0); - List args=step0.getStepArgs(flowArgs); + List args=step0.getStepArgs(emptyMap(),flowArgs); assertEquals(4,args.size()); @@ -45,7 +47,7 @@ public class TestFlowBeans { { assertTrue(steps.get(1) instanceof SpringStep); SpringStep step1=(SpringStep) steps.get(1); - List args=step1.getStepArgs(flowArgs); + List args=step1.getStepArgs(emptyMap(),flowArgs); assertEquals(4,args.size()); @@ -59,7 +61,7 @@ public class TestFlowBeans { { assertTrue(steps.get(2) instanceof SpringStep); SpringStep step2=(SpringStep) steps.get(2); - List args=step2.getStepArgs(flowArgs); + List args=step2.getStepArgs(emptyMap(),flowArgs); assertEquals(4,args.size()); @@ -73,7 +75,7 @@ public class TestFlowBeans { { assertTrue(steps.get(3) instanceof SpringStep); SpringStep step2=(SpringStep) steps.get(3); - List args=step2.getStepArgs(flowArgs); + List args=step2.getStepArgs(emptyMap(),flowArgs); assertEquals(5,args.size()); @@ -85,4 +87,8 @@ public class TestFlowBeans { assertEquals("/preprocessed/1942-12-07-00-00/",args.get(i++)); } } + + private Map emptyMap() { + return Maps.newHashMap(); + } }