Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

remove (native code) ZeroMQ dependency

  • Loading branch information...
commit f5fe4e15d89e45ab989ea745045bed49eef5fe1d 1 parent b2e3320
@abyrd abyrd authored
View
5 opentripplanner-updater/pom.xml
@@ -26,11 +26,6 @@
<dependencies>
<dependency>
- <groupId>org.zeromq</groupId>
- <artifactId>jzmq</artifactId>
- <version>1.0.0</version>
- </dependency>
- <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>opentripplanner-routing</artifactId>
<version>${project.version}</version>
View
56 ...ner-updater/src/main/java/org/opentripplanner/updater/stoptime/GTFSZMQUpdateStreamer.java
@@ -1,56 +0,0 @@
-package org.opentripplanner.updater.stoptime;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.onebusaway.gtfs.model.AgencyAndId;
-import org.opentripplanner.routing.trippattern.Update;
-import org.opentripplanner.routing.trippattern.UpdateList;
-
-import com.google.transit.realtime.GtfsRealtime;
-import com.google.transit.realtime.GtfsRealtime.FeedEntity;
-import com.google.transit.realtime.GtfsRealtime.FeedHeader;
-import com.google.transit.realtime.GtfsRealtime.FeedMessage;
-import com.google.transit.realtime.GtfsRealtime.TripUpdate;
-import com.google.transit.realtime.GtfsRealtime.TripUpdate.StopTimeUpdate;
-
-public class GTFSZMQUpdateStreamer {
-
- private static final File file = new File("/var/otp/data/nl/gtfs-rt.protobuf");
-
- public UpdateList getUpdates() {
- try {
- InputStream is = new FileInputStream(file);
- FeedMessage feed = GtfsRealtime.FeedMessage.parseFrom(is);
- // System.out.println(feed);
- FeedHeader header = feed.getHeader();
- long timestamp = header.getTimestamp();
- UpdateList updates = new UpdateList(null);
- for (FeedEntity entity : feed.getEntityList()) {
- System.out.println(entity);
- TripUpdate tUpdate = entity.getTripUpdate();
- String trip = tUpdate.getTrip().getTripId();
- AgencyAndId tripId = new AgencyAndId("agency", trip);
- for (StopTimeUpdate sUpdate : tUpdate.getStopTimeUpdateList()) {
- Update u = new Update(tripId,
- sUpdate.getStopId(),
- sUpdate.getStopSequence(),
- (int) sUpdate.getArrival().getTime(),
- (int) sUpdate.getDeparture().getTime());
- updates.addUpdate(u);
- }
- }
- return updates;
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return null;
- }
-}
View
111 ...nner-updater/src/main/java/org/opentripplanner/updater/stoptime/KV8ZMQUpdateStreamer.java
@@ -1,111 +0,0 @@
-package org.opentripplanner.updater.stoptime;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.zip.GZIPInputStream;
-
-import org.onebusaway.gtfs.model.AgencyAndId;
-import org.opentripplanner.common.CTX;
-import org.opentripplanner.routing.core.ServiceDay;
-import org.opentripplanner.routing.trippattern.Update;
-import org.opentripplanner.routing.trippattern.UpdateList;
-import org.zeromq.ZFrame;
-import org.zeromq.ZMQ;
-import org.zeromq.ZMsg;
-
-/** StoptimeUpdateStreamer for CTX-encoded Dutch KV8 realtime updates over ZeroMQ */
-public class KV8ZMQUpdateStreamer implements UpdateStreamer {
-
- private ZMQ.Context context;
- private ZMQ.Socket subscriber;
- private int count = 0;
- private String defaultAgencyId = "";
- private String address = "tcp://node01.post.openov.nl:7817";
- private static String feed = "/GOVI/KV8";
-
- public KV8ZMQUpdateStreamer() {
- context = ZMQ.context(1);
- subscriber = context.socket(ZMQ.SUB);
- subscriber.connect(address);
- subscriber.subscribe(feed.getBytes());
- }
-
- public UpdateList getUpdates() {
- ZMsg msg = ZMsg.recvMsg(subscriber);
- try {
- Iterator<ZFrame> msgs = msg.iterator();
- msgs.next();
- ArrayList<Byte> receivedMsgs = new ArrayList<Byte>();
- while (msgs.hasNext()) {
- for (byte b : msgs.next().getData()) {
- receivedMsgs.add(b);
- }
- }
- byte[] fullMsg = new byte[receivedMsgs.size()];
- for (int i = 0; i < fullMsg.length; i++) {
- fullMsg[i] = receivedMsgs.get(i);
- }
- InputStream gzipped = new ByteArrayInputStream(fullMsg);
- InputStream in = new GZIPInputStream(gzipped);
- StringBuffer out = new StringBuffer();
- byte[] b = new byte[4096];
- for (int n; (n = in.read(b)) != -1;) {
- out.append(new String(b, 0, n));
- }
- return parseCTX(out.toString());
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
-
- public UpdateList parseCTX(String ctxString) {
- System.out.println("CTX MSG " + count++);
- CTX ctx = new CTX(ctxString);
- UpdateList ret = new UpdateList(null); // indicate that updates may have mixed trip IDs
- for (int i = 0; i < ctx.rows.size(); i++) {
- HashMap<String, String> row = ctx.rows.get(i);
- int arrival = secondsSinceMidnight(row.get("ExpectedArrivalTime"));
- int departure = secondsSinceMidnight(row.get("ExpectedDepartureTime"));
- Update u = new Update(
- kv7TripId(row),
- row.get("UserStopCode"),
- Integer.parseInt(row.get("UserStopOrderNumber")),
- arrival, departure);
- ret.addUpdate(u);
- }
- return ret;
- }
-
- /** no good for DST */
- private int secondsSinceMidnight(String hhmmss) {
- String[] time = hhmmss.split(":");
- int hours = Integer.parseInt(time[0]);
- int minutes = Integer.parseInt(time[1]);
- int seconds = Integer.parseInt(time[2]);
- return (hours * 60 + minutes) * 60 + seconds;
- }
-
- /**
- * convert KV7 fields into a GTFS trip_id
- * trip_ids must be data set unique in GTFS, which is why we use the DataOwnerCode (~=agency_id)
- * twice, in the trip_id itself and the enclosing AgencyAndId
- */
- public AgencyAndId kv7TripId (HashMap<String, String> row) {
- String tripId = String.format("%s_%s_%s_%s_%s",
- row.get("DataOwnerCode"),
- row.get("LinePlanningNumber"),
- row.get("LocalServiceLevelCode"),
- row.get("JourneyNumber"),
- row.get("FortifyOrderNumber"));
- return new AgencyAndId(row.get("DataOwnerCode"), tripId);
- }
-
- public void setAddress(String address) {
- this.address = address;
- }
-
-}

12 comments on commit f5fe4e1

@skywave

What exactly do i have to put in application-context.xml to get this running (i have this revert commit re-reverted)?

@abyrd
Owner

You don't need to re-revert the commit. There is a branch called zmqUpdateStreamer that is identical to master, but with the classes that have native code zmq dependencies. I just caught it up to master and re-pushed.

Switch to that branch and re-build.
Then add to your org/opentripplanner/api/application-context.xml:

<bean id="periodicGraphUpdater" class="org.opentripplanner.api.servlet.PeriodicGraphUpdater">
        <property name="updaters">
            <list>
                <!-- stream stop time updates from Dutch KV8 server -->
                <bean class="org.opentripplanner.updater.stoptime.StoptimeUpdater">
                    <property name="updateStreamer">
                        <bean class="org.opentripplanner.updater.stoptime.KV8ZMQUpdateStreamer">
                            <property name="address" value="tcp://node01.post.openov.nl:7817" />
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
    </bean>
@abyrd
Owner

It should start streaming in updates and replacing the trip times in such a way that each search gets a consistent view of each trip it boards. To ensure good trip plans we need to give each search a snapshot of the entire set of trip patterns, or at least of each pattern that it touches (even if it doesn't board). I'm currently working on this, and considering that we can't afford to continue using linear search for next departures, because the promising Raptor algorithm David Turner is working on makes heavy use of those methods.

@skywave

Thanks, i do get a nullpointer very quickly crashing the updater. Is this a known problemen?

2012-08-15 13:37:10,341 DEBUG [StoptimeUpdater.java:71] : pattern found for ARR_ARR_9156_113766_8044_0
2012-08-15 13:37:10,341 DEBUG [TableTripPattern.java:379] : update block did not match stopIds
CTX MSG 7
tripId: EBS_EBS_22109_119343_1058_0
EBS_EBS_22109_119343_1058_0 37400030 13 48987 49004
EBS_EBS_22109_119343_1058_0 37401090 14 49063 49063

2012-08-15 13:37:10,345 DEBUG [StoptimeUpdater.java:71] : pattern found for EBS_EBS_22109_119343_1058_0
2012-08-15 13:37:10,345 DEBUG [UpdateList.java:130] : found matching stop block at index 12
Exception in thread "Thread-3" java.lang.NullPointerException
at org.opentripplanner.routing.trippattern.TripTimes.clone(TripTimes.java:136)
at org.opentripplanner.routing.trippattern.TripTimes.updatedClone(TripTimes.java:116)
at org.opentripplanner.routing.edgetype.TableTripPattern.update(TableTripPattern.java:383)
at org.opentripplanner.updater.stoptime.StoptimeUpdater.run(StoptimeUpdater.java:72)
at org.opentripplanner.api.servlet.PeriodicGraphUpdater$UpdateTask.run(PeriodicGraphUpdater.java:84)
at java.lang.Thread.run(Thread.java:679)

@abyrd
Owner

Not that I know of, but I am in the process of rewriting this anyway. Did you rebuild your graph when changing branches? I'll keep an eye out for such problems.

@skywave

Merged all changes up to now and now i got a different one. This all might have something to do with me loading just a subset of the data, just Amsterdam and surroundings. Except GVB is Amsterdam's city transport agency, so definitely in the data.

2012-08-16 21:05:59,242 DEBUG [StoptimeUpdater.java:75] : pattern found for GVB_GVB_17_134313_366_0
2012-08-16 21:05:59,243 DEBUG [UpdateList.java:130] : found matching stop block at index 14
74640_74830 74848_74922 74940_75013 75031_75102 75120_75198 75216_75262 75280_75334 75352_75405 75423_75469 75487_75557 75575_75607 75625_75662 75680_75720 75720_75821 75839_75882 75900_75962 75980_76067 76085_76122 76140_76182 76200_76262 76280_76328 76346_76423 76441_76500
74640_74830 74848_74922 74940_75013 75031_75102 75120_75198 75216_75262 75280_75334 75352_75405 75423_75469 75487_75557 75575_75607 75625_75662 75680_75720 75720_75944 75944_75975 75975_76018 76018_76079 76085_76122 76140_76182 76200_76262 76280_76328 76346_76423 76441_76500
tripId: GVB_GVB_37_134418_250_0
GVB_GVB_37_134418_250_0 013812 42 75906 75925
GVB_GVB_37_134418_250_0 013592 43 75974 0

2012-08-16 21:05:59,244 DEBUG [StoptimeUpdater.java:75] : pattern found for GVB_GVB_37_134418_250_0
2012-08-16 21:05:59,244 DEBUG [UpdateList.java:130] : found matching stop block at index 41
Exception in thread "Thread-3" java.lang.ArrayIndexOutOfBoundsException: 42
at org.opentripplanner.routing.trippattern.TripTimes.updatedClone(TripTimes.java:121)
at org.opentripplanner.routing.edgetype.TableTripPattern$Timetable.update(TableTripPattern.java:547)
at org.opentripplanner.routing.edgetype.TableTripPattern.update(TableTripPattern.java)
at org.opentripplanner.updater.stoptime.StoptimeUpdater.run(StoptimeUpdater.java:76)
at org.opentripplanner.api.servlet.PeriodicGraphUpdater$UpdateTask.run(PeriodicGraphUpdater.java:84)
at java.lang.Thread.run(Thread.java:679)

@abyrd
Owner

The updater is not going to work right now, I'm still building a graph using your GTFS feed and haven't adapted the updater to the new code.

@skywave

I merged today's changes and it's working :)

@abyrd
Owner

Good to know! I left it running all night here and no crashes. All the basics for better concurrency are in place but I've still got a queue of improvements to make. I want to improve robustness wrt non-increasing stop times, stop block matching, etc. before making any statements that "it works" :)

@skywave

One thing from parsing the CTX seems to be missing. If a KV8/CTX message has TripStopStatus == 'CANCEL', it means the trip is canceled. A trip can be uncanceled if a new message arrives for that trip where tripstopstatus != 'CANCEL'

@abyrd
Owner

Noted. Let's get timetable updates (the more difficult part) working reliably first. Maybe you can help me with a detail: should each KV8 stoptime update message be applied directly to the scheduled trip times for that trip, or are they cumulative? Meaning, if I receive 3 messages numbered 1, 2, and 3 all of which refer to trip XYZ, after receiving all of them should I be planning trips with XYZ+3 or XYZ+1+2+3 ?

@skywave

You need all stops in those 3 messages, you could however merge them into one message by taking the newest stoptime for stoporder x. So if message 1,2,3 contain stoptimes for order 3,4,5, you only need the last message. However if message 2 contains info about orders 3,4,5,6., you need to store the times of stop #6 of that message. Basically a hashmap with key 'trip_id + stop_order' would suffice

Please sign in to comment.
Something went wrong with that request. Please try again.