In [11]:
import os
import findspark

# ============================================================================
# 0. ENVIRONMENT SETUP
# ============================================================================
# Configures the Java environment for this process, then calls
# findspark.init() before any pyspark module is imported. The order matters:
# JAVA_HOME / PATH are set first, findspark.init() runs next, and only then
# are the pyspark imports executed.
# NOTE(review): the JDK location below is an absolute, machine-specific
# Windows path; the notebook will not run on another machine without editing it.

java_home = r"C:\Program Files\Eclipse Adoptium\jdk-11.0.28.6-hotspot"
os.environ['JAVA_HOME'] = java_home
# Prepend (rather than append) the JDK's bin directory so it is found before
# any other entry already on PATH.
os.environ['PATH'] = os.path.join(java_home, 'bin') + os.pathsep + os.environ.get('PATH', '')
findspark.init()

from pyspark.sql import SparkSession, Window
# CAUTION: `sum`, `min` and `max` imported here replace the Python builtins of
# the same names for the rest of the notebook, so e.g. `sum([1, 2])` will call
# the Spark column function instead. `round` is imported as `spark_round`, so
# the builtin `round` is left untouched.
from pyspark.sql.functions import (
    col, count, avg, sum, min, max, stddev,
    when, lit, coalesce, regexp_replace, round as spark_round,
    year, month, dayofmonth, date_trunc, to_date, from_unixtime,
    datediff, lag, lead, row_number, rank, dense_rank,
    concat_ws, split, explode, lower, trim, length,
    countDistinct, approx_count_distinct, first, last
)
from pyspark.sql.types import DoubleType, IntegerType, StringType
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

print("✓ Environment configured")

✓ Environment configured


In [12]:
# ============================================================================
# 1. INITIALIZE SPARK
# ============================================================================

# Session options, applied to the builder in the order listed.
spark_settings = [
    ("spark.driver.memory", "6g"),
    ("spark.executor.memory", "4g"),
    ("spark.sql.shuffle.partitions", "4"),
    ("spark.memory.fraction", "0.8"),
]

session_builder = SparkSession.builder.appName("EcommerceFeaturePipeline")
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)

# getOrCreate() returns the already-running session if one exists.
spark = session_builder.getOrCreate()

print("✓ Spark session created")


✓ Spark session created


In [13]:
# ============================================================================
# 2. LOAD DATA (from previous notebooks)
# ============================================================================

banner = "=" * 80
print("\n" + banner)
print("LOADING DATA")
print(banner)

# --- Reviews: schema is inferred by Spark from the JSON lines file ----------
reviews_path = r"C:\Users\shafe\OneDrive\Desktop\ecommerce-intelligence\data\raw\electronics_sample_2M.jsonl.gz"
df_reviews = spark.read.json(reviews_path)

# --- Product metadata: read with an explicit schema --------------------------
from pyspark.sql.types import StructType, StructField, ArrayType

# (column name, Spark type) pairs; every field is declared nullable below.
meta_fields = [
    ("parent_asin", StringType()),
    ("main_category", StringType()),
    ("title", StringType()),
    ("average_rating", DoubleType()),
    ("rating_number", IntegerType()),
    ("features", ArrayType(StringType())),
    ("description", ArrayType(StringType())),
    ("price", StringType()),
    ("store", StringType()),
    ("categories", ArrayType(StringType())),
    ("details", StringType()),
    ("bought_together", ArrayType(StringType())),
    ("images", ArrayType(StringType())),
    ("videos", ArrayType(StringType())),
    ("subtitle", StringType()),
    ("author", StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in meta_fields]
)

metadata_path = r"C:\Users\shafe\OneDrive\Desktop\ecommerce-intelligence\data\raw\meta_Electronics.jsonl.gz"
df_meta = spark.read.schema(schema).json(metadata_path)

# Keep only metadata rows whose product appears in the review sample.
sample_products = df_reviews.select("parent_asin").distinct()
df_meta_sample = df_meta.join(sample_products, on="parent_asin", how="inner")

review_total = df_reviews.count()
print(f"✓ Reviews loaded: {review_total:,}")
product_total = df_meta_sample.count()
print(f"✓ Metadata loaded: {product_total:,} products")


