In [1]:


# Import all libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.fpm import FPGrowth
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import os
from datetime import datetime

print("=== COMPLETE SPARK MBA NOTEBOOK ===")
print("Dataset: online_retail_II.xlsx (2 sheets)")
print("Expected records: ~1.6 million")

=== COMPLETE SPARK MBA NOTEBOOK ===
Dataset: online_retail_II.xlsx (2 sheets)
Expected records: ~1.6 million


In [3]:
!pip list

Package                 Version
----------------------- -----------
aiohappyeyeballs        2.6.1
aiohttp                 3.12.15
aiosignal               1.4.0
asttokens               3.0.0
attrs                   25.3.0
certifi                 2025.8.3
charset-normalizer      3.4.3
click                   8.2.1
cloudpickle             3.1.1
comm                    0.2.3
complete                0.0.1
contourpy               1.3.3
cycler                  0.12.1
dask                    2025.9.1
dask-glm                0.3.2
dask-ml                 2025.1.0
debugpy                 1.8.16
decorator               5.2.1
distributed             2025.9.1
exceptiongroup          1.3.0
executing               2.2.1
fonttools               4.59.2
frozenlist              1.7.0
fsspec                  2025.9.0
geopandas               1.1.1
idna                    3.10
importlib_metadata      8.7.0
ipykernel               6.30.1
ipython                 9.5.0
ipython_pygments_lexers 1.1.1
jedi       

In [None]:
# COMPLETE SPARK MARKET BASKET ANALYSIS NOTEBOOK
# For online_retail_II.xlsx (2 sheets, ~1.6M records)

# Install required packages (run this cell first)
!pip install pyspark pandas openpyxl xlrd

In [4]:
# Cell 2: Initialize Spark
def initialize_spark():
    """Initialize Spark session for notebook environment"""
    print("üîÑ INITIALIZING SPARK SESSION")
    print("-" * 40)
    
    spark = SparkSession.builder \
        .appName("MarketBasketAnalysis") \
        .config("spark.driver.memory", "2g") \
        .config("spark.executor.memory", "2g") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("WARN")
    print("‚úì Spark session initialized")
    return spark

# Initialize Spark
spark = initialize_spark()

üîÑ INITIALIZING SPARK SESSION
----------------------------------------


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/11/28 19:48:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
‚úì Spark session initialized


In [6]:
# Cell 3: Load CSV File
def load_csv_data(file_path):
    """Load data from online_retail_II.csv"""
    print("üìÇ LOADING CSV FILE")
    print("-" * 40)
    
    # Define schema for better performance
    custom_schema = StructType([
        StructField("Invoice", StringType(), True),
        StructField("StockCode", StringType(), True),
        StructField("Description", StringType(), True),
        StructField("Quantity", IntegerType(), True),
        StructField("InvoiceDate", TimestampType(), True),
        StructField("Price", DoubleType(), True),
        StructField("Customer ID", IntegerType(), True),
        StructField("Country", StringType(), True)
    ])
    
    # Load CSV with inferred schema (more flexible)
    print("Loading online_retail_II.csv...")
    df_raw = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(file_path)
    
    # Rename columns to match our expected format
    df_raw = df_raw \
        .withColumnRenamed("Invoice", "InvoiceNo") \
        .withColumnRenamed("Price", "UnitPrice") \
        .withColumnRenamed("Customer ID", "CustomerID")
    
    original_count = df_raw.count()
    print(f"‚úì Records loaded: {original_count:,}")
    
    # Show basic info
    print(f"‚úì Columns: {', '.join(df_raw.columns)}")
    
    # Show date range
    date_range = df_raw.agg(
        min("InvoiceDate").alias("min_date"),
        max("InvoiceDate").alias("max_date")
    ).collect()[0]
    
    print(f"‚úì Date range: {date_range['min_date']} to {date_range['max_date']}")
    
    return df_raw

# Load the data
file_path = "online_retail_II.csv"  # Make sure this file is in your notebook directory
df_raw = load_csv_data(file_path)

# Show sample data
print("\nüìä SAMPLE DATA:")
df_raw.show(5)
print(f"Total records: {df_raw.count():,}")

üìÇ LOADING CSV FILE
----------------------------------------
Loading online_retail_II.csv...


                                                                                

‚úì Records loaded: 1,067,371
‚úì Columns: InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country


                                                                                

