In [1]:
#!/usr/bin/env python3
import requests
from datetime import datetime, timedelta
import re
import time
from xml.etree import ElementTree

import numpy as np
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

import secret


def connect_to_rds():
    conn = psycopg2.connect(
        host=secret.HOST,
        database=secret.DATABASE,
        user=secret.UID,
        password=secret.PWD)
    return conn

def get_epoch_and_cet_24hr():
    utc = datetime.utcnow()
    cet = timedelta(hours=1)
    current_hour = (utc + cet).hour
    epoch = round(utc.timestamp())
    return current_hour, epoch

def xml_to_dict(element):
    # Recursively create a dictionary of XML field -> text value
    element_dict = {}
    for child in element:
        tag = re.split("}", child.tag)[1]
        if child.text != None:
            element_dict[tag] = child.text
        elif tag in element_dict.keys(): # In case multiple children with same tag exist in this element, make a list
            if type(element_dict[tag]) == list:
                element_dict[tag].append(xml_to_dict(child))
            else:
                first_elem = element_dict[tag]
                element_dict[tag] = []
                element_dict[tag].append(first_elem)
        else:
            element_dict[tag] = xml_to_dict(child)
    return element_dict

def exists_str_or_none(values, key):
    if key in values.keys():
        return str(values[key])
    else:
        return None

def clean_active_trips(vehicle_statuses):
    to_remove = []
    necessary_keys = ['FramedVehicleJourneyRef','VehicleLocation'] # The absolute minimum for useful data point
    # Find indices of trips that are not monitored.
    for i, vehicle in enumerate(vehicle_statuses):
        try:
            if vehicle['MonitoredVehicleJourney']['Monitored'] != 'true':
                to_remove.append(i)
            for key in necessary_keys:
                if not key in vehicle['MonitoredVehicleJourney'].keys():
                    to_remove.append(i)
                    break
        # If the requested value isn't found, except and remove
        except:
            to_remove.append(i)
    # Remove inactive trips starting with the last index
    for idx in sorted(to_remove, reverse=True):
        del vehicle_statuses[idx]
    return vehicle_statuses

def upload_to_rds(to_upload, conn, collected_time):
    to_upload_list = []
    for vehicle_status in to_upload:
        datedvehiclejourney = exists_str_or_none(vehicle_status['MonitoredVehicleJourney']['FramedVehicleJourneyRef'], 'DatedVehicleJourneyRef'), # JourneyPatternRef[1:3]-VehicleJourneyRef-??-JourneyPatternRef[4:6]-YYYYMMDD-????
        dataframe = exists_str_or_none(vehicle_status['MonitoredVehicleJourney']['FramedVehicleJourneyRef'], 'DataFrameRef'),
        vehicle = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'VehicleRef'),
        mode = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'VehicleMode'),
        line = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'LineRef'),
        linename = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'PublishedLineName'),
        direction = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'DirectionRef'),
        operator = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'OperatorRef'),
        datasource = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'DataSource'),
        lat = exists_str_or_none(vehicle_status['MonitoredVehicleJourney']['VehicleLocation'], 'Latitude'),
        lon = exists_str_or_none(vehicle_status['MonitoredVehicleJourney']['VehicleLocation'], 'Longitude'),
        bearing = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'Bearing'),
        delay = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'Delay'),
        # nextstop = exists_str_or_none(vehicle_status['MonitoredVehicleJourney']['OnwardCalls']['OnwardCall'][0]['StopPointRef']),
        locationtime = exists_str_or_none(vehicle_status['MonitoredVehicleJourney'], 'LocationRecordedAtTime'),
        collectedtime = collected_time
        to_upload_list.append((datedvehiclejourney, dataframe, vehicle, mode, line, linename, direction, operator, datasource, lat, lon, bearing, delay, locationtime, collectedtime))
    with conn.cursor() as curs:
        try:
            args_str = ','.join(curs.mogrify('(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', x).decode('utf-8') for x in to_upload_list)
            query_str = 'INSERT INTO active_trips_norway (datedvehiclejourney, dataframe, vehicle, mode, line, linename, direction, operator, datasource, lat, lon, bearing, delay, locationtime, collectedtime) VALUES ' + args_str
            curs.execute(query_str)
            conn.commit()
        except Exception as e:
            # Catch all errors and continue to keep server up and running
            print(e)
            conn.rollback()
    return query_str

In [2]:
# Limited to 4 requests/minute, otherwise need publish/subscribe
endpoint = 'https://api.entur.io/realtime/v1/rest/vm'
conn = connect_to_rds()
current_hour, current_epoch = get_epoch_and_cet_24hr()

# Call Entur SIRI API (returns XML)
response = requests.get(endpoint)
root = ElementTree.fromstring(response.content)
# root = ElementTree.parse('vm.xml').getroot()

# Look at list of active vehicles from response
root_dict = xml_to_dict(root)
vehicle_statuses = root_dict['ServiceDelivery']['VehicleMonitoringDelivery']['VehicleActivity']
print(len(vehicle_statuses))
clean_active_trips(vehicle_statuses) # Modifies in-place by deleting elements
print(len(vehicle_statuses))

current_hour, current_epoch = get_epoch_and_cet_24hr()
args_str = upload_to_rds(vehicle_statuses, conn, current_epoch)
conn.close()

1536
1066


In [None]:
#TODO:
# Better source for locationtime; convert to epoch
# Possibly add back nextStop
# Move to .py and start scraping
# Possibly et more efficient types in DB instead of all varchar