LOADING DATA
✓ Reviews loaded: 2,051,569
✓ Metadata loaded: 47 products


In [14]:
# ============================================================================
# 3. DATA PREPARATION
# ============================================================================

section_rule = "=" * 80
print("\n" + section_rule)
print("DATA PREPARATION")
print(section_rule)

# Metadata projection: `title` becomes `product_title` so it cannot clash with
# the review's own `title` column after the join.
df_meta_renamed = df_meta_sample.select(
    "parent_asin",
    col("title").alias("product_title"),
    "main_category",
    "store",
    "price",
    "features",
    "description",
    col("average_rating").alias("product_avg_rating"),
    col("rating_number").alias("product_total_ratings"),
)

# --- Derived column expressions ----------------------------------------------
# `timestamp` is divided by 1000 before from_unixtime (presumably epoch
# milliseconds -- confirm against the raw data).
review_date_expr = to_date(from_unixtime(col("timestamp") / 1000))

# Strip "$" and "," so a string such as "$29.99" can be cast to a double.
price_numeric_expr = regexp_replace(col("price"), "[$,]", "").cast(DoubleType())

price_amount = col("price_numeric")
price_tier_expr = (
    when(price_amount.isNull(), "Unknown")
    .when(price_amount < 20, "Budget (<$20)")
    .when(price_amount < 50, "Mid ($20-50)")
    .when(price_amount < 100, "Premium ($50-100)")
    .otherwise("Luxury ($100+)")
)

star_rating = col("rating")
sentiment_expr = (
    when(star_rating >= 4, "Positive")
    .when(star_rating <= 2, "Negative")
    .otherwise("Neutral")
)

# Reviews of 200+ characters are flagged as detailed (1), everything else 0.
is_detailed_expr = when(col("review_length") >= 200, 1).otherwise(0)

# --- Assemble the enriched review table ---------------------------------------
# Left join keeps every review even when no metadata row matches; the result
# is cached because every feature section below reads it.
df_enriched = (
    df_reviews.join(df_meta_renamed, on="parent_asin", how="left")
    .withColumnRenamed("title", "review_title")
    .withColumn("review_date", review_date_expr)
    .withColumn("review_year", year("review_date"))
    .withColumn("review_month", month("review_date"))
    .withColumn("review_year_month", date_trunc("month", "review_date"))
    .withColumn("price_numeric", price_numeric_expr)
    .withColumn("price_tier", price_tier_expr)
    .withColumn("sentiment", sentiment_expr)
    .withColumn("review_length", length(col("text")))
    .withColumn("is_detailed_review", is_detailed_expr)
    .cache()
)

enriched_rows = df_enriched.count()
print(f"✓ Enriched dataset prepared: {enriched_rows:,} reviews")
print(f"  Columns: {len(df_enriched.columns)}")


DATA PREPARATION
✓ Enriched dataset prepared: 2,051,569 reviews
  Columns: 27


In [15]:
# ============================================================================
# 4. FEATURE 1: SENTIMENT TRENDS OVER TIME
# ============================================================================

print("\n" + "="*80)
print("FEATURE 1: SENTIMENT TRENDS OVER TIME")
print("="*80)

# KEY PATTERN: GroupBy with time dimension + aggregations
# Business Logic: Track how sentiment changes month-by-month for each brand
#
# One row per (store, month) with review volume, mean rating and the share of
# positive / negative reviews. No orderBy is applied here: the window step
# below repartitions by store, so a global sort at this point would be
# discarded; display order is set explicitly where results are shown.
sentiment_trends = df_enriched.groupBy(
    "store",
    "review_year_month"
).agg(
    count("*").alias("review_count"),
    avg("rating").alias("avg_rating"),
    count(when(col("sentiment") == "Positive", 1)).alias("positive_count"),
    count(when(col("sentiment") == "Negative", 1)).alias("negative_count"),
    count(when(col("sentiment") == "Neutral", 1)).alias("neutral_count")
).withColumn(
    "positive_rate",
    spark_round((col("positive_count") / col("review_count") * 100), 2)
).withColumn(
    "negative_rate",
    spark_round((col("negative_count") / col("review_count") * 100), 2)
)