‚úì Date range: 2009-12-01 07:45:00 to 2011-12-09 12:50:00

üìä SAMPLE DATA:
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00|     6.95|   13085.0|United Kingdom|
|   489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00|     6.75|   13085.0|United Kingdom|
|   489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00|     6.75|   13085.0|United Kingdom|
|   489434|    22041|"RECORD FRAME 7""...|      48|2009-12-01 07:45:00|      2.1|   13085.0|United Kingdom|
|   489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00|     1.25|   13085.0|United Kingdom|
+---------+---------+--------------------+--------+-------

[Stage 9:>                                                          (0 + 2) / 2]

Total records: 1,067,371


                                                                                

In [7]:
# Cell 4: Data Cleaning Pipeline
def comprehensive_cleaning(df_raw):
    """Complete data cleaning pipeline"""
    print("\nüßπ COMPREHENSIVE DATA CLEANING")
    print("=" * 50)
    
    original_count = df_raw.count()
    print(f"Starting with {original_count:,} records")
    
    # Data Quality Analysis
    print("\n4.1 DATA QUALITY ANALYSIS")
    print("-" * 40)
    
    # Missing values
    missing_analysis = df_raw.select([
        count(when(isnull(c), c)).alias(c) for c in df_raw.columns
    ]).collect()[0]
    
    print("Missing values:")
    for col_name in df_raw.columns:
        missing_count = missing_analysis[col_name]
        if missing_count > 0:
            print(f"  {col_name}: {missing_count:,} ({missing_count/original_count*100:.2f}%)")
    
    # Data issues
    data_issues = df_raw.agg(
        count(when(col("Quantity") <= 0, True)).alias("invalid_quantity"),
        count(when(col("UnitPrice") <= 0, True)).alias("invalid_price"),
        count(when(col("InvoiceNo").startswith("C"), True)).alias("cancelled_invoices")
    ).collect()[0]
    
    print("\nData quality issues:")
    print(f"  Invalid quantities: {data_issues['invalid_quantity']:,}")
    print(f"  Invalid prices: {data_issues['invalid_price']:,}")
    print(f"  Cancelled invoices: {data_issues['cancelled_invoices']:,}")
    
    # Cleaning Steps
    print("\n4.2 APPLYING CLEANING RULES")
    print("-" * 40)
    
    df_clean = df_raw
    
    # Step 1: Remove cancelled invoices
    df_clean = df_clean.filter(~col("InvoiceNo").startswith("C"))
    cancelled_count = original_count - df_clean.count()
    print(f"‚úì Removed cancellations: {cancelled_count:,} records")
    
    # Step 2: Remove invalid quantities and prices
    df_clean = df_clean.filter((col("Quantity") > 0) & (col("UnitPrice") > 0))
    invalid_count = original_count - cancelled_count - df_clean.count()
    print(f"‚úì Removed invalid data: {invalid_count:,} records")
    
    # Step 3: Handle missing values
    df_clean = df_clean.filter(col("CustomerID").isNotNull() & col("Description").isNotNull())
    missing_count = original_count - cancelled_count - invalid_count - df_clean.count()
    print(f"‚úì Removed missing values: {missing_count:,} records")
    
    # Step 4: Standardize descriptions
    df_clean = df_clean.withColumn("Description", upper(trim(col("Description"))))
    print(f"‚úì Standardized descriptions")
    
    # Step 5: Remove POST/non-product items
    non_product_keywords = ["POST", "POSTAGE", "CARRIAGE", "DISCOUNT", "FEE", "CHARGE", "ADJUST", "BANK", "CREDIT", "GIFT"]
    condition = ~col("Description").rlike("|".join(non_product_keywords))
    df_clean = df_clean.filter(condition)
    non_product_count = original_count - cancelled_count - invalid_count - missing_count - df_clean.count()
    print(f"‚úì Removed non-product items: {non_product_count:,} records")
    
    # Step 6: Remove duplicates
    initial_count = df_clean.count()
    df_clean = df_clean.dropDuplicates()
    duplicate_count = initial_count - df_clean.count()
    if duplicate_count > 0:
        print(f"‚úì Removed duplicates: {duplicate_count:,} records")
    
    # Post-cleaning analysis
    final_count = df_clean.count()
    retention_rate = (final_count / original_count) * 100
    
    print(f"\n4.3 CLEANING RESULTS")
    print("-" * 40)
    print(f"Original records: {original_count:,}")
    print(f"Cleaned records: {final_count:,}")
    print(f"Records removed: {original_count - final_count:,}")
    print(f"Retention rate: {retention_rate:.2f}%")
    
    # Key metrics
    metrics = df_clean.agg(
        countDistinct("InvoiceNo").alias("transactions"),
        countDistinct("StockCode").alias("products"),
        countDistinct("CustomerID").alias("customers"),
        mean("Quantity").alias("avg_quantity"),
        mean("UnitPrice").alias("avg_price"),
        sum("Quantity").alias("total_quantity")
    ).collect()[0]
    
    print(f"\nüìä FINAL METRICS:")
    print(f"  Transactions: {metrics['transactions']:,}")
    print(f"  Products: {metrics['products']:,}")
    print(f"  Customers: {metrics['customers']:,}")
    print(f"  Avg Quantity: {metrics['avg_quantity']:.2f}")
    print(f"  Avg Price: ${metrics['avg_price']:.2f}")
    print(f"  Total Quantity Sold: {metrics['total_quantity']:,}")
    
    return df_clean

