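## Dimension tables

Build the gold-layer dimension tables (DimUser, DimAisles, DimDepartments, DimProducts) from the silver Delta tables.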

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import current_date,lit
from delta.tables import DeltaTable


### DimUser

In [0]:
# Silver-layer orders; only the distinct user ids are needed to derive DimUser.
df_orders = spark.read.load(
    "abfss://silver@adlsinstacart.dfs.core.windows.net/Orders",
    format="delta",
)
display(df_orders)

order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order
1021,19933,Prior,23,3,11,6.0
2737,9636,Train,13,0,19,12.0
2936,159042,Train,5,5,15,7.0
3139,37240,Prior,2,4,17,17.0
3683,137594,Prior,11,2,11,15.0
4366,3646,Prior,1,6,10,0.0
4885,173103,Train,4,0,17,6.0
5156,175361,Prior,18,1,15,3.0
11451,110128,Prior,17,0,8,8.0
12451,31396,Prior,30,2,23,5.0


In [0]:
# DimUser: one row per distinct user_id with a 1..N surrogate key assigned in user_id order.
# NOTE(review): the window has no partitionBy, so Spark evaluates it on a single
# partition — fine at this size, revisit if the user count grows substantially.
user_key_window = Window.orderBy("user_id")
df_dim_user = (
    df_orders
    .select("user_id")
    .distinct()
    .withColumn("user_key", row_number().over(user_key_window))
)
display(df_dim_user)




user_id,user_key
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9
10,10


In [0]:
# Full refresh of gold DimUser: the table is overwritten and user_key is recomputed every run.
df_dim_user.write.save(
    "abfss://gold@adlsinstacart.dfs.core.windows.net/DimUser",
    format="delta",
    mode="overwrite",
)



### DimAisles

In [0]:
# Silver-layer aisle lookup (aisle_id_lookup, aisle_name).
df_aisles_clean = spark.read.load(
    "abfss://silver@adlsinstacart.dfs.core.windows.net/Aisles",
    format="delta",
)

In [0]:
df_aisles_clean.display()

aisle_id_lookup,aisle_name
94,Tea
29,Honeys Syrups Nectars
88,Spreads
108,Other Creams Cheeses
131,Dry Pasta
56,Diapers Wipes
24,Fresh Fruits
99,Canned Fruit Applesauce
39,Seafood Counter
102,Baby Bath Body Care


In [0]:
# DimAisles: the silver aisle lookup plus a 1..N surrogate key in aisle_id_lookup order.
aisle_key_window = Window.orderBy("aisle_id_lookup")
df_dim_aisles = df_aisles_clean.withColumn(
    "aisle_key", row_number().over(aisle_key_window)
)
display(df_dim_aisles)



aisle_id_lookup,aisle_name,aisle_key
1,Prepared Soups Salads,1
2,Specialty Cheeses,2
3,Energy Granola Bars,3
4,Instant Foods,4
5,Marinades Meat Preparation,5
6,Other,6
7,Packaged Meat,7
8,Bakery Desserts,8
9,Pasta Sauce,9
10,Kitchen Supplies,10


In [0]:
# Full refresh of gold DimAisles (overwrite; aisle_key is recomputed every run).
df_dim_aisles.write.save(
    "abfss://gold@adlsinstacart.dfs.core.windows.net/DimAisles",
    format="delta",
    mode="overwrite",
)



### DimDepartments

In [0]:
# Silver-layer department lookup (department_id_lookup, department_name).
df_departments_clean = spark.read.load(
    "abfss://silver@adlsinstacart.dfs.core.windows.net/Departments",
    format="delta",
)

In [0]:
display(df_departments_clean)

department_id_lookup,department_name
15,Canned Goods
20,Deli
21,Missing
17,Household
1,Frozen
4,Produce
13,Pantry
8,Pets
6,International
3,Bakery


In [0]:
# DimDepartments: the silver department lookup plus a 1..N surrogate key
# in department_id_lookup order.
department_key_window = Window.orderBy("department_id_lookup")
df_dim_departments = df_departments_clean.withColumn(
    "department_key", row_number().over(department_key_window)
)
display(df_dim_departments)



department_id_lookup,department_name,department_key
1,Frozen,1
2,Other,2
3,Bakery,3
4,Produce,4
5,Alcohol,5
6,International,6
7,Beverages,7
8,Pets,8
9,Dry Goods Pasta,9
10,Bulk,10


In [0]:
# Full refresh of gold DimDepartments (overwrite; department_key is recomputed every run).
df_dim_departments.write.save(
    "abfss://gold@adlsinstacart.dfs.core.windows.net/DimDepartments",
    format="delta",
    mode="overwrite",
)

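### DimProducts

Products are kept as a Type 2 slowly changing dimension: each row carries `effective_start_date`, `effective_end_date` and `is_Current`, and the table is maintained with a Delta `MERGE` after an initial full load.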

In [0]:
# Silver-layer products, already enriched with aisle_name and department_name.
df_products_enriched = spark.read.load(
    "abfss://silver@adlsinstacart.dfs.core.windows.net/Products",
    format="delta",
)
display(df_products_enriched)

product_id,product_name,product_aisle_id,product_department_id,aisle_name,department_name
1,Chocolate Sandwich Cookies,61.0,19.0,Cookies Cakes,Snacks
2,All-Seasons Salt,104.0,13.0,Spices Seasonings,Pantry
3,Robust Golden Unsweetened Oolong Tea,94.0,7.0,Tea,Beverages
4,Smart Ones Classic Favorites Mini Rigatoni With Vodka Cream Sauce,38.0,1.0,Frozen Meals,Frozen
5,Green Chile Anytime Sauce,5.0,13.0,Marinades Meat Preparation,Pantry
6,Dry Nose Oil,11.0,11.0,Cold Flu Allergy,Personal Care
7,Pure Coconut Water With Orange,98.0,7.0,Juice Nectars,Beverages
8,Cut Russet Potatoes Steam N' Mash,116.0,1.0,Frozen Produce,Frozen
9,Light Strawberry Blueberry Yogurt,120.0,16.0,Yogurt,Dairy Eggs
10,Sparkling Orange Juice & Prickly Pear Beverage,115.0,7.0,Water Seltzer Sparkling Water,Beverages


In [0]:
# Stage the silver products as SCD Type 2 candidates: every incoming row starts as the
# current version, effective from today with an open-ended end date of 9999-12-31.
# effective_end_date is cast to DATE (instead of staying a string literal) so it has the
# same type as effective_start_date and as the current_date() written when a version expires.
# NOTE(review): if gold DimProducts was already created with a string effective_end_date,
# Delta will cast these values on write; recreate the table to get a DATE column.
df_products_src = (
    df_products_enriched
    .select(
        "product_id",
        "product_name",
        "product_aisle_id",
        "product_department_id",
        "aisle_name",
        "department_name",
    )
    .withColumn("effective_start_date", current_date())
    .withColumn("effective_end_date", lit("9999-12-31").cast("date"))
    .withColumn("is_Current", lit(True))
)

display(df_products_src)

product_id,product_name,product_aisle_id,product_department_id,aisle_name,department_name,effective_start_date,effective_end_date,is_Current
1,Chocolate Sandwich Cookies,61.0,19.0,Cookies Cakes,Snacks,2025-12-30,9999-12-31,True
2,All-Seasons Salt,104.0,13.0,Spices Seasonings,Pantry,2025-12-30,9999-12-31,True
3,Robust Golden Unsweetened Oolong Tea,94.0,7.0,Tea,Beverages,2025-12-30,9999-12-31,True
4,Smart Ones Classic Favorites Mini Rigatoni With Vodka Cream Sauce,38.0,1.0,Frozen Meals,Frozen,2025-12-30,9999-12-31,True
5,Green Chile Anytime Sauce,5.0,13.0,Marinades Meat Preparation,Pantry,2025-12-30,9999-12-31,True
6,Dry Nose Oil,11.0,11.0,Cold Flu Allergy,Personal Care,2025-12-30,9999-12-31,True
7,Pure Coconut Water With Orange,98.0,7.0,Juice Nectars,Beverages,2025-12-30,9999-12-31,True
8,Cut Russet Potatoes Steam N' Mash,116.0,1.0,Frozen Produce,Frozen,2025-12-30,9999-12-31,True
9,Light Strawberry Blueberry Yogurt,120.0,16.0,Yogurt,Dairy Eggs,2025-12-30,9999-12-31,True
10,Sparkling Orange Juice & Prickly Pear Beverage,115.0,7.0,Water Seltzer Sparkling Water,Beverages,2025-12-30,9999-12-31,True


In [0]:
target_path = "abfss://gold@adlsinstacart.dfs.core.windows.net/DimProducts"

# First run only: seed DimProducts with every staged product, numbering product_sk
# 1..N in product_id order. When the table already exists this cell does nothing.
table_exists = DeltaTable.isDeltaTable(spark, target_path)
if not table_exists:
    product_sk_window = Window.orderBy("product_id")
    df_initial_load = df_products_src.withColumn(
        "product_sk", row_number().over(product_sk_window)
    )
    df_initial_load.write.save(target_path, format="delta", mode="overwrite")


In [0]:
from pyspark.sql.functions import max

# Highest product_sk already present in DimProducts; later surrogate keys are offset by it.
# max() over an empty table returns NULL (None), which is treated as 0.
df_existing = spark.read.load(target_path, format="delta")

max_sk = df_existing.agg(max("product_sk")).first()[0]
if max_sk is None:
    max_sk = 0

In [0]:
# Candidate surrogate keys continue after the current maximum. Only rows that the MERGE
# inserts keep theirs, so the issued product_sk sequence may contain gaps.
sk_offset_window = Window.orderBy("product_id")
df_products_src_sk = df_products_src.withColumn(
    "product_sk",
    row_number().over(sk_offset_window) + max_sk,
)



In [0]:
# SCD Type 2 upsert of DimProducts.
# A single MERGE can either update OR insert a given source row, never both. So a product
# whose attributes changed is staged twice:
#   * keyed by product_id  -> matches its current target row, which is expired;
#   * with a NULL merge key -> can never match, so it is inserted as the new current version.
# Unchanged and brand-new products are staged once (keyed by product_id): unchanged rows
# match but fail the change condition (no-op); new products do not match and are inserted.
df_dim_product = DeltaTable.forPath(spark, target_path)

# Null-safe "attributes differ" predicate. A plain `<>` evaluates to NULL (i.e. false) when
# either side is NULL, which would hide a change to or from a missing aisle/department id.
change_condition = """
    NOT (tgt.product_name <=> src.product_name)
    OR NOT (tgt.product_aisle_id <=> src.product_aisle_id)
    OR NOT (tgt.product_department_id <=> src.product_department_id)
"""

# Source rows whose current target version differs: these need a fresh version inserted.
df_changed_products = (
    df_products_src_sk.alias("src")
    .join(
        df_dim_product.toDF().alias("tgt"),
        expr("tgt.product_id = src.product_id AND tgt.is_current = true"),
    )
    .where(expr(change_condition))
    .select("src.*")
)

merge_key_type = df_products_src_sk.schema["product_id"].dataType
df_staged_updates = (
    df_products_src_sk
    .withColumn("merge_key", col("product_id"))
    .unionByName(
        df_changed_products.withColumn("merge_key", lit(None).cast(merge_key_type))
    )
)

# Last expression: the MERGE metrics DataFrame is displayed as the cell output.
(
    df_dim_product.alias("tgt")
    .merge(
        df_staged_updates.alias("src"),
        "tgt.product_id = src.merge_key AND tgt.is_current = true",
    )
    .whenMatchedUpdate(
        condition=change_condition,
        set={
            "effective_end_date": "current_date()",
            "is_current": "false",
        },
    )
    .whenNotMatchedInsert(
        values={
            "product_sk": "src.product_sk",
            "product_id": "src.product_id",
            "product_name": "src.product_name",
            "product_aisle_id": "src.product_aisle_id",
            "product_department_id": "src.product_department_id",
            "aisle_name": "src.aisle_name",
            "department_name": "src.department_name",
            "effective_start_date": "src.effective_start_date",
            "effective_end_date": "src.effective_end_date",
            "is_Current": "true",
        }
    )
    .execute()
)




DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]