### Gold Notebook

In [None]:
# Read the silver-layer orders table; the cells below build their DataFrames from `df`.
load_path = "qa_lakehouse_1312.orders_silver"
silver_reader = spark.read
df = silver_reader.table(load_path)

### dimdate_gold

In [None]:
# Instantiate dimdate_gold if not exists

from pyspark.sql.types import *
from delta.tables import*

# Declare the dimdate_gold Delta table and its columns; nothing happens if it already exists.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("dimdate_gold")
    .addColumn("OrderDate", DateType())
    .addColumn("Day", IntegerType())
    .addColumn("Month", IntegerType())
    .addColumn("Year", IntegerType())
    .addColumn("mmmyyyy", StringType())
    .addColumn("yyyymm", StringType())
    .execute()
)

In [None]:
from pyspark.sql.functions import col, dayofmonth, month, year, date_format

# Build the date dimension rows: one row per distinct OrderDate in the silver data,
# with calendar parts and two formatted month labels derived from that date.
dfdimDate_gold = (
    df.dropDuplicates(["OrderDate"])
    .select(
        col("OrderDate"),
        dayofmonth("OrderDate").alias("Day"),
        month("OrderDate").alias("Month"),
        year("OrderDate").alias("Year"),
        date_format(col("OrderDate"), "MMM-yyyy").alias("mmmyyyy"),
        date_format(col("OrderDate"), "yyyyMM").alias("yyyymm"),
    )
    .orderBy("OrderDate")
)

In [None]:
from delta.tables import *
from datetime import datetime 
from pyspark.sql.types import *
from pyspark.sql.functions import * 
import time
# Load new dates into dimdate_gold and append one audit row to monitoring_etl
# describing the outcome (status, row count, duration, error text).
deltaTable = DeltaTable.forName(spark, 'dimdate_gold')

# Wall-clock start; the elapsed seconds are written to the monitoring row.
start_time = time.time()

# Identifiers recorded in the monitoring row.
table_name = "dimdate_gold"
load_path = "qa_lakehouse_1312.dimdate_gold"

try:
    dfUpdates = dfdimDate_gold

    # Number of distinct dates offered to the merge, not the number actually inserted.
    record_count = dfUpdates.count()

    # Insert-only merge: dates already present are left as they are, unseen dates are added.
    # No whenMatched clause is declared because matched rows need no change; an empty
    # update clause would alter no values yet still make Delta treat the merge as one
    # that modifies matched target rows.
    (
        deltaTable.alias('gold')
        .merge(
            dfUpdates.alias('updates'),
            'gold.OrderDate = updates.OrderDate'
        )
        .whenNotMatchedInsert(values =
            {
            "OrderDate": "updates.OrderDate",
            "Day": "updates.Day",
            "Month": "updates.Month",
            "Year": "updates.Year",
            "mmmyyyy": "updates.mmmyyyy",
            "yyyymm": "updates.yyyymm"
            }
        )
        .execute()
    )

    load_status = "SUCCESS"
    error_message = None

except Exception as e:
    # Best effort: a failed load is recorded in the monitoring table rather than
    # stopping the notebook, so the failure is only visible in monitoring_etl.
    record_count = 0
    load_status = "FAILURE"
    error_message = str(e)

end_time = time.time()
duration = (end_time - start_time)  # seconds

# Explicit schema so the audit row has stable column types regardless of the values
# (error_message is None on success and could not be inferred).
monitoring_schema = StructType([
    StructField("load_timestamp", TimestampType()),
    StructField("Table_name", StringType()),
    StructField("Source_path", StringType()),
    StructField("record_count", IntegerType()),
    StructField("Status", StringType()),
    StructField("error_message", StringType()),
    StructField("duration", FloatType())
])

monitoring_data = [(datetime.now(), table_name, load_path, record_count, load_status, error_message, duration)]

monitoring_df = spark.createDataFrame(monitoring_data, schema=monitoring_schema)

# Append, never overwrite: monitoring_etl accumulates one row per run of this cell.
monitoring_df.write.mode("append").saveAsTable("monitoring_etl")

### dimcustomer_gold

In [None]:
from pyspark.sql.types import *
from delta.tables import *

# Declare the dimcustomer_gold Delta table and its columns; nothing happens if it already exists.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("dimcustomer_gold")
    .addColumn("CustomerName", StringType())
    .addColumn("Email", StringType())
    .addColumn("First", StringType())
    .addColumn("Last", StringType())
    .addColumn("CustomerID", LongType())
    .execute()
)

In [None]:
from pyspark.sql.functions import col, split

# Customer attributes taken from silver: one row per distinct (CustomerName, Email).
# First and Last are the first and second space-separated tokens of CustomerName.
customer_name_parts = split(col("CustomerName"), " ")

dfdimCustomer_silver = (
    df.dropDuplicates(["CustomerName", "Email"])
    .select(
        col("CustomerName"),
        col("Email"),
        customer_name_parts.getItem(0).alias("First"),
        customer_name_parts.getItem(1).alias("Last"),
    )
)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id, col, when, coalesce, max, lit
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Current contents of the gold customer dimension.
dfdimCustomer_temp = spark.read.table("dimCustomer_gold")