# Run cleaning
df_clean = comprehensive_cleaning(df_raw)

# Show cleaned data sample
print("\n‚úÖ CLEANED DATA SAMPLE:")
df_clean.show(5)


üßπ COMPREHENSIVE DATA CLEANING


                                                                                

Starting with 1,067,371 records

4.1 DATA QUALITY ANALYSIS
----------------------------------------


                                                                                

Missing values:
  Description: 4,382 (0.41%)
  CustomerID: 243,007 (22.77%)


                                                                                


Data quality issues:
  Invalid quantities: 22,950
  Invalid prices: 6,207
  Cancelled invoices: 19,494

4.2 APPLYING CLEANING RULES
----------------------------------------


                                                                                

‚úì Removed cancellations: 19,494 records


                                                                                

‚úì Removed invalid data: 6,207 records


                                                                                

‚úì Removed missing values: 236,121 records
‚úì Standardized descriptions


                                                                                

‚úì Removed non-product items: 19,475 records


                                                                                

‚úì Removed duplicates: 25,559 records


                                                                                


4.3 CLEANING RESULTS
----------------------------------------
Original records: 1,067,371
Cleaned records: 760,515
Records removed: 306,856
Retention rate: 71.25%


                                                                                


üìä FINAL METRICS:
  Transactions: 36,722
  Products: 4,503
  Customers: 5,866
  Avg Quantity: 13.52
  Avg Price: $3.12
  Total Quantity Sold: 10,283,096

‚úÖ CLEANED DATA SAMPLE:


[Stage 60:>                                                         (0 + 1) / 1]

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   489446|    21671|RED SPOT CERAMIC ...|      12|2009-12-01 10:06:00|     1.25|   13758.0|United Kingdom|
|   489594|    21232|STRAWBERRY CERAMI...|      12|2009-12-01 14:19:00|     1.25|   15005.0|United Kingdom|
|   489599|    20711|      JUMBO BAG TOYS|      30|2009-12-01 14:40:00|     1.95|   12758.0|      Portugal|
|   489676|    21864|UNION JACK FLAG P...|     120|2009-12-02 09:49:00|     1.69|   13777.0|United Kingdom|
|   489679|    84371|     BIG PINK POODLE|       1|2009-12-02 10:00:00|    19.95|   16163.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



                                                                                

In [8]:
# Cell 5: Prepare Transactions for FP-Growth
def prepare_transactions(df_clean):
    """Prepare transaction baskets for FP-Growth"""
    print("\nüõí PREPARING TRANSACTION DATA")
    print("-" * 40)
    
    # Create transaction baskets
    transactions_df = df_clean.groupBy("InvoiceNo") \
        .agg(collect_list("StockCode").alias("items")) \
        .filter(size(col("items")) > 0)
    
    # Add basket size
    transactions_df = transactions_df.withColumn("basket_size", size(col("items")))
    
    transaction_count = transactions_df.count()
    print(f"‚úì Created {transaction_count:,} transaction baskets")
    
    # Basket statistics
    basket_stats = transactions_df.select(
        mean("basket_size").alias("avg_size"),
        stddev("basket_size").alias("std_size"),
        min("basket_size").alias("min_size"),
        max("basket_size").alias("max_size")
    ).collect()[0]
    
    print(f"\nüìä BASKET STATISTICS:")
    print(f"  Average size: {basket_stats['avg_size']:.2f} items")
    print(f"  Std deviation: {basket_stats['std_size']:.2f} items")
    print(f"  Min size: {basket_stats['min_size']} items")
    print(f"  Max size: {basket_stats['max_size']} items")
    
    # Show basket size distribution
    basket_sizes = transactions_df.select("basket_size").toPandas()
    print(f"  95th percentile: {basket_sizes['basket_size'].quantile(0.95):.0f} items")
    
    # Show sample transactions
    print(f"\nüéØ SAMPLE TRANSACTIONS:")
    samples = transactions_df.limit(3).collect()
    for i, sample in enumerate(samples):
        print(f"  Basket {i+1}: {sample['basket_size']} items - First 3: {sample['items'][:3]}")
    
    return transactions_df

