In [0]:
# Job parameter: location of the raw CDC JSON input consumed by the Bronze cell.
dbutils.widgets.text(
    "input_path",       # widget name, read back with dbutils.widgets.get
    "",                 # empty default, so the job run must supply a value
    "Enter File Path",  # label shown in the notebook UI
)


In [0]:
# ------------------------------------------------------
# Databricks Job Pipeline Version of CDC POC (Non-DLT)
# ------------------------------------------------------
# This version is for jobs, using SQL + PySpark + parameters
# Features: parameterized dataset path, Bronze → Silver → Gold pipeline

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import sys

# ----------------------------
# 🔹 Parameter Inputs
# ----------------------------
# Raw CDC JSON path supplied through the `input_path` widget / job parameter,
# e.g. /mnt/data/transactions_cdc_200.json. Surrounding whitespace is stripped.
input_path = dbutils.widgets.get("input_path").strip()
if not input_path:
    # The widget is created with an empty default; fail fast with a clear
    # message instead of failing later inside the Bronze Spark read.
    raise ValueError(
        "Job parameter 'input_path' is empty; pass the path of the raw CDC JSON input."
    )

# Target tables. Kept in sync with the names hard-coded in the %sql cells:
# the Silver cell writes the SCD2 table dev_catalog.default.silver_transaction_scd2
# and the Gold cell writes dev_catalog.default.gold_job_summary.
silver_table = "dev_catalog.default.silver_transaction_scd2"
gold_table = "dev_catalog.default.gold_job_summary"


In [0]:
# ------------------------------------------------------
# Databricks Job Pipeline with CDC + SCD Type 2 (Merge)
# ------------------------------------------------------

# ----------------------------
# 🟤 Bronze Layer - Raw Load (View Only)
# ----------------------------
# Explicit schema for the raw CDC JSON records. Every field uses the
# StructField default (nullable). event_ts arrives as a string and is
# converted to a timestamp after the read.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("transaction_id", StringType()),
        ("order_id", StringType()),
        ("customer_id", StringType()),
        ("product_id", StringType()),
        ("status", StringType()),
        ("quantity", IntegerType()),
        ("price", DoubleType()),
        ("event_ts", StringType()),
        ("payment_method", StringType()),
        ("currency", StringType()),
        ("location", StringType()),
        ("tax", DoubleType()),
        ("discount", DoubleType()),
    )
])

# Load the raw file with the schema above, append the load time as ingest_ts,
# and parse event_ts. Bronze is exposed only as a temp view; no table is written.
bronze_df = (
    spark.read
    .format("json")
    .schema(schema)
    .load(input_path)
    .select("*", current_timestamp().alias("ingest_ts"))
    .withColumn("event_ts", to_timestamp("event_ts"))
)
bronze_df.createOrReplaceTempView("bronze_view")





In [0]:
%sql

-- ⚪ Silver Layer - Merge CDC with SCD Type 2
--
-- Step 1: collapse the raw CDC feed to the latest event per transaction_id.
-- MERGE raises an error when several source rows match the same target row,
-- so a feed carrying more than one event for a key must be de-duplicated first.
-- Delete events are kept here: they must reach the MERGE to expire the active row.
-- Rows without a key cannot be tracked by SCD2, and rows with a NULL status were
-- already ignored by the original `status != 'delete'` filter, so both are dropped.
-- NOTE(review): ties on identical event_ts for one key are resolved arbitrarily.
CREATE OR REPLACE TEMP VIEW updates AS
SELECT *,
       true AS is_active,
       CAST(event_ts AS TIMESTAMP) AS effective_start,
       CAST(NULL AS TIMESTAMP) AS effective_end
FROM bronze_view
WHERE transaction_id IS NOT NULL
  AND status IS NOT NULL
QUALIFY ROW_NUMBER() OVER (PARTITION BY transaction_id ORDER BY event_ts DESC) = 1;

-- Step 2: create the silver SCD2 table if it does not exist yet.
-- This must run before the staged view below, which reads from the table.
CREATE TABLE IF NOT EXISTS dev_catalog.default.silver_transaction_scd2 (
  transaction_id STRING,
  order_id STRING,
  customer_id STRING,
  product_id STRING,
  status STRING,
  quantity INT,
  price DOUBLE,
  event_ts TIMESTAMP,
  payment_method STRING,
  currency STRING,
  location STRING,
  tax DOUBLE,
  discount DOUBLE,
  is_active BOOLEAN,
  effective_start TIMESTAMP,
  effective_end TIMESTAMP
) USING DELTA;

-- Step 3: stage the merge source so a single MERGE can both expire the old
-- version and insert the new one.
--   (a) every latest event keyed by transaction_id: matches the current active
--       row (to expire it on change/delete) or is inserted when the key is new;
--   (b) changed, non-delete events repeated with a NULL merge_key: a NULL key never
--       matches, so these land in WHEN NOT MATCHED and insert the new active version.
-- Change detection uses null-safe <=> so a NULL on either side counts as a change.
CREATE OR REPLACE TEMP VIEW staged_updates AS
SELECT u.transaction_id AS merge_key, u.*
FROM updates AS u
UNION ALL
SELECT CAST(NULL AS STRING) AS merge_key, u.*
FROM updates AS u
JOIN dev_catalog.default.silver_transaction_scd2 AS t
  ON t.transaction_id = u.transaction_id AND t.is_active = true
