In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# Storage paths: ADLS Gen2 (abfss) roots of the medallion layers. Raw CSVs are
# read from the "bronze" container; cleaned Delta tables are written to "silver".
_STORAGE_ACCOUNT_HOST = "stretaildata123.dfs.core.windows.net"
BRONZE_BASE = f"abfss://bronze@{_STORAGE_ACCOUNT_HOST}"
SILVER_BASE = f"abfss://silver@{_STORAGE_ACCOUNT_HOST}"


In [0]:
def write_delta(df, path):
    """Persist ``df`` in Delta format at ``path``.

    Existing data at ``path`` is replaced (save mode ``"overwrite"``), and the
    stored table schema is replaced along with it (``overwriteSchema=true``),
    so re-running the notebook after a column change does not fail the write.

    Args:
        df: Spark DataFrame to write.
        path: Target storage location (e.g. an ``abfss://`` URI).

    Returns:
        None.
    """
    delta_writer = df.write.format("delta")
    # Replace both the data and the schema on every run so the cell is idempotent.
    delta_writer = delta_writer.mode("overwrite").option("overwriteSchema", "true")
    delta_writer.save(path)

## Orders

In [0]:
# Explicit schema for the bronze orders CSVs, so columns arrive typed.
# NOTE(review): with header=true and Spark's default enforceSchema, columns are
# presumably matched to the file by position rather than by header name — keep
# this order in sync with the CSV header. The non-nullable flags are likely not
# enforced by the CSV reader; null keys are filtered explicitly during cleanup.
orders_schema = (
    StructType()
    .add("order_id", IntegerType(), False)
    .add("user_id", IntegerType(), False)
    .add("eval_set", StringType(), True)
    .add("order_number", IntegerType(), True)
    .add("order_dow", IntegerType(), True)
    .add("order_hour_of_day", IntegerType(), True)
    .add("days_since_prior_order", DoubleType(), True)
)


In [0]:
# Load the raw orders CSVs from bronze with the explicit schema above; the
# first line of each file is treated as a header.
orders_df = spark.read.csv(
    f"{BRONZE_BASE}/orders/",
    schema=orders_schema,
    header=True,
)


In [0]:
# Clean orders: drop rows missing either key, then keep one row per order_id.
# Filtering must come BEFORE de-duplication. dropDuplicates keeps an arbitrary
# row per order_id. If it ran first, it could keep a copy with a null user_id,
# and the later filter would then drop that order entirely even though a valid
# duplicate existed.
orders_silver = (
    orders_df
    .filter(F.col("order_id").isNotNull() & F.col("user_id").isNotNull())
    .dropDuplicates(["order_id"])
)


In [0]:
# Persist the cleaned orders to the silver container as Delta (overwrite mode).
write_delta(df=orders_silver, path=f"{SILVER_BASE}/orders")

In [0]:
# Register the silver orders Delta files as the metastore table `silver_orders`.
# IF NOT EXISTS makes re-runs a no-op here; fresh data comes from the overwrite
# performed by write_delta, not from this statement.
silver_orders_ddl = (
    "\n"
    "CREATE TABLE IF NOT EXISTS silver_orders\n"
    "USING DELTA\n"
    f"LOCATION '{SILVER_BASE}/orders'\n"
)
spark.sql(silver_orders_ddl)


## Products

In [0]:
# Explicit schema for the bronze products CSVs.
# NOTE(review): columns are presumably applied by position (default
# enforceSchema) — keep in sync with the CSV header. product_id's non-nullable
# flag is likely not enforced on read; null keys are filtered during cleanup.
products_schema = (
    StructType()
    .add("product_id", IntegerType(), False)
    .add("product_name", StringType(), True)
    .add("aisle_id", IntegerType(), True)
    .add("department_id", IntegerType(), True)
)


In [0]:
# Load the raw products CSVs from bronze using the explicit schema above.
products_df = (
    spark.read
    .schema(products_schema)
    .option("header", "true")
    .csv(f"{BRONZE_BASE}/products/")
)

# Clean products: remove rows without a product_id first, then keep one row
# per product_id. Filtering before de-duplicating makes it explicit that only
# valid rows compete for the single kept row per key.
products_silver = (
    products_df
    .filter(F.col("product_id").isNotNull())
    .dropDuplicates(["product_id"])
)


In [0]:
# Persist the cleaned products to the silver container as Delta (overwrite mode).
write_delta(df=products_silver, path=f"{SILVER_BASE}/products")


In [0]:
# Register the silver products Delta files as the metastore table
# `silver_products`. IF NOT EXISTS makes re-runs a no-op here; fresh data comes
# from the overwrite performed by write_delta.
silver_products_ddl = (
    "\n"
    "CREATE TABLE IF NOT EXISTS silver_products\n"
    "USING DELTA\n"
    f"LOCATION '{SILVER_BASE}/products'\n"
)
spark.sql(silver_products_ddl)


