Permalink
Browse files

updating cascading test

  • Loading branch information...
Costin Leau
Costin Leau committed Dec 18, 2012
1 parent eb0e248 commit 81b8d2336595043ce3e178ff545a8f3403a7f2d6
@@ -16,31 +16,19 @@
package org.springframework.data.hadoop.cascading;
import java.util.List;
-import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.hadoop.TestUtils;
-import org.springframework.data.hadoop.configuration.ConfigurationUtils;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import cascading.cascade.Cascade;
import cascading.flow.Flow;
-import cascading.flow.FlowConnector;
-import cascading.flow.FlowConnectorProps;
-import cascading.flow.hadoop.HadoopFlowConnector;
-import cascading.operation.DebugLevel;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.hadoop.TextDelimited;
-import cascading.tap.SinkMode;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
+import cascading.flow.hadoop.HadoopFlow;
/**
* @author Costin Leau
@@ -68,27 +56,7 @@ public void testCascade() throws Exception {
@Test
public void testManualCascade() throws Exception {
-
- Scheme sourceScheme = new TextDelimited(new Fields("name", "definition"), ",");
- Tap source = new Hfs(sourceScheme, "/test/cascading/names/input/babynamedefinitions.csv.gz");
-
- Scheme sinkScheme = new TextDelimited(new Fields("definition", "name"), " $$ ");
- Tap sink = new Hfs(sinkScheme, "/test/cascading/names/output/simplepipe", SinkMode.REPLACE);
-
- Pipe assembly = new Pipe("flip");
- //OPTIONAL: Debug the tuple
- //assembly = new Each(assembly, DebugLevel.VERBOSE, new Debug());
-
-
- // no base jar to run against
- // cfg.set("mapred.jar", null);
- Properties props = ConfigurationUtils.asProperties(hadoopConfiguration);
-
- // see HADOOP-9123
- FlowConnector flowConnector = new HadoopFlowConnector(props);
-
- FlowConnectorProps.setDebugLevel(props, DebugLevel.VERBOSE);
- Flow flow = flowConnector.connect("flipflow", source, sink, assembly);
+ HadoopFlow flow = ctx.getBean("copyFlow", HadoopFlow.class);
flow.complete();
}
}
@@ -63,4 +63,14 @@ fsh.put("src/test/resources/data/babynamedefinitions.csv.gz", "/test/cascading/n
<!--
<bean id="cascade-runner" class="org.springframework.data.hadoop.cascading.CascadeRunner" p:unit-of-work-ref="cascade" depends-on="script" p:run-at-startup="true"/>
-->
+
+ <!-- flow definition built via Java factory method (TestFlows.copyFlow) -->
+ <bean id="copyFlowDef" class="org.springframework.data.hadoop.cascading.TestFlows" factory-method="copyFlow">
+ <constructor-arg name="sourcePath"><value>/test/cascading/names/input/babynamedefinitions.csv.gz</value></constructor-arg>
+ <constructor-arg name="sinkPath"><value>/test/cascading/names/output/simplepipe</value></constructor-arg>
+ </bean>
+
+ <!-- wrap copy flow -->
+ <bean id="copyFlow" class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean" p:configuration-ref="hadoopConfiguration" p:flow-def-ref="copyFlowDef"/>
+
</beans>

0 comments on commit 81b8d23

Please sign in to comment.