## Exercise 1: Data Modeling

Using draw.io create a data model. Your data model MUST meet the following requirements:

1. Contain a _tickets_ fact table
1. Contain the following dimensions: _airlines_, _airports_, and _passengers_
1. Develop _passengers_ as an SCD Type2 dimension:
    - Passenger email can be used as the natural key
    - Be sure to add a surrogate key and effective start/end dates
    - You can optionally add an active column
1. IATA codes can be used as the primary key for both _airlines_ and _airports_
1. Use the t-ticket number as the primary key for the _tickets_ fact

In [23]:
# Import the data from json file and explore the data

import os
import sys
import pandas as pd
import logging
from google.cloud import bigquery
from hashlib import md5
from typing import List
import json

# **** SETUP ****

# change to match your filesystem
DATA_DIR = "../data/airtravel/"
DEFAULT_RECEIPTS_FILE = os.path.join(DATA_DIR, "tickets.json")
PROJECT_NAME = "deb-01-371820"
DATASET_NAME = "air_travel"

data = []
with open('./data/air_travel/tickets.json', 'r') as f:
    for line in f:
        data.append(json.loads(line))

df = pd.DataFrame.from_dict(pd.json_normalize(data), orient='columns')

display(df.head(n=10))

Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,airline.name,airline.iata,airline.icao,airline.callsign,...,passenger.last_name,passenger.gender,passenger.birth_date,passenger.email,passenger.street,passenger.city,passenger.state,passenger.zip,origin,destination
0,498-938211-0795,ZVFDC4,2022-03-23,723.42,31I,active,China Eastern Airlines,MU,CES,CHINA EASTERN,...,Brown,M,1969-02-17,robert.brown.69@hotmail.com,5007 Thomas Way,Lake Hollystad,DC,20027,,
1,482-850738-6048,IL5GUI,2022-03-23,765.18,29B,active,Hawaiian Airlines,HA,HAL,HAWAIIAN,...,Kent,F,1998-08-05,laura.kent.98@hotmail.com,13991 Davis Village,North Catherineborough,PA,16516,,
2,275-207321-8092,CYEFBC,2022-03-21,753.89,26I,active,Wizz Air,W6,WZZ,WIZZ AIR,...,Tucker,F,1965-01-22,lisa.tucker.65@hotmail.com,04135 Marvin Via,North Kristabury,MA,1093,,
3,246-793315-3102,ZNGPC2,2022-03-22,793.89,15A,active,AirAsia,AK,AXM,ASIAN EXPRESS,...,Yates,NB,1975-03-31,matthew.yates.75@yahoo.com,76045 Samantha Road Suite 111,Lake Jeffrey,DE,19898,,
4,091-128904-1226,MGSBD9,2022-03-24,820.25,17F,active,Xiamen Airlines,MF,CXA,XIAMEN AIR,...,Villanueva,NB,1945-08-14,megan.villanueva.45@hotmail.com,848 Melissa Springs Suite 947,Kellerstad,TX,76177,,
5,115-196069-8963,XFYQC0,2022-03-23,892.69,18C,active,Air New Zealand,NZ,ANZ,NEW ZEALAND,...,Hall,NB,1944-08-31,sarah.hall.44@gmail.com,75420 Michael Mountains Suite 485,New Victoria,HI,96727,,
6,396-673460-1326,N5UOOZ,2022-03-23,889.53,3C,active,Jeju Air,7C,JJA,JEJU AIR,...,Thompson,M,1968-05-02,seth.thompson.68@yahoo.com,22455 Higgins Junction Apt. 042,New Keith,OR,97405,,
7,380-894599-8109,PAA19Y,2022-03-22,706.78,7D,active,American Airlines,AA,AAL,AMERICAN,...,Garcia,F,1950-02-12,jennifer.garcia.50@gmail.com,6607 Sharp Common,Chadstad,VA,22121,,
8,614-960971-2686,EF4BHJ,2022-03-23,486.4,24J,active,Juneyao Airlines,HO,DKH,JUNEYAO AIRLINES,...,Clark,F,1991-11-09,becky.clark.91@gmail.com,691 Jones Cliffs,Michaelburgh,TX,76003,,
9,481-321233-0702,FVM9EE,2022-03-23,855.93,16A,active,Royal Air Maroc,AT,RAM,ROYALAIR MAROC,...,Cook,M,1976-07-29,ronald.cook.76@hotmail.com,93328 Davis Island,Rodriguezside,MD,21408,,


Here is the Data Model that includes:
1. tickets fact table
2. airlines dimension table
3. airports dimension table
4. passengers dimension table

