## AIRSPRINT PRIVATE AVIATION PROJECT

# ETL PROCESS


In [1]:
import pandas as pd
import re
import os
import requests
import oracledb
import numpy as np
import pandas.api.types as ptypes
import warnings
import logging


warnings.                   filterwarnings("ignore")

path =                      os.path.abspath("C:/Users/Balli/Desktop/BI Intern Folder/BI Intern Folder")


CleanedDataPath =           os.path.join(path,"CleanedDataset")
os.makedirs(CleanedDataPath, exist_ok=True)




## DATA EXTRACTION: METHOD APPLICATION PROGRAMMING INTERFACE API

In [2]:
def api_dataextract(api_url):  

    Params =    {"from": "2025-01-01",
                    "to": "2025-03-15",
                    "value": "ALL",
                    "timeZone": "UTC"}

    Header =    { "content-type": "Application/json","X-Auth-Token":"4y9lRGIoQtzDdPLphsO4rtzhTbLPk6Sg" }

    response = requests.get(api_url, params=Params,headers=Header)
    if response.status_code == 200 and response.text.strip():
                    try:
                        FlightData = response.json()
                        Data = pd.DataFrame(FlightData)
                        
                        filteredData = Data[Data["flightId"].isin([79952, 79953, 79956, 79957, 79959, 79960])]
                        selectedColumns = filteredData[["flightId","accountId","customerId","accountName","realAirportFrom",
                                            "realAirportTo","eta","etd"]]
                        selectedColumns.loc[len(selectedColumns)] = [79958,24882,100067,"JimJets",None,None,None,None]
                        selectedColumns.to_csv(os.path.join(path,  "Flight.csv"), index=False)
                    except Exception as e:
                            print(f"Error: {e}")
    else:
        print(f"Response return error{response.status_code},{response.text}") 





## DATA PROFILING STEP: TRANSFORMATION

In [None]:


def data_wrangling(df):
# Drop empty rows         
          df = df.dropna(axis=1, how='all')
# Drop empty columns
          df = df.dropna(how ="all")
# Remove duplicate rows
          df = df.drop_duplicates()
# Replace NAN with None compatable with oracle null 
          df = df.replace({np.NaN: None})

# Engineering features to data types compatable with oracle 
          for column in df.columns:
               
               if column in ["Lease_Renewal_Date__c","CreatedDate", "Lease_Renewal_Date_2__c", "Lease_Renewal_Date_3__c","INVDATE"]: 
                         try:
                              df[column] = pd.to_datetime(df[column]).dt.strftime("%d-%b-%y").str.upper().replace({np.NaN:None}) 
                         except:
                              pass
               elif column in ["flightId","Fl3xx_Id__c","customerId","CUSTOMER","INVUNIQ"]:
                         df[column] = df[column].astype(str)
               else:
                       pass
               if column == "accountId":
                    df[column]=df[column].astype(int).astype(str)
               if  column == "FLIGHTID":
                       df[column]=df[column].apply(lambda x: str(int(x)) if pd.notna(x) and isinstance(x,float) else None)
               if column in ["eta","etd"]: 
                    df[column] = pd.to_datetime(df[column]).dt.strftime("%d-%b-%y %I:%M:%S %p").str.upper().replace({np.NaN:None})
               
               df.rename(columns ={"COMMENT":"COMMENTING"}, inplace = True)
          
               if column in ["Aircraft_Ownership__c","Aircraft_Ownership_2__c","Aircraft_Ownership_3__c"]:
                         df[column] = df[column].apply(lambda x: re.search(r'(\d+)',x).group() if pd.notna(x) and
                                                       re.search(r'(\d+)',x) else None)
                         df[column] = pd.to_numeric(df[column], errors="coerce").fillna(0).astype(int)
              
               if column == "Registration__c" :
                              df["Name"] = df["Registration__c"].str.replace(r'^\b[A-Z]-','',regex = True)
               if column == "Id" :
                    df = df.drop_duplicates(subset = "Id")
          return df

def process_csv(filename):
    file_path = os.path.join(path, filename)
    if os.path.exists(file_path):
          df = pd.read_csv(file_path)
          cleaned_df = data_wrangling(df)
          return cleaned_df




## CROSS-COLUMN PROFILE/TABLE PROFILING
# PURPOSE: TO IDENTIFY KEY RELATIONSHIP BETWEEN TABLES AND INTEGRATE ACCORDING TO AIRSPRINT BUSINESS OPERATION

