Notebook updated 7/17/22 -- initial data analysis to aggregate RT data to hourly totals

*This notebook will not work without access to the private S3 bucket; work is planned to split some of this notebook out to automated scripts to create the daily CSV files & reformulate this analysis. Notebook is committed to GitHub for reference/posterity but generally will not run for most users.*

In [1]:
import boto3
import json
import pandas as pd
import pendulum


In [2]:
import sys
sys.path
sys.path.append('c:\\Users\\Chris\\CodingProjects\\rtd-ghost-buses\\')
# from  scrape_data.parse_data import gtfs_decode

In [3]:
# start_date = '2023-01-01'
# end_date = '2023-01-07'

# date_range = [d for d in pendulum.period(pendulum.from_format(start_date, 'YYYY-MM-DD'), pendulum.from_format(end_date, 'YYYY-MM-DD')).range('days')]

In [4]:
# get objects from S3
# this requires being locally authenticated: https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html

# s3 = boto3.resource('s3')
# bucket = s3.Bucket('rtd-ghost-buses-private')

In [5]:
# list(bucket.objects.all())
# Check prefix for test
# 
# TODO - change parsing logic to point to protobuf
# Similar to combine_daily_files.py
# Note: parse_data is what I've used to read RTD files
 

In [6]:
# len(data_dict.items())
# data_dict.keys()
# data_dict['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb']
# # errors.shape
# # pd.json_normalize(data_dict['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'])
# # for entity in data_dict['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'].entity:
# #     print(entity.vehicle.stop_id)
#     # print(type(entity.trip_update))
# # len(data_dict['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'].entity)
# # 
# # Errors here:
# # loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T06:48:13.672199-07:00.pb')
# for entity in data_dict['bus_data_VehiclePosition/2023-01-01T06:48:13.672199-07:00.pb'].entity:
#     print(entity)




In [7]:
from pydantic import BaseModel
from enum import Enum, IntEnum
import inspect

In [8]:
# Pydantic models

### Messages and components ###
class message_TripDescriptor(BaseModel):
    """ Typing for TripDescriptor """
    # https://developers.google.com/transit/gtfs-realtime/reference#message-tripdescriptor
    trip_id: str | None
    route_id: str | None
    direction_id: str | None
    start_time: str | None
    start_date: str | None
    schedule_relationship: str | None
    
class message_VehicleDescriptor(BaseModel):
    id: str | None
    label: str | None
    license_plate: str | None
    
class message_Position(BaseModel):
    """ Typing for Position"""
    # https://developers.google.com/transit/gtfs-realtime/reference#message-position
    latitude: float
    longitude: float
    bearing: float | None
    odometer: float | None
    speed: float | None
    
class enum_CongestionLevel(Enum):
    
    UNKNOWN_CONGESTION_LEVEL = 'UNKNOWN_CONGESTION_LEVEL'
    RUNNING_SMOOTHLY= 'RUNNING_SMOOTHLY'
    STOP_AND_GO = 'STOP_AND_GO'
    CONGESTION = 'CONGESTION'
    SEVERE_CONGESTION = 'SEVERE_CONGESTION'


class enum_OccupancyStatus(Enum):
    # https://developers.google.com/transit/gtfs-realtime/reference#enum-occupancystatus
    EMPTY = 'EMPTY'
    MANY_SEATS_AVAILABLE = 'MANY_SEATS_AVAILABLE'
    FEW_SEATS_AVAILABLE = 'FEW_SEATS_AVAILABLE'
    STANDING_ROOM_ONLY = 'STANDING_ROOM_ONLY'
    CRUSHED_STANDING_ROOM_ONLY = 'CRUSHED_STANDING_ROOM_ONLY'
    FULL = 'FULL'
    NOT_ACCEPTING_PASSENGERS = 'NOT_ACCEPTING_PASSENGERS'
    NO_DATA_AVAILABLE = 'NO_DATA_AVAILABLE'
    NOT_BOARDABLE = 'NOT_BOARDABLE'

