# Optimized PySpark Retail Data Processing and Sales Analysis

This notebook is an optimized version of the retail data-processing pipeline, tuned for large datasets (>1M records). It applies the following optimizations:

- Spark configuration tuning for big data workloads
- Strategic caching of reused DataFrames
- Combined filtering operations to reduce data passes
- Optimal partitioning strategies
- Window function optimization with coalesce
- Memory management and cleanup

In [None]:
# Spark Configuration for Big Data Optimization
#
# Only runtime-settable SQL options are set here. "spark.serializer" is a
# Spark core setting that is fixed when the SparkContext starts: setting it
# through spark.conf.set() on a live session has no effect on Spark 2.x and is
# rejected with an AnalysisException on Spark 3.x. To use Kryo, put
#   spark.serializer=org.apache.spark.serializer.KryoSerializer
# in the cluster / session-builder configuration instead.

# Adaptive Query Execution: re-plan at runtime, merge small shuffle
# partitions, and split skewed join partitions.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Target size AQE aims for when coalescing shuffle partitions.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# 0 keeps AQE from preferring shuffled hash joins over sort-merge joins.
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "0")
# Arrow-based columnar transfer between Spark and pandas.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

print("Spark configuration optimized for big data processing")

In [None]:
# Import required libraries
# NOTE: `round` and `sum` imported below are the PySpark column functions; from
# this cell onward they replace Python's built-in round()/sum() in the notebook
# namespace, so calling them on plain Python numbers/lists will not work.
from pyspark.sql.functions import col, to_date, year, month, round, sum, desc, row_number
from pyspark.sql import Window
import time

print("Libraries imported successfully")

In [None]:
# Performance monitoring setup: record the wall-clock start of the run so the
# elapsed time can be computed from `start_time` later in the notebook.
start_time = time.time()
started_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
print(f"Starting data processing at: {started_at}")

In [None]:
# Load data and spread it across a row-count-based number of partitions
print("Loading raw data...")
raw_df = spark.read.csv("/mnt/raw/merge_csv.csv", header=True, inferSchema=True)

# Count directly from the source rather than caching first. Caching the raw
# frame and then rebinding `df` to the repartitioned (uncached) frame would
# discard the only handle to the cached data: a later `df.unpersist()` would
# target a frame that was never cached, leaving the raw rows pinned in
# executor memory for the rest of the session. `df` feeds a single downstream
# transformation in this notebook, so no cache is needed here.
initial_count = raw_df.count()
print(f"Initial dataset size: {initial_count:,} records")

