# world_model_v6 (with XGBoost)
May 7, 2025 (Updated with XGBoost)

## 1.1 Imports

In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import Ridge # Keep for comparison or if needed later
import xgboost as xgb 
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import StandardScaler
import joblib
import os
import random

XGBoostError: 
XGBoost Library (libxgboost.dylib) could not be loaded.
Likely causes:
  * OpenMP runtime is not installed
    - vcomp140.dll or libgomp-1.dll for Windows
    - libomp.dylib for Mac OSX
    - libgomp.so for Linux and other UNIX-like OSes
    Mac OSX users: Run `brew install libomp` to install OpenMP runtime.

  * You are running 32-bit Python on a 64-bit OS

Error message(s): ["dlopen(/Users/albertolandi/anaconda3/envs/IR2/lib/python3.10/site-packages/xgboost/lib/libxgboost.dylib, 0x0006): Library not loaded: @rpath/libomp.dylib\n  Referenced from: <54A1AE05-1E14-3DA2-A8D0-062134694298> /Users/albertolandi/anaconda3/envs/IR2/lib/python3.10/site-packages/xgboost/lib/libxgboost.dylib\n  Reason: tried: '/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/Users/albertolandi/anaconda3/envs/IR2/lib/python3.10/lib-dynload/../../libomp.dylib' (no such file), '/Users/albertolandi/anaconda3/envs/IR2/bin/../lib/libomp.dylib' (no such file)"]


In [None]:
# Fix the global seeds of NumPy and of Python's `random` module so that
# anything drawing from those generators is repeatable between runs.
# The XGBoost estimator created in this file is seeded separately through its
# explicit `random_state` argument.
np.random.seed(42)
random.seed(42)

## 1.2 Load Dataset

In [None]:
def load_data(filepath="../dataset/dataset_v4.txt"):
    """Read the CSV dataset at ``filepath`` and drop rows containing NaNs.

    Progress and error messages are printed. Returns the cleaned DataFrame,
    or None when the file does not exist or loading fails for any other
    reason.
    """
    try:
        raw = pd.read_csv(filepath)
        print(f"Dataset loaded successfully. Shape: {raw.shape}")
        cleaned = raw.dropna()
        print(f"Shape after dropping NaNs: {cleaned.shape}")
    except FileNotFoundError:
        print(f"Error: Dataset file not found at {filepath}")
        cleaned = None
    except Exception as e:
        # Best effort: report the problem and signal failure with None.
        print(f"Error loading dataset: {e}")
        cleaned = None
    return cleaned

In [None]:
# 1. Load the dataset
# Fallback sample (header + 5 rows) that is written only when no dataset file
# exists yet, so the remaining cells can be smoke-tested. It is far too small
# to train a meaningful model.
dummy_data = """distance_red_init,angle_red_init,distance_green_init,angle_green_init,distance_blue_init,angle_blue_init,rSpeed,lSpeed,distance_red_final,angle_red_final,distance_green_final,angle_green_final,distance_blue_final,angle_blue_final
847.18237502475,135.0077102870207,848.4285070131579,44.91001799841399,848.6299022262742,-134.9082395663736,30,17,919.3516568234484,169.5845864368397,744.6266372511928,77.89688504365728,957.4204577425843,-110.95023009036865
919.3516568234484,169.5845864368397,744.6266372511928,77.89688504365728,957.4204577425843,-110.95023009036865,-6,25,963.2935290670038,108.50162756834925,698.9871122863157,17.50247248666159,1010.68276681774,-176.66445630856867
963.2935290670038,108.50162756834925,698.9871122863157,17.50247248666159,1010.68276681774,-176.66445630856867,-9,-6,932.4764088112901,95.43414527435988,692.6089322190409,1.4040251940733697,1010.456972359338,171.60189312665523
932.4764088112901,95.43414527435988,692.6089322190409,1.4040251940733697,1010.456972359338,171.60189312665523,24,-23,902.6519916326283,-177.68212789984284,698.1799037485613,86.04292804685633,1000.9520797712298,-99.71404373987178
902.6519916326283,-177.68212789984284,698.1799037485613,86.04292804685633,1000.9520797712298,-99.71404373987178,-17,-23,923.7137529546012,-152.87166980372595,793.969133781329,118.83300496617363,909.4480927449757,-71.09601015189716
"""
dataset_dir = "../dataset"
dataset_path = os.path.join(dataset_dir, "dataset_v4.txt")
# exist_ok=True makes directory creation idempotent, so no separate
# os.path.exists() check is needed (same pattern as the model-saving code).
os.makedirs(dataset_dir, exist_ok=True)
if not os.path.exists(dataset_path):
    # Never overwrite a real dataset: only write the sample when the file is absent.
    with open(dataset_path, "w", encoding="utf-8") as f:
        f.write(dummy_data)
    print(f"Created dummy dataset at {dataset_path}")