<img src="./imgs/air_travel_data_modeling.drawio (3).png" alt="data model" width="640" /> 

## Exercise 2: Data Loading and Normalization

Develop an ETL pipeline that loads our dimensions and facts from the source file. You pipeline MUST meet the following requirements:

**General**:
- Load all dimensions in order: _airlines_, _airports_, and _passengers_
- Load the _tickets_ fact table after loading dimensions
- Your pipeline can drop/replace tables
- You can assume only inserts at this state. No updates, deletes, or merges

**Airlines Dim:**
- Identify unique airlines
- Use IATA code as the dimension key

**Airports Dim:**
- Identify unique airports from both origin and destination fields
- Use IATA code as the dimension key

**Passengers Dim:**
- Identify unique passengers
- Use the passenger email as the dimension natural key
- Generate UUIDs for the dimension surrogate keys
- Set the effective start date to any date. You can either use the ticket date, current date, or a fixed set date in the past
- Set the effective end date to None
- Optionally set your active flag to 'Y'
- Passenger address columns are considered SCD Type 2 columns
- All other columns are SCD Type 1

**Tickets Fact:**
- Link to _airlines_ and _airports_ dimensions by their IATA codes. 
- You don't need a lookup for _airlines_ and _airports_ since we use their IATA as dimension keys
- Link to the _passengers_ dimension by its surrogate key
- You need to perform a lookup to the _passengers_ dimension
- Load all the tickets

<br><br>

In [24]:
DIMS_TABLE_METADATA = {
    'airlines': {
        'table_name': 'airlines',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('airline_iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airline_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airline_icao', 'string', mode='NULLABLE'),
            bigquery.SchemaField('airline_callsign', 'string', mode='NULLABLE'),
            bigquery.SchemaField('airline_country', 'string', mode='NULLABLE'),
            bigquery.SchemaField('created_at', 'timestamp', mode='NULLABLE'),
            bigquery.SchemaField('modified_at', 'timestamp', mode='NULLABLE'),
        ],
    },
     'airports': {
        'table_name': 'airports',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('airport_iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airport_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airport_city', 'string', mode='NULLABLE'),
            bigquery.SchemaField('airport_icao', 'string', mode='NULLABLE'),
            bigquery.SchemaField('airport_latitude', 'float', mode='NULLABLE'),
            bigquery.SchemaField('airport_longitude', 'float', mode='NULLABLE'),
            bigquery.SchemaField('airport_altitude', 'float', mode='NULLABLE'),
            bigquery.SchemaField('airport_tz_timezone', 'string', mode='NULLABLE'),
            bigquery.SchemaField('created_at', 'timestamp', mode='NULLABLE'),
            bigquery.SchemaField('modified_at', 'timestamp', mode='NULLABLE'),
        ],
    },
    'passengers': {
        'table_name': 'passengers',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('passenger_id', 'string', mode='REQUIRED'),
            bigquery.SchemaField('email', 'string', mode='REQUIRED'),
            bigquery.SchemaField('first_name', 'string', mode='NULLABLE'),
            bigquery.SchemaField('last_name', 'string', mode='NULLABLE'),
            bigquery.SchemaField('birth_date', 'string', mode='NULLABLE'),
            bigquery.SchemaField('passenger_street', 'string', mode='NULLABLE'),
            bigquery.SchemaField('passenger_city', 'string', mode='NULLABLE'),
            bigquery.SchemaField('passenger_state', 'string', mode='NULLABLE'),
            bigquery.SchemaField('passenger_zip', 'int64', mode='NULLABLE'),
            bigquery.SchemaField('effective_start_date', 'string', mode='NULLABLE'),
            bigquery.SchemaField('effective_end_date', 'string', mode='NULLABLE'),
            bigquery.SchemaField('created_at', 'timestamp', mode='NULLABLE'),
            bigquery.SchemaField('modified_at', 'timestamp', mode='NULLABLE'),
        ],
    },
}

In [25]:
# **** SETUP LOGGING ****
# setup logging and logger
logging.basicConfig(            # setting up the root logger
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
logger: logging.Logger = logging.getLogger('root')      # alias the root logger as `logger`
logger.setLevel(logging.DEBUG)                          # programmatically reassign the logging level


# **** BIGQUERY CLIENT ****
logger.debug(f"Creating bigquery client")
client = bigquery.Client()

logger.info(f"Setup Completed")

[DEBUG][2023-01-06 21:03:53,861][309645063:0013] : Creating bigquery client
[INFO ][2023-01-06 21:03:53,869][309645063:0016] : Setup Completed


In [26]:
# create dataset
dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)

