## Exercise

Write a data pipeline that ingests the source data into a **fact** table called `air_travel_passengers`, with two supporting **dimension** tables called `airlines` and `airports`.

To load the dimension tables, lookup additional columns from the supporting files: `global_airlines.csv` and `global_airports.csv`

<br>

Your data pipeline should look similar to:

<img src="./imgs/dm_air_travel_exercise.jpg" alt="Air Travel Pipeline" width="700" />

<br>

Your pipeline must meet the following requirements:

1. _airlines_ dimension:
    - Look up additional airline columns such as IATA and ICAO codes, callsign, and country
    - Generate a new airline_id by **hashing** the airline name
1. _airports_ dimension:
    - Using the airport IATA code, look up additional columns such as airport lat/lon, ICAO code, and timezone information
    - Set the iata code as the airport_id column
1. _air\_travel\_passengers_ fact:
    - Look up both airline_id and airport_id from their dimension tables
    - Add a new column called _report\_date_ set to the 1st of the report month/year (as date data type)
    - Create a fact_id by hashing a **composite key** of: airline name, src, dest, year, and month

<br>

### Data Model

Using draw.io, create a data model of your target tables. You must show at least three final tables: `air_travel_passengers`, `airlines`, and `airports`

See data model below:

<img src="./imgs/us_monthly_air_passangers.drawio.png" alt="Air Travel Data Model" width="400" />


### Data Pipeline

Develop your pipeline code. We recommend breaking down the pipeline into the following sections (code cells):

In [None]:
import os
import sys
import pandas as pd
import logging
from google.cloud import bigquery
from hashlib import md5
from typing import List

# **** SETUP ****

# folder holding the input csv files; change to match your filesystem
DATA_DIR = "../data/air_travel/"
DEFAULT_RECEIPTS_FILE = os.path.join(DATA_DIR, "us_monthly_air_passengers_sample.csv")
# target BigQuery project and dataset; change to match your gcloud project
PROJECT_NAME = "deb-01-371820"
DATASET_NAME = "us_monthly_air_passengers"

# **** TABLE SCHEMAS ****

# target table name and BigQuery schema of each dimension, keyed by dimension name
TABLE_METADATA = {
    "airlines": {
        "table_name": "airlines",
        "schema": [
            # the dataframe index is only written when it is named in the schema
            bigquery.SchemaField("airline_id", "string", mode="REQUIRED"),
            bigquery.SchemaField("carrier_name", "string", mode="REQUIRED"),
            bigquery.SchemaField("iata", "string", mode="NULLABLE"),
            bigquery.SchemaField("icao", "string", mode="NULLABLE"),
            bigquery.SchemaField("callsign", "string", mode="NULLABLE"),
            bigquery.SchemaField("country", "string", mode="NULLABLE"),
        ],
    },
    "airports": {
        "table_name": "airports",
        "schema": [
            # the dataframe index is only written when it is named in the schema
            bigquery.SchemaField("airport_id", "string", mode="REQUIRED"),
            bigquery.SchemaField("name", "string", mode="NULLABLE"),
            bigquery.SchemaField("city", "string", mode="NULLABLE"),
            bigquery.SchemaField("country", "string", mode="NULLABLE"),
            bigquery.SchemaField("icao", "string", mode="NULLABLE"),
            bigquery.SchemaField("latitude", "float", mode="NULLABLE"),
            bigquery.SchemaField("longitude", "float", mode="NULLABLE"),
        ],
    },
}

# source file read by the pipeline
filename = DEFAULT_RECEIPTS_FILE

: 

In [None]:
# **** SETUP LOGGING ****
# configure the root logger: INFO threshold, records written to stdout and
# rendered as [LEVEL][timestamp][module:line] : message
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
)
# fetch the logger registered under the name 'root' and keep it as `logger`
logger: logging.Logger = logging.getLogger('root')
# override the level on `logger` so that DEBUG records pass its threshold
logger.setLevel(logging.DEBUG)


# **** BIGQUERY CLIENT ****
logger.debug("Creating bigquery client")
# no project or credentials are passed explicitly to the client
client = bigquery.Client()

logger.info("Setup Completed")

In [None]:
# make sure the target dataset exists; an already existing dataset is not an error
dataset_id = ".".join((PROJECT_NAME, DATASET_NAME))
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
# keep the dataset object returned by the API call
dataset = client.create_dataset(dataset, exists_ok=True)

logger.info(f"Created US Monthly Passengers dataset: {dataset.full_dataset_id}")

In [None]:
# load source csv
air_df = pd.read_csv(filename)

# check schema: the file must contain every column the pipeline relies on
expected_columns = ['Sum_PASSENGERS','AIRLINE_ID','CARRIER_NAME','ORIGIN','ORIGIN_CITY_NAME','ORIGIN_STATE_ABR','ORIGIN_STATE_NM','ORIGIN_COUNTRY','ORIGIN_COUNTRY_NAME','DEST','DEST_CITY_NAME','DEST_STATE_ABR','DEST_STATE_NM','DEST_COUNTRY','DEST_COUNTRY_NAME','YEAR','MONTH']
# collect every missing column so that a single error reports all of them
missing_columns = [col for col in expected_columns if col not in air_df.columns]
if missing_columns:
    # raise explicitly: an `assert` would be stripped when Python runs with -O
    raise ValueError(f"Data file missing required column(s): {', '.join(missing_columns)}")