# Prepare transactions
transactions_df = prepare_transactions(df_clean)


üõí PREPARING TRANSACTION DATA
----------------------------------------


                                                                                

‚úì Created 36,722 transaction baskets


                                                                                


üìä BASKET STATISTICS:
  Average size: 20.71 items
  Std deviation: 22.54 items
  Min size: 1 items
  Max size: 533 items


                                                                                

  95th percentile: 61 items

üéØ SAMPLE TRANSACTIONS:


[Stage 89:>                                                         (0 + 2) / 2]

  Basket 1: 8 items - First 3: ['22041', '79323W', '21523']
  Basket 2: 4 items - First 3: ['22353', '22349', '22350']
  Basket 3: 17 items - First 3: ['21252', '21411', '21033']


                                                                                

In [9]:
# Cell 6: Run FP-Growth
def run_fp_growth_analysis(transactions_df, min_support=0.005, min_confidence=0.4):
    """Run FP-Growth algorithm and generate association rules"""
    print(f"\nüîç RUNNING FP-GROWTH ANALYSIS")
    print("-" * 40)
    print(f"Minimum Support: {min_support}")
    print(f"Minimum Confidence: {min_confidence}")
    
    start_time = time.time()
    
    # Initialize and run FP-Growth
    fp_growth = FPGrowth(
        itemsCol="items", 
        minSupport=min_support, 
        minConfidence=min_confidence,
        numPartitions=8
    )
    
    print("Training FP-Growth model...")
    model = fp_growth.fit(transactions_df)
    
    # Get results
    frequent_itemsets = model.freqItemsets
    association_rules = model.associationRules
    
    processing_time = time.time() - start_time
    
    print(f"‚úì FP-Growth completed in {processing_time:.2f} seconds")
    print(f"‚úì Frequent itemsets: {frequent_itemsets.count():,}")
    print(f"‚úì Association rules: {association_rules.count():,}")
    
    return model, frequent_itemsets, association_rules

# Run FP-Growth
model, frequent_itemsets, association_rules = run_fp_growth_analysis(transactions_df)

# Show sample results
print("\nüîç SAMPLE RESULTS:")
print("Frequent Itemsets sample:")
frequent_itemsets.show(5)
print("Association Rules sample:")
association_rules.show(5)


üîç RUNNING FP-GROWTH ANALYSIS
----------------------------------------
Minimum Support: 0.005
Minimum Confidence: 0.4
Training FP-Growth model...


                                                                                

25/11/28 20:09:06 ERROR Executor: Exception in task 1.0 in stage 107.0 (TID 94)
org.apache.spark.SparkException: Items in a transaction must be unique but got WrappedArray(20967, 90195B, 20971, 84032A, 22125, 21482, 84031B, 22086, 21058, 21063, 21930, 20870, 21100, 21231, 20970, 84582, 90121B, 21484, 84327A, 90175A, 84032B, 90062, 20972, 84997C, 22125, 84997D).
	at org.apache.spark.mllib.fpm.FPGrowth.$anonfun$genFreqItems$1(FPGrowth.scala:249)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.a

