In [331]:
%load_ext autoreload
%autoreload 2

import re, os, sys
#sys.path.insert(0, os.path.realpath('/content/jupyter/mta-accessibility/notebooks/routing'))
sys.path.insert(0, os.getcwd())
from gcs_utils import gcs_util
import GTFS_Utils as gu

import os.path
from os import listdir
from os.path import isfile, join
from google.transit import gtfs_realtime_pb2 as gtfs_rt
from protobuf_to_dict import protobuf_to_dict
import datetime
import numpy as np
import pandas as pd

from pandas.io.json import json_normalize
from itertools import groupby
import requests
from datetime import datetime
from typing import List, Dict, Tuple

pd.set_option('max_colwidth', 200)
gcs = gcs_util()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [376]:
import logging

class Plog:    
    def __init__(self):        
        log = logging.getLogger('gtfs_consolidator')
        log.setLevel(logging.DEBUG)

        filename = "consolidator{}.log".format(datetime.now().strftime("%Y%m%d-%I%M%S"))
        # create file handler which logs even debug messages
        fh = logging.FileHandler(filename)
        fh.setLevel(logging.DEBUG)

        # create console handler with a higher log level
        ch = logging.StreamHandler()
        ch.setLevel(logging.ERROR)

        # create formatter and add it to the handlers
        formatter = logging.Formatter('%(asctime)s-%(levelname)s: %(message)s')
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        if log.hasHandlers():
            log.handlers.clear()
        # add the handlers to the logger
        log.addHandler(fh)
        log.addHandler(ch)

        self.log_instance = log
                    
    def error(self, *args, **kwargs):
        self.log_instance.error(*args, **kwargs)
    def info(self, *args, **kwargs):
        self.log_instance.info(*args, **kwargs)
    def warn(self, *args, **kwargs):
        self.log_instance.warning(*args, **kwargs)
        
log = Plog()

## GTFS Consolidation

In [377]:

vehicle_columns = ['alert.header_text.translation',
       'current_status', 'current_stop_sequence', 'id', 
 'route_id','start_date', 'start_time', 'stop_id', 'stop_name', 'timestamp','trip_id']

trip_columns = ['alert_header_text_translation',
       'arrival_time', 'departure_time', 'id', 'route_id', 'start_date',
       'start_time', 'stop_id', 'stop_name', 'trip_id']

def validate_hourly(df: pd.DataFrame, msg_type: str, date: str, hour: str):
    """Sanity check GTFS files rolled up hourly
        Checks that the column schema is as expected
        Checks that there are timestamps spanning at least the full hour
        and that there are no large gaps in time
    """
    # Max gap in time between rows.
    MAX_GAP_THRESHOLD = pd.to_timedelta(15, unit='m')
    
    error_msg = []
    
    df_columns = list(df.columns)    
    # Verify column schema
    # Expected columns schema should at least be a subset of actual schema
    if msg_type == 'vehicle_updates':
        if not (all(x in df_columns for x in vehicle_columns)):
            error_msg.append("Vehicle schema incorrect")
    elif msg_type == 'trip_updates':
        if not (all(x in df_columns for x in vehicle_columns)):
            error_msg.append("Trip update schema incorrect")
    else:
        error_msg.append("Unrecognized msg type")
        
    # Verify date range and diff
    expected_beg = pd.Timestamp(f"{date}-{hour}:00:00")
    expected_end = expected_beg + pd.to_timedelta(1, unit='h')
    
    beggining = df.timestamp.min()
    end = df.timestamp.max()
    max_diff = df.timestamp.sort_values().diff().max()
        
#     if not (expected_beg <= beggining and end <= expected_end):
#         error_msg.append(f"Expected [{expected_beg}, {expected_end}], actual [{beggining}, {end}]")
    
    if (max_diff > MAX_GAP_THRESHOLD):
        error_msg.append(f"Maximum diff of {max_diff}, threshold {MAX_GAP_THRESHOLD}")
    
    return error_msg

In [382]:
def parse_blob_name(name: str):
    """Returns msg_type, date, time"""
    pat = re.compile("realtime\/(.*)_([0-9]*)-([0-9]*)")
    return pat.match(name).groups()


blobs_by_date = {}
for blob in gcs.list_blobs('realtime'):
    
    msg_type, date, time = parse_blob_name(blob.name)
    blobs_by_date.setdefault(date, []).append(blob.name)    
    
for key,val in blobs_by_date.items():
    print(f"{key}: {len(val)}")

20200501: 2690
20200502: 2708
20200503: 2720
20200504: 2710
20200505: 2708
20200506: 2720
20200507: 2590
20200512: 74
20200419: 230
20200420: 229
20200422: 57
20200423: 112
20200424: 230
20200425: 230
20200426: 57
20200427: 58
20200428: 230
20200429: 616
20200430: 1073


In [380]:
date = "20200419"

files = blobs_by_date[date]

count = {}
for f in files:
    msg_type, date, time = parse_blob_name(f)
    hour = time[:2]
    c = count.setdefault(hour, 0)
    count[hour] += 1
count

{'03': 57, '07': 58, '08': 58, '09': 57}

False

