## Hyperparameter tuning - Random Forest, XGBoost, LightGBM

In [None]:
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time # To time operations

# --- ML Models ---
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
import lightgbm as lgb

# --- Preprocessing & Splitting ---
from sklearn.model_selection import train_test_split, KFold, GridSearchCV
from sklearn.preprocessing import MinMaxScaler

# --- Metrics & Utilities ---
from sklearn.metrics import mean_squared_error, r2_score
from tqdm import tqdm # Keep progress bar support if needed (though GridSearchCV has its own)
from sklearn.exceptions import ConvergenceWarning
import warnings

# Filter convergence warnings for cleaner output if desired
# warnings.filterwarnings("ignore", category=ConvergenceWarning)
# warnings.filterwarnings("ignore", category=FutureWarning)


: 

In [None]:

# Pin every random source this script touches so repeated runs give the same results.
seed = 42
random.seed(seed)       # Python's built-in RNG
np.random.seed(seed)    # NumPy's global RNG
os.environ['PYTHONHASHSEED'] = f"{seed}"  # exported as a string for hash seeding

# --- Data Loading and Preprocessing (Same as before) ---
def load_and_preprocess_data(file_path, test_size=0.2, random_state=42):
    """Load the dataset, split it into train/test sets, and scale the features.

    The CSV at ``file_path`` must contain an ``efficiency`` column, which is
    used as the regression target; every other column is treated as a feature.
    If the file does not exist, a message is printed and a synthetic 200-row
    dataset with nine random features is generated instead so the rest of the
    script can still be demonstrated.

    The ``MinMaxScaler`` is fitted on the training split only and then applied
    to the test split, so no test-set statistics leak into preprocessing. As a
    consequence, scaled test values may fall slightly outside ``[-1, 1]``.

    Args:
        file_path: Path of the CSV file to read.
        test_size: Forwarded to ``train_test_split`` as the held-out share.
        random_state: Seed forwarded to ``train_test_split``.

    Returns:
        ``((X_train, y_train), (X_test, y_test), scaler)`` where the feature
        matrices are scaled NumPy arrays, the targets are NumPy arrays, and
        ``scaler`` is the ``MinMaxScaler`` fitted on the training features.

    Raises:
        ValueError: If the loaded data has no ``efficiency`` column.
    """
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        print("Creating dummy data for demonstration.")
        # Dummy data generation needs feature names if columns are expected later
        num_features = 9  # Assuming 9 features based on previous context
        num_rows = 200
        df = pd.DataFrame({
            f'feature{i + 1}': np.random.rand(num_rows) for i in range(num_features)
        })
        # Example dummy target relationship: linear in two features plus noise
        df['efficiency'] = (df['feature1'] * 50 + df['feature2'] * 30 +
                            np.random.normal(0, 5, num_rows))

    if 'efficiency' not in df.columns:
        raise ValueError("Column 'efficiency' not found in the dataframe.")

    X = df.drop('efficiency', axis=1)
    y = df['efficiency']

    # Basic sanity checks; these only warn and leave the data untouched.
    if X.isnull().to_numpy().any() or y.isnull().any():
        print("Warning: NaN values found in data. Consider imputation.")
        # Example: X = X.fillna(X.median()) # Use median or other strategy
        #          y = y.fillna(y.median())

    # Only numeric data can hold +/-inf. Restricting the check avoids a
    # TypeError from np.isinf on text columns, so such data fails later in
    # the scaler with a ValueError instead.
    numeric_features = X.select_dtypes(include=[np.number]).to_numpy()
    y_has_inf = pd.api.types.is_numeric_dtype(y) and np.isinf(y.to_numpy()).any()
    if np.isinf(numeric_features).any() or y_has_inf:
        print("Warning: Infinite values found in data. Consider handling.")
        # Example: X.replace([np.inf, -np.inf], np.nan, inplace=True) then impute

    # Split BEFORE scaling so the scaler never sees the test set.
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    scaler = MinMaxScaler(feature_range=(-1, 1))
    X_train = scaler.fit_transform(X_train_raw)  # learn min/max from train only
    X_test = scaler.transform(X_test_raw)        # reuse the train statistics

    # Ensure targets are plain NumPy arrays
    y_train = np.asarray(y_train)
    y_test = np.asarray(y_test)

    print(f"Data shapes: Train (X:{X_train.shape}, y:{y_train.shape}), Test (X:{X_test.shape}, y:{y_test.shape})")
    return (X_train, y_train), (X_test, y_test), scaler


