#  ETL Pipeline

In [2]:
#pip install psycopg2-binary

In [3]:
#pip install mysql-connector-python

In [4]:
# Importing Libraries
import psycopg2
import pandas as pd
import numpy as np
# MySQL driver (used by the load step)
import mysql.connector

## Extract

In [5]:
# Extracting data from the PostgreSQL database

In [6]:
def _fetch_table(cur, query, columns):
    """Run ``query`` on ``cur`` and return all fetched rows as a DataFrame.

    ``columns`` supplies the column labels, so it must list one name per
    column returned by the query, in the same order.
    """
    cur.execute(query)
    return pd.DataFrame(cur.fetchall(), columns=columns)


def extract():
    """Read the source tables from PostgreSQL and hand them to ``transform``.

    Each table is fetched with ``Select *`` into its own DataFrame. After the
    cursor and the connection have been closed, the seven frames are passed
    to ``transform`` in this order: customer, booking id, credit card,
    address, book flight, flight, bonus miles.

    Connection settings are read from the standard libpq environment
    variables (``PGHOST``, ``PGDATABASE``, ``PGUSER``, ``PGPASSWORD``,
    ``PGPORT``) and fall back to the values that used to be hard-coded here.

    If connecting or reading fails, the error is printed and the function
    returns without calling ``transform``.

    Returns:
        None
    """
    # Local import so this function does not depend on the notebook's import cell.
    import os

    # Connection variables.
    # NOTE(review): the fallback password is kept only so existing setups keep
    # working; it should be removed from the source once PGPASSWORD is set.
    hostname = os.environ.get('PGHOST', 'localhost')
    database = os.environ.get('PGDATABASE', 'demo')
    username = os.environ.get('PGUSER', 'postgres')
    pwd = os.environ.get('PGPASSWORD', 'sachin123')
    port_id = os.environ.get('PGPORT', 5432)

    # (query, column labels) for every table, listed in the positional order
    # that transform() expects its arguments.
    table_specs = [
        # Customer table
        ("Select * from customer",
         ['c_EmailID', 'c_name', 'mileage', 'registration_in_mlg_program',
          'home_airport', 'miles_count', 'password', 'IATA_location_code']),
        # Booking id table
        ("Select * from booking_id",
         ['booking_ID', 'c_EmailID', 'credit_card_no']),
        # Credit card table
        ("Select * from Credit_card",
         ['credit_card_no', 'card_type', 'bank_Name', 'expiry_Date',
          'Name_on_card', 'c_EmailID']),
        # Address table
        ("Select * from Address",
         ['Address', 'c_EmailID']),
        # Book flight table
        ("Select * from book_flight",
         ['booking_Id', 'Airline_code', 'flight_no', 'flight_date', 'booking_date']),
        # Flight table
        # NOTE(review): 'Max_seat_economy' is listed twice, so two columns end
        # up with the same label; one of them is presumably a different
        # seat class -- confirm against the Flight table definition.
        ("Select * from Flight",
         ['Airline_code', 'Flight_no', 'Date', 'Dep_airport', 'Dep_time',
          'Arr_airport', 'Arr_time', 'Max_seat_firstclass', 'Max_seat_economy',
          'Max_seat_economy', 'Booking_type', 'Bonus_miles', 'c_EmailID',
          'Firstclass_price', 'Economy_price']),
        # Bonus miles per customer table
        ("Select * from Bonusmiles_cust",
         ['c_EmailID', 'Bonus_miles', 'airline_code', 'flight_no', 'date',
          'points_acquired_date']),
    ]

    conn = None
    cur = None
    try:
        # Establishing connection
        conn = psycopg2.connect(
            host=hostname,
            dbname=database,
            user=username,
            password=pwd,
            port=port_id)
        cur = conn.cursor()

        # Only SELECT statements are issued, so there is nothing to commit.
        frames = [_fetch_table(cur, query, columns) for query, columns in table_specs]
    except Exception as e:
        # Without this early return the call to transform() below would fail
        # on undefined frames and mask the real error.
        print(f"Extract failed; transform/load skipped: {e}")
        return
    finally:
        # Close the cursor before the connection that owns it.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

    transform(*frames)

