In [0]:
# Load the raw item CSV from the bronze mount. The first row is used as the
# header, column types are inferred from the data, and quoted fields are allowed
# to span several physical lines.
raw_items_path = "/mnt/raw-bronze/item.csv"
item_df = spark.read.csv(
    raw_items_path,
    header=True,
    inferSchema=True,
    multiLine=True,
)

# Expose the frame to SQL cells under the view name "items".
item_df.createOrReplaceTempView("items")

In [0]:
# Set the session-level key "spark.sql.execution.columnPruning" to "false"
# before the feature-engineering cell runs.
# NOTE(review): confirm that this key is actually recognised by the Spark
# runtime in use; if it is not, this call presumably has no effect.
column_pruning_key = "spark.sql.execution.columnPruning"
spark.conf.set(column_pruning_key, "false")


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when, lit, regexp_extract, upper
from pyspark.sql.types import DoubleType

# Size engineering: split the free-text "size" column into a numeric part and a
# unit part.
#   group 1 -> digits with an optional decimal part  (becomes "size_value")
#   group 2 -> the letters / '#' that follow, after optional whitespace
#              (upper-cased, becomes "size_unit")
# Rows where the pattern does not match get an empty string from
# regexp_extract; the empty unit is replaced by a placeholder further down.
size_pattern = r'(\d+\.?\d*)\s*([A-Za-z#]+)'
size_source = col("size")

item_df = (
    item_df
    # Temporary string column holding the numeric text before the cast.
    .withColumn("size_value_str", regexp_extract(size_source, size_pattern, 1))
    .withColumn("size_value", col("size_value_str").cast(DoubleType()))
    .drop("size_value_str")
    .withColumn("size_unit", upper(regexp_extract(size_source, size_pattern, 2)))
)

# Data cleaning ahead of StringIndexer: replace blank and missing categorical
# values with the literal placeholder "UNKNOWN".
#   1. An empty "size_unit" (the regex found no unit) becomes "UNKNOWN".
#   2. Nulls in "size_unit", "brand" and "type" are filled with "UNKNOWN".
# NOTE(review): fillna with a string value only affects string-typed columns;
# confirm that inferSchema reads "brand" and "type" as strings.
size_unit_col = col("size_unit")

item_df = (
    item_df
    .withColumn(
        "size_unit",
        when(size_unit_col == "", lit("UNKNOWN")).otherwise(size_unit_col),
    )
    .fillna("UNKNOWN", subset=["size_unit", "brand", "type"])
)

# Defensive drop of every column the ML pipeline below writes, so this cell can
# be re-run after `item_df` has already been transformed. The pipeline stages
# refuse to run when their output column is already present, so the list must
# name ALL pipeline outputs -- including "type_vec" from the VectorAssembler,
# which was previously missing and made a second run of the cell fail.
cols_to_drop = [
    "size_unit_indexed",
    "size_unit_encoded",
    "brand_indexed",
    "type_indexed",
    "type_vec",
]

# DataFrame.drop ignores names that are not in the schema, so no per-column
# existence check is needed.
item_df = item_df.drop(*cols_to_drop)

print("Columns after drop:", item_df.columns)

# ML stage definitions.
# Every StringIndexer is created with handleInvalid='keep' so that values it
# cannot index are kept rather than raising an error.
indexer_options = {"handleInvalid": 'keep'}

# size_unit: index the unit label, then one-hot encode the index.
indexer_unit = StringIndexer(
    inputCol="size_unit", outputCol="size_unit_indexed", **indexer_options
)
encoder_unit = OneHotEncoder(
    inputCol="size_unit_indexed", outputCol="size_unit_encoded"
)

# brand and type: index only.
indexer_brand = StringIndexer(
    inputCol="brand", outputCol="brand_indexed", **indexer_options
)
indexer_type = StringIndexer(
    inputCol="type", outputCol="type_indexed", **indexer_options
)

# Wrap the single "type_indexed" column into the vector column "type_vec".
assembler = VectorAssembler(inputCols=["type_indexed"], outputCol="type_vec")


# Run the pipeline: fit all stages on the item frame, then apply the fitted
# model to that same frame. Stage order matters -- the encoder and the
# assembler read columns produced by the indexers listed before them.
pipeline_stages = [
    indexer_unit,
    encoder_unit,
    indexer_brand,
    indexer_type,
    assembler,
]
pipeline = Pipeline(stages=pipeline_stages)
pipeline_model = pipeline.fit(item_df)
item_df = pipeline_model.transform(item_df)

# Final silver frame: only the intermediate "size_unit_indexed" column is
# dropped; every other column (including the encoded / indexed outputs) is kept.
item_df_silver = item_df.drop("size_unit_indexed")



In [0]:
# Persist the silver item frame as Parquet in the silver container, replacing
# whatever is already stored at the target path.
output_path = "/mnt/silver/items/"

item_df_silver.write.mode("overwrite").parquet(output_path)