# Data Extraction and Pre-processing

We're given an [API Endpoint](https://data.gov.hk/en-data/dataset/aahk-team1-flight-info) that provides data flights to/from HKG by the Hong Kong Airport Authority. On a daily basis, these datasets will be updated and the range of available data is updated to be from ```[t-91, t+14]```. Given this moving window, our end goal is to first get all the available past data from today, and on a daily basis, request for the latest data on a rolling basis. The eventual goal of this section is to have a database in GCP BigQuery storing all the flights data continuously, then we can run both descriptive and predictive analytics based on the data stored on a regular cadence.

To summarise, the goal of this particular notebook will be:

- [x] Get the data from the API
- [x] Process the data into a tabular format
- [ ] Store the data into a BigQuery table
- [ ] Setup a GCP Cloud Function regular called by Cloud Scheduler to run this process daily


In [2]:
import requests
from datetime import datetime
import pandas as pd


c:\Users\yoshi\AppData\Local\Programs\Python\Python39\lib\site-packages\numpy\.libs\libopenblas.EL2C6PLE4ZYW3ECEVIV3OXXGRN2NRFM2.gfortran-win_amd64.dll
c:\Users\yoshi\AppData\Local\Programs\Python\Python39\lib\site-packages\numpy\.libs\libopenblas.WCDJNK7YVMPZQ2ME2ZZHJJRJ3JIKNDB7.gfortran-win_amd64.dll


### Exploring the data structure of the API

Here we begin by exploring the data structure of arrival flights ```(arrival=True)```, both cargo and passenger flights should have the same data fields as documented by the API documentation. We'll also see how we should modify our code so that we can parse the departure flights ```(arrival=False)``` as well.

The aim is to get two separate tables for arrivals and departures:
| field | dtype |
|--------|-------------|
|scheduled_arrival/departure|datetime|
|actual_arrival/departure|datetime|
|flight_num|str|
|origin|str|
|destination|str|
|airline|str|
|arrival|bool|
|cargo|bool|

In [3]:
# initialise a URL to request data
date_string = '2022-04-12'
arrival = 'true' # arrival flights for now
cargo = 'true'

flight_API_URL = f'https://www.hongkongairport.com/flightinfo-rest/rest/flights/past?date={date_string}&arrival={arrival}&lang=en&cargo={cargo}'

print(flight_API_URL)

https://www.hongkongairport.com/flightinfo-rest/rest/flights/past?date=2022-04-12&arrival=true&lang=en&cargo=true


In [4]:
# request data from API and extract required data
request = requests.get(flight_API_URL)

if request.status_code == 200:
    data = request.json()
else:
    data = []
    print(f'Invalid Response from API: {request.status_code}')

df = pd.DataFrame(data)
df.head()

Unnamed: 0,date,arrival,cargo,list
0,2022-04-12,True,True,"[{'time': '00:25', 'flight': [{'no': 'LD 129',..."


It seems as though the data is a nested JSON object, a better approach will be to normalize the dataset and figure out what each field corresponds to in terms of the flight information. In order to do this, we can make use of the ```pd.json_normalize``` to do all the heavy lifting.

In [5]:
# get normalised data from pandas
normalized_df = pd.json_normalize(data, "list", 
                        ["date", "arrival", "cargo"],
                        errors='ignore', record_prefix='')

normalized_df.head()

Unnamed: 0,time,flight,status,statusCode,origin,date,arrival,cargo
0,00:25,"[{'no': 'LD 129', 'airline': 'AHK'}]",At gate 00:06,,[ICN],2022-04-12,True,True
1,00:25,"[{'no': 'LD 681', 'airline': 'AHK'}]",At gate 00:11,,[TPE],2022-04-12,True,True
2,00:30,"[{'no': 'KZ 203', 'airline': 'NCA'}]",At gate 00:38,,[NRT],2022-04-12,True,True
3,00:30,"[{'no': 'RH 318', 'airline': 'HKC'}]",At gate 00:59,,[HAN],2022-04-12,True,True
4,00:40,"[{'no': 'LD 205', 'airline': 'AHK'}]",At gate 01:25,,[KIX],2022-04-12,True,True


Before we move to cleaning the data, it might be useful to see how different departure flights are, so that we can adjust our data extraction process according to ```arrival```. The same process was copied over into one cell.

In [6]:
# initialise a URL to request data
date_string = '2022-04-12'
arrival = 'false' # changed to false for departure
cargo = 'true'

flight_API_URL = f'https://www.hongkongairport.com/flightinfo-rest/rest/flights/past?date={date_string}&arrival={arrival}&lang=en&cargo={cargo}'

# request data from API and extract required data
request = requests.get(flight_API_URL)

if request.status_code == 200:
    data = request.json()
