In [13]:
from common import base_path, BRONZE_ROOT, SILVER_ROOT, GOLD_ROOT
import os
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import random
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [14]:
# Make sure a folder exists for every medallion layer before anything is written.
# exist_ok=True keeps the cell safe to re-run.
for layer in ('bronze', 'silver', 'gold'):
    layer_dir = f"{base_path}/{layer}"
    os.makedirs(layer_dir, exist_ok=True)

In [15]:
# Spark configuration for a local session with Delta Lake support:
#   - driver memory for the local JVM
#   - Maven coordinates of the Hadoop client and Delta Lake jars to fetch
#   - the Delta SQL extension and Delta-aware session catalog
spark_conf = (
    SparkConf()
    .set("spark.driver.memory", "2g")
    .set("spark.jars.packages", "org.apache.hadoop:hadoop-client:3.3.4,io.delta:delta-spark_2.12:3.2.0")
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Create (or reuse) the session through the builder, which is the documented
# entry point, rather than calling the SparkSession constructor directly.
# NOTE(review): if a session already exists in this kernel, getOrCreate() reuses
# it, so JVM-level settings above presumably only apply on a fresh kernel.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

# Keep `sc` available under the same name as before.
sc = spark.sparkContext

print(f"Spark version = {spark.version}")

Spark version = 3.5.0


## 1. Bronze Layer - Raw Data

We generate and store the following datasets:

- **Users**: user_id, user_name, credit_score
- **Cards**: card_id, user_id, bank_name (card issuer)
- **Transaction Logs**: transaction_id, card_from, card_to, amount, timestamp

All data is stored as Delta Lake tables under the `/bronze` folder.

---

In [16]:
# Seed the generator so that Restart & Run All regenerates exactly the same
# synthetic dataset (and therefore the same silver/gold results).
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Card issuers; also used later as the explicit pivot values.
banks = ['BankA', 'BankB', 'BankC', 'BankD']

# 100 users, each with a credit score drawn from the inclusive range 300-850.
users = [{"user_id": i, "user_name": f"user_{i}", "credit_score": random.randint(300, 850)} for i in range(100)]

# 200 cards, each assigned to a random existing user and a random issuing bank.
cards = [{"card_id": i, "user_id": random.choice(users)['user_id'], "bank_name": random.choice(banks)} for i in range(200)]

# 5000 transfers between randomly chosen cards (sender and receiver are drawn
# independently, so they may coincide).
transactions = [{
    "transaction_id": i,
    "card_from": random.choice(cards)['card_id'],
    "card_to": random.choice(cards)['card_id'],
    "amount": round(random.uniform(10, 1000), 2),
    # Month is limited to 1-9 (literal "0" prefix) and day to 10-28, so the
    # string is always a fixed-width YYYY-MM-DD value without padding logic.
    "timestamp": f"2024-0{random.randint(1,9)}-{random.randint(10,28)}"
} for i in range(5000)]

# Persist each raw dataset as a Delta table in the bronze layer; overwrite mode
# replaces the data from any previous run.
spark.createDataFrame(users).write.format("delta").mode("overwrite").save(f"{BRONZE_ROOT}/user")
spark.createDataFrame(cards).write.format("delta").mode("overwrite").save(f"{BRONZE_ROOT}/card")
spark.createDataFrame(transactions).write.format("delta").mode("overwrite").save(f"{BRONZE_ROOT}/transaction_log")


## 2. Silver Layer - Enriched Wide Table

We enrich transaction logs by:

- Joining transaction data with sender and receiver card information.
- Extracting sender and receiver bank names.
- Creating a **wide table** using `pivot` to show total transaction amounts from sender bank to receiver bank.

The pivot operation generates a table with sender banks as rows and receiver banks as columns.

All data is stored as Delta Lake tables under the `/silver` folder.

---

In [17]:
# Load the bronze Delta tables written by the previous stage.
user_df = spark.read.format("delta").load(f"{BRONZE_ROOT}/user")
card_df = spark.read.format("delta").load(f"{BRONZE_ROOT}/card")
txn_df = spark.read.format("delta").load(f"{BRONZE_ROOT}/transaction_log")

# Two views of the card table, keyed by the transaction column they join on.
sender_cards = card_df.withColumnRenamed("card_id", "card_from")
receiver_cards = card_df.withColumnRenamed("card_id", "card_to")

# Attach the sender's bank first; the rename must happen before the second join
# so that the receiver's `bank_name` is the only column with that name.
with_sender = (
    txn_df
    .join(sender_cards, on="card_from")
    .withColumnRenamed("bank_name", "sender_bank")
)
with_receiver = (
    with_sender
    .join(receiver_cards, on="card_to")
    .withColumnRenamed("bank_name", "receiver_bank")
)

# Keep only the transaction facts plus both bank names.
txn_enriched = with_receiver.select(
    "transaction_id", "amount", "timestamp", "sender_bank", "receiver_bank"
)

txn_enriched.write.format("delta").mode("overwrite").save(f"{SILVER_ROOT}/transactions")

# Wide table: one row per sender bank, one column per receiver bank, each cell
# holding the summed amount; bank pairs with no transactions are filled with 0.
amounts_by_sender = txn_enriched.groupBy("sender_bank")
pivoted = (
    amounts_by_sender
    .pivot("receiver_bank", banks)
    .agg(F.sum(F.col("amount")))
    .fillna(0)
)

pivoted.write.format("delta").mode("overwrite").save(f"{SILVER_ROOT}/transactions_pivoted")


## 3. Gold Layer - Reporting

We build a final report:

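- Unpivot the pivoted silver data back into long format (one row per sender/receiver bank pair) using the `stack()` function.
- Apply a window function (`row_number()` over a window partitioned by `sender_bank` and ordered by total amount, descending) to rank receiver banks by total transaction amount.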
- Select the **most popular counter-agent bank** for each sender bank.

The final report is stored as a Delta table under the `/gold` folder.

---

In [18]:
# Read the pivoted silver table back from disk.
pivoted = spark.read.format("delta").load(f"{SILVER_ROOT}/transactions_pivoted")

# Every column other than the grouping key is one receiver bank. Deriving the
# list from the table keeps the stack() row count in sync with the actual
# columns instead of relying on a hard-coded 4.
receiver_banks = [c for c in pivoted.columns if c != "sender_bank"]

# Unpivot back to long format: stack(n, 'name_1', `col_1`, ...) emits n rows of
# (receiver_bank, total_amount) for each sender_bank row.
stack_args = ", ".join(f"'{bank}', `{bank}`" for bank in receiver_banks)
stack_expr = f"stack({len(receiver_banks)}, {stack_args}) as (receiver_bank, total_amount)"
stacked = pivoted.selectExpr("sender_bank", stack_expr)

# Rank receiver banks per sender by total amount, largest first. The bank name
# is a tie-breaker so row_number() is deterministic; without it, equal totals
# could yield different winners between show() and write, since each action
# re-evaluates the plan.
window_spec = Window.partitionBy("sender_bank").orderBy(
    F.desc("total_amount"), F.asc("receiver_bank")
)

ranked = stacked.withColumn("rank", F.row_number().over(window_spec))

# Keep only the top-ranked receiver bank for each sender bank.
top_counteragent = ranked.filter("rank == 1").drop("rank")

top_counteragent.show()

top_counteragent.write.format("delta").mode("overwrite").save(f"{GOLD_ROOT}/top_counteragent")


+-----------+-------------+------------------+
|sender_bank|receiver_bank|      total_amount|
+-----------+-------------+------------------+
|      BankA|        BankD|         153780.31|
|      BankB|        BankB|         206639.18|
|      BankC|        BankD|169221.28999999998|
|      BankD|        BankD|204079.60999999996|
+-----------+-------------+------------------+

