Permalink
Find file
Fetching contributors…
Cannot retrieve contributors at this time
111 lines (78 sloc) 3.78 KB
---
cascading: |-
CASCADING COOKBOOK
Copy a Tuple
Tuple orig = new Tuple();
orig.add( "some value" );
// copy constructor
Tuple copy = new Tuple( orig );
Nest a Tuple in a Tuple
Tuple parent = new Tuple();
Tuple child = new Tuple( "value1", "value2" );
parent.add( child );
Split a stream and name the branches
pipe = new Each( pipe, new SomeFunction() )
Pipe lhs = new Pipe( "lhs", pipe );
Pipe rhs = new Pipe( "rhs", pipe );
Copy a value
pipe = new Each( pipe, new Fields( "field" ), new Identity( new Fields( "copy" ) ), Fields.ALL );
Rename a field
// two incoming fields: "other" and "field"
pipe = new Each( pipe, new Fields( "field" ),
new Identity( new Fields( "renamed" ) ),
new Fields( "other", "renamed" ) );
Convert strings to primitive types
Class[] types = new Class[] {Long.class, Float.class, Boolean.class};
pipe = new Each( pipe,
new Fields( "longField", "floatField", "booleanField" ),
new Identity( types ) );
Reorder fields
// two incoming fields: "field1" and "field2"
pipe = new Each( pipe, new Fields( "field2", "field1" ), new Identity() );
Discard unused fields
// two incoming fields: "field1" and "field2"
pipe = new Each( pipe, new Fields( "field2" ), new Identity() );
Sort values passed to Aggregator
pipe = new GroupBy( pipe, new Fields( "group1", "group2" ), new Fields( "value" ) );
Insert constant values into a value stream
pipe = new Each( pipe, new Insert( new Fields( "field1", "field2" ), "value1", "value2" ), Fields.ALL );
Create 'timestamp' from date/time fields
// build 'datetime' string.
FieldFormatter formatter = new FieldFormatter( new Fields( "datetime" ), "%s:%s:%s:%s:%s:%s.%s" );
Fields timeFields = new Fields( "year", "month", "day", "hour", "minute", "second", "millisecond" );
pipe = new Each( pipe, timeFields, formatter, Fields.ALL );
pipe = new Each( pipe, new Fields( "datetime" ), new DateParser( new Fields( "ts" ), "yyyy:MM:dd:HH:mm:ss.SSS" ), Fields.ALL );
Create date/time string from 'timestamp' field
DateFormatter timeFormatter = new DateFormatter(new Fields( "datetime" ), "HH:mm:ss.SSS" );
pass field "ts" into the timeFormatter function, which returns a "datetime" field
pipe = new Each( pipe, new Fields( "ts" ), timeFormatter, Fields.ALL );
Get 'DISTINCT' (unique) values from a Tuple stream
// group on all values
pipe = new GroupBy( pipe, Fields.ALL );
only take the first tuple in the grouping, ignore the rest
pipe = new Every( pipe, Fields.ALL, new First(), Fields.RESULTS );
Merge two streams with the same column names
// merge streams, group on 'last' field name
lhs = 'first', 'last', 'middle'
rhs = 'first', 'last', 'middle'
pipe = new GroupBy( lhs, rhs, new Fields("last") );
Passing properties to an Operation
// set propery on Flow
Properties props = new Properties();
props.set( "key", "value");
FlowConnector conn = new FlowConnector( props );
...
// get property from within an Operation (Function,Filter,etc)
String value = flowProcess.getProperty( "key" );
Passing a JobConf to a Flow and Operation
// set JobConf on Flow
JobConf jobConf = new JobConf();
jobConf.set( "key", "value" );
Properties props = new Properties();
MultiMapReducePlanner.setJobConf( properties, jobConf );
FlowConnector conn = new FlowConnector( props );
... ....
// get property from within an Operation (Function,Filter,etc)
String value = flowProcess.getProperty( "key" );
// or
JobConf jobConf = ((HadoopFlowProcess) flowProcess).getJobConf();