In [28]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

In [3]:
flights = pd.read_parquet('flights.parquet')

In [3]:
flights.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1191805 entries, 0 to 1191804
Data columns (total 31 columns):
TRANSACTIONID        1191805 non-null int64
FLIGHTDATE           1191805 non-null datetime64[ns]
AIRLINECODE          1191805 non-null object
AIRLINENAME          1191805 non-null object
TAILNUM              1034988 non-null object
FLIGHTNUM            1191805 non-null int64
ORIGINAIRPORTCODE    1191805 non-null object
ORIGAIRPORTNAME      1191805 non-null object
ORIGINCITYNAME       1191805 non-null object
ORIGINSTATE          1191805 non-null object
ORIGINSTATENAME      1191805 non-null object
DESTAIRPORTCODE      1191805 non-null object
DESTAIRPORTNAME      1191805 non-null object
DESTCITYNAME         1191805 non-null object
DESTSTATE            1191805 non-null object
DESTSTATENAME        1191805 non-null object
CRSDEPTIME           1191805 non-null object
DEPTIME              1191805 non-null object
DEPDELAY             1163470 non-null float64
TAXIOUT              1011

In [4]:
def initConnection():
    try:
        connection = psycopg2.connect(dbname='tests_data_engineering',
                                user='candidate6901',
                                host='iw-recruiting-test.cygkjm9anrym.us-west-2.rds.amazonaws.com',
                                port='5432',
                                password='OViCELrLDYP70EH9')
        return connection
    except:
        print("Not able to connect to the database")

### Creation and data load of DIM_AIRLINE

In [8]:
try:
    connection = initConnection()
    cursor = connection.cursor()
    
    drop_table_query = '''drop table if exists dim_airline;'''
    cursor.execute(drop_table_query)
    
    create_table_query = '''create table dim_airline
                            (
                            AIRLINECODE          text not null,
                            AIRLINENAME          text not null
                            ); '''
    
    cursor.execute(create_table_query)
    print("dim_airline table created successfully")
    
    # Removing Airline Code from name
    flights['AIRLINENAME'] = flights['AIRLINENAME'].map(lambda x: x.split(':')[0])
    
    # Getting data from the DataFrame
    setAirline = set(zip(flights.AIRLINECODE, flights.AIRLINENAME))
    
    #cursor = connection.cursor()
    for item in setAirline:
        str_insert_query = """ INSERT INTO dim_airline (AIRLINECODE, AIRLINENAME) VALUES (%s,%s)"""
        cursor.execute(str_insert_query, item)

    connection.commit()
    print("Data inserted into dim_airline")
    
except Exception as e:
    print("Error with table creation or data loading of dim_airline - " + e)
finally:
    if(connection):
        cursor.close()
        connection.close()

dim_airline table created successfully
Data inserted into dim_airline


### Creation and data load of DIM_AIRPORT

In [14]:
flights['ORIGAIRPORTNAME'] = flights['ORIGAIRPORTNAME'].map(lambda x: x.split(':')[1].strip())
flights['DESTAIRPORTNAME'] = flights['DESTAIRPORTNAME'].map(lambda x: x.split(':')[1].strip())

originAirport = set(zip(flights.ORIGINAIRPORTCODE, 
        flights.ORIGAIRPORTNAME, 
        flights.ORIGINCITYNAME, 
        flights.ORIGINSTATE, 
        flights.ORIGINSTATENAME))
destAirport = set(zip(flights.DESTAIRPORTCODE, 
        flights.DESTAIRPORTNAME, 
        flights.DESTCITYNAME, 
        flights.DESTSTATE, 
        flights.DESTSTATENAME))

setAirports = originAirport.union(destAirport)

In [15]:
try:
    connection = initConnection()
    cursor = connection.cursor()
    
    drop_table_query = '''drop table if exists dim_airport;'''
    cursor.execute(drop_table_query)
    
    create_table_query = '''create table dim_airport
                            (
                            AIRPORTCODE text not null,
                            AIRPORTNAME text,
                            CITYNAME text,
                            STATECODE text,
                            STATENAME text
                            ); '''
    
    cursor.execute(create_table_query)
    print("dim_airline table created successfully")
    
    #cursor = connection.cursor()
    for item in setAirports:
        str_insert_query = """ INSERT INTO dim_airport (AIRPORTCODE, AIRPORTNAME, CITYNAME, STATECODE, STATENAME) VALUES (%s,%s,%s,%s,%s)"""
        cursor.execute(str_insert_query, item)

    connection.commit()
    print("Data inserted into dim_airport")
    
except Exception as e:
    print("Error with table creation or data loading of dim_airport - " + e)
finally:
    if(connection):
        cursor.close()
        connection.close()

dim_airline table created successfully
Data inserted into dim_airport


### Creation and data load of the fact table FACT_FLIGHT

