# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring

# Obtain the Spark session for this job under the application name "Pizza".
spark = (
    SparkSession.builder
    .appName("Pizza")
    .getOrCreate()
)

# DBFS locations of the four source CSV files that are ingested below.
pizza_path = "dbfs:/FileStore/pizzas.csv"
pizza_type_path = "dbfs:/FileStore/pizza_types_latest.csv"
orders_path = "dbfs:/FileStore/orders.csv"
orders_details_path = "dbfs:/FileStore/order_details.csv"

def _read_csv(path):
    """Load a comma-delimited CSV that has a header row, inferring column types."""
    reader = (
        spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .option("delimiter", ",")
    )
    return reader.load(path)


# Every input is read with the same options and then spread over 4 partitions.
# The partition count is hard-coded; tune it to the data size and the cluster.
df_pizza = _read_csv(pizza_path).repartition(4)

df_pizza_type = _read_csv(pizza_type_path).repartition(4)

# Overwrite `time` with the 8 characters that start at position 12.
# NOTE(review): presumably the inferred value renders as "yyyy-MM-dd HH:mm:ss",
# so this keeps only the HH:mm:ss part - confirm against the inferred schema.
df_orders = (
    _read_csv(orders_path)
    .withColumn("time", substring("time", 12, 8))
    .repartition(4)
)

df_order_details = _read_csv(orders_details_path).repartition(4)

# Write each DataFrame out as a table; "overwrite" mode replaces whatever an
# earlier run stored under the same table name.
for frame, target_table in (
    (df_pizza, "PIZZA"),
    (df_pizza_type, "PIZZA_TYPES"),
    (df_orders, "PIZZA_ORDERS"),
    (df_order_details, "PIZZA_ORDER_DTL"),
):
    frame.write.mode("overwrite").saveAsTable(target_table)

# Print the row count of each saved table. The count column is aliased to the
# table name so every printed result identifies the table it belongs to.
verified_tables = ("PIZZA", "PIZZA_TYPES", "PIZZA_ORDERS", "PIZZA_ORDER_DTL")
for table_name in verified_tables:
    count_query = f"SELECT COUNT(*) AS {table_name} FROM {table_name}"
    spark.sql(count_query).show()

# Cache the three tables that the analysis queries below read, so repeated
# reads do not have to recompute them.
for cached_table in ("PIZZA", "PIZZA_ORDER_DTL", "PIZZA_TYPES"):
    spark.catalog.cacheTable(cached_table)

# Most common pizza size ordered.
# Each order-detail line counts once (COUNT(*)); the `quantity` column is not
# part of this ranking. Detail lines without a matching PIZZA row are kept by
# the LEFT JOIN and fall into a NULL size group.
most_common_size_sql = """
    SELECT size, COUNT(*) AS sold_count_by_size
    FROM (
        SELECT dtl.pizza_id, p.size
        FROM PIZZA_ORDER_DTL dtl
        LEFT JOIN PIZZA p ON dtl.pizza_id = p.pizza_id
    ) AS joined_data
    GROUP BY size
    ORDER BY sold_count_by_size DESC
    LIMIT 1
    """

# LIMIT 1 leaves at most one row, so the result is collapsed into a single
# partition before it is displayed.
df_most_common_size = spark.sql(most_common_size_sql).repartition(1)
df_most_common_size.show()

# List the top 5 most ordered pizza types along with their quantities.
#
# Quantities are summed per pizza TYPE, not per pizza_id. PIZZA carries both a
# `size` and a `pizza_type_id` for each pizza_id, so several pizza_ids can
# belong to one type; ranking by pizza_id would rank individual pizzas and
# could list the same type name more than once.
#
# The joins therefore happen before the aggregation, and ORDER BY / LIMIT sit
# on the outermost query so the displayed rows are actually sorted. Ties on
# quantity are broken by name to keep the output deterministic.
#
# LEFT JOINs are kept so that detail lines with no matching PIZZA or
# PIZZA_TYPES row are not silently dropped; they are grouped under a NULL
# name, which points at a data-quality problem if it ever shows up here.
top_pizza_types_sql = """
    SELECT pt.name, SUM(dtl.quantity) AS quantity
    FROM PIZZA_ORDER_DTL dtl
    LEFT JOIN PIZZA p ON dtl.pizza_id = p.pizza_id
    LEFT JOIN PIZZA_TYPES pt ON p.pizza_type_id = pt.pizza_type_id
    GROUP BY pt.pizza_type_id, pt.name
    ORDER BY quantity DESC, name ASC
    LIMIT 5
    """

# No repartition here: it would shuffle the (at most five) sorted rows and the
# order produced by ORDER BY would no longer be guaranteed when shown.
df_top_5 = spark.sql(top_pizza_types_sql)
df_top_5.show()

# Drop the cached copies of the tables once the analysis queries have run.
for table_to_release in ("PIZZA", "PIZZA_ORDER_DTL", "PIZZA_TYPES"):
    spark.catalog.uncacheTable(table_to_release)
