diff --git a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java index ac24947b8..54ce3ffc9 100644 --- a/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java +++ b/contrib/src/main/java/org/archive/crawler/frontier/AMQPUrlReceiver.java @@ -118,7 +118,7 @@ public void setQueueName(String queueName) { public boolean isRunning() { return isRunning; } - + /** Should be queues be marked as durable? */ private boolean durable = false; public boolean isDurable() { @@ -139,12 +139,10 @@ public void setAutoDelete(boolean autoDelete) { private transient Lock lock = new ReentrantLock(true); - private boolean pauseConsumer = true; - - private boolean isConsuming = false; + private transient boolean pauseConsumer = false; + private transient String consumerTag = null; private class StarterRestarter extends Thread { - private String consumerTag = null; public StarterRestarter(String name) { super(name); @@ -155,31 +153,23 @@ public void run() { while (!Thread.interrupted()) { try { lock.lockInterruptibly(); - logger.finest("Checking isConsuming=" + isConsuming + " and pauseConsumer=" + pauseConsumer); + logger.finest("Checking consumerTag=" + consumerTag + " and pauseConsumer=" + pauseConsumer); try { - if (!isConsuming && !pauseConsumer) { + if (consumerTag == null && !pauseConsumer) { // start up again try { - Consumer consumer = new UrlConsumer(channel()); - channel().exchangeDeclare(getExchange(), "direct", true); - channel().queueDeclare(getQueueName(), durable, - false, autoDelete, null); - channel().queueBind(getQueueName(), getExchange(), getQueueName()); - consumerTag = channel().basicConsume(getQueueName(), false, consumer); - isConsuming = true; - logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag); + startConsumer(); } catch (IOException e) { logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 10 seconds)", e); } } - if (isConsuming && pauseConsumer) { + if (consumerTag != null && pauseConsumer) { try { if (consumerTag != null) { logger.info("Attempting to cancel URLConsumer with consumerTag=" + consumerTag); channel().basicCancel(consumerTag); consumerTag = null; - isConsuming = false; logger.info("Cancelled URLConsumer."); } } catch (IOException e) { @@ -187,16 +177,26 @@ public void run() { } } - Thread.sleep(10 * 1000); } finally { lock.unlock(); } - } catch (InterruptedException e) { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { return; } } } + + public void startConsumer() throws IOException { + Consumer consumer = new UrlConsumer(channel()); + channel().exchangeDeclare(getExchange(), "direct", true); + channel().queueDeclare(getQueueName(), durable, + false, autoDelete, null); + channel().queueBind(getQueueName(), getExchange(), getQueueName()); + consumerTag = channel().basicConsume(getQueueName(), false, consumer); + logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag); + } } transient private StarterRestarter starterRestarter; @@ -208,6 +208,13 @@ public void start() { // spawn off a thread to start up the amqp consumer, and try to restart it if it dies if (!isRunning) { starterRestarter = new StarterRestarter(AMQPUrlReceiver.class.getSimpleName() + "-starter-restarter"); + try { + // try to synchronously start the consumer right now, so + // that the queue is bound before crawling starts + starterRestarter.startConsumer(); + } catch (IOException e) { + logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again soon)", e); + } starterRestarter.start(); } isRunning = true; @@ -221,13 +228,6 @@ public void stop() { lock.lock(); try { logger.info("shutting down"); - if (connection != null && connection.isOpen()) { - try { - connection.close(); - } catch (IOException e) { - logger.log(Level.SEVERE, "problem closing AMQP connection", e); - } - } if (starterRestarter != null && starterRestarter.isAlive()) { starterRestarter.interrupt(); try { @@ -236,6 +236,14 @@ public void stop() { } } starterRestarter = null; + + if (connection != null && connection.isOpen()) { + try { + connection.close(); + } catch (IOException e) { + logger.log(Level.SEVERE, "problem closing AMQP connection", e); + } + } connection = null; channel = null; isRunning = false; @@ -310,7 +318,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, throw new RuntimeException(e); // can't happen } JSONObject jo = new JSONObject(decodedBody); - + if ("GET".equals(jo.getString("method"))) { CrawlURI curi; try { @@ -345,7 +353,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, this.getChannel().basicAck(envelope.getDeliveryTag(), false); } - + @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { @@ -354,7 +362,7 @@ public void handleShutdownSignal(String consumerTag, } else { logger.info("amqp channel/connection shut down consumerTag=" + consumerTag); } - isConsuming = false; + AMQPUrlReceiver.this.consumerTag = null; } // { @@ -379,7 +387,7 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException, String hopPath = parentHopPath + Hop.INFERRED.getHopString(); CrawlURI curi = new CrawlURI(uuri, hopPath, via, LinkContext.INFERRED_MISC); - + // set the heritable data from the parent url, passed back to us via amqp // XXX brittle, only goes one level deep, and only handles strings and arrays, the latter of which it converts to a Set. // 'heritableData': {'source': 'https://facebook.com/whitehouse/', 'heritable': ['source', 'heritable']} @@ -412,7 +420,7 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException, */ curi.setSchedulingDirective(SchedulingConstants.HIGH); curi.setPrecedence(1); - + // optional forceFetch instruction: if (jo.has("forceFetch")) { boolean forceFetch = jo.getBoolean("forceFetch"); @@ -446,15 +454,16 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException, public void onApplicationEvent(CrawlStateEvent event) { switch(event.getState()) { case PAUSING: case PAUSED: - logger.info("Requesting a pause of the URLConsumer..."); - this.pauseConsumer = true; + if (!this.pauseConsumer) { + logger.info("Requesting a pause of the URLConsumer..."); + this.pauseConsumer = true; + } break; case RUNNING: - logger.info("Requesting restart of the URLConsumer..."); - this.pauseConsumer = false; - if (starterRestarter == null || !starterRestarter.isAlive()) { - start(); + if (this.pauseConsumer) { + logger.info("Requesting unpause of the URLConsumer..."); + this.pauseConsumer = false; } break;