class enum_VehicleStopStatus(Enum):
    # https://developers.google.com/transit/gtfs-realtime/reference#enum-vehiclestopstatus
    INCOMING_AT = 'INCOMING_AT'
    STOPPED_AT = 'STOPPED_AT'
    IN_TRANSIT_TO = 'IN_TRANSIT_TO'
    
    
### Assembly ###
class VehiclePositionEntity(BaseModel):
    """ Collect from messages """
    
    # Vehicle - Will have feed.vehicle
    vehicle: message_VehicleDescriptor | None 
    trip: message_TripDescriptor | None
    position: message_Position | None
    congestion_level: enum_CongestionLevel | None
    occupancy_status: enum_OccupancyStatus | None
    VehicleStopStatus: enum_VehicleStopStatus | None
    timestamp: str | None

In [33]:
# Steps:
# Fetch objects from bucket based on prefix
# Decode - split from 
# 
from google.protobuf.json_format import MessageToDict, MessageToJson
import json
from google.transit import gtfs_realtime_pb2
from io import BytesIO

class GTFSdata:
    def __init__(self):
        self.s3 = boto3.resource('s3')
        self.bucket = self.s3.Bucket('rtd-ghost-buses-private')
        
    trip_descriptor_message = {'trip_id': str, 'route_id': str, 'direction_id': int, 'start_time': str, 'start_date': str, 'schedule_relationship': str}
    
    # https://developers.google.com/transit/gtfs-realtime/reference#message-tripdescriptor
    
    def gtfs_decode(self, pb_message):
        """ RTD uses three protobuf message types. This returns an object of the correct type with the file read in """
        match self.data_type:
            case 'Alerts':
                try:
                    alert = gtfs_realtime_pb2.FeedMessage()
                    alert.ParseFromString(pb_message)
                    return alert
                except:
                    return None
            case 'TripUpdate':
                try:
                    trip_update = gtfs_realtime_pb2.FeedMessage()
                    trip_update.ParseFromString(pb_message)
                    return trip_update
                except:
                    return None
            case 'VehiclePosition':
                try:
                    vehicle_position = gtfs_realtime_pb2.FeedMessage()
                    vehicle_position.ParseFromString(pb_message)
                    return vehicle_position
                except:
                    return None
            case _:
                return 'Command not recognized'
            
    @staticmethod
    def entity_to_dataframe(message, filename):
        """ Convert one decoded message to a pandas dataframe """
        if message is None:
            return None
        dataframe_rows = []
        for entity in message.entity:
            row = pd.json_normalize(json.loads(MessageToJson(entity)))
            dataframe_rows.append(row)
            
        message_df = pd.concat(dataframe_rows)
        message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
        message_df['filename'] = filename
        return message_df

    def dataset_to_dataframe(self):
        """ Convert the entire decoded dataset """
        entire_dataframe_rows = []
        for k, v in self.data_dict_decoded.items():
            entire_dataframe_rows.append(self.entity_to_dataframe(v,k))
        self.dataset_df = pd.concat(entire_dataframe_rows)
        return None
        


        
    def decode_s3_data(self):
        """ Apply gtfs_decode tp a dict """
        self.data_dict_decoded = {k: self.gtfs_decode(v) for k, v in self.data_dict.items()}
        # self.data_dict_decoded = dict(self.data_dict, map(gtfs_decode, self.data_dict.values()))
        
        return None
    
    def fetch_s3_data(self, day):
        """ Given a date range, fetch the un-decoded files """
        
        date_str = day.to_date_string()
        print(f"Processing {date_str} at {pendulum.now().to_datetime_string()}")
        # Prefix
        objects = self.bucket.objects.filter(Prefix = f'bus_data_{self.data_type}/{date_str}')
        
        print(f"------- loading data at {pendulum.now().to_datetime_string()}")
        
        # load data
        data_dict = {}

        for obj in objects:
            print(f"loading {obj}")
            obj_name = obj.key
            # https://stackoverflow.com/questions/31976273/open-s3-object-as-a-string-with-boto3
            # obj_body = json.loads(obj.get()['Body'].read().decode('utf-8'))
            # obj_body = gtfs_decode(obj.get()['Body'].read(), 'VehiclePosition')
            obj_body = obj.get()['Body'].read()
            data_dict[obj_name] = obj_body
        
        self.data_dict = data_dict
        
        return len(data_dict)
    
    def upload_s3_parquet(self, day):
        # Upload the s3 file to s3.
        # self.bucket.
        print("todo")
        date_str = day.to_date_string()
        out_buffer = BytesIO()
        self.dataset_df.to_parquet(out_buffer)
        out_buffer.seek(0)
        self.bucket.upload_fileobj(out_buffer, f"processed/{self.data_type}/{date_str}.parquet")
        return None

    
    # Master function - fetches data - converts - saves as parquet - upload to s3, all looped one day at a time
    def run(self, date_range):
        """ Master run """
        for day in date_range:
            _ = self.fetch_s3_data(day)
            self.decode_s3_data()
            self.dataset_to_dataframe()
            # TODO: Upload to s3
            self.upload_s3_parquet(day)
            
            

