# Online Evaluation - Recommendation System Telemetry

This notebook performs automated online evaluation of the recommendation system by analyzing:
- Service performance metrics (latency, error rates, success rates)
- Model performance metrics (coverage, diversity, personalization, conversion)
- Data quality and telemetry

**Usage**: Set `ANALYSIS_DATE` in the Configuration cell, then restart the kernel and run all cells in order to generate a complete telemetry report.

**Data Source**: S3 events from `s3://shrek-events-dev/events/`

## Setup and Configuration

In [18]:
# AWS credentials setup
#
# SECURITY: credentials must never be hardcoded in the notebook. The access key
# pair that was previously embedded in this cell has been exposed and should be
# revoked/rotated in IAM. Supply credentials through the environment instead
# (e.g. `export AWS_ACCESS_KEY_ID=...` before launching Jupyter).
import os

_missing_aws_vars = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if not os.environ.get(name)
]
if _missing_aws_vars:
    # Fail fast with an actionable message: the Spark cell configures S3A to read
    # credentials from environment variables only.
    raise RuntimeError(
        "Missing AWS credentials in environment: "
        + ", ".join(_missing_aws_vars)
        + ". Export them before launching the notebook."
    )

# Respect a region already configured by the caller; otherwise use the original default.
os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")

In [19]:
# Initialize Spark session
from pyspark.sql import SparkSession

# Session-level settings, applied to the builder in the order listed.
_spark_settings = {
    # Maven packages pulled at startup so s3a:// paths can be read
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.4.0,com.amazonaws:aws-java-sdk-bundle:1.12.698",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.sql.files.ignoreCorruptFiles": "true",
}

_builder = SparkSession.builder.appName("online_evaluation").master("local[*]")
for _key, _value in _spark_settings.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

# Keep cell output readable: only errors from Spark's own logging
spark.sparkContext.setLogLevel("ERROR")

# Hadoop S3 access configuration (credentials are read from environment variables)
hconf = spark._jsc.hadoopConfiguration()
for _key, _value in (
    ("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider"),
    ("fs.s3a.endpoint", "s3.amazonaws.com"),
):
    hconf.set(_key, _value)

print("✓ Spark session initialized")

✓ Spark session initialized


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 56931)
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
    ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.fra

In [20]:
# Configuration
from datetime import datetime

# Date to analyze (modify as needed); must be an ISO date formatted exactly as YYYY-MM-DD
ANALYSIS_DATE = "2025-10-26"

# Validate early: the date is interpolated verbatim into the partition paths below,
# so a malformed or non-zero-padded value would point at a non-existent partition.
if datetime.strptime(ANALYSIS_DATE, "%Y-%m-%d").strftime("%Y-%m-%d") != ANALYSIS_DATE:
    raise ValueError(f"ANALYSIS_DATE must be formatted as YYYY-MM-DD, got {ANALYSIS_DATE!r}")

# S3 paths
BASE_PATH = "s3a://shrek-events-dev/events/"
# Event-type partitions loaded for the analysis
EVENT_TYPES = ("rate", "recommendation", "watch")
# Derived from BASE_PATH so the bucket/prefix is defined in exactly one place
PATHS = [
    f"{BASE_PATH}date={ANALYSIS_DATE}/event_type={event_type}"
    for event_type in EVENT_TYPES
]

print(f"Analysis date: {ANALYSIS_DATE}")
print(f"Base path: {BASE_PATH}")

Analysis date: 2025-10-26
Base path: s3a://shrek-events-dev/events/


## Data Loading

In [21]:
# Load all event data
from pyspark.sql.functions import col, when, sum as spark_sum, count as spark_count, avg as spark_avg

# basePath keeps the `date` / `event_type` partition directories available as columns
df = spark.read.option("basePath", BASE_PATH).parquet(*PATHS)

# Separate by event type
watch_df = df.filter(df.event_type == "watch")
rate_df = df.filter(df.event_type == "rate")
rec_df = df.filter(df.event_type == "recommendation")

# One grouped aggregation instead of four independent count() actions,
# each of which would rescan the full day of events.
event_counts = {
    row["event_type"]: row["count"]
    for row in df.groupBy("event_type").count().collect()
}
total_records = sum(event_counts.values())

