# Exploring Synthetic Big Data

This notebook demonstrates how to explore and analyze the synthetic data generated for the retail analytics database.

In [None]:
# Initialize Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Create Spark session with Hive support
spark = SparkSession.builder \
    .appName("Explore Synthetic Data") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Set display options
spark.conf.set("spark.sql.repl.eagerEval.enabled", "true")
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 10)

print("Spark Session initialized successfully!")

## 1. Database Overview

In [None]:
# Show all tables in retail_analytics database
spark.sql("USE retail_analytics")
tables_df = spark.sql("SHOW TABLES")
tables_df.show()

In [None]:
# Get record counts for all tables
table_names = [row.tableName for row in tables_df.collect()]

record_counts = []
for table in table_names:
    count = spark.sql(f"SELECT COUNT(*) as count FROM {table}").collect()[0].count
    record_counts.append({'table': table, 'record_count': count})

counts_df = pd.DataFrame(record_counts)
counts_df = counts_df.sort_values('record_count', ascending=False)

# Plot record counts
plt.figure(figsize=(12, 6))
plt.bar(counts_df['table'], counts_df['record_count'])
plt.xticks(rotation=45, ha='right')
plt.title('Record Counts by Table')
plt.ylabel('Number of Records')
plt.tight_layout()
plt.show()

print(counts_df)

## 2. Customer Analysis

In [None]:
# Load customers data
customers_df = spark.sql("SELECT * FROM customers")
customers_df.printSchema()

In [None]:
# Customer segmentation analysis
segment_analysis = spark.sql("""
    SELECT 
        customer_segment,
        COUNT(*) as customer_count,
        AVG(credit_score) as avg_credit_score,
        AVG(annual_income) as avg_annual_income,
        SUM(CASE WHEN marketing_opt_in = true THEN 1 ELSE 0 END) as marketing_opted_in
    FROM customers
    GROUP BY customer_segment
    ORDER BY customer_count DESC
""")

segment_analysis.show()

In [None]:
# Customer distribution by state
state_dist = spark.sql("""
    SELECT 
        state,
        COUNT(*) as customer_count
    FROM customers
    GROUP BY state
    ORDER BY customer_count DESC
    LIMIT 20
""")

state_dist_pd = state_dist.toPandas()

plt.figure(figsize=(15, 6))
plt.bar(state_dist_pd['state'], state_dist_pd['customer_count'])
plt.title('Top 20 States by Customer Count')
plt.xlabel('State')
plt.ylabel('Number of Customers')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## 3. Sales Analysis

In [None]:
# Monthly sales trend
monthly_sales = spark.sql("""
    SELECT 
        DATE_FORMAT(transaction_date, 'yyyy-MM') as month,
        COUNT(DISTINCT transaction_id) as num_transactions,
        SUM(total_amount) as total_revenue,
        AVG(total_amount) as avg_transaction_value
    FROM transactions
    WHERE transaction_status = 'Completed'
    GROUP BY DATE_FORMAT(transaction_date, 'yyyy-MM')
    ORDER BY month
""")

monthly_sales_pd = monthly_sales.toPandas()

# Plot revenue trend
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10))

# Revenue trend
ax1.plot(monthly_sales_pd['month'], monthly_sales_pd['total_revenue'], marker='o')
ax1.set_title('Monthly Revenue Trend')
ax1.set_xlabel('Month')
ax1.set_ylabel('Total Revenue ($)')
ax1.tick_params(axis='x', rotation=45)
ax1.grid(True, alpha=0.3)

# Transaction count trend
ax2.bar(monthly_sales_pd['month'], monthly_sales_pd['num_transactions'], alpha=0.7)
ax2.set_title('Monthly Transaction Count')
ax2.set_xlabel('Month')
ax2.set_ylabel('Number of Transactions')
ax2.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

In [None]:
# Top selling products
top_products = spark.sql("""
    SELECT 
        p.product_name,
        p.category,
        COUNT(DISTINCT t.transaction_id) as num_transactions,
        SUM(t.quantity) as total_quantity_sold,
        SUM(t.total_amount) as total_revenue
    FROM transactions t
    JOIN products p ON t.product_id = p.product_id
    WHERE t.transaction_status = 'Completed'
    GROUP BY p.product_name, p.category
    ORDER BY total_revenue DESC
    LIMIT 20
""")

top_products.show(truncate=False)

