Skip to content

Commit

Permalink
run all graph updater setup methods sequentially in caller thread
Browse files Browse the repository at this point in the history
addresses #2545
  • Loading branch information
abyrd committed May 10, 2018
1 parent caa687e commit 544e1b1
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 152 deletions.
17 changes: 9 additions & 8 deletions src/main/java/org/opentripplanner/updater/GraphUpdater.java
Expand Up @@ -13,6 +13,8 @@ the License, or (at your option) any later version.


package org.opentripplanner.updater; package org.opentripplanner.updater;


import org.opentripplanner.routing.graph.Graph;

/** /**
* Interface for graph updaters. Objects that implement this interface should always be configured * Interface for graph updaters. Objects that implement this interface should always be configured
* via PreferencesConfigurable.configure after creating the object. GraphUpdaterConfigurator should * via PreferencesConfigurable.configure after creating the object. GraphUpdaterConfigurator should
Expand All @@ -28,26 +30,25 @@ public interface GraphUpdater extends JsonConfigurable {
/** /**
* Graph updaters must be aware of their manager to be able to execute GraphWriterRunnables. * Graph updaters must be aware of their manager to be able to execute GraphWriterRunnables.
* GraphUpdaterConfigurator should take care of calling this function. * GraphUpdaterConfigurator should take care of calling this function.
*
* @param updaterManager is the parent updater manager
* @see GraphWriterRunnable
*/ */
public void setGraphUpdaterManager(GraphUpdaterManager updaterManager); public void setGraphUpdaterManager(GraphUpdaterManager updaterManager);


/** /**
* Here the updater can be initialized. If it throws, the updater won't be started (i.e. the run * Here the updater can be initialized. If it throws, the updater won't be started (i.e. the run
* method won't be called). * method won't be called). All updaters' setup methods will be run sequentially in a single-threaded manner
* before updates begin, in order to avoid concurrent reads/writes.
*/ */
public void setup() throws Exception; public void setup(Graph graph) throws Exception;


/** /**
* This is where the updater thread receives updates and applies them to the graph. This method * This method will run in its own thread. It pulls or receives updates and applies them to the graph.
* only runs once. * It must perform any writes to the graph by passing GraphWriterRunnables to GraphUpdaterManager.execute().
* This queues up the write operations, ensuring that only one updater performs writes at a time.
*/ */
public void run() throws Exception; public void run() throws Exception;


/** /**
* Here the updater can cleanup after itself. * Here the updater can clean up after itself.
*/ */
public void teardown(); public void teardown();


