## **Setting Up AWS/Azure Credentials**

In [0]:
# Pull the AWS key pair from the "aws-creds" secret scope so the values
# never appear in the notebook source.
ACCESS_KEY = dbutils.secrets.get(scope="aws-creds", key="aws-access-key")
SECRET_KEY = dbutils.secrets.get(scope="aws-creds", key="aws-secret-key")

# Apply the S3A connector settings on the session's Hadoop configuration.
hadoop_conf = spark._jsc.hadoopConfiguration()
s3a_settings = {
    "fs.s3a.access.key": ACCESS_KEY,
    "fs.s3a.secret.key": SECRET_KEY,
    "fs.s3a.endpoint": "s3.amazonaws.com",
}
for setting_name, setting_value in s3a_settings.items():
    hadoop_conf.set(setting_name, setting_value)


## **Load Raw Transactions from S3 and Write to Bronze Delta Table**

In [0]:
from pyspark.sql.functions import input_file_name, lit

# Root S3 location shared by the raw inputs and the medallion outputs
s3_base = "s3a://marketing-data-bucket-09/marketing_data"

# Input paths for each sales channel
web_path     = f"{s3_base}/transactions/web/"
mobile_path  = f"{s3_base}/transactions/mobile/"
instore_path = f"{s3_base}/transactions/instore/"


def read_channel_transactions(path: str, channel: str):
    """Read one channel's raw transaction CSVs and tag them with the channel.

    Args:
        path: Folder containing the channel's CSV files (header row expected).
        channel: Literal value written to the ``channel`` column.

    Returns:
        A Spark DataFrame of the CSV rows. No schema is inferred, so every
        source column is read as a string.
    """
    # NOTE(review): the printed schema lists `channel` between `amount` and
    # `transaction_date` with nullable = false, which suggests the CSVs already
    # carry a `channel` column that this literal overwrites - confirm.
    return (
        spark.read.option("header", True)
        .csv(path)
        .withColumn("channel", lit(channel))
    )


# One DataFrame per channel, all produced by the same reader
df_web = read_channel_transactions(web_path, "web")
df_mobile = read_channel_transactions(mobile_path, "mobile")
df_instore = read_channel_transactions(instore_path, "instore")

# Combine all channels into a single DataFrame (columns matched by name)
df_transactions = df_web.unionByName(df_mobile).unionByName(df_instore)

# Preview schema and a few rows
df_transactions.printSchema()
df_transactions.show(5)

# Bronze output path for raw data
bronze_output = f"{s3_base}/output/bronze/transactions_bronze"

# Write as a path-based Delta table in the Bronze layer. Saving to an explicit
# path does not register a managed table in the metastore.
df_transactions.write.format("delta").mode("overwrite").save(bronze_output)


root
 |-- transaction_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- channel: string (nullable = false)
 |-- transaction_date: string (nullable = true)
 |-- campaign_id: string (nullable = true)

+--------------+----------+-----------+-------+-------+----------------+-----------+
|transaction_id|product_id|customer_id| amount|channel|transaction_date|campaign_id|
+--------------+----------+-----------+-------+-------+----------------+-----------+
|   TXN00000002|  PRD00017|  CUST00231|2379.87|    web|      2024-04-28|  CAMPAIGN1|
|   TXN00000004|  PRD00058|  CUST00449|2190.22|    web|      2024-03-27|       NULL|
|   TXN00000005|  PRD00074|  CUST00321|2337.32|    web|      2024-02-29|  CAMPAIGN1|
|   TXN00000007|  PRD00065|  CUST00219|2602.37|    web|      2024-04-29|  CAMPAIGN1|
|   TXN00000010|  PRD00083|  CUST00080|1751.72|    web|      2024-02-24|       NULL|
+-------------

## **Clean & join transaction data (Bronze) with product catalog to create Silver Layer (enriched Delta table)**

In [0]:
from pyspark.sql.functions import col
from delta.tables import *

# Storage locations: Bronze input, product catalog CSV, Silver output
bronze_path = "s3a://marketing-data-bucket-09/marketing_data/output/bronze/transactions_bronze/"
product_path = "s3a://marketing-data-bucket-09/marketing_data/products/product_catalog.csv"
silver_path = "s3a://marketing-data-bucket-09/marketing_data/output/silver/transactions_enriched"

