# Loading Customer data to gold

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
from pyspark.sql.types import *

from pyspark.sql.window import Window

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, count, date_format, lit, sum, round
from pyspark.sql.functions import col, count, when, round, date_format,avg

from pyspark.sql.functions import col, date_format, count, lit



In [None]:
# Welcome to your new notebook
# Type here in the cell editor to add code!

# Setup & Helper Function

# Helper function: writes a DataFrame to a Delta table in a given lakehouse.

def create_table_from_df(df, fully_qualified_table_name, mode="overwrite"):
    """Save ``df`` as a Delta table.

    Args:
        df: DataFrame whose ``write`` interface performs the save.
        fully_qualified_table_name: Table name handed to ``saveAsTable``.
        mode: Save mode forwarded to the writer; defaults to ``"overwrite"``.
    """
    delta_writer = df.write.format("delta")
    delta_writer = delta_writer.mode(mode)
    delta_writer.saveAsTable(fully_qualified_table_name)


print("Setup complete.")


# Silver_Customer

In [None]:
# Load the Silver-layer source tables used to build the customer dataset.

# Customer master data and telephone numbers
df_customer = spark.read.table("#Lakehouse_Silver#.dbo.customer")
df_cust_telephone = spark.read.table("#Lakehouse_Silver#.dbo.customertelephonenumber")

# Location and geography lookups
df_location = spark.read.table("#Lakehouse_Silver#.dbo.location")
df_state = spark.read.table("#Lakehouse_Silver#.dbo.state")
df_country = spark.read.table("#Lakehouse_Silver#.dbo.country")
df_city = spark.read.table("#Lakehouse_Silver#.dbo.city")

# Satisfaction ratings and their lookups
df_cust_satisfaction = spark.read.table("#Lakehouse_Silver#.dbo.customersatisfactionrating")
df_rating_criteria = spark.read.table("#Lakehouse_Silver#.dbo.ratingcriteria")
df_satisfaction_rating = spark.read.table("#Lakehouse_Silver#.dbo.SatisfactionRating")

In [None]:
# Enrich every customer telephone record with its location and with the
# city, state and country names looked up from that location.
# All joins are left joins, so telephone rows without a match are kept.
# NOTE(review): l.LocationCity is matched against cty.CityId — presumably
# LocationCity holds a city id; confirm against the location schema.

df_cust_phone_loc = (
    df_cust_telephone.alias("ctn")
    .join(df_location.alias("l"), on=col("ctn.LocationId") == col("l.LocationId"), how="left")
    .join(df_state.alias("s"), on=col("l.LocationStateId") == col("s.StateId"), how="left")
    .join(df_country.alias("ctry"), on=col("l.CountryId") == col("ctry.CountryId"), how="left")
    .join(df_city.alias("cty"), on=col("l.LocationCity") == col("cty.CityId"), how="left")
    .select(
        # Telephone record
        col("ctn.CustomerId"), col("ctn.TelephoneNumber"),
        # Location attributes
        col("l.LocationId").alias("LocationId"),
        col("l.LocationName").alias("LocationName"),
        # Geography names exposed under short column names
        col("cty.CityName").alias("City"),
        col("s.StateName").alias("State"),
        col("ctry.ISOCountryName").alias("Country"),
    )
)

display(df_cust_phone_loc)

In [None]:
# Customer satisfaction ratings joined to their criteria and rating-name
# lookups, plus a "yyyyMM" string key derived from PeriodStartDate.

df_customer_ratings = (
    df_cust_satisfaction.alias("csr")
    .join(
        df_rating_criteria.alias("rc"),
        on=col("csr.RatingCriteriaId") == col("rc.RatingCriteriaId"),
        how="left",
    )
    .join(
        df_satisfaction_rating.alias("sr"),
        on=col("csr.SatisfactionRatingId") == col("sr.SatisfactionRatingId"),
        how="left",
    )
    .select(
        col("csr.CustomerId"), col("csr.RatingCriteriaId"),
        # The rating id itself is exposed as the score.
        col("csr.SatisfactionRatingId").alias("SatisfactionScore"),
        col("csr.CustomerSatisfactionRatingNote"), col("sr.SatisfactionRatingName"),
        col("csr.PeriodStartDate"), col("rc.RatingCriteriaName"),
    )
    .withColumn(
        "YearMonth",
        date_format(col("PeriodStartDate"), "yyyyMM"),
    )
)

display(df_customer_ratings)

In [None]:
# NPS inputs: monthly counts of promoter and detractor ratings.

# when() without otherwise() yields NULL for non-matching rows and count()
# skips NULLs, so each aggregate counts only the matching ratings.
# NOTE(review): only score 5 counts as a promoter and only score 1 as a
# detractor — confirm whether score 4 should also count as a promoter.
is_promoter = col("SatisfactionScore").isin(5)
is_detractor = col("SatisfactionScore").isin(1)

nps_counts_by_month = df_customer_ratings.groupBy("YearMonth").agg(
    count(when(is_promoter, 1)).alias("Promoters"),
    count(when(is_detractor, 1)).alias("Detractors"),
)

# Show the result
nps_counts_by_month.show()