# Highest CustomerID already stored; 0 when the table has no rows yet.
MAXCustomerID = dfdimCustomer_temp.select(coalesce(max(col("CustomerID")),lit(0)).alias("MAXCustomerID")).first()[0]

# Keep only the silver customers that have no (CustomerName, Email) match in gold.
dfdimCustomer_gold = dfdimCustomer_silver.join(dfdimCustomer_temp,(dfdimCustomer_silver.CustomerName == dfdimCustomer_temp.CustomerName) & (dfdimCustomer_silver.Email == dfdimCustomer_temp.Email), "left_anti")

# Number the new customers MAX+1, MAX+2, ... in a fixed order.
# row_number() over an explicit ordering gives the same ID to the same row every time the
# plan is evaluated, which matters because this DataFrame is lazy and is re-evaluated by
# the merge that consumes it. monotonically_increasing_id() is neither consecutive nor
# stable across evaluations.
# Assumes (CustomerName, Email) is unique in the input, as produced by the dropDuplicates
# that builds dfdimCustomer_silver; ties would be numbered in an unspecified order.
# An un-partitioned window moves these rows to a single partition, which is acceptable
# for the (small) set of not-yet-loaded customers.
customer_id_order = Window.orderBy("CustomerName", "Email")

dfdimCustomer_gold = dfdimCustomer_gold.withColumn(
    "CustomerID",
    # Cast so the column is a long like the gold table's CustomerID column.
    row_number().over(customer_id_order).cast("long") + lit(MAXCustomerID),
)

In [None]:
from delta.tables import *

# Add the not-yet-loaded customers to dimcustomer_gold.
deltaTable = DeltaTable.forName(spark, 'dimcustomer_gold')

dfUpdates = dfdimCustomer_gold

# Insert-only merge keyed on (CustomerName, Email): customers already present are left
# as they are, new ones are inserted with their freshly assigned CustomerID.
# No whenMatched clause is declared because matched rows need no change; an empty
# update clause would alter no values yet still make Delta treat the merge as one
# that modifies matched target rows.
(
    deltaTable.alias('gold')
    .merge(
        dfUpdates.alias('updates'),
        'gold.CustomerName = updates.CustomerName AND gold.Email = updates.Email'
    )
    .whenNotMatchedInsert(values =
        {
        "CustomerName": "updates.CustomerName",
        "Email": "updates.Email",
        "First": "updates.First",
        "Last": "updates.Last",
        "CustomerID": "updates.CustomerID"
        }
    )
    .execute()
)

### dimproduct_gold

In [None]:
from pyspark.sql.types import *
from delta.tables import *

# Declare the dimproduct_gold Delta table and its columns; nothing happens if it already exists.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("dimproduct_gold")
    .addColumn("ItemName", StringType())
    .addColumn("ItemID", LongType())
    .addColumn("ItemInfo", StringType())
    .execute()
)

In [None]:
from pyspark.sql.functions import col, split, lit, when

# Create product_silver dataframe
#
# Item is split on ", ": the first token becomes ItemName, the second becomes ItemInfo
# (empty string when the second token is missing or empty).
item_parts = split(col("Item"), ", ")

dfdimProduct_silver = (
    df.select(col("Item"))
    .withColumn("ItemName", item_parts.getItem(0))
    .withColumn(
        "ItemInfo",
        when(
            item_parts.getItem(1).isNull() | (item_parts.getItem(1) == ""),
            lit(""),
        ).otherwise(item_parts.getItem(1)),
    )
    # De-duplicate on the derived key, not on the raw Item string: the product merge and
    # the fact join both match on (ItemName, ItemInfo), and two Item values that differ
    # only after the second token would otherwise yield two rows with the same key.
    # Which raw Item value survives for such a key is unspecified.
    .dropDuplicates(["ItemName", "ItemInfo"])
)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id, col, lit, max, coalesce
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Current contents of the gold product dimension.
dfdimProduct_temp = spark.read.table("dimProduct_gold")

# Highest ItemID already stored; 0 when the table has no rows yet.
MAXProductID = dfdimProduct_temp.select(coalesce(max(col("ItemID")),lit(0)).alias("MAXItemID")).first()[0]

# Keep only the silver products that have no (ItemName, ItemInfo) match in gold.
dfdimProduct_gold = dfdimProduct_silver.join(dfdimProduct_temp,(dfdimProduct_silver.ItemName == dfdimProduct_temp.ItemName) & (dfdimProduct_silver.ItemInfo == dfdimProduct_temp.ItemInfo), "left_anti")

# Number the new products MAX+1, MAX+2, ... in a fixed order.
# row_number() over an explicit ordering gives the same ID to the same row every time the
# plan is evaluated, which matters because this DataFrame is lazy and is re-evaluated by
# the merge that consumes it. monotonically_increasing_id() is neither consecutive nor
# stable across evaluations.
# Item is included as a tie-breaker so the order is total even if two rows share
# (ItemName, ItemInfo).
# An un-partitioned window moves these rows to a single partition, which is acceptable
# for the (small) set of not-yet-loaded products.
product_id_order = Window.orderBy("ItemName", "ItemInfo", "Item")