Py4JJavaError: An error occurred while calling o221.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 107.0 failed 1 times, most recent failure: Lost task 1.0 in stage 107.0 (TID 94) (jupyter-km4886 executor driver): org.apache.spark.SparkException: Items in a transaction must be unique but got WrappedArray(20967, 90195B, 20971, 84032A, 22125, 21482, 84031B, 22086, 21058, 21063, 21930, 20870, 21100, 21231, 20970, 84582, 90121B, 21484, 84327A, 90175A, 84032B, 90062, 20972, 84997C, 22125, 84997D).
	at org.apache.spark.mllib.fpm.FPGrowth.$anonfun$genFreqItems$1(FPGrowth.scala:249)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.mllib.fpm.FPGrowth.genFreqItems(FPGrowth.scala:254)
	at org.apache.spark.mllib.fpm.FPGrowth.run(FPGrowth.scala:219)
	at org.apache.spark.ml.fpm.FPGrowth.$anonfun$genericFit$1(FPGrowth.scala:180)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.fpm.FPGrowth.genericFit(FPGrowth.scala:162)
	at org.apache.spark.ml.fpm.FPGrowth.fit(FPGrowth.scala:159)
	at org.apache.spark.ml.fpm.FPGrowth.fit(FPGrowth.scala:129)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Items in a transaction must be unique but got WrappedArray(20967, 90195B, 20971, 84032A, 22125, 21482, 84031B, 22086, 21058, 21063, 21930, 20870, 21100, 21231, 20970, 84582, 90121B, 21484, 84327A, 90175A, 84032B, 90062, 20972, 84997C, 22125, 84997D).
	at org.apache.spark.mllib.fpm.FPGrowth.$anonfun$genFreqItems$1(FPGrowth.scala:249)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [None]:
# Cell 7: Analyze Results
def analyze_results(frequent_itemsets, association_rules, transactions_count):
    """Analyze and display FP-Growth results"""
    print("\nüìä ANALYZING RESULTS")
    print("-" * 40)
    
    # Convert to pandas for easier analysis
    freq_itemsets_pd = frequent_itemsets.orderBy(desc("freq")).limit(20).toPandas()
    rules_pd = association_rules.orderBy(desc("confidence")).limit(20).toPandas()
    
    # Calculate support for itemsets
    freq_itemsets_pd['support'] = freq_itemsets_pd['freq'] / transactions_count
    
    print("üèÜ TOP 10 FREQUENT ITEMSETS:")
    for i, row in freq_itemsets_pd.head(10).iterrows():
        items_str = str(row['items'])[:80] + "..." if len(str(row['items'])) > 80 else str(row['items'])
        print(f"  {i+1:2d}. Support: {row['support']:.4f} - Items: {items_str}")
    
    print(f"\nüéØ TOP 10 ASSOCIATION RULES:")
    for i, row in rules_pd.head(10).iterrows():
        support = row['freq'] / transactions_count
        antecedent_str = str(row['antecedent'])[:50] + "..." if len(str(row['antecedent'])) > 50 else str(row['antecedent'])
        consequent_str = str(row['consequent'])[:50] + "..." if len(str(row['consequent'])) > 50 else str(row['consequent'])
        print(f"  {i+1:2d}. {antecedent_str} ‚Üí {consequent_str}")
        print(f"      Confidence: {row['confidence']:.3f}, Support: {support:.4f}")
    
    # Rule quality analysis
    if len(rules_pd) > 0:
        print(f"\nüìà RULE QUALITY SUMMARY:")
        print(f"  Total rules: {len(rules_pd):,}")
        print(f"  Avg confidence: {rules_pd['confidence'].mean():.3f}")
        print(f"  Max confidence: {rules_pd['confidence'].max():.3f}")
        print(f"  High confidence rules (>0.7): {len(rules_pd[rules_pd['confidence'] > 0.7])}")
        print(f"  Medium confidence rules (>0.5): {len(rules_pd[rules_pd['confidence'] > 0.5])}")
        print(f"  Avg rule length: {rules_pd['antecedent'].apply(len).mean() + rules_pd['consequent'].apply(len).mean():.1f} items")
    
    return freq_itemsets_pd, rules_pd

# Analyze results
transaction_count = transactions_df.count()
freq_itemsets_pd, rules_pd = analyze_results(frequent_itemsets, association_rules, transaction_count)

