# Model Persistence and Pipeline Testing

## Overview

**Why Model Persistence?**
- Training can be time-consuming
- Reuse models in production
- Share models with team members
- Version control for models

## Topics Covered

### Part 1: Saving and Loading Models
- **joblib**: Preferred for sklearn (efficient for large numpy arrays)
- **pickle**: Python's built-in serialization
- **Comparison**: When to use which

### Part 2: Pipeline Persistence
- Saving complete pipelines (preprocessing + model)
- Ensuring reproducibility
- Version management

### Part 3: Testing Strategies
- Validating loaded models
- Testing pipeline integrity
- Production readiness checks
- Model versioning and monitoring

## Setup and Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pickle
import joblib
import json
import os
from pathlib import Path
from datetime import datetime
from sklearn.datasets import load_iris, load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

np.random.seed(42)

# Create models directory
os.makedirs('models', exist_ok=True)
print("✓ Libraries imported successfully")
print("✓ Models directory created")

## Part 1: Basic Model Persistence

### 1.1 Saving with joblib (Recommended)

In [None]:
# Train a model
iris = load_iris()
X, y = iris.data, iris.target

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)

print("Original Model:")
print(f"  Train Accuracy: {train_score:.4f}")
print(f"  Test Accuracy:  {test_score:.4f}")

# Save with joblib
model_path = 'models/iris_rf_model.joblib'
joblib.dump(model, model_path)
print(f"\n✓ Model saved to: {model_path}")

# Check file size
file_size = os.path.getsize(model_path) / 1024  # KB
print(f"  File size: {file_size:.2f} KB")

In [None]:
# Load model
loaded_model = joblib.load(model_path)

print("Loaded Model:")
print(f"  Type: {type(loaded_model)}")
print(f"  Parameters: {loaded_model.get_params()}")

# Verify it works
loaded_train_score = loaded_model.score(X_train, y_train)
loaded_test_score = loaded_model.score(X_test, y_test)

print(f"\nLoaded Model Performance:")
print(f"  Train Accuracy: {loaded_train_score:.4f}")
print(f"  Test Accuracy:  {loaded_test_score:.4f}")

# Verify exact match
assert train_score == loaded_train_score, "Train scores don't match!"
assert test_score == loaded_test_score, "Test scores don't match!"
print("\n✓ Model loaded successfully and produces identical results!")
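
`joblib.dump` also accepts a `compress` argument that trades some save and load time for a smaller file. The following is a minimal sketch, assuming a temporary directory and a small forest; the file names are illustrative and not part of the `models/` layout used in this notebook.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

X, y = load_iris(return_X_y=True)
clf = RandomForestClassifier(n_estimators=20, random_state=42).fit(X, y)

with tempfile.TemporaryDirectory() as tmp:
    raw_path = os.path.join(tmp, "rf_raw.joblib")          # illustrative name
    zip_path = os.path.join(tmp, "rf_compressed.joblib")   # illustrative name

    joblib.dump(clf, raw_path)               # default: no compression
    joblib.dump(clf, zip_path, compress=3)   # zlib compression, level 3

    raw_kb = os.path.getsize(raw_path) / 1024
    zip_kb = os.path.getsize(zip_path) / 1024

    # joblib.load detects the compression automatically
    restored = joblib.load(zip_path)
    same_predictions = np.array_equal(clf.predict(X), restored.predict(X))

print(f"uncompressed: {raw_kb:.1f} KB, compressed: {zip_kb:.1f} KB")
print(f"identical predictions: {same_predictions}")
```

Higher compression levels usually shrink the file further at the cost of slower saves, so the right setting depends on whether disk space or load latency matters more.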

### 1.2 Saving with pickle

In [None]:
# Save with pickle
pickle_path = 'models/iris_rf_model.pkl'

with open(pickle_path, 'wb') as f:
    pickle.dump(model, f)

print(f"✓ Model saved with pickle to: {pickle_path}")
print(f"  File size: {os.path.getsize(pickle_path) / 1024:.2f} KB")

# Load with pickle
with open(pickle_path, 'rb') as f:
    pickle_model = pickle.load(f)

# Verify
pickle_test_score = pickle_model.score(X_test, y_test)
print(f"\nPickle Model Test Accuracy: {pickle_test_score:.4f}")
print("✓ Model loaded from pickle successfully!")
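
Loading a pickle (or joblib) file can execute arbitrary code, so such files should only be loaded from trusted sources. One common safeguard is to record a checksum at save time and verify it before loading. The sketch below assumes this approach; `file_sha256` and `load_if_trusted` are hypothetical helper names, and a plain dictionary stands in for a fitted model.

```python
import hashlib
import os
import pickle
import tempfile


def file_sha256(path, chunk_size=1 << 20):
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def load_if_trusted(path, expected_sha256):
    """Unpickle only when the file matches the digest recorded at save time."""
    if file_sha256(path) != expected_sha256:
        raise ValueError(f"Checksum mismatch for {path}: refusing to load")
    with open(path, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "model.pkl")
    payload = {"weights": [0.1, 0.2, 0.3]}  # stand-in for a fitted model
    with open(path, "wb") as f:
        pickle.dump(payload, f)
    recorded = file_sha256(path)  # store this next to the model file

    restored = load_if_trusted(path, recorded)

    # Simulate tampering: append one byte, then try to load again.
    with open(path, "ab") as f:
        f.write(b"\x00")
    try:
        load_if_trusted(path, recorded)
        tamper_detected = False
    except ValueError:
        tamper_detected = True

print(restored == payload, tamper_detected)
```

A checksum only detects that a file changed after it was saved. It does not make a file from an unknown source safe to load.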

### 1.3 joblib vs pickle Comparison

In [None]:
import time

# Train larger model for comparison
cancer = load_breast_cancer()
X_cancer, y_cancer = cancer.data, cancer.target
X_tr, X_te, y_tr, y_te = train_test_split(X_cancer, y_cancer, test_size=0.2, random_state=42)

large_model = RandomForestClassifier(n_estimators=500, random_state=42)
large_model.fit(X_tr, y_tr)

print("Comparing joblib vs pickle for larger model:")
print("="*70)

# joblib timing (perf_counter is a monotonic, high-resolution clock for benchmarks)
start = time.perf_counter()
joblib.dump(large_model, 'models/large_model.joblib')
joblib_save_time = time.perf_counter() - start

start = time.perf_counter()
joblib.load('models/large_model.joblib')
joblib_load_time = time.perf_counter() - start

joblib_size = os.path.getsize('models/large_model.joblib') / (1024 * 1024)  # MB

# pickle timing
start = time.perf_counter()
with open('models/large_model.pkl', 'wb') as f:
    pickle.dump(large_model, f)
pickle_save_time = time.perf_counter() - start

start = time.perf_counter()
with open('models/large_model.pkl', 'rb') as f:
    pickle.load(f)
pickle_load_time = time.perf_counter() - start

pickle_size = os.path.getsize('models/large_model.pkl') / (1024 * 1024)  # MB

# Comparison table
comparison = pd.DataFrame({
    'Method': ['joblib', 'pickle'],
    'Save Time (s)': [joblib_save_time, pickle_save_time],
    'Load Time (s)': [joblib_load_time, pickle_load_time],
    'File Size (MB)': [joblib_size, pickle_size]
})

print(comparison.to_string(index=False))
print("\n💡 joblib is often more efficient for estimators that hold large NumPy arrays, "
      "but a single timing run is noisy and results vary by model and library version.")
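
A single wall-clock measurement is sensitive to disk caching and background processes. A more robust comparison repeats each operation and keeps the fastest run. The sketch below assumes an in-memory NumPy array as a stand-in payload; `best_of` is a hypothetical helper, not part of joblib or pickle.

```python
import pickle
import time

import numpy as np


def best_of(fn, repeats=5):
    """Run fn several times and return the fastest wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return min(timings)


data = np.random.default_rng(42).random((1000, 100))  # stand-in payload

dump_time = best_of(lambda: pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL))
blob = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
load_time = best_of(lambda: pickle.loads(blob))

print(f"best dump: {dump_time * 1000:.3f} ms, best load: {load_time * 1000:.3f} ms")
```

The same helper can wrap `joblib.dump` and `joblib.load` calls to compare the two libraries on equal terms.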

## Part 2: Pipeline Persistence

### 2.1 Saving Complete Pipelines

In [None]:
# Create and train pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])

print("Pipeline Structure:")
print(pipeline)

# Fit pipeline
pipeline.fit(X_train, y_train)
pipeline_score = pipeline.score(X_test, y_test)

print(f"\nPipeline Test Accuracy: {pipeline_score:.4f}")

# Save pipeline
pipeline_path = 'models/iris_pipeline.joblib'
joblib.dump(pipeline, pipeline_path)
print(f"\n✓ Pipeline saved to: {pipeline_path}")

# What gets saved?
print("\nWhat's saved in the pipeline:")
print("  1. StandardScaler with fitted mean and std")
print("  2. LogisticRegression with trained coefficients")
print("  3. All hyperparameters")
print("  4. Complete preprocessing logic")

In [None]:
# Load pipeline
loaded_pipeline = joblib.load(pipeline_path)

print("Loaded Pipeline:")
print(loaded_pipeline)

# Use on new data (simulated)
new_data = np.array([[5.1, 3.5, 1.4, 0.2]])  # Single sample
prediction = loaded_pipeline.predict(new_data)
probabilities = loaded_pipeline.predict_proba(new_data)

print(f"\nPrediction on new data:")
print(f"  Features: {new_data[0]}")
print(f"  Predicted class: {prediction[0]} ({iris.target_names[prediction[0]]})")
print(f"  Probabilities: {probabilities[0]}")

# Verify pipeline consistency
loaded_score = loaded_pipeline.score(X_test, y_test)
assert pipeline_score == loaded_score, "Pipeline scores don't match!"
print(f"\n✓ Loaded pipeline produces identical results: {loaded_score:.4f}")
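
The reason to persist the whole pipeline is that the scaler's fitted statistics travel with the classifier. As a minimal sketch, assuming the same scaler and classifier step names as above and a temporary file, the round trip can be checked like this:

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)
pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression(max_iter=1000, random_state=42)),
]).fit(X, y)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "pipeline.joblib")
    joblib.dump(pipe, path)
    restored = joblib.load(path)

scaler = restored.named_steps["scaler"]
clf = restored.named_steps["classifier"]

# The fitted scaling statistics survive the round trip.
stats_preserved = np.allclose(scaler.mean_, pipe.named_steps["scaler"].mean_)

# Calling the pipeline is equivalent to scaling first, then classifying.
manual = clf.predict(scaler.transform(X))
matches_manual = np.array_equal(restored.predict(X), manual)

# Feeding unscaled features straight to the classifier skips the scaler.
unscaled = clf.predict(X)
n_changed = int(np.sum(unscaled != manual))

print(f"stats preserved: {stats_preserved}, matches manual: {matches_manual}")
print(f"predictions that change without scaling: {n_changed}")
```

If only the classifier were saved, the caller would have to reproduce the scaling by hand, and any mismatch would silently change the predictions.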

### 2.2 Model Metadata and Versioning

In [None]:
# Create comprehensive model metadata
def save_model_with_metadata(model, X_train, X_test, y_train, y_test, 
                            model_name, version, description):
    """
    Save model with complete metadata for production use
    """
    # Create version directory
    model_dir = f'models/{model_name}/v{version}'
    os.makedirs(model_dir, exist_ok=True)
    
    # Save model
    model_path = f'{model_dir}/model.joblib'
    joblib.dump(model, model_path)
    
    # Calculate metrics
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)
    y_pred = model.predict(X_test)
    
    # Create metadata (record the library versions of the current environment)
    import platform
    import sklearn

    metadata = {
        'model_name': model_name,
        'version': version,
        'description': description,
        'created_at': datetime.now().isoformat(),
        'model_type': type(model).__name__,
        'parameters': model.get_params() if hasattr(model, 'get_params') else {},
        'training_data': {
            'n_train_samples': len(X_train),
            'n_test_samples': len(X_test),
            'n_features': X_train.shape[1],
            'n_classes': len(np.unique(y_train))
        },
        'performance': {
            'train_accuracy': float(train_score),
            'test_accuracy': float(test_score),
            'confusion_matrix': confusion_matrix(y_test, y_pred).tolist()
        },
        'sklearn_version': sklearn.__version__,
        'python_version': platform.python_version()
    }
    
    # Save metadata as JSON; default=str stringifies values JSON cannot encode,
    # such as the estimator objects returned by Pipeline.get_params()
    metadata_path = f'{model_dir}/metadata.json'
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f, indent=2, default=str)
    
    # Save sample predictions for validation
    sample_data = {
        'input': X_test[:5].tolist(),
        'expected_output': y_test[:5].tolist(),
        'predictions': model.predict(X_test[:5]).tolist()
    }
    
    sample_path = f'{model_dir}/sample_predictions.json'
    with open(sample_path, 'w') as f:
        json.dump(sample_data, f, indent=2)
    
    print(f"✓ Model saved with metadata:")
    print(f"  Model: {model_path}")
    print(f"  Metadata: {metadata_path}")
    print(f"  Samples: {sample_path}")
    
    return model_dir

# Save model with metadata
model_dir = save_model_with_metadata(
    pipeline,
    X_train, X_test, y_train, y_test,
    model_name='iris_classifier',
    version='1.0',
    description='Logistic regression with standard scaling for Iris classification'
)

print(f"\n✓ Complete model package created at: {model_dir}")

In [None]:
# Load model with metadata
def load_model_with_metadata(model_dir):
    """
    Load model and its metadata
    """
    # Load model
    model_path = f'{model_dir}/model.joblib'
    model = joblib.load(model_path)
    
    # Load metadata
    metadata_path = f'{model_dir}/metadata.json'
    with open(metadata_path, 'r') as f:
        metadata = json.load(f)
    
    # Load sample predictions
    sample_path = f'{model_dir}/sample_predictions.json'
    with open(sample_path, 'r') as f:
        samples = json.load(f)
    
    return model, metadata, samples

# Load and display
loaded_model, metadata, samples = load_model_with_metadata(model_dir)

print("Loaded Model Information:")
print("="*70)
print(f"Model: {metadata['model_name']} v{metadata['version']}")
print(f"Type: {metadata['model_type']}")
print(f"Created: {metadata['created_at']}")
print(f"Description: {metadata['description']}")
print(f"\nPerformance:")
print(f"  Train Accuracy: {metadata['performance']['train_accuracy']:.4f}")
print(f"  Test Accuracy: {metadata['performance']['test_accuracy']:.4f}")
print(f"\nTraining Data:")
print(f"  Samples: {metadata['training_data']['n_train_samples']} train, "
      f"{metadata['training_data']['n_test_samples']} test")
print(f"  Features: {metadata['training_data']['n_features']}")
print(f"  Classes: {metadata['training_data']['n_classes']}")
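
The metadata includes `sklearn_version` and `python_version` fields because a serialized model is not guaranteed to load correctly under a different library version. One way to use those fields is to compare them with the running environment at load time. This is a sketch under that assumption; `capture_environment` and `environment_mismatches` are hypothetical helpers.

```python
import platform

import sklearn


def capture_environment():
    """Record the versions that the serialized model depends on."""
    return {
        "sklearn_version": sklearn.__version__,
        "python_version": platform.python_version(),
    }


def environment_mismatches(metadata):
    """Return {key: (saved, current)} for every version that differs."""
    current = capture_environment()
    return {
        key: (metadata.get(key), value)
        for key, value in current.items()
        if metadata.get(key) != value
    }


saved = capture_environment()                   # what would be written at save time
stale = {**saved, "sklearn_version": "0.0.0"}   # simulated artifact from another version

print(environment_mismatches(saved))
print(environment_mismatches(stale))
```

Whether a mismatch should block loading or only produce a warning is a policy decision. An exact match is the conservative choice.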

## Part 3: Testing Strategies

### 3.1 Model Validation Tests

In [None]:
def validate_loaded_model(model, metadata, samples, X_test, y_test):
    """
    Comprehensive validation of loaded model
    """
    print("Running Model Validation Tests...")
    print("="*70)
    
    tests_passed = 0
    tests_failed = 0
    
    # Test 1: Model type check
    print("\n[Test 1] Model Type Check")
    expected_type = metadata['model_type']
    actual_type = type(model).__name__
    if expected_type == actual_type:
        print(f"  ✓ PASS: Model type matches ({actual_type})")
        tests_passed += 1
    else:
        print(f"  ✗ FAIL: Expected {expected_type}, got {actual_type}")
        tests_failed += 1
    
    # Test 2: Sample predictions match
    print("\n[Test 2] Sample Predictions Consistency")
    sample_inputs = np.array(samples['input'])
    expected_preds = np.array(samples['predictions'])
    actual_preds = model.predict(sample_inputs)
    
    if np.array_equal(expected_preds, actual_preds):
        print(f"  ✓ PASS: Predictions match saved samples")
        tests_passed += 1
    else:
        print(f"  ✗ FAIL: Predictions don't match")
        print(f"    Expected: {expected_preds}")
        print(f"    Got: {actual_preds}")
        tests_failed += 1
    
    # Test 3: Performance within tolerance
    print("\n[Test 3] Performance Validation")
    current_score = model.score(X_test, y_test)
    saved_score = metadata['performance']['test_accuracy']
    tolerance = 0.001
    
    if abs(current_score - saved_score) < tolerance:
        print(f"  ✓ PASS: Performance matches (current: {current_score:.4f}, saved: {saved_score:.4f})")
        tests_passed += 1
    else:
        print(f"  ✗ FAIL: Performance differs significantly")
        print(f"    Current: {current_score:.4f}")
        print(f"    Saved: {saved_score:.4f}")
        tests_failed += 1
    
    # Test 4: Input shape validation
    print("\n[Test 4] Input Shape Validation")
    expected_features = metadata['training_data']['n_features']
    test_features = X_test.shape[1]
    
    if expected_features == test_features:
        print(f"  ✓ PASS: Input shape matches ({test_features} features)")
        tests_passed += 1
    else:
        print(f"  ✗ FAIL: Input shape mismatch")
        print(f"    Expected: {expected_features} features")
        print(f"    Got: {test_features} features")
        tests_failed += 1
    
    # Test 5: Model has required methods
    print("\n[Test 5] Required Methods Check")
    required_methods = ['predict', 'score']
    missing_methods = [m for m in required_methods if not hasattr(model, m)]
    
    if not missing_methods:
        print(f"  ✓ PASS: All required methods present")
        tests_passed += 1
    else:
        print(f"  ✗ FAIL: Missing methods: {missing_methods}")
        tests_failed += 1
    
    # Summary
    print("\n" + "="*70)
    print("VALIDATION SUMMARY")
    print("="*70)
    print(f"Tests Passed: {tests_passed}/{tests_passed + tests_failed}")
    print(f"Tests Failed: {tests_failed}/{tests_passed + tests_failed}")
    
    if tests_failed == 0:
        print("\n✓ All validation tests passed! Model is ready for use.")
    else:
        print("\n⚠ Some validation tests failed. Review before using in production.")
    
    return tests_failed == 0

# Run validation
is_valid = validate_loaded_model(loaded_model, metadata, samples, X_test, y_test)
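
Comparing predicted labels alone can hide small numeric drift, because a probability can move noticeably without crossing the decision boundary. Where `predict_proba` is available, the stored samples can also carry probabilities that are compared within a tolerance. In this sketch the `probabilities` key and the `1e-8` tolerance are assumptions, not part of the sample file written earlier.

```python
import json

import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

X, y = load_iris(return_X_y=True)
model = LogisticRegression(max_iter=1000, random_state=42).fit(X, y)

# At save time: store a few inputs with their labels and class probabilities.
samples_json = json.dumps({
    "input": X[:5].tolist(),
    "predictions": model.predict(X[:5]).tolist(),
    "probabilities": model.predict_proba(X[:5]).tolist(),  # assumed extra key
})

# At load time: recompute on the stored inputs and compare.
samples = json.loads(samples_json)
inputs = np.array(samples["input"])

labels_match = np.array_equal(model.predict(inputs), samples["predictions"])
probas_match = np.allclose(
    model.predict_proba(inputs), samples["probabilities"], atol=1e-8
)

print(f"labels match: {labels_match}, probabilities match: {probas_match}")
```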

### 3.2 Production Readiness Checks

In [None]:
def production_readiness_check(model, metadata):
    """
    Check if model is ready for production deployment
    """
    print("Production Readiness Checklist")
    print("="*70)
    
    checks = []
    
    # Check 1: Performance threshold
    test_acc = metadata['performance']['test_accuracy']
    min_accuracy = 0.85  # 85% minimum
    status = "✓" if test_acc >= min_accuracy else "✗"
    checks.append({
        'Check': 'Performance Threshold',
        'Status': status,
        'Details': f"{test_acc:.1%} (min: {min_accuracy:.1%})"
    })
    
    # Check 2: Metadata completeness
    required_fields = ['model_name', 'version', 'created_at', 'model_type', 'performance']
    has_all_fields = all(field in metadata for field in required_fields)
    status = "✓" if has_all_fields else "✗"
    checks.append({
        'Check': 'Metadata Completeness',
        'Status': status,
        'Details': 'All required fields present' if has_all_fields else 'Missing fields'
    })
    
    # Check 3: Training data sufficiency
    n_train = metadata['training_data']['n_train_samples']
    n_features = metadata['training_data']['n_features']
    min_samples = n_features * 10  # Rule of thumb: 10x features
    status = "✓" if n_train >= min_samples else "⚠"
    checks.append({
        'Check': 'Training Data Sufficiency',
        'Status': status,
        'Details': f"{n_train} samples, {n_features} features"
    })
    
    # Check 4: Model size (for deployment), measured as serialized bytes
    model_size = len(pickle.dumps(model)) / (1024 * 1024)  # MB
    max_size = 100  # 100 MB limit
    status = "✓" if model_size < max_size else "⚠"
    checks.append({
        'Check': 'Model Size',
        'Status': status,
        'Details': f"{model_size:.1f} MB (max: {max_size} MB)"
    })
    
    # Check 5: Prediction methods available
    has_predict = hasattr(model, 'predict')
    has_proba = hasattr(model, 'predict_proba')
    status = "✓" if has_predict else "✗"
    details = "predict" + (", predict_proba" if has_proba else "")
    checks.append({
        'Check': 'Prediction Methods',
        'Status': status,
        'Details': details
    })
    
    # Check 6: Version control
    has_version = 'version' in metadata and metadata['version'] is not None
    status = "✓" if has_version else "✗"
    checks.append({
        'Check': 'Version Control',
        'Status': status,
        'Details': f"v{metadata['version']}" if has_version else 'No version'
    })
    
    # Display results
    checks_df = pd.DataFrame(checks)
    print(checks_df.to_string(index=False))
    
    # Overall assessment
    passed = sum(1 for c in checks if c['Status'] == '✓')
    total = len(checks)
    
    print("\n" + "="*70)
    print(f"Overall: {passed}/{total} checks passed")
    
    if passed == total:
        print("\n✓ Model is production-ready!")
    elif passed >= total * 0.8:
        print("\n⚠ Model is mostly ready, but review warnings")
    else:
        print("\n✗ Model needs improvements before production deployment")
    
    return passed == total

# Run production checks
is_production_ready = production_readiness_check(loaded_model, metadata)

### 3.3 Inference Testing

In [None]:
def test_inference_pipeline(model, n_samples=100):
    """
    Test model inference with various inputs
    """
    print("Testing Inference Pipeline")
    print("="*70)
    
    # Test 1: Single sample prediction
    print("\n[Test 1] Single Sample Prediction")
    try:
        single_input = X_test[0:1]
        pred = model.predict(single_input)
        print(f"  ✓ Single prediction works: {pred[0]}")
    except Exception as e:
        print(f"  ✗ Single prediction failed: {e}")
    
    # Test 2: Batch prediction
    print("\n[Test 2] Batch Prediction")
    try:
        batch_input = X_test[:10]
        preds = model.predict(batch_input)
        print(f"  ✓ Batch prediction works: {len(preds)} predictions")
    except Exception as e:
        print(f"  ✗ Batch prediction failed: {e}")
    
    # Test 3: Probability predictions (if available)
    print("\n[Test 3] Probability Predictions")
    if hasattr(model, 'predict_proba'):
        try:
            proba = model.predict_proba(X_test[:5])
            print(f"  ✓ Probability predictions work")
            print(f"    Shape: {proba.shape}")
            print(f"    Sum to 1: {np.allclose(proba.sum(axis=1), 1.0)}")
        except Exception as e:
            print(f"  ✗ Probability prediction failed: {e}")
    else:
        print("  - Probability predictions not available")
    
    # Test 4: Edge cases
    print("\n[Test 4] Edge Cases")
    
    # Minimum values
    try:
        min_input = np.min(X_test, axis=0).reshape(1, -1)
        pred = model.predict(min_input)
        print(f"  ✓ Minimum values: prediction = {pred[0]}")
    except Exception as e:
        print(f"  ✗ Minimum values failed: {e}")
    
    # Maximum values
    try:
        max_input = np.max(X_test, axis=0).reshape(1, -1)
        pred = model.predict(max_input)
        print(f"  ✓ Maximum values: prediction = {pred[0]}")
    except Exception as e:
        print(f"  ✗ Maximum values failed: {e}")
    
    # Test 5: Performance timing
    print("\n[Test 5] Inference Speed")
    import time
    
    n_iterations = n_samples  # number of timed single-sample predictions
    start = time.perf_counter()
    for _ in range(n_iterations):
        model.predict(X_test[:1])
    elapsed = time.perf_counter() - start
    
    avg_time = (elapsed / n_iterations) * 1000  # milliseconds
    print(f"  Average prediction time: {avg_time:.3f} ms")
    
    if avg_time < 10:
        print("  ✓ Very fast (< 10ms)")
    elif avg_time < 100:
        print("  ✓ Fast enough for most applications")
    else:
        print("  ⚠ Slower than expected, may need optimization")
    
    print("\n✓ Inference testing complete!")

# Run inference tests
test_inference_pipeline(loaded_model)
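
The edge cases above all use well-formed inputs. It is also worth checking that malformed inputs are rejected rather than silently accepted. The sketch below assumes a scaler plus logistic regression pipeline and reports which inputs raise `ValueError`. The outcome depends on the estimators involved, so the results are printed rather than assumed. `rejects` is a hypothetical helper.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)
pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression(max_iter=1000, random_state=42)),
]).fit(X, y)


def rejects(model, bad_input):
    """Return True if model.predict raises ValueError for bad_input."""
    try:
        model.predict(bad_input)
    except ValueError:
        return True
    return False


n_features = X.shape[1]
bad_inputs = {
    "too few features": np.zeros((1, n_features - 1)),
    "NaN value": np.array([[5.1, np.nan, 1.4, 0.2]]),
    "empty batch": np.empty((0, n_features)),
}

results = {name: rejects(pipe, bad) for name, bad in bad_inputs.items()}
for name, rejected in results.items():
    print(f"{name}: {'rejected' if rejected else 'accepted'}")
```

Any input reported as accepted is a candidate for an explicit check in the serving code before the model is called.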

## Part 4: Model Versioning System

In [None]:
class ModelVersionManager:
    """
    Simple model version management system
    """
    def __init__(self, base_dir='models'):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)
    
    def save_version(self, model, model_name, version, metadata):
        """Save a new model version"""
        version_dir = f"{self.base_dir}/{model_name}/v{version}"
        os.makedirs(version_dir, exist_ok=True)
        
        # Save model
        model_path = f"{version_dir}/model.joblib"
        joblib.dump(model, model_path)
        
        # Save metadata (copy so the caller's dict is not modified)
        metadata = {**metadata, 'version': version,
                    'created_at': datetime.now().isoformat()}
        metadata_path = f"{version_dir}/metadata.json"
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
        
        print(f"✓ Saved {model_name} v{version}")
        return version_dir
    
    def load_version(self, model_name, version):
        """Load specific model version"""
        version_dir = f"{self.base_dir}/{model_name}/v{version}"
        model_path = f"{version_dir}/model.joblib"
        metadata_path = f"{version_dir}/metadata.json"
        
        model = joblib.load(model_path)
        with open(metadata_path, 'r') as f:
            metadata = json.load(f)
        
        return model, metadata
    
    def list_versions(self, model_name):
        """List all versions of a model"""
        model_dir = f"{self.base_dir}/{model_name}"
        if not os.path.exists(model_dir):
            return []
        
        versions = []
        for item in os.listdir(model_dir):
            if item.startswith('v'):
                version = item[1:]  # Remove 'v' prefix
                metadata_path = f"{model_dir}/{item}/metadata.json"
                if os.path.exists(metadata_path):
                    with open(metadata_path, 'r') as f:
                        metadata = json.load(f)
                    versions.append({
                        'version': version,
                        'created_at': metadata.get('created_at', 'Unknown'),
                        'test_accuracy': metadata.get('performance', {}).get('test_accuracy', 0)
                    })
        
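        # Sort numerically so that e.g. '10.0' comes after '2.0'
        # (assumes purely numeric, dot-separated version strings)
        return sorted(versions, key=lambda x: tuple(int(p) for p in x['version'].split('.')))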
    
    def compare_versions(self, model_name, versions):
        """Compare multiple versions"""
        comparison = []
        for version in versions:
            try:
                _, metadata = self.load_version(model_name, version)
                comparison.append({
                    'Version': version,
                    'Created': metadata.get('created_at', 'Unknown')[:10],
                    'Test Accuracy': metadata.get('performance', {}).get('test_accuracy', 0),
                    'Model Type': metadata.get('model_type', 'Unknown')
                })
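            except Exception as e:
                # Skip versions that are missing or unreadable, but report them
                print(f"  ⚠ Skipping v{version}: {e}")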
        
        return pd.DataFrame(comparison)

# Example usage
version_manager = ModelVersionManager()

# Save multiple versions
print("Saving multiple model versions...\n")

# Version 1.0 - Logistic Regression
lr_model = LogisticRegression(max_iter=1000, random_state=42)
lr_model.fit(X_train, y_train)
version_manager.save_version(
    lr_model, 'iris_model', '1.0',
    {'model_type': 'LogisticRegression', 'performance': {'test_accuracy': lr_model.score(X_test, y_test)}}
)

# Version 2.0 - Random Forest
rf_model = RandomForestClassifier(n_estimators=50, random_state=42)
rf_model.fit(X_train, y_train)
version_manager.save_version(
    rf_model, 'iris_model', '2.0',
    {'model_type': 'RandomForestClassifier', 'performance': {'test_accuracy': rf_model.score(X_test, y_test)}}
)

# Version 2.1 - Random Forest (improved)
rf_model2 = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model2.fit(X_train, y_train)
version_manager.save_version(
    rf_model2, 'iris_model', '2.1',
    {'model_type': 'RandomForestClassifier', 'performance': {'test_accuracy': rf_model2.score(X_test, y_test)}}
)

# List all versions
print("\nAvailable versions:")
versions = version_manager.list_versions('iris_model')
for v in versions:
    print(f"  v{v['version']}: {v['test_accuracy']:.4f} accuracy (created: {v['created_at'][:10]})")

# Compare versions
print("\nVersion Comparison:")
comparison_df = version_manager.compare_versions('iris_model', ['1.0', '2.0', '2.1'])
print(comparison_df.to_string(index=False))
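
Version strings compared as plain text are ordered lexicographically, which differs from numeric order once a component reaches two digits. A minimal sketch of a numeric sort key, assuming versions made only of dot-separated integers (`version_key` is a hypothetical helper):

```python
def version_key(version):
    """Turn '2.10.1' into (2, 10, 1) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


versions = ["1.0", "10.0", "2.1", "2.0", "2.10"]

text_order = sorted(versions)                      # '10.0' sorts before '2.0'
numeric_order = sorted(versions, key=version_key)  # 2.10 comes after 2.1

print(text_order)
print(numeric_order)
```

Versions with suffixes such as `-beta` would need a more complete parser than this key provides.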

## Best Practices Summary

### Saving Models

```python
# ✓ DO: Use joblib for sklearn models
joblib.dump(model, 'model.joblib')

# ✓ DO: Save entire pipelines, not just models
pipeline = Pipeline([('scaler', StandardScaler()), ('model', model)])
joblib.dump(pipeline, 'pipeline.joblib')

# ✓ DO: Include version and metadata
metadata = {
    'version': '1.0',
    'created_at': datetime.now().isoformat(),
    'performance': {'test_accuracy': score}
}

# ✗ DON'T: Forget to save preprocessing steps
# ✗ DON'T: Hardcode file paths without version control
```

### Loading Models

```python
# ✓ DO: Validate after loading
loaded_model = joblib.load('model.joblib')
assert loaded_model.score(X_test, y_test) == expected_score

# ✓ DO: Check model type and version
# ✓ DO: Test with sample predictions

# ✗ DON'T: Use without validation
# ✗ DON'T: Assume compatibility across sklearn versions
```

### Versioning

1. **Use semantic versioning**: major.minor.patch (e.g., 1.2.3)
2. **Save metadata**: performance, training data info, hyperparameters
3. **Track dependencies**: sklearn version, Python version
4. **Keep multiple versions**: For rollback capability
5. **Document changes**: What improved between versions

### Testing

1. **Model type validation**: Verify correct model loaded
2. **Performance validation**: Check accuracy matches metadata
3. **Sample prediction tests**: Ensure predictions are consistent
4. **Input validation**: Test with various input shapes
5. **Edge case testing**: Min/max values, empty inputs
6. **Inference speed testing**: Ensure acceptable latency

### Production Deployment

Checklist before deployment:

- ✓ Model performance meets threshold
- ✓ Complete metadata available
- ✓ Validation tests pass
- ✓ Inference speed acceptable
- ✓ Version control in place
- ✓ Rollback plan ready
- ✓ Monitoring configured

### Common Pitfalls

- ❌ Saving model without preprocessing steps
- ❌ No version control or metadata
- ❌ Not testing loaded model before use
- ❌ Defaulting to plain pickle for large sklearn models, where joblib handles NumPy arrays more efficiently
- ❌ Hardcoding absolute paths instead of using relative or configurable paths
- ❌ No validation or testing strategy
- ❌ Forgetting about sklearn version compatibility

In [None]:
# Cleanup (optional)
import shutil

print("Model persistence demonstration complete!")
print("\nGenerated files:")
for root, dirs, files in os.walk('models'):
    level = root.replace('models', '').count(os.sep)
    indent = ' ' * 2 * level
    print(f"{indent}{os.path.basename(root)}/")
    subindent = ' ' * 2 * (level + 1)
    for file in files:
        print(f"{subindent}{file}")

# Uncomment to clean up
# shutil.rmtree('models')
# print("\n✓ Cleaned up models directory")