Skip to content

Commit

Permalink
Merge pull request #1 from defg/gora-accumulo
Browse files Browse the repository at this point in the history
Gora accumulo
  • Loading branch information
jatrost committed Apr 4, 2012
2 parents c4d74ed + 13d45d1 commit 7efa5fa
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/java/org/apache/nutch/crawl/GeneratorMapper.java
Expand Up @@ -17,6 +17,7 @@
package org.apache.nutch.crawl;

import java.io.IOException;
import java.net.MalformedURLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
Expand Down Expand Up @@ -62,6 +63,9 @@ public void map(String reversedUrl, WebPage page,
} catch (URLFilterException e) {
GeneratorJob.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
return;
} catch (MalformedURLException e) {
GeneratorJob.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
return;
}

// check fetch schedule
Expand Down
8 changes: 7 additions & 1 deletion src/java/org/apache/nutch/crawl/GeneratorReducer.java
Expand Up @@ -20,6 +20,8 @@
import java.util.HashMap;
import java.util.Map;

import java.net.MalformedURLException;

import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
Expand Down Expand Up @@ -73,7 +75,11 @@ protected void reduce(SelectorEntry key, Iterable<WebPage> values,
}

Mark.GENERATE_MARK.putMark(page, batchId);
context.write(TableUtil.reverseUrl(key.url), page);
try {
context.write(TableUtil.reverseUrl(key.url), page);
} catch (MalformedURLException e) {
continue;
}
context.getCounter("Generator", "GENERATE_MARK").increment(1);
count++;
}
Expand Down
15 changes: 12 additions & 3 deletions src/java/org/apache/nutch/fetcher/FetcherJob.java
Expand Up @@ -154,6 +154,7 @@ public Map<String,Object> run(Map<String,Object> args) throws Exception {
Integer threads = (Integer)args.get(Nutch.ARG_THREADS);
Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
Integer numTasks = (Integer)args.get(Nutch.ARG_NUMTASKS);
Boolean parse = (Boolean)args.get(Nutch.ARG_PARSE);

if (threads != null && threads > 0) {
getConf().setInt(THREADS_KEY, threads);
Expand All @@ -166,6 +167,10 @@ public Map<String,Object> run(Map<String,Object> args) throws Exception {
getConf().setBoolean(RESUME_KEY, shouldResume);
}

if (parse != null) {
getConf().setBoolean(PARSE_KEY, parse);
}

// Set the actual time for the time limit relative to the beginning
// of the whole job, not of a specific task;
// otherwise it keeps trying again if a task fails.
Expand Down Expand Up @@ -201,7 +206,7 @@ public Map<String,Object> run(Map<String,Object> args) throws Exception {
* @return 0 on success
* @throws Exception
*/
public int fetch(String batchId, int threads, boolean shouldResume, int numTasks)
public int fetch(String batchId, int threads, boolean shouldResume, int numTasks, boolean parse)
throws Exception {
LOG.info("FetcherJob: starting");

Expand All @@ -219,7 +224,8 @@ public int fetch(String batchId, int threads, boolean shouldResume, int numTasks
Nutch.ARG_BATCH, batchId,
Nutch.ARG_THREADS, threads,
Nutch.ARG_RESUME, shouldResume,
Nutch.ARG_NUMTASKS, numTasks));
Nutch.ARG_NUMTASKS, numTasks,
Nutch.ARG_PARSE, parse));
LOG.info("FetcherJob: done");
return 0;
}
Expand Down Expand Up @@ -261,6 +267,7 @@ void checkConfiguration() {
public int run(String[] args) throws Exception {
int threads = -1;
boolean shouldResume = false;
boolean parse = false;
String batchId;

String usage = "Usage: FetcherJob (<batchId> | -all) [-crawlId <id>] " +
Expand Down Expand Up @@ -292,10 +299,12 @@ public int run(String[] args) throws Exception {
numTasks = Integer.parseInt(args[++i]);
} else if ("-crawlId".equals(args[i])) {
getConf().set(Nutch.CRAWL_ID_KEY, args[++i]);
} else if ("-parse".equals(args[i])) {
parse = true;
}
}

int fetchcode = fetch(batchId, threads, shouldResume, numTasks); // run the Fetcher
int fetchcode = fetch(batchId, threads, shouldResume, numTasks, parse); // run the Fetcher

return fetchcode;
}
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/nutch/metadata/Nutch.java
Expand Up @@ -110,6 +110,8 @@ public interface Nutch {
public static final String ARG_CLASS = "class";
/** Depth (number of cycles) of a crawl. */
public static final String ARG_DEPTH = "depth";
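/** Parse flag: Boolean argument that FetcherJob copies into its configuration under PARSE_KEY (set by the -parse option). */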
public static final String ARG_PARSE = "parse";

// short constants for status / results fields
/** Status / result message. */
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/nutch/parse/ParseUtil.java
Expand Up @@ -204,7 +204,7 @@ public URLWebPage process(String key, WebPage page) {
} catch (MalformedURLException e) {
return redirectedPage;
}
if (newUrl == null || newUrl.equals(url)) {
if (newUrl != null && !newUrl.equals(url)) {
String reprUrl = URLUtil.chooseRepr(url, newUrl,
refreshTime < FetcherJob.PERM_REFRESH_TIME);
WebPage newWebPage = new WebPage();
Expand Down
10 changes: 6 additions & 4 deletions src/java/org/apache/nutch/util/TableUtil.java
Expand Up @@ -119,11 +119,13 @@ public static String getReversedHost(String reversedUrl) {
}

private static void reverseAppendSplits(String[] splits, StringBuilder buf) {
for (int i = splits.length - 1; i > 0; i--) {
buf.append(splits[i]);
buf.append('.');
if (splits.length > 0) {
for (int i = splits.length - 1; i > 0; i--) {
buf.append(splits[i]);
buf.append('.');
}
buf.append(splits[0]);
}
buf.append(splits[0]);
}

/**
Expand Down

0 comments on commit 7efa5fa

Please sign in to comment.