print("="*80)
print("DATA LOADING SUMMARY")
print("="*80)
print(f"Total records: {total_records:,}")
print(f"Watch events: {event_counts.get('watch', 0):,}")
print(f"Rate events: {event_counts.get('rate', 0):,}")
print(f"Recommendation events: {event_counts.get('recommendation', 0):,}")
if total_records:
    print("✓ Data loaded successfully")
else:
    # Do not report success when nothing was read
    print("⚠️  No events found for the configured date - downstream metrics will be empty")

DATA LOADING SUMMARY


                                                                                

Total records: 30,490,595


                                                                                

Watch events: 30,079,387


                                                                                

Rate events: 159,400




Recommendation events: 251,808
✓ Data loaded successfully


                                                                                

## Service Performance Metrics

In [5]:
# Overall service metrics
print("="*80)
print("SERVICE PERFORMANCE OVERVIEW")
print("="*80)

total_requests = rec_df.count()
# A request is successful only when its status_code is exactly 200;
# everything else (including other codes) is counted as failed.
successful_requests = rec_df.filter(col("status_code") == 200).count()
failed_requests = total_requests - successful_requests
unique_users = rec_df.select("user_id").distinct().count()
unique_servers = rec_df.select("server").distinct().count()

# Guard against a day with no recommendation traffic (avoids ZeroDivisionError)
if total_requests:
    success_rate_text = f"{(successful_requests / total_requests * 100):.2f}%"
    error_rate_text = f"{(failed_requests / total_requests * 100):.2f}%"
else:
    success_rate_text = error_rate_text = "N/A (no requests)"

print(f"\nTotal Requests: {total_requests:,}")
print(f"Successful Requests: {successful_requests:,}")
print(f"Failed Requests: {failed_requests:,}")
print(f"Success Rate: {success_rate_text}")
print(f"Error Rate: {error_rate_text}")
print(f"Unique Users Served: {unique_users:,}")
print(f"Active Servers: {unique_servers:,}")

SERVICE PERFORMANCE OVERVIEW





Total Requests: 225,616
Successful Requests: 224,366
Failed Requests: 1,250
Success Rate: 99.45%
Error Rate: 0.55%
Unique Users Served: 166,039
Active Servers: 1


                                                                                

In [6]:
# Status code breakdown: request count and share of all requests per status_code
print(f"\n{'=' * 80}")
print("STATUS CODE DISTRIBUTION")
print("=" * 80)

status_breakdown = (
    rec_df
    .groupBy("status_code")
    .count()
    .orderBy("status_code")
)

# Percentages are relative to total_requests computed in the service overview cell
status_breakdown_pd = status_breakdown.toPandas().assign(
    percentage=lambda frame: (frame["count"] / total_requests * 100).round(2)
)
print(status_breakdown_pd.to_string(index=False))


STATUS CODE DISTRIBUTION


                                                                                

 status_code  count  percentage
           0   1250        0.55
         200 224366       99.45


In [7]:
# Error rate over time, bucketed by the hour of each request's timestamp
from pyspark.sql.functions import hour

print(f"\n{'=' * 80}")
print("ERROR RATE BY HOUR")
print("=" * 80)

# 1 for every request whose status_code is not 200, else 0
_is_error = when(col("status_code") != 200, 1).otherwise(0)

error_by_hour = (
    rec_df
    .withColumn("hour", hour("timestamp"))
    .groupBy("hour")
    .agg(
        spark_count("*").alias("total_requests"),
        spark_sum(_is_error).alias("error_count"),
    )
    .orderBy("hour")
    .toPandas()
)

_hourly_error_pct = error_by_hour["error_count"] / error_by_hour["total_requests"] * 100
error_by_hour["error_rate"] = _hourly_error_pct.round(2)
print(error_by_hour.to_string(index=False))


ERROR RATE BY HOUR




 hour  total_requests  error_count  error_rate
    2            3855            2        0.05
    3           12347            4        0.03
    4           11485            9        0.08
    5           11041            4        0.04
    6           11126            7        0.06
    7           11751            4        0.03
    8           12842            5        0.04
    9           14287            5        0.03
   10           15855           20        0.13
   11           18346            6        0.03
   12           19997            3        0.02
   13           22128            5        0.02
   14           24242         1174        4.84
   15           25408            2        0.01
   16           10906            0        0.00


                                                                                

## Model Performance Metrics

In [8]:
# Model coverage analysis
print("="*80)
print("MODEL COVERAGE ANALYSIS")
print("="*80)


