In [None]:
# Parameters cell: presumably overwritten by an orchestrator that injects
# `context` / `config` (e.g. papermill/dagstermill) — TODO confirm.
# Neither name is read by the cells visible in this notebook; both default
# to None so the notebook can also be run by hand.
context = config = None

# Olist Orders Exploratory Data Analysis
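This notebook uses PySpark to explore the Olist e-commerce orders dataset from the lakehouse bronze layer. It profiles order volume and status, purchase seasonality and delivery times, and joins order items to rank orders by value.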

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os
import logging

# Set py4j logging to ERROR to reduce verbosity
logging.getLogger("py4j").setLevel(logging.ERROR)

# MinIO (S3A) connection settings. Read from the environment so credentials
# need not live in the notebook; the defaults are the values that were
# previously hard-coded here, so behavior is unchanged when nothing is set.
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minio")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minio123")

# Initialize Spark Session with Full Configuration
# NOTE: This mirrors the configuration in SparkSessionResource for consistency.
# NOTE(review): the SSL key below was previously `fs.connection.ssl.enabled`
# (missing the `s3a.` segment), which S3A does not read; SparkSessionResource
# may carry the same typo — confirm and fix there too.
spark = (
    SparkSession.builder
    .appName("OrderExploration")
    .master("spark://spark-master:7077")
    .config("spark.jars",
            "/opt/spark/jars/delta-core_2.12-2.3.0.jar,"
            "/opt/spark/jars/hadoop-aws-3.3.2.jar,"
            "/opt/spark/jars/delta-storage-2.3.0.jar,"
            "/opt/spark/jars/aws-java-sdk-bundle-1.11.1026.jar,"
            "/opt/spark/jars/s3-2.18.41.jar,"
            "/opt/spark/jars/mysql-connector-java-8.0.19.jar")
    # Delta Lake catalog + SQL extensions
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # S3A -> MinIO (path-style addressing, plain HTTP)
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    # Hive metastore-backed warehouse on the lakehouse bucket
    .config("spark.sql.warehouse.dir", "s3a://lakehouse/")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.catalogImplementation", "hive")
    # Distributed Processing Config
    .config("spark.executor.cores", "1")
    .config("spark.cores.max", "3")  # Use all 3 workers
    .config("spark.driver.memory", "512m")
    .config("spark.executor.memory", "512m")
    .config("spark.default.parallelism", "3")
    .config("spark.sql.shuffle.partitions", "3")
    # Force consistent Python execution environment
    .config("spark.pyspark.python", "python3")
    .config("spark.pyspark.driver.python", "python3")
    .enableHiveSupport()
    .getOrCreate()
)
print("✓ Spark Session Created Successfully with Full Config")

## 1. Load Data from Bronze Layer

In [None]:
# Read orders data from the bronze layer. On failure `orders_df` is set to
# None, and every later cell guarded on `orders_df` skips its analysis.
ORDERS_PARTITIONS = 3  # matches spark.cores.max / spark.default.parallelism in the session cell

try:
    orders_df = (
        spark.read.table("bronze.order")
        # Repartition to distribute work across workers
        .repartition(ORDERS_PARTITIONS)
        # Later cells run many actions on this frame; without caching each
        # action recomputes the table scan and the repartition shuffle.
        .cache()
    )
    # count() also materializes the cache
    print(f"✓ Loaded {orders_df.count():,} orders")
except Exception as e:
    # Best-effort load: report the error and let downstream cells skip.
    print("Note: Could not load from bronze.order table. Downstream analysis cells will be skipped.")
    print(f"Error: {e}")
    orders_df = None

## 2. Data Overview

In [None]:
# Explicit None check: the load cell sets orders_df to None on failure, and a
# DataFrame's truthiness says nothing about whether it holds data.
if orders_df is not None:
    # Column names and types of the loaded table
    print("Schema:")
    orders_df.printSchema()

    # First five rows, untruncated so long values are shown in full
    print("\nSample Data:")
    orders_df.show(5, truncate=False)

## 3. Basic Statistics

In [None]:
# Explicit None check: the load cell sets orders_df to None on failure.
if orders_df is not None:
    # Both totals in a single aggregation job. countDistinct ignores NULL
    # customer_ids, the same rule the summary-report cell uses, so both cells
    # report the same "Unique Customers" figure.
    # NOTE(review): if customer_id is a per-order key (as in the public Olist
    # schema, where customer_unique_id identifies the person), this number
    # equals the order count rather than the count of distinct people — confirm.
    totals = orders_df.agg(
        count("*").alias("total_orders"),
        countDistinct("customer_id").alias("unique_customers"),
    ).first()
    total_orders = totals["total_orders"]
    unique_customers = totals["unique_customers"]
    print(f"Total Orders: {total_orders:,}")
    print(f"Unique Customers: {unique_customers:,}")

    # Order status distribution, most frequent status first
    print("\nOrder Status Distribution:")
    (
        orders_df.groupBy("order_status")
        .count()
        .orderBy(col("count").desc())
        .show()
    )

## 4. Temporal Analysis

In [None]:
# Explicit None check: the load cell sets orders_df to None on failure.
if orders_df is not None:
    # Calendar features derived from the purchase timestamp.
    # Spark's dayofweek returns 1 = Sunday ... 7 = Saturday.
    # order_date and order_day_of_week are not used in this cell; they are
    # kept on orders_enriched for further exploration.
    purchase_ts = col("order_purchase_timestamp")
    orders_enriched = (
        orders_df
        .withColumn("order_date", to_date(purchase_ts))
        .withColumn("order_year", year(purchase_ts))
        .withColumn("order_month", month(purchase_ts))
        .withColumn("order_day_of_week", dayofweek(purchase_ts))
    )

    # Orders by year
    print("Orders by Year:")
    orders_enriched.groupBy("order_year").count().orderBy("order_year").show()

    # Orders by month for the most recent year present in the data
    print("\nOrders by Month (Most Recent Year):")
    latest_year = orders_enriched.agg(max("order_year")).first()[0]
    if latest_year is None:
        # max() is NULL when the table is empty or every purchase timestamp is
        # NULL; filtering on `== None` would just show an empty table.
        print("No purchase timestamps available; skipping monthly breakdown.")
    else:
        (
            orders_enriched.filter(col("order_year") == latest_year)
            .groupBy("order_month")
            .count()
            .orderBy("order_month")
            .show()
        )

## 5. Order Delivery Analysis

In [None]:
# Explicit None check: the load cell sets orders_df to None on failure.
if orders_df is not None:
    # Delivery time in whole days from purchase to customer delivery.
    # datediff compares calendar dates, so time-of-day is ignored. Rows where
    # the result is NULL (e.g. undelivered orders, missing purchase timestamp)
    # are dropped: a NULL fails every when() test below and would otherwise be
    # counted in the last bucket.
    delivery_analysis = (
        orders_df
        .withColumn(
            "delivery_time_days",
            datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")),
        )
        .filter(col("delivery_time_days").isNotNull())
    )

    # Delivery time statistics
    print("Delivery Time Statistics (in days):")
    delivery_analysis.select(
        avg("delivery_time_days").alias("avg_delivery_time"),
        min("delivery_time_days").alias("min_delivery_time"),
        max("delivery_time_days").alias("max_delivery_time"),
    ).show()

    # Buckets use inclusive upper bounds: 0-7, 8-14, 15-21, then 22+.
    # Negative values (delivery dated before purchase) fall in "0-7 days".
    days = col("delivery_time_days")
    delivery_range = (
        when(days <= 7, "0-7 days")
        .when(days <= 14, "8-14 days")
        .when(days <= 21, "15-21 days")
        .otherwise("22+ days")
        .alias("delivery_range")
    )
    print("\nDelivery Time Distribution:")
    delivery_analysis.groupBy(delivery_range).count().orderBy("count", ascending=False).show()

## 6. Load and Join with Order Items

In [None]:
# Explicit None check: the load cell sets orders_df to None on failure.
if orders_df is not None:
    # Only the table load is best-effort: a missing bronze.orderitem table is
    # reported and the section skipped. Errors in the join/aggregation below
    # are not misreported as load failures.
    try:
        order_items_df = spark.read.table("bronze.orderitem")
    except Exception as e:
        # Report only the first line of the message; fall back to repr(e) when
        # the message is empty (indexing an empty splitlines() would raise).
        error_summary = (str(e).splitlines() or [repr(e)])[0]
        print(f"Warning: Could not load order items: {error_summary}")
        order_items_df = None

    if order_items_df is not None:
        # Inner join: only orders present in both tables are kept
        orders_with_items = orders_df.join(
            order_items_df,
            on="order_id",
            how="inner"
        )
        print("✓ Successfully joined orders with order items")

        # Rank orders by summed item `price` (no other charges are included)
        print("\nTop 10 Orders by Total Value:")
        (
            orders_with_items.groupBy("order_id")
            .agg(
                sum("price").alias("total_price"),
                count("*").alias("num_items"),
            )
            .orderBy(col("total_price").desc())
            .show(10)
        )

## 7. Summary Statistics

In [None]:
# Explicit None check: the load cell sets orders_df to None on failure.
if orders_df is not None:
    # Create summary report
    print("=" * 50)
    print("SUMMARY REPORT")
    print("=" * 50)

    # One aggregation job. countDistinct ignores NULL customer_ids; on an
    # empty table the first/last dates come back as None and print as "None".
    summary = orders_df.select(
        count("*").alias("total_orders"),
        countDistinct("customer_id").alias("unique_customers"),
        min("order_purchase_timestamp").alias("first_order_date"),
        max("order_purchase_timestamp").alias("last_order_date"),
    ).first()

    print(f"Total Orders: {summary['total_orders']:,}")
    print(f"Unique Customers: {summary['unique_customers']:,}")
    print(f"First Order: {summary['first_order_date']}")
    print(f"Last Order: {summary['last_order_date']}")
    print("=" * 50)

In [None]:
# Spark session teardown is left commented out, presumably so individual
# cells can still be re-run in this kernel. Uncomment to release the cluster
# cores (spark.cores.max) held by this session.
# spark.stop()
completion_message = "✓ Analysis Complete"
print(completion_message)