In [None]:

# Text preprocessing functions
def preprocess_reddit_comments(df):
    """Clean Reddit comment text and add derived helper columns.

    Args:
        df: DataFrame providing at least ``created_utc`` (passed to
            ``from_unixtime``) and ``comment_body``.

    Returns:
        The DataFrame with three added columns:

        * ``created_date``   - ``created_utc`` converted to a timestamp.
        * ``clean_body``     - lower-cased, letters-only, whitespace-normalised
          text with URLs, user mentions and subreddit mentions removed.
        * ``comment_length`` - length of the *raw* ``comment_body``.

        Rows whose ``clean_body`` is null or at most 5 characters are dropped.
    """
    # Convert Unix timestamp to a timestamp column
    df = df.withColumn(
        "created_date",
        from_unixtime(col("created_utc")).cast("timestamp"),
    )

    # Patterns stripped (in this order) before lower-casing.
    # The mention patterns are anchored with \b so they only fire when
    # "u/" or "r/" starts a token; unanchored they would also eat parts of
    # ordinary text such as "you/them" or "for/example".
    # Usernames may contain hyphens, hence [\w-] for user mentions.
    removal_patterns = (
        r"http\S+",        # URLs
        r"\bu/[\w-]+",     # user mentions
        r"\br/\w+",        # subreddit mentions
        r"[^a-zA-Z\s]",    # anything that is not a letter or whitespace
    )

    cleaned = col("comment_body")
    for pattern in removal_patterns:
        cleaned = regexp_replace(cleaned, pattern, "")

    # Lowercase, collapse whitespace runs to one space, strip the ends
    cleaned = trim(regexp_replace(lower(cleaned), r"\s+", " "))
    df = df.withColumn("clean_body", cleaned)

    # Filter out empty or very short comments
    df = df.filter(col("clean_body").isNotNull() & (length(col("clean_body")) > 5))

    # Length of the original (uncleaned) text, kept for analysis
    df = df.withColumn("comment_length", length(col("comment_body")))

    return df

# Sentiment analysis using TextBlob
def analyze_sentiment_textblob(text):
    """Score a piece of text with TextBlob.

    Args:
        text: The text to score; may be ``None``.

    Returns:
        A ``(polarity, subjectivity, sentiment)`` tuple where ``sentiment`` is
        ``"positive"`` for polarity > 0.1, ``"negative"`` for polarity < -0.1
        and ``"neutral"`` otherwise. ``None``/blank input, or any error raised
        while scoring, yields ``(0.0, 0.0, "neutral")``.
    """
    neutral = (0.0, 0.0, "neutral")

    if text is None or text.strip() == "":
        return neutral

    try:
        scores = TextBlob(text).sentiment
        polarity = float(scores.polarity)
        subjectivity = float(scores.subjectivity)
    except Exception:
        # Best effort: one unscorable comment must not fail the caller.
        # Catch Exception (not a bare except) so KeyboardInterrupt and
        # SystemExit still propagate.
        return neutral

    if polarity > 0.1:
        sentiment = "positive"
    elif polarity < -0.1:
        sentiment = "negative"
    else:
        sentiment = "neutral"

    return (polarity, subjectivity, sentiment)

# Register UDF for sentiment analysis
sentiment_udf = udf(
    analyze_sentiment_textblob,
    # Struct mirrors the (polarity, subjectivity, sentiment) tuple the
    # Python function returns; every field is nullable.
    returnType=StructType(
        [
            StructField("polarity", DoubleType(), nullable=True),
            StructField("subjectivity", DoubleType(), nullable=True),
            StructField("sentiment", StringType(), nullable=True),
        ]
    ),
)

