In [37]:
# Required to setup DB and consume GTFS static data
import mysql.connector
from mysql.connector import Error
from mysql.connector import errorcode
from mysql.connector.constants import ClientFlag
import os
import csv
from datetime import datetime
from time import time, sleep

# CSV file (looked up in the current working directory) describing where the
# unzipped GTFS static data lives; main() builds its path from this name.
STATIC_CSV = "static_.csv"
# CSV file (looked up in the current working directory) whose data row is the
# folder holding the table/index creation SQL scripts; read by main().
DB_CSV = "db_.csv"
# Period of the realtime polling loop, in seconds (used to compute the sleep
# between two fetches).
RETRIEVAL_FREQ = 30
# Number of polling iterations to run; the loop compares a fetch counter
# against it, so it is a count of fetches rather than a duration.
RETRIEVAL_TIME = 2

In [22]:
def executeScriptsFromFile(filename, cursor):
    """Execute every statement of a SQL script file through ``cursor``.

    The script is split naively on ';', so it must not contain semicolons
    inside string literals, comments or stored-routine bodies.

    Args:
        filename: Path of the SQL script to read.
        cursor: Open DB-API cursor; only its ``execute`` method is used.

    Errors raised by MySQL for an individual statement are printed and the
    remaining statements are still executed (e.g. a DROP TABLE on a table
    that does not exist yet). Errors opening or reading the file propagate.
    """
    # The context manager closes the file even if reading fails.
    with open(filename, 'r') as fd:
        sqlFile = fd.read()

    for command in sqlFile.split(';'):
        command = command.strip()
        # The text after the last ';' (and any blank statement) is empty or
        # whitespace only; sending it would just produce a spurious error.
        if not command:
            continue
        try:
            cursor.execute(command)
        except mysql.connector.Error as error:
            print("Error occurred while executing script : ", error)

In [23]:
## Create Tables and Indexes

def db_setup(filepath, tab=True, idx=False):
    """Create the database tables and/or indexes from SQL script files.

    Args:
        filepath: Folder containing ``create-tables.sql`` and
            ``create-index.sql``. The file name is concatenated directly, so
            the value must end with a path separator.
        tab: When true, run ``create-tables.sql``.
        idx: When true, run ``create-index.sql``.

    MySQL errors raised in this function are reported and the transaction is
    rolled back. Per-statement errors are already handled (printed) inside
    executeScriptsFromFile, so the "CREATED" messages do not guarantee that
    every statement succeeded. Errors opening a script file propagate after
    the connection has been closed.
    """
    # NOTE(review): connection credentials are hardcoded here.
    connection = mysql.connector.connect(host='localhost', user='root', password='admin')
    cursor = None  # stays None if cursor creation itself fails
    try:
        cursor = connection.cursor()
        print("FILEPATH received : ", filepath)
        createtab_file = filepath + "create-tables.sql"
        createidx_file = filepath + "create-index.sql"

        if tab:
            executeScriptsFromFile(createtab_file, cursor)
            print("TABLES CREATED ...")
            connection.commit()

        if idx:
            executeScriptsFromFile(createidx_file, cursor)
            print("INDEXES CREATED ...")
            connection.commit()

    except mysql.connector.Error as error:
        connection.rollback()  # rollback if any exception occurred
        # The placeholder is required for the error text to actually be shown.
        print("The following error has occurred ... {}".format(error))

    finally:
        # Closing database connection.
        if connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("DB Setup complete. MySQL connection is closed.")

In [24]:
def all_tables(path, folder, date):
    """Bulk-load every GTFS static text file of one dataset folder.

    Args:
        path: Parent directory of the dataset; concatenated directly with
            ``folder``, so it must end with a path separator.
        folder: Name of the unzipped GTFS folder.
        date: Value forwarded to bulk_ins for the trips table only.

    The path handed to bulk_ins starts with a single quote and each file
    name ends with one, so that their concatenation is a quoted SQL literal.
    """
    filepath = "'" + path + folder + "/"

    # Files loaded without an extra column, in load order.
    plain_tables = ("agency", "calendar_dates", "routes", "shapes",
                    "stop_times", "stops", "transfers")

    for position, stem in enumerate(plain_tables):
        filename = stem + ".txt'"
        tablename = "gtfs." + stem
        if position == 0:
            # Diagnostic output is produced for the first table only.
            print(filepath)
            print(filename)
            print(tablename)
        bulk_ins(filepath, filename, tablename)

    # Trips is the only table that also receives the dataset date.
    bulk_ins(filepath, "trips.txt'", "gtfs.trips", date)

