In [0]:
# Directory that holds the raw CSV extracts; the per-entity file names are
# appended to this prefix when the files are read.
RAW_PATH = "/Volumes/ecommerce/ecommerce/data"
# Google Cloud / BigQuery settings. None of these are referenced by the cells
# visible in this notebook section — presumably used by a later export step;
# TODO confirm.
GCP_PROJECT = "regal-elf-481622-u5"
BQ_DATASET = "ecommerce"
TEMP_GCS_BUCKET = "ecommerce-data1"

# Secret scope / key *names* only; no secret value is stored in the notebook.
# NOTE(review): looks like these locate a GCP service-account key — confirm.
GCP_SECRET_SCOPE = "gcp-secrets"
GCP_SECRET_KEY = "gcp-sa-key"

In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
)

from pyspark.sql.functions import (
    current_timestamp, input_file_name, lit, to_timestamp, to_date, col, count, sum
)

from delta.tables import DeltaTable
import uuid

# Random UUID generated every time this cell runs. It is used further down as
# the `_load_id` value stamped on ingested rows and on audit records, so rows
# and audit entries produced by the same run can be tied together.
LOAD_ID = str(uuid.uuid4())

print("Bronze Load ID:", LOAD_ID)

In [0]:

# Explicit schemas for the six raw CSV files. Every field is nullable
# (StructType.add defaults nullable to True), matching a permissive raw load.

# transactions.csv
txn_schema = (
    StructType()
    .add("transaction_id", IntegerType())
    .add("customer_id", IntegerType())
    .add("product_id", IntegerType())
    .add("store_id", IntegerType())
    .add("promotion_id", IntegerType())
    .add("quantity", IntegerType())
    .add("total_amount", DoubleType())
    .add("transaction_date", TimestampType())
)