In [None]:
# Attach the monthly promoter/detractor counts to every rating row.
# YearMonth is taken from the counts side ("nbm") of the left join.

df_rating_details = (
    df_customer_ratings.alias("crs")
    .join(
        nps_counts_by_month.alias("nbm"),
        on=col("crs.YearMonth") == col("nbm.YearMonth"),
        how="left",
    )
    .select(
        *[
            col(f"crs.{name}")
            for name in (
                "CustomerId",
                "RatingCriteriaId",
                "SatisfactionScore",
                "SatisfactionRatingName",
                "CustomerSatisfactionRatingNote",
                "PeriodStartDate",
                "RatingCriteriaName",
            )
        ],
        *[col(f"nbm.{name}") for name in ("Promoters", "Detractors", "YearMonth")],
    )
)

display(df_rating_details)

In [None]:
# Combine customers with their telephone/location rows and rating rows.
# Both joins are left joins on CustomerId, so customers without phones or
# ratings are kept; a customer with several phones and several ratings
# produces one row per phone/rating combination.

df_silver_customer = (
    df_customer.alias("c")
    .join(df_cust_phone_loc.alias("cp"), on=col("c.CustomerId") == col("cp.CustomerId"), how="left")
    .join(df_rating_details.alias("rd"), on=col("c.CustomerId") == col("rd.CustomerId"), how="left")
    .select(
        # Customer attributes
        *[
            col(f"c.{name}")
            for name in (
                "CustomerId",
                "CustomerEstablishedDate",
                "CustomerTypeId",
                "PartyId",
                "CustomerNote",
            )
        ],
        # Telephone and location attributes
        *[
            col(f"cp.{name}")
            for name in (
                "TelephoneNumber",
                "LocationId",
                "LocationName",
                "City",
                "State",
                "Country",
            )
        ],
        # Rating attributes and monthly NPS counts
        *[
            col(f"rd.{name}")
            for name in (
                "RatingCriteriaId",
                "SatisfactionScore",
                "SatisfactionRatingName",
                "CustomerSatisfactionRatingNote",
                "PeriodStartDate",
                "RatingCriteriaName",
                "Promoters",
                "Detractors",
                "YearMonth",
            )
        ],
    )
)

display(df_silver_customer)

In [None]:
# Persist the unified customer dataset as the Gold table Customer_Details
# (default save mode of the helper, i.e. overwrite).

create_table_from_df(
    df=df_silver_customer,
    fully_qualified_table_name="#Lakehouse_Gold#.dbo.Customer_Details",
)

print("Created #Lakehouse_Gold#.dbo.Customer_Details successfully.")

# Silver_Call_Center

In [None]:
# Load the call-center source tables (read from the Silver lakehouse).

df_call_center = spark.read.table("#Lakehouse_Silver#.dbo.CallCenter")
df_call_center_vol_target = spark.read.table("#Lakehouse_Silver#.dbo.CallCenterVolumeTargets")
df_employee = spark.read.table("#Lakehouse_Silver#.dbo.Employee")

In [None]:
# Add a numeric YYYYMM key (year * 100 + month) to calls and to targets so
# both can be grouped and joined by month.

df_call_center = df_call_center.withColumn(
    "YearMonth",
    year("CallStartTimestamp") * 100 + month("CallStartTimestamp"),
)
df_call_center_vol_target = df_call_center_vol_target.withColumn(
    "YearMonth",
    year("TargetDate") * 100 + month("TargetDate"),
)

In [None]:

# Collapse the targets to one row per call center and month: every target
# column is averaged over the month and cast to an integer.
# Maps source column -> name of the monthly output column.
monthly_target_columns = {
    "CallVolumeTarget": "MonthlyCallVolumeTarget",
    "AnswerRateGoalInPercent": "MonthlyAnswerRateGoalinPercent",
    "AbandoRateGoalInPercent": "MonthlyAbandoRateGoalinPercent",
    "QueueRateGoalInPercent": "MonthlyQueueRateGoalinPercent",
    "TimetoAnswerInSeconds": "MonthlyTimetoAnswerTargetinSeconds",
    "HandlingTimeInSeconds": "MonthlyHandlingTimeTargetinSeconds",
}

df_call_center_vol_target = df_call_center_vol_target.groupBy("CallCenterId", "YearMonth").agg(
    *[
        avg(source_name).cast(IntegerType()).alias(monthly_name)
        for source_name, monthly_name in monthly_target_columns.items()
    ]
)

display(df_call_center_vol_target)




In [None]:
# Actual number of calls per call center and month (row count per group).
calls_by_center_and_month = df_call_center.groupBy(["CallCenterId", "YearMonth"])
actual_call_volumes = calls_by_center_and_month.agg(
    count("*").alias("ActualCallVolume")
)

display(actual_call_volumes)

In [None]:
# Create AgentPerformance: monthly figures per employee and call center.

# 1/0 flags so that summing them counts calls in the given status.
answered_flag = when(col("CallStatus") == "Answered", 1).otherwise(0)
abandoned_flag = when(col("CallStatus") == "Abandoned", 1).otherwise(0)
# Time between queue entry and call end, from the timestamps cast to long.
handle_duration = col("CallEndTimestamp").cast("long") - col("QueueEntryTimestamp").cast("long")

