# Module 9.1 – Model Deployment with FastAPI & Docker

## 🎯 Learning Objectives

By the end of this notebook, you will be able to:

- Deploy trained ML models as REST APIs using FastAPI
- Package models with proper versioning and metadata
- Create containerized deployment artifacts with Docker
- Test API endpoints and validate deployment health
- Understand production deployment patterns for semiconductor manufacturing

## 📋 Prerequisites

- Completed Module 3 (foundational ML pipelines)
- Basic understanding of REST APIs
- Docker installed (for containerization sections)
- FastAPI dependencies installed (`pip install fastapi uvicorn pydantic`)

## 🔧 Setup and Imports

In [None]:
import json
import subprocess
import sys
import time
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
import requests
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Resolve every location the notebook uses, relative to the working directory
NOTEBOOK_DIR = Path.cwd()
ROOT_DIR = NOTEBOOK_DIR.parent.parent.parent  # three levels above the notebook folder
TEMP_MODELS_DIR = ROOT_DIR.joinpath('temp_models')
PIPELINE_SCRIPT = NOTEBOOK_DIR.joinpath('9.1-model-deployment-pipeline.py')
API_DIR = ROOT_DIR.joinpath('infrastructure', 'api')

# Echo the resolved paths so a wrong working directory is easy to spot
print(
    f"📁 Notebook directory: {NOTEBOOK_DIR}",
    f"📁 Root directory: {ROOT_DIR}",
    f"📁 Models directory: {TEMP_MODELS_DIR}",
    f"🐍 Pipeline script: {PIPELINE_SCRIPT}",
    f"🔧 API directory: {API_DIR}",
    sep="\n",
)

# The models directory is the only one this cell creates; re-running is harmless
TEMP_MODELS_DIR.mkdir(exist_ok=True)

print("\n✅ Setup complete!")

## 📊 Step 1: Create and Train a Demo Model

Let's create a simple semiconductor yield prediction model to demonstrate deployment.

In [None]:
def generate_semiconductor_data(n_samples=1000, random_state=42):
    """Generate synthetic semiconductor process data.

    Four process parameters are drawn from normal distributions and a yield
    percentage is derived from their squared deviation from the optimal
    process window (temp=450, pressure=2.5, flow=120, time=60), plus noise.

    Args:
        n_samples: Number of rows to generate.
        random_state: Seed for the random generator; the same seed always
            produces the same data.

    Returns:
        A DataFrame with columns ``temperature``, ``pressure``, ``flow``,
        ``time`` and ``yield`` (yield clipped to the range 0-100).
    """
    # A private legacy generator produces the same stream as calling
    # np.random.seed(random_state) and then the module-level functions, but it
    # does not reseed the global NumPy RNG that other code may rely on.
    rng = np.random.RandomState(random_state)

    # Process parameters (the draw order determines the generated values)
    temperature = rng.normal(450, 15, n_samples)  # Celsius
    pressure = rng.normal(2.5, 0.3, n_samples)    # Torr
    flow_rate = rng.normal(120, 10, n_samples)    # sccm
    time = rng.normal(60, 5, n_samples)           # seconds

    # Create yield based on process window
    # Optimal conditions: temp=450, pressure=2.5, flow=120, time=60
    temp_effect = -0.1 * (temperature - 450)**2 / 100
    pressure_effect = -2.0 * (pressure - 2.5)**2
    flow_effect = -0.05 * (flow_rate - 120)**2 / 100
    time_effect = -0.08 * (time - 60)**2 / 100

    yield_percentage = 85 + temp_effect + pressure_effect + flow_effect + time_effect
    yield_percentage += rng.normal(0, 2, n_samples)  # Add noise

    # Ensure yield is in reasonable range
    yield_percentage = np.clip(yield_percentage, 0, 100)

    return pd.DataFrame({
        'temperature': temperature,
        'pressure': pressure,
        'flow': flow_rate,
        'time': time,
        'yield': yield_percentage
    })

# Build the demo dataset and show a quick preview of it
df = generate_semiconductor_data(n_samples=1000)
print("📊 Generated synthetic semiconductor data:", df.head(), sep="\n")
print(f"\n📈 Data shape: {df.shape}")
# Summary statistics of the target column
print("📉 Yield statistics:", df['yield'].describe(), sep="\n")

In [None]:
# Split data and train model
X = df[['temperature', 'pressure', 'flow', 'time']]
y = df['yield']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create and train pipeline: standardize the features, then fit a linear model
model_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('regressor', LinearRegression())
])

model_pipeline.fit(X_train, y_train)

# Evaluate model on the held-out 20%
y_pred = model_pipeline.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print("🎯 Model Performance:")
print(f"   RMSE: {np.sqrt(mse):.2f}%")
print(f"   R²: {r2:.3f}")

# Save the model
model_path = TEMP_MODELS_DIR / 'demo_yield_model.joblib'
joblib.dump(model_pipeline, model_path)
print(f"\n💾 Model saved to: {model_path}")

# Test prediction at the optimal conditions.
# The pipeline was fitted on a DataFrame, so predict with the same column
# names; a bare ndarray makes scikit-learn warn that X has no valid feature
# names.
test_input = pd.DataFrame([[450.0, 2.5, 120.0, 60.0]], columns=X.columns)
test_prediction = model_pipeline.predict(test_input)
print(f"\n🧪 Test prediction for optimal conditions: {test_prediction[0]:.2f}% yield")

## 📦 Step 2: Package Model for Deployment

Use our deployment pipeline to create deployment-ready artifacts.

In [None]:
def run_pipeline_command(cmd_args):
    """Run the deployment pipeline CLI and return parsed JSON output.

    Args:
        cmd_args: List of command-line arguments appended after the pipeline
            script path (e.g. ``['package', '--version', '1.0.0']``).

    Returns:
        The object decoded from the command's stdout, or ``None`` when the
        command exits with a non-zero status or its stdout is not valid JSON
        (a diagnostic is printed in both cases).
    """
    # Use the interpreter running this notebook rather than whatever "python"
    # resolves to on PATH, so the script sees the same installed packages.
    result = subprocess.run(
        [sys.executable, str(PIPELINE_SCRIPT)] + cmd_args,
        capture_output=True,
        text=True,
        cwd=NOTEBOOK_DIR
    )

    if result.returncode != 0:
        print(f"❌ Error running command: {' '.join(cmd_args)}")
        print(f"Stderr: {result.stderr}")
        return None

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        print(f"❌ Failed to parse JSON output: {result.stdout}")
        return None

# Ask the pipeline CLI to package the trained model as version 1.0.0
package_args = ['package']
package_args += ['--model-path', str(model_path)]
package_args += ['--version', '1.0.0']
package_args += ['--output-name', 'demo_deployment']
package_result = run_pipeline_command(package_args)

if not package_result:
    print("❌ Model packaging failed")
else:
    print("📦 Model packaging successful:")
    print(json.dumps(package_result, indent=2))

    # The CLI reports where it wrote the deployment artifacts
    deployment_dir = Path(package_result['output_dir'])
    print(f"\n📁 Deployment artifacts created in: {deployment_dir}")

    # Show what ended up in the deployment directory
    print("\n📋 Deployment contents:")
    for item in deployment_dir.iterdir():
        print(f"   {item.name}")

In [None]:
# Validate the deployment
# Bind validate_result unconditionally: the summary cell reads it even when
# packaging failed and the validation below was therefore skipped.
validate_result = None

if package_result:
    validate_result = run_pipeline_command([
        'validate',
        '--deployment-dir', package_result['output_dir']
    ])

    if validate_result:
        print("✅ Deployment validation:")
        print(json.dumps(validate_result, indent=2))

        if validate_result.get('status') == 'valid':
            print("\n🎉 Deployment is valid and ready for API serving!")
        else:
            print("\n❌ Deployment validation failed:")
            # An invalid report without an 'issues' list should not crash the cell
            for issue in validate_result.get('issues', []):
                print(f"   - {issue}")
    else:
        print("❌ Validation failed")

In [None]:
# Load and display the metadata file written by the packaging step
if package_result:
    metadata_path = Path(package_result['metadata_path'])

    if not metadata_path.exists():
        print(f"❌ Metadata file not found: {metadata_path}")
    else:
        with metadata_path.open('r') as f:
            metadata = json.load(f)

        print("📄 Model metadata:")
        print(json.dumps(metadata, indent=2))

## 🚀 Step 3: Start the FastAPI Service

Now let's start the API service and test it. We'll copy our deployment artifacts to the expected location.

In [None]:
import shutil

# Place the packaged model and its metadata under the models directory
if not package_result:
    print("❌ Cannot proceed without packaged model")
else:
    deployment_dir = Path(package_result['output_dir'])

    # Destination file names under the models directory
    api_model_path = TEMP_MODELS_DIR / 'model.joblib'
    api_metadata_path = TEMP_MODELS_DIR / 'metadata.json'

    # copy2 keeps file metadata (timestamps) along with the contents
    for artifact_name, destination in (
        ('model.joblib', api_model_path),
        ('metadata.json', api_metadata_path),
    ):
        shutil.copy2(deployment_dir / artifact_name, destination)

    print(f"📋 Copied model to API location:")
    print(f"   Model: {api_model_path}")
    print(f"   Metadata: {api_metadata_path}")

    # Confirm the serving dependencies can be imported before trying the API
    try:
        import uvicorn
        import fastapi
    except ImportError as e:
        print(f"\n❌ FastAPI dependencies missing: {e}")
        print("Please install: pip install fastapi uvicorn pydantic")
    else:
        print("\n✅ FastAPI dependencies available")
        print(f"   FastAPI version: {fastapi.__version__}")
        print(f"   Uvicorn version: {uvicorn.__version__}")

### Starting the API Server

**Note**: A notebook cell cannot easily run the FastAPI server in the background, so start the server separately before running the test cells below.

**Option 1**: Run in a separate terminal:
```bash
cd /path/to/repository
PYTHONPATH=$PWD uvicorn infrastructure.api.main:app --host 0.0.0.0 --port 8000
```

**Option 2**: Use the cells below to test if a server is already running.

Let's check if an API server is available and test it:

In [None]:
# Base URL of the API server the test cells talk to
API_BASE_URL = "http://localhost:8000"

def test_api_connection():
    """Probe the /health endpoint of the API server.

    Returns:
        ``(True, payload)`` with the decoded JSON body when the server answers
        HTTP 200; otherwise ``(False, reason)`` where ``reason`` is either
        ``"HTTP <code>"`` or the text of the request error.
    """
    try:
        health = requests.get(f"{API_BASE_URL}/health", timeout=5)
        if health.status_code != 200:
            return False, f"HTTP {health.status_code}"
        return True, health.json()
    except requests.exceptions.RequestException as exc:
        return False, str(exc)

# Probe the server once and report either its health or how to start it
is_connected, result = test_api_connection()

if not is_connected:
    print(f"❌ API server not accessible: {result}")
    print(
        "\n💡 To start the server, run in a terminal:",
        f"   cd {ROOT_DIR}",
        "   PYTHONPATH=$PWD uvicorn infrastructure.api.main:app --host 0.0.0.0 --port 8000",
        sep="\n",
    )
else:
    print("🎉 API server is running!", "📊 Health status:", sep="\n")
    print(json.dumps(result, indent=2))

## 🧪 Step 4: Test API Endpoints

If the API server is running, let's test all the endpoints.

In [None]:
def _report_get_endpoint(path, timeout):
    """GET ``path`` from the API and print its JSON body or the failure.

    Returns the decoded JSON payload on HTTP 200, otherwise ``None``.
    """
    try:
        response = requests.get(f"{API_BASE_URL}{path}", timeout=timeout)
        if response.status_code != 200:
            print(f"   ❌ HTTP {response.status_code}: {response.text}")
            return None
        payload = response.json()
        print(json.dumps(payload, indent=2))
        return payload
    except Exception as e:
        # Demo boundary: report the problem and let the remaining checks run
        print(f"   ❌ Error: {e}")
        return None


def test_api_endpoints(timeout=10):
    """Test all API endpoints and display results.

    Exercises /health, /version, /metrics and /predict in turn. Failures are
    printed rather than raised so that one broken endpoint does not hide the
    others.

    Args:
        timeout: Seconds to wait for each request. Without a timeout an
            unresponsive server would block the notebook cell indefinitely.
    """
    separator = "\n" + "="*50 + "\n"

    # Test health endpoint
    print("🏥 Testing /health endpoint:")
    health_data = _report_get_endpoint("/health", timeout)
    if isinstance(health_data, dict):
        model_loaded = health_data.get('model_loaded', False)
        print(f"   Model loaded: {'✅' if model_loaded else '❌'}")

    print(separator)

    # Test version endpoint
    print("📋 Testing /version endpoint:")
    _report_get_endpoint("/version", timeout)

    print(separator)

    # Test metrics endpoint
    print("📊 Testing /metrics endpoint:")
    _report_get_endpoint("/metrics", timeout)

    print(separator)

    # Test prediction endpoint
    print("🔮 Testing /predict endpoint:")

    # Every case starts from the optimal recipe and overrides one parameter
    optimal = {
        "temperature": 450.0,
        "pressure": 2.5,
        "flow": 120.0,
        "time": 60.0
    }
    test_cases = [
        {"name": "Optimal conditions", "data": dict(optimal)},
        {"name": "High temperature", "data": {**optimal, "temperature": 480.0}},
        {"name": "Low pressure", "data": {**optimal, "pressure": 2.0}},
    ]

    for test_case in test_cases:
        print(f"\n🧪 Testing: {test_case['name']}")
        try:
            response = requests.post(
                f"{API_BASE_URL}/predict",
                json=test_case['data'],
                headers={"Content-Type": "application/json"},
                timeout=timeout
            )
            if response.status_code == 200:
                prediction_data = response.json()
                print(f"   Input: {test_case['data']}")
                print(f"   Prediction: {prediction_data['prediction']:.2f}% yield")
                print(f"   Confidence: {prediction_data['confidence']:.3f}")
                print(f"   Model version: {prediction_data['model_version']}")
            else:
                print(f"   ❌ HTTP {response.status_code}: {response.text}")
        except Exception as e:
            # Also covers a response body missing one of the expected keys
            print(f"   ❌ Error: {e}")

# Only exercise the endpoints when the health probe succeeds
is_connected, _ = test_api_connection()
if not is_connected:
    print("❌ API server not available. Please start the server first.")
else:
    test_api_endpoints()

## 🧪 Step 5: Validate Input Schemas

Test the API's input validation to ensure it properly rejects invalid data.

In [None]:
def test_input_validation(timeout=10):
    """Test API input validation with invalid data.

    Posts deliberately invalid payloads to /predict and reports whether each
    one is rejected with the expected HTTP status.

    Args:
        timeout: Seconds to wait for each request, so an unresponsive server
            cannot block the notebook cell indefinitely.
    """

    invalid_test_cases = [
        {
            "name": "Temperature too high",
            "data": {
                "temperature": 700.0,  # Above max of 600
                "pressure": 2.5,
                "flow": 120.0,
                "time": 60.0
            },
            "expected_status": 422
        },
        {
            "name": "Negative pressure",
            "data": {
                "temperature": 450.0,
                "pressure": -1.0,  # Below min of 0.1
                "flow": 120.0,
                "time": 60.0
            },
            "expected_status": 422
        },
        {
            "name": "Missing field",
            "data": {
                "temperature": 450.0,
                "pressure": 2.5,
                # Missing flow and time
            },
            "expected_status": 422
        },
        {
            "name": "Wrong data type",
            "data": {
                "temperature": "not_a_number",
                "pressure": 2.5,
                "flow": 120.0,
                "time": 60.0
            },
            "expected_status": 422
        }
    ]

    print("🛡️ Testing input validation:")

    for test_case in invalid_test_cases:
        print(f"\n🧪 Testing: {test_case['name']}")
        try:
            response = requests.post(
                f"{API_BASE_URL}/predict",
                json=test_case['data'],
                headers={"Content-Type": "application/json"},
                timeout=timeout
            )

            if response.status_code == test_case['expected_status']:
                print(f"   ✅ Correctly rejected with HTTP {response.status_code}")
                if response.status_code == 422:
                    try:
                        error_data = response.json()
                    except ValueError:
                        # The rejection itself is what is being checked; a
                        # non-JSON body only means there are no details to show
                        error_data = {}
                    if isinstance(error_data, dict):
                        details = error_data.get('detail', 'No details')
                    else:
                        details = error_data
                    print(f"   📝 Error details: {details}")
            else:
                print(f"   ❌ Expected HTTP {test_case['expected_status']}, got {response.status_code}")
                print(f"   📝 Response: {response.text}")

        except Exception as e:
            # Demo boundary: report the problem and continue with the next case
            print(f"   ❌ Error: {e}")

# Skip the validation checks unless the health probe succeeds
is_connected, _ = test_api_connection()
if not is_connected:
    print("❌ API server not available for validation testing.")
else:
    test_input_validation()

## 📊 Step 6: Performance Testing

Let's test the API performance with multiple requests.

In [None]:
import concurrent.futures
import statistics

def measure_response_time(test_data):
    """Measure response time for a single API call.

    Args:
        test_data: JSON-serializable payload posted to /predict.

    Returns:
        ``(elapsed_seconds, success)`` where ``success`` is True only for an
        HTTP 200 response. Any exception raised while sending the request
        counts as a failed call and still reports the time spent.
    """
    # perf_counter is monotonic and high resolution; time.time() follows the
    # wall clock, which can be adjusted in the middle of a measurement.
    start_time = time.perf_counter()
    try:
        response = requests.post(
            f"{API_BASE_URL}/predict",
            json=test_data,
            headers={"Content-Type": "application/json"},
            timeout=10
        )
    except Exception:
        return time.perf_counter() - start_time, False

    elapsed = time.perf_counter() - start_time
    return elapsed, response.status_code == 200

def _print_latency_summary(response_times, success_count, total_elapsed):
    """Print success rate, mean/median latency and total time for one run."""
    if not response_times:
        # Avoids dividing by zero and calling statistics on an empty list
        print("   ⚠️ No requests were made")
        return

    request_count = len(response_times)
    print(f"   ✅ Success rate: {success_count}/{request_count} ({100*success_count/request_count:.1f}%)")
    print(f"   ⏱️ Average response time: {statistics.mean(response_times)*1000:.1f}ms")
    print(f"   ⏱️ Median response time: {statistics.median(response_times)*1000:.1f}ms")
    print(f"   📊 Total time: {total_elapsed:.2f}s")


def performance_test(num_requests=50, max_sequential=10, max_concurrent=20, max_workers=5):
    """Run performance test with sequential and concurrent requests.

    Args:
        num_requests: Requested number of calls per phase; each phase is
            capped by its own limit below. Values <= 0 skip the test.
        max_sequential: Upper bound on the number of sequential calls.
        max_concurrent: Upper bound on the number of concurrent calls.
        max_workers: Thread pool size used for the concurrent phase.

    Returns:
        None. Results are printed.
    """
    if num_requests <= 0:
        print("⚠️ Nothing to measure: num_requests must be a positive integer")
        return

    test_data = {
        "temperature": 450.0,
        "pressure": 2.5,
        "flow": 120.0,
        "time": 60.0
    }

    sequential_count = max(0, min(max_sequential, num_requests))
    concurrent_count = max(0, min(max_concurrent, num_requests))

    # Report the numbers actually sent; the caps usually make them smaller
    # than num_requests.
    print(
        f"⚡ Running performance test ({sequential_count} sequential + "
        f"{concurrent_count} concurrent requests, num_requests={num_requests})..."
    )

    # Sequential requests
    print("\n📈 Sequential requests:")
    start_total = time.perf_counter()
    sequential_results = [measure_response_time(test_data) for _ in range(sequential_count)]
    sequential_elapsed = time.perf_counter() - start_total

    _print_latency_summary(
        [elapsed for elapsed, _ in sequential_results],
        sum(1 for _, success in sequential_results if success),
        sequential_elapsed,
    )

    # Concurrent requests
    print("\n🔄 Concurrent requests:")
    concurrent_results = []
    start_total = time.perf_counter()
    if concurrent_count:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, max_workers)) as executor:
            futures = [
                executor.submit(measure_response_time, test_data)
                for _ in range(concurrent_count)
            ]
            concurrent_results = [
                future.result() for future in concurrent.futures.as_completed(futures)
            ]
    concurrent_elapsed = time.perf_counter() - start_total

    _print_latency_summary(
        [elapsed for elapsed, _ in concurrent_results],
        sum(1 for _, success in concurrent_results if success),
        concurrent_elapsed,
    )
    if concurrent_results and concurrent_elapsed > 0:
        print(f"   🚀 Throughput: {len(concurrent_results)/concurrent_elapsed:.1f} requests/second")

# Skip the timing run unless the health probe succeeds
is_connected, _ = test_api_connection()
if not is_connected:
    print("❌ API server not available for performance testing.")
else:
    performance_test()

## 🐳 Step 7: Docker Deployment Demo

Let's examine the Docker configuration and understand how to deploy with containers.

In [None]:
# Locate the Docker configuration files at the repository root
dockerfile_api = ROOT_DIR / 'Dockerfile.api'
docker_compose = ROOT_DIR / 'docker-compose.yml'

# Mark each file as present or missing
print("🐳 Docker configuration files:")
print(f"   API Dockerfile: {dockerfile_api} {'✅' if dockerfile_api.exists() else '❌'}")
print(f"   Docker Compose: {docker_compose} {'✅' if docker_compose.exists() else '❌'}")

if dockerfile_api.exists():
    print("\n📄 Dockerfile.api contents:")
    with dockerfile_api.open('r') as f:
        dockerfile_content = f.read()
    # Show at most the first 500 characters, marking any truncation with "..."
    print(dockerfile_content if len(dockerfile_content) <= 500 else dockerfile_content[:500] + "...")

# Reference commands for building and running the container
print(
    "\n🚀 Docker deployment commands:",
    "",
    "# Build the API image:",
    "docker build -f Dockerfile.api -t semiconductor-ml-api .",
    "",
    "# Run the API container:",
    "docker run -p 8001:8000 \\",
    "  -v $(pwd)/temp_models:/app/temp_models \\",
    "  semiconductor-ml-api",
    "",
    "# Use docker-compose:",
    "docker-compose up model-api",
    "",
    "💡 The API will be available at http://localhost:8001 when using Docker",
    sep="\n",
)

## 📋 Step 8: Check Docker Build (Optional)

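If Docker is available, the cell below confirms that the Docker CLI works. The build check itself is left commented out because it can be slow; uncomment it to run it.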

In [None]:
def check_docker_availability():
    """Check if the Docker CLI is available and working.

    Runs ``docker --version``; this confirms the client executable can be
    started, not that a Docker daemon is running.

    Returns:
        ``(True, version_string)`` when the command succeeds, otherwise
        ``(False, reason)`` with a short description of what went wrong.
    """
    try:
        result = subprocess.run(
            ['docker', '--version'],
            capture_output=True,
            text=True,
            timeout=10  # a wedged CLI must not hang the notebook cell
        )
    except FileNotFoundError:
        return False, "Docker not found"
    except subprocess.TimeoutExpired:
        return False, "Docker command timed out"
    except OSError as e:
        # e.g. the executable exists but cannot be run (permissions)
        return False, f"Docker could not be executed: {e}"

    if result.returncode == 0:
        return True, result.stdout.strip()
    return False, "Docker command failed"

def test_docker_build():
    """Validate the Docker build configuration without building the image.

    Runs ``docker build --check`` against ``Dockerfile.api`` from the
    repository root.

    Returns:
        True when the check exits successfully, False on a failed check, a
        timeout, or any error starting the command.
    """
    print("🐳 Testing Docker build...")

    try:
        # `docker build` has no --dry-run flag; --check evaluates the build
        # configuration without executing the build.
        # NOTE(review): --check needs a recent Docker/Buildx — confirm the
        # minimum version for the target environment.
        result = subprocess.run(
            ['docker', 'build', '-f', 'Dockerfile.api', '--check', '.'],
            capture_output=True,
            text=True,
            cwd=ROOT_DIR,
            timeout=30
        )

        if result.returncode == 0:
            print("✅ Docker build validation successful")
            return True
        else:
            print(f"❌ Docker build validation failed: {result.stderr}")
            return False

    except subprocess.TimeoutExpired:
        print("⏰ Docker build test timed out")
        return False
    except Exception as e:
        # Includes the docker executable being missing
        print(f"❌ Docker build test error: {e}")
        return False

# Report whether the Docker CLI can be used on this machine
docker_available, docker_info = check_docker_availability()

if not docker_available:
    print(f"❌ Docker not available: {docker_info}")
    print("💡 Docker is optional for this module, but recommended for production deployment")
else:
    print(f"✅ Docker available: {docker_info}")

    # Note: Actual build testing is commented out to avoid long build times
    # Uncomment the next line if you want to test building
    # test_docker_build()

    print(
        "\n💡 To build and test the Docker image manually:",
        f"   cd {ROOT_DIR}",
        "   docker build -f Dockerfile.api -t semiconductor-ml-api .",
        "   docker run -p 8001:8000 -v $(pwd)/temp_models:/app/temp_models semiconductor-ml-api",
        sep="\n",
    )

## 📚 Step 9: Summary and Next Steps

Let's summarize what we've accomplished and outline next steps for production deployment.

In [None]:
print("🎯 Module 9.1 - Model Deployment Summary")
print("="*50)

# Check what we've completed
completed_tasks = []
pending_tasks = []

# Model creation and training
if model_path.exists():
    completed_tasks.append("✅ Created and trained ML model")
else:
    pending_tasks.append("❌ Model creation failed")

# Model packaging
if package_result and package_result.get('status') == 'exported':
    completed_tasks.append("✅ Packaged model for deployment")
else:
    pending_tasks.append("❌ Model packaging failed")

# Validation
# The validation cell assigns validate_result only when packaging succeeded,
# so look it up defensively instead of failing with a NameError here.
validate_result = globals().get('validate_result')
if validate_result and validate_result.get('status') == 'valid':
    completed_tasks.append("✅ Validated deployment artifacts")
else:
    pending_tasks.append("❌ Deployment validation failed")

# API testing
is_connected, _ = test_api_connection()
if is_connected:
    completed_tasks.append("✅ Tested FastAPI endpoints")
else:
    pending_tasks.append("⚠️ API server not running (optional for notebook)")

# Docker
docker_available, _ = check_docker_availability()
if docker_available:
    completed_tasks.append("✅ Docker available for containerization")
else:
    # Not completed: list it with the other pending items, like the API check
    pending_tasks.append("⚠️ Docker not available (optional)")

print("\n📋 Completed Tasks:")
for task in completed_tasks:
    print(f"   {task}")

if pending_tasks:
    print("\n⚠️ Pending Tasks:")
    for task in pending_tasks:
        print(f"   {task}")

# What the module covered
print(
    "\n🚀 Key Achievements:",
    "   • Built end-to-end model deployment pipeline",
    "   • Created REST API with FastAPI and Pydantic validation",
    "   • Implemented proper model versioning and metadata",
    "   • Set up Docker containerization for production",
    "   • Demonstrated input validation and error handling",
    "   • Performed basic performance testing",
    sep="\n",
)

# Work that remains before a real production rollout
print(
    "\n🎯 Next Steps for Production:",
    "   1. Set up authentication and authorization",
    "   2. Implement proper logging and monitoring",
    "   3. Add model performance tracking",
    "   4. Set up CI/CD pipeline for automated deployment",
    "   5. Configure load balancing and auto-scaling",
    "   6. Implement model A/B testing",
    "   7. Add comprehensive error handling and alerting",
    "   8. Set up model drift detection",
    sep="\n",
)

# Further reading
print(
    "\n📖 Additional Resources:",
    "   • FastAPI documentation: https://fastapi.tiangolo.com/",
    "   • Docker best practices: https://docs.docker.com/develop/best-practices/",
    "   • MLOps patterns: https://ml-ops.org/",
    "   • Module fundamentals: 9.1-model-deployment-fundamentals.md",
    "   • Quick reference: 9.1-model-deployment-quick-ref.md",
    sep="\n",
)

print("\n🎉 Congratulations! You've successfully completed Module 9.1 - Model Deployment")

## 🧹 Cleanup (Optional)

Run this cell to clean up temporary files created during the demo.

In [None]:
import shutil

def cleanup_demo_files():
    """Clean up temporary files created during the demo.

    Collects the saved demo model, deployment directories and the model and
    metadata copies under the models directory, lists them, and deletes them
    only after the user confirms at the prompt.
    """

    cleanup_items = []
    seen = set()

    def _add(path):
        """Queue ``path`` for cleanup unless the same location is already queued."""
        resolved = Path(path).resolve()
        if resolved not in seen:
            seen.add(resolved)
            cleanup_items.append(str(path))

    # Model files
    if model_path.exists():
        _add(model_path)

    # Deployment directories
    for item in TEMP_MODELS_DIR.glob('deployment_*'):
        if item.is_dir():
            _add(item)

    # NOTE(review): the package step passes --output-name demo_deployment,
    # which the 'deployment_*' pattern would not match if the directory is
    # named after it — confirm against the pipeline script. To be safe, also
    # queue the directory the packaging step actually reported.
    recorded = globals().get('package_result')
    output_dir = recorded.get('output_dir') if isinstance(recorded, dict) else None
    if output_dir:
        packaged_dir = Path(output_dir)
        models_root = TEMP_MODELS_DIR.resolve()
        # Never offer the models directory itself or any of its ancestors
        protected = {models_root, *models_root.parents}
        if packaged_dir.is_dir() and packaged_dir.resolve() not in protected:
            _add(packaged_dir)

    # API model files
    api_model = TEMP_MODELS_DIR / 'model.joblib'
    api_metadata = TEMP_MODELS_DIR / 'metadata.json'

    if api_model.exists():
        _add(api_model)
    if api_metadata.exists():
        _add(api_metadata)

    if not cleanup_items:
        print("🧹 No demo files found to clean up")
        return

    print("🧹 The following files/directories can be cleaned up:")
    for item in cleanup_items:
        print(f"   {item}")

    response = input("\nDo you want to delete these files? (y/N): ")

    if response.lower() not in ['y', 'yes']:
        print("\n🚫 Cleanup cancelled")
        return

    for item in cleanup_items:
        item_path = Path(item)
        try:
            if item_path.is_dir():
                shutil.rmtree(item_path)
            else:
                item_path.unlink()
            print(f"   ✅ Deleted: {item}")
        except Exception as e:
            # Best effort: report the failure and continue with the rest
            print(f"   ❌ Failed to delete {item}: {e}")
    print("\n✅ Cleanup completed!")

# Uncomment the next line to run cleanup
# cleanup_demo_files()