# Streaming Simulation for Fake News Detection

This notebook implements a streaming simulation for fake news detection using PySpark MLlib and Databricks-native functionality. Since true streaming is not available in the Community Edition, we emulate a streaming scenario by processing data in small batches.

## Key Features
- Batch-based streaming simulation
- PySpark MLlib model inference
- Interactive visualizations using Databricks-native `display` and Plotly
- HTML analytics dashboard summarizing the run
- Optimized for Databricks Community Edition

## Setup and Configuration

First, we'll set up our Spark session with configurations optimized for the Databricks Community Edition.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, current_timestamp, count, desc, expr, struct, to_json
from pyspark.sql.types import StringType, IntegerType, DoubleType, ArrayType, TimestampType
import time
import json
import numpy as np
from pyspark.ml.pipeline import PipelineModel

# Configure Spark session optimized for Databricks Community Edition.
# On Databricks a session already exists, so getOrCreate() returns it.
# spark.driver.memory is fixed when the driver JVM starts, so the value below
# only takes effect if this code launches a fresh session.
spark = SparkSession.builder \
    .appName("FakeNewsDetection_StreamingSimulation") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

# Display Spark configuration
print(f"Spark version: {spark.version}")
print(f"Shuffle partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")
print(f"Driver memory: {spark.conf.get('spark.driver.memory', 'not set')}")

## Define Paths and Create Directories

We'll define paths for data, models, and results, and create necessary directories in DBFS.

In [None]:
# Define paths for Databricks DBFS
stream_data_path = "dbfs:/FileStore/fake_news_detection/data/stream_data"
stream_results_path = "dbfs:/FileStore/fake_news_detection/data/stream_results"
model_path = "dbfs:/FileStore/fake_news_detection/models/fake_news_best_model"
logs_path = "dbfs:/FileStore/fake_news_detection/logs"

# Create directories if they don't exist using dbutils
dbutils.fs.mkdirs(stream_data_path.replace("dbfs:", ""))
dbutils.fs.mkdirs(stream_results_path.replace("dbfs:", ""))
dbutils.fs.mkdirs(logs_path.replace("dbfs:", ""))
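
DBFS locations appear in two spellings in this notebook: the `dbfs:/...` URI form used by Spark and `dbutils`, and the `/dbfs/...` local-mount form used by ordinary Python file APIs (such as the `sys.path` entry later on), where that mount is available. A minimal sketch of converting between the two follows. The helper names `to_uri` and `to_local` are hypothetical and not part of any Databricks API.

```python
def to_uri(path: str) -> str:
    """Return the dbfs:/ URI form of a DBFS path (hypothetical helper)."""
    if path.startswith("dbfs:/"):
        return path
    if path.startswith("/dbfs/"):
        return "dbfs:/" + path[len("/dbfs/"):]
    # Treat any other absolute path as DBFS-rooted, e.g. /FileStore/...
    return "dbfs:/" + path.lstrip("/")


def to_local(path: str) -> str:
    """Return the /dbfs/ local-mount form of a DBFS path (hypothetical helper)."""
    return "/dbfs/" + to_uri(path)[len("dbfs:/"):]


uri = to_uri("/dbfs/FileStore/fake_news_detection/logs")
local = to_local("dbfs:/FileStore/fake_news_detection/logs")
print(uri)
print(local)
```

Keeping one canonical form in variables and converting at the call site avoids mixing the two spellings by accident.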

## Load Model

We'll load the pre-trained PySpark ML model for fake news detection.

In [None]:
# Load the saved model
try:
    model = PipelineModel.load(model_path)
    print("Model loaded successfully.")
except Exception as e:
    print(f"Error loading model: {e}")
    print(f"No model could be loaded from {model_path}. Please update model_path with the correct location.")
    # Keep the name defined so later cells fail with a clear message instead of a NameError
    model = None

## Create Sample Streaming Data

We'll create sample data for our streaming simulation by sampling from the Hive tables.

In [None]:
# Import the HiveDataIngestion class
import sys
sys.path.append('/dbfs/FileStore/tables')
from hive_data_ingestion import HiveDataIngestion

# Create an instance
data_ingestion = HiveDataIngestion(spark)

# Load data from Hive tables
real_df, fake_df = data_ingestion.load_data_from_hive()

# Create a sample for streaming simulation
# Sampling the same fraction from each class preserves the original class ratio
# (it does not balance the classes)
real_sample = real_df.sample(fraction=0.05, seed=42)
fake_sample = fake_df.sample(fraction=0.05, seed=42)

# Combine samples and add unique IDs
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

real_sample = real_sample.withColumn("label", lit(1))
fake_sample = fake_sample.withColumn("label", lit(0))

# Combine samples (union matches columns by position, so both frames must share the same column order)
stream_df = real_sample.union(fake_sample)

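# Add unique IDs (a window without partitionBy moves all rows to one partition,
# which is acceptable for a sample this small)
window = Window.orderBy(monotonically_increasing_id())
stream_df = stream_df.withColumn("id", row_number().over(window))

# Shuffle the data (rand() is re-evaluated on every action, so the order is not stable across actions)
stream_df = stream_df.orderBy(expr("rand()"))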

print(f"Created streaming sample with {stream_df.count()} records")
print("Class distribution:")
stream_df.groupBy("label").count().show()
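
Sampling both classes with the same fraction keeps the original class ratio. If a balanced sample is wanted instead, one option is to compute a separate fraction per class. The plain-Python sketch below shows that arithmetic with made-up class counts and a hypothetical helper `balanced_fractions`. The resulting dictionary has the shape accepted by PySpark's `DataFrame.sampleBy(col, fractions, seed)`, which would need the `label` column to exist before sampling.

```python
def balanced_fractions(class_counts, target_per_class):
    """Per-class sampling fractions that aim for target_per_class rows each.

    Fractions are capped at 1.0 because sampling without replacement
    cannot return more rows than a class contains.
    """
    return {
        label: min(1.0, target_per_class / n)
        for label, n in class_counts.items()
        if n > 0
    }


# Illustrative counts only, not taken from the real dataset
counts = {1: 20000, 0: 25000}
fractions = balanced_fractions(counts, target_per_class=1000)
print(fractions)
```

Because Spark's sampling is probabilistic, the returned row counts would only approximate the target.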

## Split Data into Batches

We'll split the data into small batches to simulate a streaming scenario.

In [None]:
# Function to split data into batches
def create_batches(df, batch_size=10):
    """
    Split a DataFrame into batches of specified size.
    
    Args:
        df: DataFrame to split
        batch_size: Number of records per batch
        
    Returns:
        List of DataFrames, each containing a batch of records
    """
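    # Get total number of records
    total_records = df.count()
    
    # Calculate number of batches (ceiling division)
    num_batches = (total_records + batch_size - 1) // batch_size
    
    # Assign every row a batch number once and cache the result.
    # Repeated limit()/subtract() calls would re-evaluate the random ordering
    # of df on each action, so batches could overlap or drop rows, and
    # subtract() would also remove duplicate rows.
    position = row_number().over(Window.orderBy(expr("rand(42)"))) - 1
    indexed_df = df.withColumn("batch_id", (position / batch_size).cast("int")).cache()
    indexed_df.count()  # Materialize the cache so batch assignments stay fixed
    
    # Create batches
    batches = []
    for i in range(num_batches):
        batch_df = indexed_df.filter(col("batch_id") == i).drop("batch_id")
        batches.append(batch_df)
    
    return batches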

# Create batches
batch_size = 10  # Small batch size for demonstration
batches = create_batches(stream_df, batch_size)

print(f"Created {len(batches)} batches of size {batch_size}")
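
The batch count above is a ceiling division, so the last batch may hold fewer than `batch_size` records. A plain-Python sketch of the same arithmetic, using a hypothetical helper `batch_bounds` and no Spark:

```python
def batch_bounds(total_records, batch_size):
    """Return (start, end) index pairs covering range(total_records)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    num_batches = (total_records + batch_size - 1) // batch_size  # ceiling division
    return [
        (i * batch_size, min((i + 1) * batch_size, total_records))
        for i in range(num_batches)
    ]


bounds = batch_bounds(total_records=25, batch_size=10)
print(bounds)  # [(0, 10), (10, 20), (20, 25)]
```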

## Streaming Simulation Function

We'll define a function to simulate streaming by processing batches with a delay between them.

In [None]:
# Function to simulate streaming
def simulate_streaming(batches, model, delay_seconds=2):
    """
    Simulate streaming by processing batches with a delay.
    
    Args:
        batches: List of DataFrames, each containing a batch of records
        model: PySpark ML model for prediction
        delay_seconds: Delay between batches in seconds
        
    Returns:
        DataFrame with all results
    """
    # Initialize results list
    results = []
    
    # Process each batch
    for i, batch_df in enumerate(batches):
        print(f"Processing batch {i+1}/{len(batches)}...")
        
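        # Add timestamp
        batch_df = batch_df.withColumn("timestamp", current_timestamp())
        
        # Save batch to stream data directory
        batch_path = f"{stream_data_path}/batch_{i}.parquet"
        batch_df.write.mode("overwrite").parquet(batch_path)
        
        # Read the batch back so the timestamp is fixed at write time;
        # current_timestamp() is otherwise re-evaluated on every later action
        batch_df = spark.read.parquet(batch_path)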
        
        # Apply model to get predictions
        try:
            predictions = model.transform(batch_df)
            
            # Select relevant columns
            result_df = predictions.select(
                "id", "title", "text", "label", "prediction", "timestamp"
            )
            
            # Save batch results
            result_path = f"{stream_results_path}/batch_{i}_results.parquet"
            result_df.write.mode("overwrite").parquet(result_path)
            
            # Add to results list
            results.append(result_df)
            
            # Display batch results
            print(f"Batch {i+1} results:")
            display(result_df.select("title", "prediction", "timestamp"))
            
            # Update streaming metrics
            batch_metrics = result_df.groupBy("prediction").count().collect()
            fake_count = next((row["count"] for row in batch_metrics if row["prediction"] == 0.0), 0)
            real_count = next((row["count"] for row in batch_metrics if row["prediction"] == 1.0), 0)
            print(f"Batch {i+1} metrics: {fake_count} fake news, {real_count} real news")
            
            # Wait before next batch
            if i < len(batches) - 1:
                print(f"Waiting {delay_seconds} seconds before next batch...")
                time.sleep(delay_seconds)
                
        except Exception as e:
            print(f"Error processing batch {i+1}: {e}")
    
    # Combine all results
    if results:
        combined_results = results[0]
        for df in results[1:]:
            combined_results = combined_results.union(df)
        return combined_results
    else:
        return None
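
The loop above follows a simple pattern: stamp a batch, score it, record the result, pause. A library-free sketch of that pattern is shown below. Here `score` is a hypothetical stand-in for `model.transform`, and the timestamp is captured eagerly as a Python value so it stays fixed once the batch has been processed.

```python
import time
from datetime import datetime, timezone


def simulate(batches, score, delay_seconds=0.0):
    """Process batches one at a time, pausing between them."""
    results = []
    for i, batch in enumerate(batches):
        stamped_at = datetime.now(timezone.utc)  # fixed at processing time
        for record in batch:
            results.append({"batch": i, "record": record,
                            "prediction": score(record), "timestamp": stamped_at})
        if i < len(batches) - 1:
            time.sleep(delay_seconds)  # emulate the gap between arriving batches
    return results


def toy_score(title):
    """Toy scorer: titles containing 'shocking' are fake (0.0), others real (1.0)."""
    return 0.0 if "shocking" in title.lower() else 1.0


out = simulate([["Shocking cure found", "Budget passes"], ["Rain expected"]],
               toy_score, delay_seconds=0.01)
summary = [(r["batch"], r["prediction"]) for r in out]
print(summary)
```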

## Run Streaming Simulation

Now we'll run the streaming simulation and collect the results.

In [None]:
# Start timer
start_time = time.time()

# Run streaming simulation
try:
    # Use a subset of batches for demonstration
    demo_batches = batches[:10]  # Use first 10 batches
    
    # Run simulation
    results_df = simulate_streaming(demo_batches, model, delay_seconds=2)
    
    # Cache results for faster processing
    if results_df is not None:
        results_df.cache()
        print(f"\nStreaming simulation completed in {time.time() - start_time:.2f} seconds")
        print(f"Processed {results_df.count()} records")
    else:
        print("\nNo results returned from streaming simulation")
        
except Exception as e:
    print(f"Error in streaming simulation: {e}")

## Save Aggregated Results

We'll save the aggregated results for future reference.

In [None]:
# Save aggregated results
if 'results_df' in locals() and results_df is not None:
    # Save as parquet
    aggregated_path = f"{stream_results_path}/aggregated_results.parquet"
    results_df.write.mode("overwrite").parquet(aggregated_path)
    print(f"Aggregated results saved to {aggregated_path}")
    
    # Save as table
    results_df.write.mode("overwrite").saveAsTable("stream_results")
    print("Aggregated results saved as table 'stream_results'")

## Analyze Results with Databricks-Native Visualizations

We'll analyze the results using Databricks-native visualizations.

In [None]:
# Analyze results
if 'results_df' in locals() and results_df is not None:
    # Class distribution
    print("\nPrediction Distribution:")
    prediction_dist = results_df.groupBy("prediction").count().orderBy("prediction")
    display(prediction_dist)
    
    # Confusion matrix
    print("\nConfusion Matrix:")
    confusion_matrix = results_df.groupBy("label").pivot("prediction").count().fillna(0)
    display(confusion_matrix)
    
    # Accuracy calculation
    correct_predictions = results_df.filter(col("label") == col("prediction")).count()
    total_predictions = results_df.count()
    accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
    print(f"\nAccuracy: {accuracy:.4f}")
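
The confusion matrix and accuracy above reduce to counting `(label, prediction)` pairs. A small plain-Python sketch with made-up values, using the notebook's convention that 1 means real and 0 means fake:

```python
from collections import Counter

# Made-up (label, prediction) pairs for illustration
pairs = [(1, 1.0), (1, 0.0), (0, 0.0), (0, 0.0),
         (1, 1.0), (0, 1.0), (0, 0.0), (1, 1.0)]

# Count each (label, prediction) combination
confusion = Counter((label, int(pred)) for label, pred in pairs)
correct = sum(n for (label, pred), n in confusion.items() if label == pred)
total = sum(confusion.values())
accuracy = correct / total if total > 0 else 0.0

# Rows are labels, columns are predictions (0 then 1)
for label in (0, 1):
    print(label, [confusion[(label, pred)] for pred in (0, 1)])
print(f"Accuracy: {accuracy:.4f}")
```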

## Time Series Analysis

We'll analyze how predictions change over time.

In [None]:
# Time series analysis
if 'results_df' in locals() and results_df is not None:
    # Group by minute and count predictions
    print("\nPredictions Over Time:")
    time_series = results_df \
        .withColumn("minute", expr("date_trunc('minute', timestamp)")) \
        .groupBy("minute") \
        .pivot("prediction", [0.0, 1.0]) \
        .count() \
        .orderBy("minute") \
        .fillna(0)
    
    # Rename columns for clarity
    if "0.0" in time_series.columns:
        time_series = time_series.withColumnRenamed("0.0", "fake_news")
    else:
        time_series = time_series.withColumn("fake_news", lit(0))
        
    if "1.0" in time_series.columns:
        time_series = time_series.withColumnRenamed("1.0", "real_news")
    else:
        time_series = time_series.withColumn("real_news", lit(0))
    
    # Display time series
    display(time_series)
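
Truncating each timestamp to the minute and pivoting on the prediction gives one row per minute with a count per class. A plain-Python sketch of the same aggregation on made-up events:

```python
from collections import defaultdict
from datetime import datetime

# Made-up (timestamp, prediction) events for illustration
events = [
    (datetime(2024, 1, 1, 12, 0, 5), 0.0),
    (datetime(2024, 1, 1, 12, 0, 40), 1.0),
    (datetime(2024, 1, 1, 12, 1, 10), 1.0),
    (datetime(2024, 1, 1, 12, 1, 59), 1.0),
]

per_minute = defaultdict(lambda: {"fake_news": 0, "real_news": 0})
for ts, prediction in events:
    minute = ts.replace(second=0, microsecond=0)  # like date_trunc('minute', ts)
    key = "fake_news" if prediction == 0.0 else "real_news"
    per_minute[minute][key] += 1

# One (minute, fake_news, real_news) row per minute, in time order
rows = [(minute, c["fake_news"], c["real_news"])
        for minute, c in sorted(per_minute.items())]
for row in rows:
    print(row)
```

Starting every minute with both counters at zero plays the role of `fillna(0)` in the Spark version.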

## Interactive Visualizations with Plotly

We'll create interactive visualizations using Plotly for a more engaging experience.

In [None]:
# Install plotly if not already installed
try:
    import plotly.express as px
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    print("Plotly is already installed.")
except ImportError:
    print("Installing plotly...")
    !pip install plotly
    import plotly.express as px
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots

In [None]:
# Create interactive visualizations with Plotly
if 'results_df' in locals() and results_df is not None:
    # Convert to pandas for Plotly
    prediction_dist_pd = prediction_dist.toPandas()
    prediction_dist_pd['prediction'] = prediction_dist_pd['prediction'].map({0.0: 'Fake News', 1.0: 'Real News'})
    
    # Create pie chart
    fig = px.pie(
        prediction_dist_pd, 
        values='count', 
        names='prediction',
        title='Prediction Distribution',
        color='prediction',
        color_discrete_map={'Fake News': '#FF6B6B', 'Real News': '#4ECDC4'},
        hole=0.4
    )
    
    # Add annotations
    fig.update_traces(textposition='inside', textinfo='percent+label')
    fig.update_layout(
        title_font_size=24,
        legend_title_font_size=18,
        legend_font_size=16
    )
    
    # Display the figure
    fig.show()

In [None]:
# Create time series visualization with Plotly
if 'results_df' in locals() and results_df is not None and 'time_series' in locals():
    # Convert to pandas for Plotly
    time_series_pd = time_series.toPandas()
    
    # Create line chart
    fig = go.Figure()
    
    # Add traces
    fig.add_trace(go.Scatter(
        x=time_series_pd['minute'],
        y=time_series_pd['fake_news'],
        mode='lines+markers',
        name='Fake News',
        line=dict(color='#FF6B6B', width=3),
        marker=dict(size=8)
    ))
    
    fig.add_trace(go.Scatter(
        x=time_series_pd['minute'],
        y=time_series_pd['real_news'],
        mode='lines+markers',
        name='Real News',
        line=dict(color='#4ECDC4', width=3),
        marker=dict(size=8)
    ))
    
    # Update layout
    fig.update_layout(
        title='Predictions Over Time',
        title_font_size=24,
        xaxis_title='Time',
        yaxis_title='Number of Articles',
        legend_title='Prediction',
        legend_title_font_size=18,
        legend_font_size=16,
        hovermode='x unified'
    )
    
    # Display the figure
    fig.show()

## Interactive Dashboard with Custom HTML

We'll create an interactive dashboard using custom HTML and JavaScript.

In [None]:
# Create interactive dashboard with custom HTML
if 'results_df' in locals() and results_df is not None:
    # Get metrics for dashboard
    total_records = results_df.count()
    fake_count = results_df.filter(col("prediction") == 0.0).count()
    real_count = results_df.filter(col("prediction") == 1.0).count()
    fake_percentage = fake_count / total_records * 100 if total_records > 0 else 0
    real_percentage = real_count / total_records * 100 if total_records > 0 else 0
    # Elapsed time since the simulation started (includes any analysis cells run in between)
    execution_time = time.time() - start_time
    
    # Create metrics JSON
    metrics = {
        "total_records": total_records,
        "fake_count": fake_count,
        "real_count": real_count,
        "fake_percentage": fake_percentage,
        "real_percentage": real_percentage,
        "execution_time": execution_time
    }
    
    # Create HTML for dashboard
    html = f"""
    <html>
    <head>
        <title>Fake News Detection Dashboard</title>
        <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 0; padding: 20px; background-color: #f5f5f5; }}
            .dashboard {{ display: flex; flex-wrap: wrap; gap: 20px; }}
            .card {{ background-color: white; border-radius: 8px; box-shadow: 0 4px 8px rgba(0,0,0,0.1); padding: 20px; }}
            .metric-card {{ width: 200px; text-align: center; }}
            .chart-card {{ width: 100%; max-width: 600px; }}
            .metric-value {{ font-size: 36px; font-weight: bold; margin: 10px 0; }}
            .metric-label {{ font-size: 16px; color: #666; }}
            .fake {{ color: #FF6B6B; }}
            .real {{ color: #4ECDC4; }}
            h1 {{ color: #333; }}
        </style>
    </head>
    <body>
        <h1>Fake News Detection - Streaming Dashboard</h1>
        <div class="dashboard">
            <div class="card metric-card">
                <div class="metric-label">Total Articles</div>
                <div class="metric-value">{metrics['total_records']}</div>
            </div>
            <div class="card metric-card">
                <div class="metric-label">Fake News</div>
                <div class="metric-value fake">{metrics['fake_count']}</div>
                <div>({metrics['fake_percentage']:.1f}%)</div>
            </div>
            <div class="card metric-card">
                <div class="metric-label">Real News</div>
                <div class="metric-value real">{metrics['real_count']}</div>
                <div>({metrics['real_percentage']:.1f}%)</div>
            </div>
            <div class="card metric-card">
                <div class="metric-label">Processing Time</div>
                <div class="metric-value">{metrics['execution_time']:.2f}s</div>
            </div>
            <div class="card chart-card">
                <canvas id="distributionChart"></canvas>
            </div>
        </div>
        
        <script>
            // Create distribution chart
            const ctx = document.getElementById('distributionChart').getContext('2d');
            const chart = new Chart(ctx, {{
                type: 'doughnut',
                data: {{
                    labels: ['Fake News', 'Real News'],
                    datasets: [{{
                        data: [{metrics['fake_count']}, {metrics['real_count']}],
                        backgroundColor: ['#FF6B6B', '#4ECDC4'],
                        borderColor: ['#FF6B6B', '#4ECDC4'],
                        borderWidth: 1
                    }}]
                }},
                options: {{
                    responsive: true,
                    plugins: {{
                        legend: {{
                            position: 'top',
                        }},
                        title: {{
                            display: true,
                            text: 'Prediction Distribution'
                        }}
                    }}
                }}
            }});
        </script>
    </body>
    </html>
    """
    
    # Display the dashboard
    displayHTML(html)
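
Because the dashboard HTML is built with an f-string, literal braces in the CSS and JavaScript are doubled (`{{` and `}}`), while single braces interpolate Python values. The sketch below illustrates that rule and guards the percentage against a zero total. The helper `metric_card` is hypothetical, and `html.escape` is added here as a precaution for text values.

```python
from html import escape


def metric_card(label, count, total):
    """Render one metric card; doubled braces emit literal CSS braces."""
    pct = count / total * 100 if total > 0 else 0.0
    return (
        f"<style>.metric-value {{ font-size: 36px; }}</style>"
        f'<div class="card metric-card">'
        f'<div class="metric-label">{escape(label)}</div>'
        f'<div class="metric-value">{count}</div>'
        f"<div>({pct:.1f}%)</div></div>"
    )


card = metric_card("Fake News", 42, 100)
print(card)
```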

## Query Results with Spark SQL

We'll query the results using Spark SQL for additional analysis.

In [None]:
# Query results with Spark SQL
if 'results_df' in locals() and results_df is not None:
    # Create temporary view
    results_df.createOrReplaceTempView("streaming_results")
    
    # Query prediction distribution
    print("\nPrediction Distribution (SQL):")
    spark.sql("""
        SELECT 
            CASE 
                WHEN prediction = 0 THEN 'Fake News'
                WHEN prediction = 1 THEN 'Real News'
                ELSE 'Unknown'
            END AS prediction_type,
            COUNT(*) as count,
            ROUND(COUNT(*) * 100 / (SELECT COUNT(*) FROM streaming_results), 2) as percentage
        FROM streaming_results
        GROUP BY prediction
        ORDER BY prediction
    """).show()
    
    # Query accuracy per minute. MINUTE(timestamp) would return only the
    # minute-of-hour (0-59), which merges rows from different hours.
    print("\nAccuracy by Minute (SQL):")
    spark.sql("""
        WITH batch_metrics AS (
            SELECT 
                date_trunc('minute', timestamp) as batch_minute,
                COUNT(*) as total,
                SUM(CASE WHEN label = prediction THEN 1 ELSE 0 END) as correct
            FROM streaming_results
            GROUP BY date_trunc('minute', timestamp)
        )
        SELECT 
            batch_minute,
            total,
            correct,
            ROUND(correct * 100 / total, 2) as accuracy_percentage
        FROM batch_metrics
        ORDER BY batch_minute
    """).show()

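The per-minute accuracy query can be tried without a cluster using Python's built-in `sqlite3` module. This is only a sketch on made-up rows. SQLite's dialect differs from Spark SQL (for example, `strftime` stands in for minute truncation here), so the query is an analogue rather than a drop-in replacement.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE streaming_results (ts TEXT, label INTEGER, prediction REAL)")
conn.executemany(
    "INSERT INTO streaming_results VALUES (?, ?, ?)",
    [
        ("2024-01-01 12:00:05", 1, 1.0),
        ("2024-01-01 12:00:40", 0, 1.0),
        ("2024-01-01 12:01:10", 0, 0.0),
        ("2024-01-01 12:01:59", 1, 1.0),
    ],
)

# Multiply by 100.0 so SQLite performs floating-point division
rows = conn.execute(
    """
    SELECT strftime('%Y-%m-%d %H:%M', ts) AS batch_minute,
           COUNT(*) AS total,
           SUM(CASE WHEN label = prediction THEN 1 ELSE 0 END) AS correct,
           ROUND(SUM(CASE WHEN label = prediction THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2)
               AS accuracy_percentage
    FROM streaming_results
    GROUP BY batch_minute
    ORDER BY batch_minute
    """
).fetchall()
conn.close()

for row in rows:
    print(row)
```
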
## Save Streaming Metrics

We'll save the streaming metrics for future reference.

In [None]:
# Save streaming metrics
if 'results_df' in locals() and results_df is not None:
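    # Create metrics DataFrame. Every value is cast to float because mixing
    # int and float in one column makes schema inference fail.
    metrics_data = [
        ("total_records", float(total_records)),
        ("fake_count", float(fake_count)),
        ("real_count", float(real_count)),
        ("fake_percentage", float(fake_percentage)),
        ("real_percentage", float(real_percentage)),
        ("execution_time", float(execution_time)),
        ("accuracy", float(accuracy))
    ]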
    
    metrics_schema = ["metric_name", "metric_value"]
    metrics_df = spark.createDataFrame(metrics_data, metrics_schema)
    
    # Save metrics
    metrics_path = f"{logs_path}/streaming_metrics.parquet"
    metrics_df.write.mode("overwrite").parquet(metrics_path)
    print(f"Streaming metrics saved to {metrics_path}")
    
    # Display metrics
    print("\nStreaming Metrics:")
    display(metrics_df)
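
Counts are integers while percentages and timings are floats, and a single DataFrame column needs one type. A small sketch of normalizing a metrics dictionary to `(name, float)` rows before handing it to `spark.createDataFrame`. The values are made up and `to_metric_rows` is a hypothetical helper.

```python
def to_metric_rows(metrics):
    """Convert a metrics dict to (name, float) tuples with a uniform value type."""
    return [(name, float(value)) for name, value in metrics.items()]


rows = to_metric_rows({"total_records": 100, "fake_count": 42, "fake_percentage": 42.0})
print(rows)
```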

## Conclusion

In this notebook, we've implemented a streaming simulation for fake news detection using PySpark MLlib and Databricks-native functionality. We've demonstrated how to:

1. Load a pre-trained PySpark ML model
2. Create sample data for streaming simulation
3. Process data in batches to emulate streaming
4. Apply the model to each batch
5. Save and analyze the results
6. Create interactive visualizations

The same load, score, and save steps could be adapted to real streaming sources, for example with Spark Structured Streaming, in environments that support it.

## Next Steps

1. Implement real-time alerting for high-confidence fake news
2. Create a Databricks dashboard for monitoring streaming results
3. Integrate with external systems for automated response
4. Implement model retraining based on streaming feedback