df_agent_performance = (
    df_call_center
    .groupBy("EmployeeID", "YearMonth", "CallCenterId")
    .agg(
        # Counts every row with a non-null CallId, whatever its status.
        count("CallId").alias("TotalCallsHandled"),
        sum(answered_flag).alias("AnsweredCalls"),
        sum(abandoned_flag).alias("AbandonedCalls"),
        # Average over all calls of the group, truncated to an integer.
        avg(handle_duration).cast(IntegerType()).alias("HandleTimeInSecs"),
    )
    .withColumnRenamed("EmployeeID", "AgentID")
)

display(df_agent_performance)


In [None]:
# Join data
#
# Builds the call-level call-center dataset: every call row is enriched with
# the monthly targets of its call center, the handling employee, the actual
# monthly call volume and the agent's monthly performance figures.
# All joins are left joins, so calls without a match are kept.


# No partition columns: the window spans every row of the joined result, so
# sums computed over it are totals for the whole dataset.
# NOTE(review): an unpartitioned window typically pulls all rows into a
# single partition — confirm this is acceptable for the data volume.
window_spec = Window.partitionBy()

df_silver_call_center = (
    df_call_center.alias("cc")
    # Monthly targets of the call's call center.
    .join(
        df_call_center_vol_target.alias("ccvt"),
        (col("cc.CallCenterId") == col("ccvt.CallCenterId")) &
        (col("cc.YearMonth") == col("ccvt.YearMonth")),
        "left"
    )
    # Employee who handled the call (matched on call center and employee id).
    .join(
        df_employee.alias("e"),
        (col("cc.CallCenterId") == col("e.CallCenterId")) &
        (col("cc.EmployeeId") == col("e.EmployeeId")),
        "left"
    )
    # Actual monthly call count of the call center.
    .join(
        actual_call_volumes.alias("acv"),
        (col("cc.CallCenterId") == col("acv.CallCenterId")) &
        (col("cc.YearMonth") == col("acv.YearMonth")),
        "left"
    )
    # Monthly performance figures of the handling agent.
    .join(
        df_agent_performance.alias("ap"),
        (col("cc.CallCenterId") == col("ap.CallCenterId")) &
        (col("cc.EmployeeId") == col("ap.AgentId")) &
        (col("cc.YearMonth") == col("ap.YearMonth")),
        "left"
    )
    .select(
        col("cc.CallId"),
        col("cc.CallStartTimestamp"),
        col("cc.CallEndTimestamp"),
        col("cc.QueueEntryTimestamp"),
        col("cc.CallCenterId"),
        col("cc.CallStatus"),
        col("cc.EmployeeID"),
        col("cc.CustomerId"),
        col("e.Employeename"),
        col("e.Role"),
        # YearMonth comes from the targets side of a left join, so it is
        # NULL for calls that have no matching target row.
        col("ccvt.YearMonth"),
        col("ccvt.MonthlyCallVolumeTarget"),
        col("ccvt.MonthlyAnswerRateGoalinPercent"),
        col("ccvt.MonthlyAbandoRateGoalinPercent"),
        col("ccvt.MonthlyQueueRateGoalinPercent"),
        col("ccvt.MonthlyTimetoAnswerTargetinSeconds"),
        # NOTE(review): 180 is added to the handling-time target; the reason
        # is not visible in this notebook — confirm.
        (col("ccvt.MonthlyHandlingTimeTargetinSeconds")+180).alias("MonthlyHandlingTimeTargetinSeconds"),
        col("acv.ActualCallVolume"),
        col("ap.TotalCallsHandled"),
        col("ap.AnsweredCalls"),
        col("ap.AbandonedCalls"),
        col("ap.HandleTimeInSecs")
    )
    # Per-call durations, computed as differences of the timestamps cast to
    # long. Each one is only filled for the call status named in its
    # condition; calls in any other status get 0.
    .withColumn("QueueTime", when(col("CallStatus") == "Answered", col("QueueEntryTimestamp").cast("long") - col("CallStartTimestamp").cast("long")).otherwise(0))
    .withColumn("HandleTime", when(col("CallStatus") == "Answered", col("CallEndTimestamp").cast("long") - col("QueueEntryTimestamp").cast("long")).otherwise(0))
    .withColumn("AbandonedInTime", when(col("CallStatus") == "Abandoned", col("CallEndTimestamp").cast("long") - col("CallStartTimestamp").cast("long")).otherwise(0))
    # Dataset-wide rates: ratios of sums over window_spec, expressed as a
    # percentage rounded to one decimal. Rows whose YearMonth is greater than
    # 202407, or NULL, get 0.
    # NOTE(review): the multipliers 0.75 and 5.9 are not explained in this
    # notebook, and YearMonth is compared against the string literal
    # '202407' — confirm both are intended.
    .withColumn("AnsweredCallRate", round((when(col("ccvt.YearMonth") <= '202407', (sum(col("ap.AnsweredCalls")).over(window_spec) * 0.75) / sum(col("ap.TotalCallsHandled")).over(window_spec)).otherwise(0))*100,1))
    .withColumn("AbandonedCallRate", round((when(col("ccvt.YearMonth") <= '202407', (sum(col("ap.AbandonedCalls")).over(window_spec) * 5.9) / sum(col("ap.TotalCallsHandled")).over(window_spec)).otherwise(0))*100,1)
)
    
)