In [25]:
def bulk_ins(filepath, filename, tablename, date=None):
    """Bulk-load one GTFS text file into a table with LOAD DATA LOCAL INFILE.

    Args:
        filepath: Directory of the file, starting with a single quote.
        filename: File name, ending with a single quote; together with
            ``filepath`` it forms the quoted file literal of the statement.
        tablename: Target table. For "gtfs.trips" the statement also sets
            ``trip_date`` to ``date`` on every loaded row.
        date: Value written to ``trip_date`` (only used for gtfs.trips).

    MySQL errors are reported and rolled back instead of being raised.

    NOTE(review): the statement is assembled by string concatenation, so all
    arguments must come from a trusted source. Credentials are hardcoded.
    """
    # Pre-bind both handles so the error/cleanup paths below never hit an
    # unbound local when connect() or cursor() is the call that fails.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(host='localhost', database='gtfs', user='root',
                                             password='admin', client_flags=[ClientFlag.LOCAL_FILES])
        print("Connected to DB ...", ClientFlag.LOCAL_FILES)

        # Session settings issued before the load (autocommit, unique and
        # foreign-key checks, and binary logging are switched off).
        autoc_sql = "SET autocommit=0;"
        ucheck_sql = "SET unique_checks=0;"
        fcheck_sql = "set foreign_key_checks=0;"
        logcheck_sql = "set sql_log_bin=0;"

        load_sql = ("LOAD DATA LOCAL INFILE " + filepath + filename +
                    " INTO TABLE " + tablename +
                    " FIELDS TERMINATED BY ','"
                    " OPTIONALLY ENCLOSED BY '\"'"
                    " LINES TERMINATED BY '\\n'"
                    " IGNORE 1 LINES")

        # Only gtfs.trips receives the extra date column value.
        if tablename == "gtfs.trips":
            print("DATE VALUE RECEIVED ... ", date)
            load_sql = load_sql + " SET trip_date = '" + str(date) + "';"
        else:
            load_sql = load_sql + ";"

        cursor = connection.cursor()
        print("BULK INS ... ")
        print("FILEPATH received : ", filepath.strip('\''))
        print(filepath + filename)
        print(load_sql)

        for setting_sql in (autoc_sql, ucheck_sql, fcheck_sql, logcheck_sql):
            cursor.execute(setting_sql)

        cursor.execute(load_sql)
        connection.commit()
        print("Succuessfully loaded the table " + tablename + " from " + filename.strip('\'') + " ... ")

    except mysql.connector.Error as error:
        if cursor is not None:
            print(cursor.statement)
        if connection is not None:
            connection.rollback()  # rollback if any exception occurred
        print("Failed inserting record into table " + tablename + " from " + filename.strip('\'') + " ... {}".format(error))

    finally:
        # Closing database connection.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("MySQL connection is closed")

In [6]:
def main():
    """Resolve the helper CSV paths and read the DB scripts folder.

    Two CSV files are expected in the current working directory:
    DB_CSV gives the folder holding the table/index creation scripts and
    STATIC_CSV gives the location of the unzipped static data. The DB setup
    and data load steps themselves are currently disabled (commented out).
    """
    # Both CSV files sit next to the notebook; normalise to forward slashes.
    base_dir = os.getcwd().replace("\\", "/") + "/"
    db_csv = base_dir + DB_CSV
    static_csv = base_dir + STATIC_CSV

    # The first line is a header; the last data row wins as the scripts folder.
    db_scripts = ""
    with open(db_csv, "r") as handle:
        next(handle)
        for entry in handle:
            db_scripts = entry.strip().replace("\\", "/") + "/"