dataframe = load_data(filepath=dataset_path)

In [None]:
# Show the column summary only when the dataset was actually loaded.
if dataframe is None:
    print("DataFrame is None, skipping info().")
else:
    dataframe.info()

## 1.3 Preprocess Dataset

In [None]:
def prepare_data(df, n_input_features=8):
    """Split a transition table into input features (X) and targets (Y).

    Args:
        df: DataFrame whose first ``n_input_features`` columns are the model
            inputs and whose remaining columns are the targets, or None.
        n_input_features: Number of leading columns used as inputs. The
            default of 8 corresponds to the initial state (6 columns)
            followed by the action (2 columns).

    Returns:
        A tuple ``(X, Y)`` of NumPy arrays, or ``(None, None)`` when ``df``
        is None.
    """
    if df is None:
        return None, None
    # Input features: the leading columns (initial state + action).
    X = df.iloc[:, :n_input_features].to_numpy()
    # Target variables: every remaining column (final state).
    Y = df.iloc[:, n_input_features:].to_numpy()
    print(f"Features (X) shape: {X.shape}")
    print(f"Targets (Y) shape: {Y.shape}")
    return X, Y

In [None]:
def split_data(X, Y, test_size=0.2, random_state=42):
    """Partition features and targets into train and test subsets.

    Delegates to ``train_test_split`` with the given ``test_size`` and
    ``random_state`` and prints the number of samples in each subset.

    Returns:
        ``(X_train, X_test, Y_train, Y_test)``, or four Nones when either
        ``X`` or ``Y`` is None.
    """
    if X is None or Y is None:
        return (None,) * 4
    parts = train_test_split(X, Y, test_size=test_size, random_state=random_state)
    train_inputs, test_inputs = parts[0], parts[1]
    print(f"Training set size: {train_inputs.shape[0]} samples")
    print(f"Testing set size: {test_inputs.shape[0]} samples")
    return tuple(parts)

In [None]:
# 2. Prepare Data
# 2. Prepare Data
# Build X / Y from the loaded table; fall back to Nones so later cells can
# detect that there is nothing to work with.
if dataframe is None:
    X, Y = None, None
    print("Skipping data preparation as dataframe is None.")
else:
    X, Y = prepare_data(dataframe)

In [None]:
def scale_features(X_train, X_test):
    """Standardize the input features with a ``StandardScaler``.

    The scaler is fitted on the training inputs only and then applied to
    both the training and the test inputs.

    Returns:
        ``(X_train_scaled, X_test_scaled, scaler)``; the fitted scaler is
        returned so it can be saved and reused. Three Nones are returned
        when either input is None.
    """
    if X_train is None or X_test is None:
        return None, None, None
    # Learn the scaling statistics from the training split alone.
    fitted_scaler = StandardScaler().fit(X_train)
    scaled_train = fitted_scaler.transform(X_train)
    scaled_test = fitted_scaler.transform(X_test)
    print("Features scaled.")
    return scaled_train, scaled_test, fitted_scaler