dfdimProduct_gold = dfdimProduct_gold.withColumn(
    "ItemID",
    # Cast so the column is a long like the gold table's ItemID column.
    row_number().over(product_id_order).cast("long") + lit(MAXProductID),
)

In [None]:
from delta.tables import *

# Add the not-yet-loaded products to dimproduct_gold.
deltaTable = DeltaTable.forName(spark, 'dimproduct_gold')

dfUpdates = dfdimProduct_gold

# Insert-only merge keyed on (ItemName, ItemInfo): products already present are left
# as they are, new ones are inserted with their freshly assigned ItemID.
# No whenMatched clause is declared because matched rows need no change; an empty
# update clause would alter no values yet still make Delta treat the merge as one
# that modifies matched target rows.
(
    deltaTable.alias('gold')
    .merge(
        dfUpdates.alias('updates'),
        'gold.ItemName = updates.ItemName AND gold.ItemInfo = updates.ItemInfo'
    )
    .whenNotMatchedInsert(values =
        {
        "ItemName": "updates.ItemName",
        "ItemInfo": "updates.ItemInfo",
        "ItemID": "updates.ItemID"
        }
    )
    .execute()
)

### factsales_gold

In [None]:
from pyspark.sql.types import *
from delta.tables import *

# Declare the factsales_gold Delta table and its columns; nothing happens if it already exists.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("factsales_gold")
    .addColumn("CustomerID", LongType())
    .addColumn("ItemID", LongType())
    .addColumn("OrderDate", DateType())
    .addColumn("Quantity", IntegerType())
    .addColumn("UnitPrice", FloatType())
    .addColumn("Tax", FloatType())
    .execute()
)

In [None]:
# Every function this cell calls is imported here so the cell does not depend on names
# that happen to have been imported by earlier cells.
from pyspark.sql.functions import col, lit, split, when

# Gold dimensions, read back to look up the surrogate keys.
dfdimCustomer_temp = spark.read.table("dimCustomer_gold")
dfdimProduct_temp = spark.read.table("dimProduct_gold")

# Derive the product lookup key from Item: Item is split on ", ", the first token becomes
# ItemName and the second becomes ItemInfo (empty string when missing or empty).
# `df` is reassigned with the two extra columns; re-running the cell recomputes the same
# columns from Item, so the result does not change.
item_parts = split(col("Item"), ", ")

df = (
    df.withColumn("ItemName", item_parts.getItem(0))
    .withColumn(
        "ItemInfo",
        when(
            item_parts.getItem(1).isNull() | (item_parts.getItem(1) == ""),
            lit(""),
        ).otherwise(item_parts.getItem(1)),
    )
)

# Create Sales_gold dataframe
#
# Left joins keep every order row; CustomerID / ItemID are null for rows whose customer
# or product has no match in the corresponding dimension.
dffactSales_gold = (
    df.alias("df1")
    .join(
        dfdimCustomer_temp.alias("df2"),
        (df.CustomerName == dfdimCustomer_temp.CustomerName) & (df.Email == dfdimCustomer_temp.Email),
        "left",
    )
    .join(
        dfdimProduct_temp.alias("df3"),
        (df.ItemName == dfdimProduct_temp.ItemName) & (df.ItemInfo == dfdimProduct_temp.ItemInfo),
        "left",
    )
    .select(
        col("df2.CustomerID"),
        col("df3.ItemID"),
        col("df1.OrderDate"),
        col("df1.Quantity"),
        col("df1.UnitPrice"),
        col("df1.Tax"),
    )
    .orderBy(col("df1.OrderDate"), col("df2.CustomerID"), col("df3.ItemID"))
)

In [None]:
from delta.tables import *

# Add the not-yet-loaded sales rows to factsales_gold.
deltaTable = DeltaTable.forName(spark, 'factsales_gold')

dfUpdates = dffactSales_gold

# Insert-only merge keyed on (OrderDate, CustomerID, ItemID): rows whose key already
# exists in gold are left as they are, rows with an unseen key are inserted.
# No whenMatched clause is declared. Matched rows need no change, and the source is not
# de-duplicated on the merge key, so on a re-run several source rows can match the same
# gold row; Delta can fail a merge that would modify one target row from multiple source
# rows, which an (even empty) update clause would trigger.
(
    deltaTable.alias('gold')
    .merge(
        dfUpdates.alias('updates'),
        'gold.OrderDate = updates.OrderDate AND gold.CustomerID = updates.CustomerID AND gold.ItemID = updates.ItemID'
    )
    .whenNotMatchedInsert(values =
        {
        "CustomerID": "updates.CustomerID",
        "ItemID": "updates.ItemID",
        "OrderDate": "updates.OrderDate",
        "Quantity": "updates.Quantity",
        "UnitPrice": "updates.UnitPrice",
        "Tax": "updates.Tax"
        }
    )
    .execute()
)