#     # SETUP DB
#     db_setup(db_scripts, tab=True, idx=False)

#     # DUMP DATA TO DB
#     dataset_path = ""
#     folder_name = ""
#     folder_date = ""
#     with open(static_csv, "r") as file:
#         next(file)
#         for row in file:
#             row = row.strip().split(",")
#             dataset_path = row[0].replace("\\", "/") + "/"
#             folder_name = row[1]
#             folder_date = row[2]
#             all_tables(dataset_path, folder_name, folder_date)


if __name__ == "__main__":
    main()

FILEPATH received :  C:/Users/kakka/Documents/GTFS_/code/
TABLES CREATED ...
DB Setup complete. MySQL connection is closed.
'C:/Users/kakka/Documents/GTFS_/static_data/NL-20190110.gtfs/
agency.txt'
gtfs.agency
Connected to DB ... 128
BULK INS ... 
FILEPATH received :  C:/Users/kakka/Documents/GTFS_/static_data/NL-20190110.gtfs/
'C:/Users/kakka/Documents/GTFS_/static_data/NL-20190110.gtfs/agency.txt'
LOAD DATA LOCAL INFILE 'C:/Users/kakka/Documents/GTFS_/static_data/NL-20190110.gtfs/agency.txt' INTO TABLE gtfs.agency FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' LINES TERMINATED BY '\n' IGNORE 1 LINES;
Succuessfully loaded the table gtfs.agency from agency.txt ... 
MySQL connection is closed
Connected to DB ... 128
BULK INS ... 
FILEPATH received :  C:/Users/kakka/Documents/GTFS_/static_data/NL-20190110.gtfs/
'C:/Users/kakka/Documents/GTFS_/static_data/NL-20190110.gtfs/calendar_dates.txt'
LOAD DATA LOCAL INFILE 'C:/Users/kakka/Documents/GTFS_/static_data/NL-20190110.gtfs/calendar_

Succuessfully loaded the table gtfs.trips from trips.txt ... 
MySQL connection is closed


In [7]:
# Segregate Index Creation for the DB as it takes up a lot of time
# Run after all data has been inserted

# # FETCH FILES
# cwd = os.getcwd()
# db_csv = cwd.replace("\\", "/") + "/" + DB_CSV
# static_csv = cwd.replace("\\", "/") + "/" + STATIC_CSV

# # GET DB SCRIPTS PATH
# db_scripts = ""
# with open(db_csv, "r") as file:
#     next(file)
#     for row in file:
#         db_scripts = row.strip().replace("\\", "/") + "/"
# db_setup(db_scripts, tab=False, idx=True)

FILEPATH received :  C:/Users/kakka/Documents/GTFS_/code/
INDEXES CREATED ...
DB Setup complete. MySQL connection is closed.


