In [21]:
%load_ext autoreload
%autoreload 2

%matplotlib inline
import json
import os
import pickle
from configparser import ConfigParser

import boto3
import botocore
import matplotlib.pyplot as plt
import pandas as pd
from google.transit import gtfs_realtime_pb2

import utils

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# fire up the s3 bucket connection

Inspired by [https://github.com/katharosada/bus-shaming](https://github.com/katharosada/bus-shaming), whose author has downloaded a year's worth of real-time bus data into an S3 bucket:

In [6]:
# load keys
KEYS_PATH = '../keys.secret'  # config file holding the API credentials

keys = ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed, so fail here rather than with a KeyError further down
if not keys.read(KEYS_PATH):
    raise FileNotFoundError(f"could not read credentials file: {KEYS_PATH}")
print(f"loaded keys for: {keys.sections()}")

loaded keys for: ['amazon', 'nsw_opendata']


In [9]:
# credentials come from the [amazon] section of the keys file
aws_keys = keys['amazon']
s3 = boto3.resource(
    's3',
    aws_access_key_id=aws_keys['access_key_id'],
    aws_secret_access_key=aws_keys['secret_key'],
)

print("s3 connected, contains these buckets:")
list(s3.buckets.all())

s3 connected, contains these buckets:


[s3.Bucket(name='busshaming-lambda-bundles'),
 s3.Bucket(name='busshaming-realtime-dumps'),
 s3.Bucket(name='busshaming-timetable-dumps'),
 s3.Bucket(name='sk274nc02mfhb')]

the two buckets we're interested in:

In [10]:
# bucket names as they appear in the listing printed by the previous cell
REALTIME_BUCKET_NAME = "busshaming-realtime-dumps"
TIMETABLE_BUCKET_NAME = "busshaming-timetable-dumps"

bucket_realtime = s3.Bucket(REALTIME_BUCKET_NAME)    # real time data dump from the api
bucket_timetable = s3.Bucket(TIMETABLE_BUCKET_NAME)  # timetable dumps from the api

first up, a list of all the objects in the buckets:

In [11]:
def _object_keys(bucket):
    """Return the key of every object in `bucket`, in listing order."""
    return [summary.key for summary in bucket.objects.all()]


realtime_object_keys = _object_keys(bucket_realtime)
timetable_object_keys = _object_keys(bucket_timetable)

# peek at the first two keys of each listing, then the totals
for listing in (realtime_object_keys, timetable_object_keys):
    print(listing[:2])
print(f"realtime files: {len(realtime_object_keys)} - timetable files: {len(timetable_object_keys)}")

['nsw-buses/2017-09-21T00:00:50.131463.pb', 'nsw-buses/2017-09-21T00:00:50.140293.pb']
['nsw-buses/1/2017-09-05T12:34:16.048672.zip', 'nsw-buses/1/2017-09-09T06:17:58.469365.zip']
realtime files: 716587 - timetable files: 2197


it took a while to get all the object keys, so saving them to disk:

In [25]:
# persist both key listings so the slow bucket scan does not have to be repeated
for object_keys, path in (
    (realtime_object_keys, "data/realtime_object_keys.pkl"),
    (timetable_object_keys, "data/timetable_object_keys.pkl"),
):
    utils.save_to_disk(object_keys, path)

saved data/realtime_object_keys.pkl to disk
saved data/timetable_object_keys.pkl to disk


So many files! We need to separate the NSW and Swiss realtime data, as both are stored in the one bucket:

In [27]:
# split the realtime listing by its top-level prefix
nsw_keys = [key for key in realtime_object_keys if key.startswith("nsw-buses")]
swiss_keys = [key for key in realtime_object_keys if key.startswith("swiss")]

# the two subsets should add up to the full listing
len(nsw_keys), len(swiss_keys), len(nsw_keys) + len(swiss_keys), len(realtime_object_keys)

(448924, 267663, 716587, 716587)

In [28]:
def get_datetime_from_key(key):
    """Return the timestamp encoded in an S3 object key as a pandas Timestamp.

    The timestamp is read from the last path component of the key with its file
    extension removed, so both realtime keys ("nsw-buses/<timestamp>.pb") and
    timetable keys ("nsw-buses/1/<timestamp>.zip") are handled.
    """
    filename = key.rsplit("/", 1)[-1]
    # the timestamp itself contains a "." (fractional seconds), so only strip the
    # part after the last "." when it looks like an extension, i.e. is alphabetic
    stem, dot, extension = filename.rpartition(".")
    if dot and extension.isalpha():
        filename = stem
    return pd.to_datetime(filename)

# quick check on the last NSW key in the listing
get_datetime_from_key(nsw_keys[-1])

Timestamp('2018-08-27 23:57:50.201743')

In [192]:
# One vectorised pd.to_datetime call over all keys: calling it once per key
# (several hundred thousand separate calls) is very slow.
# Same slicing as get_datetime_from_key: take the part after the folder prefix
# and drop the trailing ".pb".
nsw_key_stamps = [key.split("/")[1][:-3] for key in nsw_keys]
nsw_keys_dates = list(pd.to_datetime(nsw_key_stamps))  # list of Timestamps, as before
nsw_keys_dates[:5]

KeyboardInterrupt: 

In [160]:
# one row per NSW realtime dump: its S3 key and the parsed timestamp
data = {"filename": nsw_keys, "date": nsw_keys_dates}
df = pd.DataFrame(data)
df.head()

Unnamed: 0,filename,date
0,nsw-buses/2017-09-21T00:00:50.131463.pb,2017-09-21 00:00:50.131463
1,nsw-buses/2017-09-21T00:00:50.140293.pb,2017-09-21 00:00:50.140293
2,nsw-buses/2017-09-21T00:01:50.303404.pb,2017-09-21 00:01:50.303404
3,nsw-buses/2017-09-21T00:02:50.103409.pb,2017-09-21 00:02:50.103409
4,nsw-buses/2017-09-21T00:03:50.675896.pb,2017-09-21 00:03:50.675896


In [163]:
# calendar features derived from the dump timestamp
timestamps = df["date"].dt
df["hour"] = timestamps.hour
df["day"] = timestamps.dayofweek  # pandas convention: Monday=0 ... Sunday=6
df["month"] = timestamps.month
df.head()

Unnamed: 0,filename,date,hour,day,month
0,nsw-buses/2017-09-21T00:00:50.131463.pb,2017-09-21 00:00:50.131463,0,3,9
1,nsw-buses/2017-09-21T00:00:50.140293.pb,2017-09-21 00:00:50.140293,0,3,9
2,nsw-buses/2017-09-21T00:01:50.303404.pb,2017-09-21 00:01:50.303404,0,3,9
3,nsw-buses/2017-09-21T00:02:50.103409.pb,2017-09-21 00:02:50.103409,0,3,9
4,nsw-buses/2017-09-21T00:03:50.675896.pb,2017-09-21 00:03:50.675896,0,3,9


# download a file

In [29]:
def download_file(key, bucket=None):
    """Download one object from an S3 bucket to a local file named after its key.

    Parameters
    ----------
    key : str
        Object key in the bucket; it is also used as the local file path, so a
        key such as "nsw-buses/<timestamp>.pb" ends up in a local "nsw-buses/"
        folder.
    bucket : optional
        Bucket object exposing ``download_file(key, filename)``. Defaults to the
        notebook-level ``bucket_realtime``.

    Returns
    -------
    None. A 404 or 403 response is reported with a printed message; any other
    ``ClientError`` is re-raised.
    """
    if bucket is None:
        # looked up at call time so a re-created bucket_realtime is picked up
        bucket = bucket_realtime

    print(f"downloading {key}")

    # the key contains a folder prefix: make sure that folder exists locally,
    # otherwise the download cannot create the target file
    local_dir = os.path.dirname(key)
    if local_dir:
        os.makedirs(local_dir, exist_ok=True)

    try:
        bucket.download_file(key, key)
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == "404":
            print("The object does not exist.")
        elif error_code == "403":
            print("Forbidden")
        else:
            raise
        
# try the helper on the first NSW key (bucket defaults to bucket_realtime)
download_file(nsw_keys[0])

downloading nsw-buses/2017-09-21T00:00:50.131463.pb
Forbidden


In [30]:
# same object, this time streamed into an already-open local file.
# NOTE: open(..., 'wb') creates/truncates test_download.pb before the download
# starts, so the file is left behind even when the download raises
with open('test_download.pb', 'wb') as f:
    bucket_realtime.download_fileobj(nsw_keys[0], f)

ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden

# Parse a protobuf file

In [177]:
def parse_file(file):
    """Parse a serialized GTFS-realtime feed and print each entity that has a trip update.

    Parameters
    ----------
    file : bytes-like, path, or binary file object
        Raw serialized bytes are parsed as they are; an object with a ``read``
        method is read; anything else is treated as a path and opened in
        binary mode.
    """
    if isinstance(file, (bytes, bytearray, memoryview)):
        payload = bytes(file)
    elif hasattr(file, "read"):
        payload = file.read()
    else:
        with open(file, "rb") as fh:
            payload = fh.read()

    feed_message = gtfs_realtime_pb2.FeedMessage()
    feed_message.ParseFromString(payload)

    for entity in feed_message.entity:
        if entity.HasField('trip_update'):
            print(entity)