Skip to content

Commit

Permalink
hadoop-zero: improved ULUserReducer
Browse files Browse the repository at this point in the history
  • Loading branch information
Committed Dec 7, 2010
1 parent 43129a9 commit c3022ec
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
@@ -0,0 +1,76 @@
package us.yuxin.demo.hadoop.zero.userlogin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for a two-stage user-login analysis pipeline.
 *
 * <p>Stage 1 ("UserLogin"): reads text login records from {@code args[0]},
 * maps them with {@link ULMapper} (Text/IntWritable pairs), and reduces with
 * {@link ULUserReducer2}, writing IntWritable/ULDistribution pairs as a
 * SequenceFile to {@code args[1]}.
 *
 * <p>Stage 2 ("UserLoginGroup"): reads the stage-1 SequenceFile from
 * {@code args[1]} (identity map), reduces with {@link ULMonthReducer}, and
 * writes the final text output to {@code args[2]}.
 */
public class ULJob2 extends Configured implements Tool {
    /**
     * Configures and runs both jobs sequentially.
     *
     * @param args {@code args[0]} input path, {@code args[1]} intermediate
     *             path, {@code args[2]} final output path
     * @return 0 on success, 1 if either job fails, 2 on bad usage
     * @throws Exception if job submission or monitoring fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Guard against missing arguments instead of throwing a raw
        // ArrayIndexOutOfBoundsException from args[0..2] below.
        if (args.length < 3) {
            System.err.println("Usage: ULJob2 <input> <intermediate> <output>");
            return 2;
        }

        Configuration conf = getConf();

        // --- Stage 1: per-user login distribution ---
        Job job0 = new Job(conf, "UserLogin");
        job0.setJarByClass(ULJob2.class);

        FileInputFormat.setInputPaths(job0, args[0]);
        FileOutputFormat.setOutputPath(job0, new Path(args[1]));

        job0.setMapperClass(ULMapper.class);
        job0.setReducerClass(ULUserReducer2.class);

        // Map output (Text, IntWritable) differs from the final reducer
        // output (IntWritable, ULDistribution), so both must be declared.
        job0.setMapOutputKeyClass(Text.class);
        job0.setMapOutputValueClass(IntWritable.class);

        job0.setOutputKeyClass(IntWritable.class);
        job0.setOutputValueClass(ULDistribution.class);

        job0.setInputFormatClass(TextInputFormat.class);
        // SequenceFile keeps the ULDistribution writables binary for stage 2.
        job0.setOutputFormatClass(SequenceFileOutputFormat.class);

        job0.setNumReduceTasks(12);
        job0.setPartitionerClass(HashPartitioner.class);

        if (!job0.waitForCompletion(true)) {
            return 1;
        }

        // --- Stage 2: group per-user distributions by month ---
        Job job1 = new Job(conf, "UserLoginGroup");
        job1.setJarByClass(ULJob2.class);

        FileInputFormat.setInputPaths(job1, args[1]);
        FileOutputFormat.setOutputPath(job1, new Path(args[2]));

        // No mapper class set: the identity mapper passes the stage-1
        // (IntWritable, ULDistribution) records straight to the reducer.
        job1.setReducerClass(ULMonthReducer.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(ULDistribution.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(ULDistribution.class);

        job1.setPartitionerClass(HashPartitioner.class);
        // job1.setNumReduceTasks(12);

        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        if (!job1.waitForCompletion(true)) {
            return 1;
        }

        return 0;
    }

    /**
     * Command-line entry point.
     *
     * <p>Propagates the pipeline's exit status to the JVM; the original
     * version discarded {@code ToolRunner.run}'s return value, so failed
     * jobs still exited with status 0.
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new ULJob2(), args));
    }
}
@@ -0,0 +1,55 @@
package us.yuxin.demo.hadoop.zero.userlogin;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that aggregates each user's monthly login counts into a
 * {@link ULDistribution}, then groups those distributions by the month of
 * the user's first login.
 *
 * <p>Input: (user, months-of-login). Output, emitted once per first-login
 * month in {@link #cleanup}: (firstLoginMonth, merged ULDistribution).
 *
 * <p>Not thread-safe; Hadoop invokes each reducer instance from a single
 * thread, which the in-memory accumulation relies on.
 */
public class ULUserReducer2 extends
    Reducer<Text, IntWritable, IntWritable, ULDistribution> {

    // Accumulates one merged distribution per first-login month across all
    // keys seen by this reducer task. Declared as the Map interface and
    // initialized eagerly so the field is never null even if reduce() were
    // invoked without setup() (the original left it null until setup()).
    private Map<Integer, ULDistribution> monReducer =
        new HashMap<Integer, ULDistribution>();

    /**
     * Builds the login distribution for one user and folds it into the
     * bucket for that user's first-login month. Nothing is written here;
     * output is deferred to {@link #cleanup}.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {

        ULDistribution uld = new ULDistribution();
        for (IntWritable month : values) {
            uld.add(month.get());
        }

        int first = uld.firstLogin();

        // Merge into the existing bucket, or seed the bucket with this
        // user's distribution if it is the first one for that month.
        ULDistribution base = monReducer.get(first);
        if (base == null) {
            monReducer.put(first, uld);
        } else {
            base.add(uld);
        }
    }

    /**
     * Emits one (firstLoginMonth, mergedDistribution) record per bucket
     * accumulated during the reduce phase.
     */
    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
        for (Map.Entry<Integer, ULDistribution> entry : monReducer.entrySet()) {
            context.write(new IntWritable(entry.getKey()), entry.getValue());
        }
        super.cleanup(context);
    }

    /**
     * Resets the accumulator at task start so a reused reducer instance
     * never carries state over from a previous run.
     */
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
        super.setup(context);
        monReducer = new HashMap<Integer, ULDistribution>();
    }
}

0 comments on commit c3022ec

Please sign in to comment.