## Transform Silver to Gold

### 1. Define the container path

In [0]:
# Define the paths
silver_path = "dbfs:/mnt/medallion/silver/"
gold_path = "dbfs:/mnt/medallion/gold/"
checkpoints_path = "dbfs:/mnt/stream_checkpoints/"

### 2. Read the Silver Data

In [0]:
# Read the streaming data from the silver directory
silver_df = spark.read\
        .format("delta")\
        .load(silver_path)

In [0]:
# Display the silver data (optional)
display(silver_df)

transaction_id,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_name,payment_type,customer_name,extract_time,daypart,transformed_time
a111946d-2395-40bc-8945-451167eba055,00:40:00,4,4,Hayfield,3,5.0,Coffee,Cappuccino,Credit,Cody,2025-01-13T03:20:18.544Z,Night,2025-01-13T03:20:59.915Z
2209a0ad-e83c-4547-8acc-bc41edc9f16d,07:29:47,1,3,Homerville,4,6.5,Coffee,Hazelnut Latte,Debit,Christina,2025-01-13T03:20:18.544Z,Morning,2025-01-13T03:20:59.915Z
5f789da2-e58a-43f8-a23b-0ecc8d021e8c,19:35:51,4,1,Osceola,4,6.5,Coffee,Hazelnut Latte,Credit,Darlene,2025-01-13T03:20:18.544Z,Evening,2025-01-13T03:20:59.915Z
5f789da2-e58a-43f8-a23b-0ecc8d021e8c,19:35:51,3,1,Osceola,8,5.0,Pastry,Cheese Toast,Credit,Darlene,2025-01-13T03:20:18.544Z,Evening,2025-01-13T03:20:59.915Z
e70d66c6-4539-46ff-8265-ab481cedc89b,09:30:16,1,1,Osceola,12,4.0,Other,Hot Chocolate,Credit,Jennifer,2025-01-13T03:20:18.544Z,Morning,2025-01-13T03:20:59.915Z
cf9a7b93-1c53-4a25-be20-0377b5362a17,05:49:25,4,5,Monroe,4,6.5,Coffee,Hazelnut Latte,Debit,Stefanie,2025-01-13T03:20:18.544Z,Night,2025-01-13T03:20:59.915Z
a7d6c84c-365f-44b8-aa46-f200cdc5840a,01:23:29,1,1,Osceola,12,4.0,Other,Hot Chocolate,Credit,Kaitlyn,2025-01-13T03:20:18.544Z,Night,2025-01-13T03:20:59.915Z
ffa73db8-0f83-4844-9416-7223c14a32c1,01:36:25,4,5,Monroe,2,5.5,Coffee,Double Espresso,Debit,Daniel,2025-01-13T03:20:18.544Z,Night,2025-01-13T03:20:59.915Z
fc3e8fc2-1135-400b-ad03-394632d4cdbf,14:32:17,3,2,Victoria,5,6.5,Coffee,Macchiato,Credit,Tommy,2025-01-13T03:20:18.544Z,Afternoon,2025-01-13T03:20:59.915Z
93dc6919-8c21-41c1-b998-13f6bb935a67,19:14:23,2,1,Osceola,7,3.0,Pastry,Butter Croissant,Debit,Karen,2025-01-13T03:20:18.544Z,Evening,2025-01-13T03:20:59.915Z


### 3. Gold Transformation

#### A. dim_store

In [0]:
dim_store = silver_df.select("store_id", "store_location").distinct()

In [0]:
dim_store.printSchema()

root
 |-- store_id: string (nullable = true)
 |-- store_location: string (nullable = true)



#### B. dim_product

In [0]:
dim_product = silver_df.select("product_id", "product_name", "unit_price", "product_category").distinct()

In [0]:
dim_product.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- product_category: string (nullable = true)



#### C. dim_customer

In [0]:
from pyspark.sql import functions as sf

dim_customer = silver_df.select("customer_name").distinct() \
    .withColumn("customer_id", sf.monotonically_increasing_id().cast("string"))

In [0]:
dim_customer.printSchema()

