NUTCH-1142 Normalization and filtering in WebGraph
git-svn-id: https://svn.apache.org/repos/asf/nutch/trunk@1200346 13f79535-47bb-0310-9956-ffa450edef68
Markus Jelsma committed Nov 10, 2011
1 parent 1f666a3 commit e874ce9
Showing 2 changed files with 100 additions and 8 deletions.
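The patch makes URL normalization and filtering optional, off-by-default steps of the WebGraph job, controlled by two new command-line switches, -normalize and -filter. A minimal sketch of driving the updated tool through the standard ToolRunner entry point; the class name WebGraphExample and the crawl paths are illustrative only:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.scoring.webgraph.WebGraph;
import org.apache.nutch.util.NutchConfiguration;

public class WebGraphExample {
  public static void main(String[] args) throws Exception {
    // Roughly equivalent to: bin/nutch webgraph -webgraphdb crawl/webgraphdb
    //                        -segmentDir crawl/segments -normalize -filter
    int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(),
        new String[] { "-webgraphdb", "crawl/webgraphdb",
                       "-segmentDir", "crawl/segments",
                       "-normalize", "-filter" });
    System.exit(res);
  }
}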
CHANGES.txt (2 additions, 0 deletions)
@@ -1,5 +1,7 @@
 Nutch Change Log
 
+* NUTCH-1142 Normalization and filtering in WebGraph (markus)
+
 * NUTCH-1153 LinkRank not to log all keys and not to write Hadoop _SUCCESS file (markus)
 
 Release 1.4 - 11/4/2011
src/java/org/apache/nutch/scoring/webgraph/WebGraph.java (98 additions, 8 deletions)
@@ -58,6 +58,7 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.parse.Outlink;
 import org.apache.nutch.parse.ParseData;
@@ -109,6 +110,9 @@ public static class OutlinkDb
     implements Mapper<Text, Writable, Text, LinkDatum>,
     Reducer<Text, LinkDatum, Text, LinkDatum> {
 
+    public static final String URL_NORMALIZING = "webgraph.url.normalizers";
+    public static final String URL_FILTERING = "webgraph.url.filters";
+
     // ignoring internal domains, internal hosts
     private boolean ignoreDomain = true;
     private boolean ignoreHost = true;
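These two Configuration keys, webgraph.url.normalizers and webgraph.url.filters, are the per-job switches behind the new behavior: configure() reads them (defaulting to false) and createWebGraph() sets them from the command-line flags, as the later hunks show. A driver assembling its own job could flip them directly; a hedged sketch, assuming the NutchJob wrapper used elsewhere in this class:

JobConf outlinkJob = new NutchJob(conf);
outlinkJob.setBoolean(OutlinkDb.URL_NORMALIZING, true); // run URLNormalizers over each URL
outlinkJob.setBoolean(OutlinkDb.URL_FILTERING, true);   // drop URLs rejected by URLFilters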
@@ -117,8 +121,13 @@ public static class OutlinkDb
     private boolean limitPages = true;
     private boolean limitDomains = true;
 
-    // url normalizers and job configuration
+    // using normalizers and/or filters
+    private boolean normalize = false;
+    private boolean filter = false;
+
+    // url normalizers, filters and job configuration
     private URLNormalizers urlNormalizers;
+    private URLFilters filters;
     private JobConf conf;
 
     /**
@@ -130,6 +139,10 @@ public static class OutlinkDb
      */
     private String normalizeUrl(String url) {
 
+      if (!normalize) {
+        return url;
+      }
+
       String normalized = null;
       if (urlNormalizers != null) {
         try {
@@ -147,6 +160,28 @@ private String normalizeUrl(String url) {
       return normalized;
     }
 
+    /**
+     * Filters the given url.
+     *
+     * @param url The url to filter.
+     *
+     * @return The filtered url or null.
+     */
+    private String filterUrl(String url) {
+
+      if (!filter) {
+        return url;
+      }
+
+      try {
+        url = filters.filter(url);
+      } catch (Exception e) {
+        url = null;
+      }
+
+      return url;
+    }
+
     /**
      * Returns the fetch time from the parse data or the current system time if
      * the fetch time doesn't exist.
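The contract of filterUrl() mirrors normalizeUrl(): with filtering disabled it returns the URL untouched; otherwise it returns the filtered URL, or null when a URLFilter rejects the URL or throws. Callers treat null as "drop this record". A sketch of that pattern (rawUrl is a placeholder; the map() hunks below do exactly this):

String url = normalizeUrl(rawUrl);  // identity unless webgraph.url.normalizers is true
if (filterUrl(url) == null) {
  return;                           // rejected by a URLFilter: skip the record
}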
@@ -193,7 +228,17 @@ public void configure(JobConf conf) {
       ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
       limitPages = conf.getBoolean("link.ignore.limit.page", true);
       limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
-      urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+
+      normalize = conf.getBoolean(URL_NORMALIZING, false);
+      filter = conf.getBoolean(URL_FILTERING, false);
+
+      if (normalize) {
+        urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+      }
+
+      if (filter) {
+        filters = new URLFilters(conf);
+      }
     }
 
     /**
@@ -210,6 +255,14 @@ public void map(Text key, Writable value,
         return;
       }
 
+      // filter url
+      if (filterUrl(url) == null) {
+        return;
+      }
+
+      // Overwrite the key with the normalized URL
+      key.set(url);
+
       if (value instanceof ParseData) {
 
         // get the parse data and the outlinks from the parse data, along with
@@ -225,6 +278,10 @@ public void map(Text key, Writable value,
           Outlink outlink = outlinkAr[i];
           String toUrl = normalizeUrl(outlink.getToUrl());
 
+          if (filterUrl(toUrl) == null) {
+            continue;
+          }
+
           // only put into map if the url doesn't already exist in the map or
           // if it does and the anchor for that link is null, will replace if
           // url is existing
@@ -244,9 +301,15 @@ public void map(Text key, Writable value,
         }
       }
       else if (value instanceof LinkDatum) {
+        LinkDatum datum = (LinkDatum)value;
+        String linkDatumUrl = normalizeUrl(datum.getUrl());
 
-        // collect existing outlinks from existing OutlinkDb
-        output.collect(key, (LinkDatum)value);
+        if (filterUrl(linkDatumUrl) != null) {
+          datum.setUrl(linkDatumUrl);
+
+          // collect existing outlinks from existing OutlinkDb
+          output.collect(key, datum);
+        }
       }
     }
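Two map-side effects deserve a note: key.set(url) overwrites the record key with the normalized URL, making the normalized form the canonical OutlinkDb key, and LinkDatum records read back from an existing OutlinkDb are re-normalized and re-filtered, so the switches also clean up a previously built WebGraph. An illustration, assuming the urlnormalizer-basic plugin is active:

// Both raw keys collapse to a single OutlinkDb entry under -normalize:
//   http://example.org/a/../b.html  ->  http://example.org/b.html
//   http://example.org/b.html       ->  http://example.org/b.html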

@@ -435,17 +498,21 @@ else if (next.getLinkType() == LinkDatum.OUTLINK) {
    * @param webGraphDb The WebGraph to create or update.
    * @param segments The array of segments used to update the WebGraph. Newer
    * segments and fetch times will overwrite older segments.
+   * @param normalize whether to use URLNormalizers on URL's in the segment
+   * @param filter whether to use URLFilters on URL's in the segment
    *
    * @throws IOException If an error occurs while processing the WebGraph.
    */
-  public void createWebGraph(Path webGraphDb, Path[] segments)
+  public void createWebGraph(Path webGraphDb, Path[] segments, boolean normalize, boolean filter)
     throws IOException {
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     if (LOG.isInfoEnabled()) {
       LOG.info("WebGraphDb: starting at " + sdf.format(start));
       LOG.info("WebGraphDb: webgraphdb: " + webGraphDb);
+      LOG.info("WebGraphDb: URL normalize: " + normalize);
+      LOG.info("WebGraphDb: URL filter: " + filter);
     }
 
     Configuration conf = getConf();
@@ -486,6 +553,9 @@ public void createWebGraph(Path webGraphDb, Path[] segments)
     LOG.info("OutlinkDb: adding input: " + outlinkDb);
     FileInputFormat.addInputPath(outlinkJob, outlinkDb);
 
+    outlinkJob.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
+    outlinkJob.setBoolean(OutlinkDb.URL_FILTERING, filter);
+
     outlinkJob.setInputFormat(SequenceFileInputFormat.class);
     outlinkJob.setMapperClass(OutlinkDb.class);
     outlinkJob.setReducerClass(OutlinkDb.class);
@@ -495,6 +565,7 @@ public void createWebGraph(Path webGraphDb, Path[] segments)
     outlinkJob.setOutputValueClass(LinkDatum.class);
     FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
     outlinkJob.setOutputFormat(MapFileOutputFormat.class);
+    outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
 
     // run the outlinkdb job and replace any old outlinkdb with the new one
     try {
@@ -532,6 +603,7 @@ public void createWebGraph(Path webGraphDb, Path[] segments)
     inlinkJob.setOutputValueClass(LinkDatum.class);
     FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
     inlinkJob.setOutputFormat(MapFileOutputFormat.class);
+    inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
 
     try {
 
@@ -572,6 +644,7 @@ public void createWebGraph(Path webGraphDb, Path[] segments)
     nodeJob.setOutputValueClass(Node.class);
     FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
     nodeJob.setOutputFormat(MapFileOutputFormat.class);
+    nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
 
     try {
 
@@ -621,18 +694,23 @@ public int run(String[] args)
       "the segment(s) to use").create("segment");
     Option segDirOpts = OptionBuilder.withArgName("segmentDir").hasArgs().withDescription(
       "the segment directory to use").create("segmentDir");
+    Option normalizeOpts = OptionBuilder.withArgName("normalize").withDescription(
+      "whether to use URLNormalizers on the URL's in the segment").create("normalize");
+    Option filterOpts = OptionBuilder.withArgName("filter").withDescription(
+      "whether to use URLFilters on the URL's in the segment").create("filter");
     options.addOption(helpOpts);
     options.addOption(webGraphDbOpts);
     options.addOption(segOpts);
     options.addOption(segDirOpts);
+    options.addOption(normalizeOpts);
+    options.addOption(filterOpts);
 
     CommandLineParser parser = new GnuParser();
     try {
 
       CommandLine line = parser.parse(options, args);
       if (line.hasOption("help") || !line.hasOption("webgraphdb")
-          || (!line.hasOption("segment") && !line.hasOption("segmentDir"))
-          ) {
+          || (!line.hasOption("segment") && !line.hasOption("segmentDir"))) {
         HelpFormatter formatter = new HelpFormatter();
         formatter.printHelp("WebGraph", options);
         return -1;
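Since neither normalizeOpts nor filterOpts calls hasArg(), -normalize and -filter are plain boolean switches: GnuParser records only their presence, and run() tests line.hasOption("normalize") and line.hasOption("filter") rather than reading a value, as the final hunk shows.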
@@ -659,7 +737,19 @@ public int run(String[] args)
         segPaths = HadoopFSUtil.getPaths(fstats);
       }
 
-      createWebGraph(new Path(webGraphDb), segPaths);
+      boolean normalize = false;
+
+      if (line.hasOption("normalize")) {
+        normalize = true;
+      }
+
+      boolean filter = false;
+
+      if (line.hasOption("filter")) {
+        filter = true;
+      }
+
+      createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
       return 0;
     }
     catch (Exception e) {
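Callers that bypass the CLI can invoke the new four-argument createWebGraph() directly. A minimal sketch, assuming a standard Nutch configuration; the class name BuildGraph and the segment path are illustrative:

import org.apache.hadoop.fs.Path;
import org.apache.nutch.scoring.webgraph.WebGraph;
import org.apache.nutch.util.NutchConfiguration;

public class BuildGraph {
  public static void main(String[] args) throws Exception {
    WebGraph webGraph = new WebGraph();
    webGraph.setConf(NutchConfiguration.create());
    // normalize = true: run URLNormalizers (SCOPE_DEFAULT) on every URL
    // filter = true: drop any URL rejected by the configured URLFilters
    webGraph.createWebGraph(new Path("crawl/webgraphdb"),
        new Path[] { new Path("crawl/segments/20111110123456") },
        true, true);
  }
}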
