From b032776309862fc3337f669eb7879e84cb3d3e98 Mon Sep 17 00:00:00 2001
From: Andrew Byrd
Date: Thu, 4 Jun 2015 01:08:36 +0200
Subject: [PATCH] fork-join pool handling of incoming queue messages

---
 .../analyst/cluster/AnalystWorker.java | 84 ++++++++++---------
 1 file changed, 44 insertions(+), 40 deletions(-)

diff --git a/src/main/java/org/opentripplanner/analyst/cluster/AnalystWorker.java b/src/main/java/org/opentripplanner/analyst/cluster/AnalystWorker.java
index 1562ebdd105..4424bb94838 100644
--- a/src/main/java/org/opentripplanner/analyst/cluster/AnalystWorker.java
+++ b/src/main/java/org/opentripplanner/analyst/cluster/AnalystWorker.java
@@ -11,6 +11,7 @@
 import com.beust.jcommander.internal.Maps;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.opentripplanner.analyst.TimeSurface;
 import org.opentripplanner.profile.ProfileRequest;
@@ -36,30 +37,34 @@ public class AnalystWorker implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(AnalystWorker.class);
 
+    public static final Random random = new Random();
+    public static final int nCores = Runtime.getRuntime().availableProcessors();
+    public static final String QUEUE_PREFIX = "analyst_t_job_";
+
     ObjectMapper objectMapper;
     String lastQueueUrl = null;
 
-    // Of course this will need to be shared between multiple AnalystWorker threads eventually.
+    // Of course this will eventually need to be shared between multiple AnalystWorker threads.
     ClusterGraphBuilder clusterGraphBuilder;
+
+    // Of course this will eventually need to be shared between multiple AnalystWorker threads.
+    PointSetDatastore pointSetDatastore;
+
     AmazonSQS sqs;
-    Random random = new Random();
-    String graphId = "";
+    String graphId = null;
     long startupTime;
-    int nThreads = 2;
 
     // Region awsRegion = Region.getRegion(Regions.EU_CENTRAL_1);
     Region awsRegion = Region.getRegion(Regions.US_EAST_1);
 
-    public static final String QUEUE_PREFIX = "analyst-dev_";
-
     boolean isSinglePoint = false;
     String pointsetBucket = "analyst-dev_pointsets";
 
     public AnalystWorker() {
-        startupTime = System.currentTimeMillis() / 1000;
+        startupTime = System.currentTimeMillis() / 1000; // TODO auto-shutdown
         // When creating the S3 and SQS clients use the default credentials chain.
         // This will check environment variables and ~/.aws/credentials first, then fall back on
         // the auto-assigned IAM role if this code is running on an EC2 instance.
@@ -69,7 +74,10 @@ public AnalystWorker() {
         objectMapper = new ObjectMapper();
         objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
         objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // ignore JSON fields that don't match target type
+
         clusterGraphBuilder = new ClusterGraphBuilder();
+        pointSetDatastore = new PointSetDatastore(10, null, false, pointsetBucket);
     }
 
     @Override
@@ -79,18 +87,19 @@ public void run() {
         // S3Object object = s3.getObject(bucketName, objectKey);
         // Loop forever, attempting to fetch some messages from a queue and process them.
-        List<Message> messages = new ArrayList<>();
         while (true) {
+            List<Message> messages = new ArrayList<>();
             try {
-                sqs.listQueues().getQueueUrls().stream().forEach(q -> LOG.info(q));
+                // sqs.listQueues().getQueueUrls().stream().forEach(q -> LOG.info(q));
                 // Attempt to get messages from the last queue URL from which we successfully received messages.
                 if (lastQueueUrl != null) {
                     int retries = 0;
                     while (messages.isEmpty() && retries++ < 2) {
+                        LOG.info("Polling for more messages on the same queue {}", lastQueueUrl);
                         // Long-poll (wait a few seconds for messages to become available)
                         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(lastQueueUrl)
-                                .withVisibilityTimeout(120).withMaxNumberOfMessages(5).withWaitTimeSeconds(5);
+                                .withVisibilityTimeout(600).withMaxNumberOfMessages(nCores).withWaitTimeSeconds(5);
                         messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
                     }
                 }
@@ -101,8 +110,11 @@ public void run() {
                     lastQueueUrl = null;
                     // For the first two retries, discover queues for the same graph. After that, all graphs.
                     String queuePrefix = QUEUE_PREFIX;
-                    if (retries++ < 2) {
+                    if (retries++ < 2 && graphId != null) {
+                        LOG.info("Polling for messages on different queues for the same graph {}", graphId);
                         queuePrefix += graphId;
+                    } else {
+                        LOG.info("Polling for messages on all queues across all graphs.");
                     }
                     List<String> queueUrls = sqs.listQueues(queuePrefix).getQueueUrls();
                     Collections.shuffle(queueUrls);
@@ -111,42 +123,35 @@ public void run() {
                         if (Stream.of("_output_", "_results_", "_single").anyMatch(s -> queueUrl.contains(s))) {
                             continue;
                         }
-                        LOG.info(" {}", queueUrl);
+                        LOG.debug(" {}", queueUrl);
                         // Non-blocking poll would return fast if no messages are available.
                         // However we use long polling just because it polls all servers and gives less false-empties.
                         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
-                                .withVisibilityTimeout(120).withMaxNumberOfMessages(5).withWaitTimeSeconds(2);
+                                .withVisibilityTimeout(600).withMaxNumberOfMessages(nCores).withWaitTimeSeconds(1);
                         messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
-                        LOG.info("Received {} messages.", messages.size());
                         if (!messages.isEmpty()) {
                             lastQueueUrl = queueUrl;
                             break DISCOVER;
                         }
                     }
-                    int sleepSeconds = random.nextInt(10) + 5; // keep workers from all acting at the same time
+                    int sleepSeconds = 5;
                     LOG.info("No work found. Sleeping {} seconds.", sleepSeconds);
                     Thread.sleep(sleepSeconds * 1000); // wait a while before re-scanning the queues
                 }
                 // At this point we are sure to have some messages to process.
-                System.out.printf("Received %d messages. ", messages.size());
-                for (Message message : messages) {
-                    LOG.info(" Message");
-                    LOG.info("   MessageId:     " + message.getMessageId());
-                    LOG.info("   ReceiptHandle: " + message.getReceiptHandle());
-                    LOG.info("   MD5OfBody:     " + message.getMD5OfBody());
-                    LOG.info("   Body:          " + message.getBody());
-                    for (Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
-                        LOG.info(" Attribute");
-                        LOG.info("   Name:  " + entry.getKey());
-                        LOG.info("   Value: " + entry.getValue());
-                    }
-                    if (handleOneMessage(message)) {
-                        // Message was successfully handled, delete it from SQS to avoid re-delivery.
-                        sqs.deleteMessage(lastQueueUrl, message.getReceiptHandle());
-                    }
-                }
-
+                LOG.info("Received {} messages. ", messages.size());
+
+                // All tasks are known to require the same graph object (they came from the same queue).
+                // Execute all tasks in the default forkJoinPool with as many threads as we have processor cores.
+                // This will block until all tasks have completed.
+                messages.parallelStream().forEach(this::handleOneMessage);
+
+                // Remove messages from queue so they won't be re-delivered to other workers.
+                messages.stream().forEach(m -> {
+                    LOG.info("Removing message: {}", m.getBody());
+                    sqs.deleteMessage(lastQueueUrl, m.getReceiptHandle());
+                });
             } catch (AmazonServiceException ase) {
                 LOG.error("Your request made it to Amazon S3, but was rejected with an error response.");
                 LOG.error(" Error Message: " + ase.getMessage());
@@ -170,9 +175,9 @@ public void run() {
      * To avoid circular dependencies, for now we will read the fields of these incoming objects from a tree
      * rather than binding to classes.
      */
-    private boolean handleOneMessage(Message message) {
+    private void handleOneMessage(Message message) {
         try {
-
+            LOG.info("Handling message {}", message.getBody());
             // Parse / bind the cluster request into the shared superclass so we can decide which request subclass
             // we actually want to bind it to.
             // TODO there has to be a better way to do this. Maybe by having separate profileRequest and request fields
@@ -181,6 +186,7 @@ private boolean handleOneMessage(Message message) {
 
             // Get the graph object for the ID given in the request, fetching inputs and building as needed.
             Graph graph = clusterGraphBuilder.getGraph(clusterRequest.graphId);
+            graphId = clusterRequest.graphId; // Record graphId so we "stick" to this same graph on subsequent polls
 
             // Convert the field "options" to a request object
             if (clusterRequest.profile) {
@@ -189,12 +195,14 @@ private boolean handleOneMessage(Message message) {
                 TimeSurface.RangeSet result = router.timeSurfaceRangeSet;
                 Map idForSurface = Maps.newHashMap();
                 // TODO check graph and job ID against queue URL for coherency
+                // Message was successfully handled, delete it from SQS to avoid re-delivery to another worker.
+                sqs.deleteMessage(lastQueueUrl, message.getReceiptHandle());
             } else {
                 RoutingRequest routingRequest = objectMapper.readValue(message.getBody(), RoutingRequest.class);
                 // TODO finish me
             }
 
-            if (clusterRequest.destinationPointsetId.isEmpty()) {
+            if (clusterRequest.destinationPointsetId == null) {
                 // No pointset specified, produce isochrones.
             } else {
                 // A pointset was specified, calculate travel times to the points in the pointset.
@@ -205,15 +213,11 @@ private boolean handleOneMessage(Message message) {
             LOG.error("JSON processing exception while parsing incoming message: {}", e);
             LOG.error("Deleting this message as it will likely confuse all workers that attempt to read it.");
             e.printStackTrace();
-            sqs.deleteMessage(lastQueueUrl, message.getReceiptHandle());
-            return false;
         } catch (IOException e) {
             LOG.error("IO exception while parsing incoming message: {}", e);
             LOG.error("Leaving this message alive for later consumption by another worker.");
             e.printStackTrace();
-            return false;
         }
-        return true;
     }
 
 }
\ No newline at end of file
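
The heart of this change is the switch from a sequential for-loop to
messages.parallelStream().forEach(this::handleOneMessage), which fans the
batch out over the JVM's common fork-join pool (sized by default to
Runtime.getRuntime().availableProcessors()) and does not return until every
handler has finished, so the batch delete that follows can never run early.
The sketch below is a minimal, self-contained illustration of that blocking
fan-out behavior under those assumptions; the class name
ForkJoinDispatchSketch and the simulated handler are hypothetical stand-ins,
not part of the OpenTripPlanner code.

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForkJoinDispatchSketch {

    // Stand-in for AnalystWorker.handleOneMessage(Message); sleeps to simulate routing work.
    static void handleOneMessage(String messageBody) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.printf("handled %s on %s%n", messageBody, Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        // A fake batch standing in for the messages received from one SQS queue.
        List<String> messages = IntStream.range(0, 8)
                .mapToObj(i -> "message-" + i)
                .collect(Collectors.toList());

        // Fan the batch out across the common fork-join pool.
        // This call blocks until every element has been processed.
        messages.parallelStream().forEach(ForkJoinDispatchSketch::handleOneMessage);

        // Only reached after all handlers have returned: the point at which the
        // real worker deletes the batch from SQS so it is not re-delivered.
        System.out.println("all " + messages.size() + " messages handled; safe to delete batch");
    }
}

This pattern also explains the tuning elsewhere in the patch: MaxNumberOfMessages
is raised to nCores so one received batch can saturate the pool, and the
visibility timeout is raised to 600 seconds so messages stay invisible to other
workers while the blocking batch runs to completion.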