Skip to content

Commit

Permalink
for #38 I've laid out some configuration classes for flows
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Houle authored and Paul Houle committed Oct 9, 2013
1 parent 0b50bf6 commit fee68ce
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.amazonaws.services.simpleworkflow.model.ExecutionStatus;
import com.google.common.collect.Sets;
import com.ontology2.centipede.shell.ExitCodeException;
import com.ontology2.haruhi.flows.Flow;

public class AmazonEMRCluster implements Cluster {
private static Log logger = LogFactory.getLog(AmazonEMRCluster.class);
Expand Down Expand Up @@ -100,4 +101,9 @@ public void runJob(MavenManagedJar defaultJar,List<String> jarArgs)
}
}

@Override
public void runFlow(MavenManagedJar defaultJar, Flow f) throws Exception {
    // Stub for #38: EMR flow execution is not implemented yet.
    // UnsupportedOperationException is the idiomatic "not implemented" signal;
    // it is a RuntimeException, so it still satisfies the interface's
    // "throws Exception" contract and any caller catching Exception.
    throw new UnsupportedOperationException("runFlow() not implemented yet");
}

}
4 changes: 3 additions & 1 deletion haruhi/src/main/java/com/ontology2/haruhi/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import java.io.IOException;
import java.util.List;

import com.ontology2.haruhi.flows.Flow;

public interface Cluster {

/**
 * Runs a single job on this cluster.
 *
 * @param defaultJar the Maven-managed JAR containing the job to run
 * @param jarArgs    arguments passed through to the JAR's main class
 * @throws Exception if job submission or execution fails
 */
public void runJob(MavenManagedJar defaultJar, List<String> jarArgs) throws Exception;

/**
 * Runs a multi-step flow on this cluster.
 *
 * @param defaultJar the Maven-managed JAR containing the job steps
 * @param f          the flow describing the steps to run
 * @throws Exception if flow submission or execution fails
 *                   (implementations in this commit are stubs — see #38)
 */
public void runFlow(MavenManagedJar defaultJar, Flow f) throws Exception;
}
73 changes: 73 additions & 0 deletions haruhi/src/main/java/com/ontology2/haruhi/FlowApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.ontology2.haruhi;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.ontology2.centipede.shell.CommandLineApplication;

/**
 * Command-line entry point that submits a job flow to a cluster.
 *
 * Usage: haruhi run flow [-clusterId &lt;bean&gt;] [-jarId &lt;bean&gt;] flow_id flow_args...
 */
public class FlowApp extends CommandLineApplication {
    private static Log logger = LogFactory.getLog(FlowApp.class);
    @Autowired private ApplicationContext applicationContext;
    @Autowired private MavenManagedJar defaultJar;
    @Autowired private Cluster defaultCluster;

    /**
     * Parses the option flags, then the flow id and its arguments.
     * Any unrecognized flag, or a flag missing its value, prints usage
     * and exits.
     */
    @Override
    protected void _run(String[] arguments) throws Exception {
        PeekingIterator<String> a = Iterators.peekingIterator(Iterators.forArray(arguments));
        if (!a.hasNext())
            usage();

        Cluster cluster = defaultCluster;
        MavenManagedJar jar = defaultJar;
        while (a.hasNext() && a.peek().startsWith("-")) {
            String flagName = a.next().substring(1);
            if (!a.hasNext())
                usage();

            String flagValue = a.next();
            // Compare with equals(): the previous == comparison only worked
            // because of an explicit intern() call, which is fragile and
            // breaks silently if the intern() is ever removed.
            if ("clusterId".equals(flagName)) {
                cluster = applicationContext.getBean(flagValue, Cluster.class);
            } else if ("jarId".equals(flagName)) {
                jar = applicationContext.getBean(flagValue, MavenManagedJar.class);
            } else {
                usage();
            }
        }

        if (!a.hasNext())
            usage();

        String flowId = a.next();
        List<String> flowArgs = Lists.newArrayList();
        Iterators.addAll(flowArgs, a);
        // TODO(#38): look up the Flow bean named flowId and dispatch
        // cluster.runFlow(jar, flow) once flow execution is implemented;
        // flowId/flowArgs/cluster/jar are collected but unused until then.
    }