In [None]:
# 3. Split Data
# 3. Split Data, then 4. Scale Features (Important!)
# When no data is available every downstream variable is set to None so the
# later cells can skip their work.
if X is None or Y is None:
    X_train = X_test = Y_train = Y_test = None
    X_train_scaled = X_test_scaled = scaler = None
    print("Skipping data splitting and scaling as X or Y is None.")
else:
    X_train, X_test, Y_train, Y_test = split_data(X, Y)
    X_train_scaled, X_test_scaled, scaler = scale_features(X_train, X_test)

## 1.4 Train Model

### Original Ridge Regression (for reference)

In [None]:
def train_ridge_regression(X_train, Y_train):
    """Fit a Ridge regressor, choosing its hyperparameters with GridSearchCV.

    The search covers a range of ``alpha`` values and solvers, uses 5-fold
    cross-validation and scores candidates by negative mean squared error.

    Returns:
        The best estimator found by the search, or None when either
        training array is None.
    """
    if any(arr is None for arr in (X_train, Y_train)):
        print("Skipping Ridge training as data is None.")
        return None

    print("Training Ridge Regression model with GridSearchCV...")
    search_space = dict(
        alpha=[0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0, 10000.0, 1000000.0],
        solver=['auto', 'svd', 'cholesky', 'lsqr', 'sparse_cg', 'sag', 'saga'],
    )
    search = GridSearchCV(
        estimator=Ridge(random_state=42),
        param_grid=search_space,
        scoring='neg_mean_squared_error',
        cv=5,
        n_jobs=-1,
        verbose=1,
    )
    search.fit(X_train, Y_train)

    print("\nGridSearchCV Complete for Ridge.")
    print(f"Best parameters found: {search.best_params_}")
    print(f"Best cross-validation score (negative MSE): {search.best_score_:.4f}")
    return search.best_estimator_

# # Example usage for Ridge (commented out if focusing on XGBoost)
# if X_train_scaled is not None and Y_train is not None:
#     print("\n--- Training Ridge Model ---")
#     world_model_ridge = train_ridge_regression(X_train_scaled, Y_train)
# else:
#     world_model_ridge = None

### XGBoost Regressor

In [None]:
def train_xgboost_regression(X_train, Y_train, param_grid=None, cv=3):
    """Fit an XGBoost regressor, tuning hyperparameters with GridSearchCV.

    Args:
        X_train: Training inputs, or None.
        Y_train: Training targets, or None. This notebook passes a 2-D
            multi-column target, which relies on the installed XGBoost
            version supporting multi-output regression (TODO confirm).
        param_grid: Optional GridSearchCV parameter grid. When None, a small
            starting grid over ``n_estimators``, ``learning_rate`` and
            ``max_depth`` is used.
        cv: Number of cross-validation folds. The default of 3 keeps the
            search quick; 5 gives more robust estimates.

    Returns:
        The best estimator found by the search, or None when either
        training array is None. The returned estimator keeps ``n_jobs=1``
        (see the threading note in the body).
    """
    if X_train is None or Y_train is None:
        print("Skipping XGBoost training as data is None.")
        return None

    print("Training XGBoost Regressor model with GridSearchCV...")

    if param_grid is None:
        # Starting grid; expand or refine as needed. Further candidates worth
        # tuning: subsample, colsample_bytree, gamma, reg_alpha, reg_lambda.
        param_grid = {
            'n_estimators': [100, 200, 300],     # Number of boosting rounds
            'learning_rate': [0.01, 0.05, 0.1],  # Step size shrinkage
            'max_depth': [3, 5, 7],              # Maximum depth of a tree
        }

    # Threading: GridSearchCV below already runs the individual fits in
    # parallel (n_jobs=-1). Letting every XGBoost fit also grab all cores
    # oversubscribes the CPU and slows both down, so each fit is kept
    # single-threaded and parallelism is left to the grid search.
    xgb_model = xgb.XGBRegressor(
        objective='reg:squarederror',
        random_state=42,
        n_jobs=1,
    )

    grid_search_xgb = GridSearchCV(
        estimator=xgb_model,
        param_grid=param_grid,
        scoring='neg_mean_squared_error',  # Optimize for lower MSE
        cv=cv,
        n_jobs=-1,  # Parallelize across the candidate/fold fits
        verbose=2,  # Higher verbosity to see progress
    )

    grid_search_xgb.fit(X_train, Y_train)

    print("\nGridSearchCV Complete for XGBoost.")
    print(f"Best parameters found: {grid_search_xgb.best_params_}")
    print(f"Best cross-validation score (negative MSE): {grid_search_xgb.best_score_:.4f}")

    # best_estimator_ is refitted on the full training data by GridSearchCV.
    return grid_search_xgb.best_estimator_

