In [1]:
# Imports
import pandas as pd
import sqlite3

## Load Processed Dataframe

---

In [2]:
# Load processed dataframe
aig_df = pd.read_csv('../data/processed/aig_data_processed.csv')

## Data Modeling for Data Warehousing

---

For this specific use case, since it is desired to do analytics on the data, it is necesary get a schema that helps simplifying queries, improving performance and facilitates the analysis. Thus, a `Star Schema` will be ideal.

In this schema I will include a fact table for `Flights` referencing to relevant dimension tables for `OpCo and Subsidiaries`, `Airports`, `Time`, `Cabin and Service`.

Lastly, I will ensure that each dimension table has unique entries for its attributes. For the `Flights` fact table, I will use foreign keys that reference the primary keys in the dimension tables. This setup will allow efficient querying and aggregation of data.

### **Creating DB and Tables in a SQLite DDBB**

For simplicity and portability, I will be using a SQLite3 DDBB to populate the data that later will be copied into a Redshift data warehoue.

In [3]:
# SQLite database
conn = sqlite3.connect('../data/processed/aig_data_warehouse.db')

# Cursor object
cursor = conn.cursor()

# Create Dim_OpCo table
cursor.execute('''
CREATE TABLE IF NOT EXISTS Dim_OpCo (
    OpCo_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    OpCo_Code TEXT UNIQUE,
    OpCo_Name TEXT,
    Subsidiary_Code TEXT,
    Subsidiary_Name TEXT
)
''')

# Create Dim_Airport table
cursor.execute('''
CREATE TABLE IF NOT EXISTS Dim_Airport (
    Airport_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    Airport_Code TEXT UNIQUE,
    Airport_Name TEXT,
    Country_Code TEXT,
    Country_Name TEXT,
    Region TEXT
)
''')

# Create Dim_Time table
cursor.execute('''
CREATE TABLE IF NOT EXISTS Dim_Time (
    Time_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    Date DATE UNIQUE,
    Year INTEGER,
    Quarter INTEGER
)
''')

# Create Fact_Flights table
cursor.execute('''
CREATE TABLE IF NOT EXISTS Fact_Flights (
    Flight_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    OpCo_ID INTEGER,
    Departure_Airport_ID INTEGER,
    Arrival_Airport_ID INTEGER,
    Time_ID INTEGER,
    Aircraft_Type TEXT,
    Cabin TEXT,
    Service TEXT,
    Passengers INTEGER,
    Flights INTEGER,
    FOREIGN KEY (OpCo_ID) REFERENCES Dim_OpCo(OpCo_ID),
    FOREIGN KEY (Departure_Airport_ID) REFERENCES Dim_Airport(Airport_ID),
    FOREIGN KEY (Arrival_Airport_ID) REFERENCES Dim_Airport(Airport_ID),
    FOREIGN KEY (Time_ID) REFERENCES Dim_Time(Time_ID)
)
''')

# Commit the changes and close the connection
conn.commit()
conn.close()

### **Populate the Tables**

In [4]:
aig_df.columns

Index(['OpCo_Code', 'OpCo_Name', 'Subsidiary_Code', 'Subsidiary_Name',
       'Departure_Airport_Code', 'Departure_Airport_Name',
       'Departure_Country_Code', 'Departure_Country_Name', 'Departure_Region',
       'Arrival_Airport_Code', 'Arrival_Airport_Name', 'Arrival_Country_Code',
       'Arrival_Country_Name', 'Arrival_Region', 'Aircraft_Type', 'Date',
       'Cabin', 'Service', 'Passengers', 'Flights', 'Year', 'Quarter'],
      dtype='object')