class VehiclePosition(GTFSdata):
    def __init__(self):
        self.data_type = 'VehiclePosition'
        self.field_types = [field.name for field in gtfs_realtime_pb2._VEHICLEPOSITION.fields]
        # self.sparse_cols_list = ['vehicle.trip.tripId',	'vehicle.trip.scheduleRelationship',	'vehicle.trip.routeId',	'vehicle.trip.directionId',	"vehicle.currentStatus",	'vehicle.stopId'] # Parquet does not like sparse dtypes
        super().__init__()
        
    
    
class TripUpdate(GTFSdata):
    def __init__(self):
        self.data_type = 'TripUpdate'
        self.field_types = [field.name for field in gtfs_realtime_pb2._TRIPUPDATE.fields]
        # self.sparse_cols_list = ['tripUpdate.stopTimeUpdate',	'tripUpdate.vehicle.id',	'tripUpdate.vehicle.label'] # Parquet does not like sparse dtypes
        super().__init__()
        
    
    
class Alerts(GTFSdata):
    def __init__(self):
        self.data_type = 'Alerts'
        self.field_types = [field.name for field in gtfs_realtime_pb2._ALERT.fields]
        super().__init__()
    

# Sourced from https://developers.google.com/transit/gtfs-realtime/guides/vehicle-positions
# Could have trip update. Could have vehicle description. How does Denver use it?
# https://developers.google.com/transit/gtfs-realtime/reference#message-tripdescriptor
# https://developers.google.com/transit/gtfs-realtime/reference#message-vehicledescriptor


# Possible 
        

In [70]:
start_date = '2023-01-11'
end_date = '2023-01-11'

date_range = [d for d in pendulum.period(pendulum.from_format(start_date, 'YYYY-MM-DD'), pendulum.from_format(end_date, 'YYYY-MM-DD')).range('days')]

vp = VehiclePosition()
vp.run(date_range)

Processing 2023-01-11 at 2023-05-11 20:02:21
------- loading data at 2023-05-11 20:02:21
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-11T00:03:13.750565-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-11T00:08:13.218684-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-11T00:13:13.251312-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-11T00:18:13.444293-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-11T00:23:13.709137-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-11T00:28:13.627603-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-11T00:33:13.24479

  vehicle_position.ParseFromString(pb_message)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Times

todo


In [71]:
tu = TripUpdate()
tu.run(date_range)

Processing 2023-01-11 at 2023-05-11 20:04:51
------- loading data at 2023-05-11 20:04:51
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_TripUpdate/2023-01-11T00:03:13.217862-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_TripUpdate/2023-01-11T00:08:12.877619-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_TripUpdate/2023-01-11T00:13:12.749591-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_TripUpdate/2023-01-11T00:18:12.924507-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_TripUpdate/2023-01-11T00:23:13.146480-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_TripUpdate/2023-01-11T00:28:13.120594-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_TripUpdate/2023-01-11T00:33:12.728074-07:00.pb')
loading s3.ObjectSumma

  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)

todo


In [72]:
alerts = Alerts()
alerts.run(date_range)

