# Shadow Traffic Implementation for AI Model Evaluation

This notebook demonstrates a simple implementation of shadow traffic for evaluating AI model performance metrics.

Shadow traffic lets you test a new model implementation by sending it a copy of each production request while users continue to see only the production response. Because every mirrored request adds load, take care not to overload (in effect, DDoS) the production systems that the shadow model shares.

Unlike chapters 2-3, chapter 4 focuses on system performance, which remains critical to how a model affects a product.

In [None]:
# Import necessary libraries
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor

# Set plot styling for better visualization
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = [12, 6]
plt.rcParams['font.size'] = 12

## 1. Define a Simple Recommendation Model

First, we'll create a class that simulates a movie recommendation model with configurable performance characteristics.

In [None]:
class RecommendationModel:
    """Simple movie recommendation model simulator with performance tracking"""
    
    def __init__(self, name, avg_latency_ms):
        self.name = name
        self.avg_latency_ms = avg_latency_ms
        # Performance tracking metrics
        self.request_count = 0
        self.error_count = 0
        self.latencies = []
    
    def get_recommendations(self, user_id, context=None):
        """Generate recommendations with simulated latency"""
        start_time = time.time()
        self.request_count += 1
        
        try:
            # Simulate variable latency with some randomness
            latency = max(10, np.random.normal(self.avg_latency_ms, self.avg_latency_ms * 0.2)) / 1000.0
            time.sleep(latency)
            
            # Generate mock recommendations
            recommendations = [
                {"movie_id": i, "title": f"Movie {i}", "score": np.random.random()} 
                for i in range(1, 6)
            ]
            
            # Record actual latency for this request
            actual_latency = (time.time() - start_time) * 1000  # in milliseconds
            self.latencies.append(actual_latency)
            
            return recommendations
            
        except Exception as e:
            self.error_count += 1
            # In a real system, you would log the error
            return []
    
    def get_performance_metrics(self):
        """Return key performance metrics about this model's behavior"""
        if not self.latencies:
            return {"name": self.name, "request_count": 0}
        
        return {
            "name": self.name,
            "request_count": self.request_count,
            "error_rate": self.error_count / self.request_count if self.request_count > 0 else 0,
            "avg_latency_ms": np.mean(self.latencies),
            "p50_latency_ms": np.percentile(self.latencies, 50),
            "p95_latency_ms": np.percentile(self.latencies, 95),
            "p99_latency_ms": np.percentile(self.latencies, 99),
            "max_latency_ms": np.max(self.latencies)
        }
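
The `except` branch in `get_recommendations` never fires in this simulation, because nothing inside the `try` block raises, so `error_rate` stays at zero. The sketch below shows one way to exercise that path. `FlakyModelStub` and its `failure_rate` parameter are hypothetical names introduced here for illustration; they are not part of the notebook's model, and latency simulation is left out to keep the example short.

```python
import random


class FlakyModelStub:
    """Hypothetical stand-in for RecommendationModel that can fail on purpose."""

    def __init__(self, name, failure_rate, seed=None):
        self.name = name
        self.failure_rate = failure_rate  # probability that a call fails (assumed parameter)
        self.request_count = 0
        self.error_count = 0
        self._rng = random.Random(seed)

    def get_recommendations(self, user_id, context=None):
        self.request_count += 1
        try:
            if self._rng.random() < self.failure_rate:
                raise RuntimeError("simulated backend failure")
            return [{"movie_id": i, "title": f"Movie {i}"} for i in range(1, 6)]
        except RuntimeError:
            self.error_count += 1
            return []  # same fallback as the class above: an empty list

    @property
    def error_rate(self):
        return self.error_count / self.request_count if self.request_count else 0.0


# Usage: about one call in five should fail
stub = FlakyModelStub("Flaky Candidate", failure_rate=0.2, seed=42)
for i in range(1000):
    stub.get_recommendations(f"user_{i % 100}")
print(f"{stub.name}: error_rate={stub.error_rate:.3f}")
```

A stub like this makes it possible to check that the error-rate comparison later in the notebook reacts to a failing candidate.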

## 2. Create the Shadow Traffic Router

Next, we'll implement a router that handles sending duplicate requests to both the production and shadow systems.

In [None]:
class ShadowTrafficRouter:
    """Routes traffic to both production and shadow systems and collects metrics"""
    
    def __init__(self, production_model, shadow_model):
        self.production_model = production_model
        self.shadow_model = shadow_model
        self.comparison_data = []
    
    def route_request(self, user_id, context=None):
        """Process a request through both production and shadow models"""
        prod_start_time = time.time()
        
        # Get recommendations from production model (this is what users will see)
        prod_recommendations = self.production_model.get_recommendations(user_id, context)
        prod_latency = (time.time() - prod_start_time) * 1000  # ms
        
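        # Call the shadow model in a worker thread. This demo still waits for the result,
        # so it is not truly asynchronous; in production, shadow requests would typically
        # go through a message queue or other fire-and-forget async processing.
        shadow_start_time = time.time()
        try:
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(self.shadow_model.get_recommendations, user_id, context)
                shadow_recommendations = future.result(timeout=2.0)  # 2-second timeout
        except Exception:
            # A shadow failure or timeout must never affect the production response
            shadow_recommendations = []
        shadow_latency = (time.time() - shadow_start_time) * 1000  # ms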
        
        # Log this request for comparison analysis
        self.comparison_data.append({
            "timestamp": time.time(),
            "user_id": user_id,
            "context": str(context),  # Convert to string for DataFrame compatibility
            "prod_latency_ms": prod_latency,
            "shadow_latency_ms": shadow_latency,
            "latency_diff_ms": shadow_latency - prod_latency,
            "prod_recommendation_count": len(prod_recommendations),
            "shadow_recommendation_count": len(shadow_recommendations)
        })
        
        # Return only the production recommendations to the user
        return prod_recommendations
    
    def get_performance_comparison(self):
        """Generate a comparison report between production and shadow models"""
        if not self.comparison_data:
            # No data yet: return an empty dict so callers can still iterate over .items()
            return {}
        
        df = pd.DataFrame(self.comparison_data)
        
        # Get aggregate metrics
        metrics = {
            "request_count": len(df),
            "avg_prod_latency_ms": df["prod_latency_ms"].mean(),
            "avg_shadow_latency_ms": df["shadow_latency_ms"].mean(),
            "p95_prod_latency_ms": df["prod_latency_ms"].quantile(0.95),
            "p95_shadow_latency_ms": df["shadow_latency_ms"].quantile(0.95),
            "p99_prod_latency_ms": df["prod_latency_ms"].quantile(0.99),
            "p99_shadow_latency_ms": df["shadow_latency_ms"].quantile(0.99),
            "max_prod_latency_ms": df["prod_latency_ms"].max(),
            "max_shadow_latency_ms": df["shadow_latency_ms"].max(),
            "avg_latency_increase_ms": df["latency_diff_ms"].mean(),
            "avg_latency_increase_pct": (df["shadow_latency_ms"].mean() / df["prod_latency_ms"].mean() - 1) * 100
        }
        
        return metrics
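
The router above waits for the shadow model before returning, which is convenient for a notebook but would add the shadow latency to every user request in a real service. The following is a minimal sketch of a non-blocking alternative that also mirrors only a sample of traffic, which limits the extra load. `NonBlockingShadowRouter`, `sample_rate`, and `shadow_log` are hypothetical names, and a thread pool stands in for the message queue a production system might use.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


class NonBlockingShadowRouter:
    """Hypothetical router variant: shadow calls run in the background."""

    def __init__(self, production_fn, shadow_fn, sample_rate=0.1, max_workers=4):
        self.production_fn = production_fn
        self.shadow_fn = shadow_fn
        self.sample_rate = sample_rate  # fraction of requests mirrored to the shadow model
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.shadow_log = []  # (user_id, latency_ms, error) tuples

    def _run_shadow(self, user_id):
        start = time.perf_counter()
        error = None
        try:
            self.shadow_fn(user_id)
        except Exception as exc:  # shadow failures are recorded, never raised
            error = repr(exc)
        latency_ms = (time.perf_counter() - start) * 1000
        self.shadow_log.append((user_id, latency_ms, error))

    def route_request(self, user_id):
        # Serve the user from the production model
        response = self.production_fn(user_id)
        # Mirror a sample of requests and do not wait for the shadow result
        if random.random() < self.sample_rate:
            self.executor.submit(self._run_shadow, user_id)
        return response

    def close(self):
        # Wait for in-flight shadow calls before reading shadow_log
        self.executor.shutdown(wait=True)


def slow_shadow(user_id):
    time.sleep(0.01)  # stand-in for a slower candidate model
    return []


demo_router = NonBlockingShadowRouter(lambda user_id: ["prod"], slow_shadow, sample_rate=1.0)
responses = [demo_router.route_request(f"user_{i}") for i in range(20)]
demo_router.close()
print(f"Shadow calls recorded: {len(demo_router.shadow_log)}")
```

With `sample_rate=1.0` every request is mirrored; lowering it trades measurement volume for less load on shared backends.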

## 3. Run a Shadow Traffic Evaluation

Now we'll create instances of both models and simulate user traffic through our shadow traffic router.

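This may take a while: the 500 requests run one after another, so expect roughly three minutes (about 150 ms plus 220 ms of simulated latency per request, plus a short pause between requests).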

In [None]:
# Create our models - production is faster, shadow is slower but theoretically better
production_model = RecommendationModel("Current Production Model", avg_latency_ms=150)  # 150ms avg
shadow_model = RecommendationModel("New Candidate Model", avg_latency_ms=220)  # 220ms avg

# Create router
router = ShadowTrafficRouter(production_model, shadow_model)

# Simulate user traffic (500 requests)
print("Simulating user traffic with shadow evaluation...")
for i in range(500):
    user_id = f"user_{i % 100}"  # 100 unique users making multiple requests
    context = {"page": "homepage", "device": np.random.choice(["mobile", "desktop", "tv"])}
    
    # Route request through both systems
    recommendations = router.route_request(user_id, context)
    
    # Simulate varying traffic patterns with small delays between requests
    time.sleep(np.random.exponential(0.02))
    
    # Show progress
    if (i+1) % 100 == 0:
        print(f"Processed {i+1} requests")

print("Simulation complete")

## 4. Analyze the Performance Results

Let's examine the performance metrics collected during our shadow traffic evaluation.

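The analysis reports the request count, error rate, and latency statistics (average, p50, p95, p99, and max) for each model, followed by a side-by-side comparison.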

In [None]:
# Get the metrics from both models
prod_metrics = production_model.get_performance_metrics()
shadow_metrics = shadow_model.get_performance_metrics()

# Format and display production model metrics
print("Production Model Performance Metrics:")
for key, value in prod_metrics.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.2f}")
    else:
        print(f"  {key}: {value}")

print("\nShadow Model Performance Metrics:")
for key, value in shadow_metrics.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.2f}")
    else:
        print(f"  {key}: {value}")

# Get the comparison report
comparison = router.get_performance_comparison()
print("\nPerformance Comparison (Shadow vs Production):")
for key, value in comparison.items():
    if isinstance(value, float):
        print(f"  {key}: {value:.2f}")
    else:
        print(f"  {key}: {value}")
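
With 500 requests, the p99 estimate depends on only about the five slowest samples, so it can shift noticeably between runs. One way to gauge that uncertainty is a bootstrap interval for the shadow-to-production p99 ratio. The sketch below uses only the standard library; `percentile` and `bootstrap_ratio_ci` are hypothetical helpers, the nearest-rank percentile is a simplification of NumPy's default interpolation, and the synthetic latencies stand in for the values logged by the router.

```python
import math
import random


def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def bootstrap_ratio_ci(prod, shadow, pct=99, n_resamples=1000, alpha=0.05, seed=0):
    """Percentile-bootstrap interval for the shadow/prod latency ratio at `pct`.

    Assumes prod[i] and shadow[i] come from the same request, so requests
    are resampled as pairs.
    """
    rng = random.Random(seed)
    n = len(prod)
    ratios = []
    for _ in range(n_resamples):
        idx = rng.choices(range(n), k=n)  # sample request indices with replacement
        prod_p = percentile([prod[i] for i in idx], pct)
        shadow_p = percentile([shadow[i] for i in idx], pct)
        ratios.append(shadow_p / prod_p)
    ratios.sort()
    lower = ratios[int(alpha / 2 * n_resamples)]
    upper = ratios[int((1 - alpha / 2) * n_resamples) - 1]
    return lower, upper


# Synthetic stand-ins for the logged prod_latency_ms and shadow_latency_ms values
data_rng = random.Random(1)
prod_latencies = [max(10.0, data_rng.gauss(150, 30)) for _ in range(500)]
shadow_latencies = [max(10.0, data_rng.gauss(220, 44)) for _ in range(500)]

low, high = bootstrap_ratio_ci(prod_latencies, shadow_latencies)
print(f"95% bootstrap interval for the p99 ratio: {low:.2f} to {high:.2f}")
```

In the notebook, the two input lists could be built from `router.comparison_data`, for example `[row["prod_latency_ms"] for row in router.comparison_data]`. A wide interval suggests collecting more requests before trusting a p99-based decision.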

## 5. Visualize the Latency Distributions

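Plotting the full latency distributions reveals differences that summary statistics can hide, such as a longer tail. The left panel overlays histograms of the two models; the right panel plots latency at every fifth percentile from 5 to 95.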

In [None]:
# Create visualization of latency comparison
df = pd.DataFrame(router.comparison_data)

# 1. Latency distribution comparison
plt.figure(figsize=(15, 6))

plt.subplot(1, 2, 1)
plt.hist(df["prod_latency_ms"], alpha=0.5, bins=30, label="Production Model")
plt.hist(df["shadow_latency_ms"], alpha=0.5, bins=30, label="Shadow Model")
plt.xlabel("Latency (ms)")
plt.ylabel("Request Count")
plt.title("Latency Distribution Comparison")
plt.legend()

# 2. Percentile latency comparison
plt.subplot(1, 2, 2)
percentiles = range(5, 100, 5)
prod_percentiles = [np.percentile(df["prod_latency_ms"], p) for p in percentiles]
shadow_percentiles = [np.percentile(df["shadow_latency_ms"], p) for p in percentiles]

plt.plot(percentiles, prod_percentiles, 'b-o', label="Production Model")
plt.plot(percentiles, shadow_percentiles, 'r-o', label="Shadow Model")
plt.xlabel("Percentile")
plt.ylabel("Latency (ms)")
plt.title("Latency by Percentile")
plt.grid(True)
plt.legend()

plt.tight_layout()
plt.show()

## 6. Make a Performance-Based Decision

Based on our engineering performance metrics, we can make a data-driven decision about whether the new model meets our performance requirements.

In [None]:
# Define acceptable thresholds (these would be based on your requirements)
p99_threshold = 1.3  # Allow up to 30% increase in p99 latency
avg_threshold = 1.5  # Allow up to 50% increase in average latency

# Check if shadow model meets performance requirements
p99_ratio = shadow_metrics["p99_latency_ms"] / prod_metrics["p99_latency_ms"]
avg_ratio = shadow_metrics["avg_latency_ms"] / prod_metrics["avg_latency_ms"]

print("Engineering Performance Evaluation Decision:")
print("------------------------------------------")

if p99_ratio > p99_threshold:
    print(f"REJECT (Totally BAD!): p99 latency increase of {(p99_ratio-1)*100:.1f}% exceeds threshold of {(p99_threshold-1)*100:.1f}%")
elif avg_ratio > avg_threshold:
    print(f"REJECT (Totally BAD!): Average latency increase of {(avg_ratio-1)*100:.1f}% exceeds threshold of {(avg_threshold-1)*100:.1f}%")
elif shadow_metrics["error_rate"] > 2 * prod_metrics["error_rate"]:
    print(f"REJECT (Totally BAD!): Error rate too high ({shadow_metrics['error_rate']:.4f} vs {prod_metrics['error_rate']:.4f})")
else:
    print("APPROVE (Legit!): Performance impact within acceptable thresholds")
    # The :+ format prints the sign itself, so a faster shadow model shows as a negative change
    print(f"   • Average latency: {prod_metrics['avg_latency_ms']:.1f}ms → {shadow_metrics['avg_latency_ms']:.1f}ms ({(avg_ratio-1)*100:+.1f}%)")
    print(f"   • p99 latency: {prod_metrics['p99_latency_ms']:.1f}ms → {shadow_metrics['p99_latency_ms']:.1f}ms ({(p99_ratio-1)*100:+.1f}%)")
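
The `if`/`elif` chain above reports only the first failing check. If you want every violated threshold listed at once, for example in an automated release gate, the same rules can be wrapped in a function. This is a sketch under the notebook's thresholds; `evaluate_shadow` and its parameter names are hypothetical, and the metric values in the example are made up for illustration.

```python
def evaluate_shadow(prod, shadow, p99_limit=1.3, avg_limit=1.5, error_factor=2.0):
    """Return (approved, reasons) for two metric dicts.

    Expects the keys produced by get_performance_metrics():
    avg_latency_ms, p99_latency_ms, and error_rate.
    """
    reasons = []
    p99_ratio = shadow["p99_latency_ms"] / prod["p99_latency_ms"]
    avg_ratio = shadow["avg_latency_ms"] / prod["avg_latency_ms"]

    if p99_ratio > p99_limit:
        reasons.append(f"p99 latency ratio {p99_ratio:.2f} exceeds limit {p99_limit:.2f}")
    if avg_ratio > avg_limit:
        reasons.append(f"average latency ratio {avg_ratio:.2f} exceeds limit {avg_limit:.2f}")
    if shadow["error_rate"] > error_factor * prod["error_rate"]:
        reasons.append(
            f"error rate {shadow['error_rate']:.4f} is more than "
            f"{error_factor:g}x production ({prod['error_rate']:.4f})"
        )
    return (not reasons), reasons


# Illustrative numbers only, not results from the simulation
example_prod = {"avg_latency_ms": 150.0, "p99_latency_ms": 220.0, "error_rate": 0.0}
example_shadow = {"avg_latency_ms": 220.0, "p99_latency_ms": 330.0, "error_rate": 0.0}

approved, reasons = evaluate_shadow(example_prod, example_shadow)
print("APPROVE" if approved else "REJECT")
for reason in reasons:
    print(f"  - {reason}")
```

In the notebook, the call would be `evaluate_shadow(prod_metrics, shadow_metrics)`.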



## 7. Bonus: Create a Performance Dashboard

In a real-world scenario, you might want to visualize the performance metrics over time in a dashboard.

In [None]:
# Create a time-series view of the latency
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
df.set_index('timestamp', inplace=True)

# Resample to 30-second intervals for clearer visualization
# (lowercase '30s' avoids the deprecation warning that newer pandas raises for '30S')
prod_series = df['prod_latency_ms'].resample('30s').mean()
shadow_series = df['shadow_latency_ms'].resample('30s').mean()

plt.figure(figsize=(12, 8))

# Plot the mean latency of each 30-second interval
plt.subplot(2, 1, 1)
plt.plot(prod_series.index, prod_series, 'b-', label='Production Model')
plt.plot(shadow_series.index, shadow_series, 'r-', label='Shadow Model')
plt.ylabel('Latency (ms)')
plt.title('Average Latency Over Time (30s intervals)')
plt.legend()
plt.grid(True)

# Plot latency difference
plt.subplot(2, 1, 2)
diff_series = shadow_series - prod_series
plt.fill_between(diff_series.index, diff_series, 0, where=diff_series>=0, 
                  color='r', alpha=0.3, label='Shadow slower')
plt.fill_between(diff_series.index, diff_series, 0, where=diff_series<0, 
                  color='g', alpha=0.3, label='Shadow faster')
plt.plot(diff_series.index, diff_series, 'k-')
plt.axhline(y=0, color='gray', linestyle='--')
plt.ylabel('Latency Difference (ms)')
plt.xlabel('Time')
plt.title('Shadow Model vs Production Model Latency Difference')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()
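
Interval means can hide tail latency, so a dashboard often tracks a high percentile per time window as well. The sketch below buckets `(timestamp, latency)` pairs into fixed windows and reports the p95 of each one, using only the standard library. `windowed_percentile` is a hypothetical helper, and the nearest-rank percentile is a simplification of what pandas or NumPy would compute.

```python
import math
from collections import defaultdict


def windowed_percentile(samples, window_s=30, pct=95):
    """Map each window start (in seconds) to the nearest-rank percentile of its latencies.

    `samples` is an iterable of (timestamp_seconds, latency_ms) pairs.
    """
    buckets = defaultdict(list)
    for ts, latency in samples:
        window_start = int(ts // window_s) * window_s
        buckets[window_start].append(latency)

    result = {}
    for window_start in sorted(buckets):
        ordered = sorted(buckets[window_start])
        rank = max(1, math.ceil(pct * len(ordered) / 100))
        result[window_start] = ordered[rank - 1]
    return result


# Illustrative samples: three requests in the first window, two in the second
samples = [(0, 100), (10, 200), (29, 300), (30, 150), (45, 250)]
print(windowed_percentile(samples))  # {0: 300, 30: 250}
```

Feeding it `(row["timestamp"], row["shadow_latency_ms"])` pairs from `router.comparison_data` would give a per-window p95 series to plot alongside the interval means.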

## Summary

This notebook demonstrated how to implement shadow traffic evaluation for AI models with a focus on engineering performance metrics. Using this approach, you can:

1. Test new model implementations in a production-like environment with real user requests
2. Collect detailed performance metrics without affecting the user experience
3. Make data-driven decisions about whether a model meets your engineering requirements
4. Identify performance issues before exposing users to a new model implementation

Once a model passes engineering performance evaluation via shadow traffic, it provides confidence that when the model is introduced in an A/B testing setting, you won’t be fighting system performance issues. Instead, you can focus on measuring user and business outcomes, knowing that latency, stability, and throughput are unlikely to be the bottlenecks.