## Prototyping live bus feed 
(based on "get the newest parquet in the data lake" method)

In [133]:
# system_id="mtba_all"
system_id="tfnsw_bus"
# system_id = "nyct_mta_bus_gtfsrt"
bucket_name="bus-observatory-staging"
# prefix = f"feeds/tfnsw_bus"
prefix = f"feeds/{system_id}"

print(f"Grabbing most recent parquet from s3://{bucket_name}/{prefix}")

Grabbing most recent parquet from s3://bus-observatory-staging/feeds/tfnsw_bus


In [134]:
import boto3
import pytz
from shapely.geometry import Point
import json
from geojson import Feature, FeatureCollection, dump, dumps
import boto3
import pandas as pd
import io
import numpy as np

In [135]:
# after https://stackoverflow.com/questions/45375999/how-to-download-the-latest-file-of-an-s3-bucket-using-boto3

def get_most_recent_s3_object(bucket_name, prefix):
    s3 = boto3.client('s3')
    paginator = s3.get_paginator( "list_objects_v2" )
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    latest = None
    for page in page_iterator:
        if "Contents" in page:
            latest2 = max(page['Contents'], key=lambda x: x['LastModified'])
            if latest is None or latest2['LastModified'] > latest['LastModified']:
                latest = latest2
    return latest

latest = get_most_recent_s3_object(bucket_name, prefix)
print(latest['Key'])

feeds/tfnsw_bus/INCOMING_tfnsw_bus_2023-03-31_00_35_59.parquet


In [136]:


# Set up an Amazon S3 client object
s3 = boto3.client('s3')

# Retrieve the Parquet file from S3

response = s3.get_object(Bucket=bucket_name, Key=latest['Key'])
parquet_object = response['Body'].read()

# Create an in-memory buffer from the Parquet file
buffer = io.BytesIO(parquet_object)

# Read the data from the in-memory buffer and create a GeoDataFrame
df = pd.read_parquet(buffer)
df

Unnamed: 0,id,vehicle.trip.trip_id,vehicle.trip.start_time,vehicle.trip.start_date,vehicle.trip.schedule_relationship,vehicle.trip.route_id,vehicle.position.latitude,vehicle.position.longitude,vehicle.position.bearing,vehicle.position.speed,...,vehicle.congestion_level,vehicle.vehicle.id,vehicle.occupancy_status,is_deleted,vehicle.trip.direction_id,vehicle.current_stop_sequence,vehicle.current_status,vehicle.stop_id,vehicle.vehicle.label,vehicle.vehicle.license_plate
0,3913_24695413_2457_S208_1,1578545,15:38:00,20230331,0,2457_S208,-33.246105,151.447479,312.0,12.1,...,0,3913_24695413_2457_S208_1,1,,,,,,,
1,33553_25030831_2436_2616_1,1429429,15:10:00,20230331,0,2436_2616,-33.782291,150.996826,201.0,10.5,...,1,33553_25030831_2436_2616_1,1,,,,,,,
2,34456_408784_2456_S085_1,1732934,15:15:00,20230331,0,2456_S085,-34.502724,150.787476,210.0,9.3,...,0,34456_408784_2456_S085_1,2,,,,,,,
3,33553_25031639_2436_626_1,1726993,15:30:00,20230331,0,2436_626,-33.706795,150.955902,11.0,6.9,...,1,33553_25031639_2436_626_1,1,,,,,,,
4,42558_414516_3000_851_1,1481914,15:35:00,20230331,0,3000_851,-32.968822,151.652679,97.0,0.0,...,0,42558_414516_3000_851_1,1,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4678,38743_15172832_2435_807_1,1647084,15:45:00,20230331,0,2435_807,-33.885429,150.878448,282.0,7.0,...,0,38743_15172832_2435_807_1,1,,,,,,,
4679,38743_15172779_2435_817_1,1724170,15:51:00,20230331,0,2435_817,-33.893402,150.892532,22.0,1.5,...,0,38743_15172779_2435_817_1,1,,,,,,,
4680,38743_15173472_2435_820_1,1049450,15:54:00,20230331,0,2435_820,-33.821766,150.980423,100.0,10.9,...,0,38743_15173472_2435_820_1,1,,,,,,,
4681,33556__7024_1L1_1112308,,11:23:08,20230331,2,7024_1L1,-32.783958,151.651733,284.0,0.0,...,0,33556__7024_1L1_1112308,2,0.0,2.0,0.0,2.0,,,


