Skip to content

Commit

Permalink
finish re-sending tasks #2048
Browse files Browse the repository at this point in the history
this includes an integration test to verify re-delivery
  • Loading branch information
abyrd committed Jul 22, 2015
1 parent c66114e commit 5ce078a
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 21 deletions.
21 changes: 17 additions & 4 deletions src/main/java/org/opentripplanner/analyst/broker/Broker.java
Expand Up @@ -57,7 +57,7 @@ public class Broker implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(Broker.class);

private static final int REDELIVERY_INTERVAL_SEC = 30;
private static final int REDELIVERY_INTERVAL_SEC = 10;

public final CircularList<Job> jobs = new CircularList<>();

Expand Down Expand Up @@ -122,7 +122,7 @@ public class Broker implements Runnable {
private TObjectLongMap<String> recentlyRequestedWorkers = new TObjectLongHashMap<>();

// Queue of tasks to complete Delete, Enqueue etc. to avoid synchronizing all the functions ?

// TODO rename config to brokerConfig
public Broker (Properties config, String addr, int port) {
this.config = config;

Expand Down Expand Up @@ -297,12 +297,18 @@ private void logQueueStatus() {
* Check whether there are any delivered tasks that have reached their invisibility timeout but have not yet been
* marked complete. Enqueue those tasks for redelivery.
*/
private void redeliver () {
private void redeliver() {
if (System.currentTimeMillis() > nextRedeliveryCheckTime) {
nextRedeliveryCheckTime += REDELIVERY_INTERVAL_SEC * 1000;
LOG.info("Scanning for redelivery...");
int nRedelivered = 0;
int nInvisible = 0;
for (Job job : jobs) {
nUndeliveredTasks += job.redeliver();
nInvisible += job.invisibleUntil.size();
nRedelivered += job.redeliver();
}
LOG.info("{} tasks enqueued for redelivery out of {} invisible tasks.", nRedelivered, nInvisible);
nUndeliveredTasks += nRedelivered;
}
}

Expand Down Expand Up @@ -550,6 +556,13 @@ public synchronized boolean deleteJob (String jobId) {

private Multimap<String, String> activeJobsPerGraph = HashMultimap.create();

/**
 * Report whether the broker still holds unfinished work.
 * Holds this broker's monitor while scanning the job list.
 *
 * @return true if at least one job in {@code jobs} reports that it is not complete, false otherwise
 *         (including when there are no jobs at all).
 */
public synchronized boolean anyJobsActive() {
    boolean unfinishedJobFound = false;
    for (Job candidate : jobs) {
        if (!candidate.isComplete()) {
            // One incomplete job is enough to answer the question; stop scanning.
            unfinishedJobFound = true;
            break;
        }
    }
    return unfinishedJobFound;
}

/**
 * Record the given job as active for its graph by adding a (graphId, jobId) entry
 * to {@code activeJobsPerGraph}.
 *
 * @param job the job to register; its graphId and jobId fields are read.
 */
void activateJob (Job job) {
    final String graph = job.graphId;
    final String id = job.jobId;
    activeJobsPerGraph.put(graph, id);
}
Expand Down
29 changes: 21 additions & 8 deletions src/main/java/org/opentripplanner/analyst/broker/BrokerMain.java
Expand Up @@ -14,17 +14,21 @@

// benchmark: $ ab -n 2000 -k -c 100 http://localhost:9001/

public class BrokerMain {
// TODO Merge with Broker
public class BrokerMain implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(BrokerMain.class);

private static final int DEFAULT_PORT = 9001;

private static final String DEFAULT_BIND_ADDRESS = "0.0.0.0";

public static final Properties config = new Properties();
Properties config = new Properties();

public Broker broker;

public static void main(String[] args) {

File cfg;
if (args.length > 0)
cfg = new File(args[0]);
Expand All @@ -36,26 +40,37 @@ public static void main(String[] args) {
return;
}

Properties brokerConfig = new Properties();
try {
FileInputStream is = new FileInputStream(cfg);
config.load(is);
brokerConfig.load(is);
is.close();
} catch (IOException e) {
LOG.error("Error reading config file {}", e);
return;
}

// Create instance and run in the current thread.
new BrokerMain(brokerConfig).run();

}

/**
 * Create a broker launcher holding the given configuration. Nothing is started here;
 * the HTTP server and the Broker itself are created when run() is called.
 *
 * @param brokerConfig the properties stored as this instance's config. run() reads the optional
 *                     "port" and "bind-address" keys from it and passes it on to the Broker.
 */
public BrokerMain(Properties brokerConfig) {
this.config = brokerConfig;
}

public void run() {
int port = config.getProperty("port") != null ? Integer.parseInt(config.getProperty("port")) : DEFAULT_PORT;
String addr = config.getProperty("bind-address") != null ? config.getProperty("bind-address") : DEFAULT_BIND_ADDRESS;

LOG.info("Starting analyst broker on port {} of interface {}", port, addr);
HttpServer httpServer = new HttpServer();
NetworkListener networkListener = new NetworkListener("broker", addr, port);
networkListener.getTransport().setIOStrategy(SameThreadIOStrategy.getInstance()); // we avoid blocking IO, and this allows us to see closed connections.
// We avoid blocking IO, and the following line allows us to see closed connections.
networkListener.getTransport().setIOStrategy(SameThreadIOStrategy.getInstance());
httpServer.addListener(networkListener);
// Bypass Jersey etc. and add a low-level Grizzly handler.
// As in servlets, * is needed in base path to identify the "rest" of the path.
Broker broker = new Broker(config, addr, port);
broker = new Broker(config, addr, port);
httpServer.getServerConfiguration().addHttpHandler(new BrokerHttpHandler(broker), "/*");
try {
httpServer.start();
Expand All @@ -70,8 +85,6 @@ public static void main(String[] args) {
LOG.info("Interrupted, shutting down.");
}
httpServer.shutdown();

}


}
6 changes: 4 additions & 2 deletions src/main/java/org/opentripplanner/analyst/broker/Job.java
Expand Up @@ -23,6 +23,8 @@ public class Job {

private static final Logger LOG = LoggerFactory.getLogger(Job.class);

private static final int INVISIBLE_DURATION_SEC = 30; // How long until tasks are eligible for re-delivery.

/* A unique identifier for this job, usually a random UUID. */
public final String jobId;

Expand Down Expand Up @@ -58,7 +60,7 @@ public void addTask (AnalystClusterRequest task) {

public void markTasksDelivered(List<AnalystClusterRequest> tasks) {
long deliveryTime = System.currentTimeMillis();
long visibleAt = deliveryTime + 60 * 1000; // Invisible for one minute
long visibleAt = deliveryTime + INVISIBLE_DURATION_SEC * 1000;
for (AnalystClusterRequest task : tasks) {
invisibleUntil.put(task.taskId, visibleAt);
}
Expand All @@ -80,7 +82,7 @@ public int redeliver () {
if (now > timeout) {
invisibleIterator.remove();
visibleTasks.add(tasksById.get(taskId));
LOG.warn("Task {} was not completed in time, queueing it for re-delivery.", taskId);
LOG.warn("Task {} was of job {} was not completed in time, queueing it for re-delivery.", taskId, jobId);
nRedelivered += 1;
}
}
Expand Down
Expand Up @@ -53,6 +53,7 @@
*
*/
public class AnalystWorker implements Runnable {

/**
* worker ID - just a random ID so we can differentiate machines used for computation.
* Useful to isolate the logs from a particular machine, as well as to evaluate any
Expand All @@ -72,6 +73,13 @@ public class AnalystWorker implements Runnable {

public static final int POLL_TIMEOUT = 10 * 1000;

/**
* If this value is non-negative, the worker will not actually do any work. It will just report all tasks
* as completed immediately, but will fail to do so on the given percentage of tasks. This is used in testing task
* re-delivery and overall broker sanity.
*/
public int dryRunFailureRate = -1;

/** should this worker shut down automatically */
public final boolean autoShutdown;

Expand Down Expand Up @@ -226,6 +234,18 @@ public void run() {
}

private void handleOneRequest(AnalystClusterRequest clusterRequest) {
if (dryRunFailureRate >= 0) {
// This worker is running in test mode.
// It should report all work as completed without actually doing anything,
// but will fail a certain percentage of the time.
if (random.nextInt(100) >= dryRunFailureRate) {
// Pretend to succeed.
deleteRequest(clusterRequest);
} else {
LOG.info("Intentionally failing on task {}", clusterRequest.taskId);
}
return;
}
try {
long startTime = System.currentTimeMillis();
LOG.info("Handling message {}", clusterRequest.toString());
Expand Down Expand Up @@ -257,7 +277,6 @@ private void handleOneRequest(AnalystClusterRequest clusterRequest) {
ts.lon = clusterRequest.profileRequest.fromLon;
ts.lat = clusterRequest.profileRequest.fromLat;


RepeatedRaptorProfileRouter router;

boolean isochrone = clusterRequest.destinationPointsetId == null;
Expand Down
Expand Up @@ -26,15 +26,25 @@
public class JobSimulator {

public static String USERID = "userA";
public String s3prefix = "S3PREFIX";
public String pointSetId = "POINTSET";
public String graphId = "GRAPH";
public int nOrigins = 100;

DefaultHttpClient httpClient = new DefaultHttpClient();

public static void main(String[] args) {

DefaultHttpClient httpClient = new DefaultHttpClient();
JobSimulator js = new JobSimulator();
js.s3prefix = args[0];
js.pointSetId = args[1];
js.graphId = args[2];
js.nOrigins = Integer.parseInt(args[3]);
js.sendFakeJob();

}

String s3prefix = args[0];
String pointSetId = args[1];
String graphId = args[2];
int nOrigins = Integer.parseInt(args[3]);
public void sendFakeJob() {

String jobId = compactUUID();

Expand All @@ -61,7 +71,7 @@ public static void main(String[] args) {
// throw new RuntimeException(e);
// }

String url = String.format("http://localhost:9001/jobs");
String url = String.format("http://localhost:9001/enqueue/jobs");
HttpPost httpPost = new HttpPost(url);
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
Expand Down
@@ -0,0 +1,68 @@
package org.opentripplanner.analyst.broker;

import org.opentripplanner.analyst.cluster.AnalystWorker;
import org.opentripplanner.analyst.cluster.JobSimulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * This test is not an automatic unit test. It is an integration test that must be started manually, because it takes
 * a long time to run. It will start up a broker and some local workers, then submit a large job to the broker. The
 * workers will fail to complete tasks some percentage of the time, but eventually the whole job should be finished
 * because the broker will re-send tasks.
 *
 * The process exits with status 0 once the broker reports that no jobs remain active.
 */
public class RedeliveryTest {

    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryTest.class);
    static final int N_TASKS = 100;
    static final int N_WORKERS = 4;
    static final int FAILURE_RATE = 20; // percent

    /** How long to sleep between checks of whether the job has finished. */
    private static final long JOB_POLL_INTERVAL_MSEC = 2000;

    /** How long to sleep between checks of whether the broker instance exists yet. */
    private static final long BROKER_POLL_INTERVAL_MSEC = 100;

    /** Give up if the broker thread has not created its Broker within this time. */
    private static final long BROKER_STARTUP_TIMEOUT_MSEC = 30 * 1000;

    /**
     * Run the whole scenario: broker, dry-run workers, one fake job, then poll until the job is done.
     *
     * @param params ignored.
     */
    public static void main(String[] params) {

        // Start a broker in a new thread.
        Properties brokerConfig = new Properties();
        brokerConfig.setProperty("graphs-bucket", "FAKE");
        brokerConfig.setProperty("pointsets-bucket", "FAKE");
        brokerConfig.setProperty("work-offline", "true");
        BrokerMain brokerMain = new BrokerMain(brokerConfig);
        Thread brokerThread = new Thread(brokerMain); // TODO combine broker and brokermain, set offline mode.
        brokerThread.start();

        // Start some workers that only pretend to work and drop FAILURE_RATE percent of their tasks.
        Properties workerConfig = new Properties();
        workerConfig.setProperty("initial-graph-id", "GRAPH");
        List<Thread> workerThreads = new ArrayList<>();
        for (int i = 0; i < N_WORKERS; i++) {
            AnalystWorker worker = new AnalystWorker(workerConfig);
            worker.dryRunFailureRate = FAILURE_RATE;
            Thread workerThread = new Thread(worker);
            workerThreads.add(workerThread);
            workerThread.start();
        }

        // BrokerMain.broker is only assigned once the broker thread is inside run(). Without this wait the
        // job could be posted before the broker exists and the polling loop below could hit a null broker.
        awaitBroker(brokerMain);

        // Feed some work to the broker.
        JobSimulator jobSimulator = new JobSimulator();
        jobSimulator.nOrigins = N_TASKS;
        jobSimulator.graphId = "GRAPH";
        jobSimulator.sendFakeJob();

        // Wait for all tasks to be marked finished
        while (brokerMain.broker.anyJobsActive()) {
            LOG.info("Some jobs are still not complete.");
            pause(JOB_POLL_INTERVAL_MSEC);
        }

        LOG.info("All jobs finished.");
        // Exit explicitly: the broker and worker threads started above are still running.
        System.exit(0);
    }

    /**
     * Block until the broker thread has created its Broker instance.
     *
     * NOTE(review): BrokerMain assigns the field just before starting its HTTP server, so a non-null broker does
     * not strictly prove that the server is already accepting connections. The field is also not volatile, so
     * visibility to this thread relies on the sleeps between reads -- consider making it volatile in BrokerMain.
     *
     * @param brokerMain the launcher whose broker field is polled.
     * @throws IllegalStateException if no broker appears within BROKER_STARTUP_TIMEOUT_MSEC.
     */
    private static void awaitBroker(BrokerMain brokerMain) {
        long deadline = System.currentTimeMillis() + BROKER_STARTUP_TIMEOUT_MSEC;
        while (brokerMain.broker == null) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException(
                        "Broker was not created within " + BROKER_STARTUP_TIMEOUT_MSEC + " msec.");
            }
            pause(BROKER_POLL_INTERVAL_MSEC);
        }
    }

    /**
     * Sleep for the given time. If interrupted, restore the thread's interrupt flag before aborting the test.
     *
     * @param millis how long to sleep, in milliseconds.
     * @throws RuntimeException wrapping the InterruptedException if the sleep is interrupted.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

0 comments on commit 5ce078a

Please sign in to comment.