In [4]:

def filter_rows(Cleaned_df):
        AccountId_filterOpportunityTable = []
        AccountId_filterContactTable= []
        OpportuityId_filterAssetTable= []

        if Cleaned_df.empty:
                print(f"File not found:")
        else:
                
                try:
                        for column in Cleaned_df.columns:
                                if  column == "Id":
                                        AccountId_filterOpportunityTable.extend(Cleaned_df[column].drop_duplicates().tolist())
                                elif column == "Primary_Contact__c":
                                        AccountId_filterContactTable.extend(Cleaned_df[column].drop_duplicates().tolist())
                                elif column == "IsWon":
                                       OpportuityId_filterAssetTable.extend(Cleaned_df[Cleaned_df[column] == 1]["Id"].drop_duplicates().tolist())
                                       
                except Exception as e:
                        print(F"Error:{e}")
                
                return AccountId_filterOpportunityTable,AccountId_filterContactTable,OpportuityId_filterAssetTable
        
            

def Transform_df(df, filter_values):
        if df.empty:
           print(f"File not found:")
        else:
           filteredObject = pd.DataFrame()
           try:

                for column in df.columns:
                        if  column  in ["AccountId","Id","Opportunity__c"]:
                                if not isinstance(filter_values, list):  # Ensure filter_values is a list
                                        filter_values = [filter_values]
                                filteredObject = df[df[column].isin(filter_values)]
                        if not filteredObject.empty:
                                break
           except Exception as e:
                  print(f"Error:{e}")     
                
        return filteredObject



## DATA LOADING
# METHOD: EXECUTE MANY INSERT INTO ORACLE DATABASE

In [22]:
# Configure logging
logging.basicConfig(
    filename='oracle_insert.log', 
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s')


def insert_into_oracledb(table_name, df):
    db_user                 = os.environ.get('user')
    db_connection_service   = os.environ.get('service')
    db_password             = os.environ.get('dbpassword')
    conn = None
    cursor = None
    try:
# Connect to Oracle
        conn    =    oracledb.connect(user=db_user, password=db_password, dsn=db_connection_service)
        cursor  =    conn.cursor()
        logging.info("Connected to Oracle Database successfully.")
        
        data    =    [tuple(row) for row in df.itertuples(index=False, name=None)]
        logging.info(f"Prepared file with {len(df)} rows and {len(df.columns)} columns. Table Name:{table_name}")

       

        placeholders =  ", ".join([f":{i+1}" for i in range(len(df.columns))])
        sql =           f"INSERT INTO {table_name} ({','.join(df.columns)}) VALUES ({placeholders})"

        try:
            cursor.executemany(sql, data)
            conn.commit()
            logging.info(f"Successfully inserted data into {table_name}.")
            print(f"Data inserted into {table_name}.")
        
        except Exception as e:
            logging.error(f"Error during batch insert: {e}")
            conn.rollback()
            logging.info("Rolled back the transaction.")

            
    except Exception as e:
        logging.critical(f"Critical error connecting to Oracle or processing file: {e}")
        
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
        logging.info("Database connection closed.")





In [None]:

Extract_api_Data =api_dataextract("https://test.fl3xx.com/api/external/flight/flights")


accId_forOpp,accId_forCont,OppId_forAsset = filter_rows(process_csv("Account.csv"))
accId_forOpp1,accId_forCont1,OppId_forAsset1 = filter_rows(Transform_df(process_csv("Opportunity.csv"),accId_forOpp))

dataframes = {
                        "Account":      process_csv("Account.csv"),
                        "Opportunity" : Transform_df(process_csv("Opportunity.csv"),accId_forOpp),
                        "Aircraft":     process_csv("Aircraft.csv"),
                        "Asset":        Transform_df(process_csv("Asset.csv"),OppId_forAsset1),
                        "Contact":      Transform_df(process_csv("Contact.csv"),accId_forCont),
                        "FlightA":      process_csv("Flight.csv"),
                        "Invoice":      process_csv("invoices.csv")}

# Insert all DataFrames into their respective Oracle tables
for table_name, cleaned_df in dataframes.items():
    print(f"Inserting data into table: {table_name}")
    insert_into_oracledb(table_name, cleaned_df)

Inserting data into table: Account
Inserting data into table: Opportunity
Inserting data into table: Aircraft
Inserting data into table: Asset
Inserting data into table: Contact
Inserting data into table: FlightA
Inserting data into table: Invoice