In [None]:
# Cell 8: Create Visualizations
def create_visualizations(freq_itemsets_pd, rules_pd, df_clean, transactions_df):
    """Create visualizations for the analysis"""
    print("\nüìà CREATING VISUALIZATIONS")
    print("-" * 40)
    
    # Set up plotting style
    plt.style.use('default')
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    fig.suptitle('Market Basket Analysis - Online Retail II', fontsize=16, fontweight='bold')
    
    # Plot 1: Top frequent itemsets by support
    if len(freq_itemsets_pd) > 0:
        top_itemsets = freq_itemsets_pd.head(8)
        # Create shortened labels
        labels = []
        for items in top_itemsets['items']:
            if len(items) == 1:
                labels.append(f"Single: {items[0]}")
            else:
                labels.append(f"Combo: {len(items)} items")
        
        axes[0, 0].barh(range(len(top_itemsets)), top_itemsets['support'])
        axes[0, 0].set_yticks(range(len(top_itemsets)))
        axes[0, 0].set_yticklabels(labels)
        axes[0, 0].set_title('Top 8 Frequent Itemsets by Support', fontweight='bold')
        axes[0, 0].set_xlabel('Support')
        axes[0, 0].grid(axis='x', alpha=0.3)
    
    # Plot 2: Confidence distribution of rules
    if len(rules_pd) > 0:
        axes[0, 1].hist(rules_pd['confidence'], bins=20, alpha=0.7, edgecolor='black', color='skyblue')
        axes[0, 1].set_title('Distribution of Rule Confidence', fontweight='bold')
        axes[0, 1].set_xlabel('Confidence')
        axes[0, 1].set_ylabel('Number of Rules')
        axes[0, 1].grid(alpha=0.3)
    
    # Plot 3: Top rules by confidence
    if len(rules_pd) > 0:
        top_rules = rules_pd.head(6)
        y_pos = np.arange(len(top_rules))
        colors = plt.cm.viridis(np.linspace(0, 1, len(top_rules)))
        
        bars = axes[1, 0].barh(y_pos, top_rules['confidence'], color=colors)
        axes[1, 0].set_yticks(y_pos)
        axes[1, 0].set_yticklabels([f"Rule {i+1}" for i in range(len(top_rules))])
        axes[1, 0].set_title('Top 6 Rules by Confidence', fontweight='bold')
        axes[1, 0].set_xlabel('Confidence')
        axes[1, 0].grid(axis='x', alpha=0.3)
        
        # Add confidence values on bars
        for i, bar in enumerate(bars):
            width = bar.get_width()
            axes[1, 0].text(width + 0.01, bar.get_y() + bar.get_height()/2, 
                           f'{width:.3f}', ha='left', va='center')
    
    # Plot 4: Basket size distribution
    basket_sizes_pd = transactions_df.select("basket_size").toPandas()
    axes[1, 1].hist(basket_sizes_pd['basket_size'], bins=50, alpha=0.7, 
                   edgecolor='black', color='lightgreen')
    axes[1, 1].set_title('Distribution of Basket Sizes', fontweight='bold')
    axes[1, 1].set_xlabel('Items per Basket')
    axes[1, 1].set_ylabel('Frequency (log scale)')
    axes[1, 1].set_yscale('log')
    axes[1, 1].grid(alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Additional visualization: Support vs Confidence scatter plot
    if len(rules_pd) > 0:
        plt.figure(figsize=(10, 6))
        scatter = plt.scatter(rules_pd['freq']/transaction_count, rules_pd['confidence'], 
                            alpha=0.6, c=rules_pd['confidence'], cmap='viridis')
        plt.colorbar(scatter, label='Confidence')
        plt.xlabel('Support')
        plt.ylabel('Confidence')
        plt.title('Support vs Confidence for Association Rules', fontweight='bold')
        plt.grid(alpha=0.3)
        plt.show()
    
    print("‚úì Visualizations created successfully")

# Create visualizations
create_visualizations(freq_itemsets_pd, rules_pd, df_clean, transactions_df)

In [None]:
# Cell 9: Save Results
def save_results(df_clean, frequent_itemsets, association_rules, transactions_df):
    """Save all results to files"""
    print("\nüíæ SAVING RESULTS")
    print("-" * 40)
    
    # Create output directory
    os.makedirs("csv_results", exist_ok=True)
    
    # Save cleaned data
    df_clean.write.mode("overwrite").parquet("csv_results/cleaned_data")
    df_clean.coalesce(1).write.mode("overwrite").option("header", "true").csv("csv_results/cleaned_data_csv")
    
    # Save frequent itemsets
    frequent_itemsets.write.mode("overwrite").parquet("csv_results/frequent_itemsets")
    frequent_itemsets.coalesce(1).write.mode("overwrite").option("header", "true").csv("csv_results/frequent_itemsets_csv")
    
    # Save association rules
    association_rules.write.mode("overwrite").parquet("csv_results/association_rules")
    association_rules.coalesce(1).write.mode("overwrite").option("header", "true").csv("csv_results/association_rules_csv")
    
    # Save transaction data
    transactions_df.write.mode("overwrite").parquet("csv_results/transactions")
    
    # Save sample data for quick inspection
    df_clean.limit(10000).write.mode("overwrite").option("header", "true").csv("csv_results/sample_data")
    
    print("‚úì All results saved to 'csv_results' folder:")
    print("  - cleaned_data/ (Parquet + CSV)")
    print("  - frequent_itemsets/ (Parquet + CSV)")
    print("  - association_rules/ (Parquet + CSV)")
    print("  - transactions/ (Parquet)")
    print("  - sample_data/ (CSV)")

# Save results
save_results(df_clean, frequent_itemsets, association_rules, transactions_df)

In [None]:
# Cell 10: Generate Business Insights
def generate_business_insights(rules_pd, freq_itemsets_pd, top_n=10):
    """Generate actionable business insights"""
    print("\nüí° BUSINESS INSIGHTS & RECOMMENDATIONS")
    print("=" * 50)
    
    print("üéØ TOP CROSS-SELLING OPPORTUNITIES:")
    print("-" * 40)
    
    for i, row in rules_pd.head(top_n).iterrows():
        print(f"\n{i+1}. WHEN customers buy: {row['antecedent']}")
        print(f"   THEY ALSO buy: {row['consequent']}")
        print(f"   Confidence: {row['confidence']:.1%}")
        
        # Business recommendation based on confidence
        if row['confidence'] >= 0.7:
            recommendation = "üí™ STRONG BUNDLE - Create product bundles and recommend together"
        elif row['confidence'] >= 0.5:
            recommendation = "üëç MODERATE OPPORTUNITY - Cross-sell promotions and suggestions"
        else:
            recommendation = "üëÄ WEAK ASSOCIATION - Monitor and test with discounts"
        
        print(f"   üí° ACTION: {recommendation}")
    
    print(f"\nüì¶ POPULAR PRODUCT COMBINATIONS:")
    print("-" * 40)
    for i, row in freq_itemsets_pd.head(5).iterrows():
        if len(row['items']) == 1:
            print(f"  {i+1}. Single popular item: {row['items']} (Support: {row['support']:.2%})")
        else:
            print(f"  {i+1}. Popular combination: {row['items']} (Support: {row['support']:.2%})")
    
    # Summary statistics
    print(f"\nüìà PROJECT SUMMARY:")
    print("-" * 40)
    print(f"  Total association rules found: {len(rules_pd):,}")
    print(f"  High-confidence rules (‚â•70%): {len(rules_pd[rules_pd['confidence'] >= 0.7])}")
    print(f"  Medium-confidence rules (‚â•50%): {len(rules_pd[rules_pd['confidence'] >= 0.5])}")
    print(f"  Average rule confidence: {rules_pd['confidence'].mean():.1%}")

# Generate insights
generate_business_insights(rules_pd, freq_itemsets_pd)

In [None]:
# Cell 11: Performance Summary and Cleanup
print("\n‚è±Ô∏è PERFORMANCE SUMMARY")
print("=" * 50)

# Calculate some basic stats
total_records = df_raw.count()
cleaned_records = df_clean.count()
transactions_count = transactions_df.count()
itemsets_count = frequent_itemsets.count()
rules_count = association_rules.count()

print(f"üìä DATASET STATISTICS:")
print(f"  Original records: {total_records:,}")
print(f"  Cleaned records: {cleaned_records:,}")
print(f"  Transaction baskets: {transactions_count:,}")
print(f"  Frequent itemsets: {itemsets_count:,}")
print(f"  Association rules: {rules_count:,}")

print(f"\nüéØ BUSINESS IMPACT:")
print(f"  Cross-selling opportunities: {rules_count:,} rules")
print(f"  Product combinations: {itemsets_count:,} patterns")
print(f"  Customer transactions analyzed: {transactions_count:,}")

print(f"\nüîö CLEANUP")
print("-" * 40)
spark.stop()
print("‚úì Spark session stopped")
print("\n‚úÖ NOTEBOOK EXECUTION COMPLETED SUCCESSFULLY!")
print("üíæ Results saved in 'csv_results/' folder")
print("üìà Visualizations generated")
print("üí° Business insights ready for implementation")