In [384]:
bdf = pd.DataFrame()
for date, file_lst in blobs_by_date.items():
    veh_by_hour = {}
    upd_by_hour = {}
    
    if date.startswith('202005'):
        continue          
    if date != '20200422':
        continue
        
    # {'vehicle_updates', 'trip_updates'}
    for file in file_lst:
        msg_type, date, time = parse_blob_name(file)
        hour = time[:2]
        if msg_type == "vehicle_updates":
            veh_by_hour.setdefault(hour, []).append(file)
        elif msg_type == "trip_updates":
            upd_by_hour.setdefault(hour, []).append(file)
        else:
            print(f"Unknown type for file {file}")
                            
    log.info(f"{date} {len(file_lst)}")
    
    def consolidate_files(file_lst: List[str]):        
        df = pd.DataFrame()
        for file in file_lst:
            df = gcs.read_dataframe(file)
            df = df.append(df, sort=True).drop_duplicates()
        return df            
    
    def move_files_to_processed(files: List[str]):
        for file in files:
            file_name = file.split("/")[1]
            new_path = f"realtime/processed_df/{file_name}"            
            gcs.move_blob(file, new_path)            
    
    for hour, files in veh_by_hour.items():        
        msg_type = "vehicle_updates"        
        df = consolidate_files(files)        
        errors = validate_hourly(df, msg_type, date, hour)
        output_name = f"realtime/hourly/{msg_type}_{date}_{hour}"
        bdf = df
        if len(errors):
            log.error(f"{output_name} - {errors}")            
        else:            
            gcs.write_dataframe(df, output_name)
            move_files_to_processed(files)
            log.info(f"Uploading {output_name}")            
    
    for hour, files in upd_by_hour.items():
        msg_type = "trip_updates"
        df = consolidate_files(files)
        # TODO Maybe validate these too?
        output_name = f"realtime/hourly/{msg_type}_{date}_{hour}"
        gcs.write_dataframe(df, output_name)
        move_files_to_processed(files)
        log.info(f"Uploading {output_name}")        
                
    
print("Update done")

2020-05-13 14:24:25,453-ERROR: realtime/hourly/vehicle_updates_20200422_03 - ['Maximum diff of 0 days 23:36:17, threshold 0 days 00:15:00']


Update done


In [386]:
print(bdf.timestamp.min(), bdf.timestamp.max())

2020-04-21 04:12:55 2020-04-22 04:57:30


In [388]:
bdf.sort_values('timestamp').head()

Unnamed: 0,alert.header_text.translation,alert.informed_entity,current_status,current_stop_sequence,id,route_id,start_date,start_time,stop_id,stop_name,timestamp,trip_id
12,,,,1.0,76,3,20200422,,127N,Times Sq - 42 St,2020-04-21 04:12:55,146300_3..N42R
25,,,2.0,2.0,72,3,20200422,,123N,72 St,2020-04-22 03:49:12,144200_3..N42R
48,,,1.0,1.0,118,6,20200422,,640N,Brooklyn Bridge - City Hall,2020-04-22 03:53:51,144100_6..N01R
17,,,2.0,49.0,42,2,20200421,,201N,Wakefield - 241 St,2020-04-22 03:56:34,135200_2..N01R
42,,,1.0,22.0,102,4,20200421,,635N,14 St - Union Sq,2020-04-22 03:57:48,139950_4..N01R


In [283]:
files = ['realtime/vehicle_updates_20200417-173153', 'realtime/vehicle_updates_20200417-173258', 'realtime/vehicle_updates_20200417-173402']
for file in files:
    file_name = file.split("/")[1]
    
    print(file_name)

vehicle_updates_20200417-173153
vehicle_updates_20200417-173258
vehicle_updates_20200417-173402


In [40]:
# Split files between vehicle and trip updates
vehicle_updates = []
trip_updates = []

for b in gcs.list_blobs('realtime'):
    if "trip_updates" in b.name:
        trip_updates.append(b)
    elif "vehicle_updates" in b.name:
        vehicle_updates.append(b)    
    else:
        print(f"Found unknown path {b.name}")

In [48]:
unique_dates = set()
files_by_dates = {}

for blob in vehicle_updates[:10]:
    name = blob.name
    date = name.split("-")[0].split("_")[2]
    
    file_lst = files_by_dates.get(date, [])
    file_lst.append(blob.name)      
    files_by_dates[date] = file_lst
    
files_by_dates

{'20200417': ['realtime/vehicle_updates_20200417-173153',
  'realtime/vehicle_updates_20200417-173258',
  'realtime/vehicle_updates_20200417-173402',
  'realtime/vehicle_updates_20200417-173508',
  'realtime/vehicle_updates_20200417-173611',
  'realtime/vehicle_updates_20200417-173715',
  'realtime/vehicle_updates_20200417-173819',
  'realtime/vehicle_updates_20200417-173923',
  'realtime/vehicle_updates_20200417-174027',
  'realtime/vehicle_updates_20200417-174130']}

In [63]:
# Consolidate vehicle updates
unique_dates = set()
files_by_dates = {}

for blob in vehicle_updates:
    name = blob.name
    date = name.split("-")[0].split("_")[2]
    
    file_lst = files_by_dates.get(date, [])
    file_lst.append(blob.name)
      
    files_by_dates[date] = file_lst

74259


In [65]:
l = df.timestamp.unique()
l.sort()

list(l[:10])

[numpy.datetime64('NaT'),
 numpy.datetime64('2020-04-17T17:24:35.000000000'),
 numpy.datetime64('2020-04-17T17:26:17.000000000'),
 numpy.datetime64('2020-04-17T17:27:33.000000000'),
 numpy.datetime64('2020-04-17T17:28:21.000000000'),
 numpy.datetime64('2020-04-17T17:29:10.000000000'),
 numpy.datetime64('2020-04-17T17:29:21.000000000'),
 numpy.datetime64('2020-04-17T17:30:13.000000000'),
 numpy.datetime64('2020-04-17T17:30:15.000000000'),
 numpy.datetime64('2020-04-17T17:30:19.000000000')]