def perform_sentiment_analysis(df):
    """Attach sentiment scores and derived labels to each comment.

    Runs ``sentiment_udf`` over ``clean_body`` and adds, in order, the columns
    ``sentiment_analysis`` (struct), ``polarity``, ``subjectivity``,
    ``sentiment``, ``sentiment_category`` and ``engagement_level``.
    """
    scored = df.withColumn("sentiment_analysis", sentiment_udf(col("clean_body")))

    # Flatten the struct returned by the UDF into top-level columns
    for field_name in ("polarity", "subjectivity", "sentiment"):
        scored = scored.withColumn(field_name, col(f"sentiment_analysis.{field_name}"))

    # Five-way label derived from polarity; first matching branch wins
    polarity = col("polarity")
    category_expr = (
        when(polarity > 0.5, "very_positive")
        .when(polarity > 0.1, "positive")
        .when(polarity < -0.5, "very_negative")
        .when(polarity < -0.1, "negative")
        .otherwise("neutral")
    )
    scored = scored.withColumn("sentiment_category", category_expr)

    # Engagement bucket derived from the comment score
    score = col("score")
    engagement_expr = (
        when(score > 10, "high")
        .when(score > 0, "medium")
        .when(score >= -5, "low")
        .otherwise("very_low")
    )
    return scored.withColumn("engagement_level", engagement_expr)

def create_real_time_aggregations(df):
    """Build the windowed sentiment aggregations.

    Args:
        df: DataFrame with ``kafka_timestamp``, ``sentiment``, ``polarity``,
            ``score`` and ``subreddit`` columns.

    Returns:
        A ``(windowed_sentiment, subreddit_sentiment)`` tuple:

        * ``windowed_sentiment`` - per sentiment, 5-minute windows sliding
          every minute, with ``window_start``, ``window_end``, ``sentiment``,
          ``comment_count``, ``avg_polarity``, ``avg_reddit_score`` and
          ``unique_subreddits``.
        * ``subreddit_sentiment`` - per subreddit and sentiment, 10-minute
          windows sliding every 5 minutes, with ``comment_count``,
          ``avg_polarity``, ``max_score`` and ``min_score``.
    """
    # Add watermark for late data handling
    df_watermarked = df.withWatermark("kafka_timestamp", "10 minutes")

    # 5-minute windowed sentiment analysis.
    # collect_set keeps only the distinct subreddits per group, so the
    # aggregation buffer grows with the number of distinct subreddits rather
    # than with every comment (as collect_list + array_distinct did); the
    # resulting count is the same.
    windowed_sentiment = (
        df_watermarked
        .groupBy(
            window(col("kafka_timestamp"), "5 minutes", "1 minutes"),
            col("sentiment"),
        )
        .agg(
            count("*").alias("comment_count"),
            avg("polarity").alias("avg_polarity"),
            avg("score").alias("avg_reddit_score"),
            collect_set("subreddit").alias("subreddits"),
        )
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("sentiment"),
            col("comment_count"),
            col("avg_polarity"),
            col("avg_reddit_score"),
            size(col("subreddits")).alias("unique_subreddits"),
        )
    )

    # Subreddit-wise sentiment aggregation.
    # NOTE(review): max/min are called with a column name, so they must
    # resolve to the Spark SQL functions, not the Python builtins - confirm
    # against the file's imports.
    subreddit_sentiment = (
        df_watermarked
        .groupBy(
            window(col("kafka_timestamp"), "10 minutes", "5 minutes"),
            col("subreddit"),
            col("sentiment"),
        )
        .agg(
            count("*").alias("comment_count"),
            avg("polarity").alias("avg_polarity"),
            max("score").alias("max_score"),
            min("score").alias("min_score"),
        )
    )

    return windowed_sentiment, subreddit_sentiment