Expand Down
Expand Up @@ -48,18 +48,15 @@ public abstract class GraphUpdaterConfigurator {
private static Logger LOG = LoggerFactory.getLogger(GraphUpdaterConfigurator.class); private static Logger LOG = LoggerFactory.getLogger(GraphUpdaterConfigurator.class);


public static void setupGraph(Graph graph, JsonNode mainConfig) { public static void setupGraph(Graph graph, JsonNode mainConfig) {
// Create a updater manager for this graph
GraphUpdaterManager updaterManager = new GraphUpdaterManager(graph);


// Look for embedded config if it exists // Look for embedded config if it exists
// TODO figure out how & when we will use embedded config in absence of main config. // TODO figure out how & when we will use embedded config in absence of main config.
JsonNode embeddedConfig = null; // graph.routerConfig; JsonNode embeddedConfig = null; // graph.routerConfig;
LOG.info("Using configurations: " + (mainConfig == null ? "" : "[main]") + " " LOG.info("Using configurations: " + (mainConfig == null ? "" : "[main]") + " "
+ (embeddedConfig == null ? "" : "[embedded]")); + (embeddedConfig == null ? "" : "[embedded]"));


// Apply configuration // Create a updater manager for this graph, and create updaters according to the JSON configuration.
// FIXME why are we returning the same updatermanager object that has been modified ? this method could just create it. GraphUpdaterManager updaterManager = createManagerFromConfig(graph, mainConfig);
updaterManager = applyConfigurationToGraph(graph, updaterManager, mainConfig);


// Stop the updater manager if it contains nothing // Stop the updater manager if it contains nothing
if (updaterManager.size() == 0) { if (updaterManager.size() == 0) {
Expand All @@ -72,12 +69,12 @@ public static void setupGraph(Graph graph, JsonNode mainConfig) {
} }


/** /**
* @param graph * @param graph the graph that will be modified by these updaters
* @param updaterManager is the graph updater manager to which all updaters should be added * @return a GraphUpdaterManager containing all the created updaters
* @return reference to the same updaterManager as was given as input
*/ */
private static GraphUpdaterManager applyConfigurationToGraph(Graph graph, GraphUpdaterManager updaterManager, JsonNode config) { private static GraphUpdaterManager createManagerFromConfig(Graph graph, JsonNode config) {


GraphUpdaterManager updaterManager = new GraphUpdaterManager(graph);
for (JsonNode configItem : config.path("updaters")) { for (JsonNode configItem : config.path("updaters")) {


// For each sub-node, determine which kind of updater is being created. // For each sub-node, determine which kind of updater is being created.
Expand Down Expand Up @@ -113,27 +110,27 @@ else if (type.equals("opentraffic-updater")) {
} }
} }


// Configure and activate the new updater. if (updater == null) {
try { LOG.error("Unknown updater type: " + type);
// Check whether no updater type was found } else {
if (updater == null) { try {
LOG.error("Unknown updater type: " + type); // Inform the GraphUpdater of its parent Manager so the updater can enqueue write operations.
} else { // Perhaps this should be done in "addUpdater" below, to ensure the link is reciprocal.
// Add manager as parent
updater.setGraphUpdaterManager(updaterManager); updater.setGraphUpdaterManager(updaterManager);
// Configure updater if found and necessary // All GraphUpdaters are JsonConfigurable - send them their config information.
if (updater instanceof JsonConfigurable) { updater.configure(graph, configItem);
((JsonConfigurable) updater).configure(graph, configItem); // Perform any initial setup in a single-threaded manner to avoid concurrent reads/writes.
} updater.setup(graph);
// Add graph updater to manager // Add graph updater to manager.
updaterManager.addUpdater(updater); updaterManager.addUpdater(updater);
LOG.info ("Configured GraphUpdater: {}", updater); LOG.info("Configured GraphUpdater: {}", updater);
} catch (Exception e) {
LOG.error("Failed to configure graph updater:" + configItem.asText(), e);
} }
} catch (Exception e) {
LOG.error("Can't configure: " + configItem.asText(), e);
// Continue on to the next node
} }
} }
// Now that all the updaters are configured, kick them all off in their own threads.
updaterManager.startUpdaters();
return updaterManager; return updaterManager;
} }