def _collect_distinct_user_ids(events_df):
    """Return the distinct `user_id` values of `events_df` as a Python set.

    De-duplicating in Spark first means one value per user is shipped to the
    driver, rather than one value per event row.
    """
    return {row["user_id"] for row in events_df.select("user_id").distinct().collect()}


def _pct(part, whole):
    """Return `part` as a percentage of `whole`; 0.0 when `whole` is zero."""
    return part / whole * 100 if whole else 0.0


# User coverage
rec_user_set = _collect_distinct_user_ids(rec_df)
watch_user_set = _collect_distinct_user_ids(watch_df)
rate_user_set = _collect_distinct_user_ids(rate_df)

# Cold start: requested recommendations but has no watch or rate event in the loaded data
cold_start_users = rec_user_set - watch_user_set - rate_user_set
# Warm: requested recommendations and has at least one watch or rate event
warm_users = rec_user_set.intersection(watch_user_set.union(rate_user_set))
# Highly engaged: requested recommendations and has both watch and rate events
highly_engaged = rec_user_set.intersection(watch_user_set).intersection(rate_user_set)

n_rec_users = len(rec_user_set)
print(f"\nUsers requesting recommendations: {n_rec_users:,}")
print(f"Cold start users (no history): {len(cold_start_users):,} ({_pct(len(cold_start_users), n_rec_users):.2f}%)")
print(f"Warm users (with history): {len(warm_users):,} ({_pct(len(warm_users), n_rec_users):.2f}%)")
print(f"Highly engaged users (watch + rate): {len(highly_engaged):,} ({_pct(len(highly_engaged), n_rec_users):.2f}%)")

# Item coverage
# Exclude NULL movie_id so rows without a movie do not count as an extra catalog entry.
# NOTE(review): "catalog" here means movies seen in the loaded events, not a master catalog.
total_movies = df.select("movie_id").filter(col("movie_id").isNotNull()).distinct().count()
recommended_movies = rec_df.filter(col("recommendations").isNotNull()).select("recommendations").distinct().count()

print(f"\nTotal movies in catalog: {total_movies:,}")
print(f"Movies being recommended: {recommended_movies:,}")
print(f"Catalog coverage: {_pct(recommended_movies, total_movies):.2f}%")

MODEL COVERAGE ANALYSIS


                                                                                


Users requesting recommendations: 166,039
Cold start users (no history): 1,607 (0.97%)
Warm users (with history): 164,432 (99.03%)
Highly engaged users (watch + rate): 102,587 (61.78%)





Total movies in catalog: 33,865
Movies being recommended: 1,025
Catalog coverage: 3.03%


                                                                                

In [9]:
# Personalization metrics
print("\n" + "="*80)
print("PERSONALIZATION ANALYSIS")
print("="*80)

successful_recs = rec_df.filter(col("status_code") == 200)
distinct_recommendation_sets = successful_recs.select("recommendations").distinct().count()
total_recommendation_requests = successful_recs.count()

# Distinct recommendation values per successful request; 0.0 when there is no
# successful traffic (avoids ZeroDivisionError).
personalization_ratio = (
    distinct_recommendation_sets / total_recommendation_requests
    if total_recommendation_requests
    else 0.0
)

print(f"\nTotal successful requests: {total_recommendation_requests:,}")
print(f"Distinct recommendation sets: {distinct_recommendation_sets:,}")
print(f"Personalization Score: {personalization_ratio*100:.2f}%")

if total_recommendation_requests == 0:
    print("\n⚠️  WARNING: No successful recommendation requests - personalization cannot be assessed")
elif distinct_recommendation_sets == 1:
    print("\n⚠️  CRITICAL: ALL users are receiving IDENTICAL recommendations!")
    print("   This indicates ZERO personalization - model may be using fallback strategy")
elif personalization_ratio < 0.1:
    print("\n⚠️  WARNING: Very low personalization detected")
    # The denominator is a request count, not a user count
    print(f"   Only {distinct_recommendation_sets} unique recommendations for {total_recommendation_requests:,} requests")
else:
    print("\n✓ Recommendations show reasonable personalization")

# Show top recommendations
print("\nMost common recommendations:")
successful_recs.groupBy("recommendations").count().orderBy("count", ascending=False).limit(10).show(truncate=False)


PERSONALIZATION ANALYSIS


                                                                                