## Aisles

In [0]:
# Explicit schema for the bronze aisles CSVs.
# NOTE(review): the non-nullable flag on aisle_id is likely not enforced by the
# CSV reader, so null keys are removed explicitly below.
aisles_schema = StructType([
    StructField("aisle_id", IntegerType(), False),
    StructField("aisle", StringType(), True)
])

# Load the raw aisles CSVs from bronze (first line of each file is a header).
aisles_df = (
    spark.read
    .schema(aisles_schema)
    .option("header", "true")
    .csv(f"{BRONZE_BASE}/aisles/")
)

# Clean aisles: drop rows with a missing key, then keep one row per aisle_id.
# This matches the null-key handling applied to orders and products.
aisles_silver = (
    aisles_df
    .filter(F.col("aisle_id").isNotNull())
    .dropDuplicates(["aisle_id"])
)

# Persist to the silver container as Delta (overwrite mode).
write_delta(aisles_silver, f"{SILVER_BASE}/aisles")


In [0]:
# Register the silver aisles Delta files as the metastore table `silver_aisles`.
# IF NOT EXISTS makes re-runs a no-op here; fresh data comes from the overwrite
# performed by write_delta.
silver_aisles_ddl = (
    "\n"
    "CREATE TABLE IF NOT EXISTS silver_aisles\n"
    "USING DELTA\n"
    f"LOCATION '{SILVER_BASE}/aisles'\n"
)
spark.sql(silver_aisles_ddl)


In [0]:
# Explicit schema for the bronze departments CSVs.
# NOTE(review): the non-nullable flag on department_id is likely not enforced
# by the CSV reader, so null keys are removed explicitly below.
departments_schema = StructType([
    StructField("department_id", IntegerType(), False),
    StructField("department", StringType(), True)
])

# Load the raw departments CSVs from bronze (first line of each file is a header).
departments_df = (
    spark.read
    .schema(departments_schema)
    .option("header", "true")
    .csv(f"{BRONZE_BASE}/departments/")
)

# Clean departments: drop rows with a missing key, then keep one row per
# department_id. This matches the null-key handling applied to orders and products.
departments_silver = (
    departments_df
    .filter(F.col("department_id").isNotNull())
    .dropDuplicates(["department_id"])
)

# Persist to the silver container as Delta (overwrite mode).
write_delta(departments_silver, f"{SILVER_BASE}/departments")


In [0]:
# Register the silver departments Delta files as the metastore table
# `silver_departments`. IF NOT EXISTS makes re-runs a no-op here; fresh data
# comes from the overwrite performed by write_delta.
silver_departments_ddl = (
    "\n"
    "CREATE TABLE IF NOT EXISTS silver_departments\n"
    "USING DELTA\n"
    f"LOCATION '{SILVER_BASE}/departments'\n"
)
spark.sql(silver_departments_ddl)


## Order Products (train + prior)

In [0]:
# Schema shared by the order_products_train and order_products_prior CSVs
# (one row per product line in an order).
# NOTE(review): columns are presumably applied by position (default
# enforceSchema) — keep in sync with the CSV header. The non-nullable key flags
# are likely not enforced on read; null keys are filtered during cleanup.
order_products_schema = (
    StructType()
    .add("order_id", IntegerType(), False)
    .add("product_id", IntegerType(), False)
    .add("add_to_cart_order", IntegerType(), True)
    .add("reordered", IntegerType(), True)
)


In [0]:
# Load both order-line splits from bronze with the shared order_products schema;
# the first line of each CSV is treated as a header.
train_df = spark.read.csv(
    f"{BRONZE_BASE}/order_products_train/",
    schema=order_products_schema,
    header=True,
)

prior_df = spark.read.csv(
    f"{BRONZE_BASE}/order_products_prior/",
    schema=order_products_schema,
    header=True,
)


In [0]:
# Combine the train and prior order lines (columns matched by name) into one
# table. Rows missing either key are removed before de-duplicating on the
# (order_id, product_id) pair, so only valid rows compete for the kept row.
order_items_silver = (
    train_df.unionByName(prior_df)
    .filter(F.col("order_id").isNotNull() & F.col("product_id").isNotNull())
    .dropDuplicates(["order_id", "product_id"])
)


In [0]:
# Persist the combined order lines to the silver container as Delta (overwrite mode).
write_delta(df=order_items_silver, path=f"{SILVER_BASE}/order_items")


In [0]:
# Register the silver order-items Delta files as the metastore table
# `silver_order_items`. IF NOT EXISTS makes re-runs a no-op here; fresh data
# comes from the overwrite performed by write_delta.
silver_order_items_ddl = (
    "\n"
    "CREATE TABLE IF NOT EXISTS silver_order_items\n"
    "USING DELTA\n"
    f"LOCATION '{SILVER_BASE}/order_items'\n"
)
spark.sql(silver_order_items_ddl)
