-
Notifications
You must be signed in to change notification settings - Fork 23
/
HadoopJobTool.java
61 lines (52 loc) · 2.46 KB
/
HadoopJobTool.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package dk.netarkivet.common.utils.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
/**
 * A simple generic Hadoop map-only tool that runs a given mapper on the passed input file,
 * which contains newline-separated file paths, and writes the job's resulting files to the passed output path.
 */
public class HadoopJobTool extends Configured implements Tool {

    /** Number of input-file lines (i.e. file paths) each map task processes per split. */
    private static final int LINES_PER_SPLIT = 5;

    /**
     * Mapper whose class is handed to the job. Hadoop instantiates mappers by class,
     * so the concrete mapper class must have a no-arg constructor.
     */
    private final Mapper<LongWritable, Text, NullWritable, Text> mapper;

    /**
     * Creates the tool with the configuration and mapper to run.
     *
     * @param conf The Hadoop configuration to run the job under.
     * @param mapper The mapper to apply to each input line. Only its class is passed to
     *         Hadoop, which instantiates it reflectively (no-arg constructor required).
     */
    public HadoopJobTool(Configuration conf, Mapper<LongWritable, Text, NullWritable, Text> mapper) {
        super(conf);
        this.mapper = mapper;
    }

    /**
     * Method for running the tool/job.
     *
     * @param args Expects two strings representing the job's in- and output paths (Tool interface dictates String[]).
     * @return An exit code to report back if the job succeeded: 0 on success, 1 on failure.
     * @throws IllegalArgumentException If fewer than two arguments are supplied.
     * @throws InterruptedException If the thread is interrupted while waiting for the job to complete.
     * @throws IOException If communication with the cluster fails.
     * @throws ClassNotFoundException If the mapper class cannot be resolved when submitting the job.
     */
    @Override
    public int run(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "Expected two arguments (input path, output path) but got " + args.length);
        }
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Job job = Job.getInstance(getConf(), this.getClass().getName());

        // Input is a file of newline-separated paths; output is plain text files.
        job.setInputFormatClass(NLineInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        NLineInputFormat.addInputPath(job, inputPath);
        TextOutputFormat.setOutputPath(job, outputPath);
        job.setMapperClass(mapper.getClass());
        job.setNumReduceTasks(0); // Ensure job is map-only

        // How many files should each node process at a time (how many lines are read from the input file)
        NLineInputFormat.setNumLinesPerSplit(job, LINES_PER_SPLIT);

        // In- and output types
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}