In [0]:
dbutils.fs.ls("/Volumes/retail_catalog/raw/raw_data/")

[FileInfo(path='dbfs:/Volumes/retail_catalog/raw/raw_data/customers.csv', name='customers.csv', size=2229, modificationTime=1761573816000),
 FileInfo(path='dbfs:/Volumes/retail_catalog/raw/raw_data/orders.csv', name='orders.csv', size=1345, modificationTime=1761573816000),
 FileInfo(path='dbfs:/Volumes/retail_catalog/raw/raw_data/products.csv', name='products.csv', size=1564, modificationTime=1761573816000)]

In [0]:
display(dbutils.fs.ls("/Volumes/retail_catalog/raw/raw_data/customers.csv"))

path,name,size,modificationTime
dbfs:/Volumes/retail_catalog/raw/raw_data/customers.csv,customers.csv,2229,1761573816000


In [0]:
spark.read.csv("/Volumes/retail_catalog/raw/raw_data/customers.csv", header = True).show()

+-----------+-----------+--------------------+------+
|customer_id|       name|               email|region|
+-----------+-----------+--------------------+------+
|          1| Customer 1|customer1@example...| South|
|          2| Customer 2|customer2@example...|  East|
|          3| Customer 3|customer3@example...|  West|
|          4| Customer 4|customer4@example...| North|
|          5| Customer 5|customer5@example...| South|
|          6| Customer 6|customer6@example...|  East|
|          7| Customer 7|customer7@example...|  West|
|          8| Customer 8|customer8@example...| North|
|          9| Customer 9|customer9@example...| South|
|         10|Customer 10|customer10@exampl...|  East|
|         11|Customer 11|customer11@exampl...|  West|
|         12|Customer 12|customer12@exampl...| North|
|         13|Customer 13|customer13@exampl...| South|
|         14|Customer 14|customer14@exampl...|  East|
|         15|Customer 15|customer15@exampl...|  West|
|         16|Customer 16|cus

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DataType


#### Customers Schema
customers_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("name", StringType(), True),
  StructField("email", StringType(), True),
  StructField("region", StringType(), True)
])

#### Products Schema
products_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True)
])

#### Orders Schema
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("order_date", DoubleType(), True)
])

In [0]:
#### Customers  
df_customers = spark.read.schema(customers_schema).csv("/Volumes/retail_catalog/raw/raw_data/customers.csv", header=True)

#### Orders
df_orders = spark.read.schema(orders_schema).csv("/Volumes/retail_catalog/raw/raw_data/orders.csv", header=True)

#### Products
df_products = spark.read.schema(products_schema).csv("/Volumes/retail_catalog/raw/raw_data/products.csv", header=True)


In [0]:
df_customers.write.format("delta").mode("overwrite").saveAsTable("retail_catalog.bronze.customers_delta")

df_orders.write.format("delta").mode("overwrite").saveAsTable("retail_catalog.bronze.orders_delta")

df_products.write.format("delta").mode("overwrite").saveAsTable("retail_catalog.bronze.products_delta")



In [0]:
%sql
select * from retail_catalog.bronze.customers_delta

customer_id,name,email,region
1,Customer 1,customer1@example.com,South
2,Customer 2,customer2@example.com,East
3,Customer 3,customer3@example.com,West
4,Customer 4,customer4@example.com,North
5,Customer 5,customer5@example.com,South
6,Customer 6,customer6@example.com,East
7,Customer 7,customer7@example.com,West
8,Customer 8,customer8@example.com,North
9,Customer 9,customer9@example.com,South
10,Customer 10,customer10@example.com,East


In [0]:
%sql
select * from retail_catalog.bronze.orders_delta

order_id,customer_id,product_id,quantity,order_date
1001,39,124,2,2025-10-01
1002,47,145,5,2025-10-02
1003,27,135,5,2025-10-03
1004,32,123,2,2025-10-04
1005,39,147,3,2025-10-05
1006,8,127,2,2025-10-06
1007,9,133,5,2025-10-07
1008,6,112,3,2025-10-08
1009,48,116,4,2025-10-09
1010,44,144,1,2025-10-10


In [0]:
%sql
select * from retail_catalog.bronze.products_delta

product_id,name,category,price
101,Product 1,Category 1,10
102,Product 2,Category 2,20
103,Product 3,Category 3,30
104,Product 4,Category 4,40
105,Product 5,Category 5,50
106,Product 6,Category 1,60
107,Product 7,Category 2,70
108,Product 8,Category 3,80
109,Product 9,Category 4,90
110,Product 10,Category 5,100


In [0]:
from pyspark.sql.functions import current_date
df_customers_new = spark.read.schema(customers_schema).csv("/Volumes/retail_catalog/raw/raw_data/customers_incremental.csv", header=True).withColumn("load_date", current_date())

In [0]:
df_customers_new.display()

customer_id,first_name,last_name,email,load_date
101,Emma,Green,emma.green@example.com,2025-10-27
102,David,King,david.king@example.com,2025-10-27
5,John,Smith,john.smith_updated@example.com,2025-10-27


In [0]:
from delta.tables import DeltaTable
bronze_customers = DeltaTable.forName(spark, "retail_catalog.bronze.customers_delta")



In [0]:
bronze_customers.alias("t") \
    .merge(
        source=df_customers_new.alias("s"),condition='t.customer_id=s.customer_id'
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
select * from retail_catalog.bronze.customers_delta

customer_id,name,email,region,load_date
1,Customer 1,customer1@example.com,South,
2,Customer 2,customer2@example.com,East,
3,Customer 3,customer3@example.com,West,
4,Customer 4,customer4@example.com,North,
6,Customer 6,customer6@example.com,East,
7,Customer 7,customer7@example.com,West,
8,Customer 8,customer8@example.com,North,
9,Customer 9,customer9@example.com,South,
10,Customer 10,customer10@example.com,East,
11,Customer 11,customer11@example.com,West,


In [0]:
spark.sql("""
ALTER TABLE retail_catalog.bronze.customers_delta
ADD COLUMNS (load_date DATE)
""")

DataFrame[]

In [0]:
from pyspark.sql.functions import lit, col, coalesce

df_bronze = spark.table('retail_catalog.bronze.customers_delta')

df_bronze_backfilled = df_bronze.withColumn("load_date", coalesce(col("load_date"), lit("2025-10-25").cast("date")))

df_bronze_backfilled.write.format("delta").mode("overwrite").option("overwriteSchema", True).saveAsTable("retail_catalog.bronze.customers_delta")

In [0]:
%sql
select * from retail_catalog.bronze.customers_delta

customer_id,name,email,region,load_date
1,Customer 1,customer1@example.com,South,2025-10-25
2,Customer 2,customer2@example.com,East,2025-10-25
3,Customer 3,customer3@example.com,West,2025-10-25
4,Customer 4,customer4@example.com,North,2025-10-25
6,Customer 6,customer6@example.com,East,2025-10-25
7,Customer 7,customer7@example.com,West,2025-10-25
8,Customer 8,customer8@example.com,North,2025-10-25
9,Customer 9,customer9@example.com,South,2025-10-25
10,Customer 10,customer10@example.com,East,2025-10-25
11,Customer 11,customer11@example.com,West,2025-10-25