display(df_silver_call_center)


In [None]:


from pyspark.sql.functions import col, avg

# Ad-hoc check: average ratio of abandoned calls to handled calls per month.
abandoned_share = col("AbandonedCalls") / col("TotalCallsHandled")

df_res1 = (
    df_silver_call_center
    .groupBy("YearMonth")
    .agg(avg(abandoned_share))
)


display(df_res1)



In [None]:
# Ad-hoc check: average QueueTime per month.
df_res = (
    df_silver_call_center
    .groupBy("YearMonth")
    .agg(avg(col("QueueTime")))
)

display(df_res)

In [None]:
# Persist the call-center dataset as the Gold table Call_Center_Details
# (default save mode of the helper, i.e. overwrite).

create_table_from_df(
    df=df_silver_call_center,
    fully_qualified_table_name="#Lakehouse_Gold#.dbo.Call_Center_Details",
)

print("Created #Lakehouse_Gold#.dbo.Call_Center_Details successfully.")

# Loading Operation data to gold

In [None]:
# Setup & Helper Function

# Helper function: writes a DataFrame to a Delta table in a given lakehouse.

def create_table_from_df(df, fully_qualified_table_name, mode="overwrite"):
    """Save ``df`` as a Delta table.

    Args:
        df: DataFrame whose ``write`` interface performs the save.
        fully_qualified_table_name: Table name handed to ``saveAsTable``.
        mode: Save mode forwarded to the writer; defaults to ``"overwrite"``.
    """
    delta_writer = df.write.format("delta")
    delta_writer = delta_writer.mode(mode)
    delta_writer.saveAsTable(fully_qualified_table_name)


print("Setup complete.")




In [None]:
# Load the operations source tables (read from the Silver lakehouse).

# Devices
df_communication_device = spark.read.table("#Lakehouse_Silver#.dbo.CommunicationDevice")
df_device = spark.read.table("#Lakehouse_Silver#.dbo.Device")

# Issues and their lookups
df_issue = spark.read.table("#Lakehouse_Silver#.dbo.Issue")
df_issue_status = spark.read.table("#Lakehouse_Silver#.dbo.IssueStatus")
df_issue_activity = spark.read.table("#Lakehouse_Silver#.dbo.IssueActivity")
df_issue_type = spark.read.table("#Lakehouse_Silver#.dbo.IssueType")

# Employees
df_employee = spark.read.table("#Lakehouse_Silver#.dbo.Employee")

# Silver_Communication_Device

In [None]:
# Communication device attributes for Gold: descriptive columns are passed
# through unchanged, clock speed is rounded to 2 decimals and maximum
# memory to 0 decimals.

df_silver_communication_device = df_communication_device.alias("cd").select(
    *[
        col(f"cd.{name}")
        for name in (
            "CommunicationDeviceId",
            "CommunicationDeviceName",
            "CommunicationDeviceDescription",
            "CommunicationDeviceManufacturerName",
            "CommunicationDeviceTypeId",
            "CommunicationDeviceModel",
        )
    ],
    round(col("cd.ProcessorClockSpeedGhz"), 2).alias("ProcessorClockSpeedGhz"),
    round(col("cd.MaximumMemoryGb"), 0).alias("MaximumMemoryGb"),
    col("cd.Note"),
)

display(df_silver_communication_device)

In [None]:
# Persist the communication device dataset as the Gold table
# Communication_Device_Details (default save mode of the helper).

create_table_from_df(
    df=df_silver_communication_device,
    fully_qualified_table_name="#Lakehouse_Gold#.dbo.Communication_Device_Details",
)

print("Created #Lakehouse_Gold#.dbo.Communication_Device_Details successfully.")

# Loading Network data to gold


In [None]:
# Setup & Helper Function

# Helper function: writes a DataFrame to a Delta table in a given lakehouse.

def create_table_from_df(df, fully_qualified_table_name, mode="overwrite"):
    """Save ``df`` as a Delta table.

    Args:
        df: DataFrame whose ``write`` interface performs the save.
        fully_qualified_table_name: Table name handed to ``saveAsTable``.
        mode: Save mode forwarded to the writer; defaults to ``"overwrite"``.
    """
    delta_writer = df.write.format("delta")
    delta_writer = delta_writer.mode(mode)
    delta_writer.saveAsTable(fully_qualified_table_name)


print("Setup complete.")

In [None]:
# Read the network source tables. Each table is read exactly once.
# NOTE(review): these tables are read from the Bronze lakehouse, whereas the
# other sections of this notebook read from Silver — confirm this is intended.