    /** Prints usage help to stdout and exits with status -1. */
    private void usage() {
        System.out.println("To submit a job flow to the FlowApplication do the following:");
        System.out.println();
        System.out.println("haruhi run flow [options] flow_id flow_args ...");
        System.out.println("");
        System.out.println("The system will pass on any arguments beyond the options to ");
        System.out.println("the Hadoop application. The system will use default options for the cluster");
        System.out.println("and JAR configuration unless you override them with the following options:");
        System.out.println("");
        System.out.println("   -clusterId <clusterId>");
        System.out.println("   -jarId <jarId>");
        System.out.println("");
        System.out.println("both of these arguments are Spring bean names.  If you want to add new");
        System.out.println("configurations,  this application searches");
        System.out.println("");
        System.out.println("$HOME/.haruhi/applicationContext.xml");
        System.out.println("");
        System.out.println("where you can override existing bean definitions or define new ones.");
        System.exit(-1);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.common.collect.Lists;
import com.google.common.io.CharStreams;
import com.ontology2.centipede.shell.ExitCodeException;
import com.ontology2.haruhi.flows.Flow;

public class LocalCmdCluster implements Cluster {
private static Log logger = LogFactory.getLog(LocalCmdCluster.class);
Expand Down Expand Up @@ -66,5 +67,10 @@ public String getMavenRepoPath() {
public void setMavenRepoPath(String mavenRepoPath) {
this.mavenRepoPath = mavenRepoPath;
}

@Override
public void runFlow(MavenManagedJar defaultJar, Flow f) throws Exception {
    // Stub for #38: local flow execution is not implemented yet.
    // UnsupportedOperationException is the idiomatic "not implemented" signal;
    // it is a RuntimeException, so it still satisfies the interface's
    // "throws Exception" contract and any caller catching Exception.
    throw new UnsupportedOperationException("runFlow() not implemented yet");
}

};
16 changes: 16 additions & 0 deletions haruhi/src/main/java/com/ontology2/haruhi/flows/Flow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.ontology2.haruhi.flows;

import java.util.List;

//
// This is inspired by the "JobFlow" concept in Amazon EMR; the difference is
// that this is compatible with both local Hadoop and EMR.
//
// the key thing about this is that it transforms the arguments (which could
// be paths) into the arguments of a number of job steps which are run
// sequentially against a local cluster or submitted together for a batch
//

public interface Flow {
/**
 * Transforms the flow-level arguments (which could be paths) into the
 * concrete job steps to run — sequentially on a local cluster, or
 * submitted together as a batch on EMR.
 *
 * @param flowArgs the arguments supplied for this flow invocation
 * @return the ordered list of steps to execute
 */
public List<FlowStep> generateSteps(List<String> flowArgs);
}
13 changes: 13 additions & 0 deletions haruhi/src/main/java/com/ontology2/haruhi/flows/FlowStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.ontology2.haruhi.flows;

import java.util.List;

//
// at this point the base FlowStep doesn't do anything because different
// Clusters may do radically different things with different step types;
// for instance, a local cluster will run all steps sequentially while
// an EMR cluster will batch them together, then run them
//

/**
 * Marker interface for a single step in a Flow.  Deliberately empty:
 * different Cluster implementations interpret step types in radically
 * different ways (e.g. run sequentially vs. batch together).
 */
public interface FlowStep {
}
15 changes: 15 additions & 0 deletions haruhi/src/main/java/com/ontology2/haruhi/flows/JobStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ontology2.haruhi.flows;

import java.util.Arrays;
import java.util.List;


/**
 * A concrete flow step whose arguments are SpEL expression strings,
 * evaluated by the SpringStep machinery at execution time.
 */
public class JobStep extends SpringStep {

    /** Varargs convenience: wraps the definitions and delegates. */
    public JobStep(String... defs) {
        this(Arrays.asList(defs));
    }

    /** @param argDefinitions SpEL expressions producing the step's arguments */
    public JobStep(List<String> argDefinitions) {
        super(argDefinitions);
    }

}
31 changes: 31 additions & 0 deletions haruhi/src/main/java/com/ontology2/haruhi/flows/SpringFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.ontology2.haruhi.flows;

import java.util.List;

import com.ontology2.haruhi.MavenManagedJar;

/**
 * A SpringFlow creates a series of SpringSteps; for each SpringStep
* the system will parse the arguments with SPeL
*
* I'd like to substitute the arguments of flowArgs into variables like
*
* $0, $1, $2
*
* and (in the future) pass in named arguments.
*
*/

/**
 * A Flow built from a configured list of SpringSteps; each step's
 * arguments are SpEL expressions evaluated against the flow arguments
 * at execution time (see SpringStep.getStepArgs).
 */
public class SpringFlow implements Flow {

    private final List<SpringStep> springSteps;

    /** @param springSteps the ordered steps that make up this flow */
    public SpringFlow(List<SpringStep> springSteps) {
        this.springSteps = springSteps;
    }

    @Override
    public List<FlowStep> generateSteps(List<String> flowArgs) {
        // Previously returned null, which is a null-collection anti-pattern
        // that would NPE every caller.  Return a defensive copy instead;
        // SpringStep implements FlowStep, so the elements widen without casts.
        // NOTE(review): flowArgs is intentionally unused here — per the class
        // comment, argument substitution happens later via getStepArgs().
        return new java.util.ArrayList<FlowStep>(springSteps);
    }
}
53 changes: 53 additions & 0 deletions haruhi/src/main/java/com/ontology2/haruhi/flows/SpringStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.ontology2.haruhi.flows;

import java.util.Arrays;
import java.util.List;

import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;

import com.google.common.collect.Lists;

/**
 * Base class for flow steps whose argument list is defined as SpEL
 * expressions, evaluated against a context exposing the flow arguments
 * as the {@code pos} property (e.g. {@code pos[0]}).
 */
public abstract class SpringStep implements FlowStep {

    // SpelExpressionParser is reusable, so one shared instance avoids
    // constructing a parser on every getStepArgs() call.
    private static final ExpressionParser PARSER = new SpelExpressionParser();

    final List<String> argDefinitions;

    /** @param argDefinitions SpEL expressions, one per step argument */
    public SpringStep(List<String> argDefinitions) {
        this.argDefinitions = argDefinitions;
    }

    /**
     * Evaluates each argument definition against the flow arguments.
     *
     * @param flowArgs positional flow arguments, available to the
     *                 expressions as {@code pos}
     * @return the evaluated step arguments, in definition order
     */
    public List<String> getStepArgs(List<String> flowArgs) {
        SpringStepContext stepContext = new SpringStepContext(flowArgs);
        // The evaluation context only carries the root object, so one
        // instance can be shared across all expressions (previously it
        // was rebuilt on every loop iteration).
        EvaluationContext c = new StandardEvaluationContext(stepContext);
        List<String> stepArgs = Lists.newArrayList();

        for (String definition : argDefinitions) {
            Expression e = PARSER.parseExpression(definition);
            stepArgs.add(e.getValue(c, String.class));
        }

        return stepArgs;
    }

    /** Varargs convenience overload of {@link #getStepArgs(List)}. */
    public List<String> getStepArgs(String... flowArgs) {
        return getStepArgs(Arrays.asList(flowArgs));
    }

    /**
     * SpEL root object exposing the positional flow arguments.
     * Declared static: it does not touch the enclosing step, and a
     * non-static inner class would carry a needless hidden reference.
     */
    public static class SpringStepContext {
        private final List<String> pos;

        public SpringStepContext(List<String> pos) {
            this.pos = pos;
        }

        /** @return the positional flow arguments, addressable as {@code pos[i]} */
        public List<String> getPos() {
            return pos;
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.ontology2.haruhi.flows;

import static org.junit.Assert.*;

import java.util.List;

import org.junit.Test;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;

/** Unit tests for SpEL argument evaluation in SpringStep/JobStep. */
public class SpringStepTest {

    @Test
    public void upperCaseArgument() {
        JobStep step = new JobStep("'hello world'.toUpperCase()");

        List<String> args = step.getStepArgs();
        assertEquals(1, args.size());
        assertEquals("HELLO WORLD", args.get(0));
    }

    @Test
    public void itDoesMath() {
        JobStep step = new JobStep("(7*7).toString()");

        List<String> args = step.getStepArgs();
        assertEquals(1, args.size());
        assertEquals("49", args.get(0));
    }

    @Test
    public void itFindPositionalArguments() {
        // pos[0] refers to the first flow argument passed to getStepArgs()
        JobStep step = new JobStep("'furry '+pos[0]+' brothers'");

        List<String> args = step.getStepArgs("freak");
        assertEquals(1, args.size());
        assertEquals("furry freak brothers", args.get(0));
    }

}

0 comments on commit fee68ce

Please sign in to comment.