# Create a silver operational table - Merge the operational statistics tables into one table


In [8]:

# Load the four operational source tables. The table names contain spaces,
# so each one is wrapped in backticks.
df1 = spark.read.table("`operating profit`")
df2 = spark.read.table("`operational revenue and expense`")
df3 = spark.read.table("`operational revenue per employee`")
df4 = spark.read.table("`operational statistics`")

# Give the "Date" column of df2-df4 a table-specific name so the joined result
# does not end up with several columns all called "Date".
# df1 is left untouched: the former `withColumnRenamed("Date", "Date")` call on
# it renamed a column to its own name and therefore changed nothing.
df2 = df2.withColumnRenamed("Date", "Date_df2")
df3 = df3.withColumnRenamed("Date", "Date_df3")
df4 = df4.withColumnRenamed("Date", "Date_df4")

# Join key present in all four tables. Passing the name via `on=` keeps a
# single "ID" column in the result instead of one per input table.
common_col = "ID"

# Left joins keep every row of df1; columns coming from df2-df4 are null
# wherever no row with the same ID exists.
# NOTE(review): assumes "ID" is unique within each table - if it is not, the
# joins will multiply rows. TODO confirm.
merged_df_new = (
    df1
    .join(df2, on=common_col, how="left")
    .join(df3, on=common_col, how="left")
    .join(df4, on=common_col, how="left")
)
merged_df_new.show()


StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 10, Finished, Available, Finished)

+---+-------+-----------------+-------------------------+----------------+--------+------------------------------------+---------------------------------------------------------------------+--------------------------------------+----------------------------------+------------------------------------+----------------------------+-----------------------------+--------+------------------------------+-----------------------------------+--------+-----------------------------+----------------------------+----------------------------------+---------------------------------------+-------------------------------------+---------------------------------+---------------------------------+----------------------+
| ID|  Date_|Operating ratio 8|Operating profit margin 9|Profit margin 10|Date_df2|Total_ scheduled services  _x 1_000_|Total_ operating revenue_ scheduled and charter services  _x 1_000_/$|All other operating revenue  _x 1_000_|Total_ charter services  _x 1_000_|Total_ operating expenses  

# Cleaning the column names that contain symbols such as () {}

In [9]:
def clean_columns(df):
    """Return ``df`` with its column names normalised to underscore-separated form.

    For every column name:
      1. surrounding whitespace is removed;
      2. each space, tab, newline and each of ``, ; { } ( ) = / $`` is
         replaced by an underscore;
      3. runs of underscores are collapsed to a single underscore;
      4. leading and trailing underscores are removed.

    Args:
        df: An object exposing ``columns`` and ``withColumnRenamed(old, new)``
            (e.g. a Spark DataFrame).

    Returns:
        The DataFrame with renamed columns; data and column order are unchanged.
    """
    import re

    # Snapshot the names of the DataFrame that was passed in. The names must
    # come from `df` itself, not from a notebook-level variable, so that the
    # function works for any DataFrame it is called with.
    original_columns = list(df.columns)

    new_columns = []
    for c in original_columns:
        # 1. Remove leading/trailing spaces
        new_c = c.strip()
        # 2. Replace spaces and special characters with underscore
        new_c = re.sub(r'[ ,;{}()\n\t=/$]', '_', new_c)
        # 3. Collapse multiple underscores
        new_c = re.sub(r'_+', '_', new_c)
        # 4. Remove leading and trailing underscores
        new_c = new_c.strip('_')
        new_columns.append(new_c)

    # Rename columns in DataFrame; names that are already clean are skipped.
    for old, new in zip(original_columns, new_columns):
        if old != new:
            df = df.withColumnRenamed(old, new)
    return df

# Apply the column-name cleanup to the merged operational DataFrame and
# rebind the result to the same notebook variable.
merged_df_new = clean_columns(merged_df_new)

StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 11, Finished, Available, Finished)

In [10]:
# Inspect the cleaned result: printSchema() lists each column with its type and
# nullability, and the plain list of names is printed afterwards.
merged_df_new.printSchema()
print(merged_df_new.columns)

StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 12, Finished, Available, Finished)