Processing 2023-01-11 at 2023-05-11 20:08:37
------- loading data at 2023-05-11 20:08:37
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:03:11.843488-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:08:12.024225-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:13:11.827381-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:18:12.012499-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:23:12.217186-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:28:12.233733-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:33:12.027757-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-bu

  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)

todo


In [73]:
# tu = TripUpdate()
# for day in date_range:
#     tu.fetch_s3_data(day)
#     tu.decode_s3_data()

In [67]:
alerts = Alerts()
alerts.run(date_range)

Processing 2023-01-11 at 2023-05-11 20:01:11
------- loading data at 2023-05-11 20:01:11
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:03:11.843488-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:08:12.024225-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:13:11.827381-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:18:12.012499-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:23:12.217186-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:28:12.233733-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_Alerts/2023-01-11T00:33:12.027757-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-bu

  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)
  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(message.header.timestamp)

todo


In [16]:
print(vp.dataset_df.shape)
vp.dataset_df.head()
print(tu.dataset_df.shape)
tu.dataset_df.head()

(57914, 15)
(89527, 10)


Unnamed: 0,id,tripUpdate.trip.tripId,tripUpdate.trip.scheduleRelationship,tripUpdate.trip.routeId,tripUpdate.trip.directionId,tripUpdate.stopTimeUpdate,tripUpdate.vehicle.id,tripUpdate.vehicle.label,header.timestamp,filename
0,1672556583_114253564,114253564,CANCELED,101E,0,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114253883,114253883,CANCELED,101E,1,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114253884,114253884,CANCELED,101E,1,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114254175,114254175,CANCELED,101H,0,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114254183,114254183,CANCELED,101H,0,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...


In [25]:
# tu.dataset_df.to_parquet(f"tripupdate_{date_range[0].to_date_string()}.parquet")
from io import BytesIO
out_buffer = BytesIO()
tu.dataset_df.to_parquet(out_buffer)
out_buffer.seek(0)
tu.bucket.upload_fileobj(out_buffer, f"processed/{tu.data_type}/{day.to_date_string()}.parquet")


In [24]:
out_buffer.__sizeof__()

64

In [74]:
# vp.sparse_cols_list = ['vehicle.trip.tripId',	'vehicle.trip.scheduleRelationship',	'vehicle.trip.routeId',	'vehicle.trip.directionId',	"vehicle.currentStatus",	'vehicle.stopId']
# tu.sparse_cols_list = ['tripUpdate.stopTimeUpdate',	'tripUpdate.vehicle.id',	'tripUpdate.vehicle.label']
# for col in vp.sparse_cols_list:
#     print(col)
#     vp.dataset_df[col ]= pd.arrays.SparseArray(vp.dataset_df[col])
# for col in tu.sparse_cols_list:
#     print(col)
#     tu.dataset_df[col ]= pd.arrays.SparseArray(tu.dataset_df[col])

# print(vp.dataset_df.dtypes)
# vp.dataset_df.head()

In [None]:
vp.dataset_df

In [41]:
tu.dataset_df['tripUpdate.stopTimeUpdate'].iloc[85925]
tu.dataset_df

Unnamed: 0,id,tripUpdate.trip.tripId,tripUpdate.trip.scheduleRelationship,tripUpdate.trip.routeId,tripUpdate.trip.directionId,tripUpdate.stopTimeUpdate,tripUpdate.vehicle.id,tripUpdate.vehicle.label,header.timestamp,filename
0,1672556583_114253564,114253564,CANCELED,101E,0,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114253883,114253883,CANCELED,101E,1,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114253884,114253884,CANCELED,101E,1,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114254175,114254175,CANCELED,101H,0,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
0,1672556583_114254183,114254183,CANCELED,101H,0,,,,2023-01-01 07:03:03,bus_data_TripUpdate/2023-01-01T00:03:12.767143...
...,...,...,...,...,...,...,...,...,...,...
0,1672642678_114268794,114268794,SCHEDULED,A,1,"[{'stopSequence': 6, 'arrival': {'time': '1672...",,,2023-01-02 06:57:58,bus_data_TripUpdate/2023-01-01T23:58:13.179784...
0,1672642678_114269086,114269086,SCHEDULED,AT,1,"[{'stopSequence': 3, 'arrival': {'time': '1672...",,,2023-01-02 06:57:58,bus_data_TripUpdate/2023-01-01T23:58:13.179784...
0,1672642678_114269215,114269215,SCHEDULED,BOLT,0,"[{'stopSequence': 20, 'arrival': {'time': '167...",3643,3643,2023-01-02 06:57:58,bus_data_TripUpdate/2023-01-01T23:58:13.179784...
0,1672642678_114269229,114269229,SCHEDULED,BOLT,1,"[{'stopSequence': 22, 'arrival': {'time': '167...",1991,1991,2023-01-02 06:57:58,bus_data_TripUpdate/2023-01-01T23:58:13.179784...