logger.info(f"Created air_travel dataset: {dataset.full_dataset_id}")

[INFO ][2023-01-06 21:03:57,393][2174618521:0007] : Created air_travel dataset: deb-01-371820:air_travel


In [27]:
# Create dataframe for airlines to prep for table load

airlines_df = df

airline_cols = ['airline.iata', 'airline.name','airline.icao', 'airline.callsign', 'airline.country']
airlines_df = airlines_df.groupby(airline_cols).all()
airlines_df = airlines_df.reset_index().loc[:, airline_cols]


airlines_df = airlines_df.rename(columns={
    'airline.iata': 'airline_iata', 
    'airline.name':'airline_name', 
    'airline.icao':'airline_icao',
    'airline.callsign':'airline_callsign',
    'airline.country':'airline_country',
    })

airlines_df['created_at'] = pd.Timestamp.now()
airlines_df['modified_at'] = None

airlines_df.set_index('airline_iata', inplace=True)

logger.info(f"airlines dim - found {len(airlines_df.index)} rows")

display(airlines_df)

[INFO ][2023-01-06 21:04:01,566][2107107189:0023] : airlines dim - found 48 rows


Unnamed: 0_level_0,airline_name,airline_icao,airline_callsign,airline_country,created_at,modified_at
airline_iata,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
3U,Sichuan Airlines,CSC,SI CHUAN,China,2023-01-06 21:04:01.565246,
7C,Jeju Air,JJA,JEJU AIR,Republic of Korea,2023-01-06 21:04:01.565246,
9K,Cape Air,KAP,CAIR,United States,2023-01-06 21:04:01.565246,
9S,Spring Airlines,CQH,AIR SPRING,China,2023-01-06 21:04:01.565246,
AA,American Airlines,AAL,AMERICAN,United States,2023-01-06 21:04:01.565246,
AC,Air Canada,ACA,AIR CANADA,Canada,2023-01-06 21:04:01.565246,
AF,Air France,AFR,AIRFRANS,France,2023-01-06 21:04:01.565246,
AK,AirAsia,AXM,ASIAN EXPRESS,Malaysia,2023-01-06 21:04:01.565246,
AS,Alaska Airlines,ASA,Inc.,ALASKA,2023-01-06 21:04:01.565246,
AT,Royal Air Maroc,RAM,ROYALAIR MAROC,Morocco,2023-01-06 21:04:01.565246,


In [28]:
origin_df = df
dest_df = df

airport_cols_names = ['name', 'city','country', 'iata', 'icao','latitude','longitude','altitude', 'tz_timezone']

origin_cols = ['origin.' + col for col in airport_cols_names]
dest_cols = ['destination.' + col for col in airport_cols_names]
airports_cols = ['airport_' + col for col in airport_cols_names]

# get unique values for origin airports
origin_df = origin_df.groupby(origin_cols).all()
origin_df = origin_df.reset_index().loc[:, origin_cols]

# rename cols in prep for concat and creation of consolidated airport table

origin_df = origin_df.rename(columns={
    'origin.name': 'airport_name', 
    'origin.city':'airport_city', 
    'origin.country':'airport_country',
    'origin.iata':'airport_iata',
    'origin.icao':'airport_icao',
    'origin.latitude':'airport_latitude',
    'origin.longitude':'airport_longitude',
    'origin.altitude':'airport_altitude',
    'origin.tz_timezone':'airport_tz_timezone',
    })

# get unique values for dest airports

dest_df = dest_df.groupby(dest_cols).all()
dest_df = dest_df.reset_index().loc[:, dest_cols]

dest_df = dest_df.rename(columns={
    'destination.name': 'airport_name', 
    'destination.city':'airport_city', 
    'destination.country':'airport_country',
    'destination.iata':'airport_iata',
    'destination.icao':'airport_icao',
    'destination.latitude':'airport_latitude',
    'destination.longitude':'airport_longitude',
    'destination.altitude':'airport_altitude',
    'destination.tz_timezone':'airport_tz_timezone',
    })

# rename cols in prep for concat and creation of consolidated airport table

df_for_concat = [origin_df,dest_df]

airports_df = pd.concat(df_for_concat)

# get unique values for combined origin and dest airports

airports_df = airports_df.groupby(airports_cols).all()
airports_df = airports_df.reset_index().loc[:, airports_cols]

airports_df['created_at'] = pd.Timestamp.now()
airports_df['modified_at'] = None

airports_df.set_index('airport_iata', inplace=True)