## Transform

In [7]:
def transform(df_customer, df_bookid, df_credit, df_address, df_bflight, df_flight, df_bmiles):
    """Join the extracted tables into one frame and pass it to ``load``.

    The customer frame is inner-joined, one frame at a time, with the booking
    id, credit card, address, flight and bonus miles frames on ``c_EmailID``.
    The result is then inner-joined with the book flight frame without an
    explicit key, so pandas joins on the column names the two frames share.
    Finally only the columns needed by ``load`` are kept, in insert order.

    The ``_x`` names in the kept columns are the suffix pandas gives to the
    left-hand copy when both sides of a merge share a non-key column name.

    Returns:
        None
    """
    # Joins keyed on the customer e-mail, applied in this order.
    email_keyed_frames = (df_bookid, df_credit, df_address, df_flight, df_bmiles)
    merged = df_customer
    for frame in email_keyed_frames:
        merged = pd.merge(merged, frame, how='inner', on=['c_EmailID'])

    # No ``on`` given: the join key is whatever column names both frames have.
    merged = pd.merge(merged, df_bflight, how='inner')

    # Columns to keep, in the order the insert statement in load() lists them.
    wanted_columns = [
        'c_EmailID', 'c_name', 'mileage', 'registration_in_mlg_program',
        'home_airport', 'miles_count', 'password', 'IATA_location_code',
        'credit_card_no_x', 'card_type', 'bank_Name', 'expiry_Date',
        'booking_Id', 'Address', 'Bonus_miles_x', 'Airline_code', 'Flight_no',
        'Dep_airport', 'Dep_time', 'Arr_airport', 'Arr_time', 'Booking_type',
        'flight_date', 'booking_date',
    ]
    load(merged[wanted_columns])



## Load

In [8]:
# Loading Data into MySQL Database

In [9]:
def _nan_to_none(values):
    """Return ``values`` as a tuple with every float NaN replaced by ``None``."""
    # ``v != v`` is true only for NaN.
    return tuple(None if isinstance(v, float) and v != v else v for v in values)


def load(df_combined):
    """Insert every row of ``df_combined`` into the MySQL ``Customer_details`` table.

    Rows are inserted one by one and committed together at the end. Values
    are passed positionally, so ``df_combined`` must have its 24 columns in
    the same order as the column list of the insert statement.

    Missing numeric values, which pandas stores as float NaN, are converted
    to ``None`` so that they are sent as SQL NULL instead of NaN.

    Any error is printed rather than raised; in that case nothing is
    committed by this function. The cursor and connection are always closed.

    Args:
        df_combined: DataFrame produced by ``transform``.

    Returns:
        None
    """
    db = None
    mysql_cur = None

    insert_script = '''INSERT INTO Customer_details (c_EmailID, c_name, mileage, registration_in_mlg_program,
           home_airport, miles_count, password, IATA_location_code, credit_card_no, card_type, bank_Name, expiry_Date,
           booking_Id, Address, Bonus_miles, airline_code, Flight_no, Dep_airport, Dep_time, Arr_airport, Arr_time, 
        Booking_type, flight_date, booking_date) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s)'''

    try:
        # NOTE(review): credentials are hard-coded; move them out of the
        # source (environment variable or secrets store) before sharing.
        db = mysql.connector.connect(host="localhost", user="root", passwd="password", database="demo_mysql")
        mysql_cur = db.cursor()

        # to_records()/tolist() turns each row into a tuple of plain Python
        # values that the driver can bind to the placeholders.
        for record in df_combined.to_records(index=False):
            mysql_cur.execute(insert_script, _nan_to_none(record.tolist()))

        db.commit()
    except Exception as e:
        print(e)
    finally:
        # Close the cursor before the connection that owns it.
        if mysql_cur is not None:
            mysql_cur.close()
        if db is not None:
            db.close()

In [10]:
# Entry point: extract() calls transform(), which in turn calls load(),
# so this single call runs the whole pipeline.
if __name__ == "__main__":
    extract()