# In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor, AdaBoostRegressor
from sklearn.linear_model import Ridge, Lasso
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.neighbors import KNeighborsRegressor
from sklearn.tree import DecisionTreeRegressor
import matplotlib.pyplot as plt
import seaborn as sns
from copy import deepcopy

# Global parameters for DFDBCSO (read at call time by dfdbcso_ensemble)
PopSize = 200  # Number of candidate weight vectors kept in the swarm
MaxIter = 500  # Number of evaluate-then-move iterations
phi = 0.1      # Coefficient on the pull toward the global-best weight vector

# Load and preprocess data
def load_and_preprocess_data(path="/content/train.csv"):
    """Load the dataset and return scaled train/validation/test splits.

    Args:
        path: CSV file to read. It must contain a ``critical_temp`` column,
            which is used as the regression target; every other column is
            treated as a feature.

    Returns:
        Tuple ``(X_train, X_val, X_test, y_train, y_val, y_test)`` holding a
        70/15/15 split. The feature blocks are standardized arrays; the
        targets are the matching slices of the ``critical_temp`` column.
    """
    data = pd.read_csv(path)

    # Drop rows with NaN in the target or in any feature
    data = data.dropna()
    X = data.drop(columns=["critical_temp"])
    y = data["critical_temp"]

    # Split into training (70%), validation (15%) and test (15%) sets
    X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

    # Fit the scaler on the training rows only, then apply it to the held-out
    # rows, so validation/test statistics never leak into training.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)

    return X_train, X_val, X_test, y_train, y_val, y_test

# Define regression models
def define_models():
    """Build the candidate regressors.

    Returns:
        Dict mapping a display name to a freshly constructed, unfitted
        estimator. Insertion order is the order the models are later
        trained and reported in.
    """
    seed = 42      # shared random_state for every estimator that accepts one
    n_trees = 200  # shared n_estimators for the ensemble-of-trees estimators

    models = {}
    models["RandomForest"] = RandomForestRegressor(n_estimators=n_trees, max_depth=15, random_state=seed)
    models["GradientBoosting"] = GradientBoostingRegressor(
        n_estimators=n_trees, learning_rate=0.05, max_depth=8, random_state=seed
    )
    models["SVR"] = SVR(kernel='rbf', C=10, gamma=0.1)
    models["Ridge"] = Ridge(alpha=1.0)
    models["Lasso"] = Lasso(alpha=0.01, random_state=seed)
    models["ExtraTrees"] = ExtraTreesRegressor(n_estimators=n_trees, max_depth=15, random_state=seed)
    models["AdaBoost"] = AdaBoostRegressor(n_estimators=n_trees, random_state=seed)
    models["KNN"] = KNeighborsRegressor(n_neighbors=10)
    models["DecisionTree"] = DecisionTreeRegressor(max_depth=10, random_state=seed)
    return models

# Train and evaluate models
def train_and_evaluate_models(models, X_train, X_val, y_train, y_val, X_test, y_test):
    """Fit every model and score it on the validation and test sets.

    Args:
        models: Dict mapping model name to an estimator exposing
            ``fit`` and ``predict``. Each estimator is fitted in place.
        X_train, y_train: Data every model is fitted on.
        X_val, y_val: Validation data.
        X_test, y_test: Test data.

    Returns:
        Tuple ``(results_df, test_results_df, predictions)``:
        ``results_df`` holds validation MSE/MAE/R2 per model in input order,
        ``test_results_df`` holds test MSE/MAE/R2 sorted by ascending
        "Test MSE", and ``predictions`` maps model name to its
        validation-set predictions.
    """
    def _score(y_true, y_pred):
        # (MSE, MAE, R2) for one set of predictions
        return (
            mean_squared_error(y_true, y_pred),
            mean_absolute_error(y_true, y_pred),
            r2_score(y_true, y_pred),
        )

    val_rows = []
    test_rows = []
    predictions = {}

    for name, model in models.items():
        model.fit(X_train, y_train)

        # Validation predictions are kept: the ensemble optimizer consumes them
        val_pred = model.predict(X_val)
        predictions[name] = val_pred
        val_mse, val_mae, val_r2 = _score(y_val, val_pred)

        # Test predictions are only needed for scoring
        test_mse, test_mae, test_r2 = _score(y_test, model.predict(X_test))

        val_rows.append([name, val_mse, val_mae, val_r2])
        test_rows.append([name, test_mse, test_mae, test_r2])

        print(f"{name} Validation Results:\nMSE: {val_mse:.4f}, MAE: {val_mae:.4f}, R2: {val_r2:.4f}")
        print(f"{name} Test Results:\nMSE: {test_mse:.4f}, MAE: {test_mae:.4f}, R2: {test_r2:.4f}\n")

    results_df = pd.DataFrame(
        val_rows, columns=["Model", "Validation MSE", "Validation MAE", "Validation R2"]
    )
    # Test table is returned best-first (lowest Test MSE on top)
    test_results_df = pd.DataFrame(
        test_rows, columns=["Model", "Test MSE", "Test MAE", "Test R2"]
    ).sort_values(by="Test MSE", ascending=True)

    return results_df, test_results_df, predictions