In [27]:
# Export the selectable routes (short name + transport mode) from the DB to a CSV
def push_routes(filepath):
    """Write the distinct routes serving Amsterdam stops to ``routes_list.csv``.

    A route is selected when one of its trips calls at a stop whose name
    contains "AMSTERDAM," (case-insensitive) and whose stop id does not
    contain "stoparea". Each output line is ``<route_short_name>,<mode>``
    where mode is TRAM/SUBWAY/RAIL/BUS/FERRY for route types 0-4.

    Args:
        filepath: Output directory; the file name is concatenated directly,
            so the value must end with a path separator.

    MySQL errors are reported instead of being raised; errors writing the
    file propagate after the connection has been closed.
    """
    # Pre-bind both handles so the error/cleanup paths never hit an unbound
    # local when connect() or cursor() is the call that fails.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(host='localhost', database='gtfs', user='root',
                                             password='admin', client_flags=[ClientFlag.LOCAL_FILES])
        print("Connected to DB ...", ClientFlag.LOCAL_FILES)

        sel_sql = ( " SELECT DISTINCT R.ROUTE_SHORT_NAME,"
                    " CASE R.ROUTE_TYPE "
                    " WHEN '0' THEN 'TRAM'"
                    " WHEN '1' THEN 'SUBWAY'"
                    " WHEN '2' THEN 'RAIL'"
                    " WHEN '3' THEN 'BUS'"
                    " WHEN '4' THEN 'FERRY'"
                    " END AS ROUTE_TYPE"
                    " FROM GTFS.ROUTES R, GTFS.TRIPS T"
                    " WHERE R.ROUTE_ID = T.ROUTE_ID"
                    " AND T.TRIP_ID IN ("
                    " SELECT DISTINCT ST.TRIP_ID"
                    " FROM GTFS.STOP_TIMES ST, GTFS.STOPS S"
                    " WHERE ST.STOP_ID = S.STOP_ID"
                    " AND UPPER(S.STOP_NAME) LIKE '%AMSTERDAM,%'"
                    " AND S.STOP_ID NOT LIKE '%stoparea%');" )

        cursor = connection.cursor()
        print("push_routes() ... ")
        print("FILEPATH received : ", filepath)
        print(sel_sql)
        cursor.execute(sel_sql)
        all_rows = cursor.fetchall()

        out_name = 'routes_list.csv'
        with open(filepath + out_name, 'w') as myfile:
            for row in all_rows:
                # Either column can be NULL (the CASE has no ELSE branch, so
                # route types outside 0-4 yield NULL); write an empty field
                # instead of failing on None + str.
                fields = ["" if value is None else str(value) for value in row[:2]]
                myfile.write(",".join(fields) + "\n")
            print("Data write success ... ")
            print("Please check CSV file " + out_name + " at " + filepath)

    except mysql.connector.Error as error:
        if cursor is not None:
            print(cursor.statement)
        if connection is not None:
            connection.rollback()  # rollback if any exception occurred
        print("Failed fetching data from GTFS.ROUTES ... {}".format(error))

    finally:
        # Closing database connection.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("MySQL connection is closed")

In [28]:
# GENERATE ROUTE DETAILS FOR USER
# The route list is written into the current working directory. push_routes()
# appends the file name directly to the path it receives, hence the
# forward-slash normalisation and the trailing "/".
cwd = os.getcwd()
print(cwd)
push_routes(cwd.replace("\\", "/") + "/")

C:\Users\kakka\Documents\GTFS_\code
Connected to DB ... 128
push_routes() ... 
FILEPATH received :  C:/Users/kakka/Documents/GTFS_/code/
 SELECT DISTINCT R.ROUTE_SHORT_NAME, CASE R.ROUTE_TYPE  WHEN '0' THEN 'TRAM' WHEN '1' THEN 'SUBWAY' WHEN '2' THEN 'RAIL' WHEN '3' THEN 'BUS' WHEN '4' THEN 'FERRY' END AS ROUTE_TYPE FROM GTFS.ROUTES R, GTFS.TRIPS T WHERE R.ROUTE_ID = T.ROUTE_ID AND T.TRIP_ID IN ( SELECT DISTINCT ST.TRIP_ID FROM GTFS.STOP_TIMES ST, GTFS.STOPS S WHERE ST.STOP_ID = S.STOP_ID AND UPPER(S.STOP_NAME) LIKE '%AMSTERDAM,%' AND S.STOP_ID NOT LIKE '%stoparea%');
Data write success ... 
Please check CSV file ROUTES.CSV at C:/Users/kakka/Documents/GTFS_/code/
MySQL connection is closed


In [29]:
# Required to parse GTFS real-time feed
from google.transit import gtfs_realtime_pb2
import requests