logger.info(f"airport dim - found {len(airports_df.index)} rows")

display(airports_df)

[INFO ][2023-01-06 21:04:08,010][4108904893:0061] : airport dim - found 386 rows


Unnamed: 0_level_0,airport_name,airport_city,airport_country,airport_icao,airport_latitude,airport_longitude,airport_altitude,airport_tz_timezone,created_at,modified_at
airport_iata,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
AUH,Abu Dhabi International Airport,Abu Dhabi,United Arab Emirates,OMAA,24.43,54.65,88.0,Asia/Dubai,2023-01-06 21:04:08.009606,
MAD,Adolfo Suárez Madrid–Barajas Airport,Madrid,Spain,LEMD,40.47,-3.56,1998.0,Europe/Madrid,2023-01-06 21:04:08.009606,
CWB,Afonso Pena Airport,Curitiba,Brazil,SBCT,-25.53,-49.18,2988.0,America/Sao_Paulo,2023-01-06 21:04:08.009606,
MCP,Alberto Alcolumbre Airport,Macapa,Brazil,SBMQ,0.05,-51.07,56.0,America/Fortaleza,2023-01-06 21:04:08.009606,
ABQ,Albuquerque International Sunport,Albuquerque,United States,KABQ,35.04,-106.61,5355.0,America/Denver,2023-01-06 21:04:08.009606,
...,...,...,...,...,...,...,...,...,...,...
ZUH,Zhuhai Jinwan Airport,Zhuhai,China,ZGSD,22.01,113.38,23.0,Asia/Shanghai,2023-01-06 21:04:08.009606,
MCZ,Zumbi dos Palmares Airport,Maceio,Brazil,SBMO,-9.51,-35.79,387.0,America/Fortaleza,2023-01-06 21:04:08.009606,
ZRH,Zürich Airport,Zurich,Switzerland,LSZH,47.46,8.55,1416.0,Europe/Zurich,2023-01-06 21:04:08.009606,
MDQ,Ástor Piazzola International Airport,Mar Del Plata,Argentina,SAZM,-37.93,-57.57,72.0,America/Buenos_Aires,2023-01-06 21:04:08.009606,


In [29]:
import uuid

pngr_df = df

pngr_cols_names = ['email','first_name','last_name', 'birth_date', 'street','city','state','zip',]
pngr_cols = ['passenger.' + col for col in pngr_cols_names]

# get unique passengers
pngr_df = pngr_df.groupby(pngr_cols).all()
pngr_df = pngr_df.reset_index().loc[:, pngr_cols]

pngr_df = pngr_df.rename(columns={
    'passenger.email': 'email', 
    'passenger.first_name':'first_name', 
    'passenger.last_name':'last_name',
    'passenger.birth_date':'birth_date',
    'passenger.street':'passenger_street',
    'passenger.city':'passenger_city',
    'passenger.state':'passenger_state',
    'passenger.zip':'passenger_zip',
    })

# create UUIDs
pngr_df.insert(0,'passenger_id', uuid.uuid4())
pngr_df = pngr_df.astype({'passenger_id': str, 'passenger_zip': int})

# create effective_start_date and effective_end_date col created
pngr_df['effective_start_date'] = "2001-01-01"
pngr_df['effective_end_date'] = None

pngr_df['created_at'] = pd.Timestamp.now()
pngr_df['modified_at'] = None

display(pngr_df)

