Skip to content

Commit

Permalink
Merge pull request #132 from nlevitt/amqp-url-receiver-changes
Browse files Browse the repository at this point in the history
try very hard to start url consumer, and therefore bind the queue to …
  • Loading branch information
adam-miller committed Jan 13, 2016
2 parents df5748d + e37b2b4 commit b828969
Showing 1 changed file with 46 additions and 37 deletions.
Expand Up @@ -118,7 +118,7 @@ public void setQueueName(String queueName) {
/**
 * Reports the current value of this receiver's {@code isRunning} flag.
 *
 * @return the {@code isRunning} field exactly as currently stored
 */
public boolean isRunning() {
    return this.isRunning;
}

/** Should queues be marked as durable? */
private boolean durable = false;
public boolean isDurable() {
Expand All @@ -139,12 +139,10 @@ public void setAutoDelete(boolean autoDelete) {

private transient Lock lock = new ReentrantLock(true);

private boolean pauseConsumer = true;

private boolean isConsuming = false;
private transient boolean pauseConsumer = false;
private transient String consumerTag = null;

private class StarterRestarter extends Thread {
private String consumerTag = null;

public StarterRestarter(String name) {
super(name);
Expand All @@ -155,48 +153,50 @@ public void run() {
while (!Thread.interrupted()) {
try {
lock.lockInterruptibly();
logger.finest("Checking isConsuming=" + isConsuming + " and pauseConsumer=" + pauseConsumer);
logger.finest("Checking consumerTag=" + consumerTag + " and pauseConsumer=" + pauseConsumer);
try {
if (!isConsuming && !pauseConsumer) {
if (consumerTag == null && !pauseConsumer) {
// start up again
try {
Consumer consumer = new UrlConsumer(channel());
channel().exchangeDeclare(getExchange(), "direct", true);
channel().queueDeclare(getQueueName(), durable,
false, autoDelete, null);
channel().queueBind(getQueueName(), getExchange(), getQueueName());
consumerTag = channel().basicConsume(getQueueName(), false, consumer);
isConsuming = true;
logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag);
startConsumer();
} catch (IOException e) {
logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again after 10 seconds)", e);
}
}

if (isConsuming && pauseConsumer) {
if (consumerTag != null && pauseConsumer) {
try {
if (consumerTag != null) {
logger.info("Attempting to cancel URLConsumer with consumerTag=" + consumerTag);
channel().basicCancel(consumerTag);
consumerTag = null;
isConsuming = false;
logger.info("Cancelled URLConsumer.");
}
} catch (IOException e) {
logger.log(Level.SEVERE, "problem cancelling AMQP consumer (will try again after 10 seconds)", e);
}
}

Thread.sleep(10 * 1000);
} finally {
lock.unlock();
}
} catch (InterruptedException e) {

Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
return;
}
}
}

/**
 * Declares the exchange and queue, binds them together, and registers a
 * new {@code UrlConsumer} on the queue.
 *
 * <p>The tag returned by {@code basicConsume} is stored in
 * {@code consumerTag}; a non-null tag is what the restarter loop treats as
 * "currently consuming". This method is invoked both from that loop and
 * directly from {@code start()}, so that the queue is bound before
 * crawling begins.
 *
 * @throws IOException if any of the channel operations fails; in that case
 *         {@code consumerTag} is not assigned by this method
 */
public void startConsumer() throws IOException {
// NOTE(review): channel() is called anew for every operation below; whether it
// always returns the same channel instance is not visible here -- confirm.
Consumer consumer = new UrlConsumer(channel());
// "direct" exchange; the trailing 'true' is presumably the client's
// durable flag -- verify against the Channel API.
channel().exchangeDeclare(getExchange(), "direct", true);
// Positional arguments are presumably (queue, durable, exclusive, autoDelete,
// arguments) -- i.e. non-exclusive with no extra arguments; TODO confirm.
channel().queueDeclare(getQueueName(), durable,
false, autoDelete, null);
// The queue name is passed as both the queue and the third (routing key) argument.
channel().queueBind(getQueueName(), getExchange(), getQueueName());
// 'false' is presumably autoAck=false: handleDelivery acknowledges messages
// explicitly via basicAck.
consumerTag = channel().basicConsume(getQueueName(), false, consumer);
logger.info("started AMQP consumer uri=" + getAmqpUri() + " exchange=" + getExchange() + " queueName=" + getQueueName() + " consumerTag=" + consumerTag);
}
}

transient private StarterRestarter starterRestarter;
Expand All @@ -208,6 +208,13 @@ public void start() {
// spawn off a thread to start up the amqp consumer, and try to restart it if it dies
if (!isRunning) {
starterRestarter = new StarterRestarter(AMQPUrlReceiver.class.getSimpleName() + "-starter-restarter");
try {
// try to synchronously start the consumer right now, so
// that the queue is bound before crawling starts
starterRestarter.startConsumer();
} catch (IOException e) {
logger.log(Level.SEVERE, "problem starting AMQP consumer (will try again soon)", e);
}
starterRestarter.start();
}
isRunning = true;
Expand All @@ -221,13 +228,6 @@ public void stop() {
lock.lock();
try {
logger.info("shutting down");
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
logger.log(Level.SEVERE, "problem closing AMQP connection", e);
}
}
if (starterRestarter != null && starterRestarter.isAlive()) {
starterRestarter.interrupt();
try {
Expand All @@ -236,6 +236,14 @@ public void stop() {
}
}
starterRestarter = null;

if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
logger.log(Level.SEVERE, "problem closing AMQP connection", e);
}
}
connection = null;
channel = null;
isRunning = false;
Expand Down Expand Up @@ -310,7 +318,7 @@ public void handleDelivery(String consumerTag, Envelope envelope,
throw new RuntimeException(e); // can't happen
}
JSONObject jo = new JSONObject(decodedBody);

if ("GET".equals(jo.getString("method"))) {
CrawlURI curi;
try {
Expand Down Expand Up @@ -345,7 +353,7 @@ public void handleDelivery(String consumerTag, Envelope envelope,

this.getChannel().basicAck(envelope.getDeliveryTag(), false);
}

@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
Expand All @@ -354,7 +362,7 @@ public void handleShutdownSignal(String consumerTag,
} else {
logger.info("amqp channel/connection shut down consumerTag=" + consumerTag);
}
isConsuming = false;
AMQPUrlReceiver.this.consumerTag = null;
}

// {
Expand All @@ -379,7 +387,7 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
String hopPath = parentHopPath + Hop.INFERRED.getHopString();

CrawlURI curi = new CrawlURI(uuri, hopPath, via, LinkContext.INFERRED_MISC);

// set the heritable data from the parent url, passed back to us via amqp
// XXX brittle, only goes one level deep, and only handles strings and arrays, the latter of which it converts to a Set.
// 'heritableData': {'source': 'https://facebook.com/whitehouse/', 'heritable': ['source', 'heritable']}
Expand Down Expand Up @@ -412,7 +420,7 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
*/
curi.setSchedulingDirective(SchedulingConstants.HIGH);
curi.setPrecedence(1);

// optional forceFetch instruction:
if (jo.has("forceFetch")) {
boolean forceFetch = jo.getBoolean("forceFetch");
Expand Down Expand Up @@ -446,15 +454,16 @@ protected CrawlURI makeCrawlUri(JSONObject jo) throws URIException,
public void onApplicationEvent(CrawlStateEvent event) {
switch(event.getState()) {
case PAUSING: case PAUSED:
logger.info("Requesting a pause of the URLConsumer...");
this.pauseConsumer = true;
if (!this.pauseConsumer) {
logger.info("Requesting a pause of the URLConsumer...");
this.pauseConsumer = true;
}
break;

case RUNNING:
logger.info("Requesting restart of the URLConsumer...");
this.pauseConsumer = false;
if (starterRestarter == null || !starterRestarter.isAlive()) {
start();
if (this.pauseConsumer) {
logger.info("Requesting unpause of the URLConsumer...");
this.pauseConsumer = false;
}
break;

Expand Down

0 comments on commit b828969

Please sign in to comment.