WHERE u.status != 'delete'
  AND (
    NOT (t.quantity <=> u.quantity) OR
    NOT (t.price <=> u.price) OR
    NOT (t.status <=> u.status)
  );

-- Step 4: SCD Type 2 merge.
--   * changed or deleted key -> close the active version (is_active = false,
--     effective_end = event time of the incoming change);
--   * new key, or new version of a changed key -> insert as the active version;
--   * delete for a key with no active version -> ignored.
-- NOTE(review): late-arriving events (older than the active row) are not guarded against.
MERGE INTO dev_catalog.default.silver_transaction_scd2 AS target
USING staged_updates AS source
ON target.transaction_id = source.merge_key AND target.is_active = true
WHEN MATCHED AND (
  source.status = 'delete' OR
  NOT (target.quantity <=> source.quantity) OR
  NOT (target.price <=> source.price) OR
  NOT (target.status <=> source.status)
) THEN UPDATE SET
  is_active = false,
  effective_end = source.event_ts
WHEN NOT MATCHED AND source.status != 'delete' THEN INSERT (
  transaction_id, order_id, customer_id, product_id, status, quantity, price,
  event_ts, payment_method, currency, location, tax, discount,
  is_active, effective_start, effective_end
) VALUES (
  source.transaction_id, source.order_id, source.customer_id, source.product_id,
  source.status, source.quantity, source.price, source.event_ts,
  source.payment_method, source.currency, source.location, source.tax,
  source.discount, source.is_active, source.effective_start, source.effective_end
);


-- ✅ Silver CDC + SCD Type 2 merge executed

In [0]:
%sql
-- 🟡 Gold Layer - Aggregated Table (Physical Table)
-- Only the current (is_active = true) version of each transaction feeds the
-- summary; expired SCD2 history rows are left out.
CREATE OR REPLACE TEMP VIEW active_transactions AS
SELECT s.*
FROM dev_catalog.default.silver_transaction_scd2 AS s
WHERE s.is_active = true;

-- Fully rebuild the per-customer, per-location summary on every run.
-- NOTE(review): total_spent sums `price` without multiplying by `quantity`;
-- confirm whether price is a line total or a unit price.
CREATE OR REPLACE TABLE dev_catalog.default.gold_job_summary
USING DELTA
AS
SELECT
  a.customer_id,
  a.location,
  SUM(a.quantity) AS total_items,
  SUM(a.price)    AS total_spent,
  SUM(a.discount) AS total_discount,
  SUM(a.tax)      AS total_tax
FROM active_transactions AS a
GROUP BY a.customer_id, a.location;


In [0]:
%sql
-- Inspect the SCD Type 2 table: active and expired versions of every transaction.
SELECT *
FROM dev_catalog.default.silver_transaction_scd2

In [0]:
%sql
-- Inspect the Gold per-customer / per-location summary.
SELECT *
FROM dev_catalog.default.gold_job_summary

In [0]:
from datetime import datetime

from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp, to_timestamp

# Sample 5-row CDC batch (2 updates + 3 inserts) for exercising the Silver
# SCD2 merge by hand. One ISO-8601 timestamp is shared by the whole batch so
# all rows carry a single, consistent event time.
batch_event_ts = datetime.now().isoformat()

test_data = [
    # ✅ Update existing
    Row(transaction_id="TXN0001", order_id="ORD0001", customer_id="CUST001", product_id="PROD001", status="update", quantity=3, price=300.0, event_ts=batch_event_ts, payment_method="UPI", currency="INR", location="Delhi", tax=12.0, discount=20.0),
    Row(transaction_id="TXN0002", order_id="ORD0002", customer_id="CUST002", product_id="PROD002", status="update", quantity=1, price=150.0, event_ts=batch_event_ts, payment_method="Wallet", currency="INR", location="Mumbai", tax=10.0, discount=10.0),

    # ✅ New inserts
    Row(transaction_id="TXN9996", order_id="ORD9996", customer_id="CUST010", product_id="PROD020", status="insert", quantity=2, price=220.0, event_ts=batch_event_ts, payment_method="CreditCard", currency="USD", location="Pune", tax=8.0, discount=15.0),
    Row(transaction_id="TXN9997", order_id="ORD9997", customer_id="CUST011", product_id="PROD021", status="insert", quantity=1, price=120.0, event_ts=batch_event_ts, payment_method="NetBanking", currency="EUR", location="Chennai", tax=5.0, discount=5.0),
    Row(transaction_id="TXN9998", order_id="ORD9998", customer_id="CUST012", product_id="PROD022", status="insert", quantity=4, price=400.0, event_ts=batch_event_ts, payment_method="UPI", currency="INR", location="Hyderabad", tax=18.0, discount=25.0)
]

# Build the frame with the Bronze cell's `schema` and the same transforms, so it
# has exactly bronze_view's shape. Without an explicit schema Spark infers Python
# ints as bigint (the Silver column is INT), keeps event_ts as a string, and
# there is no ingest_ts column.
df = (
    spark.createDataFrame(test_data, schema=schema)
    .withColumn("ingest_ts", current_timestamp())
    .withColumn("event_ts", to_timestamp("event_ts"))
)

# Simulate Bronze View load.
# NOTE: the Silver cell reads `bronze_view`, not this view. To push this batch
# through the merge, register df as bronze_view (or point the Silver cell here)
# and re-run the Silver and Gold cells.
df.createOrReplaceTempView("v_bronze_transactions")
