**Silver Transformations - Phase 1: Data Load Logic**

In [0]:
# Libraries
import re
from datetime import datetime
from pyspark.sql.functions import (
    col, when, upper, current_timestamp
)
from pyspark.sql.types import (
    StructType, StructField,
    TimestampType, StringType, DoubleType
)


In [0]:
%sql
-- Set the session's current catalog and schema so that unqualified table
-- names used by later cells (e.g. silver_processing_watermark) resolve
-- against ppe.silver.
USE CATALOG ppe;
USE SCHEMA silver;


In [0]:
# Fetch the Silver watermark: the ingestion_ts of the newest Bronze record that
# has already been processed.
#
# max() is used so the query always yields exactly one row: NULL when the
# watermark table is empty or holds only NULLs, and a single well-defined value
# if the table ever contains more than one row.
watermark_rows = spark.sql(
    "SELECT max(last_processed_ingestion_ts) AS last_ts "
    "FROM silver_processing_watermark"
).collect()

# Normalise "no watermark" to None (rather than an empty list) so later cells
# can rely on `last_ts` being either a timestamp or None.
last_ts = watermark_rows[0]["last_ts"] if watermark_rows else None

if last_ts is not None:
    print("Last processed Bronze ingestion_ts:", last_ts)
else:
    print("No last_processed_ingestion_ts found in silver_processing_watermark.")

In [0]:
# Read ONLY New Bronze Records
#
# With a watermark, only rows ingested strictly after it are selected; without
# one (first run) the whole Bronze table is read.
bronze_query = """
    SELECT file_name, raw_text, ingestion_ts
    FROM ppe.bronze.bronze_raw_transactions
"""
if last_ts:
    # last_ts comes from the watermark table, not from user input.
    bronze_query += f"    WHERE ingestion_ts > timestamp('{last_ts}')\n"

bronze_df = spark.sql(bronze_query)

# head(1) fetches at most one row, so the emptiness check does not have to
# count the entire result set.
if not bronze_df.head(1):
    print("No new Bronze data to process.")
    dbutils.notebook.exit("Silver up-to-date")

In [0]:
from pyspark.sql.types import (
    StructType, StructField,
    TimestampType, StringType, DoubleType
)

# Column layout of the Silver transactions DataFrame, as (name, Spark type)
# pairs in output order. Every column is declared nullable.
_SILVER_FIELDS = [
    ("transaction_time", TimestampType()),
    ("amount", DoubleType()),
    ("type", StringType()),
    ("merchant", StringType()),
    ("transaction_id", StringType()),
    ("utr", StringType()),
    ("paid_by", StringType()),
    ("ingestion_ts", TimestampType()),
    ("dq_status", StringType()),
    ("dq_reason", StringType()),
    ("category", StringType()),
]

silver_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _SILVER_FIELDS]
)


In [0]:
import re
from datetime import datetime
from pyspark.sql import Row

# Parse every newly ingested Bronze statement into Silver transaction rows.
#
# `bronze_df` is the incremental frame built by the "Read ONLY New Bronze
# Records" cell. It is deliberately NOT re-queried here, so the watermark filter
# stays in force and the watermark update at the end of the notebook sees the
# same rows that were parsed.

# Patterns are compiled once and shared by the helpers below.
DATE_LINE_RE = re.compile(r"^[A-Za-z]{3}\s+\d{2},\s+\d{4}")
DATE_RANGE_RE = re.compile(r"\d{2}\s[A-Za-z]{3},\s\d{4}\s-\s\d{2}\s[A-Za-z]{3},\s\d{4}")
TIME_RE = re.compile(r"(\d{2}:\d{2})\s*(am|pm)", re.I)
AMOUNT_RE = re.compile(r"₹([\d,]+(\.\d+)?)")
MERCHANT_RE = re.compile(
    r"(Paid to|Payment to|Transfer to|Refund from)\s+(.*?)\s+(DEBIT|CREDIT)",
    re.I,
)
TXN_ID_RE = re.compile(r"Transaction ID\s+([A-Z0-9]+)")
UTR_RE = re.compile(r"UTR No\.?\s*([0-9]+)")
ACCOUNT_RE = re.compile(r"(Paid by|Credited to)\s*([X0-9]+)")

# A transaction block ends on the line that names the account.
BLOCK_END_PREFIXES = ("Paid by", "Credited to")

# Masked account number -> bank label; anything else is reported as "UNKNOWN".
ACCOUNT_MASK_TO_BANK = {
    "XXXXXX7267": "SBI",
    "XXXX438045": "AXIS",
}


def is_noise(line):
    """Return True for statement lines that carry no transaction data.

    Covers page markers, the statement banner/footer, the column header row
    and the "DD Mon, YYYY - DD Mon, YYYY" statement-period line.
    """
    lowered = line.lower()
    return bool(
        line.startswith("Page ")
        or "system generated statement" in lowered
        or "transaction statement for" in lowered
        or "date transaction details type amount" in lowered
        or DATE_RANGE_RE.match(line)
    )


