### 1. Setup

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data_Engineering").getOrCreate()

### 2. Load raw CSVs

In [0]:
orders_raw_path = "/Volumes/workspace/data/data_engineering/pyspark_pipeline_orders.csv"
customers_raw_path = "/Volumes/workspace/data/data_engineering/pyspark_pipeline_customers_scd.csv"

df_orders_raw = spark.read.option("header", True).option("inferSchema", True).csv(orders_raw_path)
df_customers_raw = spark.read.option("header", True).option("inferSchema", True).csv(customers_raw_path)

display(df_orders_raw.limit(5))
display(df_customers_raw.limit(5))

order_id,customer_id,product_id,order_date,quantity,price,region,status
O100000,C00861,P001,2023-09-04,1,19.35,North,SHIPPED
O100001,C01295,P003,2023-03-09,5,9.59,South,PLACED
O100002,C01131,P002,2023-11-22,2,27.49,South,SHIPPED
O100003,C01096,P001,2023-11-30,1,21.44,Central,PLACED
O100004,C01639,P001,2023-07-15,1,20.44,Central,PLACED


customer_id,first_name,last_name,gender,email,address,region,effective_date,end_date,is_current,version
C02568,Jamie,Lopez,F,c02568@example.com,100 Main St,West,2023-01-01,,True,1
C02561,Chris,Lee,F,c02561@example.com,101 Main St,North,2023-01-01,,True,1
C01895,Alex,Lopez,F,c01895@example.com,102 Main St,West,2023-01-01,,True,1
C00648,Morgan,Davis,F,c00648@example.com,103 Main St,West,2023-01-01,,True,1
C00828,Chris,Davis,F,c00828@example.com,104 Main St,East,2023-01-01,2024-06-30,False,1


### 3. Bronze

In [0]:
bronze_orders = "default.bronze_orders"
bronze_customers = "default.bronze_customers"

df_orders_raw.withColumn("ingest_ts", current_timestamp())\
    .write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(bronze_orders)

df_customers_raw.withColumn("ingest_ts", current_timestamp())\
    .write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(bronze_customers)

print("bronze tables are written")

bronze tables are written


### 4. Bronze -> Silver

In [0]:
# 1. window function deduplication

df = spark.table(bronze_orders)
w = Window.partitionBy("order_id").orderBy(col("ingest_ts").desc())
df_dedup = df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")
display(df_dedup.limit(5))
df=df_dedup

order_id,customer_id,product_id,order_date,quantity,price,region,status,ingest_ts
O100000,C00861,P001,2023-09-04,1,19.35,North,SHIPPED,2026-01-14T12:50:42.830Z
O100001,C01295,P003,2023-03-09,5,9.59,South,PLACED,2026-01-14T12:50:42.830Z
O100002,C01131,P002,2023-11-22,2,27.49,South,SHIPPED,2026-01-14T12:50:42.830Z
O100003,C01096,P001,2023-11-30,1,21.44,Central,PLACED,2026-01-14T12:50:42.830Z
O100004,C01639,P001,2023-07-15,1,20.44,Central,PLACED,2026-01-14T12:50:42.830Z


In [0]:
# 2. bronze -> silver

silver_orders = "default.silver_orders"

df_orders_bronze = spark.table(bronze_orders)

df_orders_silver = (
    df_orders_bronze
    .withColumn("order_date", to_date("order_date","yyyy-MM-dd"))
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("price", col("price").cast("double"))
    .withColumn("sales_amount", col("quantity") * col("price"))
    .filter(col("order_date").isNotNull())
    .dropDuplicates(["order_id"])
)

df_orders_silver.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(silver_orders)
print("silver table is written")

silver table is written


### 5. Silver -> Gold

In [0]:
gold_sales = "default.gold_daily_sales"
df_silver = spark.table(silver_orders)

df_daily_sales = df_silver.groupBy("order_date", "region", "product_id")\
                    .agg(sum("sales_amount").alias("sales_amount"), sum("quantity").alias("units"))

df_daily_sales.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(gold_sales)
display(df_daily_sales.limit(10))
print("gold table is written")

order_date,region,product_id,sales_amount,units
2023-11-06,West,P002,31.03,1
2023-06-24,East,P001,57.94,3
2023-05-08,South,P005,55.59,1
2023-09-09,West,P005,142.36,3
2023-11-09,South,P005,103.74,2
2023-09-26,South,P004,31.04,2
2023-05-06,North,P003,9.72,1
2023-10-20,Central,P002,186.68,6
2023-08-29,South,P004,60.2,4
2023-11-28,Central,P005,102.64,2


gold table is written


## SCD or Slowly Changing Dimension

In [0]:
### 1. SCD Type 1

# keep only latest values without preserving history

# Emp 1 , Dep A -> Dep B, Dep B will overwrite all records

In [0]:
from delta.tables import DeltaTable

