In [None]:
from pyspark.sql.functions import col

# Load data to the dataframe as a starting point to create the gold layer
salesorderdetail = spark.read.table("silver_adventureworks.salesorderdetail").where(col("current") == 1)
salesorderdetail = salesorderdetail.dropDuplicates(["SalesOrderID"])
salesorderdetail = salesorderdetail[["SalesOrderID", "SalesOrderDetailID", "ProductID", "OrderQty", "UnitPrice"]]
salesorderdetail = salesorderdetail.withColumn("Revenue",salesorderdetail["OrderQty"] * salesorderdetail["UnitPrice"] )

salesorderheader = spark.read.table("silver_adventureworks.salesorderheader").where(col("current") == 1)
salesorderheader = salesorderheader.dropDuplicates(["SalesOrderID"])
salesorderheader = salesorderheader[["SalesOrderID", "CustomerID", "ShipToAddressID", "Status", "OrderDate"]]
salesorderheader = salesorderheader.withColumnRenamed("SalesOrderID", "SalesOrderID2")

# Perform the joins
join = salesorderdetail.join(salesorderheader, salesorderdetail['SalesOrderID'] == salesorderheader['SalesOrderID2'], "inner")

from pyspark.sql.functions import concat
join = join.withColumn('SalesKey', concat(join['SalesOrderID'],join['SalesOrderDetailID']))
join = join[["SalesKey", "ProductID", "CustomerID", "Status", "Revenue", "OrderDate", "OrderQty", "UnitPrice"]]

In [None]:
import pandas as pd

dimension_customer = spark.read.table("gold_adventureworks.dimension_customer")
dimension_product = spark.read.table("gold_adventureworks.dimension_product")

customer_pandas = dimension_customer.toPandas()
product_pandas = dimension_product.toPandas()
join_pandas = join.toPandas()

def lookup_customer_key(row):  
    key = row['CustomerID']  
    return customer_pandas.loc[customer_pandas['CustomerID'] == key, 'ID'].values[0] if key in customer_pandas['CustomerID'].values else '0'  

def lookup_product_key(row):  
    key = row['ProductID']  
    return product_pandas.loc[product_pandas['ProductID'] == key, 'ID'].values[0] if key in product_pandas['ProductID'].values else '0'  

join_pandas['CustomerKey'] = join_pandas.apply(lookup_customer_key, axis=1)
join_pandas['ProductKey'] = join_pandas.apply(lookup_product_key, axis=1)  

fact_sales = spark.createDataFrame(join_pandas)
fact_sales = fact_sales[["SalesKey", "CustomerKey", "ProductKey", "Status", "OrderDate", "Revenue", "OrderQty", "UnitPrice"]]

In [None]:
from pyspark.sql.types import *
from delta.tables import*
    
 # Define the schema for the fact_sales table
DeltaTable.createIfNotExists(spark) \
    .tableName("gold_adventureworks.fact_sales") \
    .addColumn("SalesKey", StringType()) \
    .addColumn("CustomerKey", StringType()) \
    .addColumn("ProductKey", StringType()) \
    .addColumn("Status", IntegerType()) \
    .addColumn("OrderDate", DateType()) \
    .addColumn("Revenue", DoubleType()) \
    .addColumn("OrderQty", IntegerType()) \
    .addColumn("UnitPrice", DoubleType()) \
    .execute()

In [None]:
from delta.tables import *
    
deltaTable = DeltaTable.forPath(spark, 'Tables/gold_adventureworks/fact_sales')
  
deltaTable.alias('silver') \
  .merge(
    fact_sales.alias('updates'),
    'silver.SalesKey = updates.SalesKey AND silver.CustomerKey = updates.CustomerKey AND silver.ProductKey = updates.ProductKey'
  ) \
   .whenMatchedUpdate(set =
    {
          
    }
  ) \
 .whenNotMatchedInsert(values =
    {
      "SalesKey": "updates.SalesKey",
      "CustomerKey": "updates.CustomerKey",
      "ProductKey": "updates.ProductKey",
      "Status": "updates.Status",
      "OrderDate": "updates.OrderDate",
      "Revenue": "updates.Revenue",
      "OrderQty": "updates.OrderQty",
      "UnitPrice": "updates.UnitPrice"
    }
  ) \
  .execute()