In [76]:
# Can I just take the trip update and look for CANCELED?
# Filter through alerts 
tu.dataset_df['tripUpdate.trip.scheduleRelationship'].value_counts()
# 1 day's worth - 25% Canceled for new year's day.
# <1% on Jan 11th.

SCHEDULED    98159
CANCELED       684
Name: tripUpdate.trip.scheduleRelationship, dtype: int64

In [77]:
print(alerts.dataset_df['alert.effect'].value_counts())
# Alerts - filter to 


NO_SERVICE          9484
MODIFIED_SERVICE    7200
DETOUR              1960
UNKNOWN_EFFECT       288
OTHER_EFFECT         208
Name: alert.effect, dtype: int64


In [78]:
tu.dataset_df['tripUpdate.trip.scheduleRelationship'].value_counts()
date_range[0].to_date_string()

'2023-01-11'

In [130]:

tu.dataset_df.to_parquet(f"tripupdate_{date_range[0].to_date_string()}.parquet")

TypeError: Sparse pandas data (column tripUpdate.stopTimeUpdate) not supported.

In [None]:
# tu.dataset_df.to_parquet()

In [97]:
print(vp.data_dict_decoded['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'].header.timestamp)
from google.protobuf.json_format import MessageToDict, MessageToJson
import json

dataframe_rows = []
for entity in vp.data_dict_decoded['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'].entity:
    row = pd.json_normalize(json.loads(MessageToJson(entity)))
    dataframe_rows.append(row)
    
message_df = pd.concat(dataframe_rows)
message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(vp.data_dict_decoded['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'].header.timestamp)
message_df['filename'] = 'bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'
message_df


1672556577


  message_df['header.timestamp'] = pd.Timestamp.utcfromtimestamp(vp.data_dict_decoded['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb'].header.timestamp)


Unnamed: 0,id,vehicle.position.latitude,vehicle.position.longitude,vehicle.position.bearing,vehicle.timestamp,vehicle.vehicle.id,vehicle.vehicle.label,vehicle.trip.tripId,vehicle.trip.scheduleRelationship,vehicle.trip.routeId,vehicle.trip.directionId,vehicle.currentStatus,vehicle.stopId,header.timestamp,filename
0,1672556577_1501,39.83388,-104.753586,91.0,1672556559,1501,1501,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_1503,39.94215,-105.141785,130.0,1672556548,1503,1503,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_1655,39.769432,-104.9853,0.0,1672556563,1655,1655,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_1661,39.770065,-104.98448,0.0,1672556505,1661,1661,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_3705,39.99248,-105.417114,29.0,1672556544,3705,3705,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_3969,39.665882,-105.0153,0.0,1672556557,3969,3969,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_5141,39.73825,-104.82346,204.0,1672556489,5141,5141,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_5181,39.81135,-104.99022,0.0,1672556486,5181,5181,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_5210,40.14915,-105.1172,0.0,1672556539,5210,5210,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...
0,1672556577_5225,40.14915,-105.11726,0.0,1672556526,5225,5225,,,,,,,2023-01-01 07:02:57,bus_data_VehiclePosition/2023-01-01T00:03:13.2...