# Bronze transactions (Delta) and the product catalog (CSV with header row)
bronze_df = spark.read.load(bronze_path, format="delta")
products_df = spark.read.csv(product_path, header=True)

# Left join keeps every transaction; unmatched rows get null catalog columns
silver_df = bronze_df.join(products_df, "product_id", "left")

# Keep only transactions whose product was found in the catalog
has_product_match = col("product_name").isNotNull()
silver_df_clean = silver_df.where(has_product_match)

# Persist the enriched transactions as the Silver Delta table
(
    silver_df_clean.write
    .format("delta")
    .mode("overwrite")
    .save(silver_path)
)


In [0]:
# Show the first 10 rows of the persisted Silver table
silver_preview = spark.read.format("delta").load(silver_path).limit(10)
display(silver_preview)

product_id,transaction_id,customer_id,amount,channel,transaction_date,campaign_id,product_name,category,price
PRD00010,TXN00000003,CUST00019,333.49,instore,2024-02-14,CAMPAIGN1,Tablet,Electronics,1008.7
PRD00087,TXN00000008,CUST00495,3223.51,instore,2024-03-22,CAMPAIGN1,Phone,Accessories,1417.75
PRD00074,TXN00000009,CUST00153,1789.65,instore,2024-02-04,,Laptop,Accessories,1721.96
PRD00047,TXN00000016,CUST00431,4384.5,instore,2024-04-08,CAMPAIGN1,Laptop,Electronics,1492.17
PRD00078,TXN00000017,CUST00131,1302.32,instore,2024-02-25,,Tablet,Computers,1474.46
PRD00016,TXN00000024,CUST00296,768.17,instore,2024-06-17,CAMPAIGN2,Headphones,Computers,1401.37
PRD00077,TXN00000028,CUST00025,225.86,instore,2024-01-28,,Monitor,Computers,298.1
PRD00072,TXN00000029,CUST00343,4120.67,instore,2024-04-17,,Laptop,Accessories,1322.7
PRD00040,TXN00000030,CUST00356,4681.04,instore,2024-06-11,CAMPAIGN1,Headphones,Accessories,700.61
PRD00008,TXN00000034,CUST00026,3493.1,instore,2024-02-05,CAMPAIGN2,Headphones,Electronics,1381.91


## **Check Data Quality Before Creating the Gold Table**

In [0]:
%restart_python

In [0]:
from pyspark.sql.functions import col

# Reload the Silver table from Delta instead of relying on an in-memory
# DataFrame: the preceding %restart_python cell clears all Python variables,
# so `silver_df` from the enrichment cell does not exist on a top-to-bottom
# run. Reading the persisted table also validates exactly the data that the
# Gold layer is built from.
silver_path = "s3a://marketing-data-bucket-09/marketing_data/output/silver/transactions_enriched"
silver_df = spark.read.format("delta").load(silver_path)

# Check for null values in transaction_id, product_id, and amount columns.
# Each column becomes a 0/1 null flag; summing the flags gives the null count.
null_counts = silver_df.select([
    col(c).isNull().cast("int").alias(c) for c in ["transaction_id", "product_id", "amount"]
]).groupBy().sum().collect()[0].asDict()

print("Null Check:")
for col_name, null_count in null_counts.items():
    print(f"{col_name}: {null_count} nulls")

# Check valid amount range (0–10,000).
# `amount` is stored as a string (the Bronze CSVs are read without a schema),
# so cast it explicitly rather than depending on implicit string/number
# coercion in the comparison.
amount_value = col("amount").cast("double")
out_of_range = silver_df.filter((amount_value < 0) | (amount_value > 10000)).count()
print(f"\nAmount out of range (0–10,000): {out_of_range} rows")

# Check for malformed product IDs (expected form: PRD followed by 5 digits)
invalid_products = silver_df.filter(~col("product_id").rlike("^PRD\\d{5}$")).count()
print(f"\nInvalid product_id format: {invalid_products} rows")

# Check for duplicate transaction IDs (number of IDs that occur more than once)
duplicate_txns = silver_df.groupBy("transaction_id").count().filter("count > 1").count()
print(f"\nDuplicate transaction_id: {duplicate_txns} rows")


Null Check:
sum(transaction_id): 0 nulls
sum(product_id): 0 nulls
sum(amount): 0 nulls

Amount out of range (0–10,000): 0 rows