Unnamed: 0,passenger_id,email,first_name,last_name,birth_date,passenger_street,passenger_city,passenger_state,passenger_zip,effective_start_date,effective_end_date,created_at,modified_at
0,b742d38f-af6e-49ad-bebf-b9c8850720c6,annette.hawkins.43@yahoo.com,Annette,Hawkins,1943-07-11,361 Robinson Green Apt. 635,North Lynntown,NV,89825,2001-01-01,,2023-01-06 21:04:14.228785,
1,b742d38f-af6e-49ad-bebf-b9c8850720c6,autumn.morse.60@hotmail.com,Autumn,Morse,1960-01-18,6984 Price Shoals,Erictown,HI,96818,2001-01-01,,2023-01-06 21:04:14.228785,
2,b742d38f-af6e-49ad-bebf-b9c8850720c6,becky.clark.91@gmail.com,Becky,Clark,1991-11-09,691 Jones Cliffs,Michaelburgh,TX,76003,2001-01-01,,2023-01-06 21:04:14.228785,
3,b742d38f-af6e-49ad-bebf-b9c8850720c6,belinda.cook.91@hotmail.com,Belinda,Cook,1991-01-26,1965 Kelly Field Apt. 094,Jonesberg,IL,60613,2001-01-01,,2023-01-06 21:04:14.228785,
4,b742d38f-af6e-49ad-bebf-b9c8850720c6,carl.wilson.80@hotmail.com,Carl,Wilson,1980-04-24,2814 Houston Hills,Rodriguezside,IA,51971,2001-01-01,,2023-01-06 21:04:14.228785,
5,b742d38f-af6e-49ad-bebf-b9c8850720c6,cheryl.hughes.45@gmail.com,Cheryl,Hughes,1945-05-20,00992 Garcia Plaza Suite 367,North Chelseamouth,CT,6315,2001-01-01,,2023-01-06 21:04:14.228785,
6,b742d38f-af6e-49ad-bebf-b9c8850720c6,christian.stevenson.93@hotmail.com,Christian,Stevenson,1993-06-14,75945 Jennifer Loaf,Pooleland,KY,40009,2001-01-01,,2023-01-06 21:04:14.228785,
7,b742d38f-af6e-49ad-bebf-b9c8850720c6,corey.cook.83@gmail.com,Corey,Cook,1983-06-14,9606 Barton Station Apt. 271,Jacquelinemouth,IN,47081,2001-01-01,,2023-01-06 21:04:14.228785,
8,b742d38f-af6e-49ad-bebf-b9c8850720c6,danielle.henderson.70@hotmail.com,Danielle,Henderson,1970-08-11,7389 Alec Squares Suite 508,Port Jonathan,NM,87320,2001-01-01,,2023-01-06 21:04:14.228785,
9,b742d38f-af6e-49ad-bebf-b9c8850720c6,hannah.smith.66@gmail.com,Hannah,Smith,1966-07-01,230 Donna Street,Lake Adrianstad,MN,56413,2001-01-01,,2023-01-06 21:04:14.228785,


**Tickets Fact:**
- Link to _airlines_ and _airports_ dimensions by their IATA codes. 
- You don't need a lookup for _airlines_ and _airports_ since we use their IATA as dimension keys
- Link to the _passengers_ dimension by its surrogate key
- You need to perform a lookup to the _passengers_ dimension
- Load all teh tickets

<br><br>

In [30]:
## Create schema for the Fact table

FACTS_TABLE_METADATA = {
    'tickets': {
        'table_name': 'tickets',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('eticket_num', 'string', mode='REQUIRED'),
            bigquery.SchemaField('confirmation', 'string', mode='REQUIRED'),
            bigquery.SchemaField('ticket_date', 'string', mode='NULLABLE'),
            bigquery.SchemaField('price', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('seat', 'string', mode='NULLABLE'),
            bigquery.SchemaField('status', 'string', mode='NULLABLE'),
            bigquery.SchemaField('origin_iata', 'string', mode='NULLABLE'),
            bigquery.SchemaField('dest_iata', 'string', mode='NULLABLE'),
            bigquery.SchemaField('airline_iata', 'string', mode='NULLABLE'),
            bigquery.SchemaField('passenger_id', 'string', mode='NULLABLE'),
            bigquery.SchemaField('created_at', 'timestamp', mode='NULLABLE'),
            bigquery.SchemaField('modified_at', 'timestamp', mode='NULLABLE'),
        ],
    },
    'tmp_tickets': {
        'table_name': 'tmp_tickets',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('eticket_num', 'string', mode='REQUIRED'),
            bigquery.SchemaField('confirmation', 'string', mode='REQUIRED'),
            bigquery.SchemaField('ticket_date', 'string', mode='NULLABLE'),
            bigquery.SchemaField('price', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('seat', 'string', mode='NULLABLE'),
            bigquery.SchemaField('status', 'string', mode='NULLABLE'),
            bigquery.SchemaField('created_at', 'timestamp', mode='NULLABLE'),
            bigquery.SchemaField('modified_at', 'timestamp', mode='NULLABLE'),
        ],
    },
}

In [31]:
## create tickets dataframe and prep for fact table load

tickets_df = df

ticket_cols = ["eticket_num","confirmation","ticket_date","price","seat","status","origin.iata","destination.iata","airline.iata","passenger.email"]

# get unique passengers
tickets_df = tickets_df.groupby(ticket_cols).all()
tickets_df = tickets_df.reset_index().loc[:, ticket_cols]

tickets_df= tickets_df.rename(columns={
    'origin.iata': 'origin_iata', 
    'destination.iata':'dest_iata', 
    'airline.iata':'airline_iata',
    'passenger.email':'email',
    })

