In [1]:
import pandas as pd
import numpy as np
# GTFS protobuf wrapper was generated using protoc and https://developers.google.com/transit/gtfs-realtime/gtfs-realtime.proto
import gtfs_realtime_pb2
# The NYCT extensions to GTFS are also compiled from this protobuf: http://datamine.mta.info/sites/all/files/pdfs/nyct-subway.proto.txt
import nyct_subway_pb2
import folium #pip
from tqdm import trange
import requests
import copy
import urllib
from sqlalchemy import create_engine

In [2]:
with open('./mta_key.txt', 'r') as fil:
    KEY = fil.read().strip()
# The following is from: http://datamine.mta.info/list-of-feeds
lines = {
    '123456S': 1,
    'ACEHS': 26,
    'NQRW': 16,
    'BDFM': 21,
    'L': 2,
    'G': 31,
    'JZ': 36,
    '7': 51,
}    
# From wikipedia: https://en.wikipedia.org/wiki/New_York_City_Subway_nomenclature
line_colors = {
    'ACE': '#2850ad',
    'BDFM': '#ff6319',
    'G': '#6cbe45',
    'L': '#a7a9ac',
    'JZ': '#996633',
    'NQRW': '#fccc0a',
    '123': '#ee352e',
    '456': '#00933c',
    '7': '#b933ad',
    'S': '#808183',
}
def find_color(line_char):
    for line in line_colors:
        if line_char in line:
            return line_colors[line]
BASEURL = 'http://datamine.mta.info/mta_esi.php?key=%s&feed_id=%d'

# Parsing the Static MTA data

Before we can make heads or tail of the realtime MTA data, we need to learn some things about the available routes and stops. We can get that data from the MTA's GTFS schedule data:
```
http://web.mta.info/developers/data/nyct/subway/google_transit.zip
```
That's been unpacked to the ```metadata/``` folder next to this notebook. The first order of business is knowing what stops are available on each route and how to display them properly.

## Create a map with subway routes

The ```shapes.txt``` dataset contains geometry for every subway route. Nice! However it looks like it's a rather verbose dataset - each unique trip is listed out seperately, and there is a lot of overlap.

Before we coalesce the data, Let's plot up a couple of lines. We'll be using Folium to create an interactive map and populate it with polygon data from ```shapes.txt```. To demonstrate the overlap, we'll offset each route on the same train line by a little bit:

In [3]:
shapes = pd.read_csv('metadata/shapes.txt')
shapes.head()

Unnamed: 0,shape_id,shape_pt_lat,shape_pt_lon,shape_pt_sequence,shape_dist_traveled
0,1..N03R,40.702068,-74.013664,0,
1,1..N03R,40.703199,-74.014792,1,
2,1..N03R,40.703226,-74.01482,2,
3,1..N03R,40.703253,-74.014846,3,
4,1..N03R,40.70328,-74.01487,4,


In [4]:
shapes[shapes.shape_id.str.startswith('E.')].shape_id.unique()

array(['E..N05R', 'E..N55R', 'E..N66R', 'E..N70R', 'E..N72R', 'E..S04R',
       'E..S56R', 'E..S69R', 'E..S71R'], dtype=object)

In [5]:
def overlay_route(line, m, offset=0.0):
    idx = 0
    for shape_id in shapes[shapes.shape_id.str.startswith(line+'.')].shape_id.unique():
        folium.PolyLine(
            zip(shapes[shapes.shape_id==shape_id].shape_pt_lat+idx*offset,
                shapes[shapes.shape_id==shape_id].shape_pt_lon+idx*offset),
            color=find_color(line), opacity=0.4, popup=urllib.quote(shape_id)).add_to(m)
        idx += 1

In [6]:
m = folium.Map(location=[shapes.shape_pt_lat.mean(), shapes.shape_pt_lon.mean()],
               tiles='CartoDB positron', zoom_start=12)