In [137]:

# drop any with no location
df = df.dropna(subset=['vehicle.position.latitude', 'vehicle.position.longitude'])
df



Unnamed: 0,id,vehicle.trip.trip_id,vehicle.trip.start_time,vehicle.trip.start_date,vehicle.trip.schedule_relationship,vehicle.trip.route_id,vehicle.position.latitude,vehicle.position.longitude,vehicle.position.bearing,vehicle.position.speed,...,vehicle.congestion_level,vehicle.vehicle.id,vehicle.occupancy_status,is_deleted,vehicle.trip.direction_id,vehicle.current_stop_sequence,vehicle.current_status,vehicle.stop_id,vehicle.vehicle.label,vehicle.vehicle.license_plate
0,3913_24695413_2457_S208_1,1578545,15:38:00,20230331,0,2457_S208,-33.246105,151.447479,312.0,12.1,...,0,3913_24695413_2457_S208_1,1,,,,,,,
1,33553_25030831_2436_2616_1,1429429,15:10:00,20230331,0,2436_2616,-33.782291,150.996826,201.0,10.5,...,1,33553_25030831_2436_2616_1,1,,,,,,,
2,34456_408784_2456_S085_1,1732934,15:15:00,20230331,0,2456_S085,-34.502724,150.787476,210.0,9.3,...,0,34456_408784_2456_S085_1,2,,,,,,,
3,33553_25031639_2436_626_1,1726993,15:30:00,20230331,0,2436_626,-33.706795,150.955902,11.0,6.9,...,1,33553_25031639_2436_626_1,1,,,,,,,
4,42558_414516_3000_851_1,1481914,15:35:00,20230331,0,3000_851,-32.968822,151.652679,97.0,0.0,...,0,42558_414516_3000_851_1,1,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4678,38743_15172832_2435_807_1,1647084,15:45:00,20230331,0,2435_807,-33.885429,150.878448,282.0,7.0,...,0,38743_15172832_2435_807_1,1,,,,,,,
4679,38743_15172779_2435_817_1,1724170,15:51:00,20230331,0,2435_817,-33.893402,150.892532,22.0,1.5,...,0,38743_15172779_2435_817_1,1,,,,,,,
4680,38743_15173472_2435_820_1,1049450,15:54:00,20230331,0,2435_820,-33.821766,150.980423,100.0,10.9,...,0,38743_15173472_2435_820_1,1,,,,,,,
4681,33556__7024_1L1_1112308,,11:23:08,20230331,2,7024_1L1,-32.783958,151.651733,284.0,0.0,...,0,33556__7024_1L1_1112308,2,0.0,2.0,0.0,2.0,,,


In [138]:
# convert all nan to None
df = df.replace(np.nan, None)
df

Unnamed: 0,id,vehicle.trip.trip_id,vehicle.trip.start_time,vehicle.trip.start_date,vehicle.trip.schedule_relationship,vehicle.trip.route_id,vehicle.position.latitude,vehicle.position.longitude,vehicle.position.bearing,vehicle.position.speed,...,vehicle.congestion_level,vehicle.vehicle.id,vehicle.occupancy_status,is_deleted,vehicle.trip.direction_id,vehicle.current_stop_sequence,vehicle.current_status,vehicle.stop_id,vehicle.vehicle.label,vehicle.vehicle.license_plate
0,3913_24695413_2457_S208_1,1578545,15:38:00,20230331,0,2457_S208,-33.246105,151.447479,312.0,12.1,...,0,3913_24695413_2457_S208_1,1,,,,,,,
1,33553_25030831_2436_2616_1,1429429,15:10:00,20230331,0,2436_2616,-33.782291,150.996826,201.0,10.5,...,1,33553_25030831_2436_2616_1,1,,,,,,,
2,34456_408784_2456_S085_1,1732934,15:15:00,20230331,0,2456_S085,-34.502724,150.787476,210.0,9.3,...,0,34456_408784_2456_S085_1,2,,,,,,,
3,33553_25031639_2436_626_1,1726993,15:30:00,20230331,0,2436_626,-33.706795,150.955902,11.0,6.9,...,1,33553_25031639_2436_626_1,1,,,,,,,
4,42558_414516_3000_851_1,1481914,15:35:00,20230331,0,3000_851,-32.968822,151.652679,97.0,0.0,...,0,42558_414516_3000_851_1,1,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4678,38743_15172832_2435_807_1,1647084,15:45:00,20230331,0,2435_807,-33.885429,150.878448,282.0,7.0,...,0,38743_15172832_2435_807_1,1,,,,,,,
4679,38743_15172779_2435_817_1,1724170,15:51:00,20230331,0,2435_817,-33.893402,150.892532,22.0,1.5,...,0,38743_15172779_2435_817_1,1,,,,,,,
4680,38743_15173472_2435_820_1,1049450,15:54:00,20230331,0,2435_820,-33.821766,150.980423,100.0,10.9,...,0,38743_15173472_2435_820_1,1,,,,,,,
4681,33556__7024_1L1_1112308,,11:23:08,20230331,2,7024_1L1,-32.783958,151.651733,284.0,0.0,...,0,33556__7024_1L1_1112308,2,0.0,2.0,0.0,2.0,,,