def _silver_row(dq_status, dq_reason=None, transaction_time=None, amount=None,
                txn_type=None, merchant=None, transaction_id=None, utr=None,
                paid_by=None):
    """Build one Row for the Silver DataFrame.

    The keyword order below is kept identical to the original Row(...) calls,
    which follows the field order of `silver_schema`; do not reorder it.
    """
    return Row(
        transaction_time=transaction_time,
        amount=amount,
        type=txn_type,
        merchant=merchant,
        transaction_id=transaction_id,
        utr=utr,
        paid_by=paid_by,
        ingestion_ts=datetime.now(),
        dq_status=dq_status,
        dq_reason=dq_reason,
        category=None,
    )


def _split_into_blocks(lines):
    """Group cleaned statement lines into one list of lines per transaction.

    A block starts on a line beginning with a date ("Mon DD, YYYY") and runs
    through the first "Paid by" / "Credited to" line. If that terminator is
    missing, the block stops before the next date line (or at end of input) so
    that one malformed transaction cannot absorb the ones that follow it.
    Lines outside any block are ignored.
    """
    blocks = []
    idx, total = 0, len(lines)
    while idx < total:
        if not DATE_LINE_RE.match(lines[idx]):
            idx += 1
            continue

        block = [lines[idx]]
        idx += 1
        while idx < total and not DATE_LINE_RE.match(lines[idx]):
            block.append(lines[idx])
            idx += 1
            if block[-1].startswith(BLOCK_END_PREFIXES):
                break
        blocks.append(block)
    return blocks


def _parse_block(block):
    """Parse one transaction block into a VALID Silver row.

    Raises:
        ValueError: when the date/time or the amount cannot be extracted.
    """
    block_text = " ".join(block)

    # Date is on the first line (guaranteed by _split_into_blocks).
    date_str = DATE_LINE_RE.match(block[0]).group()

    # Time is expected on the second line.
    if len(block) < 2:
        raise ValueError("Transaction block has no time line")
    time_match = TIME_RE.search(block[1])
    if time_match is None:
        raise ValueError("Transaction time not found on second line")
    # Re-join clock and am/pm with a single space: the pattern accepts
    # "10:30am", but strptime's " %p" requires whitespace before the marker.
    txn_time = datetime.strptime(
        f"{date_str} {time_match.group(1)} {time_match.group(2)}",
        "%b %d, %Y %I:%M %p",
    )

    amount_match = AMOUNT_RE.search(block_text)
    if amount_match is None:
        raise ValueError("Transaction amount not found")
    amount = float(amount_match.group(1).replace(",", ""))

    txn_type = "DEBIT" if "DEBIT" in block_text else "CREDIT"

    merchant_match = MERCHANT_RE.search(block_text)
    txn_id_match = TXN_ID_RE.search(block_text)
    utr_match = UTR_RE.search(block_text)
    account_match = ACCOUNT_RE.search(block_text)
    account_mask = account_match.group(2) if account_match else None

    return _silver_row(
        dq_status="VALID",
        transaction_time=txn_time,
        amount=amount,
        txn_type=txn_type,
        merchant=merchant_match.group(2).strip() if merchant_match else None,
        transaction_id=txn_id_match.group(1) if txn_id_match else None,
        utr=utr_match.group(1) if utr_match else None,
        paid_by=ACCOUNT_MASK_TO_BANK.get(account_mask, "UNKNOWN"),
    )


def parse_statement(raw_text):
    """Turn the raw text of one statement into a list of Silver rows.

    Blocks that fail to parse are kept as rows with dq_status="INVALID" and
    the failure message in dq_reason, instead of aborting the run.
    Returns an empty list for empty or missing text.
    """
    if not raw_text:
        return []

    stripped_lines = (line.strip() for line in raw_text.split("\n"))
    clean_lines = [line for line in stripped_lines if line and not is_noise(line)]

    rows = []
    for block in _split_into_blocks(clean_lines):
        try:
            rows.append(_parse_block(block))
        except Exception as exc:
            # Deliberate best-effort: record the failure as a data-quality row.
            rows.append(_silver_row(dq_status="INVALID", dq_reason=str(exc)))
    return rows


# Parse every new Bronze row (one statement per row), not just the first one.
transactions = []
for bronze_row in bronze_df.select("raw_text").collect():
    transactions.extend(parse_statement(bronze_row["raw_text"]))



In [0]:
# Create the DataFrame from the parsed rows, plus a copy de-duplicated on
# transaction_id.
dedup_keys = ["transaction_id"]
silver_df = spark.createDataFrame(transactions, schema=silver_schema)
silver_dedup = silver_df.dropDuplicates(dedup_keys)


**Silver Enhancement - Phase 2
Data Quality rules + Deduplication**

In [0]:
from pyspark.sql.functions import when, lit

# Deduplication: keep one row per transaction_id within this batch.
# Rows whose transaction_id is NULL (e.g. blocks that failed to parse) are all
# kept, because dropDuplicates would otherwise collapse them into a single row
# and hide data-quality failures. Rows already stored in the target table are
# not considered here.
rows_with_id = (
    silver_df
    .filter(col("transaction_id").isNotNull())
    .dropDuplicates(["transaction_id"])
)
rows_without_id = silver_df.filter(col("transaction_id").isNull())
silver_unique = rows_with_id.unionByName(rows_without_id)