overlay_route('A', m, 0.0001)
overlay_route('F', m, 0.0001)
overlay_route('G', m, 0.0001)
overlay_route('R', m, 0.0001)
m

## Route Coalesce: eliminate redundant routes

Let's try and reduce each train line into a single unique path. So let's try to remove trip IDs that are completely contained within another trip

In [7]:
remove = []
drop_idxes = pd.Int64Index([])
routes = shapes.shape_id.unique()
for i in trange(len(routes)):
    route = routes[i]
    for other_route in shapes[shapes.shape_id.str.startswith(route[0])].shape_id.unique():
        if other_route == route:
            continue
        if other_route in remove:
            continue
        latcommon = np.setdiff1d(shapes[shapes.shape_id==route].shape_pt_lat,
                                 shapes[shapes.shape_id==other_route].shape_pt_lat)
        loncommon = np.setdiff1d(shapes[shapes.shape_id==route].shape_pt_lon,
                                 shapes[shapes.shape_id==other_route].shape_pt_lon)
        if len(latcommon) == 0 and len(loncommon) == 0:
            # print route+" is contained within "+other_route
            remove.append(route)
            drop_idxes = np.concatenate((drop_idxes, shapes[shapes.shape_id==route].index))
            break
print "Removing %d duplicate routes" % (len(remove))
shapes = shapes.drop(drop_idxes)

100%|██████████| 222/222 [00:25<00:00,  8.65it/s]

Removing 189 duplicate routes





In [8]:
shapes.shape_id.unique()

array(['1..S05R', '2..S03R', '2..S08R', '3..S03R', '4..S40R', '5..S04R',
       '5..S09R', '5..S15R', '5..S18R', '6..S52X010', '7..S98R', 'A..S55R',
       'A..S68R', 'A..S77R', 'B..S45R', 'C..S04R', 'D..S14R', 'E..S04R',
       'E..S56R', 'E..S71R', 'F..S69R', 'FS.S01R', 'G..S13R', 'GS.S04R',
       'H..S21R', 'J..S16R', 'L..S01R', 'M..S20R', 'N..S20R', 'N..S36R',
       'Q..S55R', 'R..S93R', 'SI.S31R'], dtype=object)

### Our own sweet subway map!

Now the shapes dataset is much more manageable, we can essentially generate our own subway map by overlaying all of the common NYC routes:

In [9]:
m = folium.Map(location=[shapes.shape_pt_lat.mean(), shapes.shape_pt_lon.mean()],
               tiles='CartoDB positron', zoom_start=12)
for train in 'ACEBDFM123456NQRWJLZG':
    overlay_route(train, m)
m

## An even sweeter map would have stops listed too

Let's look at the GTFS dataset containing stops. Similarly, we will need to reduce the dataset to reduce information that's duplicative to us. It looks as though reducing the set to when ```location_type == 1``` should do the trick.

In [10]:
stops = pd.read_csv('metadata/stops.txt')
stops = stops[stops.location_type==1]
stops.index = pd.Int64Index(range(len(stops)))
del stops['location_type']
del stops['stop_url']
del stops['zone_id']
del stops['stop_desc']
del stops['stop_code']
del stops['parent_station']
stops.head()

Unnamed: 0,stop_id,stop_name,stop_lat,stop_lon
0,101,Van Cortlandt Park - 242 St,40.889248,-73.898583
1,103,238 St,40.884667,-73.90087
2,104,231 St,40.878856,-73.904834
3,106,Marble Hill - 225 St,40.874561,-73.909831
4,107,215 St,40.869444,-73.915279


In [11]:
def overlay_stops(line, m):
    idx = 0
    for idx, stop in stops[stops.stop_id.str.startswith(line)].iterrows():
        folium.features.Circle(location=[stop.stop_lat, stop.stop_lon],
                               radius=35, popup=urllib.quote(stop.stop_name), fill=True,
                               color=find_color(line), opacity=0.6, fill_opacity=0.2).add_to(m)
