Introduction

In [None]:
'''
Objective: ETL Out-Station Performance data

Input: Excel file containing 
    1. flight_date,
    2. flight_status, 
    3. flight_no, 
    4. sector, 
    5. aircraft, 
    6. capacity_business, 
    7. capacity_p_economy, 
    8. capacity_economy,
    9. travelled_business, 
    10. travelled_p_economy, 
    11. travelled_economy, 
    12. cargo_quantity, 
    13. ex_baggage_quantity, 
    14. ex_baggage_revenue_local,
    15. ex_baggage_revenue_bd, 
    16. ex_baggage_revenue_local_currency,
    17. otp_quantity, 
    18. otp_status, 
    19. otp_remarks,
    20. pax_load_factor_business, 
    21. pax_load_factor_p_economy,
    22. pax_load_factor_economy,
    23. pax_load_factor_total, 
    24. remarks
    
Output: Storage of KPIs Pax-Ratio, OTP, Excess-Baggage Revenue and Cargo Revenue
'''

Process Flow

In [None]:
import os

uploadedFolder = r"raw_data_location"
insertedFolder = r"processed_data_location"

for file in os.listdir(uploadedFolder):
    if file.endswith('.xlsx'):
        file_path = uploadedFolder +"\\"+ file
       
        read_df     = read_excel_file(file_path)
        clean_df    = clean_data(read_df)
        custom_df   = customize_data(clean_df)
        loadedFile  = insert_data(custom_df)
        relocate_file(file, file_path, custom_df, insertedFolder)

Data Import

In [None]:
import pandas as pd
import numpy as np

target_string_column = "DATE"
target_string_row = "TOTAL"

def read_excel_file(file):
    global df
    print("---------Reading starting----------")
    xls = pd.ExcelFile(file)
    
    # Iterate over sheet names
    for sheet_name in xls.sheet_names:
        df = pd.read_excel(file, sheet_name=sheet_name, header=None , keep_default_na=False)
    
        # Find the rows where the target string is located in the first column
        column_match =  np.argwhere(np.char.startswith(df.values.astype(str), target_string_column))
        row_match = df[df.iloc[:, 0].astype(str).str.contains(target_string_row)]
    
        if not column_match.size == 0:
            df_filtered = df.iloc[2:row_match.index[0], column_match[0][1]:]
            print(f"Reading up to '{target_string_column}' in sheet '{sheet_name}':")
        else:
            print(f"'{target_string_column}' not found in sheet '{sheet_name}'")
            
    xls.close()  # Close the Excel file

    print("---------Reading ended----------")
    return df_filtered

Data Cleaning

In [None]:
nan = np.nan
na_values = ['N/A']

def clean_data(read_df):
    print("---------Cleaning starting----------")
    
    d3= read_df.convert_dtypes()
    d3.replace('', np.nan, inplace=True)
    
    # Identify the columns intended to be float
    float_columns = [11, 12, 13, 14, 19, 20, 21, 22]
    d3[float_columns] = d3[float_columns].astype(float)

    # Identify the columns intended to be integers
    int_columns = [16]
    
    # Convert columns to integers and handle NaN or infinite values
    d3[int_columns] = d3[int_columns].apply(lambda x: pd.to_numeric(x, errors='coerce')).astype('Int64')

    # df_datatype_corrected = d3.astype({ 11: float, 12: float, 13:float, 14:float, 16: int, 19:float, 20:float, 21:float, 22:float})
    df_datatype_corrected = d3
    non_numeric_columns = df_datatype_corrected.select_dtypes(exclude='number').columns
    df_datatype_corrected[non_numeric_columns] = df_datatype_corrected[non_numeric_columns].fillna('N/A')
    
    df_nullRowFiltered = df_datatype_corrected.dropna(how='all')
    df_nullCellTransformed = df_nullRowFiltered.fillna(0)
    df_allCellTrimmed = trim_all_columns(df_nullCellTransformed)
    
    print("---------Cleaning ended----------")
    return df_allCellTrimmed