display(tickets_df)

Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,origin_iata,dest_iata,airline_iata,email
0,000-238065-9359,KXDIKE,2022-03-22,752.01,10D,active,JJN,ZCO,NZ,janice.zamora.00@gmail.com
1,000-444627-2536,CIC9HE,2022-03-24,530.72,15F,active,NVT,ORF,S7,laura.kent.98@hotmail.com
2,000-530995-8891,G85MJ9,2022-03-24,900.92,19J,active,KGD,GOI,IW,hannah.smith.66@gmail.com
3,000-833236-7714,7MGJRU,2022-03-22,659.55,30F,active,TUC,RIX,HO,julie.carlson.77@hotmail.com
4,001-188247-0462,Z4JMS3,2022-03-21,946.37,17J,active,TFN,RGN,ET,annette.hawkins.43@yahoo.com
...,...,...,...,...,...,...,...,...,...,...
3976,999-399884-2380,60PZJN,2022-03-24,489.56,11A,active,LIH,SKG,HR,cheryl.hughes.45@gmail.com
3977,999-414389-5735,PJLIZT,2022-03-23,786.83,4B,active,PMV,UPG,9K,laurie.york.84@gmail.com
3978,999-715254-1131,0DHGA3,2022-03-22,392.45,28J,active,BGA,PHL,MU,sarah.rivera.04@hotmail.com
3979,999-919488-5735,65VSGX,2022-03-23,364.47,22J,active,GVA,DCA,FM,kevin.moore.79@gmail.com


In [32]:
temp_png = pngr_df.reset_index()
temp_png = temp_png[['email','passenger_id']]
temp_png = temp_png.set_index('email')


consolidated_tickets = tickets_df.join(temp_png, on='email', how='inner')

tickets_df = consolidated_tickets.drop(columns='email').set_index('eticket_num')

tickets_df['created_at'] = pd.Timestamp.now()
tickets_df['modified_at'] = None

tickets_df = tickets_df.astype({'passenger_id': str})

logger.info(f"tickets dim - found {len(tickets_df.index)} rows")

display(tickets_df)

[INFO ][2023-01-06 21:04:26,283][1472635947:0015] : tickets dim - found 3981 rows


Unnamed: 0_level_0,confirmation,ticket_date,price,seat,status,origin_iata,dest_iata,airline_iata,passenger_id,created_at,modified_at
eticket_num,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
000-238065-9359,KXDIKE,2022-03-22,752.01,10D,active,JJN,ZCO,NZ,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
007-431058-7762,MJYSLN,2022-03-24,720.21,29E,active,OAK,SRE,MH,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
011-433525-9702,5MF358,2022-03-23,988.02,20F,active,ORF,PRG,SJ,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
013-115939-1473,90IN8Z,2022-03-23,364.47,30B,active,KWL,YNT,QR,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
025-257697-5811,NT8E2A,2022-03-23,507.59,28F,active,REC,NTE,MW,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
...,...,...,...,...,...,...,...,...,...,...,...
979-413386-4728,FU66JE,2022-03-22,419.36,4E,active,MSY,HEL,CZ,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
983-119488-7703,98BV7H,2022-03-22,713.44,28C,active,POS,LGA,NK,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
984-245886-5879,KIQDQM,2022-03-24,861.75,30E,active,TYN,CLO,DL,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,
986-705619-4205,EAH78F,2022-03-22,853.21,18D,active,SJU,SXM,DL,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159,


In [33]:
# define the load table function

def load_table(
    df: pd.DataFrame, 
    client: bigquery.Client, 
    table_name: str, 
    schema: List[bigquery.SchemaField], 
    create_disposition: str = 'CREATE_IF_NEEDED', 
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """load dataframe into bigquery table

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client
        table_name (str): full table name including project and dataset id
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.
    """
    # *** run some checks ***
    # test table name to be full table name including project and dataset name. It must contain to dots
    assert len(table_name.split('.')) == 3, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # setup bigquery load job:
    #  create table if needed, replace rows, define the table schema
    job_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )
    logger.info(f"loading table: '{table_name}'")
    job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
    job.result()        # wait for the job to finish
    # get the resulting table
    table = client.get_table(table_name)
    logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")

In [34]:
# Load airlines dim table

# get table name and schema from DIMS_TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{DIMS_TABLE_METADATA['airlines']['table_name']}"
schema = DIMS_TABLE_METADATA['airlines']['schema']
# load dataframe
load_table(airlines_df, client, table_name, schema)

logger.info(f"loaded airlines dim")