In [None]:
# Train the XGBoost model
# Train the XGBoost model on the scaled training inputs, or record that no
# model is available when the training data is missing.
if X_train_scaled is None or Y_train is None:
    world_model_xgb = None
    print("Skipping XGBoost model training call.")
else:
    print("\n--- Training XGBoost Model ---")
    world_model_xgb = train_xgboost_regression(X_train_scaled, Y_train)

## 1.5 Evaluate

In [None]:
def evaluate_model(model, X_test, Y_test, model_name="Model"):
    """Evaluate ``model`` on the test set and print MAE, MSE and RMSE.

    Overall metrics are printed first, followed by the MAE of each output
    column.

    Args:
        model: Fitted estimator exposing ``predict``, or None.
        X_test: Test inputs (already scaled if the model expects that), or None.
        Y_test: Test targets, or None.
        model_name: Label used in the printed messages.

    Returns:
        ``(mae, mse)`` computed over all outputs, or ``(None, None)`` when
        the model or either array is None.
    """
    if model is None or X_test is None or Y_test is None:
        print(f"Skipping evaluation for {model_name} as model or data is None.")
        return None, None

    Y_pred = model.predict(X_test)

    mae = mean_absolute_error(Y_test, Y_pred)
    mse = mean_squared_error(Y_test, Y_pred)
    rmse = np.sqrt(mse)  # Root Mean Squared Error

    print(f"\n--- {model_name} Evaluation ---")
    print(f"Mean Absolute Error (MAE): {mae:.4f}")
    print(f"Mean Squared Error (MSE): {mse:.4f}")
    print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")

    # NOTE(review): the angle columns look like degrees that can wrap around
    # +/-180; a plain absolute error counts 179 vs -179 as 358. Consider a
    # circular error for those columns -- TODO confirm the angle convention.
    print("\nMAE per output feature:")
    output_features = [
        'dist_red_final', 'angle_red_final', 'dist_green_final',
        'angle_green_final', 'dist_blue_final', 'angle_blue_final'
    ]
    # One MAE per output column in a single call. Iterating over the columns
    # that actually exist avoids an IndexError when there are fewer than six
    # outputs, and avoids repeating one value six times for 1-D targets.
    per_output_mae = np.atleast_1d(
        mean_absolute_error(Y_test, Y_pred, multioutput="raw_values")
    )
    for i, mae_feature in enumerate(per_output_mae):
        # Fall back to a generic label if there are more outputs than names.
        name = output_features[i] if i < len(output_features) else f"output_{i}"
        print(f"  {name}: {mae_feature:.4f}")

    return mae, mse

In [None]:
# Evaluate the XGBoost model
# Evaluate the XGBoost model on the scaled test inputs when everything needed
# is available.
if world_model_xgb is None or X_test_scaled is None or Y_test is None:
    print("Skipping XGBoost model evaluation.")
