In [33]:
import os
import sys
import pandas as pd
import logging
import json
import uuid
import datetime
from google.cloud import bigquery
from hashlib import md5
from typing import List


# SETUP

DATA_DIR = "data/air_travel/"
DEFAULT_TICKETS_FILE = os.path.join(DATA_DIR, "tickets.json") 
PROJECT_NAME = "deb-01-372120"
DATASET_NAME = "air_travel"


# **** TABLE SCHEMAS ****

TABLE_METADATA = {
    'airlines': {
        'table_name': 'airlines',
        'schema': [
            bigquery.SchemaField('name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('icao', 'string', mode='REQUIRED'),
            bigquery.SchemaField('callsign', 'string', mode='REQUIRED'),
            bigquery.SchemaField('country', 'string', mode='REQUIRED')
        ]
    },
    'airports': {
        'table_name': 'airports',
        'schema': [
            bigquery.SchemaField('name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('city', 'string', mode='REQUIRED'),
            bigquery.SchemaField('country', 'string', mode='REQUIRED'),
            bigquery.SchemaField('iata', 'string', mode='REQUIRED'),
            bigquery.SchemaField('icao', 'string', mode='REQUIRED'),
            bigquery.SchemaField('latitude', 'float64', mode='REQUIRED'),
            bigquery.SchemaField('longitude', 'float64', mode='REQUIRED'),
            bigquery.SchemaField('altitude', 'float64', mode='REQUIRED'),
            bigquery.SchemaField('tz_timezone', 'string', mode='REQUIRED')
        ]
    },
    'passengers': {
        'table_name': 'passengers',
        'schema': [
            bigquery.SchemaField('first_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('last_name', 'string', mode='REQUIRED'),
            bigquery.SchemaField('gender', 'string', mode='REQUIRED'),
            bigquery.SchemaField('birth_date', 'datetime64[ns]', mode='REQUIRED'),
            bigquery.SchemaField('email', 'string', mode='REQUIRED'),
            bigquery.SchemaField('street','string', mode='REQUIRED'),
            bigquery.SchemaField('city', 'string', mode='REQUIRED'),
            bigquery.SchemaField('state', 'string', mode='REQUIRED'),
            bigquery.SchemaField('zip', 'int64', mode='REQUIRED'),
            bigquery.SchemaField('start_date', 'datetime64[ns]', mode='NULLABLE'),
            bigquery.SchemaField('end_date', 'datetime[ns]', mode='NULLABLE'),
            bigquery.SchemaField('uuid', 'string', mode='REQUIRED')
        ]
    },
    'tickets': {
        'table_name': 'tickets',
        'schema': [
            bigquery.SchemaField('eticket_num', 'string', mode='REQUIRED'),
            bigquery.SchemaField('confirmation', 'string', mode='REQUIRED'),
            bigquery.SchemaField('ticket_date', 'date', mode='REQUIRED'),
            bigquery.SchemaField('price', 'float64', mode='REQUIRED'),
            bigquery.SchemaField('seat', 'string', mode='REQUIRED'),
            bigquery.SchemaField('status','string', mode='REQUIRED'),
            bigquery.SchemaField('airline', 'string', mode='REQUIRED'), 
            bigquery.SchemaField('origin', 'string', mode='REQUIRED'),
            bigquery.SchemaField('destination', 'string', mode='REQUIRED'),
            # bigquery.SchemaField('passenger', 'string', mode='REQUIRED')
        ]
    }
}


# **** SETUP LOGGING ****
# setup logging and logger
logging.basicConfig(            # setting up the root logger
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
logger: logging.Logger = logging.getLogger('root')      # alias the root logger as `logger`
logger.setLevel(logging.DEBUG)                          # programmatically reassign the logging level


# **** BIGQUERY CLIENT ****
logger.debug(f"Creating bigquery client")
client = bigquery.Client()

logger.info(f"Setup Completed")


[DEBUG][2023-01-31 20:07:23,436][3978412263:0095] : Creating bigquery client
[INFO ][2023-01-31 20:07:23,696][3978412263:0098] : Setup Completed


In [2]:
# create dataset 
dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)

logger.info(f"Created air travel dataset: {dataset.full_dataset_id}")

[INFO ][2023-01-31 15:27:09,133][2632008059:0007] : Created air travel dataset: deb-01-372120:air_travel


In [3]:
# air travel data file name
filename = DEFAULT_TICKETS_FILE
logger.debug(f"attempting to process: {filename}")

# check if the file exists
assert os.path.exists(filename), f"Data file does not exists: '{filename}'"
# check if the file contains any data 
assert os.path.getsize(filename) > 78, f"Data file size incorrect; does not seem to contain data: '{filename}'"

# convert json into an array and then load into dataframe
data = [json.loads(line) for line in open(filename, 'r')]

df = pd.json_normalize(data)
logger.info(f"loaded {len(df.index)} rows from: {filename}")

# check schema: contains all expected columns?
expected_columns = [
    'eticket_num',
    'confirmation',
    'ticket_date',
    'price',
    'seat',
    'status',
    'airline.name',
    'airline.iata',
    'airline.icao',
    'airline.callsign',
    'airline.country',
    'origin.name',
    'origin.city',
    'origin.country', 
    'origin.iata', 
    'origin.icao', 
    'origin.latitude', 
    'origin.longitude', 
    'origin.altitude', 
    'origin.tz_timezone', 
    'destination.name', 
    'destination.city', 
    'destination.country', 
    'destination.iata', 
    'destination.icao', 
    'destination.latitude', 
    'destination.longitude', 
    'destination.altitude', 
    'destination.tz_timezone', 
    'passenger.first_name', 
    'passenger.last_name', 
    'passenger.gender', 
    'passenger.birth_date', 
    'passenger.email', 
    'passenger.street', 
    'passenger.city', 
    'passenger.state', 
    'passenger.zip'
    ]
for col in expected_columns:
    assert col in list(df.columns), f"Data file missing required column: {col}"

# assign & remember receipts dataframe
tickets_df = df
display(tickets_df.head(n=10))

[DEBUG][2023-01-31 15:27:14,364][829190179:0003] : attempting to process: data/air_travel/tickets.json
[INFO ][2023-01-31 15:27:14,735][829190179:0014] : loaded 4096 rows from: data/air_travel/tickets.json


Unnamed: 0,eticket_num,confirmation,ticket_date,price,seat,status,airline.name,airline.iata,airline.icao,airline.callsign,...,passenger.last_name,passenger.gender,passenger.birth_date,passenger.email,passenger.street,passenger.city,passenger.state,passenger.zip,origin,destination
0,498-938211-0795,ZVFDC4,2022-03-23,723.42,31I,active,China Eastern Airlines,MU,CES,CHINA EASTERN,...,Brown,M,1969-02-17,robert.brown.69@hotmail.com,5007 Thomas Way,Lake Hollystad,DC,20027,,
1,482-850738-6048,IL5GUI,2022-03-23,765.18,29B,active,Hawaiian Airlines,HA,HAL,HAWAIIAN,...,Kent,F,1998-08-05,laura.kent.98@hotmail.com,13991 Davis Village,North Catherineborough,PA,16516,,
2,275-207321-8092,CYEFBC,2022-03-21,753.89,26I,active,Wizz Air,W6,WZZ,WIZZ AIR,...,Tucker,F,1965-01-22,lisa.tucker.65@hotmail.com,04135 Marvin Via,North Kristabury,MA,1093,,
3,246-793315-3102,ZNGPC2,2022-03-22,793.89,15A,active,AirAsia,AK,AXM,ASIAN EXPRESS,...,Yates,NB,1975-03-31,matthew.yates.75@yahoo.com,76045 Samantha Road Suite 111,Lake Jeffrey,DE,19898,,
4,091-128904-1226,MGSBD9,2022-03-24,820.25,17F,active,Xiamen Airlines,MF,CXA,XIAMEN AIR,...,Villanueva,NB,1945-08-14,megan.villanueva.45@hotmail.com,848 Melissa Springs Suite 947,Kellerstad,TX,76177,,
5,115-196069-8963,XFYQC0,2022-03-23,892.69,18C,active,Air New Zealand,NZ,ANZ,NEW ZEALAND,...,Hall,NB,1944-08-31,sarah.hall.44@gmail.com,75420 Michael Mountains Suite 485,New Victoria,HI,96727,,
6,396-673460-1326,N5UOOZ,2022-03-23,889.53,3C,active,Jeju Air,7C,JJA,JEJU AIR,...,Thompson,M,1968-05-02,seth.thompson.68@yahoo.com,22455 Higgins Junction Apt. 042,New Keith,OR,97405,,
7,380-894599-8109,PAA19Y,2022-03-22,706.78,7D,active,American Airlines,AA,AAL,AMERICAN,...,Garcia,F,1950-02-12,jennifer.garcia.50@gmail.com,6607 Sharp Common,Chadstad,VA,22121,,
8,614-960971-2686,EF4BHJ,2022-03-23,486.4,24J,active,Juneyao Airlines,HO,DKH,JUNEYAO AIRLINES,...,Clark,F,1991-11-09,becky.clark.91@gmail.com,691 Jones Cliffs,Michaelburgh,TX,76003,,
9,481-321233-0702,FVM9EE,2022-03-23,855.93,16A,active,Royal Air Maroc,AT,RAM,ROYALAIR MAROC,...,Cook,M,1976-07-29,ronald.cook.76@hotmail.com,93328 Davis Island,Rodriguezside,MD,21408,,


In [4]:
# Gather unique airlines 
# start from the tickets_df
df = tickets_df

logger.debug(f"getting unique airlines...")

# set of unique columns to return
cols = [
  'airline.name', 
  'airline.iata', 
  'airline.icao', 
  'airline.callsign', 
  'airline.country'
  ]
# group by unique columns and only select them
df = df.groupby(cols).all()
df = df.reset_index().loc[:, cols]
# rename columns
df = df.rename(columns={
  'airline.name': 'name', 
  'airline.iata':'iata', 
  'airline.icao':'icao', 
  'airline.callsign':'callsign', 
  'airline.country':'country'
  })

# Set index to IATA
df = df.set_index(keys='iata')

logger.info(f"airlines dim - found {len(df.index)} rows")
df.head(10)

[DEBUG][2023-01-31 15:27:25,540][641296019:0005] : getting unique airlines...
[INFO ][2023-01-31 15:27:25,668][641296019:0018] : airlines dim - found 48 rows


Unnamed: 0_level_0,name,icao,callsign,country
iata,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
AC,Air Canada,ACA,AIR CANADA,Canada
CA,Air China,CCA,AIR CHINA,China
AF,Air France,AFR,AIRFRANS,France
NZ,Air New Zealand,ANZ,NEW ZEALAND,New Zealand
AK,AirAsia,AXM,ASIAN EXPRESS,Malaysia
AS,Alaska Airlines,ASA,Inc.,ALASKA
G4,Allegiant Air,AAY,ALLEGIANT,United States
AA,American Airlines,AAL,AMERICAN,United States
BA,British Airways,BAW,SPEEDBIRD,United Kingdom
9K,Cape Air,KAP,CAIR,United States


In [5]:
# Create load_table function that will deal with loading airline dataframe to BigQuery

def load_table(
    df: pd.DataFrame, 
    client: bigquery.Client, 
    table_name: str, 
    schema: List[bigquery.SchemaField], 
    create_disposition: str = 'CREATE_IF_NEEDED', 
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """load dataframe into bigquery table

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client
        table_name (str): full table name including project and dataset id
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.
    """
    # *** run some checks ***
    # test table name to be full table name including project and dataset name. It must contain to dots
    assert len(table_name.split('.')) == 3, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # setup bigquery load job:
    #  create table if needed, replace rows, define the table schema
    job_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )
    logger.info(f"loading table: '{table_name}'")
    job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
    job.result()        # wait for the job to finish
    # get the resulting table
    table = client.get_table(table_name)
    logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")

In [6]:
# Load airline table to BigQuery
# get table name and schema from our TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['airlines']['table_name']}"
schema = schema=TABLE_METADATA['airlines']['schema']
# load dataframe
load_table(df, client, table_name, schema)

logger.info(f"loaded airlines dim")

[INFO ][2023-01-31 15:28:04,771][315478070:0031] : loading table: 'deb-01-372120.air_travel.airlines'
[INFO ][2023-01-31 15:28:10,346][315478070:0036] : loaded 48 rows into deb-01-372120:air_travel.airlines
[INFO ][2023-01-31 15:28:10,348][1629074712:0008] : loaded airlines dim


In [7]:
# Gather unique aiports 
# start from the tickets_df
df = tickets_df

logger.debug(f"getting unique airports...")

# set of unique columns to return
cols = [
  'origin.name', 
  'origin.city', 
  'origin.country', 
  'origin.iata', 
  'origin.icao', 
  'origin.latitude', 
  'origin.longitude', 
  'origin.altitude', 
  'origin.tz_timezone'
  ]
# group by unique columns and only select them
df = df.groupby(cols).all()
df = df.reset_index().loc[:, cols]
# rename columns
df = df.rename(columns={
  'origin.name':'name',
  'origin.city':'city', 
  'origin.country':'country', 
  'origin.iata':'iata', 
  'origin.icao':'icao', 
  'origin.latitude':'latitude', 
  'origin.longitude':'longitude', 
  'origin.altitude':'altitude', 
  'origin.tz_timezone':'tz_timezone'
  })

# Set index to IATA
df = df.set_index(keys='iata')

logger.info(f"airports dim - found {len(df.index)} rows")
df.head(10)

[DEBUG][2023-01-31 15:28:45,157][2483278288:0005] : getting unique airports...
[INFO ][2023-01-31 15:28:45,324][2483278288:0018] : airports dim - found 386 rows


Unnamed: 0_level_0,name,city,country,icao,latitude,longitude,altitude,tz_timezone
iata,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
AUH,Abu Dhabi International Airport,Abu Dhabi,United Arab Emirates,OMAA,24.43,54.65,88.0,Asia/Dubai
MAD,Adolfo Suárez Madrid–Barajas Airport,Madrid,Spain,LEMD,40.47,-3.56,1998.0,Europe/Madrid
CWB,Afonso Pena Airport,Curitiba,Brazil,SBCT,-25.53,-49.18,2988.0,America/Sao_Paulo
MCP,Alberto Alcolumbre Airport,Macapa,Brazil,SBMQ,0.05,-51.07,56.0,America/Fortaleza
ABQ,Albuquerque International Sunport,Albuquerque,United States,KABQ,35.04,-106.61,5355.0,America/Denver
CUZ,Alejandro Velasco Astete International Airport,Cuzco,Peru,SPZO,-13.54,-71.94,10860.0,America/Lima
CLO,Alfonso Bonilla Aragon International Airport,Cali,Colombia,SKCL,3.54,-76.38,3162.0,America/Bogota
ALC,Alicante International Airport,Alicante,Spain,LEAL,38.28,-0.56,142.0,Europe/Madrid
REL,Almirante Marco Andres Zar Airport,Trelew,Argentina,SAVT,-43.21,-65.27,141.0,America/Catamarca
AMS,Amsterdam Airport Schiphol,Amsterdam,Netherlands,EHAM,52.31,4.76,-11.0,Europe/Amsterdam


In [8]:
# Load airport dim into BigQuery
# get table name and schema from our TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['airports']['table_name']}"
schema = schema=TABLE_METADATA['airports']['schema']
# load dataframe
load_table(df, client, table_name, schema)

logger.info(f"loaded airports dim")

[INFO ][2023-01-31 15:29:02,056][315478070:0031] : loading table: 'deb-01-372120.air_travel.airports'
[INFO ][2023-01-31 15:29:06,904][315478070:0036] : loaded 386 rows into deb-01-372120:air_travel.airports
[INFO ][2023-01-31 15:29:06,906][3846598676:0008] : loaded airports dim


In [14]:
uuid_ser = pd.Series([uuid.uuid4() for i in range(len(df.index))])
uuid_ser

0      3e0ee99b-991d-4cfc-9147-e089a79e10f7
1      8615fcec-f2b9-4151-8505-db58428d4821
2      89bd494b-66e1-4dc1-94b7-59b172853168
3      941afa8f-c323-4ddd-8528-6e8b38ecd621
4      5da0df1e-8e7e-4579-805a-8baca1c95a46
                       ...                 
381    a62a4409-eb9a-480e-8bd6-08e9d9d406cd
382    22a06e85-d767-4075-92a4-02f18141f28a
383    3b0dfb66-3317-4be2-9c2e-fadcc2564669
384    eca36f38-cb13-4eed-8dfb-30a1ecccb075
385    59b28058-6e4c-4a50-b667-1a4166e85377
Length: 386, dtype: object

In [23]:
# Gather unique passengers 
# start from the tickets_df
df = tickets_df

logger.debug(f"getting unique passengers...")

# set of unique columns to return
cols = [
  'passenger.first_name', 
  'passenger.last_name',
  'passenger.gender', 
  'passenger.birth_date', 
  'passenger.email',
  'passenger.street',
  'passenger.city',
  'passenger.state',
  'passenger.zip'
  ]
# group by unique columns and only select them
df = df.groupby(cols).all()
df = df.reset_index().loc[:, cols]
# rename columns
df = df.rename(columns={
  'passenger.first_name':'first_name',
  'passenger.last_name':'last_name',
  'passenger.gender':'gender',
  'passenger.birth_date':'birth_date',
  'passenger.email':'email',
  'passenger.street':'street',
  'passenger.city':'city',
  'passenger.state':'state',
  'passenger.zip':'zip'
  })



# Add start and end date columns
df['start_date'] = date.today()
df['end_date'] = None

# Generate UUID for passengers df
# Create UUID Series
df['uuid'] = pd.Series([uuid.uuid4() for i in range(len(df.index))])

logger.info(f"generated passengers uuids")

# Set index to email
df = df.set_index(keys='email')

logger.info(f"passengers dim - found {len(df.index)} rows")
df.head(5)

[DEBUG][2023-01-31 16:14:33,612][803234976:0005] : getting unique passengers...
[INFO ][2023-01-31 16:14:33,704][803234976:0045] : generated passengers uuids
[INFO ][2023-01-31 16:14:33,708][803234976:0050] : passengers dim - found 32 rows


Unnamed: 0_level_0,first_name,last_name,gender,birth_date,street,city,state,zip,start_date,end_date,uuid
email,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
annette.hawkins.43@yahoo.com,Annette,Hawkins,F,1943-07-11,361 Robinson Green Apt. 635,North Lynntown,NV,89825,2023-01-31,,64587807-5c0e-47f7-a943-1bc0d1e3534c
autumn.morse.60@hotmail.com,Autumn,Morse,F,1960-01-18,6984 Price Shoals,Erictown,HI,96818,2023-01-31,,fbb801e5-c12c-4283-95df-e49b5111cfb9
becky.clark.91@gmail.com,Becky,Clark,F,1991-11-09,691 Jones Cliffs,Michaelburgh,TX,76003,2023-01-31,,f54c6eb5-9219-48e6-8dc1-7742f22933bd
belinda.cook.91@hotmail.com,Belinda,Cook,F,1991-01-26,1965 Kelly Field Apt. 094,Jonesberg,IL,60613,2023-01-31,,4382e720-1e97-480a-91be-138b9d459542
carl.wilson.80@hotmail.com,Carl,Wilson,M,1980-04-24,2814 Houston Hills,Rodriguezside,IA,51971,2023-01-31,,257f679f-06f7-4bf7-a5d0-99aa5c20ea41


In [24]:
# Check the data types of the schema
print(df.dtypes)

first_name    object
last_name     object
gender        object
birth_date    object
street        object
city          object
state         object
zip           object
start_date    object
end_date      object
uuid          object
dtype: object


In [38]:
# Force the data types of the dated columns to match the schema requirements
df[['birth_date', 'start_date', 'end_date']] = df[['birth_date', 'start_date', 'end_date']].astype('datetime64[ns]')

# Force the data types of the string category to match their schema requirements
df[['first_name', 'last_name', 'gender', 'street', 'city', 'state', 'uuid']] = df[['first_name', 'last_name', 'gender', 'street', 'city', 'state', 'uuid']].astype('string')

# Force data type of zip to be int64
df['zip'] = df['zip'].astype('int64')

# Print datatypes of columns
(df.dtypes)


first_name            string
last_name             string
gender                string
birth_date    datetime64[ns]
street                string
city                  string
state                 string
zip                    int64
start_date    datetime64[ns]
end_date      datetime64[ns]
uuid                  string
dtype: object

In [24]:
# Load passengers dim into BigQuery
# get table name and schema from our TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['passengers']['table_name']}"
schema = schema=TABLE_METADATA['passengers']['schema']
# load dataframe
load_table(df, client, table_name, schema)

logger.info(f"loaded passengers dim")

[INFO ][2023-01-18 15:29:35,787][1459098983:0031] : loading table: 'deb-01-372120.air_travel.passengers'




BadRequest: 400 POST https://bigquery.googleapis.com/upload/bigquery/v2/projects/deb-01-372120/jobs?uploadType=multipart: Invalid value for type: OBJECT is not a valid value

In [None]:
# Set Tickets Fact table with all columns except passenger
df = tickets_df
df = df[['eticket_num', 'confirmation', 'price', 'ticket_date', 'seat', 'status', 'airline.iata', 'origin.iata', 'destination.iata']]

# Set index
df = df.set_index(keys='eticket_num')

# Rename columns
df = df.rename(columns={'airline.iata':'airline', 'origin.iata':'origin', 'destination.iata':'destination'})

logger.info(f"tickets fact - found {len(df.index)} rows")
df.head(10)

In [None]:
# Lookup passenger UUID and join it to the tickets df
query = f"""
  SELECT
    uuid
  FROM
    `{PROJECT_NAME}.{DATASET_NAME}.passengers`
"""

pass_df = client.query(query).to_dataframe()

df = df.drop(columns='uuid', errors='ignore')

df = df.join(pass_df, how='right-join')

df.head()

In [None]:
# Load tickets fact table to BigQuery
# get table name and schema from our TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{TABLE_METADATA['tickets']['table_name']}"
schema = schema=TABLE_METADATA['tickets']['schema']
# load dataframe
load_table(df, client, table_name, schema)

logger.info(f"loaded tickets dim")