# In [None]:
import pandas as pd
import numpy as np
from catboost import CatBoostRegressor
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import PowerTransformer
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error

# ===================  Optimizer Functions =====================
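# Note: the optimizer implementation (FloatVar, OriginalCircleSA) is not defined
# in this section; it must already be in scope when
# optimize_catboost_hyperparameters is called.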
# Sources: https://github.com/thieu1995/mealpy
# https://github.com/Valdecy/pyMetaheuristic

# =================== CatBoost & Data Preparation =====================

def load_and_prepare_data(file_path):
    """Load a dataset from Excel and return scaled train/test partitions.

    The last column of the sheet is used as the target and all preceding
    columns as features. Rows are split 80/20 with a fixed seed. The
    ``PowerTransformer`` is fitted on the training features only and then
    applied to both partitions, so no test-set statistics leak into the
    scaling that the model is trained on.

    Args:
        file_path: Path of the workbook handed to ``pandas.read_excel``.

    Returns:
        A tuple ``(X_train, X_test, y_train, y_test, train_indices,
        test_indices, scaler_X)``. ``train_indices`` / ``test_indices`` are the
        entries of ``data.index`` that fell into each partition, and
        ``scaler_X`` is the transformer fitted on the training features.
    """
    data = pd.read_excel(file_path)
    X = data.iloc[:, :-1].values
    y = data.iloc[:, -1].values

    # Split the raw features first; the row index is split alongside so callers
    # can map predictions back to the original rows.
    X_train_raw, X_test_raw, y_train, y_test, train_indices, test_indices = train_test_split(
        X, y, data.index, test_size=0.2, random_state=42
    )

    # Fit the transform on the training rows only, then reuse it for the test
    # rows. Fitting on the full dataset would let held-out rows shape the
    # scaling parameters.
    scaler_X = PowerTransformer()
    X_train = scaler_X.fit_transform(X_train_raw)
    X_test = scaler_X.transform(X_test_raw)

    return X_train, X_test, y_train, y_test, train_indices, test_indices, scaler_X

def create_objective_function_with_kfold(X_train, y_train, n_splits=5):
    """Build a K-fold cross-validated RMSE objective for CatBoost tuning.

    The fold assignment is computed once (shuffled, seed 42) and reused for
    every candidate, so all hyperparameter vectors are scored on identical
    splits.

    Args:
        X_train: Training features; indexed with integer index arrays.
        y_train: Training targets; indexed with integer index arrays.
        n_splits: Number of folds.

    Returns:
        ``(objective_function, kfold_splits)``. ``objective_function`` takes a
        5-element sequence ``(depth, learning_rate, l2_leaf_reg,
        bagging_temperature, random_strength)`` and returns the mean validation
        RMSE over the folds. ``kfold_splits`` is the list of
        ``(train_idx, val_idx)`` pairs used.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    kfold_splits = list(kf.split(X_train))

    def objective_function(params):
        """Return the mean K-fold validation RMSE for one candidate vector."""
        depth, learning_rate, l2_leaf_reg, bagging_temperature, random_strength = params

        model_params = {
            # The optimizer proposes a float; int() truncates it to a tree depth.
            'depth': int(depth),
            'learning_rate': learning_rate,
            'l2_leaf_reg': l2_leaf_reg,
            'bagging_temperature': bagging_temperature,
            'random_strength': random_strength,
            'verbose': False,
            'thread_count': -1
        }

        fold_rmse_scores = []
        for train_idx, val_idx in kfold_splits:
            model = CatBoostRegressor(**model_params)
            # The validation fold is intentionally not passed as eval_set.
            # With an eval_set CatBoost by default keeps the iteration that
            # scores best on it, so the RMSE below would be measured on the
            # same rows that selected the model and come out too optimistic.
            model.fit(X_train[train_idx], y_train[train_idx], verbose=False)

            y_pred = model.predict(X_train[val_idx])
            rmse = np.sqrt(mean_squared_error(y_train[val_idx], y_pred))
            fold_rmse_scores.append(rmse)

        return np.mean(fold_rmse_scores)

    return objective_function, kfold_splits

def optimize_catboost_hyperparameters(X_train, y_train, iterations=10, population=50, n_splits=5):
    """Search CatBoost hyperparameters with the circle search optimizer.

    Each candidate is scored by the K-fold cross-validated RMSE objective built
    from ``create_objective_function_with_kfold``; the optimizer minimizes it.

    Args:
        X_train: Training features forwarded to the objective factory.
        y_train: Training targets forwarded to the objective factory.
        iterations: Number of optimizer epochs.
        population: Optimizer population size.
        n_splits: Number of cross-validation folds.

    Returns:
        ``(best_params, best_fitness, kfold_splits)``: a CatBoost keyword
        dictionary built from the best solution, that solution's fitness, and
        the fold index pairs used by the objective.
    """
    # (name, lower bound, upper bound), listed in the order in which the
    # objective unpacks a candidate vector.
    search_space = (
        ('depth', 4, 8),
        ('learning_rate', 0.05, 0.2),
        ('l2_leaf_reg', 3, 8),
        ('bagging_temperature', 0.4, 0.8),
        ('random_strength', 0.5, 3),
    )
    lower_bounds = [low for _, low, _ in search_space]
    upper_bounds = [high for _, _, high in search_space]

    cv_objective, kfold_splits = create_objective_function_with_kfold(X_train, y_train, n_splits)

    problem = {
        "bounds": FloatVar(lb=lower_bounds, ub=upper_bounds, name="hyperparams"),
        "minmax": "min",
        "obj_func": cv_objective,
    }

    searcher = OriginalCircleSA(epoch=iterations, pop_size=population)
    best_agent = searcher.solve(problem)
    solution = best_agent.solution
    best_fitness = best_agent.target.fitness

    # Map the solution vector back onto parameter names by position.
    best_params = {
        name: solution[position]
        for position, (name, _, _) in enumerate(search_space)
    }
    # Depth is searched as a float; truncate it to an integer for CatBoost.
    best_params['depth'] = int(best_params['depth'])
    best_params['verbose'] = False
    best_params['thread_count'] = -1

    return best_params, best_fitness, kfold_splits

def evaluate_fold_performance(X_train, y_train, kfold_splits, best_params, train_indices):
    """Refit the tuned model on every fold and collect per-fold results.

    Args:
        X_train: Training features; indexed with integer index arrays.
        y_train: Training targets; indexed with integer index arrays.
        kfold_splits: Iterable of ``(train_idx, val_idx)`` index pairs that
            are positions into ``X_train`` / ``y_train``.
        best_params: Keyword arguments for ``CatBoostRegressor``.
        train_indices: Original row labels of the training rows; indexed with
            the fold positions to report which source rows each fold used.

    Returns:
        A dict keyed ``'fold_1'``, ``'fold_2'``, ... Each value holds the
        original row labels of the fold's train/validation rows, the
        predictions on both parts, and a ``'metrics'`` dict with R2, RMSE and
        MAE for train and validation. Empty when ``kfold_splits`` is empty.
    """
    fold_results = {}

    for fold_number, (train_idx, val_idx) in enumerate(kfold_splits, start=1):
        X_fold_train, X_fold_val = X_train[train_idx], X_train[val_idx]
        y_fold_train, y_fold_val = y_train[train_idx], y_train[val_idx]

        fold_model = CatBoostRegressor(**best_params)
        # The validation fold is not passed as eval_set. With an eval_set
        # CatBoost by default keeps the iteration that scores best on it, which
        # would bias the validation metrics reported below.
        fold_model.fit(X_fold_train, y_fold_train, verbose=False)

        fold_train_pred = fold_model.predict(X_fold_train)
        fold_val_pred = fold_model.predict(X_fold_val)

        fold_results[f'fold_{fold_number}'] = {
            # Translate fold positions back to the original dataset row labels.
            'train_indices': train_indices[train_idx],
            'val_indices': train_indices[val_idx],
            'fold_train_pred': fold_train_pred,
            'fold_val_pred': fold_val_pred,
            'metrics': {
                'train_r2': r2_score(y_fold_train, fold_train_pred),
                'train_rmse': np.sqrt(mean_squared_error(y_fold_train, fold_train_pred)),
                'train_mae': mean_absolute_error(y_fold_train, fold_train_pred),
                'val_r2': r2_score(y_fold_val, fold_val_pred),
                'val_rmse': np.sqrt(mean_squared_error(y_fold_val, fold_val_pred)),
                'val_mae': mean_absolute_error(y_fold_val, fold_val_pred)
            }
        }

    return fold_results

def train_and_evaluate_model(X_train, X_test, y_train, y_test, params):
    """Train the final CatBoost model and report train/test metrics.

    The model is fitted on the training data only. The test set is used solely
    for the reported metrics and never influences training.

    Args:
        X_train: Training features.
        X_test: Held-out features, used only for evaluation.
        y_train: Training targets.
        y_test: Held-out targets, used only for evaluation.
        params: Keyword arguments for ``CatBoostRegressor``.

    Returns:
        ``(catboost_model, metrics)`` where ``metrics`` maps ``train_r2``,
        ``train_rmse``, ``train_mae``, ``test_r2``, ``test_rmse`` and
        ``test_mae`` to their values.
    """
    catboost_model = CatBoostRegressor(**params)
    # Do not pass the test set as eval_set. With an eval_set CatBoost by
    # default keeps the iteration that scores best on it, i.e. the model would
    # be selected on the test data and the test metrics below would be biased.
    catboost_model.fit(X_train, y_train, verbose=False)

    train_predictions = catboost_model.predict(X_train)
    test_predictions = catboost_model.predict(X_test)

    metrics = {
        'train_r2': r2_score(y_train, train_predictions),
        'train_rmse': np.sqrt(mean_squared_error(y_train, train_predictions)),
        'train_mae': mean_absolute_error(y_train, train_predictions),
        'test_r2': r2_score(y_test, test_predictions),
        'test_rmse': np.sqrt(mean_squared_error(y_test, test_predictions)),
        'test_mae': mean_absolute_error(y_test, test_predictions)
    }

    return catboost_model, metrics

def main(file_path="P03.xlsx", iterations=10, population=50, n_splits=5):
    """Run the whole pipeline: load data, tune, fit the final model, score folds.

    Args:
        file_path: Excel workbook to load.
        iterations: Number of optimizer epochs.
        population: Optimizer population size.
        n_splits: Number of cross-validation folds.

    Returns:
        ``(model, best_params, metrics, fold_results)``: the final fitted
        model, the tuned CatBoost parameters, its train/test metrics, and the
        per-fold evaluation results.
    """
    # The test row labels and the fitted scaler are returned by the loader but
    # are not needed further down.
    (
        X_train,
        X_test,
        y_train,
        y_test,
        train_indices,
        _test_indices,
        _scaler,
    ) = load_and_prepare_data(file_path)

    # Hyperparameter search scored by K-fold CV; the best fitness value itself
    # is not used afterwards.
    best_params, _best_fitness, kfold_splits = optimize_catboost_hyperparameters(
        X_train,
        y_train,
        iterations=iterations,
        population=population,
        n_splits=n_splits,
    )

    # Final model on the full training partition, scored on the held-out rows.
    model, metrics = train_and_evaluate_model(
        X_train, X_test, y_train, y_test, best_params
    )

    # Per-fold refits on the same splits the optimizer used.
    fold_results = evaluate_fold_performance(
        X_train, y_train, kfold_splits, best_params, train_indices
    )

    return model, best_params, metrics, fold_results

# Script entry point: run the pipeline with main()'s default arguments and keep
# the four results bound at module level.
if __name__ == "__main__":
    model, params, metrics, fold_results = main()