Make an ETL pipeline that loads from your `tickets.json` file

In [1]:
import os
import sys
import logging
import pandas as pd
from google.cloud import bigquery
from hashlib import md5
from typing import List
import json


# **** SETUP ****

# global variables for file system/ loading JSON file
JSON_DATA = "../data/tickets.json"
# project paths  
PROJECT_NAME = "abstract-flame-407818"
DATASET_NAME = "air_travel"


#schemas we will use for each table 

TABLE_METADATA = {
    'airlines': {
        'table_name': 'airlines',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('airline_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airline_iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airline_callsign', 'string', mode='REQUIRED'),
            bigquery.SchemaField('airline_country', 'string', mode='REQUIRED'),
        ],
    },
    'airports': {
        'table_name': 'airports',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('origin_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('origin_city', 'string', mode='REQUIRED'),
            bigquery.SchemaField('origin_country', 'string', mode='REQUIRED'),
            bigquery.SchemaField('origin_iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('origin_icao', 'string', mode='REQUIRED'),
            bigquery.SchemaField('dest_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('dest_city', 'string', mode='REQUIRED'),
            bigquery.SchemaField('dest_country', 'string', mode='REQUIRED'),
            bigquery.SchemaField('dest_iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('dest_icao', 'string', mode='REQUIRED'),
        ],
    },
    'passengers': {
        'table_name': 'passengers',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('UUID', 'string', mode='REQUIRED'),
            bigquery.SchemaField('pass_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('pass_gender', 'string', mode='REQUIRED'),
            bigquery.SchemaField('pass_birth_date', 'date', mode='REQUIRED'),
            bigquery.SchemaField('pass_email', 'string', mode='REQUIRED'),
            bigquery.SchemaField('pass_street', 'string', mode='REQUIRED'),
            bigquery.SchemaField('pass_city', 'string', mode='REQUIRED'),
            bigquery.SchemaField('pass_state', 'string', mode='REQUIRED'),
            bigquery.SchemaField('pass_zip', 'int', mode='REQUIRED'),
            bigquery.SchemaField('start_date', 'date', mode='REQUIRED'),
            bigquery.SchemaField('end_date', 'date', mode='NULLABLE'),
        ],
    },
    'tickets': {
        'table_name': 'tickets',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('airline_iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('origin_iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('eticket_num', 'string', mode='REQUIRED'),
            bigquery.SchemaField('confirmation', 'string', mode='REQUIRED'),
            bigquery.SchemaField('price', 'float', mode='REQUIRED'),
            bigquery.SchemaField('seat', 'string', mode='REQUIRED'),
            bigquery.SchemaField('status', 'string', mode='REQUIRED'),
            bigquery.SchemaField('ticket_date', 'date', mode='REQUIRED'),
        ],
    },
}

# setup logging and logger
logging.basicConfig(            # setting up the root logger
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
logger: logging.Logger = logging.getLogger('root')      # alias the root logger as `logger`
logger.setLevel(logging.DEBUG)                          # programmatically reassign the logging level

Create dataset in bigquery

In [5]:
#establish dataset ID
dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
client = bigquery.Client()

def create_dataset(client: bigquery.Client ,dataset_id: str, location: str = 'US') -> None:
    """create a bigquery dataset"""
    # create the dataset
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = location
    dataset = client.create_dataset(dataset, exists_ok=True)
    logger.info(f"Created Air Travel dataset: {dataset.full_dataset_id}")

[DEBUG][2023-12-23 13:27:21,915][_default:0255] : Checking /Users/kairo/.creds/dsa-deb-sa.json for explicit credentials as part of auth process...
[DEBUG][2023-12-23 13:27:21,917][_default:0255] : Checking /Users/kairo/.creds/dsa-deb-sa.json for explicit credentials as part of auth process...


Load tickets JSON into pandas dataframe

In [12]:
# Load JSON into pandas dataframe
df = pd.read_json('./data/tickets.json', lines=True)
def extract_columns(dataframe, column_name): 
        df_columns = dataframe[column_name].apply(pd.Series)
        df_columns = df_columns.rename(columns={col: column_name + '_' + col for col in df_columns.columns})
        extract_df = pd.concat([dataframe.drop([column_name], axis=1), df_columns], axis=1)
        return extract_df

airline_df = extract_columns(df, 'airline')
#df2 = extract_columns(df1, 'origin')
#df3 = extract_columns(df2, 'destination')
#final_df = extract_columns(df3, 'passenger')
display(airline_df)

Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,origin,destination,passenger,airline_name,airline_iata,airline_icao,airline_callsign,airline_country
0,498-938211-0795,ZVFDC4,2022-03-23,723.42,31I,active,{'name': 'Montreal / Pierre Elliott Trudeau In...,{'name': 'Chicago Midway International Airport...,"{'first_name': 'Robert', 'last_name': 'Brown',...",China Eastern Airlines,MU,CES,CHINA EASTERN,China
1,482-850738-6048,IL5GUI,2022-03-23,765.18,29B,active,"{'name': 'Longdongbao Airport', 'city': 'Guiya...","{'name': 'Ninoy Aquino International Airport',...","{'first_name': 'Laura', 'last_name': 'Kent', '...",Hawaiian Airlines,HA,HAL,HAWAIIAN,United States
2,275-207321-8092,CYEFBC,2022-03-21,753.89,26I,active,{'name': 'Licenciado Gustavo Díaz Ordaz Intern...,"{'name': 'Ibiza Airport', 'city': 'Ibiza', 'co...","{'first_name': 'Lisa', 'last_name': 'Tucker', ...",Wizz Air,W6,WZZ,WIZZ AIR,Hungary
3,246-793315-3102,ZNGPC2,2022-03-22,793.89,15A,active,"{'name': 'El Tepual Airport', 'city': 'Puerto ...","{'name': 'Gdańsk Lech Wałęsa Airport', 'city':...","{'first_name': 'Matthew', 'last_name': 'Yates'...",AirAsia,AK,AXM,ASIAN EXPRESS,Malaysia
4,091-128904-1226,MGSBD9,2022-03-24,820.25,17F,active,{'name': 'Baltimore/Washington International T...,"{'name': 'London Gatwick Airport', 'city': 'Lo...","{'first_name': 'Megan', 'last_name': 'Villanue...",Xiamen Airlines,MF,CXA,XIAMEN AIR,China
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4091,030-327889-3270,MVIBWK,2022-03-24,600.37,23E,active,"{'name': 'Don Mueang International Airport', '...",,"{'first_name': 'Janice', 'last_name': 'Zamora'...",Hainan Airlines,HU,CHH,HAINAN,China
4092,513-551750-0628,WZZGGB,2022-03-21,583.41,24F,active,{'name': 'Governador Aluízio Alves Internation...,{'name': 'Charles de Gaulle International Airp...,"{'first_name': 'Lisa', 'last_name': 'Tucker', ...",Malaysia Airlines,MH,MAS,MALAYSIAN,Malaysia
4093,118-106280-2530,WUD4KR,2022-03-22,203.45,17H,active,"{'name': 'Taiyuan Wusu Airport', 'city': 'Taiy...","{'name': 'Miami International Airport', 'city'...","{'first_name': 'Janice', 'last_name': 'Zamora'...",Frontier Airlines,F9,FFT,FRONTIER FLIGHT,United States
4094,961-278558-3018,VI5039,2022-03-21,554.59,18G,active,"{'name': 'San Carlos De Bariloche Airport', 'c...","{'name': 'Hamad International Airport', 'city'...","{'first_name': 'Corey', 'last_name': 'Cook', '...",Royal Air Maroc,AT,RAM,ROYALAIR MAROC,Morocco


Load airports dimension table 

In [17]:
def airlines_get_unique(df: pd.DataFrame) -> pd.DataFrame:
    """
    Returns a unique set of products from the tickets dataframe 

    Args:
        df (pd.DataFrame): tickets dataframe

    Returns:
        pd.DataFrame: return value. unique airline iatas 
    """
    logger.debug(f"getting unique iatas...")
    # group by unique columns and only select them
    cols = ['airline_name', 'airline_iata', 'airline_icao', 'airline_callsign', 'airline_country']
    df = df.groupby(cols).all()
    return df
airline_df = airlines_get_unique(airline_df)
display(airline_df)


[DEBUG][2023-12-23 13:45:33,997][3548695768:0011] : getting unique iatas...


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,eticket_num,confirmation,ticket_date,price,seat,status,origin,destination,passenger
airline_name,airline_iata,airline_icao,airline_callsign,airline_country,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
Air Canada,AC,ACA,AIR CANADA,Canada,True,True,True,True,True,True,True,True,True
Air China,CA,CCA,AIR CHINA,China,True,True,True,True,True,True,True,True,True
Air France,AF,AFR,AIRFRANS,France,True,True,True,True,True,True,True,True,True
Air New Zealand,NZ,ANZ,NEW ZEALAND,New Zealand,True,True,True,True,True,True,True,True,True
AirAsia,AK,AXM,ASIAN EXPRESS,Malaysia,True,True,True,True,True,True,True,True,True
Alaska Airlines,AS,ASA,Inc.,ALASKA,True,True,True,True,True,True,True,True,True
Allegiant Air,G4,AAY,ALLEGIANT,United States,True,True,True,True,True,True,True,True,True
American Airlines,AA,AAL,AMERICAN,United States,True,True,True,True,True,True,True,True,True
British Airways,BA,BAW,SPEEDBIRD,United Kingdom,True,True,True,True,True,True,True,True,True
Cape Air,9K,KAP,CAIR,United States,True,True,True,True,True,True,True,True,True


Load the airlines dataframe columns into a bigquery table