def get_stop_by_name(stop):
    stop = stop[:3]
    ret = stops[stops.stop_id == stop].iloc[0]
    return ret

In [12]:
m = folium.Map(location=[shapes.shape_pt_lat.mean(), shapes.shape_pt_lon.mean()-0.04],
               tiles='CartoDB positron', zoom_start=14)
for train in 'ACEBDFM123456NQRWJLZG':
    overlay_route(train, m)
    overlay_stops(train, m)
m

## Parsing Realtime MTA Data

Ok, now we're ready to try to make sense of the realtime MTA data. Our goal will be to determine train position from the live schedule data. The MTA API is essentially an HTML endpoint (URL listed above ```BASEURL```) that serves up protobufs. The protobufs are generally in Google's GTFS Live format (```gtfw_realtime.proto```), but it also includes extensions unique to the MTA (```nyct_subway.proto```). By combining these two protobuf specifications, we can decode the binary data served up by the API endpoint.

Let's look around. There's a ```vehicle``` field that correlates to the ```VehiclePosition``` message from the NYCT GTFS extension. One would think you could derive position from this message, but it tends to only report on the vehicle ID and direction. ```current_stop_sequence``` looks like a tempting field, but we'll look at it later.

In [13]:
feed = gtfs_realtime_pb2.FeedMessage()
response = requests.get(BASEURL % (KEY, lines['BDFM']))
feed.ParseFromString(response.content)

52986

In [14]:
for entity in feed.entity:
    if entity.HasField('vehicle') and entity.vehicle.current_status == 1:
        print entity
        break
    
for entity in feed.entity:
    if entity.HasField('vehicle') and entity.vehicle.current_status != 1:
        print entity
        break

id: "22000002"
vehicle {
  trip {
    trip_id: "070150_D..S"
    start_date: "20180701"
    route_id: "D"
    [nyct_trip_descriptor] {
      train_id: "1D 1141+ BPK/STL"
      is_assigned: true
      direction: SOUTH
    }
  }
  current_stop_sequence: 33
  current_status: STOPPED_AT
  timestamp: 1530465309
}

id: "22000036"
vehicle {
  trip {
    trip_id: "079900_D..S"
    start_date: "20180701"
    route_id: "D"
    [nyct_trip_descriptor] {
      train_id: "1D 1319 BPK/STL"
      is_assigned: false
      direction: SOUTH
    }
  }
  current_stop_sequence: 0
  current_status: IN_TRANSIT_TO
  timestamp: 1530465313
}



### Heuristics: Determining train position from it's schedule

So what do we know about trains that are in-transit? Our first clues should be from the MTA documentation:
```
A VehiclePosition entity is provided for every trip when it starts moving. Note that a train can be
assigned (see TripUpdate) but has not started to move (e.g. a train waiting to leave the origin station),
therefore, no VehiclePosition is provided.

Usage notes:

The motivation to include VehiclePosition is to provide the timestamp field. This is the time of the last
detected movement of the train. This allows feed consumers to detect the situation when a train stops
moving (aka stalled). The platform countdown clocks only count down when trains are moving
otherwise they persist the last published arrival time for that train. If one wants to mimic this
behavior you must first determine the absence of movement (stalled train condition) ), then the
countdown must be stopped.

As an example, a countdown could be stopped for a trip when the difference between the timestamp in
the VehiclePosition and the timestamp in the field header is greater than, 90 seconds.

Note: since VehiclePosition information is not provided until the train starts moving, it is recommended
that feed consumers use the origin terminal departure to determine a train stalled condition. 
```
It may be tempting to rely on ```current_stop_sequence``` as an indicator for train position. However we'll show below that ```current_stop_sequence``` isn't often very helpful.

The key takeaway is that the timestamp should be a primary clue to real train position. However another strong source of information should be the ```StopTimeUpdate```  - a list of predicted arrival times for a given train. It's probably safe to assume that any in-transit trains are somewhere between the stop they just left and the next predicted arrival. Plan of attack:

* Retrieve the ```timestamp```, ```current_stop_sequence``` and ```current_status``` from each ```VehiclePosition``` message
* Retrieve the current stop and ETA from the ```TripUpdate``` message. Also attempt to grab the next stop and it's ETA
* Store both of those things along with additional metadata in a DF called "trips", keyed by trip ID and retrieval time: This way we can look at how trips change over time.
* "trips" should be enough to infer where trains are in the system, but we also have access to a lot of other train arrival data. We can store that stuff in "updates". maybe it'll come in handy later.

In [15]:
def generate_trip_info(trip):
    nyct_trip = trip.Extensions[nyct_subway_pb2.nyct_trip_descriptor]
    data = {}
    data['trip_id'] = trip.trip_id
    data['is_assigned'] = nyct_trip.is_assigned
    data['train_id'] = nyct_trip.train_id
    data['direction'] = nyct_trip.direction
    data['route_id'] = trip.route_id
    data['alert'] = False
    # If vehicle data is never reported, let's set some defaults
    data['timestamp'] = -1
    data['current_stop_sequence'] = -1
    data['current_status'] = -1
    return data

def copy_into_trips(trips, trip_data, feed_query_time):
    tid = trip_data['trip_id']+"_"+str(feed_query_time)
    if tid not in trips:
        trips[tid] = trip_data
    else:
        trips[tid].update(trip_data)
    return trips

def parse_feed(feed):
    unknown = []
    trips_raw = {}
    updates_raw = {}
    feed_query_time = feed.header.timestamp
    for idx in range(len(feed.entity)):
        entity = feed.entity[idx]
        if entity.HasField("vehicle"):
            data = generate_trip_info(entity.vehicle.trip)
            data['timestamp'] = entity.vehicle.timestamp
            data['current_stop_sequence'] = entity.vehicle.current_stop_sequence
            data['current_status'] = entity.vehicle.current_status
            trips_raw = copy_into_trips(trips_raw, data, feed_query_time)
        elif entity.HasField("trip_update"):
            data = generate_trip_info(entity.trip_update.trip)
            for update in entity.trip_update.stop_time_update:
                trip_update = copy.deepcopy(data)
                trip_update['arrival'] = update.arrival.time
                trip_update['departure'] = update.departure.time
                trip_update['stop'] = update.stop_id
                trip_update['schedule_relationship'] = update.schedule_relationship
                nyct_update = update.Extensions[nyct_subway_pb2.nyct_stop_time_update]
                trip_update['scheduled_track'] = nyct_update.scheduled_track
                trip_update['actual_track'] = nyct_update.actual_track
                updates_raw[trip_update['trip_id']+"_"+trip_update['stop']+"_"+str(feed_query_time)] = trip_update
            data['curr_stop'] = entity.trip_update.stop_time_update[0].stop_id
            data['curr_stop_time'] = entity.trip_update.stop_time_update[0].departure.time
            if len(entity.trip_update.stop_time_update) > 1:
                data['next_stop'] = entity.trip_update.stop_time_update[1].stop_id
                data['next_stop_time'] = entity.trip_update.stop_time_update[1].departure.time
            else:
                data['next_stop'] = data['curr_stop']
                data['next_stop_time'] = data['curr_stop_time']
            trips_raw = copy_into_trips(trips_raw, data, feed_query_time)
        elif entity.HasField("alert"):
            for selector in entity.alert.informed_entity:
                data = generate_trip_info(selector.trip)
                data['alert'] = True
                trips_raw = copy_into_trips(trips_raw, data, feed_query_time)
                # Really not much else we know here, soooo
        else:
            unknown.append(entity.SerializeToString())
    trips = pd.DataFrame(index=trips_raw.keys(), data=trips_raw.values()).sort_values('curr_stop_time')
    trips['at_station'] = (trips.timestamp == trips.curr_stop_time)
    trips['progress'] = ((trips.timestamp - trips.curr_stop_time)/(trips.next_stop_time - trips.curr_stop_time))
    updates = pd.DataFrame(index=updates_raw.keys(), data=updates_raw.values()).sort_values('departure')
    return trips, updates, unknown