root
 |-- ID: long (nullable = true)
 |-- Date: string (nullable = true)
 |-- Operating_ratio_8: double (nullable = true)
 |-- Operating_profit_margin_9: double (nullable = true)
 |-- Profit_margin_10: double (nullable = true)
 |-- Date_df2: string (nullable = true)
 |-- Total_scheduled_services_x_1_000: long (nullable = true)
 |-- Total_operating_revenue_scheduled_and_charter_services_x_1_000: long (nullable = true)
 |-- All_other_operating_revenue_x_1_000: long (nullable = true)
 |-- Total_charter_services_x_1_000: long (nullable = true)
 |-- Total_operating_expenses_x_1_000: long (nullable = true)
 |-- Net_income_loss_x_1_000: long (nullable = true)
 |-- Average_number_of_employees_8: long (nullable = true)
 |-- Date_df3: string (nullable = true)
 |-- Operating_revenue_per_employee: long (nullable = true)
 |-- Tonne-kilometres_flown_per_employee: long (nullable = true)
 |-- Date_df4: string (nullable = true)
 |-- Number_of_carriers_included_8: long (nullable = true)
 |-- Passengers_

# Writing the updated table to the Lakehouse


In [11]:
# Persist the merged operational DataFrame as a Delta table; "overwrite" mode
# replaces the table's existing contents on every run.
(
    merged_df_new.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("merged_operation_Silver")
)

StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 13, Finished, Available, Finished)

# Create Customer silver table - Merging the customer flight and loyalty tables

In [12]:
# Load the two customer source tables.
# NOTE(review): "sliver" is the spelling used in the table name read here -
# confirm against the Lakehouse before "correcting" it.
df5 = spark.read.table("`customer_flight_sliver`")
df6 = spark.read.table("`customer_loyalty_silver`")

# Join key present in both tables; passing the name via `on=` keeps a single
# "Loyalty Number" column in the result.
common_col = "Loyalty Number"

# Left join: every row of df5 is kept, and df6 columns are null where no row
# with the same loyalty number exists.
merged_df_Customer = df5.join(df6, on=common_col, how="left")
merged_df_Customer.show()

StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 14, Finished, Available, Finished)

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+--------------+---------------------------+---------------+----------------+--------------+-----------+------+--------------------+--------+--------------+---------------+-----------------------+---------------+----------+----------+------+------------------------+
|Loyalty Number|Year|Month|Total Flights|Distance|Points Accumulated|Points Redeemed|Dollar Cost Points Redeemed|Points Balance|Average Distance Per Flight|Points Per Mile|        Province|          City|Postal Code|Gender|           Education|  Salary|Marital Status|Membership Tier|Customer Lifetime Value|Enrollment Type| Join Date|Leave Date|Status|Customer Tenure _Months_|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+--------------+---------------------------+---------------+----------------+--------------+-----------+------+-------------------

In [13]:
# Render the merged customer DataFrame with the notebook's display() helper.
display(merged_df_Customer)

StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 21855f51-7538-4701-84ee-147721781dd2)

# Clean the column names that contain special symbols


In [14]:
def clean_columns(df):
    """Return ``df`` with its column names normalised to underscore-separated form.

    This cell re-declares ``clean_columns``; the name is already defined in an
    earlier cell and this definition shadows it from here on.

    For every column name:
      1. surrounding whitespace is removed;
      2. each space, tab, newline and each of ``, ; { } ( ) = / $`` is
         replaced by an underscore;
      3. runs of underscores are collapsed to a single underscore;
      4. leading and trailing underscores are removed.

    Args:
        df: An object exposing ``columns`` and ``withColumnRenamed(old, new)``
            (e.g. a Spark DataFrame).

    Returns:
        The DataFrame with renamed columns; data and column order are unchanged.
    """
    import re

    def _normalise(name):
        # Steps 1-4 from the docstring, applied to a single column name.
        cleaned = re.sub(r'[ ,;{}()\n\t=/$]', '_', name.strip())
        return re.sub(r'_+', '_', cleaned).strip('_')

    # Read the names from the DataFrame that was passed in, not from a
    # notebook-level variable, so the function works for any DataFrame.
    original_columns = list(df.columns)
    new_columns = [_normalise(c) for c in original_columns]

    # Rename columns in DataFrame; names that are already clean are skipped.
    for old, new in zip(original_columns, new_columns):
        if old != new:
            df = df.withColumnRenamed(old, new)
    return df

# Apply the column-name cleanup to the merged customer DataFrame and rebind
# the result to the same notebook variable.
merged_df_Customer = clean_columns(merged_df_Customer)

StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 16, Finished, Available, Finished)

# Writing the merged table to the lakehouse table 

In [15]:
# Persist the merged customer DataFrame as a Delta table; "overwrite" mode
# replaces the table's existing contents on every run.
(
    merged_df_Customer.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("merged_Customer_Silver")
)

StatementMeta(, 2c497fca-bdb2-4efb-b199-544de5641ead, 17, Finished, Available, Finished)