In [88]:
# [field.name for field in entity.vehicle.DESCRIPTOR.fields]
from google.protobuf.json_format import MessageToDict, MessageToJson
import json

row = pd.json_normalize(json.loads(MessageToJson(entity)))
# MessageToJson(entity)

Unnamed: 0,id,vehicle.position.latitude,vehicle.position.longitude,vehicle.position.bearing,vehicle.timestamp,vehicle.vehicle.id,vehicle.vehicle.label
0,1672556577_9386,39.738434,-104.82342,269.0,1672556554,9386,9386


In [69]:
# gtfs_realtime_pb2.VehiclePosition.ListFields(entity)
# # [field.name for field in gtfs_realtime_pb2.VehiclePosition.fields]
# gtfs_realtime_pb2._VEHICLEDESCRIPTOR.fields
# [field.name for field in gtfs_realtime_pb2._VEHICLEDESCRIPTOR.fields]
# gtfs_realtime_pb2._VEHICLEPOSITION

[field.name for field in gtfs_realtime_pb2._VEHICLEPOSITION.fields]
# [field.name for field in gtfs_realtime_pb2._TRIPUPDATE.fields]



['trip',
 'vehicle',
 'position',
 'current_stop_sequence',
 'stop_id',
 'current_status',
 'timestamp',
 'congestion_level',
 'occupancy_status',
 'occupancy_percentage',
 'multi_carriage_details']

In [105]:
tu.data_dict_decoded.keys()
tu.data_dict_decoded['bus_data_TripUpdate/2023-01-01T00:03:12.767143-07:00.pb']# Known good
# tu.data_dict['bus_data_TripUpdate/2023-01-01T05:13:12.856636-07:00.pb']
# tu.data_dict['bus_data_TripUpdate/2023-01-01T00:03:12.767143-07:00.pb'] # Known bad
# from google.transit import gtfs_realtime_pb2
# trip_update = gtfs_realtime_pb2.FeedMessage()
# # tu.data_dict['bus_data_TripUpdate/2023-01-01T00:03:12.767143-07:00.pb']
# # trip_update.ParseFromString(tu.data_dict['bus_data_TripUpdate/2023-01-01T05:13:12.856636-07:00.pb'])

# trip_update.ParseFromString(tu.data_dict['bus_data_TripUpdate/2023-01-01T00:03:12.767143-07:00.pb'])

# for k, v in tu.data_dict.items():
#     try:
#         gtfs_decode(v,tu.data_type)
#     except:
#         print(k)
    
# [print(f"{k} | {gtfs_decode(v,tu.data_type)}") for k, v in tu.data_dict.items() ]
# data_dict_decoded = {k: gtfs_decode(v,tu.data_type) for k, v in tu.data_dict.items()}

