Skip to content

Commit

Permalink
Clear all data from buffer before applying updates when feed message …
Browse files Browse the repository at this point in the history
…header indicates it is a FULL_DATASET
  • Loading branch information
jkoelewijn committed Mar 26, 2015
1 parent 9d63b15 commit 84b7ec0
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 35 deletions.
Expand Up @@ -249,6 +249,19 @@ public TimetableSnapshot commit(boolean force) {
return ret;
}

/**
* Clear all data from snapshot
*/
public void clear() {
if (dirty == null) {
throw new ConcurrentModificationException("This TimetableSnapshot is read-only.");
}

// Clear all data from snapshot
timetables.clear();
lastAddedTripPattern.clear();
}

/**
* Removes all Timetables which are valid for a ServiceDate on-or-before the one supplied.
*/
Expand Down
Expand Up @@ -18,14 +18,14 @@ the License, or (at your option) any later version.
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.prefs.Preferences;

import com.fasterxml.jackson.databind.JsonNode;

import org.opentripplanner.routing.graph.Graph;
import org.opentripplanner.updater.JsonConfigurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.transit.realtime.GtfsRealtime;
import com.google.transit.realtime.GtfsRealtime.FeedEntity;
import com.google.transit.realtime.GtfsRealtime.FeedMessage;
import com.google.transit.realtime.GtfsRealtime.TripUpdate;
Expand All @@ -37,6 +37,12 @@ public class GtfsRealtimeFileTripUpdateSource implements TripUpdateSource, JsonC

private File file;

/**
* True iff the last list with updates represent all updates that are active right now, i.e. all
* previous updates should be disregarded
*/
private boolean fullDataset = true;

/**
* Default agency id that is used for the trip ids in the TripUpdates
*/
Expand All @@ -53,14 +59,26 @@ public List<TripUpdate> getUpdates() {
FeedMessage feedMessage = null;
List<FeedEntity> feedEntityList = null;
List<TripUpdate> updates = null;
fullDataset = true;
try {
InputStream is = new FileInputStream(file);
if (is != null) {
// Decode message
feedMessage = FeedMessage.PARSER.parseFrom(is);
feedEntityList = feedMessage.getEntityList();

// Change fullDataset value if this is an incremental update
if (feedMessage.hasHeader()
&& feedMessage.getHeader().hasIncrementality()
&& feedMessage.getHeader().getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)) {
fullDataset = false;
}

// Create List of TripUpdates
updates = new ArrayList<TripUpdate>(feedEntityList.size());
for (FeedEntity feedEntity : feedEntityList) {
updates.add(feedEntity.getTripUpdate());
if (feedEntity.hasTripUpdate()) updates.add(feedEntity.getTripUpdate());
}
}
} catch (Exception e) {
Expand All @@ -69,12 +87,17 @@ public List<TripUpdate> getUpdates() {
return updates;
}

@Override
public boolean getFullDatasetValueOfLastUpdates() {
return fullDataset;
}

public String toString() {
return "GtfsRealtimeFileTripUpdateSource(" + file + ")";
}

@Override
public String getAgencyId() {
return this.agencyId;
}
@Override
public String getAgencyId() {
return this.agencyId;
}
}
Expand Up @@ -16,15 +16,16 @@ the License, or (at your option) any later version.
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.prefs.Preferences;

import com.fasterxml.jackson.databind.JsonNode;

import org.opentripplanner.updater.JsonConfigurable;
import org.opentripplanner.routing.graph.Graph;
import org.opentripplanner.util.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.transit.realtime.GtfsRealtime;
import com.google.transit.realtime.GtfsRealtime.FeedEntity;
import com.google.transit.realtime.GtfsRealtime.FeedMessage;
import com.google.transit.realtime.GtfsRealtime.TripUpdate;
Expand All @@ -33,6 +34,12 @@ public class GtfsRealtimeHttpTripUpdateSource implements TripUpdateSource, JsonC
private static final Logger LOG =
LoggerFactory.getLogger(GtfsRealtimeHttpTripUpdateSource.class);

/**
* True iff the last list with updates represent all updates that are active right now, i.e. all
* previous updates should be disregarded
*/
private boolean fullDataset = true;