# Aim for roughly 25,000 rows per partition, clamped to the range [50, 200].
optimal_partitions = max(50, min(200, initial_count // 25000))
df = raw_df.repartition(optimal_partitions)
print(f"Repartitioned data into {optimal_partitions} partitions")

In [None]:
# Data cleaning: rename the customer column, drop unwanted rows, de-duplicate.
print("Applying data cleaning operations...")

# Row-level predicates, named so the pipeline below reads as a checklist.
# Invoices starting with 'C' are excluded (presumably cancellations — confirm
# against the dataset's data dictionary).
not_c_invoice = ~col('Invoice').startswith('C')
has_customer = col('CustomerID').isNotNull()
positive_qty_and_price = (col('Quantity') > 0) & (col('Price') > 0)

df_cleaned = (
    df.withColumnRenamed('Customer ID', 'CustomerID')
    .filter(not_c_invoice)
    .filter(has_customer)
    .filter(positive_qty_and_price)
    .dropDuplicates()
    .cache()  # reused by the next stage
)

# The count is the action that materializes the cached, cleaned frame.
cleaned_count = df_cleaned.count()
print(f"Cleaned dataset size: {cleaned_count:,} records")

removed_count = initial_count - cleaned_count
removed_pct = removed_count / initial_count * 100
print(f"Filtered out: {removed_count:,} records ({removed_pct:.1f}%)")

# The input frame is not used again after this point.
df.unpersist()

In [None]:
# Combined date processing and feature engineering
print("Processing dates and calculating revenue...")

# Single-letter fields (M, d, H) accept both unpadded and zero-padded values,
# e.g. "1/5/2010 8:05" as well as "12/01/2010 08:05". A two-letter "MM" would
# require exactly two month digits under Spark 3's datetime parser, so rows
# from January-September written as "1/..", "2/.." would fail to parse.
# NOTE(review): the load step uses inferSchema=True; if InvoiceDate is inferred
# as a timestamp rather than a string, this pattern does not apply — confirm
# the column type with df_cleaned.printSchema().
INVOICE_DATE_FORMAT = 'M/d/yyyy H:mm'

df_processed = (
    df_cleaned
    .withColumn('InvoiceDate', to_date(col('InvoiceDate'), INVOICE_DATE_FORMAT))
    .withColumn("Year", year(col("InvoiceDate")))
    .withColumn("Month", month(col("InvoiceDate")))
    # `round` here is pyspark.sql.functions.round, not the Python built-in.
    .withColumn('Revenue', round(col('Quantity') * col('Price'), 2))
    .cache()  # reused by the aggregation cells
)

# The count is the action that materializes the cached, processed frame.
processed_count = df_processed.count()
print(f"Processed dataset size: {processed_count:,} records")

# The cleaned frame is fully captured in df_processed's cache; release it.
df_cleaned.unpersist()

In [None]:
# Monthly revenue per product
print("Performing monthly sales aggregation...")

# No explicit repartition before the groupBy: repartitioning by (Year, Month)
# would shuffle every un-aggregated row and limit the aggregation to as many
# busy partitions as there are distinct months. groupBy already performs its
# own shuffle on the grouping keys, after partial aggregation on each input
# partition, so the result is identical without the extra step.
monthly_sales = (
    df_processed
    .groupBy('Year', 'Month', 'Stockcode', 'Description')
    # `sum` here is pyspark.sql.functions.sum, not the Python built-in.
    .agg(sum('Revenue').alias('TotalRevenue'))
    .cache()  # reused by the ranking cell
)

# The count is the action that materializes the cached aggregate.
monthly_sales_count = monthly_sales.count()
print(f"Monthly sales aggregation complete: {monthly_sales_count:,} product-month combinations")

display(monthly_sales.orderBy(desc('TotalRevenue')).limit(10))

In [None]:
# Rank products within each (Year, Month) by revenue and keep the top 10
print("Ranking products by monthly revenue...")

# Stockcode and Description are the remaining grouping keys of monthly_sales,
# so adding them as tie-breakers gives a total order within each month. With
# TotalRevenue alone, row_number() assigns ranks arbitrarily among products
# with equal revenue, and the rows kept at the Rank <= 10 cut-off could differ
# from run to run.
window_spec = Window.partitionBy('Year', 'Month').orderBy(
    desc('TotalRevenue'), col('Stockcode'), col('Description')
)

# No coalesce() before the window: the window's partitionBy shuffles the data
# by (Year, Month) anyway, so shrinking the partition count first only reduces
# the parallelism of the stage that feeds that shuffle.
ranked = monthly_sales.withColumn('Rank', row_number().over(window_spec))

top10_product = ranked.filter(col('Rank') <= 10).cache()

# The count is the action that materializes the cached ranking.
top10_count = top10_product.count()
print(f"Top 10 products analysis complete: {top10_count:,} records")

display(top10_product.orderBy('Year', 'Month', 'Rank'))

In [None]:
# Additional analysis: monthly revenue per country
print("Performing country sales analysis...")

# No explicit repartition before the groupBy: repartitioning by (Year, Month)
# would shuffle every un-aggregated row and limit the aggregation to as many
# busy partitions as there are distinct months. groupBy performs its own
# shuffle on the grouping keys, so the result is identical without it.
country_sales = (
    df_processed
    .groupBy('Year', 'Month', 'Country')
    # `sum` here is pyspark.sql.functions.sum, not the Python built-in.
    .agg(sum('Revenue').alias('TotalRevenue'))
    .orderBy(desc('TotalRevenue'))
)

display(country_sales.limit(20))

In [None]:
# Persist the top-10 ranking as a Delta table partitioned by Year and Month.
print("Saving results to Delta Lake...")

# Build the writer first, then trigger the write with save().
delta_writer = (
    top10_product.coalesce(10)  # at most 10 write tasks
    .write
    .format('delta')
    .mode('overwrite')  # replace any existing table contents
    .option("overwriteSchema", "true")  # allow the stored schema to be replaced
    .partitionBy('Year', 'Month')
)
delta_writer.save('/mnt/processed/top10_product')

print("Results successfully saved to Delta Lake")

In [None]:
# Release every DataFrame that is still cached at the end of the run.
print("Cleaning up cached DataFrames...")

for cached_frame in (df_processed, monthly_sales, top10_product):
    cached_frame.unpersist()

print("Memory cleanup completed")

In [None]:
# Performance summary: elapsed wall-clock time since `start_time`, plus the
# headline record counts collected in earlier cells.
end_time = time.time()
processing_time = end_time - start_time

divider = "=" * 50
finished_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))

print("\n" + divider)
print("PROCESSING SUMMARY")
print(divider)
print(f"Total processing time: {processing_time:.2f} seconds")
print(f"Initial dataset size: {initial_count:,} records")
print(f"Final top 10 products: {top10_count:,} records")
# Throughput is measured against the raw input size, not the cleaned size.
print(f"Processing rate: {initial_count / processing_time:.0f} records/second")
print(f"Completed at: {finished_at}")
print(divider)