In [4]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import qmc
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, Matern, RationalQuadratic, Kernel, Hyperparameter
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

import GPy
import gpytorch

In [5]:
class CategoryOverlapKernelGPy(GPy.kern.Kern):
    """GPy implementation of a categorical overlap kernel.

    For two rows ``x`` and ``x'`` the kernel value is ``variance`` times the
    fraction of feature columns in which both rows hold the same category
    code, so identical rows give ``variance`` and fully different rows give 0.

    Parameters
    ----------
    input_dim : int
        Number of input columns, forwarded to ``GPy.kern.Kern``.
    variance : float, default 1.0
        Kernel scale; wrapped in a GPy ``Param`` and linked to the kernel.
    active_dims : optional
        Forwarded unchanged to ``GPy.kern.Kern``.
    name : str, default 'catoverlap'
        Forwarded unchanged to ``GPy.kern.Kern``.
    """

    def __init__(self, input_dim, variance=1.0, active_dims=None, name='catoverlap'):
        super().__init__(input_dim, active_dims=active_dims, name=name)
        self.variance = GPy.core.parameterization.Param('variance', variance)
        self.link_parameter(self.variance)

    def _overlap(self, X, X2=None):
        """Return the fraction of matching columns for every pair of rows.

        ``X`` is (n, d) and ``X2`` is (m, d) (defaults to ``X``); the result
        is an (n, m) float array with entries in [0, 1].
        """
        if X2 is None:
            X2 = X
        # Broadcast to (n, m, d): True wherever both rows share the category.
        same_category = X[:, None] == X2[None, :]
        return np.mean(same_category, axis=-1)

    def K(self, X, X2=None):
        """Covariance matrix between the rows of ``X`` and ``X2`` (or ``X`` itself)."""
        return self.variance * self._overlap(X, X2)

    def Kdiag(self, X):
        """Diagonal of ``K(X, X)``: every point fully overlaps with itself."""
        diag = np.empty(X.shape[0])
        diag[:] = self.variance
        return diag

    def update_gradients_full(self, dL_dK, X, X2=None):
        """Set the gradient of the objective w.r.t. ``variance``.

        ``K`` is linear in ``variance``, so dK/dvariance is the overlap matrix.
        """
        self.variance.gradient = np.sum(dL_dK * self._overlap(X, X2))

    def update_gradients_diag(self, dL_dKdiag, X):
        """Set the ``variance`` gradient when only the diagonal of K is used."""
        self.variance.gradient = np.sum(dL_dKdiag)

    def gradients_X(self, dL_dK, X, X2=None):
        """Gradient w.r.t. the inputs; zero because the kernel is piecewise constant."""
        return np.zeros(X.shape)

    def gradients_X_diag(self, dL_dKdiag, X):
        """Gradient of the diagonal w.r.t. the inputs; always zero."""
        return np.zeros(X.shape)



In [7]:
# Six samples with one categorical feature each (one row per sample).
X_test = np.array([4, 2, 2, 1, 0, 8]).reshape(-1, 1)

# Build the overlap kernel and evaluate its Gram matrix on the samples.
gpy_kernel = CategoryOverlapKernelGPy(variance=1.0, input_dim=1)
K_gpy = gpy_kernel.K(X_test)
print("GPy Kernel Matrix:\n", K_gpy)



GPy Kernel Matrix:
 [[1. 0. 0. 0. 0. 0.]
 [0. 1. 1. 0. 0. 0.]
 [0. 1. 1. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0.]
 [0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 1.]]


In [10]:
import torch
import gpytorch

class CategoryOverlapKernel(gpytorch.kernels.Kernel):
    """Custom kernel for categorical variables using category-overlap similarity.

    The kernel value for two points is the fraction of categorical features
    on which they agree (1.0 for identical points, 0.0 when nothing matches).
    With a single categorical feature this is the Kronecker delta.
    """

    # The kernel has no lengthscale. GPyTorch reads this flag as a class
    # attribute; passing it as a constructor keyword has no effect.
    has_lengthscale = False

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def forward(self, x1, x2, diag=False, **params):
        """Evaluate the overlap kernel.

        Args:
            x1: tensor of category codes, shape (..., n, d) or (n,).
            x2: tensor of category codes, shape (..., m, d) or (m,).
            diag: if True, return only the diagonal (assumes x1 and x2 hold
                the same points, so every entry is 1).

        Returns:
            A floating-point tensor of shape (..., n, m), or (..., n) when
            ``diag`` is True.
        """
        # Treat plain 1-D vectors of codes as inputs with one feature column.
        if x1.dim() == 1:
            x1 = x1.unsqueeze(-1)
        if x2.dim() == 1:
            x2 = x2.unsqueeze(-1)

        # Integer-coded categories still need a floating-point covariance.
        out_dtype = x1.dtype if x1.is_floating_point() else torch.get_default_dtype()

        if diag:
            return torch.ones(x1.shape[:-1], dtype=out_dtype, device=x1.device)

        # Compare every pair of points feature by feature, (..., n, m, d),
        # then average over features. The cast is applied to the comparison
        # result so a float matrix is returned rather than a bool one.
        same_category = x1.unsqueeze(-2) == x2.unsqueeze(-3)
        return same_category.to(out_dtype).mean(dim=-1)
    
    
