In [1]:
import torch
import xgboost
import numpy as np
from sklearn.model_selection import ParameterGrid

Found Intel OpenMP ('libiomp') and LLVM OpenMP ('libomp') loaded at
the same time. Both libraries are known to be incompatible and this
can cause random crashes or deadlocks on Linux when loaded in the
same Python program.
Using threadpoolctl may cause crashes or deadlocks. For more
information and possible workarounds, please see
    https://github.com/joblib/threadpoolctl/blob/master/multiple_openmp.md



In [2]:
class XGBoost(torch.nn.Module):
    """
    A PyTorch-compatible XGBoost model with training, prediction, saving, loading, 
    and backward pass support.
    """
    def __init__(self, params):
        super().__init__()
        self.params = params
        self.model = None
        self.evals_result = {}

    def train(self, inputs, targets):
        """
        Train the XGBoost model with cross-validation and hyperparameter tuning.
        """
        inputs_np = inputs.clone().detach().cpu().squeeze().numpy()
        targets_np = targets.clone().detach().cpu().squeeze().numpy()
        
        dtrain = xgboost.DMatrix(inputs_np, label=targets_np)

        # Use cross-validation to evaluate the best parameters if param_grid is provided
        param_grid = {"eta":[0.002, 0.4, 0.8], "max_depth": [2, 4, 6, 10]}
        if param_grid:
            best_params = None
            best_score = float("inf")
            for params in ParameterGrid(param_grid):
                # Merge given params with default ones
                params = {**self.params, **params}
                cv_results = xgboost.cv(
                    params,
                    dtrain,
                    num_boost_round=100,
                    nfold=5,
                    metrics=["rmse"],
                    early_stopping_rounds=10,
                    seed=42
                )
                mean_rmse = cv_results["test-rmse-mean"].min()
                if mean_rmse < best_score:
                    best_score = mean_rmse
                    best_params = params
            print(f"Best Parameters: {best_params}, Best RMSE: {best_score}")
            self.params = best_params  # Update the model's parameters with the best ones

        # Train the model using the best parameters
        num_samples = inputs_np.shape[0]
        split_idx = int(0.8 * num_samples)
        train_inputs_np, eval_inputs_np = inputs_np[:split_idx], inputs_np[split_idx:]
        train_targets_np, eval_targets_np = targets_np[:split_idx], targets_np[split_idx:]
        
        # Create DMatrix for train and eval sets
        dtrain = xgboost.DMatrix(train_inputs_np, label=train_targets_np)
        deval = xgboost.DMatrix(eval_inputs_np, label=eval_targets_np)
        evals = [(dtrain, "train"), (deval, "eval")]
        self.evals_result = {}
        self.model = xgboost.train(self.params, 
                                   dtrain, 
                                   num_boost_round=100, 
                                   evals=evals, 
                                   evals_result=self.evals_result, 
                                   early_stopping_rounds=10
                                   )
        train_loss = self.evals_result["train"][self.params["eval_metric"]]
        eval_loss = self.evals_result["eval"][self.params["eval_metric"]]

        return train_loss, eval_loss

    def predict(self, inputs):
        """
        Predict outputs for given inputs using the trained model.
        """
        preds = XGBoostAutoGrad.apply(self.model, inputs)
        nr, nb, dx = inputs.shape
        _, dz = preds.shape
        z_mu = preds.view(nr, nb, dz)[...,:int(dz/2)]
        z_std = torch.abs(preds.view(nr, nb, dz)[..., int(dz/2):]) 

        return z_mu, z_std

    def save(self, path):
        """
        Save the trained model to the specified path.
        """
        if self.model is None:
            raise ValueError("The model has not been trained yet.")
        self.model.save_model(path)
    
    def load(self, path):
        """
        Load a trained model from the specified path.
        """
        self.model = xgboost.Booster()
        self.model.load_model(path)

    def forward(self, inputs):
        """
        Forward pass: call predict internally.
        """
        return self.predict(inputs)

class XGBoostAutoGrad(torch.autograd.Function):

    @staticmethod
    def forward(model, x):
        """
        Predict outputs for given inputs using the trained model.
        """
        nr, nb, dx = x.shape
        x_2d = x.view(nr*nb, dx)
        x_np = x_2d.clone().detach().cpu().numpy()
        dmatrix = xgboost.DMatrix(x_np)
        preds = model.predict(dmatrix)
        preds_tensor = torch.tensor(preds, dtype=x.dtype, device=x.device)
 
        return preds_tensor

    @staticmethod
    def setup_context(ctx, inputs, output):
        model, x = inputs
        preds = output
        ctx.save_for_backward(x, preds)
        ctx.model = model

    @staticmethod
    def backward(ctx, grad_output):
        """
        Backward pass: compute gradients w.r.t. the inputs using finite differences.
        """
        x, preds = ctx.saved_tensors
        nr, nb, dx = x.shape
        x_2d = x.view(nr*nb, dx)
        x_np = x_2d.clone().detach().cpu().numpy()
        _, dz = preds.shape
        preds_np = preds.clone().detach().cpu().numpy()

        # Finite differences for gradient approximation
        epsilon = 1e-2
        grads = np.zeros((nr*nb, dz, dx))
        for i in range(x_np.shape[1]):  # Loop over each feature
            perturbed_inputs = x_np.copy()
            perturbed_inputs[:, i] += epsilon  # Apply small perturbation to the i-th feature
            perturbed_dmatrix = xgboost.DMatrix(perturbed_inputs)
            perturbed_preds = ctx.model.predict(perturbed_dmatrix, output_margin=True)
            
            # Compute gradient approximation for the i-th feature
            grads[..., i] = (perturbed_preds - preds_np) / epsilon

        # Convert gradients to PyTorch tensors
        grad_input = torch.tensor(grads, dtype=x.dtype, device=x.device)

        return None, grad_input*grad_output.unsqueeze(2)

In [None]:
# Generate synthetic data
torch.manual_seed(42)
x = torch.rand(100, 1, 2, requires_grad=True)  # Enable gradients for input
y = 2 * x[..., 0] - 3 * x[..., 1] + 0.5  # Linear relationship

# Define XGBoost model wrapper
xgb_model_args = {"objective": "reg:squarederror",
                  "max_depth": 3,
                  "eta": 0.1,
                  "eval_metric": "rmse"
                  }
model = XGBoost(xgb_model_args)

# Fit the model
model.train(x, y)

# Predict
predictions = model.predict(x)

# Define a loss function (mean squared error)
loss = torch.mean((predictions - y) ** 2)

# Backpropagate to compute gradients
loss.backward()

# Print gradients
print("Gradients of the inputs:")
print(x.grad)

In [None]:
x.shape, y.shape

(torch.Size([100, 2]), torch.Size([100]))