In [0]:
# GOLD LAYER - BUSINESS ANALYTICS & AGGREGATIONS

from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
print("Starting Gold Layer ETL (Load Phase)...")

# Read from Silver layer
silver_df = spark.table("silver_credit_transactions")

print("Silver table loaded successfully")
print(f"Silver record count: {silver_df.count()}")


In [0]:
#Total Spend by Category
gold_spend_by_category = (
    silver_df
    .groupBy("category")
    .agg(
        sum("amt").alias("total_spend"),
        count("*").alias("transaction_count")
    )
    .orderBy(desc("total_spend"))
)

gold_spend_by_category.show()


In [0]:
#Fraud Rate by State
gold_fraud_by_state = (
    silver_df
    .groupBy("state")
    .agg(
        count("*").alias("total_transactions"),
        sum(col("is_fraud").cast("int")).alias("fraud_transactions")
    )
    .withColumn(
        "fraud_rate_percent",
        round(col("fraud_transactions") / col("total_transactions") * 100, 2)
    )
    .orderBy(desc("fraud_rate_percent"))
)

gold_fraud_by_state.show()


In [0]:
#Top Merchants by Transaction Amount
gold_top_merchants = (
    silver_df
    .groupBy("merchant")
    .agg(
        sum("amt").alias("total_spend"),
        count("*").alias("transaction_count")
    )
    .orderBy(desc("total_spend"))
    .limit(10)
)

gold_top_merchants.show(truncate=False)


In [0]:
#Daily Transaction Trend
gold_daily_trend = (
    silver_df
    .groupBy("transaction_date")
    .agg(
        sum("amt").alias("daily_spend"),
        count("*").alias("transaction_count")
    )
    .orderBy("transaction_date")
)

gold_daily_trend.show(10)


In [0]:
gold_spend_by_category.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_spend_by_category")

gold_fraud_by_state.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_fraud_by_state")

gold_top_merchants.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_top_merchants")

gold_daily_trend.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_daily_transaction_trend")

print("Gold tables created successfully!")