Total successful requests: 224,366
Distinct recommendation sets: 1,022
Personalization Score: 0.46%

   Only 1022 unique recommendations for 224,366 users

Most common recommendations:




+-----------------------------------------------------+------+
|recommendations                                      |count |
+-----------------------------------------------------+------+
|the+shawshank+redemption+1994                        |212132|
|the+lord+of+the+rings+the+two+towers+2002            |201   |
|the+lion+king+1994                                   |187   |
|interstellar+2014                                    |155   |
|kikis+delivery+service+1989                          |142   |
|my+neighbor+totoro+1988                              |142   |
|constantine+2005                                     |141   |
|the+lord+of+the+rings+the+fellowship+of+the+ring+2001|140   |
|the+dark+knight+2008                                 |117   |
|inception+2010                                       |114   |
+-----------------------------------------------------+------+



                                                                                

In [10]:
# Recommendation diversity: how many distinct recommendation values are served,
# and how often each one is served on average
print(f"\n{'=' * 80}")
print("RECOMMENDATION DIVERSITY")
print("=" * 80)

# Frequency table of recommendation values, most frequent first
movie_rec_freq = (
    successful_recs
    .groupBy("recommendations")
    .count()
    .orderBy("count", ascending=False)
)
unique_movies_recommended = movie_rec_freq.count()

print(f"\nUnique movies recommended: {unique_movies_recommended:,}")
print(f"Total recommendation instances: {total_recommendation_requests:,}")

if unique_movies_recommended > 0:
    avg_times_recommended = total_recommendation_requests / unique_movies_recommended
    print(f"Average times per movie recommended: {avg_times_recommended:.2f}")

    if unique_movies_recommended == 1:
        print("\n⚠️  CRITICAL: Only ONE movie being recommended to all users!")
        print("   Model has likely fallen back to most-popular fallback strategy")


RECOMMENDATION DIVERSITY





Unique movies recommended: 1,022
Total recommendation instances: 224,366
Average times per movie recommended: 219.54


                                                                                

In [None]:
# Conversion analysis - CORRECTED to match specific recommended movies
from pyspark.sql.functions import countDistinct, min as spark_min

print("\n" + "="*80)
print("RECOMMENDATION CONVERSION ANALYSIS")
print("="*80)

# Kept as a Python set because later cells use len(users_with_recs)
users_with_recs = set(rec_df.filter(col("status_code") == 200).select("user_id").distinct().rdd.flatMap(lambda x: x).collect())

print("Analyzing conversion for SPECIFIC recommended movies (not any movie)...")

# CORRECT METHOD: Join on both user_id AND movie_id (recommendations)
# This ensures we're tracking if users watched/rated the SPECIFIC movie that was recommended

# Prepare recommendation data with movie_id.
# Collapse repeated recommendations of the same movie to the same user down to the
# EARLIEST one. Without this, the join below yields one row per (recommendation,
# watch event) pair, inflating the conversion-instance totals. The set of converting
# users is unaffected: an action after any recommendation is also after the earliest.
rec_with_movie = (
    rec_df.filter(col("status_code") == 200)
    .select(
        "user_id",
        col("recommendations").alias("movie_id"),  # recommendations field is the movie_id
        col("timestamp").alias("rec_time"),
    )
    .groupBy("user_id", "movie_id")
    .agg(spark_min("rec_time").alias("rec_time"))
)

# Prepare watch/rate data with movie_id
watch_with_movie = watch_df.select("user_id", "movie_id", col("timestamp").alias("watch_time"))
rate_with_movie = rate_df.select("user_id", "movie_id", col("timestamp").alias("rate_time"))

# Join on BOTH user_id AND movie_id to ensure same movie
# Filter to only count actions AFTER the recommendation
rec_to_watch = rec_with_movie.join(watch_with_movie, ["user_id", "movie_id"], "inner") \
    .filter(col("watch_time") > col("rec_time"))

rec_to_rate = rec_with_movie.join(rate_with_movie, ["user_id", "movie_id"], "inner") \
    .filter(col("rate_time") > col("rec_time"))

# Compute converting users and conversion instances in ONE pass per join
# (the original evaluated each join twice).
watch_stats = rec_to_watch.agg(
    countDistinct("user_id").alias("users"),
    spark_count("*").alias("instances"),
).first()
rate_stats = rec_to_rate.agg(
    countDistinct("user_id").alias("users"),
    spark_count("*").alias("instances"),
).first()

