In [0]:
# Load the raw product CSV from the volume. The first row is used as the
# header and column types are inferred from the data.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/Volumes/workspace/product/productt/products-10000 0.csv")
)

# Render the freshly loaded rows in the notebook output.
display(df)


In [0]:
# Persist the CSV-derived DataFrame as Parquet, replacing any previous output
# at the target path.
(
    df.write
    .mode("overwrite")
    .parquet("/Volumes/workspace/product/productt/parquet/products_100k")
)


In [0]:
# Read the product data back from its Parquet location.
parquet_df = (
    spark.read
    .parquet("/Volumes/workspace/product/productt/parquet/products_100k")
)


In [0]:
# Render the Parquet-backed DataFrame in the notebook output.
display(parquet_df)

In [0]:
# Print the column names and types of the Parquet-backed DataFrame.
parquet_df.printSchema()


In [0]:
from pyspark.sql.functions import col

# Normalise every column name: trim surrounding whitespace, lower-case it and
# replace spaces with underscores. toDF renames the columns positionally, so
# the data itself is untouched.
df_clean = parquet_df.toDF(
    *map(
        lambda name: name.strip().lower().replace(" ", "_"),
        parquet_df.columns,
    )
)


In [0]:
# Save the DataFrame with normalised column names as the "silver" Delta
# output, overwriting whatever is already stored at that path.
(
    df_clean.write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/product/productt/delta/products_silver")
)


In [0]:
# Print the schema of df_clean to inspect the normalised column names.
df_clean.printSchema()


In [0]:
# Load the silver product data from its Delta location and show it.
product_df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/product/productt/delta/products_silver")
)

display(product_df)


In [0]:
# Keep only the rows whose price is strictly greater than 500.
filtered_df = product_df.filter(
    product_df["price"] > 500
)
display(filtered_df)


In [0]:
# Select the rows whose category is exactly "Furniture".
Furniture_df = product_df.filter(product_df["category"] == "Furniture")
display(Furniture_df)


In [0]:
# Small in-memory lookup table pairing a numeric id with a category name.
category_df = spark.createDataFrame(
    data=[
        (1, "Furniture"),
        (2, "Skincare"),
        (3, "Cycling"),
    ],
    schema=["category_id", "category_name"],
)


In [0]:
# Show only the category_name column of the lookup table.
display(
    category_df.select(category_df["category_name"])
)

In [0]:
# Left join: every product row is kept, and the lookup columns are attached
# where the product's category equals a category_name in category_df.
joined_df = product_df.join(
    other=category_df,
    on=product_df["category"] == category_df["category_name"],
    how="left",
)

display(joined_df)


In [0]:
from pyspark.sql.functions import count

# Number of product rows in each category.
category_count_df = (
    product_df
    .groupBy("category")
    .agg(count("*").alias("product_count"))
)

display(category_count_df)


In [0]:
from pyspark.sql.functions import avg

# Mean price of the products in each category.
avg_price_df = (
    product_df
    .groupBy("category")
    .agg(avg("price").alias("avg_price"))
)

display(avg_price_df)




In [0]:
from pyspark.sql.functions import col
# Import Spark's aggregate under an alias: a bare `import sum` would rebind
# Python's built-in `sum` in the notebook namespace for every later cell.
from pyspark.sql.functions import sum as spark_sum

# Total inventory value per category: price * stock is computed per row and
# summed within each category group.
inventory_value_df = product_df.groupBy("category").agg(
    spark_sum(col("price") * col("stock")).alias("total_inventory_value")
)

display(inventory_value_df)


In [0]:
# Save the per-category inventory totals as the "gold" Delta output,
# overwriting whatever is already stored at that path.
(
    inventory_value_df.write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/product/productt/delta/products_gold")
)


In [0]:
# Read the gold Delta output back and show it.
gold_df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/product/productt/delta/products_gold")
)

display(gold_df)


In [0]:
# Read the silver Delta output back and show it.
silver_df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/product/productt/delta/products_silver")
)
display(silver_df)

In [0]:
# Show the product rows whose price is NULL.
product_df.filter(
    product_df["price"].isNull()
).display()