In [0]:
%run /Workspace/Users/saijananig4@gmail.com/real-time-banking-etl/src/config/00_config_and_helpers

In [0]:
# Pick the target environment from the "env" notebook widget (default: "dev").
dbutils.widgets.text("env", "dev")
env = dbutils.widgets.get("env")

# get_env_config / tbl are not defined in this notebook; presumably they come
# from the helper notebook pulled in by %run above — verify there.
cfg = get_env_config(env)
SILVER_TBL = tbl(cfg, "silver_transactions")

# Silver transactions for the selected environment; every Gold aggregate
# below is derived from this DataFrame.
df = spark.table(SILVER_TBL)

# Echo the run context so the cell output records which tables were used.
print("Running env:", cfg["ENV"])
print("SILVER_TBL:", SILVER_TBL)
silver_row_count = df.count()
print("Silver rows:", silver_row_count)


In [0]:
from pyspark.sql import functions as F

# Resolve the Silver table through the environment config instead of pinning it
# to the dev schema. With a hard-coded dev name, a run with env != "dev" would
# aggregate dev data and then write it into that environment's Gold tables.
silver_tbl = tbl(cfg, "silver_transactions")
df = spark.read.table(silver_tbl)

# Quick sanity check: row count plus a 10-row, untruncated sample.
print("Silver rows: ", df.count())
df.show(10, truncate=False)

In [0]:
# Hourly KPIs: one output row per (transaction_date, hour_of_day).

# 1 for rows flagged high-value, 0 otherwise; summing it counts those rows.
hourly_high_value_flag = F.when(F.col("is_high_value") == True, 1).otherwise(0)

hourly_metrics = [
    F.count("*").alias("txn_count"),
    F.round(F.sum("amount"), 2).alias("total_amount"),
    F.round(F.avg("amount"), 2).alias("avg_amount"),
    F.sum(hourly_high_value_flag).alias("high_value_txn_count"),
]

gold_hourly = (
    df.groupBy("transaction_date", "hour_of_day")
    .agg(*hourly_metrics)
    # Newest date first, and within a date the latest hour first.
    .orderBy(F.desc("transaction_date"), F.desc("hour_of_day"))
)