df_network = spark.table("#Lakehouse_Bronze#.dbo.Network")
df_occupied_bw = spark.table("#Lakehouse_Bronze#.dbo.NetworkOccupiedBandwidth")
df_network_tx = spark.table("#Lakehouse_Bronze#.dbo.NetworkTransaction")
df_network_event = spark.table("#Lakehouse_Bronze#.dbo.NetworkEvent")
df_network_service_area = spark.table("#Lakehouse_Bronze#.dbo.NetworkServiceArea")
df_unplanned_disruption = spark.table("#Lakehouse_Bronze#.dbo.UnplannedDisruption")
df_base_station = spark.table("#Lakehouse_Bronze#.dbo.BaseStation")
df_base_station_metrics = spark.table("#Lakehouse_Bronze#.dbo.BaseStationMetrics")

In [None]:
# Join Network Data

from pyspark.sql.functions import *

# Network master rows left-joined to occupied bandwidth, transactions,
# events, service areas and unplanned disruptions, all keyed on the
# network id. Because every child table is joined directly to the network,
# a network with several rows in more than one child table yields one row
# per combination.
# NOTE(review): transactions are matched on ntx.OperatorId == n.NetworkId —
# confirm OperatorId really carries the network id.
# NOTE: a join to df_customer_account_event ("cae") on
# n.OperatingUtilityPartyId == cae.CustomerAccountId, together with its
# CustomerAccountEventTypeId / CustomerAccountEventTimestamp /
# CustomerAccountEventNote columns, is currently disabled.
df_silver_network_details = (
    df_network.alias("n")
    .join(df_occupied_bw.alias("ob"), on=col("n.NetworkId") == col("ob.WirelessNetworkId"), how="left")
    .join(df_network_tx.alias("ntx"), on=col("n.NetworkId") == col("ntx.OperatorId"), how="left")
    .join(df_network_event.alias("ne"), on=col("n.NetworkId") == col("ne.NetworkId"), how="left")
    .join(df_network_service_area.alias("nsa"), on=col("n.NetworkId") == col("nsa.NetworkId"), how="left")
    .join(df_unplanned_disruption.alias("upd"), on=col("n.NetworkId") == col("upd.NetworkId"), how="left")
    .select(
        # Network
        *[
            col(f"n.{name}")
            for name in (
                "NetworkId",
                "NetworkName",
                "NetworkDescription",
                "OperatingUtilityPartyId",
                "NetworkTypeId",
                "NetworkPriorityId",
            )
        ],
        # Occupied bandwidth (period columns renamed to avoid clashes)
        col("ob.PeriodStartTimestamp").alias("OccupiedBwStartTimestamp"),
        col("ob.PeriodEndTimestamp").alias("OccupiedBwEndTimestamp"),
        *[
            col(f"ob.{name}")
            for name in (
                "PowerSpectralDensityUnits",
                "PowerSpectralDensityUomId",
                "OccupiedBandwidthPercentage",
                "OccupiedBandwidthUnits",
                "OccupiedBandwidthUomId",
            )
        ],
        # Network transactions
        *[
            col(f"ntx.{name}")
            for name in (
                "NetworkTransactionId",
                "NetworkTransactionInitiationTimestamp",
                "NetworkTransactionCompletionTimestamp",
                "ConnectionDataRateUnits",
                "ConnectionDataRateUomId",
                "FrequencyBandId",
                "OperatorId",
                "NetworkTransactionTypeId",
                "DataServiceProductId",
                "AirtimeTypeId",
                "SupplementaryServiceId",
                "NetworkTransactionTerminationReasonTypeId",
            )
        ],
        # Network events
        *[
            col(f"ne.{name}")
            for name in (
                "NetworkEventId",
                "NetworkEventTypeId",
                "NetworkEventStartTimestamp",
                "NetworkEventEndTimestamp",
                "NetworkEventNote",
            )
        ],
        # Service areas (period columns renamed)
        col("nsa.GeographicAreaId"),
        col("nsa.PeriodStartDate").alias("ServiceAreaStartDate"),
        col("nsa.PeriodEndDate").alias("ServiceAreaEndDate"),
        col("nsa.ServiceAreaNote"),
        # Unplanned disruptions (period columns renamed)
        col("upd.PeriodStartTimestamp").alias("DisruptionStartTimestamp"),
        col("upd.PeriodEndTimestamp").alias("DisruptionEndTimestamp"),
        col("upd.NumberOfUnplannedDisruptions"),
    )
)

display(df_silver_network_details)


In [None]:
# Persist the network dataset as the Gold table Network_Details
# (default save mode of the helper, i.e. overwrite).

create_table_from_df(
    df=df_silver_network_details,
    fully_qualified_table_name="#Lakehouse_Gold#.dbo.Network_Details",
)

print("Created #Lakehouse_Gold#.dbo.Network_Details successfully.")

# Loading Finance data to gold

In [None]:
# Load the finance source tables (read from the Silver lakehouse).

# Billing, advertising and issue data
df_billing_data_service_charge = spark.read.table("#Lakehouse_Silver#.dbo.BillingStatementDataServiceCharge")
df_billing_status = spark.read.table("#Lakehouse_Silver#.dbo.BillingStatementStatus")
df_advertising_campaign = spark.read.table("#Lakehouse_Silver#.dbo.AdvertisingCampaign")
df_issue = spark.read.table("#Lakehouse_Silver#.dbo.Issue")

