Skip to content

Commit

Permalink
add local field to context object for SPeL evaluation of flow argumen…
Browse files Browse the repository at this point in the history
…ts; currently the local field is shimmed with empty maps
  • Loading branch information
Paul Houle authored and Paul Houle committed Oct 15, 2013
1 parent c4e1731 commit 980b85f
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static com.ontology2.centipede.shell.ExitCodeException.EX_UNAVAILABLE;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
Expand All @@ -22,6 +23,7 @@
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.simpleworkflow.model.ExecutionStatus;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.ontology2.centipede.shell.ExitCodeException;
import com.ontology2.haruhi.flows.Flow;
Expand Down Expand Up @@ -120,7 +122,7 @@ public void runFlow(MavenManagedJar jar, Flow f,List<String> flowArgs) throws Ex
steps.add(new StepConfig(
"main"
,new HadoopJarStepConfig(jarLocation)
.withArgs(j.getStepArgs(flowArgs)))
.withArgs(j.getStepArgs(emptyMap(),flowArgs)))
);
} else {
throw new RuntimeException("Could not process step of type "+that.getClass());
Expand All @@ -135,4 +137,8 @@ public void runFlow(MavenManagedJar jar, Flow f,List<String> flowArgs) throws Ex
RunJobFlowResult result=emrClient.runJobFlow(that);
pollClusterForCompletion(result);
}

private Map<String, Object> emptyMap() {
return Maps.newHashMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import java.io.InputStreamReader;
import java.lang.ProcessBuilder.Redirect;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.xerces.impl.xpath.XPath.Step;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.CharStreams;
import com.ontology2.centipede.shell.ExitCodeException;
import com.ontology2.haruhi.flows.Flow;
Expand Down Expand Up @@ -80,9 +82,13 @@ public void runFlow(MavenManagedJar jar, Flow f,List<String> flowArgs) throws Ex
private void runStep(MavenManagedJar jar, List<String> flowArgs, FlowStep that) throws Exception {
if(that instanceof JobStep) {
JobStep jobStep=(JobStep) that;
this.runJob(jar, jobStep.getStepArgs(flowArgs));
this.runJob(jar, jobStep.getStepArgs(emptyMap(),flowArgs));
} else {
throw new RuntimeException("Could not process step of type "+that.getClass());
}
}

private Map<String, Object> emptyMap() {
return Maps.newHashMap();
}
};
17 changes: 13 additions & 4 deletions haruhi/src/main/java/com/ontology2/haruhi/flows/SpringStep.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.ontology2.haruhi.flows;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -12,6 +14,7 @@
import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public abstract class SpringStep extends FlowStep {
private static final Log logger = LogFactory.getLog(SpringStep.class);
Expand All @@ -21,8 +24,8 @@ public SpringStep(List<String> argDefinitions) {
this.argDefinitions=argDefinitions;
}

public List<String> getStepArgs(List<String> flowArgs) {
SpringStepContext stepContext=new SpringStepContext(flowArgs);
public List<String> getStepArgs(Map<String,Object> local,List<String> flowArgs) {
SpringStepContext stepContext=new SpringStepContext(flowArgs,local);
ExpressionParser parser = new SpelExpressionParser();
List<String> stepArgs=Lists.newArrayList();

Expand All @@ -37,21 +40,27 @@ public List<String> getStepArgs(List<String> flowArgs) {
};

public List<String> getStepArgs(String... flowArgs) {
return getStepArgs(Arrays.asList(flowArgs));
return getStepArgs(new HashMap<String,Object>(),Arrays.asList(flowArgs));
};

public class SpringStepContext {
private final List<String> pos;
private final Map<String,Object> local;

public SpringStepContext(List<String> pos) {
public SpringStepContext(List<String> pos, Map<String, Object> local) {
super();
this.pos = pos;
this.local = local;
}

public List<String> getPos() {
return pos;
}

public Map<String,Object> getLocal() {
return local;
}

//
// right now hardcoded to the HDFS root
//
Expand Down
14 changes: 10 additions & 4 deletions haruhi/src/test/java/com/ontology2/haruhi/flows/TestFlowBeans.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.junit.Assert.*;

import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -13,6 +14,7 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.ontology2.haruhi.JobApp;

@RunWith(SpringJUnit4ClassRunner.class)
Expand All @@ -31,7 +33,7 @@ public class TestFlowBeans {
{
assertTrue(steps.get(0) instanceof SpringStep);
SpringStep step0=(SpringStep) steps.get(0);
List<String> args=step0.getStepArgs(flowArgs);
List<String> args=step0.getStepArgs(emptyMap(),flowArgs);

assertEquals(4,args.size());

Expand All @@ -45,7 +47,7 @@ public class TestFlowBeans {
{
assertTrue(steps.get(1) instanceof SpringStep);
SpringStep step1=(SpringStep) steps.get(1);
List<String> args=step1.getStepArgs(flowArgs);
List<String> args=step1.getStepArgs(emptyMap(),flowArgs);

assertEquals(4,args.size());

Expand All @@ -59,7 +61,7 @@ public class TestFlowBeans {
{
assertTrue(steps.get(2) instanceof SpringStep);
SpringStep step2=(SpringStep) steps.get(2);
List<String> args=step2.getStepArgs(flowArgs);
List<String> args=step2.getStepArgs(emptyMap(),flowArgs);

assertEquals(4,args.size());

Expand All @@ -73,7 +75,7 @@ public class TestFlowBeans {
{
assertTrue(steps.get(3) instanceof SpringStep);
SpringStep step2=(SpringStep) steps.get(3);
List<String> args=step2.getStepArgs(flowArgs);
List<String> args=step2.getStepArgs(emptyMap(),flowArgs);

assertEquals(5,args.size());

Expand All @@ -85,4 +87,8 @@ public class TestFlowBeans {
assertEquals("/preprocessed/1942-12-07-00-00/",args.get(i++));
}
}

private Map<String,Object> emptyMap() {
return Maps.newHashMap();
}
}

0 comments on commit 980b85f

Please sign in to comment.