# Snapp Real-Time Ride Demand Forecasting: Portfolio Demo

This notebook demonstrates an end-to-end Machine Learning Operations (MLOps) pipeline for real-time ride demand forecasting, a core capability for ride-hailing platforms like Snapp.

**Project Goal**: Predict future ride demand in specific geographical grids (e.g., 1x1 km) for upcoming time intervals (e.g., next 15 minutes) to optimize driver allocation, dynamic pricing, and overall operational efficiency.

## 1. Environment Setup and Configuration
We begin by loading our project configuration and setting up logging.

In [None]:
import os
import sys
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
import json
import mlflow
import time

# Ensure the project root and its src/ directory are in the path for module imports
def prepend_sys_path(path):
    """Put *path* at the front of ``sys.path`` exactly once.

    Existing occurrences are removed first, so re-running this cell does not
    pile up duplicate entries while the path still gets top import priority.
    """
    while path in sys.path:
        sys.path.remove(path)
    sys.path.insert(0, path)


# Compare the last path component instead of a raw string suffix: a plain
# ``endswith('notebooks')`` would also match directories like ``old_notebooks``.
project_root = '..' if os.path.basename(os.getcwd()) == 'notebooks' else '.'

# Root first, then src, so that src ends up ahead of the root on sys.path.
prepend_sys_path(os.path.abspath(project_root))
prepend_sys_path(os.path.abspath(os.path.join(project_root, 'src')))

from src.utils.config_reader import ConfigReader
from src.utils.logging_setup import setup_logging
from src.data_artifacts.tehran_traffic_simulator.generate_snapp_data import SnappDataGenerator
from src.batch_processing.batch_feature_creation import BatchFeatureCreator
from src.stream_analytics.spark_stream_features import SparkStreamFeatures # Note: Requires Spark environment
from src.model_training.train import ModelTrainer
from src.model_serving.prediction_logic import PredictionLogic
from model_experiments.ab_testing.lono_interface import LonoABTester
from model_experiments.monitoring.data_drift_detector import DataDriftDetector
from model_experiments.monitoring.model_performance_monitor import ModelPerformanceMonitor

# Set up the demo logger and load the "dev" project configuration.
logger = setup_logging("portfolio_demo")
config = ConfigReader(env="dev")
print("Configuration loaded successfully.")

# Point MLflow at the registry URI from the project config.
# NOTE(review): this call is unconditional; it is not skipped when a tracking
# URI has already been provided through an environment variable.
tracking_uri = config.get("model_registry.uri")
mlflow.set_tracking_uri(tracking_uri)

# The experiment name is derived from the registered model name.
experiment_name = f"{config.get('model_registry.model_name')}_experiment"
mlflow.set_experiment(experiment_name)

print(f"MLflow tracking URI: {mlflow.get_tracking_uri()}")

## 2. Data Simulation & Feature Engineering
We'll simulate real-time ride events for Tehran and then generate both batch and stream-based features.

In [None]:
# Generate Synthetic Data
print("\n--- Generating Synthetic Raw Data ---")
# Sample the clock once so both window bounds derive from the same instant.
simulation_reference_time = datetime.now()
start_sim = simulation_reference_time - timedelta(days=5)
end_sim = simulation_reference_time - timedelta(hours=1) # simulate historical data up to a recent point
generator = SnappDataGenerator(start_sim, end_sim)
synthetic_raw_data_df = generator.generate_events(num_events_per_15min=25)
synthetic_raw_data_path = "data_artifacts/sample_data/synthetic_snapp_data.csv"
# DataFrame.to_csv does not create missing parent directories, so make sure
# the output folder exists before writing (no-op when it is already there).
os.makedirs(os.path.dirname(synthetic_raw_data_path), exist_ok=True)
synthetic_raw_data_df.to_csv(synthetic_raw_data_path, index=False)
print(f"Generated {len(synthetic_raw_data_df)} synthetic events and saved to {synthetic_raw_data_path}")
print(synthetic_raw_data_df.head())

# Batch Feature Creation
print("\n--- Running Batch Feature Creation ---")
batch_feature_creator = BatchFeatureCreator(env="dev")
batch_features_df = batch_feature_creator.generate_batch_features()
batch_features_path = "data_artifacts/sample_data/batch_processed_features.csv"
os.makedirs(os.path.dirname(batch_features_path), exist_ok=True)
batch_features_df.to_csv(batch_features_path, index=False)
print(f"Generated {len(batch_features_df)} batch features and saved to {batch_features_path}")
print(batch_features_df.head())

# Stream Feature Creation (conceptual only): running it for real requires a
# live Spark/Kafka cluster, so this cell just prints guidance and runs nothing.
print("\n--- Simulating Stream Feature Creation (Conceptual) ---")
print("To run real stream features, ensure Kafka and Spark are running and uncomment the code below.")

## 3. Model Training & MLflow Integration
Train our XGBoost model using the batch-processed features and log everything with MLflow.

In [None]:
print("\n--- Training Model with MLflow ---")
# Run the project's training entry point for the "dev" environment.
model_trainer = ModelTrainer(env="dev")
model_trainer.train_model()

print("Model training complete. Check MLflow UI for experiment results.")

# Promote the latest model to 'Production' stage (simulated)
model_name = config.get("model_registry.model_name")
client = mlflow.tracking.MlflowClient()
try:
    # Latest versions that are still in the "None" stage. Indexing an empty
    # result raises, and that is reported by the handler below.
    unstaged_versions = client.get_latest_versions(model_name, stages=["None"])
    latest_version = unstaged_versions[0]
    promotion_request = {
        "name": latest_version.name,
        "version": latest_version.version,
        "stage": "Production",
    }
    client.transition_model_version_stage(**promotion_request)
    print(f"Latest model version {latest_version.version} transitioned to Production stage.")
except Exception as e:
    # Best-effort for the demo: report the problem and keep the notebook going.
    print(f"Could not transition model to Production (perhaps no model registered yet, or it's already in Production): {e}")

## 4. Real-time Model Serving (FastAPI Simulation)
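Demonstrate predictions with the model-serving logic. This cell calls `PredictionLogic` directly in-process rather than going through HTTP; to exercise the actual FastAPI service, start it separately with `uvicorn`.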

In [None]:
print("\n--- Simulating Model Serving API Interaction ---")

# The whole simulation is best-effort: any failure is reported with hints
# instead of stopping the notebook.
try:
    prediction_logic = PredictionLogic(env="dev")
    print("PredictionLogic initialized, model loaded.")

    feature_columns = prediction_logic.feature_columns

    # Single prediction: draw one reproducible row from the batch features
    sample_features_df = batch_features_df.sample(1, random_state=42)
    sample_row = sample_features_df[feature_columns].iloc[0]
    sample_features = sample_row.to_dict()
    target_column = config.get("features_params.target_variable")
    actual_demand = sample_features_df[target_column].iloc[0]

    print("Sample features:", sample_features)

    predicted_demand = prediction_logic.predict_demand(sample_features)
    print(f"Predicted demand: {predicted_demand:.2f}, Actual demand: {actual_demand:.2f}")

    # Batch prediction: five reproducible rows, passed as a list of dicts
    batch_sample_df = batch_features_df.sample(5, random_state=100)
    sample_features_list = batch_sample_df[feature_columns].to_dict(orient='records')
    batch_predictions = prediction_logic.batch_predict_demand(sample_features_list)
    print("Batch predictions (first 5):")
    for position, pred in enumerate(batch_predictions, start=1):
        print(f"  Prediction {position}: {pred:.2f}")

except Exception as e:
    print(f"Error during model serving simulation: {e}")
    print("Please ensure the MLflow registry has a model in 'Production' stage and paths are correct.")
    print("For a full demo, run `uvicorn src.model_serving.api_app:app --reload` in a separate terminal.")

## 5. A/B Testing with LONO (Simulated)
Demonstrate how `LonoABTester` would route requests to different model versions.

In [None]:
print("\n--- Simulating A/B Testing with LONO ---")

# Create a dummy A/B test config file if it doesn't exist for the demo.
# This is done before the tester is constructed so the file is already on disk
# whenever the tester reads it.
# NOTE(review): whether LonoABTester loads the rules at construction time or
# per request is not visible from this notebook - confirm.
ab_test_config_path = "conf/ab_test_rules.json"
created_dummy_ab_config = not os.path.exists(ab_test_config_path)
if created_dummy_ab_config:
    os.makedirs(os.path.dirname(ab_test_config_path), exist_ok=True)
    with open(ab_test_config_path, "w", encoding="utf-8") as f:
        json.dump({
            "champion_model_version": "Production",
            "challenger_model_version": "Staging",
            "challenger_traffic_percentage": 0.3 # 30% to challenger
        }, f)
    print("Created dummy A/B test config for demo.")

try:
    ab_tester = LonoABTester(env="dev")

    # Uses `prediction_logic` and `batch_features_df` created in earlier cells.
    sample_features_for_ab = batch_features_df.sample(1, random_state=50)[prediction_logic.feature_columns].iloc[0].to_dict()

    print("Making 10 A/B tested predictions:")
    for i in range(10):
        pred = ab_tester.get_prediction(sample_features_for_ab)
        print(f"  Request {i+1}: Predicted demand = {pred:.2f}")
        time.sleep(0.1)
finally:
    # Clean up the dummy config even when a prediction fails, but only if this
    # cell created it: a config that existed before the demo must be kept.
    if created_dummy_ab_config and os.path.exists(ab_test_config_path):
        os.remove(ab_test_config_path)
        print(f"Cleaned up {ab_test_config_path}.")

## 6. Model Monitoring (Data Drift & Performance)
Demonstrate how drift and performance monitoring would be executed.

In [None]:
# Data drift: run the detector pipeline and pretty-print the report it returns.
print("\n--- Running Data Drift Detection ---")
drift_detector = DataDriftDetector(env="dev")
drift_report = drift_detector.run_drift_detection_pipeline()
drift_report_json = json.dumps(drift_report, indent=4)
print("Drift Report:\n", drift_report_json)

# Model performance: same pattern with the performance monitor.
print("\n--- Running Model Performance Monitoring ---")
performance_monitor = ModelPerformanceMonitor(env="dev")
performance_report = performance_monitor.run_performance_monitoring_pipeline()
performance_report_json = json.dumps(performance_report, indent=4)
print("Performance Report:\n", performance_report_json)

print("\nDemo complete. Explore the `model_experiments/monitoring` directory for generated reports.")