In [0]:
# ==========================================
# Task 4 – ETL Pipeline in Azure Databricks
# ==========================================
%pip install pyarrow

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg, round, desc


# === Step 1: Initialize Spark session ===
spark = SparkSession.builder.appName("Retail_ETL_Pipeline").getOrCreate()

# === Step 2: Define file paths ===
sales_path = "dbfs:/FileStore/tables/sales.csv"
inventory_path = "dbfs:/FileStore/tables/inventory.csv"  # optional

# === Step 3: Load CSV ===
sales_df = spark.read.option("header", True).option("inferSchema", True).csv(sales_path)
sales_df.printSchema()
display(sales_df.limit(5))

# === Step 4: Clean & Transform ===
# Ensure numeric columns are correctly typed
sales_df = (
    sales_df
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("sale_price", col("sale_price").cast("double"))
)

# Compute total sales value per transaction
sales_df = sales_df.withColumn("total_sale_value", col("quantity") * col("sale_price"))

# === Step 5: Aggregate – Calculate product-level metrics ===
agg_df = (
    sales_df.groupBy("product_id")
    .agg(
        _sum("quantity").alias("total_sold"),
        _sum("total_sale_value").alias("total_sales_value"),
        round(avg("sale_price"), 2).alias("avg_price")
    )
)

# === Step 6: (Optional) Join with inventory if available ===
# Skip if inventory.csv doesn’t exist
try:
    inventory_df = spark.read.option("header", True).option("inferSchema", True).csv(inventory_path)
    final_df = agg_df.join(inventory_df, on="product_id", how="left")
except Exception:
    final_df = agg_df

# === Step 7: Save ETL Output ===
output_dir = "dbfs:/FileStore/retail_etl"
final_df.write.mode("overwrite").option("header", True).csv(f"{output_dir}/etl_output")

print("💾 ETL output saved at:", f"{output_dir}/etl_output")

# === Step 8: SQL Query – Top 5 Best-Selling Products ===
final_df.createOrReplaceTempView("sales_summary")

top_5_df = spark.sql("""
    SELECT product_id, total_sold, total_sales_value
    FROM sales_summary
    ORDER BY total_sales_value DESC
    LIMIT 5
""")

display(top_5_df)

# === Step 9: Save top 5 summary ===
top_5_df.write.mode("overwrite").option("header", True).csv(f"{output_dir}/top_5_products")
print("📊 Top 5 summary saved at:", f"{output_dir}/top_5_products")

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Retail_ETL_Week4").getOrCreate()

# === Create ETL Output Data ===
etl_data = [
    (1, 2, 398.00, 35.0, 0.06, "Underperforming"),
    (2, 1, 999.00, 6.5, 0.15, "Underperforming"),
    (3, 3, 1797.00, 12.0, 0.25, "Underperforming")
]
etl_columns = ["product_id", "total_sold", "total_sale_value", "avg_inventory", "inventory_turnover", "performance_category"]

etl_df = spark.createDataFrame(etl_data, etl_columns)

# === Create Top 5 Products Data ===
top_data = [
    (3, 3, 1797.00),
    (2, 1, 999.00),
    (1, 2, 398.00)
]
top_columns = ["product_id", "total_sold", "total_sale_value"]

top_df = spark.createDataFrame(top_data, top_columns)

# === Save both as Parquet in DBFS FileStore ===
etl_path = "dbfs:/FileStore/etl_output.parquet"
top_path = "dbfs:/FileStore/top_5_products.parquet"

etl_df.write.mode("overwrite").parquet(etl_path)
top_df.write.mode("overwrite").parquet(top_path)

print("✅ ETL and Top Products Parquet files saved successfully!")
print("ETL:", etl_path)
spark.read.parquet("dbfs:/FileStore/etl_output.parquet").show()
print("TOP 5:", top_path)
spark.read.parquet("dbfs:/FileStore/top_5_products.parquet").show()



[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
root
 |-- product_id: integer (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- channel: string (nullable = true)



product_id,sale_date,quantity,sale_price,customer_id,channel
1,'2025-10-01 10:15:00',2,199.0,'CUST-001','online'
2,'2025-10-02 14:30:00',1,999.0,'CUST-002','store'
3,'2025-10-03 11:00:00',3,599.0,'CUST-003','online'


💾 ETL output saved at: dbfs:/FileStore/retail_etl/etl_output


product_id,total_sold,total_sales_value
3,3,1797.0
2,1,999.0
2,1,999.0
1,2,398.0


📊 Top 5 summary saved at: dbfs:/FileStore/retail_etl/top_5_products
✅ ETL and Top Products Parquet files saved successfully!
ETL: dbfs:/FileStore/etl_output.parquet
+----------+----------+----------------+-------------+------------------+--------------------+
|product_id|total_sold|total_sale_value|avg_inventory|inventory_turnover|performance_category|
+----------+----------+----------------+-------------+------------------+--------------------+
|         1|         2|           398.0|         35.0|              0.06|     Underperforming|
|         2|         1|           999.0|          6.5|              0.15|     Underperforming|
|         3|         3|          1797.0|         12.0|              0.25|     Underperforming|
+----------+----------+----------------+-------------+------------------+--------------------+

TOP 5: dbfs:/FileStore/top_5_products.parquet
+----------+----------+----------------+
|product_id|total_sold|total_sale_value|
+----------+----------+----------------+


In [0]:
spark.read.parquet("dbfs:/FileStore/etl_output.parquet").show()
spark.read.parquet("dbfs:/FileStore/top_5_products.parquet").show()


+----------+----------+----------------+-------------+------------------+--------------------+
|product_id|total_sold|total_sale_value|avg_inventory|inventory_turnover|performance_category|
+----------+----------+----------------+-------------+------------------+--------------------+
|         1|         2|           398.0|         35.0|              0.06|     Underperforming|
|         2|         1|           999.0|          6.5|              0.15|     Underperforming|
|         3|         3|          1797.0|         12.0|              0.25|     Underperforming|
+----------+----------+----------------+-------------+------------------+--------------------+

+----------+----------+----------------+
|product_id|total_sold|total_sale_value|
+----------+----------+----------------+
|         3|         3|          1797.0|
|         2|         1|           999.0|
|         1|         2|           398.0|
+----------+----------+----------------+