def detect_trending_topics(df):
    """Detect trending topics and keywords in Trump discussions.

    Splits ``clean_body`` on single spaces, keeps words longer than three
    characters and counts them per 15-minute window (sliding every 5 minutes),
    word and sentiment.

    Args:
        df: DataFrame with ``comment_id``, ``kafka_timestamp``, ``sentiment``,
            ``subreddit`` and ``clean_body`` columns.

    Returns:
        DataFrame with ``window``, ``word``, ``sentiment``, ``word_frequency``,
        ``unique_comments`` (approximate distinct comment count) and
        ``subreddits_mentioned``; groups with fewer than 3 occurrences are
        dropped.
    """
    # Imported locally so this function does not depend on how the rest of
    # the file imports pyspark.sql.functions.
    from pyspark.sql.functions import approx_count_distinct

    # One row per word (simple approach - can be enhanced with NLP)
    words_df = df.select(
        col("comment_id"),
        col("kafka_timestamp"),
        col("sentiment"),
        col("subreddit"),
        explode(split(col("clean_body"), " ")).alias("word"),
    ).filter(length(col("word")) > 3)  # Filter short words

    # Word frequency with sentiment context.
    # Exact distinct aggregations (countDistinct) are not supported on
    # streaming DataFrames, so the distinct-comment count uses
    # approx_count_distinct instead.
    word_sentiment_freq = (
        words_df
        .withWatermark("kafka_timestamp", "15 minutes")
        .groupBy(
            window(col("kafka_timestamp"), "15 minutes", "5 minutes"),
            col("word"),
            col("sentiment"),
        )
        .agg(
            count("*").alias("word_frequency"),
            approx_count_distinct("comment_id").alias("unique_comments"),
            collect_set("subreddit").alias("subreddits_mentioned"),
        )
        .filter(col("word_frequency") >= 3)  # Filter infrequent words
    )

    return word_sentiment_freq

# Output sinks for different analyses
def setup_output_sinks(processed_df, windowed_sentiment, subreddit_sentiment, word_trends):
    """Start every streaming sink and return the running queries.

    Starts, in order: a console monitor, a Parquet archive, three console
    sinks for the aggregated frames, and a console sink for extreme-polarity
    alerts.

    Returns:
        Dict of started queries keyed by ``"console"``, ``"parquet"``,
        ``"windowed"``, ``"subreddit"``, ``"trending"`` and ``"alerts"``.
    """

    def _start_console(frame, mode, interval, name=None, num_rows=None):
        # Shared recipe for all console sinks: untruncated output on a
        # processing-time trigger, optionally row-limited and/or named.
        writer = (
            frame.writeStream
            .outputMode(mode)
            .format("console")
            .option("truncate", "false")
        )
        if num_rows is not None:
            writer = writer.option("numRows", num_rows)
        writer = writer.trigger(processingTime=interval)
        if name is not None:
            writer = writer.queryName(name)
        return writer.start()

    # 1. Console output for monitoring (first 100 chars of each comment)
    monitor_view = processed_df.select(
        "comment_id",
        "author",
        "subreddit",
        "sentiment",
        "polarity",
        "score",
        substring(col("comment_body"), 1, 100).alias("comment_preview"),
    )
    console_query = _start_console(monitor_view, "append", "30 seconds", num_rows=10)

    # 2. Parquet files for historical analysis
    archive_columns = [
        "comment_id",
        "comment_body",
        "author",
        "created_date",
        "score",
        "subreddit",
        "sentiment",
        "polarity",
        "subjectivity",
        "sentiment_category",
        "engagement_level",
        "kafka_timestamp",
    ]
    parquet_query = (
        processed_df.select(*archive_columns)
        .writeStream
        .outputMode("append")
        .format("parquet")
        .option("path", "/tmp/trump_sentiment_data")
        .option("checkpointLocation", "/tmp/trump_sentiment_checkpoint_parquet")
        .partitionBy("sentiment", "subreddit")
        .trigger(processingTime="60 seconds")
        .start()
    )

    # 3. Windowed sentiment aggregations
    windowed_console = _start_console(
        windowed_sentiment, "update", "60 seconds", name="windowed_sentiment"
    )

    # 4. Subreddit sentiment trends
    subreddit_console = _start_console(
        subreddit_sentiment, "update", "120 seconds", name="subreddit_sentiment"
    )

    # 5. Trending words analysis
    trending_words_console = _start_console(
        word_trends, "update", "180 seconds", name="trending_words"
    )

    # 6. Real-time alerts for extreme sentiment (polarity beyond +/-0.7)
    is_extreme = (col("polarity") > 0.7) | (col("polarity") < -0.7)
    alerts_df = processed_df.filter(is_extreme).select(
        lit("SENTIMENT_ALERT").alias("alert_type"),
        "comment_id",
        "author",
        "subreddit",
        "sentiment_category",
        "polarity",
        "score",
        current_timestamp().alias("alert_timestamp"),
        substring(col("comment_body"), 1, 200).alias("comment_text"),
    )
    alerts_query = _start_console(
        alerts_df, "append", "10 seconds", name="sentiment_alerts"
    )

    started = {}
    started["console"] = console_query
    started["parquet"] = parquet_query
    started["windowed"] = windowed_console
    started["subreddit"] = subreddit_console
    started["trending"] = trending_words_console
    started["alerts"] = alerts_query
    return started

