Commit
Merge branch 'master' into novel-quotas
* master:
  license header
  check that sourceTag of CrawlURI actually matches configured sourceTag
  remove already-outdated stuff from javadoc
  handle multiple clauses for same user agent in robots.txt
  Hook in submitted seeds properly.
  avoid spurious logging
  try very hard to start url consumer, and therefore bind the queue to the routing key, so that no messages are dropped, before crawling starts (should always work unless rabbitmq is down); some other tweaks for clarity and stability
nlevitt committed Jan 13, 2016
2 parents 22336fb + b828969 commit 9e4c7f0
Showing 6 changed files with 136 additions and 64 deletions.
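Editorial note: the headline change in AMQPUrlReceiver (the first file below) is that an AMQP message can now mark a URI as a seed, in which case the receiver hands it to the SeedModule instead of scheduling it directly on the frontier, and the consumer is now started synchronously before crawling so the queue is bound early enough that no messages are dropped. For orientation, here is a minimal, hypothetical producer sketch using the RabbitMQ Java client. The exchange name "umbra", queue name "requests", and the "url" JSON field are assumptions (only method, isSeed, forceFetch, and heritableData are visible in this diff); adjust to the receiver's actual configuration.

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class SeedSubmitSketch {
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            // same default URI as AMQPUrlReceiver
            factory.setUri("amqp://guest:guest@localhost:5672/%2f");
            Connection connection = factory.newConnection();
            try {
                Channel channel = connection.createChannel();
                // Hypothetical names; the receiver binds its queue to a durable
                // "direct" exchange using the queue name as the routing key.
                String exchange = "umbra";
                String queueName = "requests";
                channel.exchangeDeclare(exchange, "direct", true);
                // "url" is an assumed field; method/isSeed/forceFetch appear in the diff.
                String body = "{\"method\":\"GET\",\"url\":\"http://example.com/\",\"isSeed\":true}";
                channel.basicPublish(exchange, queueName, null, body.getBytes("UTF-8"));
            } finally {
                connection.close();
            }
        }
    }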
@@ -37,6 +37,7 @@
import org.archive.modules.SchedulingConstants;
import org.archive.modules.extractor.Hop;
import org.archive.modules.extractor.LinkContext;
+ import org.archive.modules.seeds.SeedModule;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.json.JSONArray;
@@ -77,6 +78,17 @@ public void setFrontier(Frontier frontier) {
this.frontier = frontier;
}

+ protected SeedModule seeds;
+
+ public SeedModule getSeeds() {
+     return this.seeds;
+ }
+
+ @Autowired
+ public void setSeeds(SeedModule seeds) {
+     this.seeds = seeds;
+ }
+
protected String amqpUri = "amqp://guest:guest@localhost:5672/%2f";
public String getAmqpUri() {
return this.amqpUri;
@@ -106,7 +118,7 @@ public void setQueueName(String queueName) {
public boolean isRunning() {
return isRunning;
}

/** Should queues be marked as durable? */
private boolean durable = false;
public boolean isDurable() {
@@ -127,12 +139,10 @@ public void setAutoDelete(boolean autoDelete) {

private transient Lock lock = new ReentrantLock(true);

- private boolean pauseConsumer = true;
-
- private boolean isConsuming = false;
+ private transient boolean pauseConsumer = false;
+ private transient String consumerTag = null;

private class StarterRestarter extends Thread {
- private String consumerTag = null;

public StarterRestarter(String name) {
super(name);
@@ -143,48 +153,50 @@ public void run() {
while (!Thread.interrupted()) {
try {
lock.lockInterruptibly();
logger.finest("Checking isConsuming=" + isConsuming + " and pauseConsumer=" + pauseConsumer);
logger.finest("Checking consumerTag=" + consumerTag + " and pauseConsumer=" + pauseConsumer);
try {
- if (!isConsuming && !pauseConsumer) {
+ if (consumerTag == null && !pauseConsumer) {
// start up again
try {
- Consumer consumer = new UrlConsumer(channel());
- channel().exchangeDeclare(getExchange(), "direct", true);
- channel().queueDeclare(getQueueName(), durable,
-         false, autoDelete, null);
- channel().queueBind(getQueueName(), getExchange(), getQueueName());
- consumerTag = channel().basicConsume(getQueueName(), false, consumer);
- isConsuming = true;
- logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag);
+ startConsumer();
} catch (IOException e) {
logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 10 seconds)", e);
}
}

if (isConsuming && pauseConsumer) {
if (consumerTag != null && pauseConsumer) {
try {
- if (consumerTag != null) {
logger.info("Attempting to cancel URLConsumer with consumerTag=" + consumerTag);
channel().basicCancel(consumerTag);
consumerTag = null;
- isConsuming = false;
logger.info("Cancelled URLConsumer.");
- }
} catch (IOException e) {
logger.log(Level.SEVERE, "problem cancelling AMQP consumer (will try again after 10 seconds)", e);
}
}

- Thread.sleep(10 * 1000);
} finally {
lock.unlock();
}
- } catch (InterruptedException e) {
+
+     Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
return;
}
}
}
+
+ public void startConsumer() throws IOException {
+     Consumer consumer = new UrlConsumer(channel());
+     channel().exchangeDeclare(getExchange(), "direct", true);
+     channel().queueDeclare(getQueueName(), durable,
+             false, autoDelete, null);
+     channel().queueBind(getQueueName(), getExchange(), getQueueName());
+     consumerTag = channel().basicConsume(getQueueName(), false, consumer);
+     logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag);
+ }
}

transient private StarterRestarter starterRestarter;
@@ -196,6 +208,13 @@ public void start() {
// spawn off a thread to start up the amqp consumer, and try to restart it if it dies
if (!isRunning) {
starterRestarter = new StarterRestarter(AMQPUrlReceiver.class.getSimpleName() + "-starter-restarter");
+ try {
+     // try to synchronously start the consumer right now, so
+     // that the queue is bound before crawling starts
+     starterRestarter.startConsumer();
+ } catch (IOException e) {
+     logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again soon)", e);
+ }
starterRestarter.start();
}
isRunning = true;
@@ -209,13 +228,6 @@ public void stop() {
lock.lock();
try {
logger.info("shutting down");
- if (connection != null && connection.isOpen()) {
-     try {
-         connection.close();
-     } catch (IOException e) {
-         logger.log(Level.SEVERE, "problem closing AMQP connection", e);
-     }
- }
if (starterRestarter != null && starterRestarter.isAlive()) {
starterRestarter.interrupt();
try {
@@ -224,6 +236,14 @@
}
}
starterRestarter = null;
+
+ if (connection != null && connection.isOpen()) {
+     try {
+         connection.close();
+     } catch (IOException e) {
+         logger.log(Level.SEVERE, "problem closing AMQP connection", e);
+     }
+ }
connection = null;
channel = null;
isRunning = false;
@@ -298,13 +318,18 @@ public void handleDelivery(String consumerTag, Envelope envelope,
throw new RuntimeException(e); // can't happen
}
JSONObject jo = new JSONObject(decodedBody);

if ("GET".equals(jo.getString("method"))) {
CrawlURI curi;
try {
curi = makeCrawlUri(jo);
- // bypasses scoping (unless rechecking is configured)
- getFrontier().schedule(curi);
+ // Declare as a new seed if it is the case:
+ if (curi.isSeed()) {
+     getSeeds().addSeed(curi); // Also triggers scheduling.
+ } else {
+     // bypasses scoping (unless rechecking is configured)
+     getFrontier().schedule(curi);
+ }
if (logger.isLoggable(Level.FINE)) {
logger.fine("scheduled " + curi);
}
@@ -328,7 +353,7 @@ public void handleDelivery(String consumerTag, Envelope envelope,

this.getChannel().basicAck(envelope.getDeliveryTag(), false);
}

@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
@@ -337,7 +362,7 @@ public void handleShutdownSignal(String consumerTag,
} else {
logger.info("amqp channel/connection shut down consumerTag=" + consumerTag);
}
- isConsuming = false;
+ AMQPUrlReceiver.this.consumerTag = null;
}

// {
@@ -362,7 +387,7 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
String hopPath = parentHopPath + Hop.INFERRED.getHopString();

CrawlURI curi = new CrawlURI(uuri, hopPath, via, LinkContext.INFERRED_MISC);

// set the heritable data from the parent url, passed back to us via amqp
// XXX brittle, only goes one level deep, and only handles strings and arrays, the latter of which it converts to a Set.
// 'heritableData': {'source': 'https://facebook.com/whitehouse/', 'heritable': ['source', 'heritable']}
@@ -395,19 +420,28 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
*/
curi.setSchedulingDirective(SchedulingConstants.HIGH);
curi.setPrecedence(1);

// optional forceFetch instruction:
if (jo.has("forceFetch")) {
boolean forceFetch = jo.getBoolean("forceFetch");
logger.info("Setting forceFetch=" + forceFetch);
if (forceFetch)
logger.info("Setting forceFetch=" + forceFetch);
curi.setForceFetch(forceFetch);
}

// optional isSeed instruction:
if (jo.has("isSeed")) {
boolean isSeed = jo.getBoolean("isSeed");
logger.info("Setting isSeed=" + isSeed);
if (isSeed)
logger.info("Setting isSeed=" + isSeed);
curi.setSeed(isSeed);
+ // We want to force the seed version of a URL to be fetched even
+ // if the URL has been seen before. See
+ //
+ // org.archive.crawler.postprocessor.CandidatesProcessor.runCandidateChain(CrawlURI
+ // candidate, CrawlURI source)
+ //
+ curi.setForceFetch(true);
}

curi.getAnnotations().add(A_RECEIVED_FROM_AMQP);
@@ -420,15 +454,16 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
public void onApplicationEvent(CrawlStateEvent event) {
switch(event.getState()) {
case PAUSING: case PAUSED:
logger.info("Requesting a pause of the URLConsumer...");
this.pauseConsumer = true;
if (!this.pauseConsumer) {
logger.info("Requesting a pause of the URLConsumer...");
this.pauseConsumer = true;
}
break;

case RUNNING:
logger.info("Requesting restart of the URLConsumer...");
this.pauseConsumer = false;
if (starterRestarter == null || !starterRestarter.isAlive()) {
start();
if (this.pauseConsumer) {
logger.info("Requesting unpause of the URLConsumer...");
this.pauseConsumer = false;
}
break;

@@ -65,17 +65,7 @@ public Map<String, Long> getQuotas() {
return quotas;
}
/**
- * Keys can be any of the {@link CrawledBytesHistotable} keys:
- * <ul>
- * <li>{@value CrawledBytesHistotable#NOVEL}
- * <li>{@value CrawledBytesHistotable#NOVELCOUNT}
- * <li>{@value CrawledBytesHistotable#DUPLICATE}
- * <li>{@value CrawledBytesHistotable#DUPLICATECOUNT}
- * <li>{@value CrawledBytesHistotable#NOTMODIFIED}
- * <li>{@value CrawledBytesHistotable#NOTMODIFIEDCOUNT}
- * <li>{@value CrawledBytesHistotable#OTHERDUPLICATE}
- * <li>{@value CrawledBytesHistotable#OTHERDUPLICATECOUNT}
- * </ul>
+ * Keys can be any of the {@link CrawledBytesHistotable} keys.
*/
public void setQuotas(Map<String, Long> quotas) {
this.quotas = quotas;
@@ -93,6 +83,7 @@ public void setStatisticsTracker(StatisticsTracker statisticsTracker) {
@Override
protected boolean shouldProcess(CrawlURI curi) {
return curi.containsDataKey(CoreAttributeConstants.A_SOURCE_TAG)
&& sourceTag.equals(curi.getSourceTag())
&& statisticsTracker.getSourceStats(curi.getSourceTag()) != null;
}

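Editorial note: with the new clause above, the quota enforcer only processes URIs whose sourceTag equals the configured one. As a small sketch of a quota map for setQuotas(Map<String, Long>) — the enforcer's class name is not visible in this truncated diff, and the key strings are assumed to mirror the CrawledBytesHistotable stat names such as "novelCount" used in the test below:

    import java.util.HashMap;
    import java.util.Map;

    public class QuotaSketch {
        // Hypothetical helper building a per-source quota map for setQuotas().
        // Key names are assumed to match CrawledBytesHistotable stat keys.
        static Map<String, Long> exampleQuotas() {
            Map<String, Long> quotas = new HashMap<String, Long>();
            quotas.put("novelCount", 100L);  // cap the number of novel URLs
            quotas.put("novel", 1000000L);   // assumed key for novel byte total
            return quotas;
        }
    }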
@@ -1,3 +1,21 @@
+ /*
+  * This file is part of the Heritrix web crawler (crawler.archive.org).
+  *
+  * Licensed to the Internet Archive (IA) by one or more individual
+  * contributors.
+  *
+  * The IA licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License. You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
package org.archive.crawler.selftest;

import org.archive.crawler.reporting.StatisticsTracker;
@@ -19,13 +37,13 @@ protected String getSeedsString() {
// non-deterministic
return "http://127.0.0.1:7777/a.html\\nhttp://localhost:7777/b.html";
}

@Override
protected void verify() throws Exception {
verifySourceStats();
verifyWarcStats();
}

protected void verifyWarcStats() {
StatisticsTracker stats = heritrix.getEngine().getJob("selftest-job").getCrawlController().getStatisticsTracker();
assertNotNull(stats);
@@ -53,7 +71,7 @@ protected void verifySourceStats() throws Exception {
assertEquals(3l, (long) sourceStats.get("novelCount"));
assertEquals(2942l, (long) sourceStats.get("warcNovelContentBytes"));
assertEquals(3l, (long) sourceStats.get("warcNovelUrls"));

sourceStats = stats.getSourceStats("http://localhost:7777/b.html");
assertNotNull(sourceStats);
assertEquals(4, sourceStats.keySet().size());
@@ -34,7 +34,8 @@ public class RobotsDirectives implements Serializable {

protected ConcurrentSkipListSet<String> disallows = new ConcurrentSkipListSet<String>();
protected ConcurrentSkipListSet<String> allows = new ConcurrentSkipListSet<String>();
- protected float crawlDelay = -1;
+ protected float crawlDelay = -1;
+ public transient boolean hasDirectives = false;

public boolean allows(String path) {
return !(longestPrefixLength(disallows, path) > longestPrefixLength(allows, path));
@@ -57,19 +58,22 @@ protected int longestPrefixLength(ConcurrentSkipListSet<String> prefixSet,
}

public void addDisallow(String path) {
+ hasDirectives = true;
if(path.length()==0) {
// ignore empty-string disallows
// (they really mean allow, when alone)
- return;
+ return;
}
disallows.add(path);
}

public void addAllow(String path) {
+ hasDirectives = true;
allows.add(path);
}

public void setCrawlDelay(float i) {
+ hasDirectives = true;
crawlDelay=i;
}

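Editorial note: the new public hasDirectives flag flips to true as soon as any allow, disallow, or crawl-delay is recorded. Per the merge bullet "handle multiple clauses for same user agent in robots.txt", the parser (not shown in this diff) can presumably use it to tell a fresh RobotsDirectives from one that already holds rules, so a later clause naming the same agent merges into the earlier one instead of replacing it. With merging, both rules in a file like this hypothetical example apply to every crawler:

    User-agent: *
    Disallow: /private/

    User-agent: *
    Crawl-delay: 10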