def trim_all_columns(df):
    """
    Trim whitespace from ends of each value across all series in dataframe
    """
    trim_strings = lambda x: x.strip() if isinstance(x, str) else x
    return df.applymap(trim_strings)

Data Processing

In [None]:
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

currency_rate = {
    'GBP':	111,
    'AOA':	111,
    'AUD':	111,
    'BHD':	111,
    'EUR':	111,
    'BGN':	111,
    'CAD':	111,
    'XAF':	111,
    'CNY':	111,
    'TWD':	111,
    'CZK':	111,
    'ETB':	111
    }

def customize_data(cleanedFile):
    print("---------Customization started----------")
   
    df_headers = ['flight_date','flight_status', 'flight_no', 'sector', 'aircraft', 
                    'capacity_business', 'capacity_p_economy', 'capacity_economy' ,
                    'travelled_business', 'travelled_p_economy', 'travelled_economy', 
                    'cargo_quantity', 'ex_baggage_quantity', 'ex_baggage_revenue_local','ex_baggage_revenue_bd', 'ex_baggage_revenue_local_currency',
                    'otp_quantity', 'otp_status', 'otp_remarks',
                    'pax_load_factor_business', 'pax_load_factor_p_economy','pax_load_factor_economy','pax_load_factor_total', 'remarks'
                ]
   
    cleanedFile.columns = df_headers
    '''
    The `ex_baggage_revenue_bd` column is being calculated based on the `ex_baggage_revenue_local` and `ex_baggage_revenue_local_currency` columns in the DataFrame. 
    The calculation involves converting the revenue from the local currency to Bangladeshi Taka (BDT) using the currency exchange rates provided in the `currency_rate` dictionary. 
    The conversion is done by multiplying the revenue in the local currency by the corresponding exchange rate for that currency. 
    The result is then rounded to the nearest whole number using the `ROUND_HALF_UP` method from the `decimal` module with precision set to 3 decimal places. 
    If the currency is not found in the `currency_rate` dictionary, the value in the `ex_baggage_revenue_bd` column is set to 0.
    '''
    cleanedFile[' ex_baggage_revenue_bd'] = cleanedFile.apply(lambda row: Decimal(row['ex_baggage_revenue_local'] * currency_rate[row['ex_baggage_revenue_local_currency']]).quantize(Decimal('0.000'), rounding=ROUND_HALF_UP) if row['ex_baggage_revenue_local_currency'] in currency_rate else 0, axis=1)

    cleanedFile['pax_load_factor_business']     = cleanedFile.apply(lambda x: calculate_load_factor(x,1), axis=1)
    cleanedFile['pax_load_factor_p_economy']    = cleanedFile.apply(lambda x: calculate_load_factor(x,2), axis=1)
    cleanedFile['pax_load_factor_economy']      = cleanedFile.apply(lambda x: calculate_load_factor(x,3), axis=1)
    cleanedFile['pax_load_factor_total']        = cleanedFile.apply(lambda x: calculate_load_factor(x,4), axis=1) 
    cleanedFile['flight_date']                  = cleanedFile['flight_date'].apply(lambda x: transform_date(x))
     
    print("---------Customization ended----------")
    
    return cleanedFile

def transform_date(input_date):
    formats = ['%d/%m/%Y','%d-%m-%Y','%d.%m.%Y','%d.%m.%y', '%m/%d/%Y', "%b'%y","%d %b.'%y",'%y/%m/%d', '%Y-%m-%d %H:%M:%S', "%d %b'%y"]

    # Check if the input_date is a Pandas Timestamp
    if isinstance(input_date, pd.Timestamp):
        date_obj = input_date.to_pydatetime()
        return date_obj.strftime('%Y-%m-%d')

    # Convert the input_date to a string if it's not already
    input_date_str = str(input_date)

    for date_format in formats:
        try:
            date_obj = datetime.strptime(input_date_str, date_format)
            return date_obj.strftime('%Y-%m-%d')
        except ValueError:
            continue
    raise ValueError('Invalid date format: {}'.format(input_date_str))