# Custom batch processing function
def process_batch(batch_df, batch_id):
    """Print summary statistics for one micro-batch.

    Args:
        batch_df: The micro-batch DataFrame; needs ``sentiment``, ``polarity``,
            ``author``, ``subreddit`` and ``comment_body`` columns.
        batch_id: Identifier of the micro-batch, used only in the header line.

    Prints the batch header and, for a non-empty batch, its size, average
    polarity, sentiment distribution and the most positive / most negative
    comment (when one exists beyond +/-0.5 polarity). Returns ``None``.
    """
    print(f"\n=== Processing Batch {batch_id} ===")

    # Several actions run against this batch below. Persist it so the
    # upstream pipeline is evaluated once rather than once per action.
    batch_df.persist()
    try:
        batch_size = batch_df.count()
        if batch_size == 0:
            return

        # Quick batch statistics
        sentiment_counts = batch_df.groupBy("sentiment").count().collect()
        avg_polarity = batch_df.agg(avg("polarity")).collect()[0][0]

        print(f"Batch size: {batch_size}")
        if avg_polarity is None:
            # avg() yields NULL when every polarity in the batch is NULL;
            # formatting None with ':.3f' would raise.
            print("Average polarity: N/A")
        else:
            print(f"Average polarity: {avg_polarity:.3f}")
        print("Sentiment distribution:")
        for row in sentiment_counts:
            print(f"  {row['sentiment']}: {row['count']}")

        # Most extreme sentiments in this batch
        extremes = (
            ("\nMost positive comment:", col("polarity") > 0.5, desc("polarity")),
            ("\nMost negative comment:", col("polarity") < -0.5, "polarity"),
        )
        for heading, condition, ordering in extremes:
            candidates = batch_df.filter(condition)
            # take(1) only checks for existence; no full count needed
            if candidates.take(1):
                print(heading)
                candidates.orderBy(ordering) \
                          .select("author", "subreddit", "polarity", "comment_body") \
                          .limit(1).show(truncate=False)
    finally:
        batch_df.unpersist()