# Count unique users who converted (watched/rated the SPECIFIC recommended movie)
converted_watch_users = watch_stats["users"]
converted_rate_users = rate_stats["users"]

# Total conversion instances: watch/rate events that follow the first recommendation
# of that movie to that user (a user may produce several such events)
total_watch_conversions = watch_stats["instances"]
total_rate_conversions = rate_stats["instances"]

print(f"\nUsers receiving recommendations: {len(users_with_recs):,}")
print(f"\n### SPECIFIC Movie Conversion (Recommended Movie Only) ###")
print(f"Users who watched the RECOMMENDED movie: {converted_watch_users:,}")
print(f"Users who rated the RECOMMENDED movie: {converted_rate_users:,}")
print(f"Total watch conversion instances: {total_watch_conversions:,}")
print(f"Total rate conversion instances: {total_rate_conversions:,}")

if len(users_with_recs) > 0:
    print(f"\n### Conversion Rates ###")
    print(f"Watch conversion rate (specific movie): {converted_watch_users/len(users_with_recs)*100:.2f}%")
    print(f"Rate conversion rate (specific movie): {converted_rate_users/len(users_with_recs)*100:.2f}%")
    print(f"\nNote: This measures users who watched/rated the EXACT movie that was recommended,")
    print(f"      not just any movie after receiving a recommendation.")

In [12]:
# Recommendation quality
print("\n" + "="*80)
print("RECOMMENDATION QUALITY ANALYSIS")
print("="*80)

# Thresholds defining a "high quality" movie; shared by the filter and the report line
HIGH_QUALITY_MIN_AVG_RATING = 4.0
HIGH_QUALITY_MIN_RATINGS = 5

# Calculate average ratings for movies
movie_avg_ratings = rate_df.groupBy("movie_id") \
    .agg(
        spark_avg("rating").alias("avg_rating"),
        spark_count("rating").alias("rating_count")
    )

# Join with recommended movies.
# Left join: recommended movies with no rating events keep NULL quality metrics.
rec_with_recs = rec_df.filter(col("recommendations").isNotNull())
recs_with_quality = rec_with_recs.select("recommendations").distinct() \
    .join(
        movie_avg_ratings,
        col("recommendations") == col("movie_id"),
        "left"
    )

print("\nRecommended movies with quality metrics:")
recs_with_quality.select("recommendations", "avg_rating", "rating_count").show(20, truncate=False)

# High quality movies count
high_quality_movies = movie_avg_ratings.filter(
    (col("avg_rating") >= HIGH_QUALITY_MIN_AVG_RATING)
    & (col("rating_count") >= HIGH_QUALITY_MIN_RATINGS)
)
total_rated_movies = movie_avg_ratings.count()
high_quality_count = high_quality_movies.count()

# Guard against a day with no rating events (avoids ZeroDivisionError)
high_quality_pct = high_quality_count / total_rated_movies * 100 if total_rated_movies else 0.0

print(f"\nHigh quality movies in catalog (avg rating >= {HIGH_QUALITY_MIN_AVG_RATING}, min {HIGH_QUALITY_MIN_RATINGS} ratings): {high_quality_count:,} / {total_rated_movies:,} ({high_quality_pct:.2f}%)")


RECOMMENDATION QUALITY ANALYSIS

Recommended movies with quality metrics:


                                                                                

+---------------------------------------------------+------------------+------------+
|recommendations                                    |avg_rating        |rating_count|
+---------------------------------------------------+------------------+------------+
|leon+the+professional+1994                         |4.010869565217392 |184         |
|gone+girl+2014                                     |4.027397260273973 |146         |
|die+hard+1988                                      |3.9591836734693877|98          |
|the+spongebob+movie+sponge+out+of+water+2015       |3.259259259259259 |27          |
|master+and+commander+the+far+side+of+the+world+2003|3.671875          |64          |
|the+black+cauldron+1985                            |3.3846153846153846|13          |
|thomas+pynchon+a+journey+into+the+mind+of+p.+2003  |3.5               |2           |
|when+we+were+kings+1996                            |4.571428571428571 |7           |
|grease+1978                                        |3




High quality movies in catalog (avg rating >= 4.0, min 5 ratings): 721 / 17,146 (4.21%)


                                                                                

## Executive Summary