/**
* Default agency id that is used for the trip ids in the TripUpdates
*/
Expand All @@ -55,14 +62,26 @@ public List<TripUpdate> getUpdates() {
FeedMessage feedMessage = null;
List<FeedEntity> feedEntityList = null;
List<TripUpdate> updates = null;
fullDataset = true;
try {
InputStream is = HttpUtils.getData(url);
if (is != null) {
// Decode message
feedMessage = FeedMessage.PARSER.parseFrom(is);
feedEntityList = feedMessage.getEntityList();

// Change fullDataset value if this is an incremental update
if (feedMessage.hasHeader()
&& feedMessage.getHeader().hasIncrementality()
&& feedMessage.getHeader().getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)) {
fullDataset = false;
}

// Create List of TripUpdates
updates = new ArrayList<TripUpdate>(feedEntityList.size());
for (FeedEntity feedEntity : feedEntityList) {
updates.add(feedEntity.getTripUpdate());
if (feedEntity.hasTripUpdate()) updates.add(feedEntity.getTripUpdate());
}
}
} catch (Exception e) {
Expand All @@ -71,12 +90,17 @@ public List<TripUpdate> getUpdates() {
return updates;
}

@Override
public boolean getFullDatasetValueOfLastUpdates() {
return fullDataset;
}

public String toString() {
return "GtfsRealtimeHttpUpdateStreamer(" + url + ")";
}

@Override
public String getAgencyId() {
return this.agencyId;
}
@Override
public String getAgencyId() {
return this.agencyId;
}
}
Expand Up @@ -15,8 +15,6 @@ the License, or (at your option) any later version.

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.prefs.Preferences;

