# 📊 Customer Purchase Behavior Analysis

## Interactive Analysis with PySpark

This notebook provides an interactive environment for exploring customer purchase behavior using PySpark and the Online Retail Dataset.

### 📋 Table of Contents
1. [Setup and Data Loading](#setup)
2. [Data Exploration](#exploration)
3. [Customer Analysis](#customers)
4. [Product Analysis](#products)
5. [Temporal Patterns](#temporal)
6. [Geographic Analysis](#geographic)
7. [Visualizations](#visualizations)
8. [Key Insights](#insights)

## 🚀 Setup and Data Loading <a id="setup"></a>

In [1]:
# Import required libraries
import sys
import os
from pathlib import Path

# Add project root to path
project_root = Path.cwd().parent
sys.path.append(str(project_root))

# Import PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Import visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Import project modules
from src.data_ingestion import create_spark_session
from src.data_analysis import CustomerPurchaseAnalyzer
from src.visualization import AnalysisVisualizer
from config import PROCESSED_DATA_DIR, CHARTS_DIR

# Configure plotting
plt.style.use('default')
sns.set_palette('Set2')
%matplotlib inline

print("📦 Libraries imported successfully!")

📦 Libraries imported successfully!


In [2]:
# Create Spark Session
spark = create_spark_session()
print(f"✅ Spark session created: {spark.sparkContext.appName}")
print(f"🔧 Spark version: {spark.version}")
print(f"💻 Master: {spark.sparkContext.master}")

2025-08-22 13:55:58,269 - INFO - Spark session created successfully


✅ Spark session created: CustomerPurchaseBehaviorAnalysis
🔧 Spark version: 3.5.0
💻 Master: local[*]


In [3]:
# Load processed data
processed_data_path = PROCESSED_DATA_DIR / "retail_data_processed.parquet"

if processed_data_path.exists():
    df = spark.read.parquet(str(processed_data_path))
    print(f"✅ Data loaded from: {processed_data_path}")
    print(f"📊 Dataset shape: {df.count()} rows, {len(df.columns)} columns")
else:
    print("❌ Processed data not found. Please run the data ingestion script first.")
    print("Run: python src/data_ingestion.py")

❌ Processed data not found. Please run the data ingestion script first.
Run: python src/data_ingestion.py


## 🔍 Data Exploration <a id="exploration"></a>

In [4]:
# Display basic information about the dataset
print("📋 Dataset Schema:")
df.printSchema()

print("\n📈 Dataset Statistics:")
print(f"Total Records: {df.count():,}")
print(f"Unique Customers: {df.select('CustomerID').distinct().count():,}")
print(f"Unique Products: {df.select('StockCode').distinct().count():,}")
print(f"Unique Countries: {df.select('Country').distinct().count():,}")

# Date range
date_stats = df.select(
    min('InvoiceDate').alias('earliest_date'),
    max('InvoiceDate').alias('latest_date')
).collect()[0]

print(f"Date Range: {date_stats.earliest_date} to {date_stats.latest_date}")

📋 Dataset Schema:


NameError: name 'df' is not defined

In [None]:
# Display first few rows
print("🔍 Sample Data:")
df.select('InvoiceNo', 'Description', 'Quantity', 'UnitPrice', 'TotalAmount', 'Country').show(10, truncate=False)

In [None]:
# Quick statistics on numerical columns
print("📊 Numerical Statistics:")
df.select('Quantity', 'UnitPrice', 'TotalAmount').describe().show()

## 👥 Customer Analysis <a id="customers"></a>

In [None]:
# Customer spending analysis
customer_stats = df.groupBy('CustomerID') \
                  .agg(
                      sum('TotalAmount').alias('total_spent'),
                      count('InvoiceNo').alias('order_count'),
                      countDistinct('StockCode').alias('unique_products'),
                      avg('TotalAmount').alias('avg_order_value')
                  ) \
                  .orderBy(desc('total_spent'))

print("💰 Top 10 Customers by Spending:")
customer_stats.show(10)

In [None]:
# Customer segmentation
customer_segments = customer_stats.withColumn('segment',
    when(col('total_spent') >= 1000, 'High Value')
    .when(col('total_spent') >= 100, 'Medium Value')
    .otherwise('Low Value')
)

segment_summary = customer_segments.groupBy('segment') \
                                 .agg(
                                     count('*').alias('customer_count'),
                                     avg('total_spent').alias('avg_spent'),
                                     avg('order_count').alias('avg_orders')
                                 )

print("📊 Customer Segments:")
segment_summary.show()

## 🛍️ Product Analysis <a id="products"></a>

In [None]:
# Top products by revenue
top_products = df.groupBy('StockCode', 'Description') \
                .agg(
                    sum('TotalAmount').alias('total_revenue'),
                    sum('Quantity').alias('total_quantity'),
                    count('*').alias('order_count')
                ) \
                .orderBy(desc('total_revenue'))

print("🏆 Top 15 Products by Revenue:")
top_products.show(15, truncate=False)

In [None]:
# Product categories analysis (based on stock codes)
df_with_category = df.withColumn('category', regexp_extract(col('StockCode'), '^([A-Z]+)', 1))
df_with_category = df_with_category.filter(col('category') != '')

category_analysis = df_with_category.groupBy('category') \
                                   .agg(
                                       sum('TotalAmount').alias('total_revenue'),
                                       sum('Quantity').alias('total_quantity'),
                                       countDistinct('StockCode').alias('unique_products')
                                   ) \
                                   .orderBy(desc('total_revenue'))

print("📂 Product Categories by Revenue:")
category_analysis.show(20)

## ⏰ Temporal Patterns <a id="temporal"></a>

In [None]:
# Hourly sales pattern
hourly_sales = df.groupBy('Hour') \
                .agg(
                    sum('TotalAmount').alias('total_revenue'),
                    count('*').alias('order_count')
                ) \
                .orderBy('Hour')

print("🕐 Sales by Hour of Day:")
hourly_sales.show(24)

In [None]:
# Daily sales pattern (day of week)
daily_sales = df.groupBy('DayOfWeek', 'DayName') \
               .agg(
                   sum('TotalAmount').alias('total_revenue'),
                   count('*').alias('order_count')
               ) \
               .orderBy('DayOfWeek')

print("📅 Sales by Day of Week:")
daily_sales.show()

In [None]:
# Monthly sales trend
monthly_sales = df.groupBy('Year', 'Month') \
                 .agg(
                     sum('TotalAmount').alias('total_revenue'),
                     count('*').alias('order_count'),
                     countDistinct('CustomerID').alias('unique_customers')
                 ) \
                 .orderBy('Year', 'Month')

print("📈 Monthly Sales Trend:")
monthly_sales.show()

## 🌍 Geographic Analysis <a id="geographic"></a>

In [None]:
# Sales by country
country_analysis = df.groupBy('Country') \
                    .agg(
                        sum('TotalAmount').alias('total_revenue'),
                        count('*').alias('order_count'),
                        countDistinct('CustomerID').alias('unique_customers'),
                        avg('TotalAmount').alias('avg_order_value')
                    ) \
                    .orderBy(desc('total_revenue'))

print("🗺️ Sales by Country:")
country_analysis.show(20, truncate=False)

## 📊 Visualizations <a id="visualizations"></a>

In [None]:
# Convert Spark DataFrames to Pandas for plotting
hourly_pd = hourly_sales.toPandas()
daily_pd = daily_sales.toPandas().sort_values('DayOfWeek')
top_products_pd = top_products.limit(15).toPandas()
country_pd = country_analysis.limit(15).toPandas()

print("✅ Data converted to Pandas for visualization")

In [None]:
# Hourly sales pattern
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.plot(hourly_pd['Hour'], hourly_pd['total_revenue'], marker='o', linewidth=2)
plt.title('Revenue by Hour of Day')
plt.xlabel('Hour')
plt.ylabel('Revenue ($)')
plt.grid(True, alpha=0.3)
plt.xticks(range(0, 24, 2))

plt.subplot(1, 2, 2)
plt.bar(hourly_pd['Hour'], hourly_pd['order_count'], alpha=0.7)
plt.title('Order Count by Hour of Day')
plt.xlabel('Hour')
plt.ylabel('Number of Orders')
plt.xticks(range(0, 24, 2))

plt.tight_layout()
plt.show()

In [None]:
# Top products visualization
plt.figure(figsize=(14, 10))
top_10_products = top_products_pd.head(10)

# Truncate long product names for better display
product_names = [name[:30] + '...' if len(name) > 30 else name for name in top_10_products['Description']]

plt.barh(range(len(top_10_products)), top_10_products['total_revenue'])
plt.yticks(range(len(top_10_products)), product_names)
plt.xlabel('Total Revenue ($)')
plt.title('Top 10 Products by Revenue')
plt.gca().invert_yaxis()

# Add value labels
for i, v in enumerate(top_10_products['total_revenue']):
    plt.text(v, i, f' ${v:,.0f}', va='center')

plt.tight_layout()
plt.show()

In [None]:
# Geographic analysis
plt.figure(figsize=(14, 8))
top_countries = country_pd.head(10)

plt.subplot(2, 1, 1)
plt.barh(range(len(top_countries)), top_countries['total_revenue'])
plt.yticks(range(len(top_countries)), top_countries['Country'])
plt.xlabel('Total Revenue ($)')
plt.title('Top 10 Countries by Revenue')
plt.gca().invert_yaxis()

plt.subplot(2, 1, 2)
plt.bar(range(len(top_countries)), top_countries['unique_customers'])
plt.xticks(range(len(top_countries)), top_countries['Country'], rotation=45, ha='right')
plt.ylabel('Number of Customers')
plt.title('Top 10 Countries by Customer Count')

plt.tight_layout()
plt.show()

In [None]:
# Interactive Plotly visualization
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=('Hourly Sales Pattern', 'Daily Sales Pattern', 
                   'Top Products', 'Geographic Distribution'),
    specs=[[{"secondary_y": True}, {}],
           [{"type": "bar"}, {"type": "bar"}]]
)

# Hourly pattern
fig.add_trace(
    go.Scatter(x=hourly_pd['Hour'], y=hourly_pd['total_revenue'], 
               mode='lines+markers', name='Revenue'),
    row=1, col=1
)

# Daily pattern
fig.add_trace(
    go.Bar(x=daily_pd['DayName'], y=daily_pd['total_revenue'], name='Daily Revenue'),
    row=1, col=2
)

# Top products
top_5_products = top_products_pd.head(5)
fig.add_trace(
    go.Bar(x=top_5_products['total_revenue'], 
           y=[name[:20] + '...' if len(name) > 20 else name for name in top_5_products['Description']],
           orientation='h', name='Product Revenue'),
    row=2, col=1
)

# Countries
top_5_countries = country_pd.head(5)
fig.add_trace(
    go.Bar(x=top_5_countries['Country'], y=top_5_countries['total_revenue'], 
           name='Country Revenue'),
    row=2, col=2
)

fig.update_layout(height=800, showlegend=False, 
                 title_text="Customer Purchase Behavior Dashboard")
fig.show()

## 💡 Key Insights <a id="insights"></a>

In [None]:
# Calculate key insights
total_revenue = df.agg(sum('TotalAmount')).collect()[0][0]
total_orders = df.count()
unique_customers = df.select('CustomerID').distinct().count()
unique_products = df.select('StockCode').distinct().count()
avg_order_value = df.agg(avg('TotalAmount')).collect()[0][0]

# Peak shopping hours
peak_hours = hourly_pd.nlargest(3, 'order_count')

# Top spending customer
top_customer_spending = customer_stats.first()['total_spent']

print("🎯 KEY BUSINESS INSIGHTS")
print("=" * 50)
print(f"💰 Total Revenue: ${total_revenue:,.2f}")
print(f"🛒 Total Orders: {total_orders:,}")
print(f"👥 Unique Customers: {unique_customers:,}")
print(f"📦 Unique Products: {unique_products:,}")
print(f"💳 Average Order Value: ${avg_order_value:.2f}")
print(f"🔄 Orders per Customer: {total_orders/unique_customers:.1f}")
print(f"💎 Top Customer Spent: ${top_customer_spending:,.2f}")

print("\n⏰ PEAK SHOPPING HOURS:")
for _, hour_data in peak_hours.iterrows():
    print(f"   {hour_data['Hour']:02d}:00 - {hour_data['order_count']:,} orders")

print("\n🏆 TOP PERFORMING PRODUCT:")
top_product = top_products_pd.iloc[0]
print(f"   {top_product['Description']} - ${top_product['total_revenue']:,.2f}")

print("\n🌍 TOP MARKET:")
top_market = country_pd.iloc[0]
print(f"   {top_market['Country']} - ${top_market['total_revenue']:,.2f} ({top_market['unique_customers']:,} customers)")

In [None]:
# Business recommendations based on analysis
print("💼 BUSINESS RECOMMENDATIONS")
print("=" * 50)

# Find best day for marketing
best_day = daily_pd.loc[daily_pd['total_revenue'].idxmax()]
print(f"📅 Focus marketing on {best_day['DayName']} (highest sales day)")

# Customer segmentation insight
segment_pd = segment_summary.toPandas()
high_value_pct = (segment_pd[segment_pd['segment'] == 'High Value']['customer_count'].iloc[0] / unique_customers * 100) if 'High Value' in segment_pd['segment'].values else 0
print(f"👑 {high_value_pct:.1f}% of customers are high-value - focus retention efforts here")

# Geographic expansion
if len(country_pd) > 1:
    second_market = country_pd.iloc[1]
    revenue_gap = top_market['total_revenue'] - second_market['total_revenue']
    print(f"🌐 Consider expanding in {second_market['Country']} (${revenue_gap:,.0f} growth potential)")

# Inventory optimization
top_3_products = top_products_pd.head(3)
print(f"📦 Ensure stock availability for top 3 products (${top_3_products['total_revenue'].sum():,.0f} revenue)")

# Peak hour staffing
peak_hour = peak_hours.iloc[0]['Hour']
print(f"👨‍💼 Optimize staffing around {peak_hour:02d}:00 (peak order time)")

In [None]:
# Stop Spark session when done
spark.stop()
print("✅ Spark session stopped")
print("\n🎉 Analysis complete! You can now:")
print("   1. Review the insights above")
print("   2. Run the main analysis script for full reports")
print("   3. Create additional custom visualizations")
print("   4. Export results for presentation")