In [None]:
# Generate executive summary
#
# Consolidates the metrics computed by the earlier cells (service overview, coverage,
# personalization, diversity and conversion). All of those cells must have been run
# in this kernel session before this one.


def _ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# Compute every rate once, with zero-safe division, instead of repeating the
# raw divisions in each print/branch below.
success_rate = _ratio(successful_requests, total_requests)
error_rate = _ratio(failed_requests, total_requests)
personalization_rate = _ratio(distinct_recommendation_sets, total_recommendation_requests)
catalog_coverage_rate = _ratio(recommended_movies, total_movies)
cold_start_rate = _ratio(len(cold_start_users), len(rec_user_set))
watch_conversion_rate = _ratio(converted_watch_users, len(users_with_recs))
rate_conversion_rate = _ratio(converted_rate_users, len(users_with_recs))

print("="*80)
print("EXECUTIVE SUMMARY - ONLINE EVALUATION REPORT")
print(f"Analysis Date: {ANALYSIS_DATE}")
print("="*80)

print("\n### SERVICE HEALTH ###")
print(f"Total Requests: {total_requests:,}")
print(f"Success Rate: {(success_rate * 100):.2f}%")
print(f"Error Rate: {(error_rate * 100):.2f}%")

# Service health status
if success_rate >= 0.99:
    print("Status: ✓ EXCELLENT")
elif success_rate >= 0.95:
    print("Status: ✓ GOOD")
elif success_rate >= 0.90:
    print("Status: ⚠️  DEGRADED")
else:
    print("Status: ❌ CRITICAL")

print("\n### MODEL PERFORMANCE ###")
print(f"Personalization Score: {personalization_rate*100:.2f}%")
print(f"Unique Movies Recommended: {unique_movies_recommended:,}")
print(f"Catalog Coverage: {catalog_coverage_rate*100:.2f}%")
print(f"Cold Start Users: {cold_start_rate*100:.2f}%")
print(f"Watch Conversion Rate (specific movie): {watch_conversion_rate*100:.2f}%")
print(f"Rate Conversion Rate (specific movie): {rate_conversion_rate*100:.2f}%")

# Model health status
if distinct_recommendation_sets == 1:
    print("Status: ❌ CRITICAL - No personalization")
elif personalization_rate < 0.1:
    print("Status: ⚠️  WARNING - Low personalization")
elif personalization_rate < 0.5:
    print("Status: ⚠️  DEGRADED - Limited personalization")
else:
    print("Status: ✓ GOOD")

print("\n### KEY INSIGHTS ###")

insights = []

if total_requests == 0:
    # With no traffic every rate above is reported as 0; say so explicitly
    insights.append("⚠️  No recommendation requests found for the analysis date - metrics above are not meaningful")

if distinct_recommendation_sets == 1:
    insights.append("⚠️  All users receiving identical recommendations - investigate model loading")

if cold_start_rate > 0.5:
    insights.append(f"⚠️  High cold-start rate ({cold_start_rate*100:.1f}%) - many users have no history")

if error_rate > 0.05:
    insights.append(f"⚠️  High error rate ({error_rate*100:.1f}%) - investigate service issues")

# Updated to use specific movie conversion rate
if watch_conversion_rate > 0.2:
    insights.append(f"✓ Good conversion rate ({watch_conversion_rate*100:.1f}%) - users engaging with recommended movies")
elif watch_conversion_rate > 0.1:
    insights.append(f"⚠️  Moderate conversion rate ({watch_conversion_rate*100:.1f}%) - room for improvement")
else:
    insights.append(f"⚠️  Low conversion rate ({watch_conversion_rate*100:.1f}%) - recommendations may not be relevant")

if catalog_coverage_rate < 0.01:
    insights.append(f"⚠️  Very low catalog coverage ({catalog_coverage_rate*100:.2f}%) - most movies not recommended")

if personalization_rate < 0.01:
    insights.append(f"⚠️  Very low personalization ({personalization_rate*100:.2f}%) - most users get same recommendations")

if not insights:
    insights.append("✓ No major issues detected")

for insight in insights:
    print(f"  {insight}")

print("\n" + "="*80)
print("Report generated at:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("="*80)

## Cleanup

In [None]:
# Stop the Spark session once the report is complete. Left commented out so the
# session stays available for interactive follow-up; uncomment if needed.
# spark.stop()
# print("✓ Spark session stopped")