# Testing Fraud Detection Inference Service

In this notebook, we'll test our deployed fraud detection model with KServe.

In [None]:
import requests
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

## Load Test Data

In [None]:
# Draw a small, reproducible sample (10 rows, fixed seed) to use as test input
test_data = pd.read_csv('data/credit_card_data.csv').sample(n=10, random_state=42)
test_data.head()

## Make Predictions

In [None]:
# Set the predictor URL (you should get this from the previous notebook).
# The <your-kserve-endpoint> placeholder must be replaced before running.
predictor_url = "http://<your-kserve-endpoint>/v1/models/fraud-detection:predict"

# Prepare test data for prediction: every column except the 'Class' label
# is sent to the model as a feature.
feature_columns = [col for col in test_data.columns if col != 'Class']
X_test = test_data[feature_columns].values.tolist()

# Create prediction payload (one list of feature values per sampled row)
payload = {
    "instances": X_test
}

# Send prediction request. The timeout (seconds) keeps this cell from
# blocking indefinitely when the endpoint is unreachable or never answers;
# without it requests waits forever.
headers = {"Content-Type": "application/json"}
response = requests.post(predictor_url, json=payload, headers=headers, timeout=30)

if response.status_code == 200:
    predictions = response.json()
    print("Predictions:")
    print(json.dumps(predictions, indent=2))
else:
    # Show the status and raw body so a failing deployment can be diagnosed
    print(f"Error: {response.status_code}")
    print(response.text)

## Get Explanations

In [None]:
# Set the explainer URL (the ':explain' verb of the same model)
explainer_url = "http://<your-kserve-endpoint>/v1/models/fraud-detection:explain"

# Send explanation request (using the same data). The timeout (seconds)
# prevents the cell from hanging forever if the explainer never responds.
response = requests.post(explainer_url, json=payload, headers=headers, timeout=60)

if response.status_code == 200:
    explanations = response.json()
    print("Explanations:")
    print(json.dumps(explanations, indent=2))

    # Visualize explanations for the first instance.
    # NOTE(review): the 'explanations' / 'feature_importance' keys are the
    # response schema this cell assumes; confirm against the deployed explainer.
    if 'explanations' in explanations and len(explanations['explanations']) > 0:
        first_explanation = explanations['explanations'][0]
        feature_importance = first_explanation['feature_importance']

        # Sort features by absolute importance, largest first
        sorted_features = sorted(feature_importance.items(), key=lambda x: abs(x[1]), reverse=True)
        top_features = sorted_features[:10]  # Top 10 features

        # Plot feature importance
        plt.figure(figsize=(12, 6))
        feature_names = [f[0] for f in top_features]
        feature_values = [f[1] for f in top_features]

        # Red for negative contributions, green for zero or positive ones
        colors = ['red' if x < 0 else 'green' for x in feature_values]

        plt.barh(feature_names, feature_values, color=colors)
        # barh places the first entry at the bottom of the axis; flip it so
        # the most influential feature is listed at the top of the chart.
        plt.gca().invert_yaxis()
        plt.xlabel('SHAP Value (Impact on Prediction)')
        plt.ylabel('Feature')
        plt.title('Top Features Influencing Fraud Prediction')
        plt.tight_layout()
        plt.show()
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## Test Performance Under Load

In [None]:
import time
from concurrent.futures import ThreadPoolExecutor