[INFO ][2023-01-06 21:04:33,319][2444475212:0031] : loading table: 'deb-01-371820.air_travel.airlines'
[INFO ][2023-01-06 21:04:36,366][2444475212:0036] : loaded 48 rows into deb-01-371820:air_travel.airlines
[INFO ][2023-01-06 21:04:36,366][4250334571:0009] : loaded airlines dim


<img src="./imgs/airlines_table.png" alt="airlines_dim" width="640" />

In [35]:
# Load airports dim table

# get table name and schema from DIMS_TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{DIMS_TABLE_METADATA['airports']['table_name']}"
schema = DIMS_TABLE_METADATA['airports']['schema']
# load dataframe
load_table(airports_df, client, table_name, schema)

logger.info(f"loaded airports dim")

[INFO ][2023-01-06 21:04:45,050][2444475212:0031] : loading table: 'deb-01-371820.air_travel.airports'
[INFO ][2023-01-06 21:04:48,175][2444475212:0036] : loaded 386 rows into deb-01-371820:air_travel.airports
[INFO ][2023-01-06 21:04:48,177][2565523412:0009] : loaded airports dim


<img src="./imgs/airports_table.png" alt="airports dim" width="640" />

In [36]:
# Load passengers dim table

# get table name and schema from DIMS_TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{DIMS_TABLE_METADATA['passengers']['table_name']}"
schema = DIMS_TABLE_METADATA['passengers']['schema']
# load dataframe
load_table(pngr_df, client, table_name, schema)

logger.info(f"loaded passengers dim")

[INFO ][2023-01-06 21:04:51,907][2444475212:0031] : loading table: 'deb-01-371820.air_travel.passengers'
[INFO ][2023-01-06 21:04:55,534][2444475212:0036] : loaded 32 rows into deb-01-371820:air_travel.passengers
[INFO ][2023-01-06 21:04:55,534][2729046973:0009] : loaded passengers dim


<img src="./imgs/passengers_table.png" alt="passengers dim" width="640" />

In [37]:
# Load tickets fact table

# get table name and schema from FACTS_TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{FACTS_TABLE_METADATA['tickets']['table_name']}"
schema = FACTS_TABLE_METADATA['tickets']['schema']
# load dataframe
load_table(tickets_df, client, table_name, schema)

logger.info(f"loaded tickets facts")

[INFO ][2023-01-06 21:04:59,478][2444475212:0031] : loading table: 'deb-01-371820.air_travel.tickets'
[INFO ][2023-01-06 21:05:03,369][2444475212:0036] : loaded 3981 rows into deb-01-371820:air_travel.tickets
[INFO ][2023-01-06 21:05:03,370][3001121536:0009] : loaded tickets facts


<img src="./imgs/tickets_fact.png" alt="tickets fact table" width="640" />



## Exercise 3: Merging

We now would like to add a new ETL task to update issued tickets.

Take a look at the source file: [`data/air_travel/ticket_updates/ticket_updates.csv`](./data/air_travel/ticket_updates/ticket_updates.csv)

This file contains some updates to our tickets. Either the _price_, _seat number_, or the _status_ has been updated for these tickets.

Create an ETL task that:
1. Loads the updates into an staging table
1. Use the SQL merge statement to update the _tickets_ fact

In [40]:
# Creates the staging table for ticket fact updates

table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{FACTS_TABLE_METADATA['tmp_tickets']['table_name']}"
schema = FACTS_TABLE_METADATA['tmp_tickets']['schema']

tmp_tickets_table = bigquery.Table(table_name, schema=schema)
tmp_tickets_table = client.create_table(tmp_tickets_table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(tmp_tickets_table.project, tmp_tickets_table.dataset_id, tmp_tickets_table.table_id)
)

Created table deb-01-371820.air_travel.tmp_tickets


In [41]:
# Load the ticket updates from csv file

ticket_updates = pd.read_csv('./data/air_travel/ticket_updates/ticket_updates.csv', header=0)

ticket_updates['created_at'] = pd.Timestamp.now()
ticket_updates['modified_at'] = None

