Skip to content

Commit

Permalink
on priority tasks, pump DELETE body back to waiting originator connection
Browse files Browse the repository at this point in the history

also dropped explicit user level in URL hierarchy
actually there's no need for a URL hierarchy at all anymore because the broker can now bind the requests it's handling
  • Loading branch information
abyrd committed Jun 19, 2015
1 parent 71d8821 commit 6ab203b
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 182 deletions.
92 changes: 39 additions & 53 deletions src/main/java/org/opentripplanner/analyst/broker/Broker.java
Expand Up @@ -15,7 +15,6 @@
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque; import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -60,7 +59,7 @@ public class Broker implements Runnable {


private static final Logger LOG = LoggerFactory.getLogger(Broker.class); private static final Logger LOG = LoggerFactory.getLogger(Broker.class);


private CircularList<User> users = new CircularList<>(); public final CircularList<Job> jobs = new CircularList<>();


private int nUndeliveredTasks = 0; private int nUndeliveredTasks = 0;


Expand All @@ -83,31 +82,31 @@ public class Broker implements Runnable {
private TIntObjectMap<Response> priorityResponses = new TIntObjectHashMap<>(); private TIntObjectMap<Response> priorityResponses = new TIntObjectHashMap<>();


/** Outstanding requests from workers for tasks, grouped by worker graph affinity. */ /** Outstanding requests from workers for tasks, grouped by worker graph affinity. */
Map<String, Deque<Response>> connectionsForGraph = new HashMap<>(); Map<String, Deque<Response>> consumersByGraph = new HashMap<>();


// Queue of tasks to complete Delete, Enqueue etc. to avoid synchronizing all the functions ? // Queue of tasks to complete Delete, Enqueue etc. to avoid synchronizing all the functions ?


/** /**
* Enqueue a task for execution ASAP, planning to return the response over the same HTTP connection. * Enqueue a task for execution ASAP, planning to return the response over the same HTTP connection.
* Low-reliability, no re-delivery. * Low-reliability, no re-delivery.
*/ */
public synchronized void enqueuePriorityTask (QueuePath queuePath, AnalystClusterRequest task, Response response) { public synchronized void enqueuePriorityTask (AnalystClusterRequest task, Response response) {
task.taskId = nextTaskId++; task.taskId = nextTaskId++;
priorityTasks.add(task); priorityTasks.add(task);
priorityResponses.put(task.taskId, response); priorityResponses.put(task.taskId, response);
} }


/** Enqueue some tasks for asynchronous execution possibly much later. Results will be saved to S3. */ /** Enqueue some tasks for queued execution possibly much later. Results will be saved to S3. */
public synchronized void enqueueTasks (QueuePath queuePath, Collection<AnalystClusterRequest> tasks) { public synchronized void enqueueTasks (List<AnalystClusterRequest> tasks) {
LOG.debug("Queue {}", queuePath); Job job = findJob(tasks.get(0)); // creates one if it doesn't exist
// Assumes tasks are pre-validated and are all on the same user/job
User user = findUser(queuePath.userId, true);
Job job = user.findJob(queuePath.jobId, true);
for (AnalystClusterRequest task : tasks) { for (AnalystClusterRequest task : tasks) {
task.taskId = nextTaskId++; task.taskId = nextTaskId++;
job.addTask(task); job.addTask(task);
nUndeliveredTasks += 1; nUndeliveredTasks += 1;
LOG.debug("Enqueued task id {} in job {}", task.taskId, job.jobId); LOG.debug("Enqueued task id {} in job {}", task.taskId, job.jobId);
if (task.graphId != job.graphId) {
LOG.warn("Task graph ID {} does not match job graph ID {}.", task.graphId, job.graphId);
}
} }
// Wake up the delivery thread if it's waiting on input. // Wake up the delivery thread if it's waiting on input.
// This wakes whatever thread called wait() while holding the monitor for this Broker object. // This wakes whatever thread called wait() while holding the monitor for this Broker object.
Expand All @@ -117,10 +116,10 @@ public synchronized void enqueueTasks (QueuePath queuePath, Collection<AnalystCl
/** Long poll operations are enqueued here. */ /** Long poll operations are enqueued here. */
public synchronized void registerSuspendedResponse(String graphId, Response response) { public synchronized void registerSuspendedResponse(String graphId, Response response) {
// The workers are not allowed to request a specific job or task, just a specific graph and queue type. // The workers are not allowed to request a specific job or task, just a specific graph and queue type.
Deque<Response> deque = connectionsForGraph.get(graphId); Deque<Response> deque = consumersByGraph.get(graphId);
if (deque == null) { if (deque == null) {
deque = new ArrayDeque<>(); deque = new ArrayDeque<>();
connectionsForGraph.put(graphId, deque); consumersByGraph.put(graphId, deque);
} }
deque.addLast(response); deque.addLast(response);
nWaitingConsumers += 1; nWaitingConsumers += 1;
Expand All @@ -131,7 +130,7 @@ public synchronized void registerSuspendedResponse(String graphId, Response resp


/** When we notice that a long poll connection has closed, we remove it here. */ /** When we notice that a long poll connection has closed, we remove it here. */
public synchronized boolean removeSuspendedResponse(String graphId, Response response) { public synchronized boolean removeSuspendedResponse(String graphId, Response response) {
Deque<Response> deque = connectionsForGraph.get(graphId); Deque<Response> deque = consumersByGraph.get(graphId);
if (deque == null) { if (deque == null) {
return false; return false;
} }
Expand All @@ -145,7 +144,7 @@ public synchronized boolean removeSuspendedResponse(String graphId, Response res
} }


private void logQueueStatus() { private void logQueueStatus() {
LOG.info("Status {} undelivered, {} consumers waiting.", nUndeliveredTasks, nWaitingConsumers); LOG.info("{} priority, {} undelivered, {} consumers waiting.", priorityTasks.size(), nUndeliveredTasks, nWaitingConsumers);
} }


/** /**
Expand All @@ -164,14 +163,7 @@ public synchronized void deliverTasksForOneJob () throws InterruptedException {
logQueueStatus(); logQueueStatus();


// Circular lists retain iteration state via their head pointers. // Circular lists retain iteration state via their head pointers.
Job job = null; Job job = jobs.advanceToElement(e -> e.visibleTasks.size() > 0);
while (job == null) {
User user = users.advance();
if (user == null) {
LOG.error("There should always be at least one user here, because there is an undelivered task.");
}
job = user.jobs.advanceToElement(e -> e.visibleTasks.size() > 0);
}


// We have found job with some undelivered tasks. Give them to a consumer, // We have found job with some undelivered tasks. Give them to a consumer,
// waiting until one is available even if this means ignoring graph affinity. // waiting until one is available even if this means ignoring graph affinity.
Expand All @@ -185,10 +177,10 @@ public synchronized void deliverTasksForOneJob () throws InterruptedException {
LOG.debug("Task delivery thread is awake, and some consumers are waiting."); LOG.debug("Task delivery thread is awake, and some consumers are waiting.");
logQueueStatus(); logQueueStatus();


// Here, we know there are some consumer connections waiting, but we don't know if they're still open. // Here, we know there are some consumer connections waiting, but we're not sure they're still open.
// First try to get a consumer with affinity for this graph // First try to get a consumer with affinity for this graph
LOG.debug("Looking for an eligible consumer, respecting graph affinity."); LOG.debug("Looking for an eligible consumer, respecting graph affinity.");
Deque<Response> deque = connectionsForGraph.get(job.graphId); Deque<Response> deque = consumersByGraph.get(job.graphId);
while (deque != null && !deque.isEmpty()) { while (deque != null && !deque.isEmpty()) {
Response response = deque.pop(); Response response = deque.pop();
nWaitingConsumers -= 1; nWaitingConsumers -= 1;
Expand All @@ -199,7 +191,7 @@ public synchronized void deliverTasksForOneJob () throws InterruptedException {


// Then try to get a consumer from the graph with the most workers // Then try to get a consumer from the graph with the most workers
LOG.debug("No consumers with the right affinity. Looking for any consumer."); LOG.debug("No consumers with the right affinity. Looking for any consumer.");
List<Deque<Response>> deques = new ArrayList<>(connectionsForGraph.values()); List<Deque<Response>> deques = new ArrayList<>(consumersByGraph.values());
deques.sort((d1, d2) -> Integer.compare(d2.size(), d1.size())); deques.sort((d1, d2) -> Integer.compare(d2.size(), d1.size()));
for (Deque<Response> d : deques) { for (Deque<Response> d : deques) {
while (!d.isEmpty()) { while (!d.isEmpty()) {
Expand Down Expand Up @@ -265,29 +257,27 @@ public synchronized boolean deliver (Job job, Response response) {
} }


/** /**
* Take a task out of the job, marking it as completed. The body of this DELETE request... * Take a normal (non-priority) task out of a job queue, marking it as completed so it will not be re-delivered.
* @return whether the task was found and removed. * @return whether the task was found and removed.
*/ */
public synchronized boolean deleteTask (QueuePath queuePath) { public synchronized boolean deleteJobTask (int taskId) {

User user = findUser(queuePath.userId, false);
if (user == null) {
return false;
}

Job job = user.findJob(queuePath.jobId, false);
if (job == null) {
return false;
}

// There could be thousands of invisible (delivered) tasks, so we use a hash map. // There could be thousands of invisible (delivered) tasks, so we use a hash map.
// We only allow removal of invisible tasks for now. // We only allow removal of delivered, invisible tasks for now (not undelivered tasks).
// Return whether removal call discovered an existing task. // Return whether removal call discovered an existing task.
return job.invisibleTasks.remove(queuePath.taskId) != null; return deliveredTasks.remove(taskId) != null;
}


/**
* Marks the specified priority request as completed, and returns the suspended Response object for the connection
* that submitted the priority request, and is likely still waiting for a result over the same connection.
* The HttpHandler thread can then pump data from the DELETE body back to the origin of the request,
* without blocking the broker thread.
*/
public synchronized Response deletePriorityTask (int taskId) {
return priorityResponses.remove(taskId);
} }


// Todo: occasionally purge closed connections from connectionsForGraph // Todo: occasionally purge closed connections from consumersByGraph


@Override @Override
public void run() { public void run() {
Expand All @@ -301,20 +291,16 @@ public void run() {
} }
} }


/** Search through the users to find one with the given ID, without advancing the head of the circular list. */ public Job findJob (AnalystClusterRequest task) {
public User findUser (String userId, boolean create) { for (Job job : jobs) {
for (User user : users) { if (job.jobId.equals(task.jobId)) {
if (user.userId.equals(userId)) { return job;
return user;
} }
} }
if (create) { Job job = new Job(task.jobId);
User user = new User(userId); job.graphId = task.graphId;
users.insertAtTail(user); jobs.insertAtTail(job);
return user; return job;
}
return null;
} }



} }
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.ByteStreams;
import org.glassfish.grizzly.http.Method; import org.glassfish.grizzly.http.Method;
import org.glassfish.grizzly.http.server.HttpHandler; import org.glassfish.grizzly.http.server.HttpHandler;
import org.glassfish.grizzly.http.server.Request; import org.glassfish.grizzly.http.server.Request;
Expand Down Expand Up @@ -48,16 +49,17 @@ public BrokerHttpHandler(Broker broker) {
@Override @Override
public void service(Request request, Response response) throws Exception { public void service(Request request, Response response) throws Exception {


// request.getRequestURI(); // without protocol or server, only request path
// request.getPathInfo(); // without handler base path

response.setContentType("application/json"); response.setContentType("application/json");


// may be a partially specified QueuePath without job or task ID // request.getRequestURI(); // without protocol or server, only request path
QueuePath queuePath = new QueuePath(request.getPathInfo()); // request.getPathInfo(); // without handler base path
String[] pathComponents = request.getPathInfo().split("/");
// Path component 0 is empty since the path always starts with a slash.
if (pathComponents.length < 3) {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("path should have at least one part");
}


// Request body is expected to be JSON. Rather than loading it into a string, we could parse it to a tree
// or bind to a type immediately. However binding introduces a dependency on the message type classes.
try { try {
if (request.getMethod() == Method.HEAD) { if (request.getMethod() == Method.HEAD) {
/* Let the client know server is alive and URI + request are valid. */ /* Let the client know server is alive and URI + request are valid. */
Expand All @@ -66,33 +68,58 @@ public void service(Request request, Response response) throws Exception {
return; return;
} else if (request.getMethod() == Method.GET) { } else if (request.getMethod() == Method.GET) {
/* Return a chunk of tasks for a particular graph. */ /* Return a chunk of tasks for a particular graph. */
String graphAffinity = pathComponents[1];
request.getRequest().getConnection().addCloseListener((closeable, iCloseType) -> { request.getRequest().getConnection().addCloseListener((closeable, iCloseType) -> {
broker.removeSuspendedResponse(queuePath.graphId, response); broker.removeSuspendedResponse(graphAffinity, response);
}); });
response.suspend(); // This request should survive after the handler function exits. response.suspend(); // The request should survive after the handler function exits.
broker.registerSuspendedResponse(queuePath.graphId, response); broker.registerSuspendedResponse(graphAffinity, response);
} else if (request.getMethod() == Method.POST) { } else if (request.getMethod() == Method.POST) {
/* Enqueue new messages. */ /* Enqueue new messages. */
// Text round trip through JSON is done in the HTTP handler thread, does not block the broker thread. String context = pathComponents[1];
List<AnalystClusterRequest> tasks = if ("tasks".equals(context)) {
mapper.readValue(request.getInputStream(), new TypeReference<List<AnalystClusterRequest>>(){}); // Enqueue a single priority task
for (AnalystClusterRequest task : tasks) { AnalystClusterRequest task = mapper.readValue(request.getInputStream(), AnalystClusterRequest.class);
if (!task.graphId.equals(queuePath.graphId) response.suspend(); // The request should survive after the handler function exits.
|| !task.userId.equals(queuePath.userId) broker.enqueuePriorityTask(task, response);
|| !task.jobId.equals(queuePath.jobId)) { } else if ("jobs".equals(context)) {
response.setStatus(HttpStatus.BAD_REQUEST_400); // Enqueue a list of tasks that belong to jobs
response.setDetailMessage("Task graph/user/job ID does not match POST path."); List<AnalystClusterRequest> tasks = mapper.readValue(request.getInputStream(),
return; new TypeReference<List<AnalystClusterRequest>>(){});
// Pre-validate tasks checking that they are all on the same job
AnalystClusterRequest exemplar = tasks.get(0);
for (AnalystClusterRequest task : tasks) {
if (task.jobId != exemplar.jobId || task.graphId != exemplar.graphId) {
response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("All tasks must be for the same graph and job.");
}
} }
broker.enqueueTasks(tasks);
response.setStatus(HttpStatus.ACCEPTED_202);
} }
broker.enqueueTasks(queuePath, tasks);
response.setStatus(HttpStatus.ACCEPTED_202);
} else if (request.getMethod() == Method.DELETE) { } else if (request.getMethod() == Method.DELETE) {
/* Acknowledge completion of a task and remove it from queues. */ /* Acknowledge completion of a task and remove it from queues, avoiding re-delivery. */
if (broker.deleteTask(queuePath)) { if ("tasks".equalsIgnoreCase(pathComponents[1])) {
response.setStatus(HttpStatus.OK_200); int taskId = Integer.parseInt(pathComponents[2]);
// First check if this was a priority task, in which case the DELETE should contain a result.
Response suspendedRemoteResponse = broker.deletePriorityTask(taskId);
if (suspendedRemoteResponse != null) {
// Copy the body of this DELETE request back to the connection that was the source of the task.
ByteStreams.copy(request.getInputStream(), suspendedRemoteResponse.getOutputStream());
response.setStatus(HttpStatus.OK_200);
suspendedRemoteResponse.setStatus(HttpStatus.OK_200);
suspendedRemoteResponse.resume();
return;
}
// This must not have been a priority task. Try to delete it as a normal job task.
if (broker.deleteJobTask(taskId)) {
response.setStatus(HttpStatus.OK_200);
} else {
response.setStatus(HttpStatus.NOT_FOUND_404);
}
} else { } else {
response.setStatus(HttpStatus.NOT_FOUND_404); response.setStatus(HttpStatus.BAD_REQUEST_400);
response.setDetailMessage("Delete is only allowed for tasks.");
} }
} else { } else {
response.setStatus(HttpStatus.BAD_REQUEST_400); response.setStatus(HttpStatus.BAD_REQUEST_400);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opentripplanner/analyst/broker/Job.java
Expand Up @@ -18,11 +18,11 @@ public class Job {


private int nTasks = 0; private int nTasks = 0;


public final String jobId;

/* Defines cache affinity group for contained tasks. TODO set this when created. */ /* Defines cache affinity group for contained tasks. TODO set this when created. */
String graphId; String graphId;


public final String jobId;

/* Tasks awaiting delivery. */ /* Tasks awaiting delivery. */
Queue<AnalystClusterRequest> visibleTasks = new ArrayDeque<>(); Queue<AnalystClusterRequest> visibleTasks = new ArrayDeque<>();


Expand Down
70 changes: 0 additions & 70 deletions src/main/java/org/opentripplanner/analyst/broker/QueuePath.java

This file was deleted.

0 comments on commit 6ab203b

Please sign in to comment.