In [None]:
# ----------------------------------------------------------------------------
# CELL 1: Variables Configuration
# ----------------------------------------------------------------------------

# GCP Settings
GCP_PROJECT_ID = "agile-producer-471907-s7"
GCS_BUCKET = "data_netflix_2025"
TEMP_GCS_BUCKET = "temp_netflix_2025"

# Data Paths
STREAMING_DATA_PATH = f"gs://{GCS_BUCKET}/streaming"
USERS_CATALOG_PATH = f"gs://{GCS_BUCKET}/raw/users.csv"
MOVIES_CATALOG_PATH = f"gs://{GCS_BUCKET}/raw/movies.csv"

# BigQuery Settings
BQ_DATASET = "netflix_streaming"
BQ_OUTPUT_TABLE = "engagement_health_realtime" 
BQ_TABLE_FULL = f"{GCP_PROJECT_ID}.{BQ_DATASET}.{BQ_OUTPUT_TABLE}"

# Spark Settings
SPARK_MASTER = "spark://spark-master:7077"
DRIVER_MEMORY = "2g"
EXECUTOR_MEMORY = "2g"
EXECUTOR_CORES = "1"

# Streaming Settings
MAX_FILES_PER_TRIGGER = 1
WINDOW_DURATION = "5 minutes"
TRIGGER_INTERVAL = "10 seconds"
WATERMARK_DELAY = "10 minutes"

# Display Settings
DISPLAY_ITERATIONS = 30
DISPLAY_INTERVAL_SECONDS = 10
TOP_N_RESULTS = 10

LOW_ENGAGEMENT_THRESHOLD = 20.0 

print("Configuration loaded successfully")
print(f"Output destination: {BQ_TABLE_FULL}")
print(f"Streaming source: {STREAMING_DATA_PATH}")

In [None]:
# ----------------------------------------------------------------------------
# CELL 2: SPARK SESSION SETUP
# ----------------------------------------------------------------------------

from pyspark import SparkConf
from pyspark.sql import SparkSession

print("=" * 80)
print("INITIALIZING SPARK SESSION")
print("=" * 80)

# Configure Spark
sparkConf = SparkConf()
sparkConf.setMaster(SPARK_MASTER)
sparkConf.setAppName("NetflixStreamingPipeline")
sparkConf.set("spark.driver.memory", DRIVER_MEMORY)
sparkConf.set("spark.executor.memory", EXECUTOR_MEMORY)
sparkConf.set("spark.executor.cores", EXECUTOR_CORES)

# Create Spark session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Configure GCS access
spark.conf.set('temporaryGcsBucket', TEMP_GCS_BUCKET)
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

print(f"Spark session created: {spark.version}")
print(f"Application ID: {spark.sparkContext.applicationId}")
print("GCS access configured")
print("=" * 80)

In [None]:
# ----------------------------------------------------------------------------
# CELL 3: Static Data Loading
# ----------------------------------------------------------------------------


from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType
from pyspark.sql.functions import col, when, coalesce, lit

print("=" * 80)
print("LOADING & CLEANING STATIC DATA")
print("=" * 80)