In [5]:
# Populate Aircraft dimension table
def insert_dim_opco(conn, row):
    cursor = conn.cursor()
    try:
        cursor.execute('''INSERT OR IGNORE INTO Dim_OpCo (OpCo_Code, OpCo_Name, Subsidiary_Code, Subsidiary_Name)
                        VALUES (?, ?, ?, ?)''',
                    (row['OpCo_Code'], row['OpCo_Name'], row['Subsidiary_Code'], row['Subsidiary_Name']))
        cursor.execute('SELECT OpCo_ID FROM Dim_OpCo WHERE OpCo_Code = ?', (row['OpCo_Code'],))
        result = cursor.fetchone()
        if result is None:
            raise ValueError(f"OpCo_ID not found for OpCo_Code {row['OpCo_Code']}")
        return result[0]
    except sqlite3.Error as e:
        raise sqlite3.Error(f"Failure in insert_dim_opco: {e}")
    finally:
        cursor.close()

# Populate Airport dimension table
def insert_dim_airport(conn, row, airport_code, airport_name, country_code, country_name, region):
    cursor = conn.cursor()
    try:
        cursor.execute('''INSERT OR IGNORE INTO Dim_Airport (Airport_Code, Airport_Name, Country_Code, Country_Name, Region)
                        VALUES (?, ?, ?, ?, ?)''',
                    (row[airport_code], row[airport_name], row[country_code], row[country_name], row[region]))
        cursor.execute('SELECT Airport_ID FROM Dim_Airport WHERE Airport_Code = ?', (row[airport_code],))
        return cursor.fetchone()[0]
    except sqlite3.Error as e:
        raise sqlite3.Error(f"Failure in insert_dim_airport: {e}")
    finally:
        cursor.close()

# Populate Time dimension table
def insert_dim_time(conn, row):
    cursor = conn.cursor()
    try:
        cursor.execute('''INSERT OR IGNORE INTO Dim_Time (Date, Year, Quarter)
                        VALUES (?, ?, ?)''',
                    (row['Date'], row['Year'], row['Quarter']))
        cursor.execute('SELECT Time_ID FROM Dim_Time WHERE Date = ?', (row['Date'],))
        return cursor.fetchone()[0]
    except sqlite3.Error as e:
        raise sqlite3.Error(f"Failure in insert_dim_time: {e}")
    finally:
        cursor.close()

# Populate Flight fact table
def insert_fact_flights(conn, row, opco_id, departure_airport_id, arrival_airport_id, time_id):
    cursor = conn.cursor()
    try:
        cursor.execute('''INSERT INTO Fact_Flights (OpCo_ID, Departure_Airport_ID, Arrival_Airport_ID, Time_ID, Aircraft_Type, Cabin, Service, Passengers, Flights)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                    (opco_id, departure_airport_id, arrival_airport_id, time_id, row['Aircraft_Type'], row['Cabin'], row['Service'], row['Passengers'], row['Flights']))
    except sqlite3.Error as e:
        raise sqlite3.Error(f"Failure in insert_fact_flights: {e}")
    finally:
        cursor.close()

# Populating DDBB
def main(conn):
    try:
        for _, row in aig_df.iterrows():
            opco_id = insert_dim_opco(conn, row)
            departure_airport_id = insert_dim_airport(conn, row, 'Departure_Airport_Code', 'Departure_Airport_Name', 'Departure_Country_Code', 'Departure_Country_Name', 'Departure_Region')
            arrival_airport_id = insert_dim_airport(conn, row, 'Arrival_Airport_Code', 'Arrival_Airport_Name', 'Arrival_Country_Code', 'Arrival_Country_Name', 'Arrival_Region')
            time_id = insert_dim_time(conn, row)
            insert_fact_flights(conn, row, opco_id, departure_airport_id, arrival_airport_id, time_id)
    except Exception as e:  # Catching a broad exception
        print(f"An error occurred during general data insertion: {e}")
        conn.rollback()  # Rollback any changes if part of the operations failed
    else:
        conn.commit()  # Commit the transaction if all operations succeeded

# Connect to the SQLite DB
with sqlite3.connect('../data/processed/aig_data_warehouse.db') as conn:
    main(conn)