In [51]:
# READ THE SET OF TRANSIT LINES FOR WHICH THE USER WANTS TO EXTRACT STOP TIME DETAILS
def read_userpref(filepath):
    """Resolve the user's chosen lines to the matching rows of GTFS.ROUTES.

    ``user_routes.csv`` holds one ``<route_short_name>,<mode>`` pair per
    line, where mode is TRAM, SUBWAY, RAIL, BUS or FERRY (mapped back to
    route types 0-4; any other value is passed to the query unchanged).
    Blank lines are ignored and lines without two fields are skipped.

    Args:
        filepath: Directory containing ``user_routes.csv``; the file name is
            concatenated directly, so the value must end with a separator.

    Returns:
        A list with one entry per accepted line; each entry is the list of
        ``(route_id, route_type, route_short_name)`` tuples found for it.
        ``None`` is returned when a MySQL error occurs (it is reported, not
        raised). Errors opening the file propagate.
    """
    route_type_codes = {'TRAM': 0, 'SUBWAY': 1, 'RAIL': 2, 'BUS': 3, 'FERRY': 4}

    # Pre-bind both handles so the error/cleanup paths never hit an unbound
    # local when connect() or cursor() is the call that fails.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(host='localhost', database='gtfs', user='root',
                                             password='admin', client_flags=[ClientFlag.LOCAL_FILES])
        print("Connected to DB ...", ClientFlag.LOCAL_FILES)

        # Read the whole file; the context manager closes it on any outcome.
        with open(filepath + 'user_routes.csv', 'r') as fd:
            usrFile = fd.read()

        lines = usrFile.split("\n")
        print(lines)

        routes = []

        sel_sql = ( " SELECT DISTINCT ROUTE_ID, ROUTE_TYPE, ROUTE_SHORT_NAME"
                    " FROM GTFS.ROUTES"
                    " WHERE ROUTE_TYPE = %s"
                    " AND ROUTE_SHORT_NAME = %s"
                    " ORDER BY ROUTE_ID;")

        cursor = connection.cursor()
        print("read_userpref() ... ")
        print("FILEPATH received : ", filepath)
        print(sel_sql)

        for line in lines:
            # A trailing newline in the file yields an empty last element.
            if not line.strip():
                continue
            fields = [field.strip() for field in line.split(',')]
            print(fields)
            if len(fields) < 2:
                print("Skipping malformed line (expected <short_name>,<mode>) ... ", line)
                continue
            # Reverse-map the textual mode to the numeric GTFS route type.
            route_type = route_type_codes.get(fields[1].upper(), fields[1])
            cursor.execute(sel_sql, (route_type, fields[0]))
            routes.append(list(cursor.fetchall()))

        return routes

    except mysql.connector.Error as error:
        if cursor is not None:
            print(cursor.statement)
        if connection is not None:
            connection.rollback()  # rollback if any exception occurred
        print("Failed fetching data from GTFS.ROUTES ... {}".format(error))

    finally:
        # Closing database connection.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("MySQL connection is closed")