def make_prediction(payload):
    """Send one prediction request to ``predictor_url`` and time it.

    Args:
        payload: JSON-serializable request body to POST.

    Returns:
        dict with two keys:
            "status_code": the HTTP status code, or ``None`` when the request
                failed before any response arrived (timeout, connection error).
            "response_time": elapsed wall time of the call, in seconds.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way a difference of time.time() values can.
    start_time = time.perf_counter()
    try:
        response = requests.post(predictor_url, json=payload, headers=headers, timeout=30)
        status_code = response.status_code
    except requests.RequestException:
        # Record transport failures as unsuccessful calls instead of letting
        # a single bad request abort the whole load test.
        status_code = None
    return {
        "status_code": status_code,
        "response_time": time.perf_counter() - start_time
    }

# Draw 100 rows (fixed seed) to build the load-test traffic
num_instances = 100
large_test_data = pd.read_csv('data/credit_card_data.csv').sample(n=num_instances, random_state=42)
X_large_test = large_test_data[feature_columns].values.tolist()

# Chunk the rows into request bodies of 10 instances each
batch_size = 10
batch_starts = range(0, len(X_large_test), batch_size)
batches = [X_large_test[start:start + batch_size] for start in batch_starts]
payloads = [{"instances": rows} for rows in batches]

# Fire the requests from a pool of 5 worker threads; map() yields the
# per-request results in the same order as the payloads.
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(make_prediction, payloads))

# Summarize latency and success rate
response_times = [entry["response_time"] for entry in results]
success_count = len([entry for entry in results if entry["status_code"] == 200])

print(f"Success Rate: {success_count}/{len(results)} ({success_count/len(results)*100:.2f}%)")
print(f"Average Response Time: {np.mean(response_times):.4f} seconds")
print(f"Min Response Time: {np.min(response_times):.4f} seconds")
print(f"Max Response Time: {np.max(response_times):.4f} seconds")

# Histogram (with KDE overlay) of the observed response times
fig, ax = plt.subplots(figsize=(10, 6))
sns.histplot(response_times, kde=True, ax=ax)
ax.set_xlabel('Response Time (seconds)')
ax.set_ylabel('Frequency')
ax.set_title('Distribution of Response Times')
ax.grid(True)
plt.show()

## Canary Deployment Test

In [None]:
# In this section, we'll test the canary deployment
# First, we need to modify the model and deploy a v2 version
# This would typically be done in a separate pipeline run

import kfp
from src.pipeline.pipeline import fraud_detection_pipeline

# Compile the pipeline
pipeline_func = fraud_detection_pipeline
pipeline_filename = "fraud_detection_pipeline_v2.yaml"
kfp.compiler.Compiler().compile(pipeline_func, pipeline_filename)

# Connect to the Kubeflow Pipelines API.
# Bound to `kfp_client` rather than `client` so it is not confused with the
# `kubernetes.client` module that a later cell imports under the name `client`.
kfp_client = kfp.Client()

# Obtain the experiment to attach the run to. Nothing earlier in this notebook
# defines `experiment`, so it is resolved here instead of relying on kernel
# state left over from another notebook.
# NOTE(review): "fraud-detection" is an assumed experiment name; use the name
# from the training notebook. The KFP v1 client's create_experiment is expected
# to return the existing experiment if the name already exists (TODO confirm
# for the installed kfp version).
experiment = kfp_client.create_experiment(name="fraud-detection")

# Run the pipeline for v2 model
run_v2 = kfp_client.run_pipeline(
    experiment_id=experiment.id,
    job_name="fraud-detection-training-v2",
    pipeline_package_path=pipeline_filename,
    params={
        "data_path": "data/credit_card_data.csv",
        "model_name": "fraud-detection",
        "model_version": "v2"
    }
)

print(f"Pipeline run for v2 model submitted with ID: {run_v2.id}")

In [None]:
# After the v2 model is trained, deploy canary
import yaml
from kubernetes import client, config
from kserve import KServeClient

# Load Kubernetes configuration
config.load_kube_config()

# Create KServe client (used below to wait for readiness)
kserve_client = KServeClient()

# Load the canary deployment YAML
with open('kserve/canary_rollout.yaml', 'r') as f:
    canary_service = yaml.safe_load(f)

# Take the target's identity from the manifest itself so this cell does not
# depend on variables defined in some other notebook.
service_name = canary_service['metadata']['name']
# Fall back to the 'default' namespace when the manifest does not set one.
namespace = canary_service['metadata'].get('namespace', 'default')
api_group, api_version = canary_service['apiVersion'].split('/')

# Deploy the canary.
# The manifest is a plain dict, so it is sent through the generic
# custom-objects API; KServeClient.replace() takes the service name as its
# first argument and expects an SDK model object rather than a dict.
custom_objects_api = client.CustomObjectsApi()
current_service = custom_objects_api.get_namespaced_custom_object(
    group=api_group,
    version=api_version,
    namespace=namespace,
    plural="inferenceservices",
    name=service_name
)
# A replace (HTTP PUT) must carry the resourceVersion of the object being
# replaced; reuse the live one unless the manifest already pins a version.
canary_service['metadata'].setdefault(
    'resourceVersion', current_service['metadata']['resourceVersion']
)
custom_objects_api.replace_namespaced_custom_object(
    group=api_group,
    version=api_version,
    namespace=namespace,
    plural="inferenceservices",
    name=service_name,
    body=canary_service
)

print(f"Canary deployment started for {service_name} with 20% traffic to v2")
# Block until KServe reports the InferenceService ready
kserve_client.wait_isvc_ready(service_name, namespace=namespace)

In [None]:
# Test the canary deployment
# We'll make multiple calls and see which version responds
num_calls = 50
version_counts = {'v1': 0, 'v2': 0, 'unknown': 0}

for i in range(num_calls):
    try:
        # Bounded wait: one stuck call must not stall the remaining ones
        response = requests.post(predictor_url, json=payload, headers=headers, timeout=30)
    except requests.RequestException as exc:
        # Transport failure (timeout, connection error): report and move on
        # so the distribution below is still printed.
        print(f"Error on call {i}: {exc}")
        continue

    if response.status_code == 200:
        # This is a simplified way to check version - in reality
        # you would need to include model version in the response
        # or check model-specific differences in predictions.
        # Until then every successful call is counted as 'unknown'.
        version_counts['unknown'] += 1
    else:
        print(f"Error on call {i}: {response.status_code}")

# Percentages are relative to all attempted calls, including failed ones
print("Calls distribution:")
for version, count in version_counts.items():
    print(f"{version}: {count} ({count/num_calls*100:.2f}%)")

# In a real scenario, you'd want to look at model headers or other
# identifiers to track which version of the model is responding

## Monitoring KServe Performance

In [None]:
# Get the service metrics
from kubernetes import client

# Create a custom objects API client
api_instance = client.CustomObjectsApi()

# Fetch the InferenceService object and show its status section.
# `service_name` and `namespace` must already be defined by an earlier cell.
try:
    metrics = api_instance.get_namespaced_custom_object(
        # KServe InferenceServices are served under the serving.kserve.io
        # API group; serving.kubeflow.org is the group used by the older
        # KFServing releases, not by the `kserve` SDK this notebook uses.
        group="serving.kserve.io",
        version="v1beta1",
        namespace=namespace,
        plural="inferenceservices",
        name=service_name
    )

    # Extract the status block (empty dict if the object has none yet)
    status = metrics.get('status', {})

    print("Service Status:")
    print(json.dumps(status, indent=2))

    # If using Prometheus, we could also query it directly
    # for more detailed metrics
except Exception as e:
    # Best-effort lookup: report the failure instead of stopping the notebook
    print(f"Error getting metrics: {e}")