Invalid product_id format: 0 rows

Duplicate transaction_id: 0 rows


## **Loading Cleaned Data into Gold Delta Table**

In [0]:
from pyspark.sql import functions as F

# Storage locations. The s3a:// scheme is used so these paths match the rest
# of the notebook and the fs.s3a.* credentials configured in the first cell.
silver_path = "s3a://marketing-data-bucket-09/marketing_data/output/silver/transactions_enriched"
gold_table_path = "s3a://marketing-data-bucket-09/marketing_data/output/gold/channel_category_summary"

# Reading Silver Table
silver_df = spark.read.format("delta").load(silver_path)

# Business aggregation: revenue, distinct customers and transaction volume
# per channel and product category.
# `amount` is stored as a string, so cast it explicitly before summing.
gold_df = silver_df.groupBy("channel", "category").agg(
    F.sum(F.col("amount").cast("double")).alias("total_revenue"),
    F.countDistinct("customer_id").alias("unique_customers"),
    F.count("*").alias("transaction_count")
)

# Write it into Gold Layer Delta Table
gold_df.write.format("delta") \
    .mode("overwrite") \
    .save(gold_table_path)

# Register the Delta location as a table so BI tools can query it by name
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS channel_category_summary
    USING DELTA
    LOCATION '{gold_table_path}'
""")


DataFrame[]

# Delta Time Travel & Audits

**Check Available Versions**

In [0]:
from delta.tables import DeltaTable

# Location of the Gold-layer Delta table whose commit log is inspected
gold_table_path = "s3a://marketing-data-bucket-09/marketing_data/output/gold/channel_category_summary"

# Print the full commit history (one row per table version), untruncated
delta_table = DeltaTable.forPath(spark, gold_table_path)
gold_history_df = delta_table.history()
gold_history_df.show(truncate=False)


+-------+-------------------+---------------+------------------------+---------+------------------------------------------------------------+----+------------------+-------------------+-----------+-----------------+-------------+-----------------------------------------------------------+------------+------------------------------------------+
|version|timestamp          |userId         |userName                |operation|operationParameters                                         |job |notebook          |clusterId          |readVersion|isolationLevel   |isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                                |
+-------+-------------------+---------------+------------------------+---------+------------------------------------------------------------+----+------------------+-------------------+-----------+-----------------+-------------+-----------------------------------------------------------+------------+------

**Time Traveling**

In [0]:
from pyspark.sql import SparkSession

# Gold Delta table location used for both reads below
gold_table_path = "s3a://marketing-data-bucket-09/marketing_data/output/gold/channel_category_summary"

# Latest snapshot of the table
df_latest = spark.read.load(gold_table_path, format="delta")
print("Current Version:")
df_latest.show()

# Snapshot as of a specific commit (version 0 = the first write)
df_v0 = spark.read.load(gold_table_path, format="delta", versionAsOf=0)
print("Time Travel to Version 0:")
df_v0.show()


Current Version:
+-------+-----------+------------------+----------------+-----------------+
|channel|   category|     total_revenue|unique_customers|transaction_count|
+-------+-----------+------------------+----------------+-----------------+
|    web|  Computers|        2740357.86|             430|             1075|
| mobile|Electronics|2193510.2299999977|             404|              860|
|instore|  Computers| 2576285.130000001|             447|             1022|
| mobile|  Computers|2707976.7200000016|             447|             1069|
|    web|Electronics|2199698.7299999995|             416|              878|
|instore|Accessories|3609763.2199999965|             475|             1445|
|instore|Electronics|2153074.6399999987|             411|              857|
| mobile|Accessories|3474774.6399999983|             471|             1369|
|    web|Accessories|3574811.3600000003|             463|             1425|
+-------+-----------+------------------+----------------+--------------

## Optimizing storage 

**OPTIMIZE improves performance by compacting small files**<br>
**VACUUM cleans up old data files that are no longer needed for time travel**

In [0]:
# Path-based identifier for the Gold Delta table, reused by both statements
gold_table_ref = f"delta.`{gold_table_path}`"

# Compact small files into larger ones
spark.sql(f"OPTIMIZE {gold_table_ref}")

# Remove unreferenced data files older than 168 hours (7 days)
spark.sql(f"VACUUM {gold_table_ref} RETAIN 168 HOURS")

DataFrame[path: string]