In [139]:


# compute age of latest data
# after https://stackoverflow.com/questions/8906926/formatting-timedelta-objects
def strfdelta(tdelta, fmt):
    d = {"days": tdelta.days}
    d["hours"], rem = divmod(tdelta.seconds, 3600)
    d["minutes"], d["seconds"] = divmod(rem, 60)
    return fmt.format(**d)

now = pd.Timestamp.now(tz=pytz.UTC)
latest_time = pd.Timestamp(df.head(1)['vehicle.timestamp'].values[0]).tz_localize('UTC')
age = now - latest_time
age_formatted = strfdelta(age, "{days} days, {hours} hours, {minutes} minutes, {seconds} seconds")
print (f"The latest parquet is {age_formatted} old")

The latest parquet is 0 days, 0 hours, 0 minutes, 46 seconds old


In [140]:
# Create a geometry column from the longitude and latitude columns
df['geometry'] = df.apply(lambda row: Point(row['vehicle.position.longitude'], row['vehicle.position.latitude']), axis=1)

# Convert the geometry column to a list of GeoJSON points
geometry_list = []
for geom in df['geometry']:
    geometry_list.append(geom)

# Remove the latitude and longitude columns from the DataFrame
df = df.drop(columns=['vehicle.position.longitude', 'vehicle.position.latitude'])

# FIXME this is a hack to get the timestamp to serialize, but it's not the right way to do it
# FIXME also will need to configure the timestamp field name from the config file
# serialize timestamp (convert to string)
df['vehicle.timestamp'] = df['vehicle.timestamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))


In [141]:
# Create a list of GeoJSON features
features_list = []
for i in range(len(df)):
    feature = Feature(geometry=geometry_list[i], properties=df.iloc[i].to_dict())
    features_list.append(feature)

# Create a GeoJSON feature collection from the list of features
feature_collection = FeatureCollection(features_list)
len(feature_collection['features'])

4680

In [142]:
with open('myfile.geojson', 'w') as f:
   dump(feature_collection, f)

dumps(feature_collection)

'{"type": "FeatureCollection", "features": [{"type": "Feature", "geometry": {"type": "Point", "coordinates": [151.447479, -33.246105]}, "properties": {"id": "3913_24695413_2457_S208_1", "vehicle.trip.trip_id": "1578545", "vehicle.trip.start_time": "15:38:00", "vehicle.trip.start_date": "20230331", "vehicle.trip.schedule_relationship": 0, "vehicle.trip.route_id": "2457_S208", "vehicle.position.bearing": 312.0, "vehicle.position.speed": 12.100000381469727, "vehicle.timestamp": "2023-03-31 04:35:41", "vehicle.congestion_level": 0, "vehicle.vehicle.id": "3913_24695413_2457_S208_1", "vehicle.occupancy_status": 1, "is_deleted": null, "vehicle.trip.direction_id": null, "vehicle.current_stop_sequence": null, "vehicle.current_status": null, "vehicle.stop_id": null, "vehicle.vehicle.label": null, "vehicle.vehicle.license_plate": null, "geometry": {"type": "Point", "coordinates": [151.447479, -33.246105]}}}, {"type": "Feature", "geometry": {"type": "Point", "coordinates": [150.996826, -33.782291]