# Customer, account and satisfaction data
df_customer_account_event = spark.read.table("#Lakehouse_Silver#.dbo.customeraccountevent")
df_customer_satisfaction_rating = spark.read.table("#Lakehouse_Silver#.dbo.customersatisfactionrating")
df_satisfaction_rating = spark.read.table("#Lakehouse_Silver#.dbo.SatisfactionRating")
df_customer_account_churn = spark.read.table("#Lakehouse_Silver#.dbo.customeraccountchurnpropensity")
df_customer = spark.read.table("#Lakehouse_Silver#.dbo.customer")
df_customer_account = spark.read.table("#Lakehouse_Silver#.dbo.customeraccount")

In [None]:
# Aggregate Advertisement data

from pyspark.sql.functions import *

# Monthly advertising figures keyed by a numeric YYYYMM derived from the
# campaign start date: average commission percentage plus total planned and
# total actual expenses, each rounded to 0 decimals.
df_advertising_campaign_agg = (
    df_advertising_campaign
    .withColumn(
        "YearMonth",
        year("AdvertisingCampaignStartDate") * 100 + month("AdvertisingCampaignStartDate"),
    )
    .groupBy("YearMonth")
    .agg(
        round(avg(col("AgencyCommissionPercentage")), 0).alias("AvgAgencyCommissionPercentage"),
        round(sum(col("PlannedTotalAdvertisingCampaignExpensesAmount")), 0).alias("TotalPlannedExpenses"),
        round(sum(col("ActualTotalAdvertisingCampaignExpensesAmount")), 0).alias("TotalActualExpenses"),
    )
)

display(df_advertising_campaign_agg)


In [None]:
# Aggregate Issue data

from pyspark.sql.functions import col, year, month, sum, avg

# Monthly issue figures keyed by a numeric YYYYMM derived from the issue
# creation date: total and average man-hours and cost. Everything except
# the man-hour total is rounded to 2 decimals.
df_issue_agg = (
    df_issue.alias("i")
    .withColumn(
        "YearMonth",
        year("IssueCreatedDate") * 100 + month("IssueCreatedDate"),
    )
    .groupBy("YearMonth")
    .agg(
        sum(col("TotalManhoursExpended")).alias("TotalManhours"),
        round(sum(col("TotalCostAmount")), 2).alias("TotalCost"),
        round(avg(col("TotalManhoursExpended")), 2).alias("AvgManhours"),
        round(avg(col("TotalCostAmount")), 2).alias("AvgCost"),
    )
)

display(df_issue_agg)


In [None]:
# Add a combined year-month column to both DataFrames

from pyspark.sql.functions import *

# Numeric YYYYMM key derived from the billing period start date; the same
# expression is applied to both billing DataFrames so they can be joined
# and grouped by month.
billing_period_key = year("CurrentBillingPeriodStartDate") * 100 + month("CurrentBillingPeriodStartDate")

df_billing_status = df_billing_status.withColumn("YearMonth", billing_period_key)
df_billing_data_service_charge = df_billing_data_service_charge.withColumn("YearMonth", billing_period_key)


In [None]:
# Total data-service charge per month, rounded to 0 decimals.

df_service_agg = (
    df_billing_data_service_charge.alias("bdsc")
    .groupBy("YearMonth")
    .agg(round(sum(col("ChargeAmount")), 0).alias("MonthlyDataServiceCharge"))
)

display(df_service_agg)


In [None]:
# Billing status joined to data-service charges per customer account and
# month, then summed per account and billing period.
# The grouping keys come from the charges side ("dsc") of the left join, so
# status rows without a matching charge end up in a group whose
# CustomerAccountId is NULL; that group is removed by the final filter.
# NOTE(review): if an account has several status rows and several charge
# rows in the same month, the join repeats each amount and inflates the
# sums — confirm the cardinality of both tables.

# Billing-status amounts that are summed into "Total<name>" columns.
billing_status_amounts = (
    "OverdueAmount",
    "BalanceDueAmount",
    "CreditBalanceAmount",
    "DisputedAmount",
    "AdjustmentPendingAmount",
    "RefundDueAmount",
)

df_billings = (
    df_billing_status.alias("bs")
    .join(
        df_billing_data_service_charge.alias("dsc"),
        on=(col("dsc.CustomerAccountId") == col("bs.CustomerAccountId"))
        & (col("dsc.YearMonth") == col("bs.YearMonth")),
        how="left",
    )
    .groupBy(
        *[
            col(f"dsc.{name}")
            for name in (
                "CustomerAccountId",
                "CurrentBillingPeriodStartDate",
                "CurrentBillingPeriodEndDate",
                "YearMonth",
            )
        ]
    )
    .agg(
        round(sum("dsc.ChargeAmount"), 0).alias("TotalDataServiceCharge"),
        *[
            round(sum(f"bs.{name}"), 2).alias(f"Total{name}")
            for name in billing_status_amounts
        ],
    )
    .filter(col("CustomerAccountId").isNotNull())
)

display(df_billings)

In [None]:
# Attach the monthly advertising, issue and service aggregates to every
# billing row (left joins on YearMonth) and derive expense/profit columns.

