In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Name of the database that holds every silver-layer table.
SILVER_DB = "silver"

# IF NOT EXISTS makes this statement safe to re-run when the database is already there.
spark.sql(
    f"CREATE DATABASE IF NOT EXISTS {SILVER_DB}"
)
print("Silver database ready.")

In [0]:
# Load the bronze source tables. Each one gets a short alias so later joins can
# reference qualified columns such as col("txn.account_id") or col("fx.date").
txn, acc, cust, merch, fx = (
    spark.table(table_name).alias(alias_name)
    for table_name, alias_name in (
        ("bronze.transactions", "txn"),
        ("bronze.accounts", "acc"),
        ("bronze.customers", "cust"),
        ("bronze.merchants", "merch"),
        ("bronze.exchange_rates", "fx"),
    )
)

In [0]:
# Bronze can contain several ingested copies of one transaction. For each
# transaction_id, keep only the copy with the greatest ingestion_timestamp.
# A copy with a null timestamp sorts last, so it survives only when no
# timestamped copy exists.
# NOTE(review): when two copies share the same ingestion_timestamp, row_number()
# picks one arbitrarily (non-deterministic across runs). Consider adding a
# deterministic tie-breaker column to the orderBy.
window_spec = Window.partitionBy("transaction_id").orderBy(
    col("ingestion_timestamp").desc_nulls_last()
)

deduped_txn = (
    txn.withColumn("row_num", row_number().over(window_spec))
    .where(col("row_num") == 1)   # rank 1 == most recently ingested copy
    .drop("row_num")              # helper column is not part of the output
)

In [0]:
# Keep only transactions that are usable downstream. Each must have:
#   - a transaction_id,
#   - a strictly positive amount (a null amount also fails this comparison),
#   - a transaction_timestamp.
clean_txn = deduped_txn.filter(
    col("transaction_id").isNotNull()
    & (col("amount") > 0)
    & col("transaction_timestamp").isNotNull()
)

In [0]:
# Enrich each cleaned transaction with two lookups:
#   - the customer_id of its account,
#   - the category of its merchant.
# Left joins keep every transaction; the looked-up columns are null when no
# matching account / merchant row exists.
#
# bronze.customers is deliberately NOT joined. None of its columns are
# selected (customer_id is read from the accounts table), so that join only
# added a shuffle. It would also duplicate transactions whenever a customer_id
# appeared more than once in the (un-deduplicated) bronze.customers table.
# NOTE(review): account_id and merchant_id are assumed to be unique in their
# bronze tables. Duplicates there would still fan out transactions — confirm.
joined = (
    clean_txn.alias("txn")
    .join(acc, col("txn.account_id") == col("acc.account_id"), "left")
    .join(merch, col("txn.merchant_id") == col("merch.merchant_id"), "left")
)

enriched = joined.select(
    col("txn.transaction_id"),
    col("txn.account_id"),
    col("acc.customer_id"),
    col("txn.merchant_id"),
    col("merch.category").alias("merchant_category"),
    col("txn.amount"),
    col("txn.currency").alias("transaction_currency"),
    col("txn.transaction_timestamp"),
)

In [0]:
# Add the calendar date of each transaction. It is the join key for the
# daily exchange rates below.
txn_with_date = enriched.withColumn(
    "transaction_date",
    col("transaction_timestamp").cast("date")
)

# Convert every amount to USD using the rate for the transaction's currency
# on the transaction's date. This is a left join: transactions with no
# matching (currency, date) rate are kept, with null rate_to_usd / amount_usd.
# NOTE(review): three assumptions to confirm:
#   - Every fx column (including fx.currency and fx.date) is carried into the
#     output, since nothing is selected away here.
#   - (currency, date) is unique in bronze.exchange_rates; duplicates would
#     duplicate transactions.
#   - USD amounts convert only if the fx table has a USD row for each date.
fx_joined = (
    txn_with_date.alias("t")
    .join(
        fx,
        (col("t.transaction_currency") == col("fx.currency"))
        & (col("t.transaction_date") == col("fx.date")),
        "left",
    )
    # Qualify both operands so this expression cannot become ambiguous if the
    # exchange-rate table ever gains a column with the same name.
    .withColumn("amount_usd", col("t.amount") * col("fx.rate_to_usd"))
)

In [0]:
# Derive three columns from transaction_timestamp: year, month and day of
# month. year/month are used as the table partition columns when writing.
# Each withColumn adds the column, or replaces an existing one of that name.
silver_txn = fx_joined
for _part_name, _part_fn in (
    ("year", year),
    ("month", month),
    ("day", dayofmonth),
):
    silver_txn = silver_txn.withColumn(_part_name, _part_fn("transaction_timestamp"))

In [0]:
# Persist the enriched transactions as a Delta table. The write replaces any
# previous contents (mode "overwrite") and partitions by the year/month
# columns derived from transaction_timestamp.
# The table name is built from SILVER_DB instead of a hard-coded "silver", so
# the write always targets the database that the setup cell creates.
(
    silver_txn.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("year", "month")
        .saveAsTable(f"{SILVER_DB}.transactions_enriched")
)

print("Silver layer complete.")