Expand Down
59 changes: 20 additions & 39 deletions src/main/java/org/opentripplanner/updater/GraphUpdaterManager.java
Expand Up @@ -76,12 +76,12 @@ public class GraphUpdaterManager {
/** /**
* Keep track of all updaters so we can cleanly free resources associated with them at shutdown. * Keep track of all updaters so we can cleanly free resources associated with them at shutdown.
*/ */
List<GraphUpdater> updaterList = new ArrayList<>(); private List<GraphUpdater> updaterList = new ArrayList<>();


/** /**
* The Graph that will be updated. * The Graph that will be updated.
*/ */
Graph graph; private Graph graph;


/** /**
* Constructor. * Constructor.
Expand Down Expand Up @@ -140,18 +140,6 @@ public void stop() {
*/ */
public void addUpdater(final GraphUpdater updater) { public void addUpdater(final GraphUpdater updater) {
updaterList.add(updater); updaterList.add(updater);
updaterPool.execute(() -> {
try {
updater.setup();
try {
updater.run();
} catch (Exception e) {
LOG.error("Error while running updater {}:", updater.getClass().getName(), e);
}
} catch (Exception e) {
LOG.error("Error while setting up updater {}:", updater.getClass().getName(), e);
}
});
} }


/** /**
Expand All @@ -162,43 +150,36 @@ public void addUpdater(final GraphUpdater updater) {
* @param runnable is a graph writer runnable * @param runnable is a graph writer runnable
*/ */
public void execute(GraphWriterRunnable runnable) { public void execute(GraphWriterRunnable runnable) {
executeReturningFuture(runnable); scheduler.submit(() -> {
}

/**
* This is another method to use to modify the graph from the updaters. It behaves like execute,
* but blocks until the runnable has been executed. This might be particularly useful in the
* setup method of an updater.
*
* @param runnable is a graph writer runnable
* @throws ExecutionException
* @throws InterruptedException
* @see GraphUpdaterManager.execute
*/
public void executeBlocking(GraphWriterRunnable runnable) throws InterruptedException,
ExecutionException {
Future<?> future = executeReturningFuture(runnable);
// Ask for result of future. Will block and return null when runnable is successfully
// finished, throws otherwise
future.get();
}

private Future<?> executeReturningFuture(final GraphWriterRunnable runnable) {
// TODO: check for high water mark?
Future<?> future = scheduler.submit(() -> {
try { try {
runnable.run(graph); runnable.run(graph);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error while running graph writer {}:", runnable.getClass().getName(), e); LOG.error("Error while running graph writer {}:", runnable.getClass().getName(), e);
} }
}); });
return future;
} }


public int size() { public int size() {
return updaterList.size(); return updaterList.size();
} }


/**
* This should be called only once at startup to kick off every updater in its own thread, and only after all
* the updaters have had their setup methods called.
*/
public void startUpdaters() {
for (GraphUpdater updater : updaterList) {
LOG.info("Starting new thread for updater {}", updater.toString());
updaterPool.execute(() -> {
try {
updater.run();
} catch (Exception e) {
LOG.error("Error while running updater {}:", updater.getClass().getName(), e);
}
});
}
}

/** /**
* Just an example of fetching status information from the graph updater manager to expose it in a web service. * Just an example of fetching status information from the graph updater manager to expose it in a web service.
* More useful stuff should be added later. * More useful stuff should be added later.
Expand Down
Expand Up @@ -85,7 +85,7 @@ protected void configurePolling(Graph graph, JsonNode config) throws Exception {
} }


@Override @Override
public void setup() { public void setup(Graph graph) {
if (updateHandler == null) { if (updateHandler == null) {
updateHandler = new AlertsUpdateHandler(); updateHandler = new AlertsUpdateHandler();
} }
Expand Down
Expand Up @@ -96,17 +96,11 @@ protected void configurePolling(Graph graph, JsonNode config) throws Exception {
} }


@Override @Override
public void setup() throws InterruptedException, ExecutionException { public void setup(Graph graph) {
// Creation of network linker library will not modify the graph // Creation of network linker library will not modify the graph
linker = new SimpleStreetSplitter(graph); linker = new SimpleStreetSplitter(graph);

// Adding a bike park station service needs a graph writer runnable // Adding a bike park station service needs a graph writer runnable
updaterManager.executeBlocking(new GraphWriterRunnable() { bikeService = graph.getService(BikeRentalStationService.class, true);
@Override
public void run(Graph graph) {
bikeService = graph.getService(BikeRentalStationService.class, true);
}
});
} }


@Override @Override
Expand Down
Expand Up @@ -54,8 +54,6 @@ public class BikeRentalUpdater extends PollingGraphUpdater {


private BikeRentalDataSource source; private BikeRentalDataSource source;


private Graph graph;

private SimpleStreetSplitter linker; private SimpleStreetSplitter linker;


private BikeRentalStationService service; private BikeRentalStationService service;
Expand Down Expand Up @@ -115,19 +113,22 @@ protected void configurePolling (Graph graph, JsonNode config) throws Exception


// Configure updater // Configure updater
LOG.info("Setting up bike rental updater."); LOG.info("Setting up bike rental updater.");
this.graph = graph;
this.source = source; this.source = source;
this.network = config.path("networks").asText(DEFAULT_NETWORK_LIST); this.network = config.path("networks").asText(DEFAULT_NETWORK_LIST);
LOG.info("Creating bike-rental updater running every {} seconds : {}", pollingPeriodSeconds, source); if (pollingPeriodSeconds <= 0) {
LOG.info("Creating bike-rental updater running once only (non-polling): {}", source);
} else {
LOG.info("Creating bike-rental updater running every {} seconds: {}", pollingPeriodSeconds, source);
}

} }


@Override @Override
public void setup() throws InterruptedException, ExecutionException { public void setup(Graph graph) throws InterruptedException, ExecutionException {
// Creation of network linker library will not modify the graph // Creation of network linker library will not modify the graph
linker = new SimpleStreetSplitter(graph); linker = new SimpleStreetSplitter(graph);

// Adding a bike rental station service needs a graph writer runnable // Adding a bike rental station service needs a graph writer runnable
updaterManager.executeBlocking(graph -> service = graph.getService(BikeRentalStationService.class, true)); service = graph.getService(BikeRentalStationService.class, true);
} }


@Override @Override
Expand Down
Expand Up @@ -71,22 +71,8 @@ public void setGraphUpdaterManager(GraphUpdaterManager updaterManager) {


// Here the updater can be initialized. // Here the updater can be initialized.
@Override @Override
public void setup() { public void setup(Graph graph) {
LOG.info("Setup example updater"); LOG.info("Setup example updater");

// Execute anonymous graph writer runnable and wait for its termination
try {
updaterManager.executeBlocking(new GraphWriterRunnable() {
@Override
public void run(Graph graph) {
LOG.info("Anonymous graph writer {} runnable is run on the "
+ "graph writer scheduler.", this.hashCode());
// Do some writing to the graph here
}
});
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
} }


// This is where the updater thread receives updates and applies them to the graph. // This is where the updater thread receives updates and applies them to the graph.
Expand Down
Expand Up @@ -67,7 +67,7 @@ public void setGraphUpdaterManager(GraphUpdaterManager updaterManager) {


// Here the updater can be initialized. // Here the updater can be initialized.
@Override @Override
public void setup() { public void setup(Graph graph) {
LOG.info("Setup example polling updater"); LOG.info("Setup example polling updater");
} }


Expand Down
Expand Up @@ -119,34 +119,27 @@ public void configurePolling(Graph graph, JsonNode config) throws Exception {
} }


@Override @Override
public void setup() throws InterruptedException, ExecutionException { public void setup(Graph graph) {
// Create a realtime data snapshot source and wait for runnable to be executed // Only create a realtime data snapshot source if none exists already
updaterManager.executeBlocking(new GraphWriterRunnable() { TimetableSnapshotSource snapshotSource = graph.timetableSnapshotSource;
@Override if (snapshotSource == null) {
public void run(Graph graph) { snapshotSource = new TimetableSnapshotSource(graph);
// Only create a realtime data snapshot source if none exists already // Add snapshot source to graph
TimetableSnapshotSource snapshotSource = graph.timetableSnapshotSource; graph.timetableSnapshotSource = (snapshotSource);
if (snapshotSource == null) { }
snapshotSource = new TimetableSnapshotSource(graph); // Set properties of realtime data snapshot source
// Add snapshot source to graph if (logFrequency != null) {
graph.timetableSnapshotSource = (snapshotSource); snapshotSource.logFrequency = (logFrequency);
} }

if (maxSnapshotFrequency != null) {
// Set properties of realtime data snapshot source snapshotSource.maxSnapshotFrequency = (maxSnapshotFrequency);
if (logFrequency != null) { }
snapshotSource.logFrequency = (logFrequency); if (purgeExpiredData != null) {
} snapshotSource.purgeExpiredData = (purgeExpiredData);
if (maxSnapshotFrequency != null) { }
snapshotSource.maxSnapshotFrequency = (maxSnapshotFrequency); if (fuzzyTripMatcher != null) {
} snapshotSource.fuzzyTripMatcher = fuzzyTripMatcher;
if (purgeExpiredData != null) { }
snapshotSource.purgeExpiredData = (purgeExpiredData);
}
if (fuzzyTripMatcher != null) {
snapshotSource.fuzzyTripMatcher = fuzzyTripMatcher;
}
}
});
} }


/** /**
Expand Down

0 comments on commit 544e1b1

Please sign in to comment.