# Light Rail Realtime to Elasticsearch

In [1]:
from pandas.io.json import json_normalize
from sodapy import Socrata
from elasticsearch import Elasticsearch

import json
import requests
import numpy as np
import pandas as pd
import datetime
from datetime import timedelta 
import os, time
import pytz
import arrow

In [3]:
# Load the Socrata login details (a JSON document) from the credentials file
# that lives one directory above this notebook.
with open('../.credentials') as credentials_file:
    raw_credentials = credentials_file.read()
credentials = json.loads(raw_credentials.strip())

# Data Acquisition

1. Socrata
2. GTFS (`trips.csv`, `stops.csv`) 

## Get data from Socrata

In [4]:
def getRealtimeFeed():
    """Fetch the realtime feed dataset from the ACT open-data portal.

    Logs in to ``www.data.act.gov.au`` with the username and password held in
    the module-level ``credentials`` mapping and requests the ``r9a8-xw6s``
    endpoint.

    Returns:
        pandas.DataFrame: one row per record returned by the endpoint.

    Raises:
        KeyError: if ``credentials`` has no ``username`` or ``password`` entry.
    """
    # Prefer an app token stored next to the other credentials; the literal is
    # kept only as a fallback so existing credential files keep working.
    app_token = credentials.get('app_token', '4jqogoRJ9NKj1gr8QAZ9CCKFI')

    # authenticate Socrata
    client = Socrata('www.data.act.gov.au',
                     app_token,
                     username=credentials['username'],
                     password=credentials['password'])
    try:
        # get endpoint
        # NOTE(review): no `limit` is passed, so the portal's default page size
        # applies -- confirm the feed never exceeds it.
        results = client.get("r9a8-xw6s")
    finally:
        # Release the underlying HTTP session even when the request fails;
        # this function is called repeatedly while polling.
        client.close()

    # Convert to pandas DataFrame
    return pd.DataFrame.from_records(results)

## Get data from GTFS

In [5]:
# Read the static GTFS lookup tables: trip definitions first, then stops.
trips, stops = [
    pd.read_csv(gtfs_csv)
    for gtfs_csv in ('../GTFS/google_transit_lr/trips.csv',
                     '../GTFS/google_transit_lr/stops.csv')
]

## Converting, Sorting, Dropping and Merging Columns

In [6]:
# adds information from trips.csv and stops.csv
def CSDM(dff, trips, stops):
    """Clean the realtime feed and enrich it with GTFS trip and stop details.

    Steps, in order: drop rows whose ``trip_id`` contains ``'R'``, convert the
    delay / sequence / id columns to ``int``, parse the time columns with
    ``arrow`` in the ``Australia/Canberra`` zone, derive the arrival and
    departure dates, sort by arrival time, keep the columns of interest and
    right-merge the stop and trip tables onto the feed.

    None of the three input frames is modified; all conversions are applied
    to copies.

    Args:
        dff: realtime feed frame with the columns ``trip_id``, ``stop_id``,
            ``stop_sequence``, ``arrival_delay``, ``arrival_time``,
            ``depature_delay``, ``depature_time`` and ``timestamp``.
        trips: GTFS trips frame with ``route_id``, ``service_id``, ``trip_id``,
            ``trip_headsign`` and ``direction_id``.
        stops: GTFS stops frame; only its first two columns are used and one
            of them must be ``stop_id``.

    Returns:
        pandas.DataFrame: every retained feed row, with the trip columns and
        the first two stop columns attached.
    """
    # TODO: check with Selva later for R (reserved buses) but removing them for now
    # .copy() gives an independent frame, so the column assignments below never
    # write through a filtered view of the caller's data.
    is_reserved = dff['trip_id'].astype(str).str.contains('R', regex=False)
    dff = dff[~is_reserved].copy()

    # convert columns to int (astype returns new frames, inputs stay untouched)
    dff = dff.astype({
        'arrival_delay': int,
        'depature_delay': int,
        'stop_sequence': int,
        'trip_id': int,
    })

    # parse the time columns as timezone-aware arrow objects
    for time_column in ('arrival_time', 'depature_time', 'timestamp'):
        dff[time_column] = dff[time_column].apply(
            lambda x: arrow.get(x, tzinfo='Australia/Canberra'))
    dff['arrival_date'] = dff['arrival_time'].apply(lambda x: x.date())
    dff['depature_date'] = dff['depature_time'].apply(lambda x: x.date())

    # sort rows based on arrival time
    dff = dff.sort_values(by='arrival_time', ascending=True)

    # keep only the needed columns; the key columns of the lookup tables are
    # cast on the trimmed copies rather than on the caller's frames
    dff_dropped = dff[['arrival_delay', 'arrival_time', 'arrival_date',
                       'depature_delay', 'depature_time', 'depature_date',
                       'stop_id', 'stop_sequence', 'trip_id', 'timestamp']]
    trips_dropped = trips[['route_id', 'service_id', 'trip_id',
                           'trip_headsign', 'direction_id']].astype({'trip_id': int})
    stops_dropped = stops[stops.columns[:2]].astype({'stop_id': str})

    # merge: how='right' keeps every feed row even without a GTFS match
    merged = pd.merge(left=stops_dropped, left_on='stop_id',
                      right=dff_dropped, right_on='stop_id', how='right')
    merged = pd.merge(left=trips_dropped, left_on='trip_id',
                      right=merged, right_on='trip_id', how='right')
    return merged