# Calculate month-over-month changes using Window functions
# KEY PATTERN: Window function to compare current row with previous row
# NOTE: lag() looks at the previous *row* for the store, which is the previous
# month that has any reviews -- not necessarily the previous calendar month.
windowSpec = Window.partitionBy("store").orderBy("review_year_month")

sentiment_trends = sentiment_trends.withColumn(
    "prev_avg_rating",
    lag("avg_rating", 1).over(windowSpec)
).withColumn(
    "rating_change",
    spark_round(col("avg_rating") - col("prev_avg_rating"), 3)
).withColumn(
    "trend_direction",
    # The first month of every store has no previous row, so rating_change is
    # NULL. Without this guard both comparisons evaluate to NULL and the row
    # would fall through to "Stable", reporting a trend that was never measured.
    when(col("rating_change").isNull(), "Insufficient Data")
    .when(col("rating_change") > 0.1, "Improving")
    .when(col("rating_change") < -0.1, "Declining")
    .otherwise("Stable")
)

print(f"✓ Sentiment trends calculated")
print(f"  Monthly data points: {sentiment_trends.count():,}")

# Show sample trends
print("\nSample: Amazon sentiment trends (last 12 months)")
sentiment_trends.filter(col("store") == "Amazon") \
    .select("review_year_month", "review_count", "avg_rating",
            "positive_rate", "rating_change", "trend_direction") \
    .orderBy(col("review_year_month").desc()) \
    .show(12, truncate=False)



FEATURE 1: SENTIMENT TRENDS OVER TIME
✓ Sentiment trends calculated
  Monthly data points: 1,883

Sample: Amazon sentiment trends (last 12 months)
+-------------------+------------+------------------+-------------+-------------+---------------+
|review_year_month  |review_count|avg_rating        |positive_rate|rating_change|trend_direction|
+-------------------+------------+------------------+-------------+-------------+---------------+
|2023-09-01 00:00:00|29          |4.206896551724138 |79.31        |-0.061       |Stable         |
|2023-08-01 00:00:00|287         |4.2682926829268295|79.44        |0.164        |Improving      |
|2023-07-01 00:00:00|366         |4.103825136612022 |76.5         |0.239        |Improving      |
|2023-06-01 00:00:00|451         |3.8647450110864745|70.07        |0.106        |Improving      |
|2023-05-01 00:00:00|701         |3.7589158345221114|65.48        |-0.062       |Stable         |
|2023-04-01 00:00:00|1274        |3.8210361067503924|67.82        |-

In [16]:
# ============================================================================
# 5. FEATURE 2: THEME EXTRACTION (Text Analysis Prep)
# ============================================================================

divider = "=" * 80
print("\n" + divider)
print("FEATURE 2: THEME EXTRACTION - TEXT PREPARATION")
print(divider)

# This cell only sizes the candidate groups (review count and mean rating per
# product and sentiment); the notebook comments describe AWS Bedrock as the
# later consumer of the review text.

theme_group_keys = ["parent_asin", "product_title", "store", "sentiment"]
min_reviews_per_group = 10  # groups with fewer reviews are dropped

theme_prep = (
    df_enriched.groupBy(*theme_group_keys)
    .agg(
        count("*").alias("review_count"),
        avg("rating").alias("avg_rating"),
    )
    .filter(col("review_count") >= min_reviews_per_group)
    .orderBy(col("review_count").desc())
)

print("✓ Theme extraction preparation")
theme_group_total = theme_prep.count()
print(f"  Product-sentiment combinations: {theme_group_total:,}")

# Negative-sentiment groups, largest first.
print("\nTop products by review volume (ready for theme extraction):")
negative_groups = (
    theme_prep.filter(col("sentiment") == "Negative")
    .select("product_title", "store", "review_count", "avg_rating")
    .orderBy(col("review_count").desc())
)
negative_groups.show(10, truncate=60)