class CombinedKernel(gpytorch.kernels.Kernel):
    """Product of a Matern kernel and a category-overlap kernel.

    The last input column is treated as the categorical feature; all
    preceding columns are treated as continuous features.

    Args:
        matern_nu: smoothness parameter passed to ``MaternKernel`` (default 2.5).
        **kwargs: forwarded to ``gpytorch.kernels.Kernel``.
    """

    def __init__(self, matern_nu=2.5, **kwargs):
        super().__init__(**kwargs)
        self.matern_kernel = gpytorch.kernels.MaternKernel(nu=matern_nu)
        self.category_kernel = CategoryOverlapKernel()

    def forward(self, x1, x2, diag=False, **params):
        """Return the elementwise product of the Matern and category kernels."""
        # Split continuous and categorical features. Slicing with ``-1:``
        # keeps the feature axis, so batched inputs of shape (b, n, d) stay
        # (b, n, 1) instead of collapsing to (b, n), which a kernel would
        # interpret as b points with n features.
        x1_cont, x1_cat = x1[..., :-1], x1[..., -1:]
        x2_cont, x2_cat = x2[..., :-1], x2[..., -1:]

        # Compute kernel values on each feature group.
        matern_val = self.matern_kernel(x1_cont, x2_cont, diag=diag, **params)
        category_val = self.category_kernel(x1_cat, x2_cat, diag=diag, **params)

        # Combine kernels: points in different categories are uncorrelated.
        return matern_val * category_val
    
class CustomGPModel(gpytorch.models.ExactGP):
    """Exact GP with a constant mean and the ``CombinedKernel`` covariance."""

    def __init__(self, train_x, train_y, likelihood):
        super(CustomGPModel, self).__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = CombinedKernel()

    def forward(self, x):
        """Return the multivariate normal built from the mean and covariance at ``x``."""
        prior_mean, prior_cov = self.mean_module(x), self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(prior_mean, prior_cov)
    

In [11]:
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C

class SimpleCoCaBO:
    """Minimal GP surrogate with a UCB acquisition over mixed inputs.

    Continuous and categorical columns are concatenated side by side and fed
    to a single scikit-learn ``GaussianProcessRegressor``.

    Parameters
    ----------
    continuous_dim : int
        Number of continuous columns (stored; not otherwise used here).
    categorical_dim : int
        Number of categorical columns (stored; not otherwise used here).
    kernel : optional
        Kernel for the regressor. Defaults to ``C(1.0) * RBF(1.0)`` with
        bounds ``(1e-4, 1e1)`` on both hyperparameters.
    noise_var : float, default 1e-5
        Passed to the regressor as ``alpha``.
    """

    def __init__(self, continuous_dim, categorical_dim, kernel=None, noise_var=1e-5):
        self.continuous_dim = continuous_dim
        self.categorical_dim = categorical_dim

        # Explicit None check so a supplied kernel is never replaced because
        # of how it happens to evaluate in a boolean context.
        if kernel is None:
            kernel = C(1.0, (1e-4, 1e1)) * RBF(1.0, (1e-4, 1e1))
        self.kernel = kernel

        self.gpr = GaussianProcessRegressor(kernel=self.kernel, alpha=noise_var)

        # Observation history: rows of [continuous..., categorical...] and
        # the matching objective values.
        self.X = []
        self.y = []

    def fit(self, X_cont, X_cat, y):
        """Add new observations and refit the GP on all observations so far.

        The regressor is trained on the stored history plus the new batch, so
        repeated calls accumulate data instead of discarding earlier points.
        The history is only updated after the fit succeeds.
        """
        X_new = np.hstack((X_cont, X_cat))

        X_all = np.asarray([*self.X, *X_new])
        y_all = np.asarray([*self.y, *y])
        self.gpr.fit(X_all, y_all)

        self.X.extend(X_new)
        self.y.extend(y)

    def predict(self, X_cont, X_cat):
        """Return the GP posterior mean and standard deviation at the given points."""
        X_combined = np.hstack((X_cont, X_cat))
        mean, std = self.gpr.predict(X_combined, return_std=True)
        return mean, std

    def ucb(self, X_cont, X_cat, kappa=2.0):
        """Upper Confidence Bound acquisition: ``mean + kappa * std``."""
        mean, std = self.predict(X_cont, X_cat)
        return mean + kappa * std

    def optimize(self, X_cont, X_cat, kappa=2.0):
        """Return the candidate (continuous row, categorical row) with the highest UCB."""
        ucb_values = self.ucb(X_cont, X_cat, kappa)
        best_idx = np.argmax(ucb_values)
        return X_cont[best_idx], X_cat[best_idx]

# Example usage
if __name__ == "__main__":
    # Three observations: one continuous column and one categorical column
    # (categories encoded as 0 or 1), plus their objective values.
    X_cont = np.array([0.5, 0.2, 0.7]).reshape(-1, 1)
    X_cat = np.array([0, 1, 0]).reshape(-1, 1)
    y = np.array([0.3, 0.7, 0.5])

    # Build the optimizer and fit its surrogate to the observations.
    optimizer = SimpleCoCaBO(categorical_dim=1, continuous_dim=1)
    optimizer.fit(X_cont, X_cat, y)

    # Candidate points to rank with the UCB acquisition.
    new_cont = np.array([0.6, 0.3]).reshape(-1, 1)
    new_cat = np.array([1, 0]).reshape(-1, 1)

    best_cont, best_cat = optimizer.optimize(new_cont, new_cat)
    print(f"Best continuous: {best_cont}, Best categorical: {best_cat}")


Best continuous: [0.6], Best categorical: [1]


testing