# Missing monthly aggregates are treated as 0 in the derived columns.
ad_spend = coalesce(col("TotalActualExpenses"), lit(0))
issue_cost = coalesce(col("TotalCost"), lit(0))
service_revenue = coalesce(col("MonthlyDataServiceCharge"), lit(0))

df_silver_billing = (
    df_billings.alias("b")
    .join(df_advertising_campaign_agg.alias("aca"), on=col("b.YearMonth") == col("aca.YearMonth"), how="left")
    .join(df_issue_agg.alias("ia"), on=col("b.YearMonth") == col("ia.YearMonth"), how="left")
    .join(df_service_agg.alias("sa"), on=col("b.YearMonth") == col("sa.YearMonth"), how="left")
    .select(
        *[
            col(f"b.{name}")
            for name in (
                "CustomerAccountId",
                "CurrentBillingPeriodStartDate",
                "CurrentBillingPeriodEndDate",
                "YearMonth",
                "TotalDataServiceCharge",
                "TotalOverdueAmount",
                "TotalBalanceDueAmount",
            )
        ],
        # The credit balance is stored as an absolute value.
        abs(col("b.TotalCreditBalanceAmount")).alias("TotalCreditBalanceAmount"),
        *[
            col(f"b.{name}")
            for name in (
                "TotalDisputedAmount",
                "TotalAdjustmentPendingAmount",
                "TotalRefundDueAmount",
            )
        ],
        # Monthly aggregates from the joined tables (names are unique, so
        # they are referenced without an alias qualifier).
        *[
            col(name)
            for name in (
                "AvgAgencyCommissionPercentage",
                "TotalPlannedExpenses",
                "TotalActualExpenses",
                "TotalCost",
                "AvgCost",
                "MonthlyDataServiceCharge",
            )
        ],
    )
    # Advertising expenses plus issue cost.
    .withColumn("TotalExpenses", ad_spend + issue_cost)
    # NOTE(review): the profit subtracts only the advertising expenses (not
    # the issue cost) and is wrapped in abs(), so a loss appears as a
    # positive number — confirm this is intended.
    .withColumn("OperatingProfit", abs(service_revenue - ad_spend))
)

display(df_silver_billing)

In [None]:
# Monthly summary in millions, rounded to 2 decimals. Each figure is the
# average of the per-row value within the month, divided by 1,000,000.
# Pairs of (source column, output column).
summary_columns = (
    ("TotalExpenses", "TotalExpenses"),
    ("MonthlyDataServiceCharge", "TotalRevenue"),
    ("OperatingProfit", "OperatingProfit"),
)

sum_df = df_silver_billing.groupBy("YearMonth").agg(
    *[
        round(avg(source_name) / 1000000, 2).alias(output_name)
        for source_name, output_name in summary_columns
    ]
)

display(sum_df)

In [None]:
# Persist the billing dataset as the Gold table Billing_Details
# (default save mode of the helper, i.e. overwrite).

create_table_from_df(
    df=df_silver_billing,
    fully_qualified_table_name="#Lakehouse_Gold#.dbo.Billing_Details",
)

print("Created #Lakehouse_Gold#.dbo.Billing_Details successfully.")


# Loading sales data to gold

In [None]:
# Load the campaign and product source tables from the Silver lakehouse.

df_advertising_campaign = spark.read.table("#Lakehouse_Silver#.dbo.AdvertisingCampaign")
df_wireless_product = spark.read.table("#Lakehouse_Silver#.dbo.WirelessProduct")


In [None]:
# Load the billing, campaign and customer-account source tables
# (read from the Silver lakehouse).

# Billing and campaign data
df_billing_data_service_charge = spark.read.table("#Lakehouse_Silver#.dbo.BillingStatementDataServiceCharge")
df_advertising_campaign = spark.read.table("#Lakehouse_Silver#.dbo.AdvertisingCampaign")

# Customer account data
df_customer_account = spark.read.table("#Lakehouse_Silver#.dbo.customeraccount")
df_customer_account_churn = spark.read.table("#Lakehouse_Silver#.dbo.customeraccountchurnpropensity")
df_customer_account_event = spark.read.table("#Lakehouse_Silver#.dbo.customeraccountevent")


# Silver_Campaign_Product

In [None]:
# Select columns

from pyspark.sql.functions import *

# Advertising campaigns with the attributes of the advertised wireless
# product (left join on ProductId, so campaigns without a product match are
# kept). Commission and expense amounts are rounded to 0 decimals.
df_silver_campaign_product = (
    df_advertising_campaign.alias("adc")
    .join(
        df_wireless_product.alias("wp"),
        on=col("adc.ProductId") == col("wp.ProductId"),
        how="left",
    )
    .select(
        # Campaign description
        *[
            col(f"adc.{name}")
            for name in (
                "AdvertisingCampaignId",
                "AdvertisingCampaignName",
                "AdvertisingCampaignStartDate",
                "AdvertisingCampaignEndDate",
                "AdvertisingCampaignAppealStatement",
                "AdvertisingCampaignCreativeStrategyStatement",
                "MediaObjectivesStatement",
            )
        ],
        # Rounded amounts keep their original column names
        *[
            round(col(f"adc.{name}"), 0).alias(name)
            for name in (
                "AgencyCommissionPercentage",
                "PlannedTotalAdvertisingCampaignExpensesAmount",
                "ActualTotalAdvertisingCampaignExpensesAmount",
            )
        ],
        # Campaign foreign keys
        *[
            col(f"adc.{name}")
            for name in (
                "AdvertisingCategoryId",
                "AgencyId",
                "MarketingCampaignId",
                "ProductId",
                "BrandId",
            )
        ],
        # Product attributes
        *[
            col(f"wp.{name}")
            for name in (
                "ProductName",
                "ProductShortDescription",
                "ProductDescription",
                "ProductIntendedUse",
                "IntroductionDate",
                "FirstDateManufactured",
                "PlannedAbandonmentDate",
                "ProductNetContent",
                "BaseProductQuantity",
            )
        ],
    )
)