# A few raw negative reviews for products whose title contains "Echo Dot".
print("\nSample negative reviews for Echo Dot (input for theme extraction):")
is_echo_dot = col("product_title").contains("Echo Dot")
is_negative = col("sentiment") == "Negative"
echo_dot_negative = df_enriched.filter(is_echo_dot & is_negative).select("review_title", "text")
echo_dot_negative.show(3, truncate=100, vertical=True)



FEATURE 2: THEME EXTRACTION - TEXT PREPARATION
✓ Theme extraction preparation
  Product-sentiment combinations: 141

Top products by review volume (ready for theme extraction):
+------------------------------------------------------------+------+------------+------------------+
|                                               product_title| store|review_count|        avg_rating|
+------------------------------------------------------------+------+------------+------------------+
|Fire TV Stick with Alexa Voice Remote, streaming media pl...|Amazon|       18820|1.3301806588735388|
|Fire Tablet with Alexa, 7" Display, 16 GB, Blue - with Sp...|Amazon|       14558|1.3863168017584833|
|Echo Dot (2nd Generation) - Smart speaker with Alexa - White|Amazon|       12119|1.3945870121297137|
|Fire TV Stick 4K streaming device with Alexa Voice Remote...|Amazon|       12020|1.3083194675540766|
|TOZO T10 Bluetooth 5.3 Wireless Earbuds with Wireless Cha...|  TOZO|        9836|1.4052460349735665|
|Fire 

In [17]:
# ============================================================================
# 6. FEATURE 3: COMPETITIVE PRODUCT ANALYSIS
# ============================================================================

header_rule = "=" * 80
print("\n" + header_rule)
print("FEATURE 3: COMPETITIVE PRODUCT ANALYSIS")
print(header_rule)


def _sentiment_count(label):
    """Aggregate expression counting the rows whose `sentiment` equals `label`."""
    return count(when(col("sentiment") == label, 1))


def _pct_of_total_reviews(numerator_column):
    """Share of `total_reviews` held by `numerator_column`, as a percent rounded to 2 dp."""
    return spark_round((col(numerator_column) / col("total_reviews") * 100), 2)


# One profile row per product. NOTE: `sum`, `min` and `max` below are the
# pyspark.sql.functions versions imported at the top of the notebook.
product_keys = ["parent_asin", "product_title", "store", "main_category", "price_tier"]

product_profile = df_enriched.groupBy(*product_keys).agg(
    # volume
    count("*").alias("total_reviews"),
    countDistinct("user_id").alias("unique_reviewers"),
    # rating level and spread
    avg("rating").alias("avg_rating"),
    stddev("rating").alias("rating_stddev"),
    # sentiment split
    _sentiment_count("Positive").alias("positive_reviews"),
    _sentiment_count("Negative").alias("negative_reviews"),
    _sentiment_count("Neutral").alias("neutral_reviews"),
    # review quality signals
    avg("review_length").alias("avg_review_length"),
    sum("is_detailed_review").alias("detailed_reviews"),
    count(when(col("verified_purchase") == True, 1)).alias("verified_purchases"),
    avg("helpful_vote").alias("avg_helpful_votes"),
    # first / last review dates
    min("review_date").alias("first_review_date"),
    max("review_date").alias("last_review_date"),
)

competitive_analysis = (
    product_profile
    .withColumn("satisfaction_rate", _pct_of_total_reviews("positive_reviews"))
    .withColumn("negative_rate", _pct_of_total_reviews("negative_reviews"))
    .withColumn("verified_rate", _pct_of_total_reviews("verified_purchases"))
    .withColumn(
        "review_span_days",
        datediff(col("last_review_date"), col("first_review_date")),
    )
    .orderBy(col("total_reviews").desc())
)

print("✓ Competitive analysis calculated")
products_analyzed = competitive_analysis.count()
print(f"  Products analyzed: {products_analyzed:,}")

# Highest satisfaction among products with at least 1000 reviews.
print("\nTop Products by Satisfaction Rate (>1000 reviews):")
high_volume_products = (
    competitive_analysis.filter(col("total_reviews") >= 1000)
    .select("product_title", "store", "total_reviews", "avg_rating",
            "satisfaction_rate", "negative_rate")
    .orderBy(col("satisfaction_rate").desc())
)
high_volume_products.show(10, truncate=60)