display(ticket_updates)

Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,created_at,modified_at
0,347-732788-7891,Q5HNVA,2022-03-23,716.17,5H,active,2023-01-06 21:07:57.219244,
1,029-319577-2821,J6DJZ5,2022-03-23,699.31,9B,active,2023-01-06 21:07:57.219244,
2,848-510222-2496,G6KDFL,2022-03-24,536.15,23C,canceled,2023-01-06 21:07:57.219244,
3,543-508522-3675,CPVAKY,2022-03-22,674.63,10E,active,2023-01-06 21:07:57.219244,
4,276-522185-5064,SSQY4W,2022-03-23,947.09,13D,active,2023-01-06 21:07:57.219244,
5,142-179066-5861,YC9W4A,2022-03-23,233.57,32I,canceled,2023-01-06 21:07:57.219244,
6,396-827416-8143,7179Y4,2022-03-21,641.4,31I,active,2023-01-06 21:07:57.219244,
7,043-669807-8165,5CYR9R,2022-03-24,562.55,19B,active,2023-01-06 21:07:57.219244,
8,054-292614-8053,M0EZCU,2022-03-24,229.09,14I,active,2023-01-06 21:07:57.219244,
9,522-090877-0871,LGH7TL,2022-03-23,943.77,29I,active,2023-01-06 21:07:57.219244,


In [42]:
# Load ticket_updates to tmp_tickets staging table

# get table name and schema from FACTS_TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{FACTS_TABLE_METADATA['tmp_tickets']['table_name']}"
schema = FACTS_TABLE_METADATA['tmp_tickets']['schema']
# load dataframe
load_table(ticket_updates, client, table_name, schema)

logger.info(f"loaded tmp_tickets with ticket_updates facts")

[INFO ][2023-01-06 21:08:01,860][2444475212:0031] : loading table: 'deb-01-371820.air_travel.tmp_tickets'
[INFO ][2023-01-06 21:08:05,662][2444475212:0036] : loaded 10 rows into deb-01-371820:air_travel.tmp_tickets
[INFO ][2023-01-06 21:08:05,662][1447289466:0009] : loaded tmp_tickets with ticket_updates facts


In [1]:
# install db-dtypes in order for to_dataframe() to work

pip install db-dtypes

You should consider upgrading via the '/Users/Ruben/Desktop/data-modeling-cr/venv/bin/python -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [45]:
# Perform the merge of updates of the ticket_updates into the tickets fact table

import db_dtypes

# using a SQL MERGE statement
# insert rows from the staging tables into the target table
merge_query = f"""
MERGE INTO `{PROJECT_NAME}.{DATASET_NAME}.tickets` as trg
USING `{PROJECT_NAME}.{DATASET_NAME}.tmp_tickets` as src
ON trg.eticket_num = src.eticket_num
WHEN MATCHED THEN
    UPDATE SET 
        price = src.price,
        seat = src.seat,
        status = src.status,
        modified_at = CURRENT_TIMESTAMP
;
"""

result = client.query(merge_query)

logger.info(f"query target and source tables to perform merge of ticket_updates staging table to ticket fact")

[INFO ][2023-01-06 21:10:46,618][4284047669:0022] : query target and source tables to perform merge of ticket_updates staging table to ticket fact


In [46]:
# Check that updates were captured for the 10 items and includes the modified_at getting updated based on the merge

check_merge_query = """
SELECT * 
FROM `deb-01-371820.air_travel.tickets`
WHERE modified_at IS NOT NULL
LIMIT 1000;
"""

check_result = client.query(check_merge_query)

check_df = check_result.to_dataframe()

display(check_df)

Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,origin_iata,dest_iata,airline_iata,passenger_id,created_at,modified_at
0,396-827416-8143,7179Y4,2022-03-21,641.4,31I,active,GDN,YWG,MH,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
1,543-508522-3675,CPVAKY,2022-03-22,674.63,10E,active,FAO,SNA,AS,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
2,029-319577-2821,J6DJZ5,2022-03-23,699.31,9B,active,CDG,MEM,CA,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
3,522-090877-0871,LGH7TL,2022-03-23,943.77,29I,active,ROS,WAW,HU,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
4,276-522185-5064,SSQY4W,2022-03-23,947.09,13D,active,LIH,BRU,QZ,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
5,142-179066-5861,YC9W4A,2022-03-23,233.57,32I,canceled,BLR,CMH,VN,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
6,347-732788-7891,Q5HNVA,2022-03-23,716.17,5H,active,JJN,MDE,WY,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
7,848-510222-2496,G6KDFL,2022-03-24,536.15,23C,canceled,CNF,MDZ,9K,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
8,043-669807-8165,5CYR9R,2022-03-24,562.55,19B,active,WUH,CKG,AA,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
9,054-292614-8053,M0EZCU,2022-03-24,229.09,14I,active,HKG,CLT,AF,b742d38f-af6e-49ad-bebf-b9c8850720c6,2023-01-06 21:04:26.280159+00:00,2023-01-07 05:10:46.679589+00:00