root
 |-- customer_name: string (nullable = true)
 |-- customer_id: string (nullable = false)



#### D. dim_payment

In [0]:
dim_payment = silver_df.select("payment_type").distinct() \
    .withColumn("payment_id", sf.monotonically_increasing_id().cast("string"))

In [0]:
dim_payment.printSchema()

root
 |-- payment_type: string (nullable = true)
 |-- payment_id: string (nullable = false)



#### E. fact_transactions

In [0]:
from pyspark.sql.functions import col, current_timestamp

# Create Fact Table by Joining with Dimension Tables
fact_transactions = silver_df \
    .join(dim_customer, "customer_name") \
    .join(dim_payment, "payment_type") \
    .select("transaction_id", "transaction_time", "daypart",  "transaction_qty", "store_id", "product_id", "unit_price", "customer_id", "payment_id", "extract_time", "transformed_time")\
    .withColumn('load_time', sf.current_timestamp())

In [0]:
fact_transactions.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- transaction_time: string (nullable = true)
 |-- daypart: string (nullable = true)
 |-- transaction_qty: integer (nullable = true)
 |-- store_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- customer_id: string (nullable = false)
 |-- payment_id: string (nullable = false)
 |-- extract_time: timestamp (nullable = true)
 |-- transformed_time: timestamp (nullable = true)
 |-- load_time: timestamp (nullable = false)



In [0]:
display(fact_transactions)

transaction_id,transaction_time,daypart,transaction_qty,store_id,product_id,unit_price,customer_id,payment_id,extract_time,transformed_time,load_time
a111946d-2395-40bc-8945-451167eba055,00:40:00,Night,4,4,3,5.0,41,0,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
2209a0ad-e83c-4547-8acc-bc41edc9f16d,07:29:47,Morning,1,3,4,6.5,146,1,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
5f789da2-e58a-43f8-a23b-0ecc8d021e8c,19:35:51,Evening,4,1,4,6.5,83,0,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
5f789da2-e58a-43f8-a23b-0ecc8d021e8c,19:35:51,Evening,3,1,8,5.0,83,0,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
e70d66c6-4539-46ff-8265-ab481cedc89b,09:30:16,Morning,1,1,12,4.0,19,0,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
cf9a7b93-1c53-4a25-be20-0377b5362a17,05:49:25,Night,4,5,4,6.5,9,1,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
a7d6c84c-365f-44b8-aa46-f200cdc5840a,01:23:29,Night,1,1,12,4.0,160,0,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
ffa73db8-0f83-4844-9416-7223c14a32c1,01:36:25,Night,4,5,2,5.5,181,1,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
fc3e8fc2-1135-400b-ad03-394632d4cdbf,14:32:17,Afternoon,3,2,5,6.5,23,0,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z
93dc6919-8c21-41c1-b998-13f6bb935a67,19:14:23,Evening,2,1,7,3.0,237,1,2025-01-13T03:20:18.544Z,2025-01-13T03:20:59.915Z,2025-01-13T03:25:15.476Z


### 4. Save the gold data

In [0]:
def write_to_gold(dim_store, dim_product, dim_customer, dim_payment, fact_transactions, gold_path):

    dim_store.write.format("delta").mode("overwrite").save(f"{gold_path}/dim_store")
    dim_product.write.format("delta").mode("overwrite").save(f"{gold_path}/dim_product")
    dim_customer.write.format("delta").mode("overwrite").save(f"{gold_path}/dim_customer")
    dim_payment.write.format("delta").mode("overwrite").save(f"{gold_path}/dim_payment")
    fact_transactions.write.format("delta").mode("append").option("mergeSchema", "true").save(f"{gold_path}/fact_transactions")

    print("DataFrames successfully written to the gold directory.")

In [0]:
# Define the base path to the gold directory
gold_path = "dbfs:/mnt/medallion/gold/"

# Call the function to write the DataFrames to the gold directory
write_to_gold(dim_store, dim_product, dim_customer, dim_payment, fact_transactions, gold_path)

DataFrames successfully written to the gold directory.
