From e367ff207e3ec105c97e47baf9dde0769c2100ee Mon Sep 17 00:00:00 2001
From: Markus Jelsma
Date: Fri, 16 Dec 2011 11:17:10 +0000
Subject: [PATCH] NUTCH-1221 Migrate DomainStatistics to MapReduce API

git-svn-id: https://svn.apache.org/repos/asf/nutch/trunk@1215090 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                  |   2 +
 .../nutch/util/domain/DomainStatistics.java  | 208 ++++++++----------
 2 files changed, 98 insertions(+), 112 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c1c8db05f3..491ef7d5e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,7 @@
 Nutch Change Log
 
+* NUTCH-1221 Migrate DomainStatistics to MapReduce API (markus)
+
 * NUTCH-1216 Add trivial comment to lib/native/README.txt (lewismc)
 
 * NUTCH-1214 DomainStats tool should be named for what it's doing (markus)
diff --git a/src/java/org/apache/nutch/util/domain/DomainStatistics.java b/src/java/org/apache/nutch/util/domain/DomainStatistics.java
index b87603d5ad..0f6f7ade2f 100644
--- a/src/java/org/apache/nutch/util/domain/DomainStatistics.java
+++ b/src/java/org/apache/nutch/util/domain/DomainStatistics.java
@@ -25,52 +25,43 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TimingUtil;
 import org.apache.nutch.util.URLUtil;
 
 /**
- * Extracts some very basic statistics about domains from the crawldb 
+ * Extracts some very basic statistics about domains from the crawldb
  */
-public class DomainStatistics
-extends MapReduceBase
-implements Tool, Mapper<Text, CrawlDatum, Text, LongWritable>,
-           Reducer<Text, LongWritable, LongWritable, Text> {
+public class DomainStatistics extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(DomainStatistics.class);
-  
+
   private static final Text FETCHED_TEXT = new Text("FETCHED");
   private static final Text NOT_FETCHED_TEXT = new Text("NOT_FETCHED");
-  
+
   public static enum MyCounter {FETCHED, NOT_FETCHED, EMPTY_RESULT};
-  
+
   private static final int MODE_HOST = 1;
   private static final int MODE_DOMAIN = 2;
   private static final int MODE_SUFFIX = 3;
-  
+
   private int mode = 0;
-  
-  private Configuration conf;
-  
-  public int run(String[] args) throws IOException {
+
+  public int run(String[] args) throws Exception {
     if (args.length < 3) {
       System.out.println("usage: DomainStatistics inputDirs outDir host|domain|suffix [numOfReducer]");
       return 1;
@@ -78,7 +69,7 @@ public int run(String[] args) throws IOException {
     String inputDir = args[0];
     String outputDir = args[1];
     int numOfReducers = 1;
-    
+
     if (args.length > 3) {
       numOfReducers = Integer.parseInt(args[3]);
     }
@@ -87,132 +78,125 @@ public int run(String[] args) throws IOException {
     long start = System.currentTimeMillis();
     LOG.info("DomainStatistics: starting at " + sdf.format(start));
 
-    JobConf job = new NutchJob(getConf());
-    int mode = 0;
+    String jobName = "DomainStatistics";
 
     if(args[2].equals("host")) {
-      job.setJobName("Host statistics");
+      jobName = "Host statistics";
       mode = MODE_HOST;
     }
     else if(args[2].equals("domain")) {
-      job.setJobName("Domain statistics");
+      jobName = "Domain statistics";
      mode = MODE_DOMAIN;
     }
     else if(args[2].equals("suffix")) {
-      job.setJobName("Suffix statistics");
+      jobName = "Suffix statistics";
       mode = MODE_SUFFIX;
     }
 
-    job.setInt("domain.statistics.mode", mode);
+    Configuration conf = getConf();
+    conf.setInt("domain.statistics.mode", mode);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    Job job = new Job(conf, jobName);
+    job.setJarByClass(DomainStatistics.class);
 
     String[] inputDirsSpecs = inputDir.split(",");
     for (int i = 0; i < inputDirsSpecs.length; i++) {
       FileInputFormat.addInputPath(job, new Path(inputDirsSpecs[i]));
     }
 
-    job.setInputFormat(SequenceFileInputFormat.class);
-    job.setMapperClass(DomainStatistics.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
     FileOutputFormat.setOutputPath(job, new Path(outputDir));
-    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(LongWritable.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(LongWritable.class);
-    job.setReducerClass(DomainStatistics.class);
+
+    job.setMapperClass(DomainStatisticsMapper.class);
+    job.setReducerClass(DomainStatisticsReducer.class);
     job.setCombinerClass(DomainStatisticsCombiner.class);
     job.setNumReduceTasks(numOfReducers);
-    
-    JobClient.runJob(job);
-    
+
+    try {
+      job.waitForCompletion(true);
+    } catch (Exception e) {
+      throw e;
+    }
+
     long end = System.currentTimeMillis();
     LOG.info("DomainStatistics: finished at " + sdf.format(end) + ", elapsed: "
         + TimingUtil.elapsedTime(start, end));
     return 0;
   }
 
-  @Override
-  public void configure(JobConf job) {
-    super.configure(job);
-    mode = job.getInt("domain.statistics.mode", MODE_DOMAIN);
-  }
-  
-  
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
+  static class DomainStatisticsMapper extends Mapper<Text, CrawlDatum, Text, LongWritable> {
+    int mode = 0;
 
-  public void map(Text urlText, CrawlDatum datum,
-      OutputCollector<Text, LongWritable> output, Reporter reporter)
-      throws IOException {
-
-    if(datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED
-        || datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
-
-      try {
-        URL url = new URL(urlText.toString());
-        String out = null;
-        switch (mode) {
-        case MODE_HOST:
-          out = url.getHost();
-          break;
-        case MODE_DOMAIN:
-          out = URLUtil.getDomainName(url);
-          break;
-        case MODE_SUFFIX:
-          out = URLUtil.getDomainSuffix(url).getDomain();
-          break;
-        }
-        if(out.trim().equals("")) {
-          LOG.info("url : " + url);
-          reporter.incrCounter(MyCounter.EMPTY_RESULT, 1);
-        }
-
-        output.collect(new Text(out), new LongWritable(1));
-      } catch (Exception ex) { }
-      reporter.incrCounter(MyCounter.FETCHED, 1);
-      output.collect(FETCHED_TEXT, new LongWritable(1));
+    public void setup(Context context) {
+      mode = context.getConfiguration().getInt("domain.statistics.mode", MODE_DOMAIN);
     }
-    else {
-      reporter.incrCounter(MyCounter.NOT_FETCHED, 1);
-      output.collect(NOT_FETCHED_TEXT, new LongWritable(1));
+
+    public void map(Text urlText, CrawlDatum datum, Context context) throws IOException, InterruptedException {
+
+      if(datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+          || datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+
+        try {
+          URL url = new URL(urlText.toString());
+          String out = null;
+          switch (mode) {
+          case MODE_HOST:
+            out = url.getHost();
+            break;
+          case MODE_DOMAIN:
+            out = URLUtil.getDomainName(url);
+            break;
+          case MODE_SUFFIX:
+            out = URLUtil.getDomainSuffix(url).getDomain();
+            break;
+          }
+          if(out.trim().equals("")) {
+            LOG.info("url : " + url);
+            context.getCounter(MyCounter.EMPTY_RESULT).increment(1);
+          }
+
+          context.write(new Text(out), new LongWritable(1));
+        } catch (Exception ex) { }
+
+        context.getCounter(MyCounter.FETCHED).increment(1);
+        context.write(FETCHED_TEXT, new LongWritable(1));
+      }
+      else {
+        context.getCounter(MyCounter.NOT_FETCHED).increment(1);
+        context.write(NOT_FETCHED_TEXT, new LongWritable(1));
+      }
     }
   }
 
-  public void reduce(Text key, Iterator<LongWritable> values,
-      OutputCollector<LongWritable, Text> output, Reporter reporter)
-      throws IOException {
-
-    long total = 0;
-
-    while(values.hasNext()) {
-      LongWritable val = values.next();
-      total += val.get();
+  static class DomainStatisticsReducer extends Reducer<Text, LongWritable, LongWritable, Text> {
+    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+      long total = 0;
+
+      for (LongWritable val : values) {
+        total += val.get();
+      }
+
+      context.write(new LongWritable(total), key);
     }
-    //invert output
-    output.collect(new LongWritable(total), key);
   }
-
-
-  public static class DomainStatisticsCombiner extends MapReduceBase
-      implements Reducer<Text, LongWritable, Text, LongWritable> {
-
-    public void reduce(Text key, Iterator<LongWritable> values,
-        OutputCollector<Text, LongWritable> output, Reporter reporter)
-        throws IOException {
+
+  public static class DomainStatisticsCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
+    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
       long total = 0;
-
-      while(values.hasNext()) {
-        LongWritable val = values.next();
+
+      for (LongWritable val : values) {
        total += val.get();
-      }
-      output.collect(key, new LongWritable(total));
+      }
+      context.write(key, new LongWritable(total));
     }
-
   }
 
   public static void main(String[] args) throws Exception {
     ToolRunner.run(NutchConfiguration.create(), new DomainStatistics(), args);
   }
-  
+
 }
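
The command-line contract is unchanged by this migration (inputDirs outDir host|domain|suffix [numOfReducer]), so the job can still be driven through ToolRunner exactly as the tool's own main() does. A minimal driver sketch, assuming a crawldb whose <Text url, CrawlDatum> SequenceFiles live at crawl/crawldb/current and an output directory domainstats-out (the paths, the reducer count, and the DomainStatisticsDriver class are illustrative, not part of the patch):

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.domain.DomainStatistics;

    public class DomainStatisticsDriver {
      public static void main(String[] args) throws Exception {
        // Arguments: inputDirs outDir host|domain|suffix [numOfReducer].
        // "crawl/crawldb/current" is a hypothetical crawldb location; it must
        // hold the <Text, CrawlDatum> SequenceFiles that the job's
        // SequenceFileInputFormat expects as input.
        int res = ToolRunner.run(NutchConfiguration.create(), new DomainStatistics(),
            new String[] { "crawl/crawldb/current", "domainstats-out", "domain", "2" });
        System.exit(res);
      }
    }

As the patch shows, DomainStatisticsReducer inverts each (domain, count) pair before writing, so the TextOutputFormat part files in the output directory carry the count in the first column and the host, domain, or suffix in the second.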