else:
    data = []
    print(f'Invalid Response from API: {request.status_code}')

# get normalised data from pandas
normalized_df = pd.json_normalize(data, "list", 
                        ["date", "arrival", "cargo"],
                        errors='ignore', record_prefix='')

normalized_df.head()

Unnamed: 0,time,flight,status,statusCode,destination,date,arrival,cargo
0,00:00,"[{'no': 'SV 989', 'airline': 'SVA'}]",Dep 00:02,,"[HYD, RUH]",2022-04-12,False,True
1,00:00,"[{'no': 'CV 7331', 'airline': 'CLX'}]",Dep 06:46,,"[GYD, LUX]",2022-04-12,False,True
2,01:20,"[{'no': '5Y 097', 'airline': 'GTI'}]",Dep 03:02,,"[ICN, LAX]",2022-04-12,False,True
3,01:55,"[{'no': 'MS 510', 'airline': 'MSR'}]",Dep 06:10,,[BKK],2022-04-12,False,True
4,02:00,"[{'no': 'RH 829', 'airline': 'HKC'}]",Dep 01:47,,[PVG],2022-04-12,False,True


### Data Preprocessing - Getting to what's needed

Side by side comparison shows that the difference is in the table fields, arrivals have ```origin``` and departures have ```destination```, which is obvious as the destination and origin for arrivals and departures will be ```HKG``` respectively. The difference

An interesting point about ```flight``` and ```origin/destination``` is that there are multiple flights and origins. To simplify the dataset, we will:
- Take the last leg before arriving to HKG as the flight for arrivals
- Take the first leg after departing from HKG as the flight for departure

In other words, we will only take the flights immediately connecting to HKG into consideration only.

In [8]:
# Initialise a new dataframe for our needed results
df = pd.DataFrame()

# Adjustments needed to be made depending on the query parameter arrival
scheduled_field_name = 'scheduled_arrival' if arrival == 'true' else 'scheduled_departure'
actual_field_name = 'actual_arrival' if arrival == 'true' else 'actual_departure'


df[scheduled_field_name] = pd.to_datetime(normalized_df['date']+' '+normalized_df['time'], 
                                        infer_datetime_format=True)

# clean away all alphabetical characters in status, this is the actual timestamp
df['date'] = normalized_df['date']
df['status'] = normalized_df['status'].str.replace(r"[a-zA-Z]",'').str.strip()

df['flight_num'] = normalized_df['flight'].apply(lambda x : x[-1].get("no")) if arrival == 'true' else normalized_df['flight'].apply(lambda x : x[0].get("no"))
df['airline'] = normalized_df['flight'].apply(lambda x : x[-1].get("airline")) if arrival == 'true' else normalized_df['flight'].apply(lambda x : x[0].get("airline"))

df['origin'] = normalized_df['origin'].apply(lambda x : x[-1]) if arrival == 'true' else 'HKG'
df['destination'] = 'HKG' if arrival == 'true' else normalized_df['destination'].apply(lambda x : x[0])

df['arrival'] = normalized_df['arrival'].astype(bool)
df['cargo'] = normalized_df['cargo'].astype(bool)

df.head()


  df['status'] = normalized_df['status'].str.replace(r"[a-zA-Z]",'').str.strip()


Unnamed: 0,scheduled_departure,date,status,flight_num,airline,origin,destination,arrival,cargo
0,2022-04-12 00:00:00,2022-04-12,00:02,SV 989,SVA,HKG,HYD,False,True
1,2022-04-12 00:00:00,2022-04-12,06:46,CV 7331,CLX,HKG,GYD,False,True
2,2022-04-12 01:20:00,2022-04-12,03:02,5Y 097,GTI,HKG,ICN,False,True
3,2022-04-12 01:55:00,2022-04-12,06:10,MS 510,MSR,HKG,BKK,False,True
4,2022-04-12 02:00:00,2022-04-12,01:47,RH 829,HKC,HKG,PVG,False,True


In [9]:
# sometimes in status, there are dates next to the timestamp to reflect the actual date
df['status'].unique()