# DFDBCSO optimization for ensemble weights
def calculate_fdb_scores(weights, predictions, y_val):
    """Score every candidate weight vector by the negated MSE of its blend.

    Args:
        weights: 2-D array-like with one candidate per row. Column ``i``
            weights the ``i``-th entry of ``predictions`` (dict order).
        predictions: Dict mapping model name to that model's 1-D
            predictions for the validation samples.
        y_val: True target values for the same samples.

    Returns:
        1-D NumPy array with one score per candidate. Scores are ``-MSE``,
        so a higher score means a better blend.

    Raises:
        ValueError: If ``predictions`` is empty, or if the shapes of
            ``weights``, ``predictions`` and ``y_val`` do not line up.
    """
    if not predictions:
        raise ValueError("predictions must contain at least one model")

    weight_matrix = np.asarray(weights, dtype=float)
    if weight_matrix.size == 0:
        # No candidates to score
        return np.empty(0)

    # Columns follow dict order: (n_samples, n_models)
    pred_matrix = np.column_stack(
        [np.asarray(p, dtype=float).ravel() for p in predictions.values()]
    )
    targets = np.asarray(y_val, dtype=float).ravel()

    # One matrix product blends all candidates at once: (n_candidates, n_samples).
    # This replaces a Python loop that re-validated the inputs per candidate.
    residuals = weight_matrix @ pred_matrix.T - targets
    return -np.mean(residuals ** 2, axis=1)  # Minimize MSE only

def dfdbcso_ensemble(predictions, y_val):
    """Search for ensemble weights that minimize validation MSE.

    A swarm of ``PopSize`` weight vectors is moved for ``MaxIter``
    iterations. Each vector keeps a velocity that is randomly damped, pulled
    toward that vector's own best position by a random factor, and pulled
    toward the best position found by any vector by the factor ``phi``.
    After every move the weights are clipped to ``[0, 1]`` and rescaled to
    sum to 1. ``PopSize``, ``MaxIter`` and ``phi`` are module-level settings.

    Args:
        predictions: Dict mapping model name to that model's validation
            predictions. Weight ``i`` of the result belongs to the ``i``-th
            entry (dict order).
        y_val: True validation targets.

    Returns:
        Tuple ``(best_weights, best_mse)``: the best weight vector found and
        the validation MSE of the blend it produces.

    Raises:
        ValueError: If ``predictions`` is empty or ``PopSize`` / ``MaxIter``
            is smaller than 1.
    """
    num_models = len(predictions)
    if num_models == 0:
        raise ValueError("predictions must contain at least one model")
    if PopSize < 1 or MaxIter < 1:
        raise ValueError("PopSize and MaxIter must both be at least 1")

    weights = np.random.uniform(0, 1, size=(PopSize, num_models))
    weights /= np.sum(weights, axis=1, keepdims=True)  # Ensure weights sum to 1
    velocity = np.zeros((PopSize, num_models))
    best_weights = weights.copy()
    best_fitness = np.full(PopSize, -np.inf)

    for _ in range(MaxIter):
        fitness = calculate_fdb_scores(weights, predictions, y_val)

        # Update each candidate's personal best
        improved = fitness > best_fitness
        best_weights[improved] = weights[improved]
        best_fitness[improved] = fitness[improved]

        # Global best
        gbest_idx = np.argmax(best_fitness)
        gbest = best_weights[gbest_idx]

        # Update velocity and position for the whole swarm at once; every
        # candidate gets its own (damping, personal-pull) pair of random vectors.
        draws = np.random.rand(PopSize, 2, num_models)
        velocity = (
            draws[:, 0] * velocity
            + draws[:, 1] * (best_weights - weights)
            + phi * (gbest - weights)
        )
        weights = np.clip(weights + velocity, 0, 1)

        # A candidate whose weights were all clipped to 0 cannot be rescaled
        # (0/0 would yield NaN); restart it from its own best position.
        totals = weights.sum(axis=1, keepdims=True)
        collapsed = totals[:, 0] <= 0
        if collapsed.any():
            weights[collapsed] = best_weights[collapsed]
            totals = weights.sum(axis=1, keepdims=True)
        weights /= totals

    # Copy so the caller does not hold a view into the internal swarm state
    return gbest.copy(), -best_fitness[gbest_idx]  # Return MSE