bronze_customers = "default.bronze_customers"
dim_customers_table = "default.scd1_customers"

# Initialize dim table
spark.read.table(bronze_customers)\
    .write.format("delta").mode("overwrite").saveAsTable(dim_customers_table)

# Read updates
updates_df = spark.read.table(bronze_customers)

# Deduplicate by customer id (keep first occurrence)
updates_dedup = updates_df.dropDuplicates(["customer_id"])

# Merge into DIM table
dim = DeltaTable.forName(spark, dim_customers_table)

dim.alias("d").merge(
    updates_dedup.alias("u"),
    "d.customer_id = u.customer_id"
).whenMatchedUpdateAll()\
.whenNotMatchedInsertAll()\
.execute()

print("SCD Type 1 merge completed")
display(spark.table(dim_customers_table).limit(5))

SCD Type 1 merge completed


customer_id,first_name,last_name,gender,email,address,region,effective_date,end_date,is_current,version,ingest_ts
C00001,Jamie,Brown,F,c00001@example.com,158 Main St,West,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C00002,Sam,Martinez,M,c00002@example.com,273 Main St,West,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C00003,Morgan,Davis,F,c00003@example.com,114 Main St,West,2023-01-01,2024-06-30,False,1,2026-01-14T12:50:47.503Z
C00004,Chris,Smith,F,c00004@example.com,278 Main St,South,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C00005,Alex,Lee,M,c00005@example.com,206 Main St,West,2023-01-01,,True,1,2026-01-14T12:50:47.503Z


### 2. SCD Type 2

Keep historical values with effective date, end_date, is_current and version

Emp 1 -> Dep A -> Dep B

Emp 1 , Dep A , Date dt1 , date dt2, false, 1
Emp 1 , Dep B, date dt2+1 , null, true, 2



In [0]:
dim_customers_scd2 = "default.scd2_customers"

# Initialize dimension

initial_dim = spark.table(bronze_customers)\
    .withColumn("effective_date", to_date(lit("2023-01-01")))\
    .withColumn("end_date", lit(None).cast("date"))\
    .withColumn("is_current", lit(True))\
    .withColumn("version", lit(1))

initial_dim.write.format("delta").mode("overwrite").saveAsTable(dim_customers_scd2)
dim_table  = DeltaTable.forName(spark, dim_customers_scd2)

# simulated updates
updates_raw = spark.table(bronze_customers).limit(50)\
            .withColumn("address", concat(col("address"), lit(" (new)")))\
            .withColumn("effective_date", to_date(lit("2024-07-01"))) \
            .withColumn("is_current", lit(True))\
            .withColumn("version", col("version").cast("int") + 1)

# when a change in tracked attributes occur

dim_table.alias("d").merge(
    updates_raw.alias("u"),
    "d.customer_id = u.customer_id AND d.is_current = true AND (d.address <> u.address OR d.region <> u.region)"
).whenMatchedUpdate(set = {
    "is_current" : "false",
    "end_date" : "u.effective_date"
}).execute()

dim_table.alias("d").merge(
    updates_raw.alias("u"),
    "d.customer_id=u.customer_id AND d.version=u.version"
).whenNotMatchedInsertAll().execute()

# Display results
display(spark.table(dim_customers_scd2))

customer_id,first_name,last_name,gender,email,address,region,effective_date,end_date,is_current,version,ingest_ts
C01591,Dana,Lee,F,c01591@example.com,150 Main St,South,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C00161,Jordan,Lopez,F,c00161@example.com,151 Main St,West,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C02454,Dana,Smith,F,c02454@example.com,152 Main St,North,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C02005,Jordan,Anderson,M,c02005@example.com,153 Main St,Central,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C01811,Jordan,Brown,M,c01811@example.com,154 Main St,West,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C01795,Dana,Anderson,F,c01795@example.com,155 Main St,East,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C01120,Morgan,Johnson,F,c01120@example.com,156 Main St,East,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C02256,Jamie,Garcia,M,c02256@example.com,157 Main St,South,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C02710,Jordan,Davis,F,c02710@example.com,158 Main St,North,2023-01-01,,True,1,2026-01-14T12:50:47.503Z
C01221,Alex,Smith,F,c01221@example.com,159 Main St,North,2023-01-01,,True,1,2026-01-14T12:50:47.503Z


Part 2:
Job ID created
Setup Airflow
DAG -> run this Databricks Job ID

1. Docker desktop installation
2. created project directory for airflow. created subfolders for dags, logs
3. create docker compose yaml file with services : postgres, scheduler and webservice
4. to install databricks provider, add requirements text file
5. generated fernet key and saved in .env file
6. created docker file to run requirements
7. initialize postgres db + created admin user
8. started container with postgres, scheduler and webservice
9. accessed webservice. created databricks connection using PAT and host url from databricks
10. created job for running notebook
11. created dag file in project directory under dags folder using job id
11. Triggered dag