# customers.csv
cust_schema = (
    StructType()
    .add("customer_id", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("email", StringType())
    .add("join_date", TimestampType())
    .add("country", StringType())
)

# products.csv
prod_schema = (
    StructType()
    .add("product_id", IntegerType())
    .add("product_name", StringType())
    .add("category", StringType())
    .add("price", DoubleType())
    .add("supplier_name", StringType())
)

# stores.csv
store_schema = (
    StructType()
    .add("store_id", IntegerType())
    .add("store_name", StringType())
    .add("city", StringType())
    .add("country", StringType())
)

# promotions.csv
promo_schema = (
    StructType()
    .add("promotion_id", IntegerType())
    .add("product_id", IntegerType())
    .add("promotion_method", StringType())
    .add("discount_percent", DoubleType())
    .add("start_date", TimestampType())
    .add("end_date", TimestampType())
)

# feedback.csv
feedback_schema = (
    StructType()
    .add("feedback_id", IntegerType())
    .add("customer_id", IntegerType())
    .add("product_id", IntegerType())
    .add("rating", IntegerType())
    .add("review_date", TimestampType())
    .add("comments", StringType())
)

def _read_raw_csv(file_name, schema):
    """Read one headered CSV located under RAW_PATH, applying the given explicit schema."""
    reader = spark.read.option("header", True).schema(schema)
    return reader.csv(RAW_PATH + "/" + file_name)


txn_df = _read_raw_csv("transactions.csv", txn_schema)
cust_df = _read_raw_csv("customers.csv", cust_schema)
prod_df = _read_raw_csv("products.csv", prod_schema)
store_df = _read_raw_csv("stores.csv", store_schema)
promo_df = _read_raw_csv("promotions.csv", promo_schema)
feedback_df = _read_raw_csv("feedback.csv", feedback_schema)

# Each count() triggers a full read of the corresponding file.
print("Raw Counts: ")
for label, raw_frame in (
    ("Transactions: ", txn_df),
    ("Customers: ", cust_df),
    ("Products: ", prod_df),
    ("Stores: ", store_df),
    ("Promotions: ", promo_df),
    ("Feedback: ", feedback_df),
):
    print(label, raw_frame.count())


In [0]:
def add_bronze_meta(df):
    """
    Append the three bronze lineage columns to ``df``.

    Added columns, in this order:
      * ``_ingest_time`` - ``current_timestamp()`` at evaluation time
      * ``_source_file`` - ``input_file_name()`` of the row's source file
      * ``_load_id``     - the notebook-level ``LOAD_ID`` as a literal

    Returns the new DataFrame; ``df`` itself is not modified.
    """
    # NOTE(review): input_file_name() is reported as unsupported on some
    # Unity Catalog compute modes, where `_metadata.file_path` is suggested
    # instead — confirm against the cluster this notebook runs on.
    with_time = df.withColumn("_ingest_time", current_timestamp())
    with_file = with_time.withColumn("_source_file", input_file_name())
    return with_file.withColumn("_load_id", lit(LOAD_ID))

# Transactions carry two time columns: the original timestamp is copied to
# `transaction_ts`, and `transaction_date` is then narrowed to a calendar date
# (both are wanted later for fraud/suspect checks and time-series work).
txn_with_meta = add_bronze_meta(txn_df)
txn_with_ts = txn_with_meta.withColumn("transaction_ts", col("transaction_date"))
txn_bronze = txn_with_ts.withColumn("transaction_date", to_date(col("transaction_ts")))

# The remaining entities only receive the lineage columns.
cust_bronze, prod_bronze, store_bronze, promo_bronze, feedback_bronze = (
    add_bronze_meta(raw_df)
    for raw_df in (cust_df, prod_df, store_df, promo_df, feedback_df)
)

In [0]:
%sql
-- Ensure the target schemas exist under `ecommerce`; IF NOT EXISTS makes this
-- cell safe to re-run. `bronze` and `dq` are written to by later cells of this
-- notebook; `silver` and `gold` are not referenced in the cells visible here.
CREATE SCHEMA IF NOT EXISTS ecommerce.bronze;
CREATE SCHEMA IF NOT EXISTS ecommerce.silver;
CREATE SCHEMA IF NOT EXISTS ecommerce.gold;
CREATE SCHEMA IF NOT EXISTS ecommerce.dq;


In [0]:
def merge_insert_only(df, table_fqn, merge_condition):
    """
    Load ``df`` into the Delta table ``table_fqn`` without touching existing rows.

    Args:
        df: Source DataFrame; referenced as alias ``s`` in ``merge_condition``.
        table_fqn: Fully qualified name of the target table (alias ``t``).
        merge_condition: SQL boolean expression over ``t`` and ``s`` that
            decides whether a source row already exists in the target.

    Behaviour:
        * Target missing  -> the table is created from ``df``.
        * Target present  -> a MERGE runs with a single "when not matched,
          insert all columns" clause; matched rows are left as they are.

    Returns:
        None. A one-line status message is printed in either case.
    """
    if spark.catalog.tableExists(table_fqn):
        target = DeltaTable.forName(spark, table_fqn)
        merge_builder = target.alias("t").merge(df.alias("s"), merge_condition)
        merge_builder.whenNotMatchedInsertAll().execute()
        print(f"Merged into table (insert-only): {table_fqn}")
        return

    # First load: nothing to merge into, so materialise the table directly.
    writer = (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    writer.saveAsTable(table_fqn)
    print(f"Created table: {table_fqn}")


In [0]:
# (source DataFrame, target table, key column) for each bronze entity.
# NOTE(review): the generated condition uses `=`, which never matches NULL
# keys, so rows with a NULL key would be inserted again on every re-run —
# confirm whether that is intended.
bronze_targets = [
    (txn_bronze,      "ecommerce.bronze.transactions", "transaction_id"),
    (cust_bronze,     "ecommerce.bronze.customers",    "customer_id"),
    (prod_bronze,     "ecommerce.bronze.products",     "product_id"),
    (store_bronze,    "ecommerce.bronze.stores",       "store_id"),
    (promo_bronze,    "ecommerce.bronze.promotions",   "promotion_id"),
    (feedback_bronze, "ecommerce.bronze.feedback",     "feedback_id"),
]

for source_df, target_table, key_name in bronze_targets:
    merge_insert_only(source_df, target_table, f"t.{key_name} = s.{key_name}")

In [0]:
def dq_audit(df, table_name, key_col):
    """
    Build a one-row data-quality summary for ``df``.

    Args:
        df: DataFrame to profile.
        table_name: Label written to the ``table_name`` column of the result.
        key_col: Name of the column in ``df`` whose NULL / non-NULL values
            are counted.

    Returns:
        A single-row DataFrame with the columns ``row_count``,
        ``non_null_key_count``, ``null_key_count``, ``table_name``,
        ``key_column``, ``_load_id`` (the notebook-level ``LOAD_ID``) and
        ``_audit_time`` (``current_timestamp()``).
    """
    total_rows = count("*")
    keyed_rows = count(col(key_col))  # count(column) skips NULLs

    return (df.agg(
        total_rows.alias("row_count"),
        keyed_rows.alias("non_null_key_count"),
        # Derived as a difference of the two counts rather than a sum() over a
        # NULL flag: sum() over zero rows yields NULL, which made an empty
        # input report NULL here while the other two counters reported 0.
        (total_rows - keyed_rows).alias("null_key_count")
    )
    .withColumn("table_name", lit(table_name))
    .withColumn("key_column", lit(key_col))
    .withColumn("_load_id", lit(LOAD_ID))
    .withColumn("_audit_time", current_timestamp())
    )

# One audit row per bronze entity, computed from the DataFrames prepared for
# this run and labelled with the table each one is loaded into.
audit_targets = [
    (txn_bronze,      "ecommerce.bronze.transactions", "transaction_id"),
    (cust_bronze,     "ecommerce.bronze.customers",    "customer_id"),
    (prod_bronze,     "ecommerce.bronze.products",     "product_id"),
    (store_bronze,    "ecommerce.bronze.stores",       "store_id"),
    (promo_bronze,    "ecommerce.bronze.promotions",   "promotion_id"),
    (feedback_bronze, "ecommerce.bronze.feedback",     "feedback_id"),
]

audit_df = None
for audited_df, audited_table, audited_key in audit_targets:
    audit_row = dq_audit(audited_df, audited_table, audited_key)
    audit_df = audit_row if audit_df is None else audit_df.unionByName(audit_row)

# Append so that every run leaves its own set of audit rows behind.
audit_writer = audit_df.write.format("delta").mode("append")
audit_writer.saveAsTable("ecommerce.dq.bronze_audit")
display(audit_df)

In [0]:
# Post-load sanity check: if total_rows is larger than the distinct key count,
# the bronze table contains repeated keys.
spark.table("ecommerce.bronze.transactions").selectExpr(
    "count(*) as total_rows",
    "count(distinct transaction_id) as distinct_txn_ids"
).show()

spark.table("ecommerce.bronze.feedback").selectExpr(
    "count(*) as total_rows",
    "count(distinct feedback_id) as distinct_feedback_ids"
).show()

# The check-mark was stored as mojibake ("âœ…": UTF-8 bytes read as cp1252);
# restored to the intended character.
print("✅ Bronze ingestion completed successfully.")