# API Extraction <a class="tocSkip">
Author: Stefan Roland Schwingenschlögl <br>
email: stefan.roland.schwingenschloegl@gmail.com <br>
github: github.com/stefan-schwingenschloegl <br>
___
*Project File No: 4 <br>*
    
This file requests the live data from the Wiener Linien API and extracts the relevant information from the JSON response. Immediately after each query, this data is written to the database and stored there, so that it reaches persistent storage as quickly as possible. If the data were first collected in RAM as a pandas dataframe, everything gathered so far would be lost in the event of a technical failure (e.g. a power failure). With the database server, a failure during the extraction phase has no effect on the data that has already been loaded. An alternative would be to save each individual response as a CSV file. However, all these flat files would then have to be read in one by one in a loop and concatenated afterwards, so access via SQL is considerably more convenient. <br>
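The reasoning above (persist every response right away instead of collecting it in RAM) can be illustrated with a minimal sketch. It uses Python's built-in `sqlite3` module as a stand-in for the SQL Server instance, purely so the example runs anywhere. The table `stage_demo`, the helper `store_response` and the rows are hypothetical. Each simulated response is committed as soon as it arrives, and a fresh connection afterwards still sees every committed row.

```python
import os
import sqlite3
import tempfile


def store_response(db_path, rows):
    """Append the rows of one API response and commit them right away (hypothetical helper)."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS stage_demo (stop_name TEXT, delay REAL)")
        conn.executemany("INSERT INTO stage_demo VALUES (?, ?)", rows)
        conn.commit()  # from here on the rows are on disk, not only in RAM
    finally:
        conn.close()


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

# two simulated responses, each persisted as soon as it "arrives"
store_response(db_path, [("Stop A", 30.0), ("Stop B", -12.0)])
store_response(db_path, [("Stop A", 45.0)])

# a new connection (e.g. after a restart) sees every committed row;
# one query replaces reading and concatenating many CSV files
check_conn = sqlite3.connect(db_path)
row_count = check_conn.execute("SELECT COUNT(*) FROM stage_demo").fetchone()[0]
check_conn.close()
print(row_count)  # 3
```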
    
The collected data, which is extracted from the response in json format, is as follows:
* `timestamp`: server timestamp of the Wiener Linien API at the time of the query
* `stop_name`: human-readable name of the stop
* `line_name`: human-readable name of the line
* `lineID`: unique identifier of the line
* `towards`: human-readable name of the final stop
* `richtungsID`: unique identifier of the direction of the line
* `type`: type of vehicle
* `timePlanned`: timestamp at which the next vehicle should arrive according to the timetable
* `timeReal`: live projection of the timestamp at which the next vehicle will actually arrive
* `DIVA`: unique identifier of a station area with multiple stops
* `rbl`: old name for `stopID`; unique identifier of a stop

In addition, the column `delay` is calculated from these fields as `timeReal` - `timePlanned` in seconds.
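The field list can also be read as a mapping from each column to its location in the JSON response. The sketch below collects the key paths that `translate_json_to_df` uses further down. `STOP_FIELDS`, `LINE_FIELDS`, `get_path` and the trimmed sample response (with made-up values) are illustrative assumptions, not an official schema of the Wiener Linien API. The paths are relative either to one element of `data.monitors` (per stop) or to one element of its `lines` list (per line). `timestamp` comes from `message.serverTime` of the whole response.

```python
# key paths as used by translate_json_to_df (illustrative, not an official schema)
STOP_FIELDS = {  # relative to one element of data['monitors']
    'stop_name': ('locationStop', 'properties', 'title'),
    'DIVA': ('locationStop', 'properties', 'name'),
    'rbl': ('locationStop', 'properties', 'attributes', 'rbl'),
}
LINE_FIELDS = {  # relative to one element of monitor['lines']
    'line_name': ('name',),
    'lineID': ('lineId',),
    'towards': ('towards',),
    'richtungsID': ('richtungsId',),
    'type': ('type',),
    'timePlanned': ('departures', 'departure', 0, 'departureTime', 'timePlanned'),
    'timeReal': ('departures', 'departure', 0, 'departureTime', 'timeReal'),
}


def get_path(obj, path, default=None):
    """Follow a sequence of dict keys / list indices; return default if any step is missing."""
    for key in path:
        try:
            obj = obj[key]
        except (KeyError, IndexError, TypeError):
            return default
    return obj


# trimmed, made-up response with one stop and one line
sample = {
    'message': {'serverTime': '2021-01-02T16:33:00.000+0100'},
    'data': {'monitors': [{
        'locationStop': {'properties': {'title': 'Example Stop', 'name': '60200000',
                                        'attributes': {'rbl': 1234}}},
        'lines': [{'name': 'U4', 'lineId': 1, 'towards': 'Example Terminus', 'richtungsId': '1',
                   'type': 'example_type',
                   'departures': {'departure': [{'departureTime': {
                       'timePlanned': '2021-01-02T16:35:00.000+0100',
                       'timeReal': '2021-01-02T16:36:30.000+0100'}}]}}],
    }]},
}

monitor = sample['data']['monitors'][0]
row = {'timestamp': sample['message']['serverTime']}
row.update({field: get_path(monitor, path) for field, path in STOP_FIELDS.items()})
row.update({field: get_path(monitor['lines'][0], path) for field, path in LINE_FIELDS.items()})
print(row['stop_name'], row['line_name'], row['timeReal'])
```

`get_path` returns `None` instead of raising when a key is missing, which is one way to handle lines without real-time data.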

# Set General Properties

In [1]:
import os
import datetime
import time

import numpy as np
import pandas as pd
import json

import requests

import pyodbc

In [2]:
#set the path to the Wiener Linien API documentation
wl_docu_url = 'http://www.wienerlinien.at/ogd_realtime/doku/'

#set the URL to the Wiener Linien API real time monitor
wl_monitor_url = 'http://www.wienerlinien.at/ogd_realtime/monitor?'

#list of all stations which will be observed
str_stationen = ['Kardinal-Nagl-Platz', 
                 'Margaretenplatz, Schönbrunner Straße', 
                 'Schönbrunn U', 
                 'Laxenburger Straße / Gudrunstraße']

# set path to the input data folder
input_folder = "./input_data/"

In [3]:
def validate_timestamps(start_time, end_time):
    """    
    Method to check if the window of the timestamps is valid.
    restrictions:
    - if start_time is invalid or in the past -> changed to current timestamp
    - end_time must be a valid timestamp
    - start_time must not be later than end_time (so end_time is not in the past either)
    
    input args: start_time, end_time as string format
    output args: start_time, end_time as tz-aware pandas Timestamps (Europe/Vienna)
    """
    
    # check if start_time is valid | if not set it to current time
    try:
        # raises ValueError if the input string is not in a valid format
        parsed_start = pd.to_datetime(start_time).tz_localize('Europe/Vienna')
        if parsed_start < get_current_timestamp():  # check if start_time is in the past
            raise ValueError('start_time is in the past')
        start_time = parsed_start
        print('Start time valid.')
        
    except (ValueError, TypeError):  # wrong format or in the past -> replace with current timestamp
        now = get_current_timestamp()
        print(f"Your chosen start time ({start_time}) is not valid.\n"
              f"Maybe the string is not correct or the time is in the past.\n"
              f"Since it is the start time, it was replaced with the current timestamp ({now.replace(tzinfo=None)}).\n")
        start_time = now
            
    # check if input string for end_time is in the right format | if not raise error       
    try:
        end_time = pd.to_datetime(end_time).tz_localize('Europe/Vienna')
    except (ValueError, TypeError):
        raise ValueError(f"End time ({end_time}) is not valid. Please enter a valid time.")
        
    # check if end_time is later than start_time | if not raise error
    if start_time > end_time:
        raise ValueError(f'Start time ({start_time.replace(tzinfo=None)}) is later than end time ({end_time.replace(tzinfo=None)}).\n'
                         f'Please check the order or enter a valid time.')
    else:
        print('End time valid.')
        return start_time, end_time
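The rules in `validate_timestamps` depend on the current system time, which makes them awkward to test. Below is a minimal sketch of the same three rules as a pure function, with the current time passed in explicitly. `check_window` is a hypothetical helper, and the minute-level rounding of the notebook is left out for brevity.

```python
import pandas as pd

TZ = 'Europe/Vienna'


def check_window(start_time, end_time, now):
    """Return (start, end) as tz-aware timestamps, applying the rules of validate_timestamps."""
    try:
        start = pd.to_datetime(start_time).tz_localize(TZ)
        if start < now:
            start = now  # start in the past -> use the current time
    except (ValueError, TypeError):
        start = now  # unparsable start -> use the current time
    end = pd.to_datetime(end_time).tz_localize(TZ)  # raises ValueError if unparsable
    if start > end:
        raise ValueError(f'start ({start}) is later than end ({end})')
    return start, end


now = pd.Timestamp('2021-01-02 16:30', tz=TZ)
print(check_window('2021-01-02 16:33', '2021-01-02 17:00', now))  # start kept
print(check_window('2020-12-31 08:00', '2021-01-02 17:00', now))  # start replaced by now
```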

In [4]:
# generate current timestamp
def get_current_timestamp(granularity = "min"):
    """    
    Get the current time in the Europe/Vienna time zone:
    - if granularity == 'sec' -> timestamp in hour:min:seconds
    - else -> timestamp in hour:min
    
    input args: granularity as string format (default 'min')
    output args: tz-aware pandas Timestamp
    """
    # read the clock directly in Vienna time, so the result is correct
    # even if the system time zone of the machine is not Europe/Vienna
    now = pd.Timestamp.now(tz='Europe/Vienna')
    
    # cut off everything below the requested granularity
    if granularity == "sec":
        return now.floor('s')
    else:
        return now.floor('min')

# Get DIVA Numbers of all stations
___
Since the user usually knows the names of the stops rather than their DIVA numbers, the DIVA numbers have to be looked up first. This is done with an SQL query on the previously filled `haltepunkte` table in the database, which returns all entries whose `StopText` matches one of the station names. The unique DIVA numbers of these entries are then stored in a numpy array.
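Building the `IN (...)` list with `str(tuple(...))` breaks in two cases. A single station is rendered by Python as `('x',)`, which is not valid SQL, and a name containing a quote breaks the string literal. Passing one `?` placeholder per name avoids both problems. The sketch uses `sqlite3` and a tiny hypothetical `haltepunkte` table with made-up rows instead of SQL Server. `pyodbc` uses the same `?` placeholder style.

```python
import sqlite3


def query_diva(conn, station_names):
    """Return the distinct DIVA numbers of the given stop names (one ? per name)."""
    placeholders = ', '.join('?' for _ in station_names)
    sql = f'SELECT DISTINCT DIVA FROM haltepunkte WHERE StopText IN ({placeholders})'
    return [row[0] for row in conn.execute(sql, list(station_names))]


demo_db = sqlite3.connect(':memory:')
demo_db.execute('CREATE TABLE haltepunkte (StopText TEXT, DIVA INTEGER)')
# made-up rows: two platforms of the same station share one DIVA number
demo_db.executemany('INSERT INTO haltepunkte VALUES (?, ?)',
                    [('Station A', 1), ('Station A', 1), ('Station B', 2), ("O'Station", 3)])

result_two = sorted(query_diva(demo_db, ['Station A', "O'Station"]))
result_one = query_diva(demo_db, ['Station B'])  # a single name works as well
demo_db.close()
print(result_two, result_one)  # [1, 3] [2]
```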

In [5]:
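# set properties for database
db_name = 'WienerLinienDB'
server = r'DESKTOP-JV1HTQR\SQLEXPRESS'  # raw string: the backslash is part of the instance name
db_connection = False  # global flag, updated by db_connect / close_connection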

In [6]:
# establish DB Connection
def db_connect(server, db_name):
    """
    Establish database connection
    
    input args: server, db_name as string format
    return args: database connection object
    """
    global db_connection  # update the module-level flag instead of creating a local variable
    conn = pyodbc.connect("driver={SQL Server};server="+server+"; database="+db_name+"; trusted_connection=true")
    db_connection = True
    # print(f"\nConnection with {server} successful!\n"
    #       f"Current Database: {db_name}\n"
    #       f"DB Connection Status: {db_connection}")
    return conn

In [7]:
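# close DB Connection
def close_connection(server, conn):
    """
    Close database connection
    
    input args: server as string format, conn as database connection object
    """
    global db_connection  # update the module-level flag instead of creating a local variable
    conn.close()
    db_connection = False
    # print(f"\nDB-Connection with Server {server} closed.\n"
    #       f"DB Connection Status: {db_connection}")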
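`db_connect` and `close_connection` always have to be called in pairs. A pyodbc connection used in a `with` block commits (or rolls back) on exit but stays open. A small context manager can pair both steps and close the connection even when an error occurs. `db_session` is a hypothetical helper; in this notebook, `connect` would be something like `lambda: db_connect(server, db_name)`. The example runs against `sqlite3`, whose connection objects behave the same way in a `with` block.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def db_session(connect):
    """Open a connection, commit on success, roll back on error, and always close it."""
    conn = connect()
    try:
        with conn:  # commit on success, rollback on exception
            yield conn
    finally:
        conn.close()  # the inner 'with' does not close the connection


# usage with an in-memory SQLite database as a stand-in for SQL Server
with db_session(lambda: sqlite3.connect(':memory:')) as sess:
    sess.execute('CREATE TABLE t (x INTEGER)')
    sess.execute('INSERT INTO t VALUES (1)')
    count = sess.execute('SELECT COUNT(*) FROM t').fetchone()[0]
print(count)  # 1
```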

In [8]:
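def get_DIVA(str_stationen):
    """
    Look up the DIVA numbers of the given stop names in the 'haltepunkte' table
    
    input args: str_stationen as list of strings
    returns: numpy array with the unique DIVA numbers
    """
    conn = db_connect(server, db_name)
    
    # one '?' placeholder per station name: works for a single name and for names with quotes
    placeholders = ', '.join('?' for _ in str_stationen)
    query = f"select * from haltepunkte where StopText in ({placeholders})"
    
    with conn:
        df = pd.read_sql(query, conn, params=list(str_stationen))
    
    close_connection(server, conn)
    
    return df.loc[df['StopText'].isin(str_stationen), 'DIVA'].unique()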

In [9]:
DIVA_list = get_DIVA(str_stationen)

# Build Request String for API Call
___
Once the DIVA numbers are known, the request string for the API is generated automatically: every element of `DIVA_list` is appended to the monitor URL as a separate `diva=` parameter.
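The standard library can produce the same query string. `urllib.parse.urlencode` accepts a list of key/value pairs and repeats the key once per pair. The sketch repeats the monitor URL so that it runs on its own. `build_request_urlencode` is a hypothetical name, and the float input shows how a DIVA number stored as `60201211.0` is turned into an integer first.

```python
from urllib.parse import urlencode

WL_MONITOR_URL = 'http://www.wienerlinien.at/ogd_realtime/monitor?'


def build_request_urlencode(diva_list):
    """Append one 'diva=<number>' parameter per DIVA number (repeated keys are allowed)."""
    return WL_MONITOR_URL + urlencode([('diva', int(diva)) for diva in diva_list])


print(build_request_urlencode([60200829, 60201211.0]))
# http://www.wienerlinien.at/ogd_realtime/monitor?diva=60200829&diva=60201211
```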

In [10]:
# generate the request for the API call from the list of DIVA numbers
def build_request(DIVA_list):
    # one 'diva=<number>' parameter per station, separated by '&';
    # int() drops a possible '.0' if the DIVA numbers are stored as floats
    params = '&'.join('diva=' + str(int(DIVA)) for DIVA in DIVA_list)
    return wl_monitor_url + params

In [11]:
# generate the request for all stored stations in DIVA list
request = build_request(DIVA_list)
request

'http://www.wienerlinien.at/ogd_realtime/monitor?diva=60200829&diva=60201211&diva=60200136&diva=60200653'

# Get Live Data
___
This is where the magic happens. The process of an automated request via the API is as follows:
1. The user passes the request string and the desired start and end time to the scheduling function `get_data`. If the start time is still in the future, a countdown runs that checks the time every second. As soon as the start time has been reached, the API is called once per minute; this interval keeps the load on the server low.
2. The response of this API call is stored in json format in a variable.
3. The desired information is extracted from this variable and stored in a pandas dataframe. 
4. The values from this pandas dataframe are inserted into the database using SQL.
5. If the whole process is successful, the timestamp of the call is stored in a log dictionary; the timestamps of failed calls are stored under a separate key of the same dictionary.
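Steps 1 to 5 amount to a loop that fires once per full minute between a start and an end time and records the outcome of each call. The sketch below shows that pattern with the clock and the sleep function passed in, so it can be exercised without waiting in real time. `run_every_minute`, `FakeClock` and the dummy `job` are hypothetical stand-ins for `get_data`, the system clock and the API call plus database insert. The loop compares minutes rather than testing for `second == 0`, so a minute is not skipped if a one-second check happens to miss that exact second.

```python
import datetime


def run_every_minute(start, end, job, now, sleep):
    """Call job() once per full minute in [start, end]; log successes and failures."""
    log = {'successful': [], 'failed': []}
    last_minute = None
    while True:
        current = now().replace(second=0, microsecond=0)
        if current > end:
            break
        if current >= start and current != last_minute:
            last_minute = current  # fire exactly once per minute
            try:
                job()
                log['successful'].append(current)
            except Exception:
                log['failed'].append(current)
        sleep(1)
    return log


class FakeClock:
    """Clock that only advances when sleep() is called."""
    def __init__(self, t):
        self.t = t

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.t += datetime.timedelta(seconds=seconds)


clock = FakeClock(datetime.datetime(2021, 1, 2, 16, 32, 30))
calls = []


def job():
    calls.append(clock.now())
    if len(calls) == 2:
        raise RuntimeError('simulated API error')


demo_log = run_every_minute(datetime.datetime(2021, 1, 2, 16, 33),
                            datetime.datetime(2021, 1, 2, 16, 35),
                            job, clock.now, clock.sleep)
print(len(demo_log['successful']), len(demo_log['failed']))  # 2 1
```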

In [12]:
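# fill NaN/NaT values in the time columns with the highest possible date as a sentinel;
# it is written as a string because 9999-12-31 lies outside the range of pandas' nanosecond timestamps
# fill NaN values in delay with the sentinel -100000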
def fill_na(df):
    for col in df.columns:
        if 'time' in col:
            df[col] = df[col].fillna("9999-12-31 23:59:59+01:00")
        elif col == 'delay':
            df[col] = df[col].fillna(-100000)
    return df

In [13]:
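# get information from Wiener Linien API 
def api_call(request):
    
    # get json data from Wiener Linien API call;
    # the timeout keeps a hanging request from blocking the scheduler (30 s is an arbitrary choice)
    response = requests.get(request, timeout=30)
    response.raise_for_status()  # treat HTTP errors (e.g. 5xx) as failed calls
    api_json = response.json()
    
    # pick relevant info from received json and convert it to dataframe
    return translate_json_to_df(api_json)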
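In the current workflow, a single failed HTTP request costs a whole minute of data, because the next attempt only happens at the next full minute. One option, sketched here as an assumption rather than part of the original workflow, is to retry a failed fetch a few times with a short pause. `fetch_with_retries` is hypothetical; in this notebook, `fetch` could be `lambda: api_call(request)`. `attempts * pause` plus the request time should stay well below 60 seconds so that the retries do not overlap with the next scheduled call.

```python
import time


def fetch_with_retries(fetch, attempts=3, pause=2.0, sleep=time.sleep):
    """Call fetch() up to `attempts` times; re-raise the last error if all attempts fail."""
    for attempt in range(1, attempts + 1):
        try:
            return fetch()
        except Exception:
            if attempt == attempts:
                raise
            sleep(pause)  # short pause before the next attempt


# usage with a dummy fetch that fails twice before succeeding
outcomes = iter([ConnectionError('timeout'), ConnectionError('timeout'), {'data': 'ok'}])


def flaky_fetch():
    result = next(outcomes)
    if isinstance(result, Exception):
        raise result
    return result


result = fetch_with_retries(flaky_fetch, sleep=lambda seconds: None)
print(result)  # {'data': 'ok'}
```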

In [14]:
def translate_json_to_df(entry):
    """
    Method to extract relevant data from API response and write into pandas dataframe
    
    input_args: entry (parsed json response as dictionary)
    returns: station_df (pandas dataframe, one row per line and stop)
    """
    data = entry['data']['monitors']
    server_time = pd.to_datetime(entry['message']['serverTime'])
    
    # dictionary with all relevant categories as keys
    station = dict({'timestamp': [],
                    'stop_name': [],
                    'line_name': [],
                    'lineID' : [],
                    'towards': [],
                    'richtungsID': [],
                    'type': [],
                    'timePlanned': [],
                    'timeReal': [],
                    'DIVA': [],
                    'rbl': []
                    })
    
    for monitor in data:
        stop_props = monitor['locationStop']['properties']
        
        # iterate over the lines of THIS monitor (not always over those of the first one)
        for line in monitor['lines']:
            # pick the relevant information and write it to the dictionary
            station['timestamp'].append(server_time)
            station['stop_name'].append(stop_props['title'])
            station['DIVA'].append(stop_props['name'])
            station['rbl'].append(stop_props['attributes']['rbl'])
            station['line_name'].append(line['name'])
            station['type'].append(line['type'])
            station['towards'].append(line['towards'])
            station['richtungsID'].append(line['richtungsId'])
            station['lineID'].append(line['lineId'])
            
            # default if no (complete) real-time data is available
            time_planned, time_real = np.nan, np.nan
            if line.get('realtimeSupported', False):
                try:
                    departure_time = line['departures']['departure'][0]['departureTime']
                    time_planned = pd.to_datetime(departure_time['timePlanned'])
                    time_real = pd.to_datetime(departure_time['timeReal'])
                except (KeyError, IndexError):
                    time_planned, time_real = np.nan, np.nan
            
            # append exactly one value per row so that all columns keep the same length
            station['timePlanned'].append(time_planned)
            station['timeReal'].append(time_real)
    
    # convert dictionary to dataframe
    station_df = pd.DataFrame(station)
    
    # calculate the delay in seconds (NaN if no departure times are available)
    real_times = pd.to_datetime(station_df['timeReal'])
    planned_times = pd.to_datetime(station_df['timePlanned'])
    station_df['delay'] = (real_times - planned_times).dt.total_seconds()
    
    station_df = fill_na(station_df)
    
    return station_df
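The delay column is the difference between the real-time projection and the timetable, expressed in seconds. Positive values mean the vehicle is late, negative values that it is early. A small sketch with made-up departure times shows the calculation. It also shows how a missing projection stays `NaN` until it is replaced by the sentinel -100000, as `fill_na` does for the `delay` column.

```python
import pandas as pd

# made-up departure times; the second row has no real-time projection
demo_delay_df = pd.DataFrame({
    'timePlanned': pd.to_datetime(['2021-01-02T16:35:00+01:00', '2021-01-02T16:40:00+01:00']),
    'timeReal': pd.to_datetime(['2021-01-02T16:36:30+01:00', None]),
})

# delay = timeReal - timePlanned in seconds (90 s late in the first row)
demo_delay_df['delay'] = (demo_delay_df['timeReal'] - demo_delay_df['timePlanned']).dt.total_seconds()
print(demo_delay_df['delay'].tolist())  # [90.0, nan]

# same sentinel as fill_na uses for missing delays
demo_delay_df['delay'] = demo_delay_df['delay'].fillna(-100000)
print(demo_delay_df['delay'].tolist())  # [90.0, -100000.0]
```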

In [15]:
def write_data_to_stage(df, server, db_name):
    """
    Method to insert relevant data from API call into DB
    
    input_args: df (dataframe format), server (string format), db_name (string format)
    returns: - 
    """
    
    conn = db_connect(server=server, db_name=db_name)
    sql_command = '''
                INSERT INTO [dbo].[stage_delay]
                           ([timestamp]
                           ,[stop_name]
                           ,[line_name]
                           ,[lineID]
                           ,[towards]
                           ,[richtungsID]
                           ,[type]
                           ,[timePlanned]
                           ,[timeReal]
                           ,[DIVA]
                           ,[rbl]
                           ,[delay])
                     VALUES
                           (?,
                            ?,
                            ?,
                            ?,
                            ?,
                            ?,
                            ?,
                            ?,
                            ?,
                            ?,
                            ?,
                            ?)
                '''
    # pyodbc: 'with conn' commits all inserts if no error occurs and rolls back otherwise;
    # it does not close the connection, so close_connection runs in 'finally'
    try:
        with conn:
            crs = conn.cursor()
            # insert the dataframe row by row, in the column order of the INSERT statement
            for index, row in df.iterrows():
                crs.execute(sql_command, row['timestamp'],
                                         row['stop_name'],
                                         row['line_name'],
                                         row['lineID'],
                                         row['towards'],
                                         row['richtungsID'],
                                         row['type'],
                                         row['timePlanned'],
                                         row['timeReal'],
                                         row['DIVA'],
                                         row['rbl'],
                                         row['delay']
                                         )
    finally:
        close_connection(server=server, conn=conn)
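Inserting row by row with `iterrows` works, but every row needs its own `execute` call. One alternative is a single `executemany` call over all rows. The sketch below uses `sqlite3` instead of SQL Server so that it runs anywhere, together with a reduced hypothetical table `stage_delay_demo`. With pyodbc and Microsoft's ODBC drivers, setting `cursor.fast_executemany = True` can speed this up further. pandas' `DataFrame.to_sql` would be another option, but for SQL Server it needs a SQLAlchemy engine rather than a raw pyodbc connection.

```python
import sqlite3

import pandas as pd

COLUMNS = ['stop_name', 'line_name', 'delay']  # reduced column set for the demo

rows_df = pd.DataFrame({
    'stop_name': ['Stop A', 'Stop B'],
    'line_name': ['U4', '10'],
    'delay': [90.0, -100000.0],
})

demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('CREATE TABLE stage_delay_demo (stop_name TEXT, line_name TEXT, delay REAL)')

insert_sql = (f"INSERT INTO stage_delay_demo ({', '.join(COLUMNS)}) "
              f"VALUES ({', '.join('?' for _ in COLUMNS)})")
with demo_conn:  # commits once all rows are inserted
    # itertuples(index=False, name=None) yields plain tuples in the order of COLUMNS
    demo_conn.executemany(insert_sql, rows_df[COLUMNS].itertuples(index=False, name=None))

inserted = demo_conn.execute('SELECT COUNT(*) FROM stage_delay_demo').fetchone()[0]
print(inserted)  # 2
```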

In [16]:
def get_data(start_time, end_time, request):
    """
    Scheduling workflow to automate API calls.
    while current_time < start_time: (countdown to start)
        sleep for one second and then check again
    
    while current_time <= end_time: (API calls)
        if a new minute has started since the last call:
            call API with request and write it into DB
        else:
            sleep 1 second and check again
    
    input_args: start_time, end_time as tz-aware timestamps (see validate_timestamps), request as string format
    returns: dictionary with timestamps of successful and failed loads
    """
    time_start = get_current_timestamp("min")
    log_dict = {'successful': [],
                'failed': []}
    
    def call_and_log(call_time):
        # call the API, write the response into the DB and log the outcome
        try:
            write_data_to_stage(api_call(request), server, db_name)
            log_dict['successful'].append(call_time)
        except Exception as err:
            print(f'{call_time.replace(tzinfo=None)}: Error occurred! ({err})')
            log_dict['failed'].append(call_time)
    
    print('######## Start Countdown Begin ########\n')
    print(f'Started Job: {time_start.replace(tzinfo=None)}\n'
          f'Start with API Calls: {start_time.replace(tzinfo=None)}\n')
    
    # check if start_time is in the future | if so wait 1 second; if not go on with data collection
    while start_time > get_current_timestamp():
        time_now = get_current_timestamp("sec")
        print('Start Countdown: ' + str(start_time - time_now), end='\r')
        time.sleep(1)
    print('######## Start Countdown End ########\n\n'
          '######## Data Collection Start ########\n')
    
    # make first API call and write into DB; remember the minute of this call
    last_call = get_current_timestamp()
    call_and_log(last_call)
    
    print(f'Start with API Calls: {last_call.replace(tzinfo=None)}\n'
          f'End Time: {end_time.replace(tzinfo=None)}\n')
    
    # one API call per minute until end_time; comparing minutes instead of testing for
    # second == 0 means a minute is not skipped if the 1-second check misses that second
    while last_call < end_time and get_current_timestamp() <= end_time:
        current_minute = get_current_timestamp()
        
        if current_minute > last_call:  # new minute: call API and write to DB
            last_call = current_minute
            call_and_log(last_call)
        else:
            seconds_left = 60 - get_current_timestamp("sec").second
            print(f"Successful Calls: {len(log_dict['successful'])} | Failed Calls: {len(log_dict['failed'])}"
                  f" | Countdown to next API Call: {seconds_left}[s]", end='\r')
        time.sleep(1)
    
    # pad with spaces to overwrite the last countdown line completely
    print(f"Successful Calls: {len(log_dict['successful'])} | Failed Calls: {len(log_dict['failed'])}".ljust(90))
    print('\n######## Data Collection End ########\n')
    return log_dict

In [17]:
start_time, end_time = validate_timestamps(start_time='2021-01-02 16:33', end_time='2021-01-02 16:33')
log = get_data(start_time, end_time, request)

Start time valid.
End time valid.
######## Start Countdown Begin ########

Started Job: 2021-01-02 16:32:00
Start with API Calls: 2021-01-02 16:33:00

######## Start Countdown End ########

######## Data Collection Start ########

Start with API Calls: 2021-01-02 16:33:00
End Time: 2021-01-02 16:33:00

Sucessfull Calls: 1 | Failed Calls: 0 | Countdown to next API Call: 1[s]]
######## Data Collection End ########



# Discussion & Next Steps
___
With this file, data was retrieved from the Wiener Linien API on 17.12.2020 between 8:30 and 11:00 and successfully written into the database. A total of 149 API calls were made, all of which were successful, and 2384 rows were inserted. This data is also stored as `data_17_12.csv` in the `realtime_data` folder.

The entries are analyzed in the next file, the notebook `data_exploration.ipynb`.