def main():
    """Main execution function for real-time Trump sentiment analysis.

    Builds the pipeline (Kafka source -> preprocessing -> sentiment scoring ->
    aggregations / trending words), starts all sinks plus a ``foreachBatch``
    query, then blocks until any query terminates or the user interrupts.

    All queries started here are stopped on the way out, whether the wait
    ended normally, through Ctrl+C, or because a query raised.
    """
    print("Starting Real-time Trump Reddit Sentiment Analysis...")
    print("Connecting to Kafka stream: reddit-comments-trump")

    # Create Kafka stream
    raw_stream = create_kafka_stream()

    # Preprocess comments
    preprocessed_stream = preprocess_reddit_comments(raw_stream)

    # Perform sentiment analysis
    sentiment_stream = perform_sentiment_analysis(preprocessed_stream)

    # Create aggregations
    windowed_sentiment, subreddit_sentiment = create_real_time_aggregations(sentiment_stream)

    # Detect trending topics
    word_trends = detect_trending_topics(sentiment_stream)

    # Setup output sinks
    queries = setup_output_sinks(sentiment_stream, windowed_sentiment,
                                 subreddit_sentiment, word_trends)

    custom_query = None
    try:
        # Alternative: Custom batch processing
        custom_query = sentiment_stream.writeStream \
            .outputMode("append") \
            .foreachBatch(process_batch) \
            .trigger(processingTime="45 seconds") \
            .start()

        print("All streaming queries started. Monitoring Trump discussions...")
        print("Press Ctrl+C to stop the analysis")

        # Wait for termination
        spark.streams.awaitAnyTermination()
    except KeyboardInterrupt:
        print("\nStopping all streaming queries...")
    finally:
        # Shut down in every exit path: previously a failing or finished
        # query left the remaining ones running because cleanup only
        # happened on Ctrl+C.
        for query_name, query in queries.items():
            if query.isActive:
                query.stop()
                print(f"Stopped {query_name} query")

        if custom_query is not None and custom_query.isActive:
            custom_query.stop()
            print("Stopped custom batch processing query")

def monitor_stream_health():
    """Print a health summary of the active streaming queries.

    For each query in ``spark.streams.active`` prints its name, status and id
    and, when progress has been reported, the latest batch id, the number of
    input rows and the trigger execution time.
    """
    active_streams = spark.streams.active

    print("\n=== Stream Health Monitor ===")
    print(f"Active streams: {len(active_streams)}")

    for stream in active_streams:
        print(f"\nStream: {stream.name}")
        print(f"Status: {stream.status}")
        print(f"ID: {stream.id}")

        # Read the progress list once instead of once per access
        progress = stream.recentProgress
        if progress:
            recent = progress[-1]
            print(f"Batch ID: {recent.get('batchId', 'N/A')}")
            # Progress reports carry the input volume as 'numInputRows';
            # there is no 'inputSize' field, so the old lookup always
            # printed N/A.
            print(f"Input size: {recent.get('numInputRows', 'N/A')}")
            print(f"Processing time: {recent.get('durationMs', {}).get('triggerExecution', 'N/A')} ms")

# Script entry point: run the streaming pipeline when executed directly.
if __name__ == "__main__":
    try:
        main()
    finally:
        # Cleanup: runs whether main() returns or raises, so the Spark
        # session is always stopped before the process/cell finishes.
        spark.stop()
        print("Spark session stopped.")

# Additional utility functions for offline analysis
def analyze_historical_data(parquet_path="/tmp/trump_sentiment_data"):
    """Load archived sentiment data and print summary tables.

    Args:
        parquet_path: Location of the Parquet data to read.

    Returns:
        The loaded DataFrame, after printing the total row count, the
        sentiment distribution, the 20 busiest subreddits and the per-day
        sentiment counts.
    """
    print("Loading historical sentiment data...")
    historical_df = spark.read.parquet(parquet_path)

    print("Historical Analysis Results:")
    total_comments = historical_df.count()
    print(f"Total comments analyzed: {total_comments}")

    # Overall sentiment distribution
    by_sentiment = historical_df.groupBy("sentiment").count()
    by_sentiment.show()

    # Top subreddits by activity
    by_subreddit = historical_df.groupBy("subreddit").agg(
        count("*").alias("comment_count"),
        avg("polarity").alias("avg_sentiment"),
    )
    by_subreddit.orderBy(col("comment_count").desc()).show(20)

    # Sentiment trends over time
    day = to_date("created_date").alias("date")
    by_day = historical_df.groupBy(day, "sentiment").count()
    by_day.orderBy("date").show()

    return historical_df

Starting Real-time Trump Reddit Sentiment Analysis...
Connecting to Kafka stream: reddit-comments-trump
Spark session stopped.


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.