else:
    print("\n--- Evaluating XGBoost Model ---")
    xgb_mae, xgb_mse = evaluate_model(
        world_model_xgb, X_test_scaled, Y_test, model_name="XGBoost"
    )
    if xgb_mae is not None:
        print(f"XGBoost Test RMSE: {np.sqrt(xgb_mse):.4f}")

## 1.6 Save Model

In [None]:
def save_model_and_scaler(model, scaler, model_filename="world_model.joblib",
                          scaler_filename="scaler.joblib", model_dir="../src/models"):
    """Persist the trained model and its scaler with joblib.

    Args:
        model: Trained model to save, or None.
        scaler: Fitted scaler to save, or None.
        model_filename: File name for the model inside ``model_dir``.
        scaler_filename: File name for the scaler inside ``model_dir``.
        model_dir: Target directory; created if missing. A relative path is
            resolved against the current working directory.

    Nothing is written when either object is None. Errors raised while
    saving are caught and reported with a printed message rather than
    propagated.
    """
    if model is None or scaler is None:
        print("Skipping saving model/scaler as one of them is None.")
        return

    try:
        os.makedirs(model_dir, exist_ok=True)

        model_path = os.path.join(model_dir, model_filename)
        scaler_path = os.path.join(model_dir, scaler_filename)

        joblib.dump(model, model_path)
        print(f"Model saved to {model_path}")
        joblib.dump(scaler, scaler_path)
        print(f"Scaler saved to {scaler_path}")
    except Exception as e:
        # Best effort: saving must not abort the rest of the notebook.
        print(f"Error saving model/scaler: {e}")

In [None]:
# Save the XGBoost model and scaler
# Save the XGBoost model and its scaler under version-specific file names so
# they do not overwrite artifacts saved under the default names.
if world_model_xgb is None or scaler is None:
    print("Skipping XGBoost model saving.")
else:
    print("\n--- Saving XGBoost Model and Scaler ---")
    save_model_and_scaler(
        world_model_xgb,
        scaler,
        model_filename="xgb_world_model_v6.joblib",
        scaler_filename="xgb_scaler_v6.joblib",
    )

## 1.7 Example Prediction

In [None]:
# Example prediction (how you'd use it later)
def example_prediction(model, X_test_original, Y_test_original, scaler_obj, model_name="Model"):
    """Print a worked prediction for the first sample of the test set.

    The first unscaled test input is scaled with ``scaler_obj``, passed to
    ``model.predict``, and the original input, scaled input, actual target
    and predicted target are printed. Nothing is predicted (only a skip
    message is printed) when any argument other than ``model_name`` is None.
    """
    required = (model, X_test_original, Y_test_original, scaler_obj)
    if any(item is None for item in required):
        print(f"Skipping example prediction for {model_name} as essential components are missing.")
        return

    print(f"\n--- Example Prediction ({model_name}) ---")

    # First test sample, reshaped to a single-row 2-D array for the scaler/model.
    raw_input = X_test_original[0].reshape(1, -1)
    actual_output = Y_test_original[0]

    # The model was trained on scaled inputs, so apply the same scaler first.
    scaled_input = scaler_obj.transform(raw_input)
    predicted_output = model.predict(scaled_input)

    print(f"Input State + Action (Original): {raw_input[0]}")
    print(f"Input State + Action (Scaled):   {scaled_input[0]}")
    print(f"Actual Final State:              {actual_output}")
    print(f"Predicted Final State:           {predicted_output[0]}")

In [None]:
# Example prediction with XGBoost model
# Example prediction with the XGBoost model.
# The model is checked with an explicit `is not None`, like every other cell,
# instead of relying on the truthiness of the estimator object.
if world_model_xgb is not None and X_test is not None and Y_test is not None and scaler is not None:
    example_prediction(world_model_xgb, X_test, Y_test, scaler, model_name="XGBoost")
else:
    print("Skipping XGBoost example prediction due to missing components.")