# --- GridSearchCV Tuning Function (Same as before) ---
def tune_model_gridsearch(model, param_grid, X_train, y_train, n_splits=5, scoring='r2'):
    """Run an exhaustive grid search with shuffled K-Fold cross-validation.

    Args:
        model: Unfitted estimator to tune.
        param_grid: Mapping of parameter names to candidate values, passed to
            ``GridSearchCV`` unchanged.
        X_train: Training features.
        y_train: Training targets.
        n_splits: Number of K-Fold splits.
        scoring: Scoring specification handed to ``GridSearchCV``.

    Returns:
        ``(best_estimator, best_cv_score)`` taken from the fitted search.
    """
    estimator_label = type(model).__name__
    print(f"\n--- Tuning {estimator_label} using GridSearchCV ---")
    print(f"Parameter Grid: {param_grid}")
    print(f"CV Folds (k): {n_splits}")

    # Folds are shuffled with the module-level seed so they are repeatable.
    folds = KFold(n_splits=n_splits, shuffle=True, random_state=seed)

    # LightGBM gets a logging callback with period=0 to keep per-fit output quiet;
    # every other estimator is fitted without extra keyword arguments.
    if isinstance(model, lgb.LGBMRegressor):
        extra_fit_kwargs = {'callbacks': [lgb.log_evaluation(period=0)]}
    else:
        extra_fit_kwargs = {}

    t0 = time.time()
    search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        scoring=scoring,
        cv=folds,
        n_jobs=-1,  # Use all available CPU cores
        verbose=1,  # Show progress from GridSearchCV
    )
    search.fit(X_train, y_train, **extra_fit_kwargs)
    elapsed = time.time() - t0

    print(f"{estimator_label} tuning finished in {elapsed:.2f} seconds.")
    print(f"Best Parameters: {search.best_params_}")

    # Fall back to NaN if the search did not expose a best score.
    top_score = getattr(search, 'best_score_', np.nan)
    print(f"Best CV Score ({scoring}): {top_score:.6f}")

    # best_estimator_ is the winning configuration as exposed by GridSearchCV.
    return search.best_estimator_, top_score