In [None]:
# Sales by category
category_sales = spark.sql("""
    SELECT 
        p.category,
        COUNT(DISTINCT t.transaction_id) as num_transactions,
        SUM(t.total_amount) as total_revenue,
        AVG(t.total_amount) as avg_transaction_value
    FROM transactions t
    JOIN products p ON t.product_id = p.product_id
    WHERE t.transaction_status = 'Completed'
    GROUP BY p.category
    ORDER BY total_revenue DESC
""")

category_sales_pd = category_sales.toPandas()

# Pie chart of revenue by category
plt.figure(figsize=(10, 8))
plt.pie(category_sales_pd['total_revenue'], 
        labels=category_sales_pd['category'], 
        autopct='%1.1f%%',
        startangle=90)
plt.title('Revenue Distribution by Product Category')
plt.axis('equal')
plt.show()

## 4. Store Performance Analysis

In [None]:
# Store performance metrics
store_performance = spark.sql("""
    SELECT 
        s.store_name,
        s.store_type,
        s.city,
        s.state,
        COUNT(DISTINCT t.transaction_id) as num_transactions,
        COUNT(DISTINCT t.customer_id) as unique_customers,
        SUM(t.total_amount) as total_revenue,
        AVG(t.total_amount) as avg_transaction_value,
        s.annual_revenue_target,
        (SUM(t.total_amount) / s.annual_revenue_target * 100) as target_achievement_pct
    FROM stores s
    LEFT JOIN transactions t ON s.store_id = t.store_id AND t.transaction_status = 'Completed'
    WHERE s.is_active = true
    GROUP BY s.store_id, s.store_name, s.store_type, s.city, s.state, s.annual_revenue_target
    ORDER BY total_revenue DESC
    LIMIT 20
""")

store_performance.show(truncate=False)

In [None]:
# Store type comparison
store_type_analysis = spark.sql("""
    SELECT 
        s.store_type,
        COUNT(DISTINCT s.store_id) as num_stores,
        AVG(s.employees_count) as avg_employees,
        SUM(t.total_amount) as total_revenue,
        AVG(t.total_amount) as avg_transaction_value
    FROM stores s
    LEFT JOIN transactions t ON s.store_id = t.store_id AND t.transaction_status = 'Completed'
    GROUP BY s.store_type
    ORDER BY total_revenue DESC
""")

store_type_analysis.show()

## 5. Customer Behavior Analysis

In [None]:
# Device usage analysis
device_analysis = spark.sql("""
    SELECT 
        device_type,
        COUNT(*) as event_count,
        COUNT(DISTINCT session_id) as num_sessions,
        COUNT(DISTINCT customer_id) as unique_users,
        AVG(time_on_page_seconds) as avg_time_on_page,
        SUM(CASE WHEN bounce = true THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as bounce_rate
    FROM customer_behavior
    GROUP BY device_type
    ORDER BY event_count DESC
""")

device_analysis.show()

In [None]:
# Customer journey funnel
funnel_analysis = spark.sql("""
    SELECT 
        event_type,
        COUNT(DISTINCT session_id) as num_sessions,
        COUNT(DISTINCT customer_id) as unique_customers
    FROM customer_behavior
    WHERE event_type IN ('page_view', 'product_view', 'add_to_cart', 'checkout_start', 'purchase')
    GROUP BY event_type
""")

funnel_pd = funnel_analysis.toPandas()

# Define funnel order
funnel_order = ['page_view', 'product_view', 'add_to_cart', 'checkout_start', 'purchase']
funnel_pd['event_type'] = pd.Categorical(funnel_pd['event_type'], categories=funnel_order, ordered=True)
funnel_pd = funnel_pd.sort_values('event_type')

# Create funnel chart
plt.figure(figsize=(10, 6))
plt.bar(funnel_pd['event_type'], funnel_pd['num_sessions'], alpha=0.7)
plt.title('Customer Journey Funnel')
plt.xlabel('Event Type')
plt.ylabel('Number of Sessions')
plt.xticks(rotation=45)

# Add conversion rates
for i in range(1, len(funnel_pd)):
    conv_rate = (funnel_pd.iloc[i]['num_sessions'] / funnel_pd.iloc[i-1]['num_sessions']) * 100
    plt.text(i-0.5, funnel_pd.iloc[i]['num_sessions'] + 1000, 
             f'{conv_rate:.1f}%', ha='center', va='bottom')

plt.tight_layout()
plt.show()

## 6. Product Reviews Analysis

In [None]:
# Rating distribution
rating_dist = spark.sql("""
    SELECT 
        rating,
        COUNT(*) as review_count,
        AVG(helpful_count) as avg_helpful_count
    FROM product_reviews
    GROUP BY rating
    ORDER BY rating
""")

