# E-commerce Advanced Analytics: User Funnel, Product Ranking, Basket Patterns & Time-Series Sales
This notebook presents advanced e-commerce analytics and demonstrates practical techniques to explore user behavior, product performance, and sales trends.
It covers user funnel analysis (tracking conversion from homepage visit to purchase), top products by category, average basket size and customer segmentation, detailed time-series analysis of sales, and key customer behavior patterns such as time-to-purchase and repeat activity.

In [14]:
# Report the JAVA_HOME currently visible to this Python process (the JVM
# location PySpark is expected to use). This cell only reads the variable;
# it does not set or modify it, and prints None when it is unset.
import os
print("JAVA_HOME =", os.getenv("JAVA_HOME"))

JAVA_HOME = /opt/anaconda3/lib/jvm


In [15]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, countDistinct, when, sum as spark_sum,
    round as spark_round, lit
)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

##### User funnel: tracks the user journey Homepage Visit → Product View → Add to Cart → Purchase

In [16]:
# Initialize Spark session
def initialize_spark(app_name="User Funnel Analysis", driver_memory="8g", log_level="ERROR"):
    """Create (or reuse) the SparkSession used by this analysis.

    Args:
        app_name (str): Application name for the session.
        driver_memory (str): Value for ``spark.driver.memory`` (e.g. "8g").
            This typically only takes effect when this call launches the
            JVM; if a session already exists, ``getOrCreate`` returns it
            unchanged.
        log_level (str): Log level applied to the SparkContext.
    Returns:
        SparkSession: The active session.
    Raises:
        Exception: Any error while building the session is printed and re-raised.
    """
    try:
        spark = (
            SparkSession.builder
            .appName(app_name)
            .config("spark.driver.memory", driver_memory)
            .getOrCreate()
        )
        # Silence Spark's INFO/WARN chatter so notebook output stays readable.
        spark.sparkContext.setLogLevel(log_level)
        print("✓ Spark session initialized successfully")
        return spark
    except Exception as e:
        print(f"Error initializing Spark session: {e}")
        raise

In [17]:
# load data
def _read_headered_csv(spark, path):
    """Read the CSV at ``path`` (first row is the header) into a Spark DataFrame.

    Raises:
        FileNotFoundError: If ``path`` is a plain local path that does not exist.
    """
    import os

    # Spark reports a missing input path with its own AnalysisException, never
    # FileNotFoundError. Check plain local paths up front so callers get the
    # documented FileNotFoundError. URIs (hdfs://, s3a://, file://...) and glob
    # patterns are left for Spark to resolve.
    is_uri = "://" in path
    is_glob = any(ch in path for ch in "*?[")
    if not is_uri and not is_glob and not os.path.exists(path):
        raise FileNotFoundError(f"No such file or directory: {path}")
    return spark.read.format("csv").option("header", "true").load(path)


def load_data(spark, logs_path=None, transactions_path=None):
    """Load user logs and transactions data with error handling
    Args:
        spark (SparkSession): The Spark session object.
        logs_path (str, optional): Path to the user logs CSV. Defaults to the
            project's raw-data ``user_logs.csv``.
        transactions_path (str, optional): Path to the transactions CSV.
            Defaults to the project's raw-data ``transactions.csv``.
    Returns:
        logs_df (DataFrame): DataFrame containing user logs.
        transactions_df (DataFrame): DataFrame containing transactions.
    Raises:
        FileNotFoundError: If a local input file doesn't exist
        Exception: For other loading errors
    Note:
        Columns are read without schema inference, so every column is a string.
    """
    import os

    raw_dir = "/Users/cherifamanatoulhasy/Downloads/Pyspark-E-Commerce-Logs-Analysis/src/data_generation/data/raw"
    if logs_path is None:
        logs_path = f"{raw_dir}/user_logs.csv"
    if transactions_path is None:
        transactions_path = f"{raw_dir}/transactions.csv"

    try:
        # Load user logs
        logs_df = _read_headered_csv(spark, logs_path)
        print(f"✓ Loaded {os.path.basename(logs_path)} ({logs_df.count()} rows)")

        # Load transactions
        transactions_df = _read_headered_csv(spark, transactions_path)
        print(f"Loaded {os.path.basename(transactions_path)} ({transactions_df.count()} rows)")

        return logs_df, transactions_df
    except FileNotFoundError as fe:
        print(f"✗ File not found: {fe}")
        raise
    except Exception as e:
        print(f"Error loading data: {e}")
        raise

In [18]:
# Analyse user funnel
def _count_stage_users(stage_df, found_template, empty_warning):
    """Return the number of distinct ``user_id`` values in ``stage_df`` and report it.

    Prints ``empty_warning`` when the count is 0, otherwise ``found_template``
    formatted with the count.
    """
    user_count = stage_df.select("user_id").distinct().count()
    print(empty_warning if user_count == 0 else found_template.format(user_count))
    return user_count


def _conversion_rate(stage_count, previous_count):
    """Return ``stage_count`` as a percentage of ``previous_count`` (0.0 if that is 0)."""
    if previous_count > 0:
        return float(stage_count / previous_count * 100)
    return 0.0