import com.fasterxml.jackson.databind.JsonNode;
import org.opentripplanner.updater.JsonConfigurable;
import org.opentripplanner.routing.graph.Graph;
Expand Down Expand Up @@ -150,11 +148,12 @@ public void run(Graph graph) {
public void runPolling() {
// Get update lists from update source
List<TripUpdate> updates = updateSource.getUpdates();
boolean fullDataset = updateSource.getFullDatasetValueOfLastUpdates();

if (updates != null && updates.size() > 0) {
// Handle trip updates via graph writer runnable
TripUpdateGraphWriterRunnable runnable =
new TripUpdateGraphWriterRunnable(updates, agencyId);
new TripUpdateGraphWriterRunnable(fullDataset, updates, agencyId);
updaterManager.execute(runnable);
}
}
Expand Down
Expand Up @@ -170,10 +170,12 @@ private TimetableSnapshot getTimetableSnapshot(boolean force) {
* feed when matching IDs.
*
* @param graph graph to update (needed for adding/changing stop patterns)
* @param fullDataset true iff the list with updates represent all updates that are active right
* now, i.e. all previous updates should be disregarded
* @param updates GTFS-RT TripUpdate's that should be applied atomically
* @param feedId
*/
public void applyTripUpdates(Graph graph, final List<TripUpdate> updates, final String feedId) {
public void applyTripUpdates(Graph graph, boolean fullDataset, final List<TripUpdate> updates, final String feedId) {
if (updates == null) {
LOG.warn("updates is null");
return;
Expand All @@ -183,6 +185,11 @@ public void applyTripUpdates(Graph graph, final List<TripUpdate> updates, final
bufferLock.lock();

try {
if (fullDataset) {
// Remove all updates from the buffer
buffer.clear();
}

LOG.debug("message contains {} trip updates", updates.size());
int uIndex = 0;
for (TripUpdate tripUpdate : updates) {
Expand Down
Expand Up @@ -27,19 +27,26 @@ the License, or (at your option) any later version.
public class TripUpdateGraphWriterRunnable implements GraphWriterRunnable {
private static Logger LOG = LoggerFactory.getLogger(TripUpdateGraphWriterRunnable.class);

/**
* True iff the list with updates represent all updates that are active right now, i.e. all
* previous updates should be disregarded
*/
private final boolean fullDataset;

/**
* The list with updates to apply to the graph
*/
private final List<TripUpdate> updates;

private final String feedId;

public TripUpdateGraphWriterRunnable(final List<TripUpdate> updates, final String feedId) {
public TripUpdateGraphWriterRunnable(final boolean fullDataset, final List<TripUpdate> updates, final String feedId) {
// Preconditions
Preconditions.checkNotNull(updates);
Preconditions.checkNotNull(feedId);

// Set fields
this.fullDataset = fullDataset;
this.updates = updates;
this.feedId = feedId;
}
Expand All @@ -49,7 +56,7 @@ public void run(Graph graph) {
// Apply updates to graph using realtime snapshot source
TimetableSnapshotSource snapshotSource = graph.timetableSnapshotSource;
if (snapshotSource != null) {
snapshotSource.applyTripUpdates(graph, updates, feedId);
snapshotSource.applyTripUpdates(graph, fullDataset, updates, feedId);
} else {
LOG.error("Could not find realtime data snapshot source in graph."
+ " The following updates are not applied: {}", updates);
Expand Down
Expand Up @@ -24,6 +24,12 @@ public interface TripUpdateSource {
* or null if an exception occurred while processing the message
*/
public List<TripUpdate> getUpdates();

/**
* @return true iff the last list with updates represent all updates that are active right
* now, i.e. all previous updates should be disregarded
*/
public boolean getFullDatasetValueOfLastUpdates();

public String getAgencyId();
}
Expand Up @@ -16,9 +16,8 @@ the License, or (at your option) any later version.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.prefs.Preferences;

import com.fasterxml.jackson.databind.JsonNode;

import org.opentripplanner.routing.graph.Graph;
import org.opentripplanner.updater.GraphUpdater;
import org.opentripplanner.updater.GraphUpdaterManager;
Expand All @@ -28,6 +27,7 @@ the License, or (at your option) any later version.
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.transit.realtime.GtfsRealtime;
import com.google.transit.realtime.GtfsRealtime.FeedEntity;
import com.google.transit.realtime.GtfsRealtime.FeedMessage;
import com.google.transit.realtime.GtfsRealtime.TripUpdate;
Expand Down Expand Up @@ -173,10 +173,21 @@ public void onMessage(byte[] message) {
FeedMessage feedMessage = null;
List<FeedEntity> feedEntityList = null;
List<TripUpdate> updates = null;
boolean fullDataset = true;
try {
// Decode message into List of TripUpdates
// Decode message
feedMessage = FeedMessage.PARSER.parseFrom(message);
feedEntityList = feedMessage.getEntityList();

// Change fullDataset value if this is an incremental update
if (feedMessage.hasHeader()
&& feedMessage.getHeader().hasIncrementality()
&& feedMessage.getHeader().getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)) {
fullDataset = false;
}

// Create List of TripUpdates
updates = new ArrayList<TripUpdate>(feedEntityList.size());
for (FeedEntity feedEntity : feedEntityList) {
if (feedEntity.hasTripUpdate()) updates.add(feedEntity.getTripUpdate());
Expand All @@ -187,7 +198,8 @@ public void onMessage(byte[] message) {

if (updates != null && updates.size() > 0) {
// Handle trip updates via graph writer runnable
TripUpdateGraphWriterRunnable runnable = new TripUpdateGraphWriterRunnable(updates, feedId);
TripUpdateGraphWriterRunnable runnable = new TripUpdateGraphWriterRunnable(
fullDataset, updates, feedId);
updaterManager.execute(runnable);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/org/opentripplanner/GtfsTest.java
Expand Up @@ -88,14 +88,15 @@ protected void setUp() {
alertsUpdateHandler.setDefaultAgencyId("MMRI");

try {
final boolean fullDataset = false;
InputStream inputStream = new FileInputStream(gtfsRealTime);
FeedMessage feedMessage = FeedMessage.PARSER.parseFrom(inputStream);
List<FeedEntity> feedEntityList = feedMessage.getEntityList();
List<TripUpdate> updates = new ArrayList<TripUpdate>(feedEntityList.size());
for (FeedEntity feedEntity : feedEntityList) {
updates.add(feedEntity.getTripUpdate());
}
timetableSnapshotSource.applyTripUpdates(graph, updates, agencyId);
timetableSnapshotSource.applyTripUpdates(graph, fullDataset, updates, agencyId);
alertsUpdateHandler.update(feedMessage);
} catch (Exception exception) {}
}
Expand Down

0 comments on commit 84b7ec0

Please sign in to comment.