# Periodically Collect Data

In [7]:
# First pull: fetch the live feed, then clean it and attach the GTFS details.
realtime_raw = getRealtimeFeed()
df = CSDM(realtime_raw, trips, stops)

In [14]:
import datetime as dt

In [43]:
# Poll the feed four times, pausing 15 seconds after each pull, and print the
# stops whose departure time is at most 30 seconds before the feed timestamp.
for attempt in range(1, 5):
    print('Count {}, {}'.format(attempt, dt.datetime.now()))
    df = CSDM(getRealtimeFeed(), trips, stops)
    # gap between the feed timestamp and the departure time, added as the last column
    df['timedelta'] = df['timestamp'] - df['depature_time']
    just_departed = df['timedelta'].map(lambda gap: gap.days == 0 and gap.seconds <= 30)
    df = df[just_departed].sort_values(by=['timedelta'])
    if not df.empty:
        print(df[['trip_id', 'stop_sequence', 'timedelta']])
    time.sleep(15)

Count 1, 2019-06-29 17:49:07.005385
Count 2, 2019-06-29 17:49:24.438437
Count 3, 2019-06-29 17:49:41.125752
Count 4, 2019-06-29 17:49:59.125517


In [175]:
# Take one snapshot of the feed, set trip 676 aside for inspection, then keep
# only the stops whose arrival time is at most 30 seconds before the feed
# timestamp, ordered by that gap.
dff = CSDM(getRealtimeFeed(), trips, stops)
dff['timedelta'] = dff['timestamp'] - dff['arrival_time']
trip_676 = dff.loc[dff['trip_id'] == 676]
just_arrived = dff['timedelta'].map(lambda gap: gap.days == 0 and gap.seconds <= 30)
dff = dff[just_arrived].sort_values(by=['timedelta'])
dff

Unnamed: 0,route_id,service_id,trip_id,trip_headsign,direction_id,stop_id,stop_name,arrival_delay,arrival_time,arrival_date,depature_delay,depature_time,depature_date,stop_sequence,timestamp,timedelta
228,ACTO001,SA,678,Alinga St,1,8104,Manning Clark Crescent Platform 1,-14,2019-06-29T18:16:50+10:00,2019-06-29,-14,2019-06-29T18:17:10+10:00,2019-06-29,2,2019-06-29T18:17:00+10:00,00:00:10
442,ACTO001,SA,597,Gungahlin Pl,0,8111,Well Station Drive Platform 2,14,2019-06-29T18:16:47+10:00,2019-06-29,14,2019-06-29T18:17:07+10:00,2019-06-29,9,2019-06-29T18:17:00+10:00,00:00:13
666,ACTO001,SA,598,Gungahlin Pl,0,8127,Elouera Street Platform 2,-15,2019-06-29T18:16:39+10:00,2019-06-29,-15,2019-06-29T18:16:59+10:00,2019-06-29,2,2019-06-29T18:17:00+10:00,00:00:21
24,ACTO001,SA,677,Alinga St,1,8120,Dickson Platform 1,-46,2019-06-29T18:16:31+10:00,2019-06-29,-46,2019-06-29T18:16:51+10:00,2019-06-29,9,2019-06-29T18:17:00+10:00,00:00:29


Trip: 676

In [173]:
trip_676

Unnamed: 0,route_id,service_id,trip_id,trip_headsign,direction_id,stop_id,stop_name,arrival_delay,arrival_time,arrival_date,depature_delay,depature_time,depature_date,stop_sequence,timestamp,timedelta
661,ACTO001,SA,676,Alinga St,1,8129,Alinga Street Platform 2,59,2019-06-29T18:09:59+10:00,2019-06-29,59,2019-06-29T18:09:59+10:00,2019-06-29,13,2019-06-29T18:09:15+10:00,-1 days +23:59:16


# Moving to Elastic

In [299]:
# Client for the Elasticsearch node on localhost:9200; the basic-auth pair is
# supplied as part of the host entry.
es_node = {
    'host': 'localhost',
    'port': 9200,
    'http_auth': ('elastic', 'changeme'),
}
es = Elasticsearch([es_node])

In [300]:
# NOTE(review): `df3` is not defined in any cell shown here -- presumably a
# CSDM() result; confirm before running on a fresh kernel.
# Swap each time value for its `.datetime` attribute (presumably arrow objects
# becoming plain datetimes) before the rows are sent to Elasticsearch.
for time_column in ('arrival_time', 'depature_time', 'timestamp'):
    df3[time_column] = df3[time_column].apply(lambda moment: moment.datetime)

# index every row as its own document
documents = df3.to_dict(orient='records')
for record in documents:
    es.index(index='trips_realtime', doc_type='lightrail', body=record)