# Store-level comparison restricted to categories whose name contains "Audio".
print("\nBrand Comparison: Audio Products")
audio_by_store = (
    df_enriched.filter(col("main_category").contains("Audio"))
    .groupBy("store")
    .agg(
        count("*").alias("reviews"),
        avg("rating").alias("avg_rating"),
        spark_round(
            (_sentiment_count("Positive") / count("*") * 100),
            2,
        ).alias("satisfaction_rate"),
    )
    .orderBy(col("satisfaction_rate").desc())
)
audio_by_store.show(10, truncate=False)


FEATURE 3: COMPETITIVE PRODUCT ANALYSIS
✓ Competitive analysis calculated
  Products analyzed: 47

Top Products by Satisfaction Rate (>1000 reviews):
+------------------------------------------------------------+-------------+-------------+------------------+-----------------+-------------+
|                                               product_title|        store|total_reviews|        avg_rating|satisfaction_rate|negative_rate|
+------------------------------------------------------------+-------------+-------------+------------------+-----------------+-------------+
|Stylus Pen for iPad 9th&10th Generation-2X Fast Charge Ac...|      JAMJAKE|        20030| 4.664003994008986|            94.48|         3.95|
|Echo Dot (3rd Gen, 2018 release) - Smart speaker with Ale...|       Amazon|        95397| 4.626235625858255|            90.81|         5.57|
|VideoSecu ML531BE2 TV Wall Mount kit with Free Magnetic S...|    VideoSecu|        21325| 4.583399765533412|            90.33|         5.5

In [18]:
# ============================================================================
# 7. FEATURE 4: PRODUCT SUCCESS PREDICTION (Feature Engineering)
# ============================================================================

print("\n" + "="*80)
print("FEATURE 4: PRODUCT SUCCESS PREDICTION FEATURES")
print("="*80)

# KEY PATTERN: Create features that predict product success
# Success definition: High satisfaction + consistent ratings + good velocity
#
# Adds four columns on top of the per-product profile in `competitive_analysis`:
#   success_score    - weighted blend of satisfaction (40%), rating consistency
#                      (30%) and verified-purchase share (30%)
#   review_velocity  - reviews per day across the product's review span
#   engagement_score - review length scaled by helpful votes
#   success_category - tier derived from satisfaction rate and review volume

prediction_features = competitive_analysis.withColumn(
    # Success score (composite metric).
    # rating_stddev is NULL when it cannot be computed for a product (e.g. a
    # single review); the score is then NULL too rather than a made-up value.
    "success_score",
    spark_round(
        (col("satisfaction_rate") * 0.4) +
        ((100 - col("rating_stddev") * 20) * 0.3) +  # Consistency bonus
        ((col("verified_rate")) * 0.3),  # Trust factor
        2
    )
).withColumn(
    # Growth potential based on review velocity.
    # +1 keeps the denominator non-zero when all reviews share one date.
    "review_velocity",
    spark_round(col("total_reviews") / (col("review_span_days") + 1), 2)
).withColumn(
    # Engagement score
    "engagement_score",
    spark_round(
        (col("avg_review_length") / 100) * (col("avg_helpful_votes") + 1),
        2
    )
).withColumn(
    # Success category: conditions are evaluated top-down, first match wins.
    "success_category",
    when(
        (col("satisfaction_rate") >= 85) &
        (col("total_reviews") >= 5000),
        "Top Performer"
    ).when(
        (col("satisfaction_rate") >= 80) &
        (col("total_reviews") >= 1000),
        "Strong Performer"
    ).when(
        col("satisfaction_rate") >= 75,
        "Average Performer"
    ).otherwise("Underperformer")
)

print(f"✓ Prediction features created")

# Show success distribution
print("\nProduct Success Distribution:")
prediction_features.groupBy("success_category") \
    .agg(count("*").alias("product_count")) \
    .orderBy(col("product_count").desc()) \
    .show(truncate=False)

# Top and bottom performers
print("\nTop 5 Performers:")
prediction_features.select(
    "product_title", "store", "success_score", "satisfaction_rate",
    "review_velocity", "success_category"
).orderBy(col("success_score").desc_nulls_last()) \
    .show(5, truncate=60)