rating_dist_pd = rating_dist.toPandas()

plt.figure(figsize=(8, 6))
plt.bar(rating_dist_pd['rating'], rating_dist_pd['review_count'])
plt.title('Product Rating Distribution')
plt.xlabel('Rating')
plt.ylabel('Number of Reviews')
plt.xticks([1, 2, 3, 4, 5])
plt.grid(True, alpha=0.3, axis='y')
plt.show()

In [None]:
# Category ratings comparison
category_ratings = spark.sql("""
    SELECT 
        p.category,
        COUNT(r.review_id) as num_reviews,
        AVG(r.rating) as avg_rating,
        STDDEV(r.rating) as rating_stddev,
        SUM(CASE WHEN r.rating >= 4 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as positive_review_pct
    FROM product_reviews r
    JOIN products p ON r.product_id = p.product_id
    GROUP BY p.category
    HAVING COUNT(r.review_id) > 100
    ORDER BY avg_rating DESC
""")

category_ratings.show()

## 7. Marketing Campaign ROI Analysis

In [None]:
# Campaign performance analysis
campaign_performance = spark.sql("""
    SELECT 
        campaign_type,
        COUNT(*) as num_campaigns,
        AVG(budget) as avg_budget,
        AVG(actual_spend) as avg_spend,
        AVG(roi) as avg_roi,
        SUM(revenue_generated) as total_revenue,
        AVG(ctr) as avg_ctr,
        AVG(conversion_rate) as avg_conversion_rate
    FROM marketing_campaigns
    WHERE status = 'Completed'
    GROUP BY campaign_type
    ORDER BY avg_roi DESC
""")

campaign_performance.show()

In [None]:
# ROI by campaign goal
roi_by_goal = spark.sql("""
    SELECT 
        campaign_goal,
        campaign_type,
        AVG(roi) as avg_roi,
        COUNT(*) as num_campaigns
    FROM marketing_campaigns
    WHERE status = 'Completed' AND roi IS NOT NULL
    GROUP BY campaign_goal, campaign_type
    ORDER BY campaign_goal, avg_roi DESC
""")

roi_by_goal_pd = roi_by_goal.toPandas()

# Create grouped bar chart
fig, ax = plt.subplots(figsize=(12, 6))
goals = roi_by_goal_pd['campaign_goal'].unique()
x = np.arange(len(goals))
width = 0.15

for i, ctype in enumerate(roi_by_goal_pd['campaign_type'].unique()):
    data = roi_by_goal_pd[roi_by_goal_pd['campaign_type'] == ctype]
    values = []
    for goal in goals:
        goal_data = data[data['campaign_goal'] == goal]
        values.append(goal_data['avg_roi'].values[0] if len(goal_data) > 0 else 0)
    ax.bar(x + i * width, values, width, label=ctype)

ax.set_xlabel('Campaign Goal')
ax.set_ylabel('Average ROI (%)')
ax.set_title('Marketing Campaign ROI by Goal and Type')
ax.set_xticks(x + width * 2)
ax.set_xticklabels(goals, rotation=45, ha='right')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

## 8. Advanced Analytics - Customer Lifetime Value

In [None]:
# Calculate customer metrics for CLV
customer_metrics = spark.sql("""
    WITH customer_transactions AS (
        SELECT 
            c.customer_id,
            c.customer_segment,
            c.registration_date,
            COUNT(DISTINCT t.transaction_id) as num_transactions,
            SUM(t.total_amount) as total_spent,
            AVG(t.total_amount) as avg_transaction_value,
            MIN(t.transaction_date) as first_purchase,
            MAX(t.transaction_date) as last_purchase,
            DATEDIFF(MAX(t.transaction_date), MIN(t.transaction_date)) as customer_lifetime_days
        FROM customers c
        LEFT JOIN transactions t ON c.customer_id = t.customer_id AND t.transaction_status = 'Completed'
        GROUP BY c.customer_id, c.customer_segment, c.registration_date
    )
    SELECT 
        customer_segment,
        COUNT(*) as num_customers,
        AVG(total_spent) as avg_customer_value,
        AVG(num_transactions) as avg_transactions_per_customer,
        AVG(customer_lifetime_days) as avg_customer_lifetime_days,
        SUM(total_spent) / COUNT(*) as revenue_per_customer
    FROM customer_transactions
    WHERE total_spent > 0
    GROUP BY customer_segment
    ORDER BY avg_customer_value DESC
""")

customer_metrics.show()

In [None]:
# Close Spark session
spark.stop()
print("Analysis complete!")