In [45]:
try:
    connection = initConnection()
    cursor = connection.cursor()
    
    drop_table_query = '''drop table if exists fact_flight;'''
    cursor.execute(drop_table_query)

    create_table_query = '''create table fact_flight
                            (
                            TRANSACTIONID        bigint,
                            FLIGHTDATE           date,
                            TAILNUM              text,
                            FLIGHTNUM            text,
                            AIRLINECODE          text,
                            ORIGINAIRPORTCODE    text,
                            DESTAIRPORTCODE      text,
                            CRSDEPTIME           time,
                            DEPTIME              text, -- should be time,
                            DEPDELAY             real,
                            TAXIOUT              float,
                            WHEELSOFF            float, -- could have been time
                            WHEELSON             float, -- could have been time
                            TAXIIN               float, -- could have been time
                            CRSARRTIME           time,
                            ARRTIME              text, -- should be time,
                            ARRDELAY             real,
                            CRSELAPSEDTIME       float,
                            ACTUALELAPSEDTIME    float,
                            CANCELLED            boolean,
                            DIVERTED             boolean,
                            DISTANCE             float
                            ); '''
    cursor.execute(create_table_query)
    
    print("Fact table fact_flight created successfully")
    
    # Getting data from the DataFrame
    flights2 = flights.loc[:, ['TRANSACTIONID','FLIGHTDATE','TAILNUM','FLIGHTNUM','AIRLINECODE','ORIGINAIRPORTCODE',
                              'DESTAIRPORTCODE','CRSDEPTIME','DEPTIME','DEPDELAY','TAXIOUT','WHEELSOFF','WHEELSON',
                              'TAXIIN','CRSARRTIME','ARRTIME','ARRDELAY','CRSELAPSEDTIME','ACTUALELAPSEDTIME',
                              'CANCELLED','DIVERTED','DISTANCE']]

    print("Starting data load into fact table")

# OPTION 1
#    for index, item in flights2.iterrows():
#        str_insert_query = """ INSERT INTO fact_flight 
#        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
#        cursor.execute(str_insert_query, tuple(item.tolist()))
#        if index%100000 == 0:
#            print(str(index) + " rows have been loaded")

# OPTION 2
#    i = 0
#    j = 1
#    incr = 1000
#    while i*incr < 1191804:
#        listFactData = flights2.iloc[i*incr: j*incr, :]
#        tupleFactData = [tuple(x) for x in listFactData.values]
#        args_str = ','.join(cursor.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tupleFactData)
#        cursor.execute("INSERT INTO fact_flight VALUES " + args_str)
#        i = i + 1
#        j = j + 1
#        if (i*incr)%100000 == 0:
#            print(str(index) + " rows have been loaded")        

# OPTION 3
    i = 0
    j = 1
    incr = 10000
    while i*incr < flights2.shape[0]:
        listFactData = flights2.iloc[i*incr: j*incr, :]
        tupleFactData = [tuple(x) for x in listFactData.values]
        str_insert_query = """ INSERT INTO fact_flight VALUES %s"""
        execute_values (
            cursor, str_insert_query, tupleFactData, template=None, page_size=incr
        )
        i = i + 1
        j = j + 1
        if (i*incr)%100000 == 0:
            print(str(i*incr) + " rows have been loaded") 
    
    connection.commit()
    print("Data inserted into fact_flight")
    
except Exception as e:
    print("Error with table creation or data loading of fact_flight - " + e)
finally:
    if(connection):
        cursor.close()
        connection.close()

Fact table fact_flight created successfully
Starting data load into fact table
100000 rows have been loaded
200000 rows have been loaded
300000 rows have been loaded
400000 rows have been loaded
500000 rows have been loaded
600000 rows have been loaded
700000 rows have been loaded
800000 rows have been loaded
900000 rows have been loaded
1000000 rows have been loaded
1100000 rows have been loaded
1200000 rows have been loaded
Data inserted into fact_flight


<br/>
Columns arrtime and deptime have '' values and convert them to NULL

In [47]:
try:
    connection = initConnection()
    cursor = connection.cursor()

    update_table_query = '''update fact_flight 
                            set 
                            arrtime = case when arrtime = '' then NULL else cast(arrtime as time) end,
                            deptime = case when deptime = '' then NULL else cast(deptime as time) end; '''
    cursor.execute(update_table_query)
    
    print("Columns arrtime and deptime in fact table fact_flight updated successfully")
    
    connection.commit()
    
except Exception as e:
    print("Error with updating fact table - " + e)
finally:
    if(connection):
        cursor.close()
        connection.close()

Columns arrtime and deptime in fact table fact_flight updated successfully


### Issues with data
- ARRTIME and DEPTIME: Although DEPTIME and FLIGHTDATE are in the data, there is no way to deduce Arrival Date.