print("\nBottom 5 Performers:")
# A plain ascending sort places NULLs first, which would list products with an
# undefined success_score as the worst performers. Push them to the end so the
# ranking only reflects products that were actually scored.
prediction_features.select(
    "product_title", "store", "success_score", "satisfaction_rate",
    "negative_rate", "success_category"
).orderBy(col("success_score").asc_nulls_last()) \
    .show(5, truncate=60)



FEATURE 4: PRODUCT SUCCESS PREDICTION FEATURES
✓ Prediction features created

Product Success Distribution:
+-----------------+-------------+
|success_category |product_count|
+-----------------+-------------+
|Strong Performer |17           |
|Top Performer    |15           |
|Underperformer   |9            |
|Average Performer|6            |
+-----------------+-------------+


Top 5 Performers:
+------------------------------------------------------------+-------------+-------------+-----------------+---------------+----------------+
|                                               product_title|        store|success_score|satisfaction_rate|review_velocity|success_category|
+------------------------------------------------------------+-------------+-------------+-----------------+---------------+----------------+
|Stylus Pen for iPad 9th&10th Generation-2X Fast Charge Ac...|      JAMJAKE|        92.35|            94.48|          15.46|   Top Performer|
|Echo Dot (3rd Gen, 2018 releas

In [19]:
# ============================================================================
# 8. FEATURE 5: REVIEW VELOCITY ANALYSIS
# ============================================================================

print("\n" + "="*80)
print("FEATURE 5: REVIEW VELOCITY ANALYSIS")
print("="*80)

# KEY PATTERN: Time-series analysis to detect momentum changes

# Monthly velocity by product: one row per (product, month) with review count
# and mean rating. No orderBy here -- the window step below repartitions by
# product, so a global sort at this point would be discarded.
velocity_analysis = df_enriched.groupBy(
    "parent_asin",
    "product_title",
    "store",
    "review_year_month"
).agg(
    count("*").alias("monthly_reviews"),
    avg("rating").alias("monthly_avg_rating")
)

# Calculate velocity changes with window functions.
# NOTE: lag() returns the previous *row* for the product, i.e. the previous
# month that has any reviews -- not necessarily the previous calendar month.
windowSpec = Window.partitionBy("parent_asin").orderBy("review_year_month")

velocity_analysis = velocity_analysis.withColumn(
    "prev_month_reviews",
    lag("monthly_reviews", 1).over(windowSpec)
).withColumn(
    "velocity_change",
    col("monthly_reviews") - col("prev_month_reviews")
).withColumn(
    "velocity_change_pct",
    # NOTE(review): prev_month_reviews comes from count("*") over an existing
    # group, so it is never 0; the "+ 1" is not needed to avoid division by
    # zero and makes the figure smaller than the true percentage change.
    # Left as-is to keep the published numbers stable -- confirm intent.
    spark_round(
        (col("velocity_change") /
         (col("prev_month_reviews") + 1) * 100),
        2
    )
).withColumn(
    "momentum",
    # The first month of every product has no previous row, so
    # velocity_change_pct is NULL. Without this guard every comparison below
    # evaluates to NULL and the row falls through to "Collapsing", which
    # mislabels each product's launch month and pollutes the declining report.
    when(col("velocity_change_pct").isNull(), "New")
    .when(col("velocity_change_pct") > 50, "Surging")
    .when(col("velocity_change_pct") > 20, "Growing")
    .when(col("velocity_change_pct") > -20, "Stable")
    .when(col("velocity_change_pct") > -50, "Declining")
    .otherwise("Collapsing")
)

print(f"✓ Velocity analysis calculated")
print(f"  Product-month data points: {velocity_analysis.count():,}")

# Find products with interesting velocity patterns
print("\nProducts with SURGING momentum (recent months):")
velocity_analysis.filter(col("momentum") == "Surging") \
    .select("product_title", "store", "review_year_month",
            "monthly_reviews", "velocity_change_pct") \
    .orderBy(col("velocity_change_pct").desc()) \
    .show(10, truncate=60)

print("\nProducts with DECLINING momentum:")
velocity_analysis.filter(col("momentum").isin(["Declining", "Collapsing"])) \
    .select("product_title", "store", "review_year_month",
            "monthly_reviews", "velocity_change_pct") \
    .orderBy(col("velocity_change_pct").asc()) \
    .show(10, truncate=60)



FEATURE 5: REVIEW VELOCITY ANALYSIS
✓ Velocity analysis calculated
  Product-month data points: 3,518

Products with SURGING momentum (recent months):
+------------------------------------------------------------+--------------------+-------------------+---------------+-------------------+
|                                               product_title|               store|  review_year_month|monthly_reviews|velocity_change_pct|
+------------------------------------------------------------+--------------------+-------------------+---------------+-------------------+
|Fire Tablet with Alexa, 7" Display, 16 GB, Blue - with Sp...|              Amazon|2015-10-01 00:00:00|           6536|           72533.33|
|Fire TV Stick Lite, free and live TV, Alexa Voice Remote ...|              Amazon|2020-10-01 00:00:00|            226|            7466.67|
|Fire TV Stick (3rd Gen) with Alexa Voice Remote (includes...|              Amazon|2021-05-01 00:00:00|            801|            5246.67|
|       

In [21]:
# ============================================================================
# 9. SAVE TRANSFORMED FEATURES
# ============================================================================

print("\n" + "="*80)
print("SAVING TRANSFORMED FEATURES")
print("="*80)

# Cache all feature datasets (lazy: each is materialised by the first action
# that touches it, here the CSV exports below).
sentiment_trends.cache()
competitive_analysis.cache()
prediction_features.cache()
velocity_analysis.cache()

# NOTE(review): absolute, machine-specific output location.
output_path = r"C:\Users\shafe\OneDrive\Desktop\ecommerce-intelligence\data\processed"
os.makedirs(output_path, exist_ok=True)

# Note: Skipping local Parquet save due to Windows/Hadoop compatibility issues
# In production, we'll write directly to S3 from AWS EMR
print("\n✓ Feature datasets cached in memory")
print("  - sentiment_trends")
print("  - competitive_analysis")
print("  - prediction_features")
print("  - velocity_analysis")

print("\nNote: Local Parquet save skipped (Windows/Hadoop limitation)")
print("Production deployment will write directly to S3 from EMR")

# Optional: Save summary stats to CSV for quick reference
print("\nSaving summary statistics to CSV...")

# (DataFrame to export, file name). The two sampled exports are sorted before
# limit(1000): a limit on an unordered DataFrame returns an arbitrary set of
# rows, so the sample files would otherwise differ from run to run.
csv_exports = [
    (
        sentiment_trends.orderBy("store", "review_year_month").limit(1000),
        "sentiment_trends_sample.csv",
    ),
    (competitive_analysis, "competitive_analysis.csv"),
    (prediction_features, "prediction_features.csv"),
    (
        velocity_analysis.orderBy("parent_asin", "review_year_month").limit(1000),
        "velocity_analysis_sample.csv",
    ),
]

# Each file is written in its own try block so that one failed export does not
# prevent the remaining files from being written. The save stays best-effort:
# errors are reported, not raised.
failed_exports = []
for export_df, file_name in csv_exports:
    try:
        export_df.toPandas().to_csv(
            os.path.join(output_path, file_name), index=False
        )
        print(f"  ✓ {file_name}")
    except Exception as e:
        failed_exports.append(file_name)
        print(f"\n⚠ CSV save error: {e}")
        print("  Continuing without local save...")

if failed_exports:
    print(f"\n⚠ CSV files not saved: {', '.join(failed_exports)}")
else:
    print("\n✓ Summary CSV files saved successfully")


SAVING TRANSFORMED FEATURES

✓ Feature datasets cached in memory
  - sentiment_trends
  - competitive_analysis
  - prediction_features
  - velocity_analysis

Note: Local Parquet save skipped (Windows/Hadoop limitation)
Production deployment will write directly to S3 from EMR

Saving summary statistics to CSV...
  ✓ sentiment_trends_sample.csv
  ✓ competitive_analysis.csv
  ✓ prediction_features.csv
  ✓ velocity_analysis_sample.csv

✓ Summary CSV files saved successfully