trips, updates, unknown = parse_feed(feed)

In [16]:
trips.head()

Unnamed: 0,alert,curr_stop,curr_stop_time,current_status,current_stop_sequence,direction,is_assigned,next_stop,next_stop_time,route_id,timestamp,train_id,trip_id,at_station,progress
077150_F..S_1530465329,False,G08S,1530464899,1,7,3,True,G14S,1530465373,F,1530465310,1F 1251+ 179/STL,077150_F..S,False,0.867089
072956_D..N_1530465329,False,A24N,1530465029,1,22,1,True,A15N,1530465388,D,1530465309,1D 1209+ STL/205,072956_D..N,False,0.779944
068750_F..S_1530465329,False,D43S,1530465032,1,46,3,True,D43S,1530465032,F,1530465309,1F 1127+ 179/STL,068750_F..S,False,inf
076100_F..S_1530465329,False,G14S,1530465144,1,8,3,True,G21S,1530465424,F,1530465309,1F 1239+ 179/STL,076100_F..S,False,0.589286
076716_M..N_1530465329,False,M18N,1530465219,1,12,1,True,M18N,1530465219,M,1530465219,1M 1247 MET/ESX,076716_M..N,True,


In [17]:
updates[updates.trip_id==trips.iloc[4].trip_id].head()

Unnamed: 0,actual_track,alert,arrival,current_status,current_stop_sequence,departure,direction,is_assigned,route_id,schedule_relationship,scheduled_track,stop,timestamp,train_id,trip_id
076716_M..N_M18N_1530465329,JM,False,1530465219,-1,-1,1530465219,1,True,M,0,JM,M18N,-1,1M 1247 MET/ESX,076716_M..N


## Maybe current_status is a lie?

Something odd is up with trains that list current_status as IN_TRANSIT_TO (2). None of them list a ```current_stop_sequence```, and all of them seem to be at the start or end of the line. It almost looks as though IN_TRANSIT_TO is used specially within the MTA to indicate trains that are not yet in service, or are traveling to become a part of service.

However it looks as though we might be able to examine the arrival estimates to predict if a train is in-between platforms, and roughly where it's located. We can create a column called ```at_station``` that checks to see if the current train timestamp matches the arrival time estimate. If the train timestamp falls in-between the estimates of two stops, we can assume that the train is traveling between those two stops. The ```progress``` column attempts to estimate the position by doing a simple linear fit between the two points.

In [18]:
trips[trips.current_status == 2].head()

Unnamed: 0,alert,curr_stop,curr_stop_time,current_status,current_stop_sequence,direction,is_assigned,next_stop,next_stop_time,route_id,timestamp,train_id,trip_id,at_station,progress
079900_D..S_1530465329,False,D03S,1530465540,2,0,3,False,D04S,1530465660,D,1530465313,1D 1319 BPK/STL,079900_D..S,False,-1.891667
080100_D..N_1530465329,False,D43N,1530465660,2,0,1,False,B23N,1530465900,D,1530465313,1D 1321 STL/205,080100_D..N,False,-1.445833
080250_M..S_1530465329,False,M18S,1530465750,2,0,3,False,M16S,1530466260,M,1530465313,1M 1322+ ESX/MET,080250_M..S,False,-0.856863
080250_F..N_1530465329,False,D43N,1530465750,2,0,1,False,D42N,1530465840,F,1530465313,1F 1322+ STL/179,080250_F..N,False,-4.855556
080300_M..N_1530465329,False,M01N,1530465780,2,0,1,False,M04N,1530465900,M,1530465313,1M 1323 MET/ESX,080300_M..N,False,-3.891667