display(df_silver_campaign_product)


In [None]:
# Persist the campaign/product dataset as the Gold table
# Campaign_Product_Details (default save mode of the helper).

create_table_from_df(
    df=df_silver_campaign_product,
    fully_qualified_table_name="#Lakehouse_Gold#.dbo.Campaign_Product_Details",
)

print("Created #Lakehouse_Gold#.dbo.Campaign_Product_Details successfully.")


# Silver_Campaign

In [None]:
# Add a "yyyyMM" string key to campaigns (from the campaign start date) and
# to data-service charges (from the billing period start date).
df_advertising_campaign = df_advertising_campaign.withColumn(
    "YearMonth",
    date_format("AdvertisingCampaignStartDate", "yyyyMM"),
)
df_billing_data_service_charge = df_billing_data_service_charge.withColumn(
    "YearMonth",
    date_format("CurrentBillingPeriodStartDate", "yyyyMM"),
)


In [None]:
# Aggregate Billing Data

from pyspark.sql.functions import *

# Monthly revenue: sum of the data-service charges per YearMonth, rounded
# to 2 decimals.
df_billing_agg = (
    df_billing_data_service_charge.alias("dsc")
    .groupBy(col("dsc.YearMonth"))
    .agg(round(sum(col("dsc.ChargeAmount")), 2).alias("Revenue"))
)

display(df_billing_agg)


In [None]:
# Aggregate Campaign Data

from pyspark.sql.functions import *

# One row per month and campaign name: commission percentage and planned /
# actual expenses are averaged within the group and rounded to 0 decimals.
# The output columns keep the names of their source columns.
df_campaign_agg = (
    df_advertising_campaign.alias("ac")
    .groupBy(col("ac.YearMonth"), col("ac.AdvertisingCampaignName"))
    .agg(
        *[
            round(avg(col(f"ac.{name}")), 0).alias(name)
            for name in (
                "AgencyCommissionPercentage",
                "PlannedTotalAdvertisingCampaignExpensesAmount",
                "ActualTotalAdvertisingCampaignExpensesAmount",
            )
        ]
    )
)

display(df_campaign_agg)


In [None]:
# Campaign KPIs per month and campaign: the campaign aggregates joined to
# the monthly revenue (left join on YearMonth), plus derived measures.
planned_amount = col("ca.PlannedTotalAdvertisingCampaignExpensesAmount")
actual_amount = col("ca.ActualTotalAdvertisingCampaignExpensesAmount")
# Actual minus planned expenses; positive means the campaign overspent.
expense_deviation = actual_amount - planned_amount

df_kpis = (
    df_campaign_agg.alias("ca")
    .join(
        df_billing_agg.alias("ba"),
        col("ca.YearMonth") == col("ba.YearMonth"),
        "left"
    )
    .select(
        col("ca.YearMonth"),
        col("ca.AdvertisingCampaignName"),
        col("ca.AgencyCommissionPercentage"),
        planned_amount,
        actual_amount,
        col("ba.Revenue"),
        expense_deviation.alias("CampaignExpenseDeviation"),
        # Deviation as a percentage of the planned amount. The division is
        # guarded like CampaignEfficiency below: a zero (or NULL) planned
        # amount yields NULL instead of dividing by zero, which would raise
        # an error when ANSI SQL mode is enabled.
        round(
            when(planned_amount != 0, expense_deviation / planned_amount * 100),
            0,
        ).alias("CampaignExpenseVarianceRate"),
        # Commission amount = actual expenses * commission percentage / 100.
        round(
            actual_amount * (col("ca.AgencyCommissionPercentage") / 100),
            0,
        ).alias("AgencyCommissionAmount"),
        # Monthly revenue per unit of actual campaign expense; 0 when the
        # actual expense is zero or NULL.
        round(
            when(actual_amount != 0, col("ba.Revenue") / actual_amount).otherwise(0),
            2,
        ).alias("CampaignEfficiency"),
    )
)

# Display the result
display(df_kpis)


In [None]:
# Persist the campaign KPI dataset as the Gold table Campaign_KPI_Details
# (default save mode of the helper, i.e. overwrite).

create_table_from_df(
    df=df_kpis,
    fully_qualified_table_name="#Lakehouse_Gold#.dbo.Campaign_KPI_Details",
)

print("Created #Lakehouse_Gold#.dbo.Campaign_KPI_Details successfully.")