def calculate_load_factor(row, indicator):
    try:
        if indicator == 4:
            numerator = int(row['travelled_business']) + int(row['travelled_p_economy']) + int(row['travelled_economy'])
            denominator = int(row['capacity_business']) + int(row['capacity_p_economy']) + int(row['capacity_economy'])
        elif indicator == 3:
            numerator = int(row['travelled_economy'])
            denominator = int(row['capacity_economy'])
        elif indicator == 2:
            numerator = int(row['travelled_p_economy'])
            denominator = int(row['capacity_p_economy'])
        elif indicator == 1:
            numerator = int(row['travelled_business'])
            denominator = int(row['capacity_business'])
            
        if denominator == 0:
            # Handle division by zero
            return 0
        else:
            return (numerator * 100) / denominator
    except ZeroDivisionError:
        # Handle division by zero
        return 0
    except Exception as e:
        # Handle other exceptions if necessary
        print(f"An error occurred: {e}")
        return 0

Data Storage

In [None]:
import pymysql
from sqlalchemy import create_engine

def insert_data (df_data_final):
    
    print("---------Loading starting----------")
    mydb = pymysql.connect(
        host        = "host_name",
        user        = "user_name",
        database    = "db_name",
        password    = "password")

    engine = create_engine("mysql+pymysql://{user}:{pw}@{host}/{db}"
                           .format(user ="user_name",
                                   host ="host_name",
                                   pw   ="password",
                                   db   ="db_name"))
    df_data_reset = df_data_final.reset_index(drop=True)
    
    # Define a mapping from Pandas data types to SQL data types
    pandas_to_sql_type_mapping = {
        'int32'         : 'INT',
        'int64'         : 'INT',
        'float64'       : 'FLOAT',
        'object'        : 'TEXT',  # This includes strings
        'datetime64[ns]': 'TIMESTAMP',
    }
    
    SQL_CREATE_TBL = "CREATE TABLE IF NOT EXISTS table_name(id INT NOT NULL AUTO_INCREMENT,"
    
    for column_name, data_type in zip(df_data_reset.columns, df_data_reset.dtypes):
        # Map Pandas data types to SQL data types
        sql_data_type = pandas_to_sql_type_mapping.get(str(data_type), 'TEXT')
    
        # Add column definition to the SQL statement
        SQL_CREATE_TBL += "{} {}, ".format(column_name, sql_data_type)
    SQL_CREATE_TBL += "created_at TIMESTAMP NOT NULL DEFAULT current_timestamp(), updated_at TIMESTAMP NOT NULL DEFAULT current_timestamp(), PRIMARY KEY (id));"
    
    mycursor = mydb.cursor()
    
    try:
        print("Creating table {}: ".format("table_name"), end='')
        mycursor.execute(SQL_CREATE_TBL)
        df_data_reset.to_sql('table_name', con= engine, if_exists= 'append', chunksize=1000, index=False)
    except pymysql.Error as err:
        print(err)
        pass
    else:
        print("OK")
        
    print("---------Loading ended----------")

Processed Data File Relocation

In [None]:
import time
import pathlib

def relocate_file(file, file_path, customizedFile, insertedFolder):
    print("---------Renaming and Relocating starting----------")
    
    timestr = time.strftime("%d%m%Y_%H%M%S_")
    
    transformedDataFile =  timestr + customizedFile.iloc[0].sector[:3] + "_"+customizedFile['flight_date'].iloc[0]+"_"+customizedFile['flight_date'].iloc[-1]+".xlsx"
    uploaded_file_path = insertedFolder + "\\" + transformedDataFile
    customizedFile.to_excel(uploaded_file_path, index= False)
    
    backup_file_path = insertedFolder + "\\" + timestr + file
    newName = pathlib.PurePosixPath(backup_file_path).stem + '.xlsx'
    
    # os.rename(transfomedData, transformedDataFile)
    os.rename(file_path, newName)
    print("---------Renaming and Relocating ended----------")