# Replace the Gold table contents with this run's result.
gold_hourly_tbl = tbl(cfg, "gold_hourly_kpis")
hourly_writer = (
    gold_hourly.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
hourly_writer.saveAsTable(gold_hourly_tbl)

print("✅ Created:", gold_hourly_tbl)
gold_hourly.show()

In [0]:
# Daily KPIs: one output row per (transaction_date, day_of_week).

# 1 for rows flagged high-value, 0 otherwise; summing it counts those rows.
daily_high_value_flag = F.when(F.col("is_high_value") == True, 1).otherwise(0)

daily_metrics = [
    F.count("*").alias("txn_count"),
    F.round(F.sum("amount"), 2).alias("total_amount"),
    F.round(F.avg("amount"), 2).alias("avg_amount"),
    F.sum(daily_high_value_flag).alias("high_value_txn_count"),
    # Number of distinct accounts that transacted on the day.
    F.countDistinct("account_id").alias("active_accounts"),
]

gold_daily = (
    df.groupBy("transaction_date", "day_of_week")
    .agg(*daily_metrics)
    .orderBy(F.desc("transaction_date"))
)

# Replace the Gold table contents with this run's result.
gold_daily_tbl = tbl(cfg, "gold_daily_kpis")
daily_writer = (
    gold_daily.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
daily_writer.saveAsTable(gold_daily_tbl)

print("✅ Created:", gold_daily_tbl)
gold_daily.show()


In [0]:
# Spend by merchant category: one output row per (transaction_date, merchant_category).

category_metrics = [
    F.count("*").alias("txn_count"),
    F.round(F.sum("amount"), 2).alias("total_amount"),
    F.round(F.avg("amount"), 2).alias("avg_amount"),
]

gold_category = (
    df.groupBy("transaction_date", "merchant_category")
    .agg(*category_metrics)
    # Newest date first; categories in descending order within a date.
    .orderBy(F.desc("transaction_date"), F.desc("merchant_category"))
)

# Replace the Gold table contents with this run's result.
gold_category_tbl = tbl(cfg, "gold_category_kpis")
category_writer = (
    gold_category.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .format("delta")
)
category_writer.saveAsTable(gold_category_tbl)

print("✅ Created:", gold_category_tbl)
gold_category.show()

In [0]:
# High-value monitoring: transaction-level rows where is_high_value is true,
# largest amounts first.

high_value_columns = [
    "transaction_id",
    "account_id",
    "transaction_ts",
    "transaction_date",
    "amount",
    "merchant_category",
    "channel",
    "city",
]

gold_high_value = (
    df.where(F.col("is_high_value") == True)
    .select(*high_value_columns)
    .orderBy(F.desc("amount"))
)

# Replace the Gold table contents with this run's result.
gold_high_value_tbl = tbl(cfg, "gold_high_value_txns")
high_value_writer = (
    gold_high_value.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .format("delta")
)
high_value_writer.saveAsTable(gold_high_value_tbl)

print("✅ Created:", gold_high_value_tbl)
gold_high_value.show()

In [0]:
from pyspark.sql.window import Window

# Top accounts per day: the 10 highest-spending accounts for each transaction_date.

# Spend per account per day
acct_daily = (
    df.groupBy("transaction_date", "account_id")
      .agg(
          F.round(F.sum("amount"), 2).alias("total_spend"),
          F.count("*").alias("txn_count")
      )
)

# Rank accounts per day by spend.
# account_id is a secondary sort key so that accounts with equal total_spend
# are ranked deterministically; with total_spend alone, row_number() breaks
# ties arbitrarily and the top-10 cut-off can differ between runs.
w = Window.partitionBy("transaction_date").orderBy(
    F.col("total_spend").desc(),
    F.col("account_id").asc(),
)

gold_top_accounts_daily = (
    acct_daily
    .withColumn("rank", F.row_number().over(w))
    .filter(F.col("rank") <= 10)   # top 10 per day
    .orderBy(F.col("transaction_date").desc(), F.col("rank").asc())
)

# Replace the Gold table contents with this run's result.
gold_top_accounts_daily_tbl = tbl(cfg, "gold_top_accounts_daily")
(gold_top_accounts_daily.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable(gold_top_accounts_daily_tbl))

print("✅ Created:", gold_top_accounts_daily_tbl)
gold_top_accounts_daily.show()

In [0]:
# Top account by category: the single highest-spending account for each
# (transaction_date, merchant_category).

# Spend per account per category per day
acct_cat_daily = (
    df.groupBy("transaction_date", "merchant_category", "account_id")
      .agg(
          F.round(F.sum("amount"), 2).alias("total_spend"),
          F.count("*").alias("txn_count")
      )
)

# Rank within each (date, category).
# account_id is a secondary sort key so that, when several accounts share the
# highest total_spend, the same one is picked as rank 1 on every run; with
# total_spend alone, row_number() breaks the tie arbitrarily.
w2 = Window.partitionBy("transaction_date", "merchant_category").orderBy(
    F.col("total_spend").desc(),
    F.col("account_id").asc(),
)

gold_top_account_by_category = (
    acct_cat_daily
    .withColumn("rank", F.row_number().over(w2))
    .filter(F.col("rank") == 1)
    # The rank column is only needed for filtering, so it is not persisted.
    .select("transaction_date", "merchant_category", "account_id", "total_spend", "txn_count")
    .orderBy(F.col("transaction_date").desc(), F.col("total_spend").desc())
)

# Replace the Gold table contents with this run's result.
gold_top_account_by_category_tbl = tbl(cfg, "gold_top_account_by_category_daily")
(gold_top_account_by_category.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable(gold_top_account_by_category_tbl))

print("✅ Created:", gold_top_account_by_category_tbl)
gold_top_account_by_category.show()

In [0]:
# Post-run validation: print the row count of every Gold table written above.
# Names are resolved through tbl(cfg, ...) so the check reads the same tables
# this run wrote for the selected env, rather than a hard-coded prod schema.
gold_table_names = [
    "gold_hourly_kpis",
    "gold_daily_kpis",
    "gold_category_kpis",
    "gold_high_value_txns",
    "gold_top_accounts_daily",
    "gold_top_account_by_category_daily",
]

for gold_name in gold_table_names:
    t = tbl(cfg, gold_name)
    c = spark.read.table(t).count()
    print(t, "=>", c)


In [0]:
%sql 
-- Preview every row of the daily KPI Gold table.
-- NOTE(review): the table name is hard-coded to the prod schema, so this query
-- does not follow the "env" widget used by the Python cells — confirm intended.

select * from finance_data.prod.gold_daily_kpis