# assign & remember receipts dataframe
air_receipts_df = air_df
display(air_receipts_df.head(n=10))

In [None]:
# load airlines dim

# columns that identify an airline in the receipts
cols = ['CARRIER_NAME','AIRLINE_ID']
# start from the receipts, reduce them to one row per distinct combination of
# the columns above, keep only those columns and rename the carrier name
airlines_df = (
    air_receipts_df
    .groupby(cols)
    .all()
    .reset_index()
    .loc[:, cols]
    .rename(columns={'CARRIER_NAME': 'carrier_name'})
)

logger.info(f"airline dim - found {len(airlines_df)} rows")
display(airlines_df)

In [None]:
from hashlib import md5

# generic helper used to derive ids: returns the md5 hash of a text value
def get_hash(value) -> str:
    """Return the hexadecimal md5 digest of `value` encoded as utf-16."""
    encoded = value.encode(encoding='utf-16')
    digest = md5(encoded)
    return digest.hexdigest()


logger.info("assigning airline ids: using md5 hash of airline name")

# hash every carrier name into its airline_id ...
airlines_df['airline_id'] = airlines_df['carrier_name'].apply(get_hash)
# ... and use that id as the dataframe index
airlines_df = airlines_df.set_index('airline_id')

logger.info("airline ids generated")
display(airlines_df)

In [None]:
# look up additional airline columns from the global airlines reference file;
# the path is built from DATA_DIR so that it follows the filesystem setting
global_airline_df = pd.read_csv(os.path.join(DATA_DIR, 'global_airlines.csv'), header=0)

global_airline_df = global_airline_df.rename(columns={'name': 'carrier_name'})
global_airline_df = global_airline_df[['carrier_name','iata','icao','callsign','country']]
# keep a single lookup row per carrier name: a repeated name in the lookup
# would fan out the left join below and duplicate airline_id in the dimension
global_airline_df = global_airline_df.drop_duplicates(subset='carrier_name', keep='first')
global_airline_df = global_airline_df.set_index(keys='carrier_name')

# left join: every airline found in the receipts is kept, matched or not
airlines_final_df = airlines_df.join(global_airline_df, on='carrier_name', how='left')

# the source AIRLINE_ID column is not part of the airlines table schema
airlines_final_df = airlines_final_df.drop(columns='AIRLINE_ID')
display(airlines_final_df)

In [None]:

def load_table(
    df: pd.DataFrame,
    client: bigquery.Client,
    table_name: str,
    schema: List[bigquery.SchemaField],
    create_disposition: str = 'CREATE_IF_NEEDED',
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """Load a dataframe into a bigquery table and wait for the load job to finish.

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client used to run the load job
        table_name (str): full table name in the form `project.dataset.table`
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.

    Raises:
        AssertionError: if `table_name` does not have exactly three dot-separated parts
    """
    # *** run some checks ***
    # a full table name includes the project and dataset, so it must contain two dots
    assert table_name.count('.') == 2, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # load job settings: table creation, row replacement and the table schema
    load_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=write_disposition,
        create_disposition=create_disposition,
    )
    logger.info(f"loading table: '{table_name}'")
    load_job = client.load_table_from_dataframe(df, destination=table_name, job_config=load_config)
    # block until the load job has finished
    load_job.result()
    # fetch the table to report how many rows it holds
    loaded_table = client.get_table(table_name)
    logger.info(f"loaded {loaded_table.num_rows} rows into {loaded_table.full_table_id}")

In [None]:
# schema and full table name of the airlines dim, taken from the TABLE_METADATA config
schema = TABLE_METADATA['airlines']['schema']
table_name = ".".join((PROJECT_NAME, DATASET_NAME, TABLE_METADATA['airlines']['table_name']))
# write the airlines dataframe to bigquery
load_table(df=airlines_final_df, client=client, table_name=table_name, schema=schema)

logger.info("loaded airlines dim")

In [None]:
# load airports dim

# the path is built from DATA_DIR so that it follows the filesystem setting
airports_df = pd.read_csv(os.path.join(DATA_DIR, 'global_airports.csv'), header=0)

cols = ['iata','name','city','country','icao','latitude','longitude']

# reduce to one row per distinct combination of the selected columns
# NOTE(review): pandas drops NaN group keys by default, so rows with a missing
# value in any of these columns are presumably removed here; confirm how the
# file encodes a missing iata code
airports_df = airports_df.groupby(cols).all()
airports_df = airports_df.reset_index().loc[:, cols]

# rename columns
airports_df = airports_df.rename(columns={'iata': 'airport_id'})
# keep a single row per airport_id and drop the entry flagged as a duplicate by name
airports_df = airports_df.drop_duplicates(subset='airport_id')
airports_df = airports_df[airports_df['name'] != '(Duplicate) Playa Samara Airport']

airports_df = airports_df.set_index(keys='airport_id')

# report the row count of the airports dim (previously counted airlines_df)
logger.info(f"airport dim - found {len(airports_df.index)} rows")
display(airports_df)

In [None]:
# schema and full table name of the airports dim, taken from the TABLE_METADATA config
schema = TABLE_METADATA['airports']['schema']
table_name = ".".join((PROJECT_NAME, DATASET_NAME, TABLE_METADATA['airports']['table_name']))
# write the airports dataframe to bigquery
load_table(df=airports_df, client=client, table_name=table_name, schema=schema)

logger.info("loaded airports dim")