# A row is INVALID when any mandatory field is missing or the amount is not
# strictly positive.
is_invalid = (
    col("transaction_time").isNull()
    | col("amount").isNull()
    | (col("amount") <= 0)
    | col("transaction_id").isNull()
    | col("merchant").isNull()
    | col("utr").isNull()
)

# Both columns are recomputed from the rules below and replace whatever
# dq_status / dq_reason the rows carried before. dq_reason reports the first
# failing rule in the order listed.
silver_df_dq = (
    silver_unique
    .withColumn(
        "dq_status",
        when(is_invalid, lit("INVALID")).otherwise(lit("VALID"))
    )
    .withColumn(
        "dq_reason",
        when(col("transaction_time").isNull(), lit("Missing transaction_time"))
        .when(col("amount").isNull(), lit("Missing amount"))
        .when(col("amount") <= 0, lit("Invalid amount"))
        .when(col("transaction_id").isNull(), lit("Missing transaction_id"))
        .when(col("merchant").isNull(), lit("Missing Merchant"))
        .when(col("utr").isNull(), lit("Missing utr"))
        .otherwise(lit(None))
    )
)

display(silver_df_dq)


In [0]:
# Apply Categorization
#
# Ordered (regex, category) rules matched against the upper-cased merchant
# name. Order matters: the first rule that matches decides the category.
CATEGORY_RULES = [
    # Shares / Investments
    ("ANGEL|ZERODHA|UPSTOX|GROWW|PAYTM MONEY|COIN|MUTUAL", "Shares_Investments"),
    # Medical
    ("PHARMACY|MEDICAL|HOSPITAL", "Medical"),
    # Food
    ("CAFE|CURRY|CURRIES|COOLDRIK|COOLDRINK|BURGER|FOOD|HOTEL|RESTAURANT|TIFFIN|UDUPI", "Food"),
    # Shopping
    ("TRENDS|FASHION|CLOTH|SHOPPING", "Shopping"),
    # Travel
    ("RAIL|IRCTC|BUS|METRO|OLA|UBER", "Travel"),
    # Fuel
    ("PETROL|FILLING|FUEL|STATION", "Fuel"),
    # Groceries
    ("KIRANA|STORE|MART|GENERAL", "Groceries"),
    # Utilities
    ("RECHARGE|ELECTRICITY|JIO|AIRTEL", "Utilities"),
    # Rent
    ("OWNER|RENT", "Rent"),
]

merchant_upper = upper(col("merchant"))

# Rows that failed data-quality checks are never categorised.
category_expr = when(col("dq_status") != "VALID", "UNKNOWN")
for rule_pattern, rule_category in CATEGORY_RULES:
    category_expr = category_expr.when(merchant_upper.rlike(rule_pattern), rule_category)

# Anything that matched no rule falls back to "Other".
silver_enriched = silver_df_dq.withColumn("category", category_expr.otherwise("Other"))


In [0]:
# Final Silver Table
#
# Output columns, in the order they are written to the Silver table.
SILVER_OUTPUT_COLUMNS = [
    "transaction_time",
    "amount",
    "type",
    "merchant",
    "transaction_id",
    "utr",
    "paid_by",
    "category",
    "dq_status",
    "dq_reason",
    "ingestion_ts",
]

# ingestion_ts is stamped with the current timestamp, replacing any value the
# rows carried. The rename only has an effect if the incoming frame has a
# bronze_ingestion_ts column.
stamped = silver_enriched.withColumnRenamed("bronze_ingestion_ts", "ingestion_ts")
stamped = stamped.withColumn("ingestion_ts", current_timestamp())
silver_final = stamped.select(*SILVER_OUTPUT_COLUMNS)

# Append this batch to the Silver table.
silver_writer = silver_final.write.mode("append")
silver_writer.saveAsTable("ppe.silver.silver_transactions")

display(silver_final)


In [0]:
# Advance the Silver watermark to the newest Bronze ingestion_ts in `bronze_df`.
max_ts = (
    bronze_df
    .selectExpr("max(ingestion_ts) as max_ts")
    .collect()[0]["max_ts"]
)

if max_ts is not None:
    watermark_row_count = spark.sql(
        "SELECT count(*) AS row_count FROM silver_processing_watermark"
    ).collect()[0]["row_count"]

    if watermark_row_count == 0:
        # First run: an UPDATE on an empty table changes nothing, so the
        # watermark would never be stored and every run would reprocess all of
        # Bronze. Seed the table instead.
        # NOTE(review): assumes any other columns of the watermark table are
        # nullable or have defaults - confirm against the table definition.
        spark.sql(f"""
            INSERT INTO silver_processing_watermark (last_processed_ingestion_ts)
            VALUES (timestamp('{max_ts}'))
        """)
    else:
        spark.sql(f"""
            UPDATE silver_processing_watermark
            SET last_processed_ingestion_ts = timestamp('{max_ts}')
        """)
    print("Silver watermark updated to:", max_ts)
