
use POST /priority/taskId to acknowledge completion of single-point work
abyrd committed Jun 22, 2015
1 parent ca47b38 commit 3156357
Showing 6 changed files with 173 additions and 109 deletions.
100 changes: 56 additions & 44 deletions src/main/java/org/opentripplanner/analyst/broker/Broker.java
@@ -22,33 +22,23 @@
import java.util.Queue;

/**
 * This class tracks incoming requests from workers to consume Analyst tasks, and attempts to match those
 * requests to enqueued tasks. It aims to draw tasks fairly from all users, and fairly from all jobs within each user,
 * while attempting to respect the graph affinity of each worker (give it tasks that require the same graph it has been
 * working on recently).
 *
 * When no work is available or no workers are available, the polling functions return immediately, avoiding spin-wait.
 * When they are receiving no work, workers are expected to disconnect and re-poll occasionally, on the order of 30
 * seconds. This serves as a signal to the broker that they are still alive and waiting.
 *
 * TODO if there is a backlog of work (the usual case when jobs are lined up) workers will constantly change graphs.
 * Because (at least currently) two users never share the same graph, we can get by with pulling tasks cyclically or
 * randomly from all the jobs, and just actively shaping the number of workers with affinity for each graph by forcing
 * some of them to accept tasks on graphs other than the one they have declared affinity for.
 *
 * This could be thought of as "affinity homeostasis". We will constantly keep track of the ideal proportion of workers
 * by graph (based on active queues) and the true proportion of consumers by graph (based on incoming requests); then
 * we can decide when a worker's graph affinity should be ignored and what it should be forced to.
 *
 * It may also be helpful to mark jobs every time they are skipped in the LRU queue. Each time a job is serviced,
 * it is taken out of the queue and put at its end. Jobs that have not been serviced float to the top.
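The "affinity homeostasis" idea in this comment can be made concrete with a small sketch of the bookkeeping it implies. Everything below is hypothetical illustration (AffinityBalancer, mostUnderServedGraph, and both maps are invented names); the commit itself only records the idea in comments and TODOs.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: compare the ideal share of workers per graph (derived
// from queue sizes) with the actual share (derived from waiting consumers),
// and pick the most under-served graph to force affinity-less workers onto.
public class AffinityBalancer {

    /** Queued task counts per graphId (would be derived from active jobs). */
    private final Map<String, Integer> queuedTasksByGraph = new HashMap<>();

    /** Waiting consumer counts per graphId (from incoming poll requests). */
    private final Map<String, Integer> waitingConsumersByGraph = new HashMap<>();

    /**
     * Return the graphId whose actual worker share falls furthest below its
     * ideal share, or null if every graph is adequately served. A worker whose
     * declared affinity is ignored would be redirected to this graph.
     */
    public String mostUnderServedGraph() {
        int totalTasks = queuedTasksByGraph.values().stream().mapToInt(Integer::intValue).sum();
        int totalConsumers = waitingConsumersByGraph.values().stream().mapToInt(Integer::intValue).sum();
        if (totalTasks == 0 || totalConsumers == 0) return null;
        String worstGraph = null;
        double worstDeficit = 0;
        for (Map.Entry<String, Integer> entry : queuedTasksByGraph.entrySet()) {
            double idealShare = (double) entry.getValue() / totalTasks;
            double actualShare = (double) waitingConsumersByGraph.getOrDefault(entry.getKey(), 0) / totalConsumers;
            double deficit = idealShare - actualShare;
            if (deficit > worstDeficit) {
                worstDeficit = deficit;
                worstGraph = entry.getKey();
            }
        }
        return worstGraph;
    }
}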
@@ -61,7 +51,7 @@ public class Broker implements Runnable {


    public final CircularList<Job> jobs = new CircularList<>();

    private int nUndeliveredTasks = 0; // Including normal priority jobs and high-priority tasks.

    private int nWaitingConsumers = 0; // including some that might be closed

@@ -76,10 +66,10 @@ public class Broker implements Runnable {
    TIntIntMap deliveryTimes = new TIntIntHashMap();

    /** Requests that are not part of a job and can "cut in line" in front of jobs for immediate execution. */
    private Queue<AnalystClusterRequest> highPriorityTasks = new ArrayDeque<>();

    /** Priority requests that have already been farmed out to workers, and are awaiting a response. */
    private TIntObjectMap<Response> highPriorityResponses = new TIntObjectHashMap<>();

    /** Outstanding requests from workers for tasks, grouped by worker graph affinity. */
    Map<String, Deque<Response>> consumersByGraph = new HashMap<>();

@@ -92,8 +82,10 @@ public class Broker implements Runnable {
     */
    public synchronized void enqueuePriorityTask (AnalystClusterRequest task, Response response) {
        task.taskId = nextTaskId++;
        highPriorityTasks.add(task);
        highPriorityResponses.put(task.taskId, response);
        nUndeliveredTasks += 1;
        notify();
    }
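The notify() call wakes the delivery thread, while the caller's Response stays suspended until a worker posts a result back. From the producer side (e.g. the Analyst UI) this implies a simple contract: POST one task and block reading the same connection. A minimal client sketch under that assumption — PriorityTaskClient, brokerUrl, and the use of HttpURLConnection are illustrative, not code from this commit, and it assumes the handler is mounted at the server root:

import com.google.common.io.ByteStreams;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical producer-side client: submit one high-priority task and block
// on the same connection until the broker pumps the worker's result back.
public class PriorityTaskClient {

    public static byte[] requestSinglePoint(String brokerUrl, byte[] taskJson) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(brokerUrl + "/priority").openConnection();
        conn.setDoOutput(true);
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(taskJson); // the serialized AnalystClusterRequest
        }
        // The broker suspends its response until a worker POSTs the result
        // back to /priority/{taskId}; reading therefore blocks until then.
        return ByteStreams.toByteArray(conn.getInputStream());
    }
}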


    /** Enqueue some tasks for queued execution possibly much later. Results will be saved to S3. */
@@ -113,7 +105,7 @@ public synchronized void enqueueTasks (List<AnalystClusterRequest> tasks) {
        notify();
    }


    /** Consumer long-poll operations are enqueued here. */
    public synchronized void registerSuspendedResponse(String graphId, Response response) {
        // The workers are not allowed to request a specific job or task, just a specific graph and queue type.
        Deque<Response> deque = consumersByGraph.get(graphId);
@@ -144,14 +136,15 @@ public synchronized boolean removeSuspendedResponse(String graphId, Response res
    }


    private void logQueueStatus() {
        LOG.info("{} undelivered, of which {} high-priority", nUndeliveredTasks, highPriorityTasks.size());
        LOG.info("{} producers waiting, {} consumers waiting", highPriorityResponses.size(), nWaitingConsumers);
    }


    /**
     * This method checks whether there are any high-priority tasks or normal job tasks and attempts to match them with
     * waiting workers. It blocks until there are tasks or workers available.
     */
    public synchronized void deliverTasks() throws InterruptedException {

        // Wait until there are some undelivered tasks.
        while (nUndeliveredTasks == 0) {
@@ -162,13 +155,27 @@ public synchronized void deliverTasksForOneJob () throws InterruptedException {
        LOG.debug("Task delivery thread is awake and there are some undelivered tasks.");
        logQueueStatus();

        // A reference to the job that will be drawn from in this iteration.
        Job job;

        // Service all high-priority tasks before handling any normal priority jobs.
        // These tasks are wrapped in a trivial Job so we can re-use delivery code in both high and low priority cases.
        if (highPriorityTasks.size() > 0) {
            AnalystClusterRequest task = highPriorityTasks.remove();
            job = new Job("HIGH PRIORITY");
            job.graphId = task.graphId;
            job.addTask(task);
        } else {
            // Circular lists retain iteration state via their head pointers.
            // We know we will find a task here because nUndeliveredTasks > 0.
            job = jobs.advanceToElement(e -> e.visibleTasks.size() > 0);
        }

        // We have found a job with some undelivered tasks. Give them to a consumer,
        // waiting until one is available, possibly defying graph affinity.
        LOG.debug("Task delivery thread has found undelivered tasks in job {}.", job.jobId);
        while (true) {

            while (nWaitingConsumers == 0) {
                LOG.debug("Task delivery thread is going to sleep, there are no consumers waiting.");
                // Thread will be notified when there are new incoming consumer connections.
@@ -203,19 +210,22 @@ public synchronized void deliverTasksForOneJob () throws InterruptedException {
                }
            }

            // No workers were available to accept the tasks.
            // Loop back, waiting for a consumer for the tasks in this job (thread should wait on the next iteration).
            LOG.debug("No consumer was available. They all must have closed their connections.");
            if (nWaitingConsumers != 0) {
                throw new AssertionError("There should be no waiting consumers here, something is wrong.");
            }

        }

    }
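The fair draw in deliverTasks() leans on CircularList.advanceToElement, which this diff does not show. One plausible sketch of the behavior the broker needs from it — rotate from the current head, return the first element matching the predicate, and leave each inspected element at the tail — assuming a simple deque-backed implementation (this is an illustration, not the actual OTP class):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Sketch of the behavior deliverTasks() relies on: rotate through the list
// and return the first element that matches the predicate. Head position is
// retained between calls, which is what makes repeated draws fair
// (least-recently-serviced jobs float to the front).
public class CircularListSketch<T> {

    private final Deque<T> elements = new ArrayDeque<>();

    public void insert(T element) {
        elements.addLast(element);
    }

    /** Return the next matching element, or null if no element matches. */
    public T advanceToElement(Predicate<T> predicate) {
        for (int i = 0; i < elements.size(); i++) {
            T head = elements.removeFirst();
            elements.addLast(head); // rotate: this element is now last in line
            if (predicate.test(head)) {
                return head;
            }
        }
        return null;
    }
}

Calling advanceToElement repeatedly with the same predicate then cycles fairly through all jobs that still have visible tasks, matching the LRU behavior described in the class comment.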


    /**
     * Attempt to hand some tasks from the given job to a waiting consumer connection.
     * The write will fail if the consumer has closed the connection but it hasn't been removed from the connection
     * queue yet. This can happen because the Broker methods are synchronized, and the removal action may be waiting
     * to get the monitor while we are trying to distribute tasks here.
     * @return whether the handoff succeeded.
     */
    public synchronized boolean deliver (Job job, Response response) {
@@ -269,21 +279,23 @@ public synchronized boolean deleteJobTask (int taskId) {


    /**
     * Marks the specified priority request as completed, and returns the suspended Response object for the connection
     * that submitted the priority request (the UI), which is probably still waiting to receive a result back over the
     * same connection. An HttpHandler thread can then pump data from the POST body back to the origin of the request,
     * without blocking the broker thread.
     * TODO rename to "deregisterSuspendedProducer" and "deregisterSuspendedConsumer" ?
     */
    public synchronized Response deletePriorityTask (int taskId) {
        return highPriorityResponses.remove(taskId);
    }


    // TODO: occasionally purge closed connections from consumersByGraph
    // TODO: worker catalog and graph affinity homeostasis


    @Override
    public void run() {
        while (true) {
            try {
                deliverTasks();
            } catch (InterruptedException e) {
                LOG.warn("Task pump thread was interrupted.");
                return;
@@ -57,7 +57,7 @@ public void service(Request request, Response response) throws Exception {
        // request.getPathInfo(); // without handler base path
        String[] pathComponents = request.getPathInfo().split("/");
        // Path component 0 is empty since the path always starts with a slash.
        if (pathComponents.length < 2) {
            response.setStatus(HttpStatus.BAD_REQUEST_400);
            response.setDetailMessage("path should have at least one part");
        }
@@ -79,11 +79,39 @@ public void service(Request request, Response response) throws Exception {
        } else if (request.getMethod() == Method.POST) {
            /* Enqueue new messages. */
            String context = pathComponents[1];
            if ("priority".equals(context)) {
                if (pathComponents.length == 2) {
                    // Enqueue a single priority task
                    AnalystClusterRequest task = mapper.readValue(request.getInputStream(), AnalystClusterRequest.class);
                    broker.enqueuePriorityTask(task, response);
                    // Enqueueing the priority task has set its internal taskId.
                    // TODO move all removal listener registration into the broker functions.
                    request.getRequest().getConnection().addCloseListener((closeable, iCloseType) -> {
                        broker.deletePriorityTask(task.taskId);
                    });
                    response.suspend(); // The request should survive after the handler function exits.
                } else {
                    // Mark a specific high-priority task as completed, and record its result.
                    // We were originally planning to do this with a DELETE request that has a body,
                    // but that is nonstandard enough to anger many libraries including Grizzly.
                    int taskId = Integer.parseInt(pathComponents[2]);
                    Response suspendedProducerResponse = broker.deletePriorityTask(taskId);
                    if (suspendedProducerResponse == null) {
                        response.setStatus(HttpStatus.NOT_FOUND_404);
                        return;
                    }
                    // Copy the result back to the connection that was the source of the task.
                    try {
                        ByteStreams.copy(request.getInputStream(), suspendedProducerResponse.getOutputStream());
                    } catch (IOException ioex) {
                        // Apparently the task producer did not wait to retrieve its result. Priority task result delivery
                        // is not guaranteed, we don't need to retry, this is not considered an error by the worker.
                    }
                    response.setStatus(HttpStatus.OK_200);
                    suspendedProducerResponse.setStatus(HttpStatus.OK_200);
                    suspendedProducerResponse.resume();
                    return;
                }
            } else if ("jobs".equals(context)) {
                // Enqueue a list of tasks that belong to jobs
                List<AnalystClusterRequest> tasks = mapper.readValue(request.getInputStream(),
@@ -103,16 +131,6 @@ public void service(Request request, Response response) throws Exception {
            /* Acknowledge completion of a task and remove it from queues, avoiding re-delivery. */
            if ("tasks".equalsIgnoreCase(pathComponents[1])) {
                int taskId = Integer.parseInt(pathComponents[2]);
                // This must not have been a priority task. Try to delete it as a normal job task.
                if (broker.deleteJobTask(taskId)) {
                    response.setStatus(HttpStatus.OK_200);
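Seen from the worker, the contract in the commit message is now a plain POST of the result body to /priority/{taskId}, replacing the old DELETE-with-body. A hypothetical worker-side sketch of that acknowledgement (the actual worker client is not part of this diff; class and parameter names are invented):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical worker-side acknowledgement: POST the computed result to
// /priority/{taskId}. The broker copies this body into the suspended
// producer connection, or answers 404 if that producer is gone.
public class PriorityTaskAck {

    public static boolean acknowledge(String brokerUrl, int taskId, byte[] resultBody) throws Exception {
        URL url = new URL(brokerUrl + "/priority/" + taskId);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setDoOutput(true);
        conn.setRequestMethod("POST");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(resultBody);
        }
        // 404 means no suspended producer was found for this taskId; per the
        // handler's comments, delivery is best-effort and needs no retry.
        return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
    }
}

A 200 confirms the result was pumped into the waiting producer connection; a 404 just means the producer gave up first, which the worker can safely ignore.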
2 changes: 1 addition & 1 deletion src/main/java/org/opentripplanner/analyst/broker/Job.java
@@ -20,7 +20,7 @@ public class Job {


    public final String jobId;

    /* Defines cache affinity group for contained tasks. */
    String graphId;

    /* Tasks awaiting delivery. */
@@ -36,6 +36,7 @@ public class AnalystClusterRequest implements Serializable {
    /**
     * To what queue should the notification of the result be delivered?
     */
    @Deprecated
    public String outputQueue;
