### Project Overview: House Price Prediction

For this tutorial, we'll build a house price prediction pipeline using the California Housing dataset. This project demonstrates all essential ML pipeline components while solving a practical real-world problem.

#### Business Problem


Predict house prices based on location, house characteristics, and demographic data to help real estate professionals make informed decisions.

##### Step 1: Environment Setup and Data Ingestion

pip install pandas numpy matplotlib seaborn scikit-learn joblib jupyter

In [None]:
# Required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.pipeline import Pipeline
import joblib
import warnings
import os
from datetime import datetime

# `sys` and `sklearn` are imported here because the version banner below reads
# `sys.version` and `sklearn.__version__`, and the import list above only pulls
# individual names out of scikit-learn sub-modules.
import sys

import sklearn

# Silence library warnings and pick plotting defaults for the whole notebook.
warnings.filterwarnings('ignore')
plt.style.use('default')
sns.set_palette("husl")

# Version banner: makes runs reproducible / debuggable from the cell output.
print("ML Pipeline Environment Ready!")
print(f"Python version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"Scikit-learn version: {sklearn.__version__}")

###### Data Ingestion Pipeline Class

In [None]:
class DataIngestion:
    """Load a dataset into a DataFrame and run first-pass sanity checks.

    Instance fields:
        raw_data: the loaded DataFrame, or None until a load succeeds.
        feature_names: list of feature column names (set by the California loader).
        target_name: name of the target column (set by the California loader).
        data_path: optional default path used by ``load_custom_data``.
    """

    def __init__(self, data_path=None):
        self.raw_data = None
        self.feature_names = None
        self.target_name = None
        self.data_path = data_path

    def load_california_housing_data(self):
        """Load the California housing dataset into ``self.raw_data``.

        The target values are stored in a column named ``'target'``.

        Returns:
            The loaded DataFrame, or None if loading raised an exception
            (the error is printed, not re-raised).
        """
        try:
            # Load dataset
            california_housing = fetch_california_housing()

            # Create DataFrame
            self.raw_data = pd.DataFrame(
                california_housing.data,
                columns=california_housing.feature_names
            )
            self.raw_data['target'] = california_housing.target
            self.feature_names = list(california_housing.feature_names)
            self.target_name = 'target'

            print("Data loaded successfully!")
            print(f"Dataset shape: {self.raw_data.shape}")
            return self.raw_data

        except Exception as e:
            print(f"Error loading data: {str(e)}")
            return None

    def load_custom_data(self, file_path=None):
        """Load a custom dataset from a CSV or Excel (.xlsx) file.

        Args:
            file_path: path to the file. When omitted, the ``data_path`` given
                to the constructor is used instead.

        Returns:
            The loaded DataFrame, or None if the path is missing, the
            extension is unsupported, or reading fails (the error is printed).
        """
        try:
            path = self.data_path if file_path is None else file_path
            if path is None:
                raise ValueError("No file path provided.")

            # Compare the extension case-insensitively so "DATA.CSV" is accepted.
            lowered = str(path).lower()
            if lowered.endswith('.csv'):
                self.raw_data = pd.read_csv(path)
            elif lowered.endswith('.xlsx'):
                self.raw_data = pd.read_excel(path)
            else:
                raise ValueError("Unsupported file format. Use CSV or Excel.")

            print(f"Custom data loaded successfully!")
            print(f"Dataset shape: {self.raw_data.shape}")
            return self.raw_data

        except Exception as e:
            print(f"Error loading custom data: {str(e)}")
            return None

    def basic_data_info(self):
        """Print shape, memory usage, dtypes, head and summary statistics.

        Returns:
            True if data was loaded and the report was printed, else False.
        """
        if self.raw_data is None:
            return False

        print("\nDataset Information:")
        print(f"Shape: {self.raw_data.shape}")
        print(f"Memory usage: {self.raw_data.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
        print("\nColumn Information:")
        # DataFrame.info() writes its report itself and returns None, so it is
        # called directly; wrapping it in print() would emit a stray "None".
        self.raw_data.info()

        print("\nFirst 5 rows:")
        print(self.raw_data.head())

        print("\nBasic Statistics:")
        print(self.raw_data.describe())

        return True

    def check_data_quality(self):
        """Print counts of missing values, duplicate rows and infinite values.

        Returns:
            True if data was loaded and the checks ran, else False.
        """
        if self.raw_data is None:
            return False

        print("\nData Quality Check:")

        # Check for missing values
        missing_values = self.raw_data.isnull().sum()
        if missing_values.sum() > 0:
            print("Missing Values Found:")
            print(missing_values[missing_values > 0])
        else:
            print("No missing values found")

        # Check for duplicates
        duplicates = self.raw_data.duplicated().sum()
        print(f"Duplicate rows: {duplicates}")

        # Check for infinite values (only meaningful for numeric columns)
        numeric_cols = self.raw_data.select_dtypes(include=[np.number]).columns
        inf_values = np.isinf(self.raw_data[numeric_cols]).sum().sum()
        print(f"Infinite values: {inf_values}")

        return True

# Initialize and load data
data_ingestion = DataIngestion()
df = data_ingestion.load_california_housing_data()
if df is None:
    # The loader reports failures by printing and returning None; stop here
    # instead of letting later cells fail on a None dataset.
    raise RuntimeError("California housing data could not be loaded")
data_ingestion.basic_data_info()
data_ingestion.check_data_quality()

##### Step 2: Data Exploration and Analysis

###### Exploratory Data Analysis Class

In [None]:
class DataExploration:
    """Exploratory statistics and plots for a feature/target dataset."""

    def __init__(self, data, target_column, feature_columns):
        # Keep a private copy so exploration never touches the caller's frame.
        self.data = data.copy()
        self.target_column = target_column
        self.feature_columns = feature_columns

    def statistical_summary(self):
        """Print descriptive statistics and each feature's correlation with the target."""
        print("Statistical Summary:")
        print("=" * 50)

        # Descriptive statistics over features plus target
        print("\nDescriptive Statistics:")
        summary_columns = self.feature_columns + [self.target_column]
        print(self.data[summary_columns].describe())

        # Per-feature correlation with the target, strongest positive first
        target_series = self.data[self.target_column]
        target_corr = self.data[self.feature_columns].corrwith(target_series)
        print(f"\nCorrelations with {self.target_column}:")
        print(target_corr.sort_values(ascending=False))

    def visualize_distributions(self):
        """Draw one histogram per feature on a three-column grid."""
        grid_cols = 3
        # Ceiling division: enough rows to hold every feature
        grid_rows = -(-len(self.feature_columns) // grid_cols)

        plt.figure(figsize=(15, 5 * grid_rows))

        for slot, name in enumerate(self.feature_columns, start=1):
            plt.subplot(grid_rows, grid_cols, slot)
            plt.hist(self.data[name], bins=30, alpha=0.7, edgecolor='black')
            plt.title(f'Distribution of {name}')
            plt.xlabel(name)
            plt.ylabel('Frequency')

        plt.tight_layout()
        plt.show()

    def correlation_heatmap(self):
        """Draw an annotated correlation heatmap of features and target."""
        plt.figure(figsize=(12, 10))
        heatmap_columns = self.feature_columns + [self.target_column]
        corr = self.data[heatmap_columns].corr()

        sns.heatmap(
            corr,
            annot=True,
            cmap='coolwarm',
            center=0,
            square=True,
            fmt='.2f',
        )
        plt.title('Feature Correlation Heatmap')
        plt.tight_layout()
        plt.show()

    def target_analysis(self):
        """Plot the target's histogram, box plot and normal Q-Q plot side by side."""
        target_name = self.target_column
        plt.figure(figsize=(15, 5))

        # Panel 1: histogram
        plt.subplot(1, 3, 1)
        plt.hist(self.data[target_name], bins=50, alpha=0.7, edgecolor='black')
        plt.title(f'Distribution of {target_name}')
        plt.xlabel(target_name)
        plt.ylabel('Frequency')

        # Panel 2: box plot
        plt.subplot(1, 3, 2)
        plt.boxplot(self.data[target_name])
        plt.title(f'Box Plot of {target_name}')
        plt.ylabel(target_name)

        # Panel 3: Q-Q plot against a normal distribution
        plt.subplot(1, 3, 3)
        from scipy import stats
        stats.probplot(self.data[target_name], dist="norm", plot=plt)
        plt.title(f'Q-Q Plot of {target_name}')

        plt.tight_layout()
        plt.show()

    def feature_target_relationships(self):
        """Scatter each feature against the target with a fitted straight line."""
        grid_cols = 3
        grid_rows = -(-len(self.feature_columns) // grid_cols)

        plt.figure(figsize=(15, 5 * grid_rows))

        for slot, name in enumerate(self.feature_columns, start=1):
            plt.subplot(grid_rows, grid_cols, slot)
            plt.scatter(self.data[name], self.data[self.target_column], alpha=0.5)
            plt.xlabel(name)
            plt.ylabel(self.target_column)
            plt.title(f'{name} vs {self.target_column}')

            # Degree-1 least-squares fit drawn as a dashed red trend line
            coefficients = np.polyfit(self.data[name], self.data[self.target_column], 1)
            trend = np.poly1d(coefficients)
            plt.plot(self.data[name], trend(self.data[name]), "r--", alpha=0.8)

        plt.tight_layout()
        plt.show()

# Perform data exploration: text summary first, then each plot in turn
explorer = DataExploration(df, 'target', data_ingestion.feature_names)
for exploration_step in (
    explorer.statistical_summary,
    explorer.visualize_distributions,
    explorer.correlation_heatmap,
    explorer.target_analysis,
    explorer.feature_target_relationships,
):
    exploration_step()

##### Step 3: Data Preprocessing Pipeline

###### Comprehensive Data Preprocessing Class

In [None]:
class DataPreprocessor:
    """Handle all data preprocessing steps and keep a log of what was done.

    Instance fields:
        target_column: name of the target column (excluded from outlier checks).
        scaler: the scaler fitted by ``scale_features``, or None before that.
        feature_columns: feature names recorded by ``create_train_test_split``.
        preprocessing_steps: human-readable log printed by
            ``get_preprocessing_summary``.
    """

    def __init__(self, target_column):
        self.target_column = target_column
        self.scaler = None
        self.feature_columns = None
        self.preprocessing_steps = []

    def handle_missing_values(self, data, strategy='mean'):
        """Fill missing values column by column.

        Numeric columns are filled with the column mean, median or mode
        according to ``strategy``; any other strategy value fills with 0.
        Object-dtype columns are always filled with their mode.

        Args:
            data: input DataFrame (never modified).
            strategy: 'mean', 'median' or 'mode'.

        Returns:
            ``data`` itself when nothing is missing, otherwise a filled copy.
        """
        print("Handling missing values...")

        if data.isnull().sum().sum() == 0:
            print("No missing values found")
            return data

        numeric_columns = data.select_dtypes(include=[np.number]).columns
        categorical_columns = data.select_dtypes(include=['object']).columns

        data_processed = data.copy()

        # Handle numeric columns
        for col in numeric_columns:
            if data_processed[col].isnull().sum() > 0:
                if strategy == 'mean':
                    fill_value = data_processed[col].mean()
                elif strategy == 'median':
                    fill_value = data_processed[col].median()
                elif strategy == 'mode':
                    fill_value = data_processed[col].mode()[0]
                else:
                    fill_value = 0

                # Assign the result back: `df[col].fillna(..., inplace=True)`
                # acts on a temporary object and does not update the frame
                # under pandas Copy-on-Write.
                data_processed[col] = data_processed[col].fillna(fill_value)
                print(f"Filled {col} missing values with {strategy}: {fill_value:.2f}")

        # Handle categorical columns
        for col in categorical_columns:
            if data_processed[col].isnull().sum() > 0:
                fill_value = data_processed[col].mode()[0]
                data_processed[col] = data_processed[col].fillna(fill_value)
                print(f"Filled {col} missing values with mode: {fill_value}")

        self.preprocessing_steps.append(f"Missing values handled using {strategy} strategy")
        return data_processed

    def remove_outliers(self, data, method='iqr', threshold=1.5):
        """Drop rows that are outliers in any numeric feature column.

        Columns are processed one after another, so each column's bounds are
        computed on the rows that survived the previous columns. The target
        column is never used for filtering.

        Args:
            data: input DataFrame (never modified).
            method: 'iqr' keeps values within ``threshold`` IQRs of the
                quartiles; 'zscore' keeps values whose absolute z-score is
                below ``threshold``. Any other value removes nothing.
            threshold: multiplier / cut-off for the chosen method.

        Returns:
            A filtered copy of ``data``.
        """
        print(f"Removing outliers using {method} method...")

        data_processed = data.copy()
        numeric_columns = data_processed.select_dtypes(include=[np.number]).columns
        numeric_columns = [col for col in numeric_columns if col != self.target_column]

        initial_shape = data_processed.shape[0]

        if method == 'iqr':
            for column in numeric_columns:
                Q1 = data_processed[column].quantile(0.25)
                Q3 = data_processed[column].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR

                # Despite the historical name, this mask marks the rows to KEEP.
                inlier_mask = (data_processed[column] >= lower_bound) & (data_processed[column] <= upper_bound)
                data_processed = data_processed[inlier_mask]

        elif method == 'zscore':
            from scipy import stats
            for column in numeric_columns:
                z_scores = np.abs(stats.zscore(data_processed[column]))
                data_processed = data_processed[z_scores < threshold]

        final_shape = data_processed.shape[0]
        removed_count = initial_shape - final_shape
        # Guard the percentage against an empty input frame.
        removed_pct = removed_count / initial_shape * 100 if initial_shape else 0.0

        print(f"Removed {removed_count} outliers ({removed_pct:.2f}% of data)")
        self.preprocessing_steps.append(f"Outliers removed using {method} method: {removed_count} rows")

        return data_processed

    def scale_features(self, X_train, X_test, method='standard'):
        """Fit a scaler on the training features and apply it to both splits.

        Args:
            X_train: training feature DataFrame (the scaler is fitted on this).
            X_test: test feature DataFrame (only transformed).
            method: 'standard' (StandardScaler) or 'robust' (RobustScaler).

        Returns:
            ``(X_train_scaled, X_test_scaled)`` as DataFrames that keep the
            original column names and indexes.

        Raises:
            ValueError: if ``method`` is not supported.
        """
        print(f"Scaling features using {method} scaling...")

        if method == 'standard':
            self.scaler = StandardScaler()
        elif method == 'robust':
            self.scaler = RobustScaler()
        else:
            raise ValueError("Unsupported scaling method")

        # Fit on train only so no test-set statistics leak into the scaler.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Convert back to DataFrame
        X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns, index=X_train.index)
        X_test_scaled = pd.DataFrame(X_test_scaled, columns=X_test.columns, index=X_test.index)

        self.preprocessing_steps.append(f"Features scaled using {method} scaling")

        return X_train_scaled, X_test_scaled

    def create_train_test_split(self, data, test_size=0.2, random_state=42):
        """Split ``data`` into shuffled train and test sets.

        Every column except ``self.target_column`` is treated as a feature;
        the feature names are recorded in ``self.feature_columns``.

        Returns:
            ``(X_train, X_test, y_train, y_test)``.
        """
        print(f"Creating train-test split (test_size={test_size})...")

        # Separate features and target
        X = data.drop(columns=[self.target_column])
        y = data[self.target_column]

        self.feature_columns = X.columns.tolist()

        # Split the data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y,
            test_size=test_size,
            random_state=random_state,
            shuffle=True
        )

        print(f"Training set shape: {X_train.shape}")
        print(f"Test set shape: {X_test.shape}")

        self.preprocessing_steps.append(f"Data split into train ({1-test_size:.0%}) and test ({test_size:.0%})")

        return X_train, X_test, y_train, y_test

    def get_preprocessing_summary(self):
        """Print the numbered log of preprocessing steps applied so far."""
        print("\nPreprocessing Summary:")
        print("=" * 50)
        for i, step in enumerate(self.preprocessing_steps, 1):
            print(f"{i}. {step}")

# Apply preprocessing
preprocessor = DataPreprocessor('target')

# Impute missing values first, then trim IQR outliers from the imputed frame
df_processed = preprocessor.remove_outliers(
    preprocessor.handle_missing_values(df, strategy='median'),
    method='iqr',
    threshold=1.5,
)

# Split into train and test sets
split_parts = preprocessor.create_train_test_split(df_processed)
X_train, X_test, y_train, y_test = split_parts

# Standardize both splits with a scaler fitted on the training split
scaled_parts = preprocessor.scale_features(X_train, X_test, method='standard')
X_train_scaled, X_test_scaled = scaled_parts

# Print the log of steps that were applied
preprocessor.get_preprocessing_summary()

##### Step 4: Feature Engineering

###### Advanced Feature Engineering Class

In [None]:
class FeatureEngineer:
    """Create and select features for better model performance.

    Instance fields:
        new_features: names/descriptions of features created so far, without
            duplicates even when the same transformation is applied to
            several splits.
        feature_importance: dict reserved for importances; not written by
            any method of this class.
    """

    def __init__(self):
        self.new_features = []
        self.feature_importance = {}
        # KMeans model fitted on the first frame that has Latitude/Longitude;
        # reused afterwards so every split gets the same cluster ids.
        self._location_kmeans = None

    def _register_feature(self, name):
        """Record ``name`` in ``new_features`` once, however often it is created."""
        if name not in self.new_features:
            self.new_features.append(name)

    def create_polynomial_features(self, X, degree=2, interaction_only=False):
        """Expand ``X`` with polynomial / interaction terms.

        Args:
            X: feature DataFrame.
            degree: maximum polynomial degree.
            interaction_only: if True, only products of distinct features.

        Returns:
            A DataFrame with the original and the generated columns (no bias
            column), indexed like ``X``.
        """
        from sklearn.preprocessing import PolynomialFeatures

        print(f"Creating polynomial features (degree={degree})...")

        poly = PolynomialFeatures(degree=degree, interaction_only=interaction_only, include_bias=False)
        X_poly = poly.fit_transform(X)

        # Get feature names
        feature_names = poly.get_feature_names_out(X.columns)
        X_poly_df = pd.DataFrame(X_poly, columns=feature_names, index=X.index)

        self._register_feature(f"Polynomial features (degree={degree})")
        print(f"Created {X_poly_df.shape[1] - X.shape[1]} new polynomial features")

        return X_poly_df

    def create_domain_specific_features(self, X, refit_location_clusters=False):
        """Add housing-specific ratio, age-bucket and location-cluster features.

        Each feature is only created when the columns it needs are present.
        The location KMeans model is fitted on the first frame passed in and
        reused on later calls, so train and test rows share one cluster
        assignment; call this on the training split first.

        Args:
            X: feature DataFrame (never modified).
            refit_location_clusters: set True to discard the stored KMeans
                model and fit a new one on ``X``.

        Returns:
            A copy of ``X`` with the new columns appended.
        """
        print("Creating domain-specific features...")

        X_engineered = X.copy()
        created = []  # features added by THIS call, for the message below

        # Rooms per person
        if 'AveRooms' in X.columns and 'AveOccup' in X.columns:
            X_engineered['RoomsPerPerson'] = X_engineered['AveRooms'] / X_engineered['AveOccup']
            created.append('RoomsPerPerson')

        # Bedrooms ratio
        if 'AveBedrms' in X.columns and 'AveRooms' in X.columns:
            X_engineered['BedroomsRatio'] = X_engineered['AveBedrms'] / X_engineered['AveRooms']
            created.append('BedroomsRatio')

        # Population density
        if 'Population' in X.columns and 'AveOccup' in X.columns:
            X_engineered['PopulationDensity'] = X_engineered['Population'] / X_engineered['AveOccup']
            created.append('PopulationDensity')

        # Income per room
        if 'MedInc' in X.columns and 'AveRooms' in X.columns:
            X_engineered['IncomePerRoom'] = X_engineered['MedInc'] / X_engineered['AveRooms']
            created.append('IncomePerRoom')

        # Age categories (values outside (0, inf) become the string 'nan')
        if 'HouseAge' in X.columns:
            X_engineered['AgeCategory'] = pd.cut(X_engineered['HouseAge'],
                                               bins=[0, 10, 25, 40, float('inf')],
                                               labels=['New', 'Recent', 'Mature', 'Old'])
            X_engineered['AgeCategory'] = X_engineered['AgeCategory'].astype(str)
            created.append('AgeCategory')

        # Location clustering (simplified)
        if 'Latitude' in X.columns and 'Longitude' in X.columns:
            location_features = X_engineered[['Latitude', 'Longitude']]
            kmeans = getattr(self, '_location_kmeans', None)
            if kmeans is None or refit_location_clusters:
                from sklearn.cluster import KMeans
                kmeans = KMeans(n_clusters=5, random_state=42)
                X_engineered['LocationCluster'] = kmeans.fit_predict(location_features)
                self._location_kmeans = kmeans
            else:
                # Reuse the fitted centroids: refitting per split would give
                # cluster ids that mean different regions in train and test.
                X_engineered['LocationCluster'] = kmeans.predict(location_features)
            created.append('LocationCluster')

        for name in created:
            self._register_feature(name)

        print(f"Created {len(created)} domain-specific features")
        return X_engineered

    def select_features(self, X, y, method='correlation', k=10):
        """Return the names of the ``k`` best numeric features.

        Args:
            X: feature DataFrame; non-numeric columns are ignored.
            y: target values aligned with ``X``.
            method: 'correlation' (absolute Pearson correlation with ``y``),
                'f_regression' or 'mutual_info'.
            k: number of features to keep; capped at the number of numeric
                columns.

        Returns:
            List of selected column names.

        Raises:
            ValueError: if ``method`` is not one of the supported names.
        """
        if method not in ('correlation', 'f_regression', 'mutual_info'):
            raise ValueError(
                f"Unsupported feature selection method: {method!r}. "
                "Use 'correlation', 'f_regression' or 'mutual_info'."
            )

        print(f"Selecting top {k} features using {method} method...")

        # Handle categorical features
        X_numeric = X.select_dtypes(include=[np.number])

        # SelectKBest rejects k larger than the number of columns.
        k = min(k, X_numeric.shape[1])

        if X_numeric.shape[1] == 0:
            top_features = []

        elif method == 'correlation':
            # Use correlation with target
            correlations = X_numeric.corrwith(y).abs()
            top_features = correlations.nlargest(k).index.tolist()

        else:
            from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression

            score_func = f_regression if method == 'f_regression' else mutual_info_regression
            selector = SelectKBest(score_func=score_func, k=k)
            selector.fit(X_numeric, y)
            top_features = X_numeric.columns[selector.get_support()].tolist()

        print(f"Selected features: {top_features}")
        return top_features

    def get_feature_engineering_summary(self):
        """Print the numbered list of features created so far."""
        print("\nFeature Engineering Summary:")
        print("=" * 50)
        for i, feature in enumerate(self.new_features, 1):
            print(f"{i}. {feature}")

# Apply feature engineering
feature_engineer = FeatureEngineer()

# Create domain-specific features from the UNSCALED splits. The ratio features
# and the HouseAge bins (which start at 0) assume original units; standardized
# inputs are centred on zero, so ratios blow up near zero and roughly half the
# ages would fall outside every bin. Train goes first so it is the split the
# location clustering sees first.
X_train_engineered = feature_engineer.create_domain_specific_features(X_train)
X_test_engineered = feature_engineer.create_domain_specific_features(X_test)

# Handle categorical features for modeling
from sklearn.preprocessing import LabelEncoder

# Select by "not numeric" rather than by object dtype so string columns are
# encoded whichever dtype pandas stores them with.
categorical_columns = X_train_engineered.select_dtypes(exclude=[np.number]).columns
for col in categorical_columns:
    le = LabelEncoder()
    X_train_engineered[col] = le.fit_transform(X_train_engineered[col].astype(str))
    X_test_engineered[col] = le.transform(X_test_engineered[col].astype(str))

# Scale after engineering; the scaler is fitted on the training split only
X_train_engineered, X_test_engineered = preprocessor.scale_features(
    X_train_engineered, X_test_engineered, method='standard'
)

# Select best features
numeric_features = X_train_engineered.select_dtypes(include=[np.number])
top_features = feature_engineer.select_features(numeric_features, y_train, method='correlation', k=15)

# Keep only selected features
X_train_final = X_train_engineered[top_features]
X_test_final = X_test_engineered[top_features]

print(f"\nFinal feature set shape: {X_train_final.shape}")
feature_engineer.get_feature_engineering_summary()

##### Step 5: Model Training and Validation

###### Comprehensive Model Training Pipeline

In [None]:
class ModelTrainer:
    """Train, cross-validate and tune several regression models.

    Instance fields:
        models: name -> unfitted estimator, filled by ``initialize_models``.
        trained_models: name -> fitted estimator (tuned models are stored
            under ``"<name>_Tuned"``).
        model_scores: dict reserved for evaluation scores; not written by any
            method of this class.
        cv_results: name -> cross-validation statistics from
            ``cross_validate_models``.
        best_model / best_model_name: left as None by this class; to be set by
            the caller once a model has been chosen.
    """

    def __init__(self):
        self.models = {}
        self.trained_models = {}
        self.model_scores = {}
        self.cv_results = {}
        self.best_model = None
        self.best_model_name = None

    def initialize_models(self):
        """Create the candidate estimators with fixed random seeds."""
        self.models = {
            'Linear Regression': LinearRegression(),
            'Ridge Regression': Ridge(random_state=42),
            'Random Forest': RandomForestRegressor(random_state=42, n_jobs=-1),
            'Gradient Boosting': GradientBoostingRegressor(random_state=42)
        }

        print(f"Initialized {len(self.models)} models for training")

    def train_models(self, X_train, y_train):
        """Fit every model in ``self.models`` and store it in ``trained_models``."""
        print("Training models...")

        for name, model in self.models.items():
            print(f"Training {name}...")
            model.fit(X_train, y_train)
            self.trained_models[name] = model

        print("All models trained successfully!")

    def cross_validate_models(self, X_train, y_train, cv=5):
        """Cross-validate every model in ``self.models``.

        Args:
            X_train, y_train: training data.
            cv: number of folds.

        Returns:
            Dict name -> {'CV_MSE_Mean', 'CV_MSE_Std', 'CV_RMSE_Mean',
            'CV_RMSE_Std', 'CV_Scores'}. 'CV_Scores' holds the raw negated-MSE
            fold scores; 'CV_RMSE_Std' is the spread of the per-fold RMSE.
            The same dict is stored in ``self.cv_results``.
        """
        print(f"Performing {cv}-fold cross-validation...")

        cv_results = {}

        for name, model in self.models.items():
            # The scorer returns negated MSE, one value per fold
            cv_scores = cross_val_score(model, X_train, y_train,
                                      cv=cv, scoring='neg_mean_squared_error')

            mean_mse = -cv_scores.mean()
            fold_rmse = np.sqrt(-cv_scores)

            cv_results[name] = {
                'CV_MSE_Mean': mean_mse,
                'CV_MSE_Std': cv_scores.std(),
                'CV_RMSE_Mean': np.sqrt(mean_mse),
                'CV_RMSE_Std': fold_rmse.std(),
                'CV_Scores': cv_scores
            }

            # Report the spread in RMSE units so it matches the RMSE it follows
            # (the std of the MSE scores is on a different scale).
            print(f"{name}: CV RMSE = {np.sqrt(mean_mse):.4f} (+/- {fold_rmse.std()*2:.4f})")

        self.cv_results = cv_results
        return cv_results

    def hyperparameter_tuning(self, X_train, y_train, model_name='Random Forest'):
        """Grid-search hyperparameters for one supported model.

        The best estimator is stored in ``trained_models`` under
        ``"<model_name>_Tuned"``.

        Args:
            X_train, y_train: training data.
            model_name: 'Random Forest' or 'Gradient Boosting'.

        Returns:
            The best estimator found, or None if ``model_name`` is not
            supported.
        """
        print(f"Performing hyperparameter tuning for {model_name}...")

        if model_name == 'Random Forest':
            param_grid = {
                'n_estimators': [100, 200, 300],
                'max_depth': [10, 20, None],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
            model = RandomForestRegressor(random_state=42, n_jobs=-1)

        elif model_name == 'Gradient Boosting':
            param_grid = {
                'n_estimators': [100, 200, 300],
                'learning_rate': [0.05, 0.1, 0.15],
                'max_depth': [3, 5, 7],
                'subsample': [0.8, 0.9, 1.0]
            }
            model = GradientBoostingRegressor(random_state=42)

        else:
            print(f"Hyperparameter tuning not implemented for {model_name}")
            return None

        # Perform grid search
        grid_search = GridSearchCV(
            model, param_grid,
            cv=3, scoring='neg_mean_squared_error',
            n_jobs=-1, verbose=1
        )

        grid_search.fit(X_train, y_train)

        print(f"Best parameters for {model_name}:")
        print(grid_search.best_params_)
        print(f"Best CV score: {np.sqrt(-grid_search.best_score_):.4f}")

        # Update the model with best parameters
        self.trained_models[f"{model_name}_Tuned"] = grid_search.best_estimator_

        return grid_search.best_estimator_

##### Step 6: Model Evaluation and Selection

###### Advanced Model Evaluation

In [None]:
class ModelEvaluator:
    """Evaluate one fitted model: metrics, diagnostic plots, learning curves."""

    def __init__(self, model, model_name):
        self.model = model
        self.model_name = model_name

    @staticmethod
    def _mape(y_true, y_pred):
        """Return the mean absolute percentage error in percent.

        Rows whose true value is 0 are skipped because their percentage error
        is undefined; NaN is returned when no row is left.
        """
        actual = np.asarray(y_true, dtype=float)
        predicted = np.asarray(y_pred, dtype=float)
        usable = actual != 0
        if not usable.any():
            return float('nan')
        relative_error = np.abs((actual[usable] - predicted[usable]) / actual[usable])
        return float(np.mean(relative_error) * 100)

    @staticmethod
    def _fold_rmse_stats(neg_mse_scores):
        """Return (mean, std) of the per-fold RMSE for each training size.

        ``neg_mse_scores`` is indexed (training size, fold) and holds negated
        MSE values, as produced by the 'neg_mean_squared_error' scorer.
        """
        fold_rmse = np.sqrt(-np.asarray(neg_mse_scores, dtype=float))
        return fold_rmse.mean(axis=1), fold_rmse.std(axis=1)

    def detailed_evaluation(self, X_test, y_test):
        """Print and return test-set metrics and residuals.

        Args:
            X_test: test features.
            y_test: true target values.

        Returns:
            Dict with 'predictions', 'residuals' (``y_test - predictions``)
            and 'metrics' (MSE, RMSE, MAE, R2, MAPE in percent).
        """
        print(f"Detailed Evaluation for {self.model_name}")
        print("=" * 50)

        # Make predictions
        y_pred = self.model.predict(X_test)

        # Calculate all metrics
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        # Calculate additional metrics (zero targets are excluded from MAPE)
        mape = self._mape(y_test, y_pred)

        print(f"Mean Squared Error (MSE): {mse:.4f}")
        print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")
        print(f"Mean Absolute Error (MAE): {mae:.4f}")
        print(f"R² Score: {r2:.4f}")
        print(f"Mean Absolute Percentage Error (MAPE): {mape:.2f}%")

        # Residual analysis
        residuals = y_test - y_pred
        print(f"\nResidual Analysis:")
        print(f"Mean Residual: {np.mean(residuals):.4f}")
        print(f"Std Residual: {np.std(residuals):.4f}")

        return {
            'predictions': y_pred,
            'residuals': residuals,
            'metrics': {
                'MSE': mse, 'RMSE': rmse, 'MAE': mae,
                'R2': r2, 'MAPE': mape
            }
        }

    def plot_predictions(self, y_test, y_pred):
        """Plot actual-vs-predicted, residual-vs-predicted and residual histogram."""
        plt.figure(figsize=(15, 5))

        # Actual vs Predicted scatter plot; the dashed diagonal is a perfect fit
        plt.subplot(1, 3, 1)
        plt.scatter(y_test, y_pred, alpha=0.5)
        plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
        plt.xlabel('Actual Values')
        plt.ylabel('Predicted Values')
        plt.title(f'{self.model_name}: Actual vs Predicted')

        # Residual plot
        plt.subplot(1, 3, 2)
        residuals = y_test - y_pred
        plt.scatter(y_pred, residuals, alpha=0.5)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel('Predicted Values')
        plt.ylabel('Residuals')
        plt.title(f'{self.model_name}: Residual Plot')

        # Residual distribution
        plt.subplot(1, 3, 3)
        plt.hist(residuals, bins=30, alpha=0.7, edgecolor='black')
        plt.xlabel('Residuals')
        plt.ylabel('Frequency')
        plt.title(f'{self.model_name}: Residual Distribution')

        plt.tight_layout()
        plt.show()

    def learning_curves(self, X_train, y_train, cv=5):
        """Plot training and validation RMSE against training-set size.

        The lines show the mean per-fold RMSE and the shaded bands one
        standard deviation of the per-fold RMSE around it.
        """
        from sklearn.model_selection import learning_curve

        print("Generating learning curves...")

        train_sizes, train_scores, val_scores = learning_curve(
            self.model, X_train, y_train, cv=cv,
            train_sizes=np.linspace(0.1, 1.0, 10),
            scoring='neg_mean_squared_error', n_jobs=-1
        )

        # Convert each fold score to RMSE first, then aggregate: the square
        # root of the std of MSE scores is not a standard deviation of RMSE.
        train_rmse_mean, train_rmse_std = self._fold_rmse_stats(train_scores)
        val_rmse_mean, val_rmse_std = self._fold_rmse_stats(val_scores)

        # Plot learning curves
        plt.figure(figsize=(10, 6))
        plt.plot(train_sizes, train_rmse_mean, 'o-', label='Training RMSE')
        plt.fill_between(train_sizes, train_rmse_mean - train_rmse_std,
                         train_rmse_mean + train_rmse_std, alpha=0.1)

        plt.plot(train_sizes, val_rmse_mean, 'o-', label='Validation RMSE')
        plt.fill_between(train_sizes, val_rmse_mean - val_rmse_std,
                         val_rmse_mean + val_rmse_std, alpha=0.1)

        plt.xlabel('Training Set Size')
        plt.ylabel('RMSE')
        plt.title(f'{self.model_name}: Learning Curves')
        plt.legend()
        plt.grid(True)
        plt.show()

# Train the candidate models and pick the one with the lowest cross-validated
# RMSE. None of the cells above creates `trainer` or assigns
# `trainer.best_model`, so both are set up here before evaluation.
trainer = ModelTrainer()
trainer.initialize_models()
trainer.train_models(X_train_final, y_train)
cv_results = trainer.cross_validate_models(X_train_final, y_train)
trainer.best_model_name = min(cv_results, key=lambda name: cv_results[name]['CV_RMSE_Mean'])
trainer.best_model = trainer.trained_models[trainer.best_model_name]
print(f"Best model by CV RMSE: {trainer.best_model_name}")

# Evaluate the best model
evaluator = ModelEvaluator(trainer.best_model, trainer.best_model_name)
evaluation_results = evaluator.detailed_evaluation(X_test_final, y_test)
evaluator.plot_predictions(y_test, evaluation_results['predictions'])
evaluator.learning_curves(X_train_final, y_train)

##### Step 7: Model Deployment Pipeline

###### Simple Deployment and Prediction System

In [1]:
class ModelDeployment:
    """Handle model deployment and prediction serving"""

    def __init__(self, model, preprocessor, feature_engineer, feature_columns):
        self.model = model
        self.preprocessor = preprocessor
        self.feature_engineer = feature_engineer
        self.feature_columns = feature_columns
        self.deployment_info = {}

    def save_model_artifacts(self, model_path='model_artifacts'):
        """Save all model artifacts for deployment"""
        import os
        from datetime import datetime

        # Create directory if it doesn't exist
        os.makedirs(model_path, exist_ok=True)

        # Save model
        model_file = os.path.join(model_path, 'trained_model.pkl')
        joblib.dump(self.model, model_file)

        # Save preprocessor
        preprocessor_file = os.path.join(model_path, 'preprocessor.pkl')
        joblib.dump(self.preprocessor, preprocessor_file)

        # Save feature columns
        feature_file = os.path.join(model_path, 'feature_columns.pkl')
        joblib.dump(self.feature_columns, feature_file)

        # Save deployment info
        self.deployment_info = {
            'model_type': type(self.model).__name__,
            'feature_count': len(self.feature_columns),
            'deployment_date': datetime.now().isoformat(),
            'model_path': model_file,
            'preprocessor_path': preprocessor_file,
            'feature_path': feature_file
        }

        info_file = os.path.join(model_path, 'deployment_info.pkl')
        joblib.dump(self.deployment_info, info_file)

        print(f"Model artifacts saved to: {model_path}")
        print(f"Model type: {self.deployment_info['model_type']}")
        print(f"Feature count: {self.deployment_info['feature_count']}")

    def load_model_artifacts(self, model_path='model_artifacts'):
        """Load model artifacts for inference"""
        import os

        try:
            # Load model
            model_file = os.path.join(model_path, 'trained_model.pkl')
            self.model = joblib.load(model_file)

            # Load preprocessor
            preprocessor_file = os.path.join(model_path, 'preprocessor.pkl')
            self.preprocessor = joblib.load(preprocessor_file)

            # Load feature columns
            feature_file = os.path.join(model_path, 'feature_columns.pkl')
            self.feature_columns = joblib.load(feature_file)

            # Load deployment info
            info_file = os.path.join(model_path, 'deployment_info.pkl')
            self.deployment_info = joblib.load(info_file)

            print("Model artifacts loaded successfully!")
            return True

        except Exception as e:
            print(f"Error loading model artifacts: {str(e)}")
            return False

    def predict_single(self, input_data):
        """Make prediction for a single input"""
        try:
            # Convert to DataFrame if it's a dictionary
            if isinstance(input_data, dict):
                input_df = pd.DataFrame([input_data])
            else:
                input_df = input_data.copy()

            # Apply the same preprocessing steps
            # Note: This is simplified - in practice, you'd need to apply
            # the exact same preprocessing pipeline

            # Ensure all required features are present
            missing_features = set(self.feature_columns) - set(input_df.columns)
            if missing_features:
                raise ValueError(f"Missing features: {missing_features}")

            # Select only the features used in training
            input_processed = input_df[self.feature_columns]

            # Make prediction
            prediction = self.model.predict(input_processed)

            return {
                'prediction': float(prediction[0]),
                'input_features': input_data,
                'model_type': type(self.model).__name__
            }

        except Exception as e:
            return {
                'error': str(e),
                'prediction': None
            }

    def predict_batch(self, input_data):
        """Make predictions for multiple inputs"""
        try:
            # Ensure input is DataFrame
            if isinstance(input_data, list):
                input_df = pd.DataFrame(input_data)
            else:
                input_df = input_data.copy()

            # Select only the features used in training
            input_processed = input_df[self.feature_columns]

            # Make predictions
            predictions = self.model.predict(input_processed)

            return {
                'predictions': predictions.tolist(),
                'count': len(predictions),
                'model_type': type(self.model).__name__
            }

        except Exception as e:
            return {
                'error': str(e),
                'predictions': None
            }

    def create_prediction_api(self):
        """Create a simple prediction API structure"""
        api_code = '''
from flask import Flask, request, jsonify
import joblib
import pandas as pd
import numpy as np

app = Flask(__name__)

# Load model artifacts at startup
model = joblib.load('model_artifacts/trained_model.pkl')
preprocessor = joblib.load('model_artifacts/preprocessor.pkl')
feature_columns = joblib.load('model_artifacts/feature_columns.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # Get input data
        data = request.json

        # Convert to DataFrame
        input_df = pd.DataFrame([data])

        # Preprocess and predict
        input_processed = input_df[feature_columns]
        prediction = model.predict(input_processed)

        return jsonify({
            'prediction': float(prediction[0]),
            'status': 'success'
        })

    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        })

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
'''

        with open('prediction_api.py', 'w') as f:
            f.write(api_code)

        print("Prediction API created: prediction_api.py")
        print("Run with: python prediction_api.py")

    def create_requirements_file(self):
        """Write the deployment dependency list to ``requirements.txt``.

        The file is created in the current working directory (replacing any
        existing one) with one minimum-version requirement per line, and a
        confirmation message is printed afterwards.
        """
        minimum_versions = (
            'pandas>=1.3.0',
            'numpy>=1.21.0',
            'scikit-learn>=1.0.0',
            'joblib>=1.0.0',
            'matplotlib>=3.4.0',
            'seaborn>=0.11.0',
            'flask>=2.0.0',
        )

        with open('requirements.txt', 'w') as handle:
            # One requirement per line, each terminated by a newline
            handle.writelines(f"{spec}\n" for spec in minimum_versions)

        print("Requirements file created: requirements.txt")

# Bundle the best trained model with the fitted pipeline stages
deployment = ModelDeployment(
    model=trainer.best_model,
    preprocessor=preprocessor,
    feature_engineer=feature_engineer,
    feature_columns=X_train_final.columns.tolist(),
)

# Persist the artifacts first
deployment.save_model_artifacts()

# Then generate the serving script and its dependency list
deployment.create_prediction_api()
deployment.create_requirements_file()

# Example raw record for a test prediction
sample_input = dict(
    MedInc=8.3252,
    HouseAge=41.0,
    AveRooms=6.984,
    AveBedrms=1.024,
    Population=322.0,
    AveOccup=2.556,
    Latitude=37.88,
    Longitude=-122.23,
)

# Note: This is a simplified example. A real request must supply exactly the
# features used in training, i.e. the columns produced by feature engineering.

NameError: name 'trainer' is not defined

##### Step 8: Testing and Validation

###### Comprehensive Testing Suite

In [None]:
class ModelTester:
    """Comprehensive testing for ML pipeline.

    Runs data-quality, performance, consistency and edge-case checks against
    a deployment object. Each ``test_*`` method prints a PASS/FAIL line per
    check and returns the outcome so it can be fed to
    ``generate_test_report``.

    Args:
        deployment: Object exposing ``model`` (with ``predict``),
            ``predict_single`` and ``predict_batch``.
    """

    def __init__(self, deployment):
        self.deployment = deployment
        # Kept for callers that want to stash results; no method here fills it.
        self.test_results = {}

    @staticmethod
    def _print_results(heading, tests):
        """Print ``heading`` followed by one PASS/FAIL line per check."""
        print(heading)
        for test_name, result in tests.items():
            status = "PASS" if result else "FAIL"
            print(f"  {test_name}: {status}")

    def test_data_quality(self, test_data):
        """Test data quality and integrity.

        Args:
            test_data: DataFrame to inspect. A ``'target'`` column, when
                present, must be strictly positive.

        Returns:
            dict: Check name -> bool. ``'no_infinite_values'`` is also False
            when a numeric column contains NaN, because NaN is not finite.
            ``'reasonable_ranges'`` is a placeholder that always passes.
        """
        print("Testing Data Quality...")

        numeric_data = test_data.select_dtypes(include=[np.number])
        has_target = 'target' in test_data.columns

        # bool(...) turns numpy booleans into plain Python bools so results
        # compare and serialize predictably.
        tests = {
            'no_missing_values': bool(test_data.isnull().sum().sum() == 0),
            'no_infinite_values': bool(np.isfinite(numeric_data).all().all()),
            'positive_target': bool((test_data['target'] > 0).all()) if has_target else True,
            'reasonable_ranges': True  # Add specific range checks
        }

        self._print_results("Data Quality Tests:", tests)

        return tests

    def test_model_performance(self, X_test, y_test, thresholds):
        """Test model performance against thresholds.

        Args:
            X_test: Features passed to ``deployment.model.predict``.
            y_test: True target values.
            thresholds: Mapping with optional keys ``'rmse'`` and ``'mae'``
                (upper bounds, default: no limit) and ``'r2'`` (lower bound,
                default 0).

        Returns:
            dict: Check name -> bool for the three threshold comparisons.
        """
        print("Testing Model Performance...")

        y_pred = self.deployment.model.predict(X_test)

        # Calculate metrics
        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        tests = {
            'rmse_threshold': bool(rmse < thresholds.get('rmse', float('inf'))),
            'mae_threshold': bool(mae < thresholds.get('mae', float('inf'))),
            'r2_threshold': bool(r2 > thresholds.get('r2', 0))
        }

        self._print_results("Performance Tests:", tests)

        print(f"  Actual RMSE: {rmse:.4f}")
        print(f"  Actual MAE: {mae:.4f}")
        print(f"  Actual R²: {r2:.4f}")

        return tests

    def test_prediction_consistency(self, test_inputs, n_runs=5):
        """Test prediction consistency across multiple runs.

        Calls ``deployment.predict_batch(test_inputs)`` ``n_runs`` times and
        checks that, for every input row, the coefficient of variation
        (std / |mean|) across the successful runs stays below 0.01.

        Args:
            test_inputs: Inputs forwarded unchanged to ``predict_batch``.
            n_runs: Number of repeated prediction calls.

        Returns:
            bool: True when all rows are consistent; False when they are not
            or when no run produced predictions.
        """
        print("Testing Prediction Consistency...")

        predictions = []
        for _ in range(n_runs):
            pred = self.deployment.predict_batch(test_inputs)
            if pred['predictions'] is not None:
                predictions.append(pred['predictions'])

        if not predictions:
            return False

        predictions_array = np.array(predictions)
        spread = np.std(predictions_array, axis=0)
        # Use the magnitude of the mean: with a signed mean, negative
        # predictions would yield a negative CV that always passes.
        magnitude = np.abs(np.mean(predictions_array, axis=0))

        with np.errstate(divide='ignore', invalid='ignore'):
            cv = spread / magnitude
        # Identical predictions are perfectly consistent even when their mean
        # is zero (0/0 would otherwise be NaN and fail the check).
        cv = np.where(spread == 0, 0.0, cv)

        # Test if predictions are consistent (CV < 0.01)
        consistency_test = bool((cv < 0.01).all())

        print(f"  Consistency test: {'PASS' if consistency_test else 'FAIL'}")
        print(f"  Average CV: {np.mean(cv):.6f}")

        return consistency_test

    def test_edge_cases(self):
        """Test model behavior with edge cases.

        Sends one extreme-low and one extreme-high record through
        ``deployment.predict_single``.

        Returns:
            dict: ``'handles_extreme_low'`` / ``'handles_extreme_high'`` are
            False when the corresponding call raises or yields no prediction;
            ``'no_negative_predictions'`` is False when any prediction is
            below zero.
        """
        print("Testing Edge Cases...")

        # Test with extreme values
        edge_cases = [
            {'MedInc': 0.1, 'HouseAge': 1.0, 'AveRooms': 1.0, 'AveBedrms': 0.1,
             'Population': 1.0, 'AveOccup': 1.0, 'Latitude': 32.0, 'Longitude': -124.0},
            {'MedInc': 15.0, 'HouseAge': 52.0, 'AveRooms': 20.0, 'AveBedrms': 5.0,
             'Population': 10000.0, 'AveOccup': 10.0, 'Latitude': 42.0, 'Longitude': -114.0}
        ]

        tests = {
            'handles_extreme_low': True,
            'handles_extreme_high': True,
            'no_negative_predictions': True
        }

        for label, case in zip(('low', 'high'), edge_cases):
            handled_key = f'handles_extreme_{label}'
            try:
                result = self.deployment.predict_single(case)
                if result['prediction'] is None:
                    tests[handled_key] = False
                elif result['prediction'] < 0:
                    tests['no_negative_predictions'] = False
            except Exception:
                # Any failure while predicting counts as "not handled"
                # rather than aborting the whole test run.
                tests[handled_key] = False

        self._print_results("Edge Case Tests:", tests)

        return tests

    def generate_test_report(self, all_tests):
        """Generate comprehensive test report.

        Args:
            all_tests: Mapping of category name -> mapping of check name ->
                truthy/falsy result.

        Returns:
            dict: ``'total_tests'``, ``'passed_tests'`` and ``'pass_rate'``
            (percentage; 0.0 when there are no tests at all).
        """
        print("\n" + "="*50)
        print("ML PIPELINE TEST REPORT")
        print("="*50)

        total_tests = sum(len(test_group) for test_group in all_tests.values())
        passed_tests = sum(
            bool(result)
            for test_group in all_tests.values()
            for result in test_group.values()
        )
        # Guard against an empty report instead of dividing by zero.
        pass_rate = passed_tests / total_tests * 100 if total_tests else 0.0

        print(f"Total Tests: {total_tests}")
        print(f"Passed Tests: {passed_tests}")
        print(f"Failed Tests: {total_tests - passed_tests}")
        print(f"Pass Rate: {pass_rate:.1f}%")

        print("\nDetailed Results:")
        for category, tests in all_tests.items():
            print(f"\n{category}:")
            for test_name, result in tests.items():
                status = "✓ PASS" if result else "✗ FAIL"
                print(f"  {test_name}: {status}")

        return {
            'total_tests': total_tests,
            'passed_tests': passed_tests,
            'pass_rate': pass_rate
        }

# Exercise the whole test suite against the deployed model
tester = ModelTester(deployment)

# Pass/fail limits applied to the held-out metrics
performance_thresholds = dict(
    rmse=1.0,  # Adjust based on your requirements
    mae=0.8,
    r2=0.6,
)

# Collect every category's results under the heading used in the report
all_test_results = {}
all_test_results['Data Quality'] = tester.test_data_quality(df_processed)
all_test_results['Model Performance'] = tester.test_model_performance(
    X_test_final,
    y_test,
    performance_thresholds,
)
# The consistency check returns a single flag, so wrap it in a one-entry group
consistency_passed = tester.test_prediction_consistency(X_test_final.head())
all_test_results['Prediction Consistency'] = {'consistency': consistency_passed}
all_test_results['Edge Cases'] = tester.test_edge_cases()

# Summarize everything in the final report
test_report = tester.generate_test_report(all_test_results)