header {
  gtfs_realtime_version: "1.0"
  incrementality: FULL_DATASET
  timestamp: 1672556583
}
entity {
  id: "1672556583_114253564"
  trip_update {
    trip {
      trip_id: "114253564"
      schedule_relationship: CANCELED
      route_id: "101E"
      direction_id: 0
    }
  }
}
entity {
  id: "1672556583_114253883"
  trip_update {
    trip {
      trip_id: "114253883"
      schedule_relationship: CANCELED
      route_id: "101E"
      direction_id: 1
    }
  }
}
entity {
  id: "1672556583_114253884"
  trip_update {
    trip {
      trip_id: "114253884"
      schedule_relationship: CANCELED
      route_id: "101E"
      direction_id: 1
    }
  }
}
entity {
  id: "1672556583_114254175"
  trip_update {
    trip {
      trip_id: "114254175"
      schedule_relationship: CANCELED
      route_id: "101H"
      direction_id: 0
    }
  }
}
entity {
  id: "1672556583_114254183"
  trip_update {
    trip {
      trip_id: "114254183"
      schedule_relationship: CANCELED
      route_id: "101H"
  

In [None]:
# For a given: 
# Vechicle updates can have : 


In [31]:
# vp.data_dict_decoded.values()
decoded_message= vp.data_dict_decoded['bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb']
# 
decoded_message
for entity in decoded_message.entity:
    if entity.HasField('vehicle'):
        print(entity.trip_update)










































In [6]:
# What is this actually processing?
# Break this into methods!



for day in date_range:
    date_str = day.to_date_string()
    print(f"Processing {date_str} at {pendulum.now().to_datetime_string()}")
    objects = bucket.objects.filter(Prefix = f'bus_data_VehiclePosition/{date_str}')
    
    print(f"------- loading data at {pendulum.now().to_datetime_string()}")
    
    # load data
    data_dict = {}

    for obj in objects:
        print(f"loading {obj}")
        obj_name = obj.key
        # https://stackoverflow.com/questions/31976273/open-s3-object-as-a-string-with-boto3
        # obj_body = json.loads(obj.get()['Body'].read().decode('utf-8'))
        obj_body = gtfs_decode(obj.get()['Body'].read(), 'VehiclePosition')
        data_dict[obj_name] = obj_body
    
    # parse data into actual vehicle locations and errors
    
    print(f"------- parsing data at {pendulum.now().to_datetime_string()}")

    data = pd.DataFrame()
    errors = pd.DataFrame()

    # k, v here are filename: full dict of JSON
    for k, v in data_dict.items():
        # print(f"processing {k}")
        filename = k
        new_data = pd.DataFrame()
        new_errors = pd.DataFrame()
        # expect ~12 "chunks" per JSON - Chris: Do I?
        for chunk, contents in v.items():
            if 'vehicle' in v[chunk]['bustime-response'].keys():
                new_data = new_data.append(pd.DataFrame(v[chunk]['bustime-response']['vehicle']))
            if 'error' in v[chunk]['bustime-response'].keys():
                new_errors = new_errors.append(pd.DataFrame(v[chunk]['bustime-response']['error']))
        new_data['scrape_file'] = filename
        new_errors['scrape_file'] = filename
        data = data.append(new_data)
        errors = errors.append(new_errors)
        
    print(f"------- saving data at {pendulum.now().to_datetime_string()}")

    if len(errors) > 0:
        bucket.put_object(Body = errors.to_csv(index = False), 
                   Key = f'bus_full_day_errors_v2/{date_str}.csv')

    if len(data) > 0:
        # convert data time to actual datetime
        data['data_time'] = pd.to_datetime(data['tmstmp'], format='%Y%m%d %H:%M')

        data['data_hour'] = data.data_time.dt.hour
        data['data_date'] = data.data_time.dt.date

        bucket.put_object(Body = data.to_csv(index = False), 
                   Key = f'bus_full_day_data_v2/{date_str}.csv')

        # combine vids into a set (drops duplicates): https://stackoverflow.com/a/45925961
        hourly_summary = data.groupby(['data_date', 'data_hour', 'rt', 'des']).agg({'vid': set, 'tatripid': set, 'tablockid': set}).reset_index()
        # get number of vehicles per hour per route
        hourly_summary['vh_count'] = hourly_summary['vid'].apply(len)
        hourly_summary['trip_count'] = hourly_summary['tatripid'].apply(len)
        hourly_summary['block_count'] = hourly_summary['tablockid'].apply(len)

        bucket.put_object(Body = hourly_summary.to_csv(index = False), 
                   Key = f'bus_hourly_summary_v2/{date_str}.csv')
        

Processing 2023-01-01 at 2023-05-08 14:10:29
------- loading data at 2023-05-08 14:10:29
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T00:03:13.256605-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T00:08:13.269956-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T00:13:16.737146-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T00:18:13.889945-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T00:23:13.587526-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T00:28:13.665296-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T00:33:13.75863

  vehicle_position.ParseFromString(pb_message)


loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T06:48:13.672199-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T06:53:13.702296-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T06:58:13.849596-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T07:03:13.355960-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T07:08:13.430870-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T07:13:13.518493-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_data_VehiclePosition/2023-01-01T07:18:13.517224-07:00.pb')
loading s3.ObjectSummary(bucket_name='rtd-ghost-buses-private', key='bus_dat

AttributeError: items