# Watch events schema (same as before)
watch_events_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("movie_id", StringType(), True),
    StructField("watch_date", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("watch_duration_minutes", DoubleType(), True),
    StructField("progress_percentage", DoubleType(), True),
    StructField("action", StringType(), True),
    StructField("quality", StringType(), True),
    StructField("location_country", StringType(), True),
    StructField("is_download", BooleanType(), True),
    StructField("user_rating", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

# Load USERS catalog
print(f"\n1. Loading users from: {USERS_CATALOG_PATH}")
users_static = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(USERS_CATALOG_PATH)

# Clean users data
users_static = users_static.select(
    "user_id",
    "country",
    "subscription_plan",
    coalesce(col("monthly_spend"), lit(0.0)).alias("monthly_spend"),  # Handle nulls
    coalesce(col("age"), lit(0)).cast("int").alias("age"),
    coalesce(col("gender"), lit("Unknown")).alias("gender"),
    "is_active"
).cache()

print(f"Users loaded: {users_static.count():,}")
print("Sample users:")
users_static.show(3, truncate=False)

# Load MOVIES catalog
print(f"\n2. Loading movies from: {MOVIES_CATALOG_PATH}")
movies_static = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(MOVIES_CATALOG_PATH)

# Clean movies data
movies_static = movies_static.select(
    "movie_id",
    "title",
    "content_type",
    "genre_primary",
    coalesce(col("imdb_rating"), lit(0.0)).alias("imdb_rating"),  # Handle nulls
    coalesce(col("is_netflix_original"), lit(False)).alias("is_netflix_original"),
    "release_year",
    "rating"
).cache()

print(f"Movies loaded: {movies_static.count():,}")
print("Sample movies:")
movies_static.show(3, truncate=False)

print("=" * 80)

In [None]:
# ----------------------------------------------------------------------------
# CELL 4: Streaming Query with Churn Detection
# ----------------------------------------------------------------------------

from pyspark.sql.functions import (
    window, col, count, avg, sum as spark_sum,
    approx_count_distinct, to_timestamp, desc,
    when, isnan,lit, round as spark_round, max as spark_max
)

print("=" * 80)
print("BUILDING ENGAGEMENT & CHURN RISK PIPELINE")
print("=" * 80)

# Read streaming data
print(f"Setting up streaming source: {STREAMING_DATA_PATH}")
stream_df = spark.readStream \
    .schema(watch_events_schema) \
    .option("maxFilesPerTrigger", MAX_FILES_PER_TRIGGER) \
    .json(STREAMING_DATA_PATH)

# Convert timestamp and clean nulls
stream_df = stream_df \
    .withColumn("event_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("progress_percentage", 
        when(isnan(col("progress_percentage")) | col("progress_percentage").isNull(), lit(0.0))
        .otherwise(col("progress_percentage"))
    ) \
    .withColumn("watch_duration_minutes",
        when(isnan(col("watch_duration_minutes")) | col("watch_duration_minutes").isNull(), lit(0.0))
        .otherwise(col("watch_duration_minutes"))
    )
    
# JOIN 1: Enrich with USER data
enriched_stream = stream_df.join(users_static, "user_id", "left")
print("âœ“ Joined with users (country, subscription, monthly_spend)")

# JOIN 2: Enrich with MOVIE data
enriched_stream = enriched_stream.join(movies_static, "movie_id", "left")
print("âœ“ Joined with movies (genre, imdb_rating, is_netflix_original)")

# Add CHURN SIGNAL FLAGS
enriched_stream = enriched_stream \
    .withColumn("is_low_engagement", 
                when(col("progress_percentage") < LOW_ENGAGEMENT_THRESHOLD, 1).otherwise(0)) \
    .withColumn("is_quality_downgrade",
                when((col("quality") == "SD") | (col("quality") == "HD"), 1).otherwise(0)) \
    .withColumn("is_abandoned",
                when((col("action") == "stopped") & (col("progress_percentage") < 50), 1).otherwise(0)) \
    .withColumn("is_completed",
                when(col("action") == "completed", 1).otherwise(0))

print("âœ“ Churn signal flags added (low_engagement, quality_downgrade, abandoned, completed)")

# Add watermark
enriched_stream = enriched_stream.withWatermark("event_time", WATERMARK_DELAY)
print(f"âœ“ Watermark configured: {WATERMARK_DELAY}")

# AGGREGATION: Multi-dimensional engagement health
print(f"\nBuilding aggregation: {WINDOW_DURATION} windows")
engagement_health = enriched_stream \
    .groupBy(
        window(col("event_time"), WINDOW_DURATION),
        "country",
        "genre_primary",
        "device_type",
        "quality",
        "is_netflix_original",
        "content_type"
    ) \
    .agg(
        # Core engagement metrics
        count("*").alias("total_sessions"),
        approx_count_distinct("user_id").alias("unique_users"),
        spark_sum("watch_duration_minutes").alias("total_watch_minutes"),
        avg("progress_percentage").alias("avg_completion_pct"),
        
        # Content quality signals
        avg("imdb_rating").alias("avg_content_quality"),
        
        # Churn risk signals
        spark_sum("is_low_engagement").alias("low_engagement_count"),
        spark_sum("is_abandoned").alias("abandonment_count"),
        spark_sum("is_quality_downgrade").alias("quality_downgrade_count"),
        spark_sum("is_completed").alias("completion_count"),
        
        # Business metrics
        spark_sum("monthly_spend").alias("total_revenue_impact"),
        avg("monthly_spend").alias("avg_subscriber_value")
    ) \
    .select(
        # Window
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        
        # Dimensions
        "country",
        "genre_primary",
        "device_type",
        "quality",
        "is_netflix_original",
        "content_type",
        
        # Core metrics
        "total_sessions",
        "unique_users",
        spark_round("total_watch_minutes", 1).alias("total_watch_minutes"),
        spark_round("avg_completion_pct", 2).alias("avg_completion_pct"),
        spark_round("avg_content_quality", 2).alias("avg_content_quality"),
        
        # Churn signals
        "low_engagement_count",
        "abandonment_count",
        "quality_downgrade_count",
        "completion_count",
        
        # Calculated KPIs
        spark_round((col("low_engagement_count") / col("total_sessions") * 100), 2).alias("low_engagement_rate"),
        spark_round((col("abandonment_count") / col("total_sessions") * 100), 2).alias("abandonment_rate"),
        spark_round((col("completion_count") / col("total_sessions") * 100), 2).alias("completion_rate"),
        
        # Business metrics
        spark_round("total_revenue_impact", 2).alias("revenue_impact"),
        spark_round("avg_subscriber_value", 2).alias("avg_subscriber_value"),
        
        # Alert level (HIGH if >60% low engagement)
        when(col("low_engagement_count") / col("total_sessions") > 0.6, "HIGH")
            .when(col("low_engagement_count") / col("total_sessions") > 0.4, "MEDIUM")
            .otherwise("LOW").alias("alert_level")
    ) \
    .orderBy(desc("low_engagement_rate"))  # Order by worst engagement first

print("âœ“ Aggregation complete: engagement health with churn signals")
print("\nMetrics included:")
print("  - Engagement: sessions, users, watch time, completion rate")
print("  - Churn Signals: low engagement rate, abandonment rate")
print("  - Quality: content rating, quality downgrades")
print("  - Business: revenue impact, subscriber value")
print("  - Alerts: automated alert levels (HIGH/MEDIUM/LOW)")

# Define BigQuery write function
def write_to_bigquery(batch_df, batch_id):
    """Write engagement health metrics to BigQuery"""
    print(f"Writing batch {batch_id} to BigQuery...")
    if batch_df.count() > 0:
        batch_df.write.format('bigquery') \
            .option('table', BQ_TABLE_FULL) \
            .mode("append")  \
            .save()
        print(f"âœ“ Batch {batch_id} written ({batch_df.count()} rows)")
    else:
        print(f"âŠ˜ Batch {batch_id} is empty")

# Start streaming queries
print("\n" + "=" * 80)
print("STARTING STREAMING QUERIES")
print("=" * 80)

# Memory sink (for monitoring)
query_memory = engagement_health \
    .writeStream \
    .queryName("engagement_health_memory") \
    .format("memory") \
    .outputMode("complete") \
    .start()

print(f"âœ“ Memory sink started: {query_memory.id}")

# BigQuery sink
query_bigquery = engagement_health \
    .writeStream \
    .outputMode("complete") \
    .trigger(processingTime=TRIGGER_INTERVAL) \
    .foreachBatch(write_to_bigquery) \
    .start()

print(f"âœ“ BigQuery sink started: {query_bigquery.id}")
print(f"âœ“ Output table: {BQ_TABLE_FULL}")
print("=" * 80)


In [None]:
# ----------------------------------------------------------------------------
# CELL 5: Live Monitoring Display
# ----------------------------------------------------------------------------

from time import sleep
from datetime import datetime

print("=" * 80)
print("ENGAGEMENT & CHURN RISK DASHBOARD")
print("=" * 80)
print(f"Update interval: {DISPLAY_INTERVAL_SECONDS} seconds")
print(f"Showing HIGH RISK segments (sorted by low_engagement_rate)")
print("=" * 80)

try:
    for iteration in range(DISPLAY_ITERATIONS):
        print(f"\n{'=' * 80}")
        print(f"UPDATE #{iteration + 1} | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'=' * 80}")
        
        # Query top risk segments
        result_df = spark.sql(f"""
            SELECT 
                window_start,
                country,
                genre_primary,
                device_type,
                quality,
                total_sessions,
                unique_users,
                avg_completion_pct,
                low_engagement_rate,
                abandonment_rate,
                alert_level
            FROM engagement_health_memory
            WHERE total_sessions >= 1
            ORDER BY low_engagement_rate DESC, total_sessions DESC
            LIMIT {TOP_N_RESULTS}
        """)
        
        print("\nðŸš¨ HIGHEST CHURN RISK SEGMENTS:")
        result_df.show(TOP_N_RESULTS, truncate=False)
        
        # Summary stats
        summary = spark.sql("""
            SELECT 
                COUNT(*) as total_segments,
                SUM(total_sessions) as total_sessions,
                AVG(low_engagement_rate) as avg_churn_risk,
                SUM(CASE WHEN alert_level = 'HIGH' THEN 1 ELSE 0 END) as high_risk_segments
            FROM engagement_health_memory
        """)
        
        print("\nðŸ“Š OVERALL HEALTH:")
        summary.show(truncate=False)
        
        print(f"\nQuery Status: Memory={query_memory.isActive}, BigQuery={query_bigquery.isActive}")
        
        sleep(DISPLAY_INTERVAL_SECONDS)
        
except KeyboardInterrupt:
    print("\nðŸ›‘ Stopping pipelines...")
    query_memory.stop()
    query_bigquery.stop()
    print("âœ“ Stopped")

In [None]:
# ----------------------------------------------------------------------------
# CELL 6: CLEANUP - STOP SPARK SESSION
# ----------------------------------------------------------------------------

print("=" * 80)
print("CLEANUP: STOPPING SPARK SESSION")
print("=" * 80)

spark.stop()

print("Spark session stopped successfully")
print("\nPipeline Summary:")
print(f"  - Processed streaming events from: {STREAMING_DATA_PATH}")
print(f"  - Output written to: {BQ_TABLE_FULL}")
print(f"  - Ready for Looker Studio dashboard creation")
print("=" * 80)