# Visualize results
def visualize_results(results_df, test_results_df, ensemble_mse, ensemble_mae, ensemble_r2, y_test, ensemble_pred):
    """Show three figures comparing the models and print the ensemble metrics.

    Args:
        results_df: Per-model validation metrics; needs the columns
            "Model" and "Validation R2".
        test_results_df: Per-model test metrics with the columns
            "Model", "Test MSE", "Test MAE" and "Test R2".
        ensemble_mse, ensemble_mae, ensemble_r2: Test metrics of the ensemble.
        y_test: True test targets.
        ensemble_pred: Ensemble predictions for the test samples.
    """
    ensemble_label = "Ensemble (DFDBCSO)"

    # Figure 1: validation R2 per model
    plt.figure(figsize=(10, 6))
    sns.barplot(x='Model', y='Validation R2', data=results_df)
    plt.title("Model Performance on Validation Set (R2 Score)")
    plt.xticks(rotation=45)
    plt.show()

    # Figure 2: test R2 per model, with the ensemble appended as the last bar
    plt.figure(figsize=(12, 6))
    combined = pd.concat(
        [
            test_results_df,
            pd.DataFrame({
                "Model": [ensemble_label],
                "Test MSE": [ensemble_mse],
                "Test MAE": [ensemble_mae],
                "Test R2": [ensemble_r2]
            }),
        ],
        ignore_index=True,
    )
    # One colour per row, in row order; the ensemble bar is highlighted
    bar_colors = []
    for model_name in combined["Model"]:
        bar_colors.append("gold" if model_name == ensemble_label else "blue")
    ax = sns.barplot(x='Model', y='Test R2', data=combined, palette=bar_colors)
    plt.title("Model Performance (R2 Score)")
    plt.xticks(rotation=45)

    # Write each bar's value just above its top edge
    for bar in ax.patches:
        height = bar.get_height()
        ax.annotate(
            f"{height:.4f}",
            (bar.get_x() + bar.get_width() / 2., height),
            ha='center',
            va='center',
            xytext=(0, 10),
            textcoords='offset points',
        )

    plt.show()

    # Figure 3: ensemble predictions against the true values
    plt.figure(figsize=(10, 6))
    plt.scatter(y_test, ensemble_pred, color='green', alpha=0.6, label='Predicted vs Actual')
    lowest, highest = min(y_test), max(y_test)
    # Diagonal on which a perfect prediction would fall
    plt.plot([lowest, highest], [lowest, highest], 'r--', label='Ideal Prediction')
    plt.xlabel("Actual Values")
    plt.ylabel("Predicted Values")
    plt.title("DFDBCSO Ensemble Model: Actual vs Predicted")
    plt.legend()
    plt.show()

    print(f"Optimized Ensemble Model MSE: {ensemble_mse:.4f}, MAE: {ensemble_mae:.4f}, R2: {ensemble_r2:.4f}")

# Main function
def main():
    """Run the pipeline end to end.

    Loads the data, trains every candidate model, picks the three models
    with the lowest validation MSE, optimizes their blend weights on the
    validation set, then reports and plots the blend's test-set metrics.
    """
    # Load and preprocess data
    X_train, X_val, X_test, y_train, y_val, y_test = load_and_preprocess_data()

    # Define models
    models = define_models()

    # Train and evaluate models
    results_df, test_results_df, predictions = train_and_evaluate_models(models, X_train, X_val, y_train, y_val, X_test, y_test)
    print("Validation Performance:")
    print(results_df)
    print("Test Performance:")
    print(test_results_df)

    # Select top 3 models based on validation performance. Picking them by
    # test score would make the test metrics reported below optimistic,
    # because the test labels would already have influenced the ensemble.
    top_models = results_df.sort_values(by="Validation MSE", ascending=True).head(3)["Model"].values
    print("Top 3 models:", top_models)
    top_predictions = {name: predictions[name] for name in top_models}

    # Optimize ensemble weights using full DFDBCSO
    best_weights, best_mse = dfdbcso_ensemble(top_predictions, y_val)
    print("Optimized Ensemble Weights:", best_weights)
    print(f"Validation Ensemble MSE: {best_mse:.4f}")

    # Evaluate on the test set; weights are in the same order as top_models
    ensemble_pred = sum(
        weight * models[name].predict(X_test)
        for weight, name in zip(best_weights, top_models)
    )
    mse = mean_squared_error(y_test, ensemble_pred)
    mae = mean_absolute_error(y_test, ensemble_pred)
    r2 = r2_score(y_test, ensemble_pred)
    print(f"Test Set Ensemble MSE: {mse:.4f}, MAE: {mae:.4f}, R2: {r2:.4f}")

    # Visualize results
    visualize_results(results_df, test_results_df, mse, mae, r2, y_test, ensemble_pred)

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
