From e4acb6c5129fcb2c7494bc206a82734c37383c1f Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 8 Oct 2015 17:22:44 -0700 Subject: [PATCH 1/7] try very hard to start url consumer, and therefore bind the queue to the routing key, so that no messages are dropped, before crawling starts (should always work unless rabbitmq is down); some other tweaks for clarity and stability --- .../crawler/frontier/AMQPUrlReceiver.java | 74 ++++++++++--------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java index ca10f2a1d..918913d7c 100644 --- a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java +++ b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java @@ -106,7 +106,7 @@ public void setQueueName(String queueName) { public boolean isRunning() { return isRunning; } - + /** Should be queues be marked as durable? 
*/ private boolean durable = false; public boolean isDurable() { @@ -127,12 +127,10 @@ public void setAutoDelete(boolean autoDelete) { private transient Lock lock = new ReentrantLock(true); - private boolean pauseConsumer = true; - - private boolean isConsuming = false; + private transient boolean pauseConsumer = false; + private transient String consumerTag = null; private class StarterRestarter extends Thread { - private String consumerTag = null; public StarterRestarter(String name) { super(name); @@ -143,31 +141,23 @@ public void run() { while (!Thread.interrupted()) { try { lock.lockInterruptibly(); - logger.finest("Checking isConsuming=" + isConsuming + " and pauseConsumer=" + pauseConsumer); + logger.finest("Checking consumerTag=" + consumerTag + " and pauseConsumer=" + pauseConsumer); try { - if (!isConsuming && !pauseConsumer) { + if (consumerTag == null && !pauseConsumer) { // start up again try { - Consumer consumer = new UrlConsumer(channel()); - channel().exchangeDeclare(getExchange(), "direct", true); - channel().queueDeclare(getQueueName(), durable, - false, autoDelete, null); - channel().queueBind(getQueueName(), getExchange(), getQueueName()); - consumerTag = channel().basicConsume(getQueueName(), false, consumer); - isConsuming = true; - logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag); + startConsumer(); } catch (IOException e) { logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 10 seconds)", e); } } - if (isConsuming && pauseConsumer) { + if (consumerTag != null && pauseConsumer) { try { if (consumerTag != null) { logger.info("Attempting to cancel URLConsumer with consumerTag=" + consumerTag); channel().basicCancel(consumerTag); consumerTag = null; - isConsuming = false; logger.info("Cancelled URLConsumer."); } } catch (IOException e) { @@ -175,16 +165,27 @@ public void run() { } } - Thread.sleep(10 * 1000); } 
finally { lock.unlock(); } + + Thread.sleep(10 * 1000); } catch (InterruptedException e) { return; } } } + + public void startConsumer() throws IOException { + Consumer consumer = new UrlConsumer(channel()); + channel().exchangeDeclare(getExchange(), "direct", true); + channel().queueDeclare(getQueueName(), durable, + false, autoDelete, null); + channel().queueBind(getQueueName(), getExchange(), getQueueName()); + consumerTag = channel().basicConsume(getQueueName(), false, consumer); + logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag); + } } transient private StarterRestarter starterRestarter; @@ -196,6 +197,13 @@ public void start() { // spawn off a thread to start up the amqp consumer, and try to restart it if it dies if (!isRunning) { starterRestarter = new StarterRestarter(AMQPUrlReceiver.class.getSimpleName() + "-starter-restarter"); + try { + // try to synchronously start the consumer right now, so + // that the queue is bound before crawling starts + starterRestarter.startConsumer(); + } catch (IOException e) { + logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again soon)", e); + } starterRestarter.start(); } isRunning = true; @@ -209,13 +217,6 @@ public void stop() { lock.lock(); try { logger.info("shutting down"); - if (connection != null && connection.isOpen()) { - try { - connection.close(); - } catch (IOException e) { - logger.log(Level.SEVERE, "problem closing AMQP connection", e); - } - } if (starterRestarter != null && starterRestarter.isAlive()) { starterRestarter.interrupt(); try { @@ -224,6 +225,14 @@ public void stop() { } } starterRestarter = null; + + if (connection != null && connection.isOpen()) { + try { + connection.close(); + } catch (IOException e) { + logger.log(Level.SEVERE, "problem closing AMQP connection", e); + } + } connection = null; channel = null; isRunning = false; @@ -298,7 +307,7 @@ public void 
handleDelivery(String consumerTag, Envelope envelope, throw new RuntimeException(e); // can't happen } JSONObject jo = new JSONObject(decodedBody); - + if ("GET".equals(jo.getString("method"))) { CrawlURI curi; try { @@ -328,7 +337,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, this.getChannel().basicAck(envelope.getDeliveryTag(), false); } - + @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { @@ -337,7 +346,7 @@ public void handleShutdownSignal(String consumerTag, } else { logger.info("amqp channel/connection shut down consumerTag=" + consumerTag); } - isConsuming = false; + AMQPUrlReceiver.this.consumerTag = null; } // { @@ -362,7 +371,7 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException, String hopPath = parentHopPath + Hop.INFERRED.getHopString(); CrawlURI curi = new CrawlURI(uuri, hopPath, via, LinkContext.INFERRED_MISC); - + // set the heritable data from the parent url, passed back to us via amqp // XXX brittle, only goes one level deep, and only handles strings and arrays, the latter of which it converts to a Set. 
// 'heritableData': {'source': 'https://facebook.com/whitehouse/', 'heritable': ['source', 'heritable']} @@ -395,7 +404,7 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException, */ curi.setSchedulingDirective(SchedulingConstants.HIGH); curi.setPrecedence(1); - + // optional forceFetch instruction: if (jo.has("forceFetch")) { boolean forceFetch = jo.getBoolean("forceFetch"); @@ -425,11 +434,8 @@ public void onApplicationEvent(CrawlStateEvent event) { break; case RUNNING: - logger.info("Requesting restart of the URLConsumer..."); + logger.info("Requesting unpause of the URLConsumer..."); this.pauseConsumer = false; - if (starterRestarter == null || !starterRestarter.isAlive()) { - start(); - } break; default: From e37b2b42cb3ac65bc7a1922bfa243bfed14b967b Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Thu, 8 Oct 2015 18:11:09 -0700 Subject: [PATCH 2/7] avoid spurious logging --- .../archive/crawler/frontier/AMQPUrlReceiver.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java index 918913d7c..8483f4fdd 100644 --- a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java +++ b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java @@ -171,7 +171,6 @@ public void run() { Thread.sleep(10 * 1000); } catch (InterruptedException e) { - return; } } @@ -429,13 +428,17 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException, public void onApplicationEvent(CrawlStateEvent event) { switch(event.getState()) { case PAUSING: case PAUSED: - logger.info("Requesting a pause of the URLConsumer..."); - this.pauseConsumer = true; + if (!this.pauseConsumer) { + logger.info("Requesting a pause of the URLConsumer..."); + this.pauseConsumer = true; + } break; case RUNNING: - logger.info("Requesting unpause of the URLConsumer..."); - this.pauseConsumer = 
false; + if (this.pauseConsumer) { + logger.info("Requesting unpause of the URLConsumer..."); + this.pauseConsumer = false; + } break; default: From 0f6d385b247083157deb2abf0e2d52a2ef40d64d Mon Sep 17 00:00:00 2001 From: Andrew Jackson Date: Fri, 27 Nov 2015 20:51:35 +0000 Subject: [PATCH 3/7] Hook in submitted seeds properly. --- .../crawler/frontier/AMQPUrlReceiver.java | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java index ca10f2a1d..ac24947b8 100644 --- a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java +++ b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java @@ -37,6 +37,7 @@ import org.archive.modules.SchedulingConstants; import org.archive.modules.extractor.Hop; import org.archive.modules.extractor.LinkContext; +import org.archive.modules.seeds.SeedModule; import org.archive.net.UURI; import org.archive.net.UURIFactory; import org.json.JSONArray; @@ -77,6 +78,17 @@ public void setFrontier(Frontier frontier) { this.frontier = frontier; } + protected SeedModule seeds; + + public SeedModule getSeeds() { + return this.seeds; + } + + @Autowired + public void setSeeds(SeedModule seeds) { + this.seeds = seeds; + } + protected String amqpUri = "amqp://guest:guest@localhost:5672/%2f"; public String getAmqpUri() { return this.amqpUri; @@ -303,8 +315,13 @@ public void handleDelivery(String consumerTag, Envelope envelope, CrawlURI curi; try { curi = makeCrawlUri(jo); - // bypasses scoping (unless rechecking is configured) - getFrontier().schedule(curi); + // Declare as a new seed if it is the case: + if (curi.isSeed()) { + getSeeds().addSeed(curi); // Also triggers scheduling. 
+ } else { + // bypasses scoping (unless rechecking is configured) + getFrontier().schedule(curi); + } if (logger.isLoggable(Level.FINE)) { logger.fine("scheduled " + curi); } @@ -399,15 +416,24 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException, // optional forceFetch instruction: if (jo.has("forceFetch")) { boolean forceFetch = jo.getBoolean("forceFetch"); - logger.info("Setting forceFetch=" + forceFetch); + if (forceFetch) + logger.info("Setting forceFetch=" + forceFetch); curi.setForceFetch(forceFetch); } // optional isSeed instruction: if (jo.has("isSeed")) { boolean isSeed = jo.getBoolean("isSeed"); - logger.info("Setting isSeed=" + isSeed); + if (isSeed) + logger.info("Setting isSeed=" + isSeed); curi.setSeed(isSeed); + // We want to force the seed version of a URL to be fetched even + // if the URL has been seen before. See + // + // org.archive.crawler.postprocessor.CandidatesProcessor.runCandidateChain(CrawlURI + // candidate, CrawlURI source) + // + curi.setForceFetch(true); } curi.getAnnotations().add(A_RECEIVED_FROM_AMQP); From 38931f152a79ac8c2ba3b3a3e52b3b02ee56b81d Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Fri, 11 Dec 2015 15:42:16 -0800 Subject: [PATCH 4/7] handle multiple clauses for same user agent in robots.txt --- .../archive/modules/net/RobotsDirectives.java | 8 +++++-- .../org/archive/modules/net/Robotstxt.java | 16 ++++++++------ .../archive/modules/net/RobotstxtTest.java | 22 +++++++++++++++++++ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/modules/src/main/java/org/archive/modules/net/RobotsDirectives.java b/modules/src/main/java/org/archive/modules/net/RobotsDirectives.java index 97752a0e8..d0ccf9df7 100644 --- a/modules/src/main/java/org/archive/modules/net/RobotsDirectives.java +++ b/modules/src/main/java/org/archive/modules/net/RobotsDirectives.java @@ -34,7 +34,8 @@ public class RobotsDirectives implements Serializable { protected ConcurrentSkipListSet disallows = new ConcurrentSkipListSet(); 
protected ConcurrentSkipListSet allows = new ConcurrentSkipListSet(); - protected float crawlDelay = -1; + protected float crawlDelay = -1; + public transient boolean hasDirectives = false; public boolean allows(String path) { return !(longestPrefixLength(disallows, path) > longestPrefixLength(allows, path)); @@ -57,19 +58,22 @@ protected int longestPrefixLength(ConcurrentSkipListSet prefixSet, } public void addDisallow(String path) { + hasDirectives = true; if(path.length()==0) { // ignore empty-string disallows // (they really mean allow, when alone) - return; + return; } disallows.add(path); } public void addAllow(String path) { + hasDirectives = true; allows.add(path); } public void setCrawlDelay(float i) { + hasDirectives = true; crawlDelay=i; } diff --git a/modules/src/main/java/org/archive/modules/net/Robotstxt.java b/modules/src/main/java/org/archive/modules/net/Robotstxt.java index 6ec605716..5cf121875 100644 --- a/modules/src/main/java/org/archive/modules/net/Robotstxt.java +++ b/modules/src/main/java/org/archive/modules/net/Robotstxt.java @@ -84,8 +84,6 @@ protected void initializeFromReader(BufferedReader reader) throws IOException { long charCount = 0; // current is the disallowed paths for the preceding User-Agent(s) RobotsDirectives current = null; - // whether a non-'User-Agent' directive has been encountered - boolean hasDirectivesYet = false; while (reader != null) { // we count characters instead of bytes because the byte count isn't easily available if (charCount >= MAX_SIZE) { @@ -117,11 +115,18 @@ protected void initializeFromReader(BufferedReader reader) throws IOException { read = read.trim(); if (read.matches("(?i)^User-agent:.*")) { String ua = read.substring(11).trim().toLowerCase(); - if (current == null || hasDirectivesYet ) { + RobotsDirectives preexisting; + if (ua.equals("*")) { + preexisting = wildcardDirectives; + } else { + preexisting = agentsToDirectives.get(ua); + } + if (preexisting != null && preexisting.hasDirectives) { + 
current = preexisting; + } else if (current == null || current.hasDirectives) { // only create new rules-list if necessary // otherwise share with previous user-agent current = new RobotsDirectives(); - hasDirectivesYet = false; } if (ua.equals("*")) { wildcardDirectives = current; @@ -146,7 +151,6 @@ protected void initializeFromReader(BufferedReader reader) throws IOException { path = path.substring(0,path.length()-1); } current.addDisallow(path); - hasDirectivesYet = true; continue; } if (read.matches("(?i)Crawl-delay:.*")) { @@ -157,7 +161,6 @@ protected void initializeFromReader(BufferedReader reader) throws IOException { } // consider a crawl-delay as sufficient to end a grouping of // User-Agent lines - hasDirectivesYet = true; String val = read.substring(12).trim(); try { val = val.split("[^\\d\\.]+")[0]; @@ -184,7 +187,6 @@ protected void initializeFromReader(BufferedReader reader) throws IOException { path = path.substring(0,path.length()-1); } current.addAllow(path); - hasDirectivesYet = true; continue; } // unknown line; do nothing for now diff --git a/modules/src/test/java/org/archive/modules/net/RobotstxtTest.java b/modules/src/test/java/org/archive/modules/net/RobotstxtTest.java index c8083e96d..1f5290778 100644 --- a/modules/src/test/java/org/archive/modules/net/RobotstxtTest.java +++ b/modules/src/test/java/org/archive/modules/net/RobotstxtTest.java @@ -205,4 +205,26 @@ public void testCompactSerialization() throws IOException { assertTrue("user-agent a and b shares the same RobotsDirectives after deserialization", da == db); } } + + public void testSeparatedSections() throws IOException { + final String TEST_ROBOTS_TXT = "User-agent: *\n" + + "Crawl-delay: 5\n" + + "User-agent: a\n" + + "Disallow: /\n" + + "User-agent: *\n" + + "Disallow: /disallowed\n" + + "User-agent: a\n" + + "Crawl-delay: 99\n"; + StringReader sr = new StringReader(TEST_ROBOTS_TXT); + Robotstxt rt = new Robotstxt(new BufferedReader(sr)); + + 
assertFalse(rt.getDirectivesFor("a").allows("/foo")); + + assertTrue(rt.getDirectivesFor("c").allows("/foo")); + assertFalse(rt.getDirectivesFor("c").allows("/disallowed")); + + assertEquals(5f, rt.getDirectivesFor("c").getCrawlDelay()); + + assertEquals(99f, rt.getDirectivesFor("a").getCrawlDelay()); + } } From 2f82d43110a2d645cc933844967413dd547ff428 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Mon, 14 Dec 2015 15:24:08 -0800 Subject: [PATCH 5/7] remove already-outdated stuff from javadoc --- .../crawler/prefetch/SourceQuotaEnforcer.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java b/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java index 881f9d594..26c3b91c3 100644 --- a/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java +++ b/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java @@ -65,17 +65,7 @@ public Map getQuotas() { return quotas; } /** - * Keys can be any of the {@link CrawledBytesHistotable} keys: - *
<ul> - *
<li>{@value CrawledBytesHistotable#NOVEL} - *
<li>{@value CrawledBytesHistotable#NOVELCOUNT} - *
<li>{@value CrawledBytesHistotable#DUPLICATE} - *
<li>{@value CrawledBytesHistotable#DUPLICATECOUNT} - *
<li>{@value CrawledBytesHistotable#NOTMODIFIED} - *
<li>{@value CrawledBytesHistotable#NOTMODIFIEDCOUNT} - *
<li>{@value CrawledBytesHistotable#OTHERDUPLICATE} - *
<li>{@value CrawledBytesHistotable#OTHERDUPLICATECOUNT} - * </ul>
+ * Keys can be any of the {@link CrawledBytesHistotable} keys. */ public void setQuotas(Map quotas) { this.quotas = quotas; From 8918ce99c66a6c1f5f764dea0505cef8ff4fa0b6 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 15 Dec 2015 11:24:53 -0800 Subject: [PATCH 6/7] check that sourceTag of CrawlURI actually matches configured sourceTag --- .../java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java b/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java index 26c3b91c3..6e4fd7475 100644 --- a/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java +++ b/contrib/src/main/java/org/archive/crawler/prefetch/SourceQuotaEnforcer.java @@ -83,6 +83,7 @@ public void setStatisticsTracker(StatisticsTracker statisticsTracker) { @Override protected boolean shouldProcess(CrawlURI curi) { return curi.containsDataKey(CoreAttributeConstants.A_SOURCE_TAG) + && sourceTag.equals(curi.getSourceTag()) && statisticsTracker.getSourceStats(curi.getSourceTag()) != null; } From 65a10774258232e82518ff7b2f3040dac90ff3cd Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 12 Jan 2016 14:59:45 -0800 Subject: [PATCH 7/7] license header --- .../crawler/selftest/StatisticsSelfTest.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/engine/src/test/java/org/archive/crawler/selftest/StatisticsSelfTest.java b/engine/src/test/java/org/archive/crawler/selftest/StatisticsSelfTest.java index 3eb97acc8..e5dde0399 100644 --- a/engine/src/test/java/org/archive/crawler/selftest/StatisticsSelfTest.java +++ b/engine/src/test/java/org/archive/crawler/selftest/StatisticsSelfTest.java @@ -1,3 +1,21 @@ +/* + * This file is part of the Heritrix web crawler (crawler.archive.org). + * + * Licensed to the Internet Archive (IA) by one or more individual + * contributors. 
+ * + * The IA licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.archive.crawler.selftest; import org.archive.crawler.reporting.StatisticsTracker; @@ -12,12 +30,12 @@ protected String getSeedsString() { // non-deterministic return "http://127.0.0.1:7777/a.html\\nhttp://localhost:7777/b.html"; } - + @Override protected void verify() throws Exception { verifySourceStats(); } - + protected void verifySourceStats() throws Exception { StatisticsTracker stats = heritrix.getEngine().getJob("selftest-job").getCrawlController().getStatisticsTracker(); assertNotNull(stats); @@ -29,7 +47,7 @@ protected void verifySourceStats() throws Exception { assertEquals(2, sourceStats.keySet().size()); assertEquals(2942l, (long) sourceStats.get("novel")); assertEquals(3l, (long) sourceStats.get("novelCount")); - + sourceStats = stats.getSourceStats("http://localhost:7777/b.html"); assertNotNull(sourceStats); assertEquals(2, sourceStats.keySet().size());