updated no shuffle sort code

commit 775cfb6547b389e1519c981a536f1efa4d180c38 (parent 6fc444f)
@jpatanooga authored
src/tv/floe/caduceus/hadoop/movingaverage/NoShuffleSort_MovingAverageJob.java (117 lines changed)
@@ -1,5 +1,120 @@
package tv.floe.caduceus.hadoop.movingaverage;
-public class NoShuffleSort_MovingAverageJob {
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class NoShuffleSort_MovingAverageJob extends Configured implements Tool {
+
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ System.out.println( "\n\nNoShuffleSort_MovingAverageJob\n" );
+
+
+ JobConf conf = new JobConf( getConf(), NoShuffleSort_MovingAverageJob.class );
+ conf.setJobName( "NoShuffleSort_MovingAverageJob" );
+
+ //conf.setMapOutputKeyClass( TimeseriesKey.class );
+
+ // since we only want to group by stock, we'll use the Text class
+ conf.setMapOutputKeyClass( Text.class );
+ conf.setMapOutputValueClass( TimeseriesDataPoint.class );
+
+ conf.setMapperClass( NoShuffleSort_MovingAverageMapper.class );
+ conf.setReducerClass( NoShuffleSort_MovingAverageReducer.class );
+
+ // in this case we'll take the defaults for partitioning and grouping
+
+// conf.setPartitionerClass( NaturalKeyPartitioner.class );
+// conf.setOutputKeyComparatorClass( CompositeKeyComparator.class );
+// conf.setOutputValueGroupingComparator( NaturalKeyGroupingComparator.class );
+
+
+ List<String> other_args = new ArrayList<String>();
+ for(int i=0; i < args.length; ++i) {
+ try {
+ if ("-m".equals(args[i])) {
+
+ conf.setNumMapTasks(Integer.parseInt(args[++i]));
+
+ } else if ("-r".equals(args[i])) {
+
+ conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+
+ } else if ("-windowSize".equals(args[i]) ) {
+
+ conf.set( "tv.floe.examples.mr.sax.windowSize", args[++i] );
+
+ } else if ("-windowStepSize".equals(args[i]) ) {
+
+ conf.set( "tv.floe.examples.mr.sax.windowStepSize", args[++i] );
+
+ } else {
+
+ other_args.add(args[i]);
+
+ }
+ } catch (NumberFormatException except) {
+ System.out.println("ERROR: Integer expected instead of " + args[i]);
+ return printUsage();
+ } catch (ArrayIndexOutOfBoundsException except) {
+ System.out.println("ERROR: Required parameter missing from " +
+ args[i-1]);
+ return printUsage();
+ }
+ }
+ // Make sure there are exactly 2 parameters left.
+ if (other_args.size() != 2) {
+ System.out.println("ERROR: Wrong number of parameters: " +
+ other_args.size() + " instead of 2.");
+ return printUsage();
+ }
+
+ conf.setInputFormat( TextInputFormat.class );
+
+ conf.setOutputFormat(TextOutputFormat.class);
+ conf.setCompressMapOutput(true);
+
+
+ FileInputFormat.setInputPaths( conf, other_args.get(0) );
+ FileOutputFormat.setOutputPath( conf, new Path(other_args.get(1)) );
+
+
+ JobClient.runJob(conf);
+
+ return 0;
+ }
+
+
+
+ static int printUsage() {
+ System.out.println("NoShuffleSort_MovingAverageJob [-m <maps>] [-r <reduces>] <input> <output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+
+ public static void main(String[] args) throws Exception {
+
+ int res = ToolRunner.run( new Configuration(), new NoShuffleSort_MovingAverageJob(), args );
+ System.exit(res);
+
+ }
+
+
}
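
For context, a hypothetical invocation of this job (the jar name and HDFS paths are illustrative, not part of this commit; the flags match the argument parsing above) might look like:

    hadoop jar caduceus.jar tv.floe.caduceus.hadoop.movingaverage.NoShuffleSort_MovingAverageJob \
        -m 4 -r 2 -windowSize 30 -windowStepSize 1 \
        /user/hadoop/nyse/input /user/hadoop/nyse/output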
src/tv/floe/caduceus/hadoop/movingaverage/NoShuffleSort_MovingAverageMapper.java (8 lines changed)
@@ -13,7 +13,7 @@
//import tv.floe.caduceus.hadoop.movingaverage.MovingAverageMapper.Timeseries_Counters;
-public class NoShuffleSort_MovingAverageMapper extends MapReduceBase implements Mapper<LongWritable, Text, TimeseriesKey, TimeseriesDataPoint>
+public class NoShuffleSort_MovingAverageMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, TimeseriesDataPoint>
{
@@ -21,7 +21,7 @@
private JobConf configuration;
- private final TimeseriesKey key = new TimeseriesKey();
+ private final Text key = new Text();
private final TimeseriesDataPoint val = new TimeseriesDataPoint();
@@ -43,7 +43,7 @@ public void configure(JobConf conf) {
@Override
- public void map(LongWritable inkey, Text value, OutputCollector<TimeseriesKey, TimeseriesDataPoint> output, Reporter reporter) throws IOException {
+ public void map(LongWritable inkey, Text value, OutputCollector<Text, TimeseriesDataPoint> output, Reporter reporter) throws IOException {
String line = value.toString();
@@ -52,7 +52,7 @@ public void map(LongWritable inkey, Text value, OutputCollector<TimeseriesKey, T
if (rec != null) {
// set both parts of the key
- key.set( rec.stock_symbol, rec.date );
+ key.set( rec.stock_symbol );
val.fValue = rec.getAdjustedClose();
val.lDateTime = rec.date;
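
With the composite TimeseriesKey replaced by a plain Text key, points for a stock no longer arrive at the reducer sorted by date, so ordering has to happen reducer-side. The PriorityQueue approach in the test below requires TimeseriesDataPoint to be Comparable; a minimal sketch of that ordering (an assumption — the class itself, a Hadoop Writable with copy() and getDate(), is not shown in this commit) would be:

    public class TimeseriesDataPoint implements Comparable<TimeseriesDataPoint> {

        public float fValue;   // adjusted close price
        public long lDateTime; // timestamp in ms since the epoch

        // order points chronologically so a PriorityQueue polls the oldest first
        public int compareTo(TimeseriesDataPoint other) {
            return this.lDateTime < other.lDateTime ? -1
                 : this.lDateTime > other.lDateTime ? 1 : 0;
        }

        // Writable plumbing, copy(), and getDate() omitted in this sketch
    }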
src/tv/floe/caduceus/hadoop/movingaverage/NoShuffleSort_MovingAverageReducer.java (8 lines changed)
@@ -24,7 +24,7 @@
*
*/
-public class NoShuffleSort_MovingAverageReducer extends MapReduceBase implements Reducer<TimeseriesKey, TimeseriesDataPoint, Text, Text> {
+public class NoShuffleSort_MovingAverageReducer extends MapReduceBase implements Reducer<Text, TimeseriesDataPoint, Text, Text> {
static enum PointCounters { POINTS_SEEN, POINTS_ADDED_TO_WINDOWS, MOVING_AVERAGES_CALCD };
@@ -38,7 +38,7 @@ public void configure(JobConf job) {
} // configure()
- public void reduce(TimeseriesKey key, Iterator<TimeseriesDataPoint> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<TimeseriesDataPoint> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
TimeseriesDataPoint next_point;
float point_sum = 0;
@@ -68,6 +68,8 @@ public void reduce(TimeseriesKey key, Iterator<TimeseriesDataPoint> values, Outp
next_point = values.next();
+ // MR reuses key/value objects (to avoid GC churn), so we copy each
+ // point into a new object before adding it to the heap
TimeseriesDataPoint point_copy = new TimeseriesDataPoint();
point_copy.copy(next_point);
@@ -102,7 +104,7 @@ public void reduce(TimeseriesKey key, Iterator<TimeseriesDataPoint> values, Outp
// ---------- compute the moving average here -----------
- out_key.set( "Group: " + key.getGroup() + ", Date: " + strBackDate );
+ out_key.set( "Group: " + key.toString() + ", Date: " + strBackDate );
point_sum = 0;
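
Because the shuffle no longer delivers values in date order, the reducer buffers them into a heap before filling the sliding window. A minimal sketch of that pattern (mirroring the test below, not the verbatim reducer body) looks like:

    // inside reduce(): drain the iterator into a date-ordered heap
    PriorityQueue<TimeseriesDataPoint> oPointHeap = new PriorityQueue<TimeseriesDataPoint>();

    while (values.hasNext()) {
        TimeseriesDataPoint next_point = values.next();

        // copy each point: Hadoop reuses the same value object on every next()
        TimeseriesDataPoint point_copy = new TimeseriesDataPoint();
        point_copy.copy(next_point);
        oPointHeap.add(point_copy);
    }

    // poll() now yields points chronologically for the sliding window
    while (!oPointHeap.isEmpty()) {
        try {
            sliding_window.AddPoint(oPointHeap.poll());
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (sliding_window.WindowIsFull()) {
            // compute the moving average over the current window, then step
            sliding_window.SlideWindowForward();
        }
    }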
src/tv/floe/caduceus/hadoop/movingaverage/tests/TestSlidingWindow.java (107 lines changed)
@@ -4,7 +4,10 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.LinkedList;
+import java.util.PriorityQueue;
+import org.apache.hadoop.io.Text;
import org.junit.Test;
import tv.floe.caduceus.hadoop.movingaverage.SlidingWindow;
@@ -166,4 +169,108 @@ public void testWindowFull() {
}
+ @Test
+ public void testSimpleMovingAverage() {
+
+
+ TimeseriesDataPoint next_point;
+ float point_sum = 0;
+ float moving_avg = 0;
+
+ // make static
+ long day_in_ms = 24 * 60 * 60 * 1000;
+
+
+ // should match the width of your training samples
+ int iWindowSizeInDays = 2; //this.configuration.getInt("tv.floe.examples.mr.sax.windowSize", 30 );
+ int iWindowStepSizeInDays = 1; //this.configuration.getInt("tv.floe.examples.mr.sax.windowStepSize", 1 );
+
+ long iWindowSizeInMS = iWindowSizeInDays * day_in_ms; // = this.configuration.getInt("tv.floe.examples.mr.sax.windowSize", 14 );
+ long iWindowStepSizeInMS = iWindowStepSizeInDays * day_in_ms; // = this.configuration.getInt("tv.floe.examples.mr.sax.windowStepSize", 7 );
+
+
+ // Text out_key = new Text();
+ // Text out_val = new Text();
+
+ SlidingWindow sliding_window = new SlidingWindow( iWindowSizeInMS, iWindowStepSizeInMS, day_in_ms );
+
+ PriorityQueue<TimeseriesDataPoint> oPointHeapNew = new PriorityQueue<TimeseriesDataPoint>();
+
+ TimeseriesDataPoint p_copy_0 = new TimeseriesDataPoint();
+ p_copy_0.fValue = 0;
+ p_copy_0.lDateTime = ParseDate( "2008-02-01" );
+
+ oPointHeapNew.add(p_copy_0);
+
+
+ TimeseriesDataPoint p_copy_1 = new TimeseriesDataPoint();
+ p_copy_1.fValue = 1;
+ p_copy_1.lDateTime = ParseDate( "2008-02-02" );
+
+ oPointHeapNew.add(p_copy_1);
+
+
+ TimeseriesDataPoint p_copy_2 = new TimeseriesDataPoint();
+ p_copy_2.fValue = 2;
+ p_copy_2.lDateTime = ParseDate( "2008-02-03" );
+
+ oPointHeapNew.add(p_copy_2);
+
+
+ while ( oPointHeapNew.isEmpty() == false ) {
+
+ //reporter.incrCounter( PointCounters.POINTS_ADDED_TO_WINDOWS, 1 );
+
+ next_point = oPointHeapNew.poll();
+
+ try {
+ sliding_window.AddPoint(next_point);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+
+ if ( sliding_window.WindowIsFull() ) {
+
+ //reporter.incrCounter( PointCounters.MOVING_AVERAGES_CALCD, 1 );
+ System.out.println( "calc'ing SMA --------- " );
+
+ LinkedList<TimeseriesDataPoint> oWindow = sliding_window.GetCurrentWindow();
+
+ String strBackDate = oWindow.getLast().getDate();
+
+ // ---------- compute the moving average here -----------
+
+ //out_key.set( "Group: " + key.getGroup() + ", Date: " + strBackDate );
+
+ point_sum = 0;
+
+ for ( int x = 0; x < oWindow.size(); x++ ) {
+
+ point_sum += oWindow.get(x).fValue;
+
+ } // for
+
+ moving_avg = point_sum / oWindow.size();
+
+ System.out.println("Moving Average: " + moving_avg );
+
+ //output.collect( out_key, out_val );
+
+
+ // 2. step window forward
+
+ sliding_window.SlideWindowForward();
+
+ }
+
+
+
+ }
+
+
+
+ }
+
}
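
The test calls a ParseDate helper defined elsewhere in TestSlidingWindow (not shown in this hunk); a hypothetical version consistent with the SimpleDateFormat and ParseException imports above would be:

    // hypothetical helper: parse a "yyyy-MM-dd" string into ms since the epoch
    private long ParseDate(String date) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        try {
            return sdf.parse(date).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
            return 0;
        }
    }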