In [19]:
trips[trips.curr_stop_time != trips.timestamp][['at_station', 'curr_stop','curr_stop_time',
                                                'next_stop','next_stop_time','timestamp', 'progress']].head()

Unnamed: 0,at_station,curr_stop,curr_stop_time,next_stop,next_stop_time,timestamp,progress
077150_F..S_1530465329,False,G08S,1530464899,G14S,1530465373,1530465310,0.867089
072956_D..N_1530465329,False,A24N,1530465029,A15N,1530465388,1530465309,0.779944
068750_F..S_1530465329,False,D43S,1530465032,D43S,1530465032,1530465309,inf
076100_F..S_1530465329,False,G14S,1530465144,G21S,1530465424,1530465309,0.589286
077578_D..S_1530465329,False,D13S,1530465259,A15S,1530465392,1530465309,0.37594


In [20]:
stops[stops.stop_id.str.startswith(trips[trips.curr_stop_time != trips.timestamp].curr_stop[0][0])]

Unnamed: 0,stop_id,stop_name,stop_lat,stop_lon
320,G05,Jamaica Center - Parsons/Archer,40.702147,-73.801109
321,G06,Sutphin Blvd - Archer Av - JFK Airport,40.700486,-73.807969
322,G07,Jamaica - Van Wyck,40.702566,-73.816859
323,G08,Forest Hills - 71 Av,40.721691,-73.844521
324,G09,67 Av,40.726523,-73.852719
325,G10,63 Dr - Rego Park,40.729846,-73.861604
326,G11,Woodhaven Blvd,40.733106,-73.869229
327,G12,Grand Av - Newtown,40.737015,-73.877223
328,G13,Elmhurst Av,40.742454,-73.882017
329,G14,Jackson Hts - Roosevelt Av,40.746644,-73.891338


# Pulling it all together: Let's draw ourselves a real-time subway map!

All the base building blocks are there - we'll iterate across all of the line endpoints, sum up all of the in-progress trips, and plot them all on a map.

In [21]:
def overlay_trains(trips, m):
    idx = 0
    for idx, trip in trips.iterrows():
        stop = get_stop_by_name(trip.curr_stop)
        folium.features.Circle(location=[stop.stop_lat, stop.stop_lon],
                               radius=20, popup=urllib.quote(trip.trip_id), fill=True,
                               color=find_color(trip.route_id), opacity=0.0, fill_opacity=0.9).add_to(m)

alltrips = []
allupdates = []
rawfeeds = {}
for line in lines:
    feed = gtfs_realtime_pb2.FeedMessage()
    response = requests.get(BASEURL % (KEY, lines[line]))
    feed.ParseFromString(response.content)
    rawfeeds[line] = feed
    trips, updates, _ = parse_feed(feed)
    alltrips.append(trips)
    allupdates.append(updates)
trips = pd.concat(alltrips)
updates = pd.concat(allupdates)

In [24]:
def overlay_trains(trips, m):
    idx = 0
    for idx, trip in trips.iterrows():
        try:
            stop = get_stop_by_name(trip.curr_stop)
            folium.features.Circle(location=[stop.stop_lat, stop.stop_lon],
                                   radius=50, popup=urllib.quote(trip.trip_id), fill=True,
                                   color=find_color(trip.route_id), opacity=0.0, fill_opacity=0.9).add_to(m)
        except:
            continue

m = folium.Map(location=[shapes.shape_pt_lat.mean(), shapes.shape_pt_lon.mean()-0.05],
               tiles='CartoDB positron', zoom_start=14)
for line in 'ACEBDFM123456NQRLG':
    overlay_route(line, m)
    overlay_stops(line, m)
    overlay_trains(trips[trips.route_id == line], m)
m

In [23]:
#engine = create_engine('postgresql://subway:subway@127.0.0.1:5432/subway',echo=False)
#trips.to_sql('trips', engine, if_exists='append')