# --- Main Execution ---
def main():
    """Tune the enabled regressors, evaluate them on the test set, and plot a comparison.

    Steps:
      1. Load and split the data via ``load_and_preprocess_data``.
      2. For each enabled model (Random Forest, XGBoost, LightGBM), run
         ``tune_model_gridsearch`` with its parameter grid.
      3. Score every tuned model on the held-out test set (R2 and MSE).
      4. Print a summary table and show a bar chart of the test R2 scores.

    Returns:
        A ``(best_models, test_results, scaler)`` tuple:
          * ``best_models``: model name -> tuned estimator.
          * ``test_results``: model name -> dict with keys ``Test_R2``,
            ``Test_MSE``, ``Best_CV_R2`` and ``Predict_Time_s``.
          * ``scaler``: the scaler returned by ``load_and_preprocess_data``.
        The same three-element shape is returned on every path so callers can
        always unpack it: ``({}, {}, None)`` if data loading fails, and two
        empty dicts plus the scaler if no model was tuned.
    """
    # --- Configuration ---
    file_path = 'data_set.csv'  # <--- *** UPDATE THIS PATH ***
    test_set_size = 0.2
    num_cv_folds_tuning = 5  # Folds for GridSearchCV

    # Flags to enable/disable model tuning/evaluation
    run_random_forest = True
    run_xgboost = True
    run_lightgbm = True

    # GridSearchCV Parameter Grids (Adjust these based on desired search complexity/time)
    # Smaller grids run faster
    rf_param_grid = {
        'n_estimators': [100, 200],        # Number of trees
        'max_depth': [10, 20, None],       # Max depth of trees (None=unlimited)
        'min_samples_split': [2, 5],       # Min samples to split a node
        'min_samples_leaf': [1, 3],        # Min samples in a leaf node
        'max_features': ['sqrt', 1.0],     # Features to consider per split
    }

    xgb_param_grid = {
        'n_estimators': [100, 200],            # Number of boosting rounds
        'max_depth': [3, 5, 7],                # Max depth per tree
        'learning_rate': [0.05, 0.1],          # Step size shrinkage
        'subsample': [0.7, 0.9, 1.0],          # Fraction of samples used per tree
        'colsample_bytree': [0.7, 0.9, 1.0],   # Fraction of features used per tree
        # Add other parameters like gamma, reg_alpha, reg_lambda if needed
    }

    lgbm_param_grid = {
        'n_estimators': [100, 200],            # Number of boosting rounds
        'max_depth': [5, 10, -1],              # Max depth (-1 = no limit)
        'learning_rate': [0.05, 0.1],          # Step size shrinkage
        'num_leaves': [20, 31, 40],            # Max leaves in one tree
        'subsample': [0.7, 0.9, 1.0],          # Fraction of samples used per tree
        'colsample_bytree': [0.7, 0.9, 1.0],   # Fraction of features used per tree
        # Add other parameters like reg_alpha, reg_lambda if needed
    }

    # Result containers are created up front so every exit path can return
    # the same (best_models, test_results, scaler) structure.
    best_models = {}
    cv_scores = {}
    test_results = {}

    # --- Data Loading ---
    try:
        (X_train, y_train), (X_test, y_test), scaler = load_and_preprocess_data(
            file_path, test_size=test_set_size, random_state=seed
        )
    except (FileNotFoundError, ValueError) as e:
        print(f"Error loading or processing data: {e}")
        # A bare `return` here would hand the caller None, which cannot be unpacked.
        return best_models, test_results, None

    # --- Model Tuning ---
    # Each entry: (result name, enabled flag, estimator factory, parameter grid).
    # Factories defer construction so disabled models are never instantiated.
    # Estimators use n_jobs=1 because GridSearchCV already runs with n_jobs=-1.
    candidates = [
        (
            'RandomForest',
            run_random_forest,
            lambda: RandomForestRegressor(random_state=seed, n_jobs=1),
            rf_param_grid,
        ),
        (
            'XGBoost',
            run_xgboost,
            # verbosity=0 suppresses XGBoost's own messages during CV
            lambda: xgb.XGBRegressor(
                objective='reg:squarederror', random_state=seed, verbosity=0, n_jobs=1
            ),
            xgb_param_grid,
        ),
        (
            'LightGBM',
            run_lightgbm,
            lambda: lgb.LGBMRegressor(
                objective='regression', metric='rmse', random_state=seed,
                verbosity=-1, n_jobs=1
            ),
            lgbm_param_grid,
        ),
    ]

    for model_name, enabled, build_estimator, grid in candidates:
        if not enabled:
            continue
        tuned_model, tuned_cv_score = tune_model_gridsearch(
            build_estimator(), grid, X_train, y_train,
            n_splits=num_cv_folds_tuning, scoring='r2'
        )
        best_models[model_name] = tuned_model
        cv_scores[model_name] = tuned_cv_score

    # --- Final Evaluation on Test Set ---
    print("\n--- Final Model Evaluation on Test Set ---")
    if not best_models:
        print("No models were successfully tuned.")
        return best_models, test_results, scaler

    for name, model in best_models.items():
        best_cv_r2 = cv_scores.get(name, np.nan)
        try:
            start_predict_time = time.time()
            y_pred_test = model.predict(X_test)
            predict_time = time.time() - start_predict_time

            test_r2 = r2_score(y_test, y_pred_test)
            test_mse = mean_squared_error(y_test, y_pred_test)
            test_results[name] = {
                'Test_R2': test_r2,
                'Test_MSE': test_mse,
                'Best_CV_R2': best_cv_r2,
                'Predict_Time_s': predict_time,
            }
            print(f"{name}:")
            print(f"  Test R2:  {test_r2:.6f}")
            print(f"  Test MSE: {test_mse:.6f}")
            print(f"  Best CV R2: {best_cv_r2:.6f}")
            print(f"  Prediction Time: {predict_time:.4f}s")
        except Exception as e:
            # Best effort: one failing model must not abort the comparison,
            # so it is reported and recorded with NaN metrics.
            print(f"Error evaluating {name} on test set: {e}")
            test_results[name] = {
                'Test_R2': np.nan, 'Test_MSE': np.nan,
                'Best_CV_R2': best_cv_r2, 'Predict_Time_s': np.nan,
            }

    # --- Comparative Summary and Visualization ---
    if test_results:
        print("\n--- Results Summary ---")
        results_df = pd.DataFrame(test_results).T  # One row per model
        # Ensure consistent column order
        results_df = results_df[['Best_CV_R2', 'Test_R2', 'Test_MSE', 'Predict_Time_s']]
        print(results_df.sort_values(by='Test_R2', ascending=False))

        plt.figure(figsize=(10, 6))
        models_evaluated = list(results_df.index)
        # NaN scores are drawn as -1 so they can be told apart and left unlabeled
        test_r2_scores = results_df['Test_R2'].fillna(-1).values

        # Define colors - ensure enough colors if more models are added
        colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b']
        bars = plt.bar(models_evaluated, test_r2_scores, color=colors[:len(models_evaluated)])

        plt.ylabel('Test R2 Score')
        plt.title('Comparison of Tuned Models on Test Set (R2 Score)')
        # Zoom the y-axis around the observed scores (min/max skip NaN)
        min_score = results_df['Test_R2'].min()
        max_score = results_df['Test_R2'].max()
        lower_lim = max(0, min_score - 0.05) if pd.notna(min_score) else 0
        upper_lim = min(1.05, max_score + 0.05) if pd.notna(max_score) else 1.05
        plt.ylim(bottom=lower_lim, top=upper_lim)

        # Add score labels on top of bars
        for bar in bars:
            yval = bar.get_height()
            if pd.notna(yval) and yval > -1:  # Don't label if score was NaN
                plt.text(bar.get_x() + bar.get_width() / 2.0, yval, f'{yval:.4f}',
                         va='bottom', ha='center', fontsize=9)

        plt.xticks(rotation=15, ha='right')  # Rotate labels slightly if needed
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()

    print("\nScript finished.")
    return best_models, test_results, scaler


# --- Run the script ---
if __name__ == "__main__":
    # Runtime grows with the dataset size and with the number of grid combinations.
    (tuned_models, final_results, data_scaler) = main()

    # Usage sketch (left disabled): inspect the tuned Random Forest and reuse it.
    # if tuned_models and 'RandomForest' in tuned_models:
    #     best_rf = tuned_models['RandomForest']
    #     print("\nBest Random Forest Model Parameters:")
    #     print(best_rf.get_params())
    #     # New data must be scaled with data_scaler before predicting:
    #     # new_data_scaled = data_scaler.transform(new_raw_data)
    #     # predictions = best_rf.predict(new_data_scaled)