def analyze_funnel_stages(spark, logs_file, transactions_file):
    """
    Build a per-stage summary of the user funnel, printing each stage's user count.

    Each stage counts distinct ``user_id`` values:
    - Homepage Visit: log rows with page_url == "/"
    - Product View: log rows with page_url like "/product/%" and action == "view"
    - Add to Cart: log rows with page_url == "/cart" and action == "add"
    - Purchase: every user appearing in ``transactions_file``

    NOTE: stages are counted independently, not as a nested funnel. For example,
    a purchaser is not required to appear in the earlier stages, so conversion
    rates can exceed 100%.

    Args:
        spark (SparkSession): Session used to build the summary DataFrame.
        logs_file (DataFrame): User logs with user_id, page_url and action columns.
        transactions_file (DataFrame): Transactions with a user_id column.
    Returns:
        DataFrame: Columns "Funnel Stage", "User Count", "Conversion Rate (%)".
        The rate is relative to the previous stage: NULL for the first stage,
        and 0.0 when the previous stage is empty.
    Raises:
        Exception: Any error is printed and re-raised.
    """
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType

    try:
        homepage_count = _count_stage_users(
            logs_file.filter(col("page_url") == "/"),
            "Number of visitors to homepage: {} users",
            "⚠ Warning: No homepage visitors found",
        )
        product_view_count = _count_stage_users(
            logs_file.filter((col("page_url").like("/product/%")) & (col("action") == "view")),
            "Number of users who viewed product pages: {} users",
            "⚠ Warning: No product page views found",
        )
        add_to_cart_count = _count_stage_users(
            logs_file.filter((col("page_url") == "/cart") & (col("action") == "add")),
            "Number of users who added items to cart: {} users",
            "⚠ Warning: No users added items to cart",
        )
        purchase_count = _count_stage_users(
            transactions_file,
            "Number of users who completed a purchase: {} users",
            "⚠ Warning: No purchases found",
        )

        # An explicit schema keeps column types stable instead of inferring them from rows.
        summary_schema = StructType([
            StructField("Funnel Stage", StringType(), True),
            StructField("User Count", DoubleType(), True),
            StructField("Conversion Rate (%)", DoubleType(), True),
        ])
        return spark.createDataFrame([
            ("Homepage Visit", float(homepage_count), None),
            ("Product View", float(product_view_count), _conversion_rate(product_view_count, homepage_count)),
            ("Add to Cart", float(add_to_cart_count), _conversion_rate(add_to_cart_count, product_view_count)),
            ("Purchase", float(purchase_count), _conversion_rate(purchase_count, add_to_cart_count)),
        ], summary_schema)

    except Exception as e:
        print(f"✗ Error analyzing funnel stages: {e}")
        raise

In [19]:
def main():
    """Run the funnel analysis end to end: start Spark, load data, summarize, report.

    The Spark session created here is stopped in ``finally`` so its resources
    are released even when a step fails.

    Raises:
        Exception: Any error from a pipeline step is printed and re-raised.
    """
    # Pre-bind so `finally` can tell whether a session was ever created.
    spark = None
    try:
        spark = initialize_spark()
        logs_df, transactions_df = load_data(spark)
        funnel_summary = analyze_funnel_stages(spark, logs_file=logs_df, transactions_file=transactions_df)

        print("\n Funnel Summary:")
        funnel_summary.show(truncate=False)
        print("✓ Analysis completed successfully")

    except Exception as e:
        print(f"✗ Error in main analysis: {e}")
        raise
    finally:
        if spark is not None:
            # Guard the stop so a shutdown failure cannot mask the original error.
            try:
                spark.stop()
            except Exception as stop_err:
                print(f"⚠ Warning: failed to stop Spark session: {stop_err}")
        print("✓ Process finished")

In [None]:
# Entry point: run the full funnel pipeline when executed directly. Inside a
# Jupyter kernel __name__ is also "__main__", so running this cell calls main().
if __name__ == "__main__":
    main()

✓ Spark session initialized successfully
✓ Loaded user_logs.csv (10000 rows)
✓ Loaded user_logs.csv (10000 rows)
Loaded transactions.csv (9492 rows)
Loaded transactions.csv (9492 rows)
Number of visitors to homepage: 405 users
Number of visitors to homepage: 405 users
Number of users who completed a purchase: 300 users

 Funnel Summary:
Number of users who completed a purchase: 300 users

 Funnel Summary:


                                                                                

+--------------+----------+-------------------+
|Funnel Stage  |User Count|Conversion Rate (%)|
+--------------+----------+-------------------+
|Homepage Visit|405.0     |NULL               |
|Product View  |0.0       |0.0                |
|Add to Cart   |0.0       |0.0                |
|Purchase      |300.0     |0.0                |
+--------------+----------+-------------------+

✓ Analysis completed successfully
✓ Process finished


25/11/28 21:02:21 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:131)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:700)
	at org.apache.spark.storage.BlockManagerMasterE