apply NUTCH-2184
This patch is the one from Jan 2016, but it has been updated at
apache#95, so once 1.12 is released,
check whether the PR made it in.
naegelejd committed May 24, 2016
1 parent 034a8bc commit 95e9956
Showing 5 changed files with 232 additions and 85 deletions.
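
The thrust of the patch is to make the crawldb optional when indexing: in IndexerMapReduce.reduce() below, every place that previously dereferenced dbDatum unconditionally is now wrapped in a null check, so segments can still be indexed when no crawldb CrawlDatum accompanies them. The following is a minimal, self-contained sketch of that guard pattern; the stub class, constant value, and method are illustrative only and are not part of the commit.

```java
// Standalone illustration of the null-guard pattern used in the diff below.
// CrawlDatumStub is a placeholder for org.apache.nutch.crawl.CrawlDatum.
public class NullGuardSketch {

  static class CrawlDatumStub {
    static final byte STATUS_DB_DUPLICATE = 7; // illustrative value
    byte status;

    CrawlDatumStub(byte status) {
      this.status = status;
    }
  }

  /**
   * Decide whether a document should be deleted as a duplicate.
   * Before the patch, a URL without a crawldb entry was dropped from indexing
   * entirely (the early return at the top of reduce()); with the guard, such
   * URLs are still indexed and only the dbDatum-dependent steps are skipped.
   */
  static boolean shouldDeleteDuplicate(CrawlDatumStub dbDatum, boolean delete) {
    return dbDatum != null && delete
        && dbDatum.status == CrawlDatumStub.STATUS_DB_DUPLICATE;
  }

  public static void main(String[] args) {
    // No crawldb entry: the document is kept rather than dropped.
    System.out.println(shouldDeleteDuplicate(null, true)); // false
    // A duplicate with a crawldb entry is still deleted.
    System.out.println(shouldDeleteDuplicate(
        new CrawlDatumStub(CrawlDatumStub.STATUS_DB_DUPLICATE), true)); // true
  }
}
```

The same null guard is applied in the diff to the NOT_MODIFIED skip, the signature copy, and the repr-URL lookup.
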
90 changes: 56 additions & 34 deletions src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -54,8 +54,8 @@
import org.apache.nutch.scoring.ScoringFilters;

public class IndexerMapReduce extends Configured implements
Mapper<Text, Writable, Text, NutchWritable>,
Reducer<Text, NutchWritable, Text, NutchIndexAction> {
Mapper<Text, Writable, Text, NutchWritable>,
Reducer<Text, NutchWritable, Text, NutchIndexAction> {

public static final Logger LOG = LoggerFactory
.getLogger(IndexerMapReduce.class);
@@ -255,24 +255,30 @@ public void reduce(Text key, Iterator<NutchWritable> values,
}
}

if (fetchDatum == null || dbDatum == null || parseText == null
|| parseData == null) {
return; // only have inlinks
//excluding dbDatum == null from the OR condition means that we can still index
//segment(s) even if they are not accompanied by a crawldb CrawlDatum
if (fetchDatum == null || parseText == null || parseData == null) {
return; // no segment data is available to index.
}

// Whether to delete pages marked as duplicates
if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
reporter.incrCounter("IndexerStatus", "deleted (duplicates)", 1);
output.collect(key, DELETE_ACTION);
return;
if (dbDatum != null) {
if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
reporter.incrCounter("IndexerStatus", "deleted duplicates", 1);
output.collect(key, DELETE_ACTION);
return;
}
}

// Whether to skip DB_NOTMODIFIED pages
if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
reporter.incrCounter("IndexerStatus", "skipped (not modified)", 1);
return;
if (dbDatum != null) {
if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
reporter.incrCounter("IndexerStatus", "skipped (not modified)", 1);
return;
}
}


if (!parseData.getStatus().isSuccess()
|| fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
return;
@@ -309,12 +315,20 @@ public void reduce(Text key, Iterator<NutchWritable> values,

try {
// Indexing filters may also be interested in the signature
fetchDatum.setSignature(dbDatum.getSignature());

if (dbDatum != null) {
fetchDatum.setSignature(dbDatum.getSignature());
}

// extract information from dbDatum and pass it to
// fetchDatum so that indexing filters can use it
final Text url = (Text) dbDatum.getMetaData().get(
Nutch.WRITABLE_REPR_URL_KEY);
Text url = null;
if (dbDatum != null) {
url = (Text) dbDatum.getMetaData().get(
Nutch.WRITABLE_REPR_URL_KEY);
} else {
//implement some replacement
}

if (url != null) {
// Representation URL also needs normalization and filtering.
// If repr URL is excluded by filters we still accept this document
@@ -373,28 +387,24 @@ public void close() throws IOException {
public static void initMRJob(Path crawlDb, Path linkDb,
Collection<Path> segments, JobConf job, boolean addBinaryContent) {

LOG.info("IndexerMapReduce: crawldb: {}", crawlDb);

if (linkDb != null)
LOG.info("IndexerMapReduce: linkdb: {}", linkDb);

for (final Path segment : segments) {
LOG.info("IndexerMapReduces: adding segment: {}", segment);
FileInputFormat.addInputPath(job, new Path(segment,
CrawlDatum.FETCH_DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment,
CrawlDatum.PARSE_DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));

if (addBinaryContent) {
FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
if (crawlDb != null) {
LOG.info("IndexerMapReduce: crawldb: {}", crawlDb);
Path currentCrawlDb = new Path(crawlDb, CrawlDb.CURRENT_NAME);
try {
if (FileSystem.get(job).exists(currentCrawlDb)) {
FileInputFormat.addInputPath(job, currentCrawlDb);
} else {
LOG.warn("Ignoring crawlDb for indexing, no crawlDb found in path: {}",
crawlDb);
}
} catch (IOException e) {
LOG.warn("Failed to use crawlDb ({}) for indexing: {}", crawlDb,
org.apache.hadoop.util.StringUtils.stringifyException(e));
}
}

FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));

if (linkDb != null) {
LOG.info("IndexerMapReduce: linkdb: {}", linkDb);
Path currentLinkDb = new Path(linkDb, LinkDb.CURRENT_NAME);
try {
if (FileSystem.get(job).exists(currentLinkDb)) {
@@ -409,6 +419,18 @@ public static void initMRJob(Path crawlDb, Path linkDb,
}
}

for (final Path segment : segments) {
LOG.info("IndexerMapReduces: adding segment: {}", segment);
FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));

if (addBinaryContent) {
FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
}
}

job.setInputFormat(SequenceFileInputFormat.class);

job.setMapperClass(IndexerMapReduce.class);
208 changes: 161 additions & 47 deletions src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -30,6 +30,12 @@

import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.segment.SegmentChecker;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -163,35 +169,142 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
}

public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err
//.println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize]");
.println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize] [-addBinaryContent] [-base64]");
IndexWriters writers = new IndexWriters(getConf());
System.err.println(writers.describe());
return -1;
}

final Path crawlDb = new Path(args[0]);
Path linkDb = null;

final List<Path> segments = new ArrayList<Path>();
String params = null;

boolean noCommit = false;
boolean deleteGone = false;
boolean filter = false;
boolean normalize = false;
boolean addBinaryContent = false;
boolean base64 = false;
// boolean options
Option helpOpt = new Option("h", "help", false, "show this help message");
// argument options
@SuppressWarnings("static-access")
Option crawldbOpt = OptionBuilder
.withArgName("crawldb")
.hasArg()
.withDescription(
"a crawldb directory to use with this tool (optional)")
.create("crawldb");
@SuppressWarnings("static-access")
Option linkdbOpt = OptionBuilder
.withArgName("linkdb")
.hasArg()
.withDescription(
"a linkdb directory to use with this tool (optional)")
.create("linkdb");
@SuppressWarnings("static-access")
Option paramsOpt = OptionBuilder
.withArgName("params")
.hasArg()
.withDescription(
"key value parameters to be used with this tool e.g. k1=v1&k2=v2... (optional)")
.create("params");
@SuppressWarnings("static-access")
Option segOpt = OptionBuilder
.withArgName("segment")
.hasArgs()
.withDescription("the segment(s) to use (either this or --segmentDir is mandatory)")
.create("segment");
@SuppressWarnings("static-access")
Option segmentDirOpt = OptionBuilder
.withArgName("segmentDir")
.hasArg()
.withDescription(
"directory containing one or more segments to be used with this tool "
+ "(either this or --segment is mandatory)")
.create("segmentDir");
@SuppressWarnings("static-access")
Option noCommitOpt = OptionBuilder
.withArgName("noCommit")
.withDescription(
"do the commits once and for all the reducers in one go (optional)")
.create("noCommit");
@SuppressWarnings("static-access")
Option deleteGoneOpt = OptionBuilder
.withArgName("deleteGone")
.withDescription(
"delete gone documents e.g. documents which no longer exist at the particular resource (optional)")
.create("deleteGone");
@SuppressWarnings("static-access")
Option filterOpt = OptionBuilder
.withArgName("filter")
.withDescription(
"filter documents (optional)")
.create("filter");
@SuppressWarnings("static-access")
Option normalizeOpt = OptionBuilder
.withArgName("normalize")
.withDescription(
"normalize documents (optional)")
.create("normalize");
@SuppressWarnings("static-access")
Option addBinaryContentOpt = OptionBuilder
.withArgName("addBinaryContent")
.withDescription(
"add the raw content of the document to the indexing job (optional)")
.create("addBinaryContent");
@SuppressWarnings("static-access")
Option base64Opt = OptionBuilder
.withArgName("base64")
.withDescription(
"if raw content is added, base64 encode it (optional)")
.create("base64");

Options options = new Options();
options.addOption(helpOpt);
options.addOption(crawldbOpt);
options.addOption(linkdbOpt);
options.addOption(segOpt);
options.addOption(paramsOpt);
options.addOption(segmentDirOpt);
options.addOption(noCommitOpt);
options.addOption(deleteGoneOpt);
options.addOption(filterOpt);
options.addOption(normalizeOpt);
options.addOption(addBinaryContentOpt);
options.addOption(base64Opt);

GnuParser parser = new GnuParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, args);
// if 'help' is present, or neither 'segmentDir' nor 'segment' is given, print help
if (cmd.hasOption("help") || (!cmd.hasOption("segmentDir")
&& (!cmd.hasOption("segment")))) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(getClass().getSimpleName() +
" [-crawldb <crawldb>] [-linkdb <linkdb>] [-params k1=v1&k2=v2...] "
+ "(<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] "
+ "[-filter] [-normalize] [-addBinaryContent] [-base64]", options, true);
IndexWriters writers = new IndexWriters(getConf());
LOG.error(writers.describe());
return -1;
}

for (int i = 1; i < args.length; i++) {
Path crawlDb = null;
Path linkDb = null;
final List<Path> segments = new ArrayList<Path>();
String params = null;
boolean noCommit = false;
boolean deleteGone = false;
boolean filter = false;
boolean normalize = false;
boolean addBinaryContent = false;
boolean base64 = false;
FileSystem fs = null;
Path dir = null;
if (args[i].equals("-linkdb")) {
linkDb = new Path(args[++i]);
} else if (args[i].equals("-dir")) {
dir = new Path(args[++i]);

if (cmd.hasOption("crawldb")) {
crawlDb = new Path(cmd.getOptionValue("crawldb"));
}
if (cmd.hasOption("linkdb")) {
linkDb = new Path(cmd.getOptionValue("linkdb"));
}
if (cmd.hasOption("params")) {
params = cmd.getOptionValue("params");
}
if (cmd.hasOption("segment")) {
dir = new Path(cmd.getOptionValue("segment"));
fs = dir.getFileSystem(getConf());
if (SegmentChecker.isIndexable(dir,fs)) {
segments.add(dir);
}
} else if (cmd.hasOption("segmentDir")) {
dir = new Path(cmd.getOptionValue("segmentDir"));
fs = dir.getFileSystem(getConf());
FileStatus[] fstats = fs.listStatus(dir,
HadoopFSUtil.getPassDirectoriesFilter(fs));
@@ -201,35 +314,35 @@ public int run(String[] args) throws Exception {
segments.add(p);
}
}
} else if (args[i].equals("-noCommit")) {
}
if (cmd.hasOption("noCommit")) {
noCommit = true;
} else if (args[i].equals("-deleteGone")) {
}
if (cmd.hasOption("deleteGone")) {
deleteGone = true;
} else if (args[i].equals("-filter")) {
}
if (cmd.hasOption("filter")) {
filter = true;
} else if (args[i].equals("-normalize")) {
}
if (cmd.hasOption("normalize")) {
normalize = true;
} else if (args[i].equals("-addBinaryContent")) {
}
if (cmd.hasOption("addBinaryContent")) {
addBinaryContent = true;
} else if (args[i].equals("-base64")) {
}
if (cmd.hasOption("base64")) {
base64 = true;
} else if (args[i].equals("-params")) {
params = args[++i];
} else {
dir = new Path(args[i]);
fs = dir.getFileSystem(getConf());
if (SegmentChecker.isIndexable(dir,fs)) {
segments.add(dir);
}
}
}

try {
index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize, addBinaryContent, base64);
return 0;
} catch (final Exception e) {
LOG.error("Indexer: {}", StringUtils.stringifyException(e));
return -1;
try {
index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, normalize, addBinaryContent, base64);
return 0;
} catch (final Exception e) {
LOG.error("Indexer: {}", StringUtils.stringifyException(e));
return -1;
}
} finally {
//do nothing
}
}

@@ -241,6 +354,7 @@ public static void main(String[] args) throws Exception {


//Used for REST API
@SuppressWarnings("unchecked")
@Override
public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
boolean noCommit = false;
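
For context on the IndexingJob changes above: the hand-rolled args loop is replaced with Apache commons-cli (Options, OptionBuilder, GnuParser), which is what turns the crawldb into an optional -crawldb flag. Below is a minimal, standalone sketch of that parsing pattern, using the same commons-cli 1.x API the patch imports; the class name and the reduced option set are illustrative and not part of the commit.

```java
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;

public class IndexerCliSketch {

  @SuppressWarnings("static-access")
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.addOption(new Option("h", "help", false, "show this help message"));
    options.addOption(OptionBuilder.withArgName("crawldb").hasArg()
        .withDescription("optional crawldb directory").create("crawldb"));
    options.addOption(OptionBuilder.withArgName("segment").hasArgs()
        .withDescription("segment(s) to index").create("segment"));

    CommandLine cmd = new GnuParser().parse(options, args);

    // Mirror the patch: print help when asked for, or when no segment is given.
    if (cmd.hasOption("help") || !cmd.hasOption("segment")) {
      new HelpFormatter().printHelp("IndexerCliSketch", options, true);
      return;
    }

    // -crawldb is optional; its absence simply leaves the value null,
    // which is what lets the indexer run without a crawldb.
    String crawlDb = cmd.getOptionValue("crawldb");
    for (String segment : cmd.getOptionValues("segment")) {
      System.out.println("segment=" + segment + ", crawldb=" + crawlDb);
    }
  }
}
```

With the flags in place, an invocation along the lines of `bin/nutch index -linkdb crawl/linkdb -segmentDir crawl/segments` (paths hypothetical) no longer needs a crawldb argument.
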
7 changes: 5 additions & 2 deletions src/java/org/apache/nutch/tools/FileDumper.java
@@ -333,8 +333,11 @@ public static void main(String[] args) throws Exception {
"output directory (which will be created) to host the raw data")
.create("outputDir");
@SuppressWarnings("static-access")
Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
.withDescription("the segment(s) to use").create("segment");
Option segOpt = OptionBuilder
.withArgName("segment")
.hasArgs()
.withDescription("the segment(s) to use")
.create("segment");
@SuppressWarnings("static-access")
Option mimeOpt = OptionBuilder
.withArgName("mimetype")