In [52]:
# INSERT REALTIME DATA TO DB
def insert_rtdata(trip_updates):
    """Bulk-insert realtime stop-time updates into GTFS.trip_updates.

    Args:
        trip_updates: Sequence of 15-item tuples in the column order of the
            INSERT statement below (entity_id ... fetch_time).

    Nothing is done (and no connection is opened) when ``trip_updates`` is
    empty. MySQL errors are reported and rolled back instead of being raised.
    """
    if not trip_updates:
        # Avoid a pointless connection and a misleading success message.
        print("No trip updates to insert ... ")
        return

    # Pre-bind both handles so the error/cleanup paths never hit an unbound
    # local when connect() or cursor() is the call that fails.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(host='localhost', database='gtfs', user='root', password='admin')
        cursor = connection.cursor()
        tu_query = """ INSERT INTO GTFS.trip_updates
                        (entity_id, route_id, route_short_name, route_type, trip_id, direction_id, start_date, start_time,
                        stop_sequence, stop_id, arrival_time, arrival_delay, departure_time, departure_delay, fetch_time)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""
        # INCREASE PACKET SIZE TO BULK INSERT DATA
        # NOTE(review): a GLOBAL change normally applies to connections opened
        # afterwards, not to this session - confirm it has the intended effect.
        packet_sql = 'SET GLOBAL max_allowed_packet=256*1024*1024'
        cursor.execute(packet_sql)
        cursor.executemany(tu_query, trip_updates)
        connection.commit()
        print("Record inserted successfully into GTFS.trip_updates table ... ")

    except mysql.connector.Error as error:
        if connection is not None:
            connection.rollback()  # rollback if any exception occurred
        print("Failed inserting record into GTFS.trip_updates table ... {}".format(error))

    finally:
        # Closing database connection.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("MySQL connection is closed ... ")

In [65]:
def parse_gtfsrt(user_routes, timeout=30):
    """Fetch the realtime trip-update feed and store updates for chosen routes.

    Args:
        user_routes: Result of read_userpref(): a list of lists of
            ``(route_id, route_type, route_short_name)`` tuples. ``None`` or
            an empty list selects nothing.
        timeout: Seconds to wait for the feed server before giving up, so a
            stalled server cannot block the polling loop indefinitely.

    Every stop-time update of a matching trip becomes one row, stamped with
    the local fetch time, and the collected rows are handed to
    insert_rtdata(). Network failures and non-200 responses are reported and
    result in an empty batch.
    """
    all_tus = []

    # Index the selected routes by route id once instead of rescanning the
    # nested list for every feed entity. Keeping a list per id preserves
    # duplicates and their original order.
    wanted = {}
    for ur in (user_routes or []):
        for u in ur:
            wanted.setdefault(str(u[0]), []).append(u)

    feed = gtfs_realtime_pb2.FeedMessage()
    try:
        response = requests.get('http://gtfs.ovapi.nl/nl/tripUpdates.pb', timeout=timeout)
    except requests.exceptions.RequestException as error:
        print("ERROR FETCHING REALTIME DATA FROM THE gtfs.ovapi.nl server")
        print(error)
        response = None

    if response is not None and response.status_code == 200:
        feed.ParseFromString(response.content)
        ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for entity in feed.entity:
            if not entity.HasField('trip_update'):
                continue
            trip = entity.trip_update.trip
            for u in wanted.get(trip.route_id, ()):
                print("entity.id --- ", entity.id)
                for x in entity.trip_update.stop_time_update:
                    all_tus.append((entity.id, trip.route_id, u[2], u[1], trip.trip_id,
                                    trip.direction_id, trip.start_date,
                                    trip.start_time, x.stop_sequence, x.stop_id, x.arrival.time,
                                    x.arrival.delay, x.departure.time, x.departure.delay, ts))
    elif response is not None:
        print("ERROR FETCHING REALTIME DATA FROM THE gtfs.ovapi.nl server")
        print(response.reason)

    print(len(all_tus))
    insert_rtdata(all_tus)

In [66]:
# Read the user's chosen lines from a CSV, fetch the corresponding trip update
# details from the realtime feed and insert them into the DB.
# The feed is polled RETRIEVAL_TIME times, one fetch per RETRIEVAL_FREQ-second slot.

cwd = os.getcwd()
print(cwd)
user_routes = read_userpref(cwd.replace("\\", "/") + "/")
print(user_routes)

starttime = time()
counter = 0
print("STARTTIME ... ", starttime)
while counter < RETRIEVAL_TIME:
    print("TICK TOCK ... ")
    parse_gtfsrt(user_routes)
    counter = counter + 1
    print("DONE COUNT ... ", counter, " ... ", time(), " ... ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    # Sleep until the next slot boundary measured from starttime, so the time
    # spent fetching does not accumulate as drift. After the final fetch there
    # is nothing left to wait for.
    if counter < RETRIEVAL_TIME:
        sleep(RETRIEVAL_FREQ - ((time() - starttime) % RETRIEVAL_FREQ))

C:\Users\kakka\Documents\GTFS_\code
Connected to DB ... 128
['54,SUBWAY', '51,SUBWAY', '53,SUBWAY', '50,SUBWAY', '52,SUBWAY', '17,TRAM', '2,TRAM', '13,TRAM', '24,TRAM', '4,TRAM', '26,TRAM', '5,TRAM', '19,TRAM', '1,TRAM', '7,TRAM', '3,TRAM', '14,TRAM', '12,TRAM', '11,TRAM']
read_userpref() ... 
FILEPATH received :  C:/Users/kakka/Documents/GTFS_/code/
 SELECT DISTINCT ROUTE_ID, ROUTE_TYPE, ROUTE_SHORT_NAME FROM GTFS.ROUTES WHERE ROUTE_TYPE = %s AND ROUTE_SHORT_NAME = %s ORDER BY ROUTE_ID;
['54', 'SUBWAY']
['51', 'SUBWAY']
['53', 'SUBWAY']
['50', 'SUBWAY']
['52', 'SUBWAY']
['17', 'TRAM']
['2', 'TRAM']
['13', 'TRAM']
['24', 'TRAM']
['4', 'TRAM']
['26', 'TRAM']
['5', 'TRAM']
['19', 'TRAM']
['1', 'TRAM']
['7', 'TRAM']
['3', 'TRAM']
['14', 'TRAM']
['12', 'TRAM']
['11', 'TRAM']
MySQL connection is closed
[[(468, 1, '54')], [(481, 1, '51')], [(3337, 1, '53')], [(480, 1, '50')], [(59759, 1, '52')], [(471, 0, '17'), (47824, 0, '17')], [(455, 0, '2'), (19010, 0, '2'), (61388, 0, '2'), (62499, 0, 

entity.id ---  2019-01-22:HTM:19:190038
entity.id ---  2019-01-22:HTM:19:190033
entity.id ---  2019-01-22:HTM:19:190031
entity.id ---  2019-01-22:HTM:19:190040
entity.id ---  2019-01-22:HTM:19:190042
entity.id ---  2019-01-22:HTM:19:190029
entity.id ---  2019-01-22:RET:44:384739
entity.id ---  2019-01-22:RET:44:384740
entity.id ---  2019-01-22:RET:44:384759
entity.id ---  2019-01-22:RET:44:384712
entity.id ---  2019-01-22:GVB:51:510156
entity.id ---  2019-01-22:GVB:14:97
entity.id ---  2019-01-22:RET:44:384692
entity.id ---  2019-01-22:GVB:51:510138
entity.id ---  2019-01-22:GVB:51:510136
entity.id ---  2019-01-22:GVB:51:510142
entity.id ---  2019-01-22:GVB:51:510140
entity.id ---  2019-01-22:GVB:51:510145
entity.id ---  2019-01-22:GVB:51:510147
entity.id ---  2019-01-22:GVB:51:510154
entity.id ---  2019-01-22:GVB:51:510151
entity.id ---  2019-01-22:GVB:51:510150
entity.id ---  2019-01-22:RET:44:384642
entity.id ---  2019-01-22:RET:44:384643
entity.id ---  2019-01-22:RET:44:384663
enti

entity.id ---  2019-01-22:RET:42:402024
entity.id ---  2019-01-22:HTM:1:10115
entity.id ---  2019-01-22:HTM:1:10109
entity.id ---  2019-01-22:HTM:1:10103
entity.id ---  2019-01-22:GVB:4:86
entity.id ---  2019-01-22:RET:42:402045
entity.id ---  2019-01-22:HTM:1:10133
entity.id ---  2019-01-22:HTM:1:10127
entity.id ---  2019-01-22:HTM:1:10121
entity.id ---  2019-01-22:GVB:4:74
entity.id ---  2019-01-22:GVB:4:72
entity.id ---  2019-01-22:GVB:5:163
entity.id ---  2019-01-22:RET:42:402078
entity.id ---  2019-01-22:GVB:5:167
entity.id ---  2019-01-22:RET:42:402079
entity.id ---  2019-01-22:GVB:5:171
entity.id ---  2019-01-22:GVB:5:175
entity.id ---  2019-01-22:GVB:5:179
entity.id ---  2019-01-22:GVB:5:183
entity.id ---  2019-01-22:GVB:5:187
entity.id ---  2019-01-22:GVB:5:191
entity.id ---  2019-01-22:GVB:5:195
entity.id ---  2019-01-22:GVB:5:199
entity.id ---  2019-01-22:GVB:2:319
entity.id ---  2019-01-22:HTM:1:10062
entity.id ---  2019-01-22:HTM:1:10060
entity.id ---  2019-01-22:HTM:1:100