array(['00:02', '06:46', '03:02', '06:10', '01:47', '02:20', '02:33',
       '02:25', '02:49', '03:14', '03:41', '03:12', '07:58', '03:34', '',
       '03:43', '03:47', '03:57', '04:28', '05:06', '05:15', '05:30',
       '05:33', '06:05', '05:44', '06:14', '07:47', '06:39', '08:34',
       '07:36', '09:42', '09:06', '12:57', '07:00', '07:33', '08:54',
       '16:46', '12:42', '09:18', '13:54', '08:33', '13:58', '09:08',
       '14:10', '09:05', '10:09', '11:26', '10:52', '11:45', '11:58',
       '20:33', '11:38', '11:52', '12:18', '12:51', '12:55', '12:28',
       '13:23', '13:32', '14:11', '14:31', '14:51', '21:44', '15:33',
       '15:26', '15:17', '08:42 (13/04/2022)', '15:28', '15:37', '15:44',
       '18:56', '16:08', '16:25', '16:16', '17:45', '17:55', '20:49',
       '11:41 (13/04/2022)', '17:58', '18:19', '18:35', '19:21', '19:16',
       '18:58', '20:10', '19:43', '19:33', '19:40', '20:30', '20:24',
       '20:38', '06:18 (13/04/2022)', '21:17', '21:07', '21:08',
       '13:04

In [10]:
# apply the logic, if there is a date then use it, if not then use the date parameter in our query
df[actual_field_name] = pd.to_datetime(
                        df.apply(lambda x: datetime.strptime(x['status'].split(' ')[-1][1:-1], '%d/%m/%Y').strftime('%Y-%m-%d') + ' ' +  
                                            x['status'].split(' ')[0]
                                            if len(x['status'].split(' ')) > 1 
                                            else x['date'] + ' ' + x['status'], axis=1)
                        , infer_datetime_format=True)

df.head()

Unnamed: 0,scheduled_departure,date,status,flight_num,airline,origin,destination,arrival,cargo,actual_departure
0,2022-04-12 00:00:00,2022-04-12,00:02,SV 989,SVA,HKG,HYD,False,True,2022-04-12 00:02:00
1,2022-04-12 00:00:00,2022-04-12,06:46,CV 7331,CLX,HKG,GYD,False,True,2022-04-12 06:46:00
2,2022-04-12 01:20:00,2022-04-12,03:02,5Y 097,GTI,HKG,ICN,False,True,2022-04-12 03:02:00
3,2022-04-12 01:55:00,2022-04-12,06:10,MS 510,MSR,HKG,BKK,False,True,2022-04-12 06:10:00
4,2022-04-12 02:00:00,2022-04-12,01:47,RH 829,HKC,HKG,PVG,False,True,2022-04-12 01:47:00


In [11]:
# finally, reorganise the dataframe into the needed order and drop unnecessary columns
df = df[[scheduled_field_name, actual_field_name, 'flight_num', 
        'origin', 'destination', 'airline', 'arrival', 'cargo']]

df.head()

Unnamed: 0,scheduled_departure,actual_departure,flight_num,origin,destination,airline,arrival,cargo
0,2022-04-12 00:00:00,2022-04-12 00:02:00,SV 989,HKG,HYD,SVA,False,True
1,2022-04-12 00:00:00,2022-04-12 06:46:00,CV 7331,HKG,GYD,CLX,False,True
2,2022-04-12 01:20:00,2022-04-12 03:02:00,5Y 097,HKG,ICN,GTI,False,True
3,2022-04-12 01:55:00,2022-04-12 06:10:00,MS 510,HKG,BKK,MSR,False,True
4,2022-04-12 02:00:00,2022-04-12 01:47:00,RH 829,HKG,PVG,HKC,False,True


### Putting it all together as functions

Formalising this process will require us to break this down into multiple functions. This will be helpful in the future if we would like to split this process into a DAG, where orchestration tools like Airflow can be used to regularly schedule data extraction. Summarising our approach from above, our DAG looks like this:

```Get API URL -> Request API -> Normalise JSON Response -> Extract and Transform Columns -> Return DataFrame```



In [12]:
def get_API_url(date_string: str, arrival: str, cargo: str) -> str:
    """
    Given date, arrival, cargo, return the API URL endpoint
    """
    return f'https://www.hongkongairport.com/flightinfo-rest/rest/flights/past?date={date_string}&arrival={arrival}&lang=en&cargo={cargo}'

In [13]:
def request_flights_data(API_url: str) -> list:
    """
    Request from API_url, if unsuccessful response, return an empty list.
    """ 
    request = requests.get(API_url)

    if request.status_code == 200:
        data = request.json()
    else:
        data = []
        print(f'Invalid Response from API: {request.status_code}')

    return data

In [14]:
def normalize_flights_data(flights_data: list) -> pd.DataFrame:
    """
    Takes the nested list of JSON objects and flattens it into a dataframe.
    """
    normalized_df = pd.json_normalize(flights_data, "list", 
                        ["date", "arrival", "cargo"],
                        errors='ignore', record_prefix='')

    return normalized_df
    

In [15]:
def extract_transform_flights(normalized_df: pd.DataFrame, arrival: str) -> pd.DataFrame:
    """
    Takes the needed columns from the normalized df and transforms them according to
    the type of flights being extracted: arrival or departure.
    
    If arrival flights:
    - Take the last flight before arriving to HKG as flight
    - Take the last origin before arriving to HKG as origin
    - Use HKG for destination 

    If departure flights:
    - Take the first flight after departing from HKG as flight
    - Take the first destination after departing from HKG as destination
    - Use HKG for origin 
    """

    df = pd.DataFrame()

    # Adjustments needed to be made depending on the query parameter arrival
    scheduled_field_name = 'scheduled_arrival' if arrival == 'true' else 'scheduled_departure'
    actual_field_name = 'actual_arrival' if arrival == 'true' else 'actual_departure'


    df[scheduled_field_name] = pd.to_datetime(normalized_df['date']+' '+normalized_df['time'], 
                                            infer_datetime_format=True)

    # clean away all alphabetical characters in status, this is the actual timestamp
    df['date'] = normalized_df['date']
    df['status'] = normalized_df['status'].str.replace(r"[a-zA-Z]",'').str.strip()

    df['flight_num'] = normalized_df['flight'].apply(lambda x : x[-1].get("no")) if arrival == 'true' else normalized_df['flight'].apply(lambda x : x[0].get("no"))
    df['airline'] = normalized_df['flight'].apply(lambda x : x[-1].get("airline")) if arrival == 'true' else normalized_df['flight'].apply(lambda x : x[0].get("airline"))

    df['origin'] = normalized_df['origin'].apply(lambda x : x[-1]) if arrival == 'true' else 'HKG'
    df['destination'] = 'HKG' if arrival == 'true' else normalized_df['destination'].apply(lambda x : x[0])

    df['arrival'] = normalized_df['arrival'].astype(bool)
    df['cargo'] = normalized_df['cargo'].astype(bool)

    # apply the logic, if there is a date then use it, if not then use the date parameter in our query
    df[actual_field_name] = pd.to_datetime(
                            df.apply(lambda x: datetime.strptime(x['status'].split(' ')[-1][1:-1], '%d/%m/%Y').strftime('%Y-%m-%d') + ' ' +  
                                                x['status'].split(' ')[0]
                                                if len(x['status'].split(' ')) > 1 
                                                else x['date'] + ' ' + x['status'], axis=1)
                            , infer_datetime_format=True)

    return df[[scheduled_field_name, actual_field_name, 'flight_num', 
        'origin', 'destination', 'airline', 'arrival', 'cargo']]

In [16]:
def get_flights(date_string: str, arrival: str, cargo: str) -> pd.DataFrame:
    """
    Return the parsed flights data into a dataframe, encapsulating all subfunctions into one.
    """

    API_url = get_API_url(date_string, arrival, cargo)
    flights_data = request_flights_data(API_url)
    normalized_df = normalize_flights_data(flights_data)
    df = extract_transform_flights(normalized_df, arrival)

    return df

In [17]:
# Testing out the functions
DATE_STRING = '2022-06-08'
ARRIVAL = 'true'
CARGO = 'true'

flights = get_flights(DATE_STRING, ARRIVAL, CARGO)

flights.head()

  df['status'] = normalized_df['status'].str.replace(r"[a-zA-Z]",'').str.strip()


Unnamed: 0,scheduled_arrival,actual_arrival,flight_num,origin,destination,airline,arrival,cargo
0,2022-06-08 00:25:00,2022-06-08 00:06:00,LD 129,ICN,HKG,AHK,True,True
1,2022-06-08 00:25:00,2022-06-08 00:04:00,LD 681,TPE,HKG,AHK,True,True
2,2022-06-08 00:30:00,2022-06-08 01:19:00,RH 318,HAN,HKG,HKC,True,True
3,2022-06-08 00:30:00,2022-06-08 00:00:00,EK 9824,MNL,HKG,UAE,True,True
4,2022-06-08 00:40:00,2022-06-08 00:26:00,LD 205,KIX,HKG,AHK,True,True


### Next Steps

Now that we have one function ```get_flights``` that directly returns flights given ```date```, ```arrival```, ```cargo```, we can then run a loop and for:
- ```date``` in ```range(t-91, t-1)```
- ```arrival``` in ```['true', 'false']```
- ```cargo``` in ```['true', 'false']```

Once we get the past records, we can then:
1. Initialise a table in GCP Bigquery
2. Set up a Google Cloud Function to run ```get_flights``` for ```t=t-1```, ```arrival``` in ```['true', 'false']```, and ```cargo``` in ```['true', 'false']```
3. Schedule the function to run every day using a Cloud Scheduler

Once this is set, everyday we will get the previous day of flight records, and our records will have a continuous stream of records flowing in. Setting this up earlier on in this project is very helpful as we will continue to accumulate records as we develop this project further into exploration and prediction tasks.