Skip to content

Commit

Permalink
fork-join pool handling of incoming queue messages
Browse files (browse the repository at this point in the history)
  • Loading branch information
abyrd committed Jun 3, 2015
1 parent 8d8f42a commit b032776
Showing 1 changed file with 44 additions and 40 deletions.
Expand Up @@ -11,6 +11,7 @@
import com.beust.jcommander.internal.Maps; import com.beust.jcommander.internal.Maps;
import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.opentripplanner.analyst.TimeSurface; import org.opentripplanner.analyst.TimeSurface;
import org.opentripplanner.profile.ProfileRequest; import org.opentripplanner.profile.ProfileRequest;
Expand All @@ -36,30 +37,34 @@
public class AnalystWorker implements Runnable { public class AnalystWorker implements Runnable {


private static final Logger LOG = LoggerFactory.getLogger(AnalystWorker.class); private static final Logger LOG = LoggerFactory.getLogger(AnalystWorker.class);
public static final Random random = new Random();
public static final int nCores = Runtime.getRuntime().availableProcessors();
public static final String QUEUE_PREFIX = "analyst_t_job_";



ObjectMapper objectMapper; ObjectMapper objectMapper;
String lastQueueUrl = null; String lastQueueUrl = null;


// Of course this will need to be shared between multiple AnalystWorker threads eventually. // Of course this will eventually need to be shared between multiple AnalystWorker threads.
ClusterGraphBuilder clusterGraphBuilder; ClusterGraphBuilder clusterGraphBuilder;

// Of course this will eventually need to be shared between multiple AnalystWorker threads.
PointSetDatastore pointSetDatastore;

AmazonSQS sqs; AmazonSQS sqs;


Random random = new Random(); String graphId = null;
String graphId = "";
long startupTime; long startupTime;
int nThreads = 2;


// Region awsRegion = Region.getRegion(Regions.EU_CENTRAL_1); // Region awsRegion = Region.getRegion(Regions.EU_CENTRAL_1);
Region awsRegion = Region.getRegion(Regions.US_EAST_1); Region awsRegion = Region.getRegion(Regions.US_EAST_1);


public static final String QUEUE_PREFIX = "analyst-dev_";

boolean isSinglePoint = false; boolean isSinglePoint = false;


String pointsetBucket = "analyst-dev_pointsets"; String pointsetBucket = "analyst-dev_pointsets";


public AnalystWorker() { public AnalystWorker() {
startupTime = System.currentTimeMillis() / 1000; startupTime = System.currentTimeMillis() / 1000; // TODO auto-shutdown
// When creating the S3 and SQS clients use the default credentials chain. // When creating the S3 and SQS clients use the default credentials chain.
// This will check environment variables and ~/.aws/credentials first, then fall back on // This will check environment variables and ~/.aws/credentials first, then fall back on
// the auto-assigned IAM role if this code is running on an EC2 instance. // the auto-assigned IAM role if this code is running on an EC2 instance.
Expand All @@ -69,7 +74,10 @@ public AnalystWorker() {
objectMapper = new ObjectMapper(); objectMapper = new ObjectMapper();
objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true); objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // ignore JSON fields that don't match target type

clusterGraphBuilder = new ClusterGraphBuilder(); clusterGraphBuilder = new ClusterGraphBuilder();
pointSetDatastore = new PointSetDatastore(10, null, false, pointsetBucket);
} }


@Override @Override
Expand All @@ -79,18 +87,19 @@ public void run() {
// S3Object object = s3.getObject(bucketName, objectKey); // S3Object object = s3.getObject(bucketName, objectKey);


// Loop forever, attempting to fetch some messages from a queue and process them. // Loop forever, attempting to fetch some messages from a queue and process them.
List<Message> messages = new ArrayList<>();
while (true) { while (true) {
List<Message> messages = new ArrayList<>();
try { try {
sqs.listQueues().getQueueUrls().stream().forEach(q -> LOG.info(q)); // sqs.listQueues().getQueueUrls().stream().forEach(q -> LOG.info(q));


// Attempt to get messages from the last queue URL from which we successfully received messages. // Attempt to get messages from the last queue URL from which we successfully received messages.
if (lastQueueUrl != null) { if (lastQueueUrl != null) {
int retries = 0; int retries = 0;
while (messages.isEmpty() && retries++ < 2) { while (messages.isEmpty() && retries++ < 2) {
LOG.info("Polling for more messages on the same queue {}", lastQueueUrl);
// Long-poll (wait a few seconds for messages to become available) // Long-poll (wait a few seconds for messages to become available)
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(lastQueueUrl) ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(lastQueueUrl)
.withVisibilityTimeout(120).withMaxNumberOfMessages(5).withWaitTimeSeconds(5); .withVisibilityTimeout(600).withMaxNumberOfMessages(nCores).withWaitTimeSeconds(5);
messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
} }
} }
Expand All @@ -101,8 +110,11 @@ public void run() {
lastQueueUrl = null; lastQueueUrl = null;
// For the first two retries, discover queues for the same graph. After that, all graphs. // For the first two retries, discover queues for the same graph. After that, all graphs.
String queuePrefix = QUEUE_PREFIX; String queuePrefix = QUEUE_PREFIX;
if (retries++ < 2) { if (retries++ < 2 && graphId != null) {
LOG.info("Polling for messages on different queues for the same graph {}", graphId);
queuePrefix += graphId; queuePrefix += graphId;
} else {
LOG.info("Polling for messages on all queues across all graphs.");
} }
List<String> queueUrls = sqs.listQueues(queuePrefix).getQueueUrls(); List<String> queueUrls = sqs.listQueues(queuePrefix).getQueueUrls();
Collections.shuffle(queueUrls); Collections.shuffle(queueUrls);
Expand All @@ -111,42 +123,35 @@ public void run() {
if (Stream.of("_output_", "_results_", "_single").anyMatch(s -> queueUrl.contains(s))) { if (Stream.of("_output_", "_results_", "_single").anyMatch(s -> queueUrl.contains(s))) {
continue; continue;
} }
LOG.info(" {}", queueUrl); LOG.debug(" {}", queueUrl);
// Non-blocking poll would return fast if no messages are available. // Non-blocking poll would return fast if no messages are available.
// However we use long polling just because it polls all servers and gives less false-empties. // However we use long polling just because it polls all servers and gives less false-empties.
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl) ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
.withVisibilityTimeout(120).withMaxNumberOfMessages(5).withWaitTimeSeconds(2); .withVisibilityTimeout(600).withMaxNumberOfMessages(nCores).withWaitTimeSeconds(1);
messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
LOG.info("Received {} messages.", messages.size());
if (!messages.isEmpty()) { if (!messages.isEmpty()) {
lastQueueUrl = queueUrl; lastQueueUrl = queueUrl;
break DISCOVER; break DISCOVER;
} }
} }
int sleepSeconds = random.nextInt(10) + 5; // keep workers from all acting at the same time int sleepSeconds = 5;
LOG.info("No work found. Sleeping {} seconds.", sleepSeconds); LOG.info("No work found. Sleeping {} seconds.", sleepSeconds);
Thread.sleep(sleepSeconds * 1000); // wait a while before re-scanning the queues Thread.sleep(sleepSeconds * 1000); // wait a while before re-scanning the queues
} }


// At this point we are sure to have some messages to process. // At this point we are sure to have some messages to process.
System.out.printf("Received %d messages. ", messages.size()); LOG.info("Received {} messages. ", messages.size());
for (Message message : messages) {
LOG.info(" Message"); // All tasks are known to require the same graph object (they came from the same queue).
LOG.info(" MessageId: " + message.getMessageId()); // Execute all tasks in the default forkJoinPool with as many threads as we have processor cores.
LOG.info(" ReceiptHandle: " + message.getReceiptHandle()); // This will block until all tasks have completed.
LOG.info(" MD5OfBody: " + message.getMD5OfBody()); messages.parallelStream().forEach(this::handleOneMessage);
LOG.info(" Body: " + message.getBody());
for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) { // Remove messages from queue so they won't be re-delivered to other workers.
LOG.info(" Attribute"); messages.stream().forEach(m -> {
LOG.info(" Name: " + entry.getKey()); LOG.info("Removing message: {}", m.getBody());
LOG.info(" Value: " + entry.getValue()); sqs.deleteMessage(lastQueueUrl, m.getReceiptHandle());
} });
if (handleOneMessage(message)) {
// Message was successfully handled, delete it from SQS to avoid re-delivery.
sqs.deleteMessage(lastQueueUrl, message.getReceiptHandle());
}
}

} catch (AmazonServiceException ase) { } catch (AmazonServiceException ase) {
LOG.error("Your request made it to Amazon S3, but was rejected with an error response."); LOG.error("Your request made it to Amazon S3, but was rejected with an error response.");
LOG.error(" Error Message: " + ase.getMessage()); LOG.error(" Error Message: " + ase.getMessage());
Expand All @@ -170,9 +175,9 @@ public void run() {
* To avoid circular dependencies, for now we will read the fields of these incoming objects from a tree * To avoid circular dependencies, for now we will read the fields of these incoming objects from a tree
* rather than binding to classes. * rather than binding to classes.
*/ */
private boolean handleOneMessage(Message message) { private void handleOneMessage(Message message) {
try { try {

LOG.info("Handling message {}", message.getBody());
// Parse / bind the cluster request into the shared superclass so we can decide which request subclass // Parse / bind the cluster request into the shared superclass so we can decide which request subclass
// we actually want to bind it to. // we actually want to bind it to.
// TODO there has to be a better way to do this. Maybe by having separate profileRequest and request fields // TODO there has to be a better way to do this. Maybe by having separate profileRequest and request fields
Expand All @@ -181,6 +186,7 @@ private boolean handleOneMessage(Message message) {


// Get the graph object for the ID given in the request, fetching inputs and building as needed. // Get the graph object for the ID given in the request, fetching inputs and building as needed.
Graph graph = clusterGraphBuilder.getGraph(clusterRequest.graphId); Graph graph = clusterGraphBuilder.getGraph(clusterRequest.graphId);
graphId = clusterRequest.graphId; // Record graphId so we "stick" to this same graph on subsequent polls


// Convert the field "options" to a request object // Convert the field "options" to a request object
if (clusterRequest.profile) { if (clusterRequest.profile) {
Expand All @@ -189,12 +195,14 @@ private boolean handleOneMessage(Message message) {
TimeSurface.RangeSet result = router.timeSurfaceRangeSet; TimeSurface.RangeSet result = router.timeSurfaceRangeSet;
Map<String, Integer> idForSurface = Maps.newHashMap(); Map<String, Integer> idForSurface = Maps.newHashMap();
// TODO check graph and job ID against queue URL for coherency // TODO check graph and job ID against queue URL for coherency
// Message was successfully handled, delete it from SQS to avoid re-delivery to another worker.
sqs.deleteMessage(lastQueueUrl, message.getReceiptHandle());
} else { } else {
RoutingRequest routingRequest = objectMapper.readValue(message.getBody(), RoutingRequest.class); RoutingRequest routingRequest = objectMapper.readValue(message.getBody(), RoutingRequest.class);
// TODO finish me // TODO finish me
} }


if (clusterRequest.destinationPointsetId.isEmpty()) { if (clusterRequest.destinationPointsetId == null) {
// No pointset specified, produce isochrones. // No pointset specified, produce isochrones.
} else { } else {
// A pointset was specified, calculate travel times to the points in the pointset. // A pointset was specified, calculate travel times to the points in the pointset.
Expand All @@ -205,15 +213,11 @@ private boolean handleOneMessage(Message message) {
LOG.error("JSON processing exception while parsing incoming message: {}", e); LOG.error("JSON processing exception while parsing incoming message: {}", e);
LOG.error("Deleting this message as it will likely confuse all workers that attempt to read it."); LOG.error("Deleting this message as it will likely confuse all workers that attempt to read it.");
e.printStackTrace(); e.printStackTrace();
sqs.deleteMessage(lastQueueUrl, message.getReceiptHandle());
return false;
} catch (IOException e) { } catch (IOException e) {
LOG.error("IO exception while parsing incoming message: {}", e); LOG.error("IO exception while parsing incoming message: {}", e);
LOG.error("Leaving this message alive for later consumption by another worker."); LOG.error("Leaving this message alive for later consumption by another worker.");
e.printStackTrace(); e.printStackTrace();
return false;
} }
return true;
} }


} }

0 comments on commit b032776

Please sign in to comment.