In [38]:
# Mini CoCaBO. Adapted from Ru's CoCaBO.

# Author: Subaru

# 1 Define the problem domain, or the data structure we want to fit the model onto.

'''
# this is currently unknown, but defined here to integrate with code
# f = function GPR model, but can't be created without the experimental results
'''
# Search-space definition. Categorical variables must be listed first, followed by
# the continuous ones: later cells slice columns positionally with len(categories).
bounds = [
    {'name': 'h1', 'type': 'categorical', 'domain': (0, 1)},     # 2 values
    {'name': 'h2', 'type': 'categorical', 'domain': (0, 1, 2)},  # 3 values
    {'name': 'x1', 'type': 'continuous', 'domain': (-1, 1)},     # (lower, upper) bounds
    {'name': 'x2', 'type': 'continuous', 'domain': (-1, 1)},
    {'name': 'x3', 'type': 'continuous', 'domain': (-1, 1)},
    {'name': 'x4', 'type': 'continuous', 'domain': (-1, 1)}
]

# Number of discrete values under each categorical variable. Derived from `bounds`
# so the two descriptions are always congruent (first category has 2 values, second has 3).
categories = [len(b['domain']) for b in bounds if b['type'] == 'categorical']

C = categories


In [None]:
# Assuming you already have your data as raw lab results:
# Sample data for demonstration (replace with actual data from lab)
# Categorical variables: randomly generate indices based on the domains
# Continuous variables: uses Latin Hypercube Sampling (LHS) to generate samples
# LHS is a statistical method for generating a distribution of samples of parameter values from a multidimensional distribution.

import numpy as np
import pandas as pd
from pyDOE import lhs


# Number of samples to generate
n_samples = 10
random_seed = 42
# Seed NumPy's global RNG. This has to be a *call*: writing `np.random.seed = 42`
# would replace the function with an int, seed nothing, and make every later
# np.random.seed(...) call fail until the kernel is restarted.
np.random.seed(random_seed)

# Continuous variables come after the categorical ones in `bounds`
n_continuous = len(bounds) - len(categories)
continuous_bounds = [b['domain'] for b in bounds[len(categories):]]

# Latin Hypercube Samples for the continuous variables, in the unit hypercube
lhs_samples = lhs(n_continuous, samples=n_samples)

# Rescale each unit-interval column to its (lower, upper) bounds
continuous_data = np.zeros((n_samples, n_continuous))
for i, (lower, upper) in enumerate(continuous_bounds):
    continuous_data[:, i] = lhs_samples[:, i] * (upper - lower) + lower

# Categorical variables: independent uniform random draws of a category index
# (plain random sampling, not Latin Hypercube Sampling)
categorical_data = [np.random.choice(range(cat), size=n_samples) for cat in categories]

# Combine categorical and continuous data into a single data set
# (column_stack promotes the integer category indices to float)
data = np.column_stack(categorical_data + [continuous_data[:, i] for i in range(n_continuous)])

# Convert to pandas DataFrame for easy manipulation
columns = [b['name'] for b in bounds]  # Extract column names (variable names)
df = pd.DataFrame(data, columns=columns)

# Print the generated DataFrame
print(df)
print(categorical_data)
print(continuous_data)



    h1   h2        x1        x2        x3        x4
0  0.0  2.0 -0.561729  0.424159  0.416662  0.614814
1  0.0  1.0  0.968700 -0.231780 -0.456543  0.338718
2  1.0  2.0  0.492801 -0.071257  0.274009 -0.802539
3  0.0  2.0  0.319719 -0.949339  0.806108  0.051825
4  0.0  1.0 -0.667612 -0.577574 -0.297410  0.474079
5  1.0  2.0  0.010305  0.791323  0.181799 -0.298815
6  0.0  1.0 -0.124414  0.887285 -0.033701  0.847615
7  1.0  2.0  0.649020  0.373001  0.790293 -0.160773
8  0.0  1.0 -0.986401 -0.613304 -0.637784 -0.792919
9  0.0  0.0 -0.318134  0.069725 -0.920371 -0.594758
[array([0, 0, 1, 0, 0, 1, 0, 1, 0, 0]), array([2, 1, 2, 2, 1, 2, 1, 2, 1, 0])]
[[-0.56172898  0.42415907  0.41666228  0.61481382]
 [ 0.96870036 -0.23178036 -0.45654338  0.33871791]
 [ 0.49280147 -0.07125669  0.27400891 -0.80253891]
 [ 0.31971886 -0.94933853  0.80610825  0.05182498]
 [-0.66761246 -0.57757413 -0.29740959  0.47407899]
 [ 0.01030549  0.79132341  0.18179871 -0.29881527]
 [-0.12441381  0.88728535 -0.03370062  0.84

In [42]:
# Define the fake objective function
def fake_objective_function(ht_list, X):
    """Toy objective for testing: sum of squared entries of X plus the sum of ht_list.

    Args:
        ht_list: categorical indices (any sequence np.sum accepts).
        X: continuous inputs (any array-like np.square accepts).

    Returns:
        The scalar sum(X**2) + sum(ht_list).
    """
    quadratic_term = np.sum(np.square(X))
    categorical_term = np.sum(ht_list)
    return quadratic_term + categorical_term  # Just an example

# Work on an independent frame that holds only the input columns. A plain
# `df1 = df` would alias the same object: adding 'y' to df1 would also add it to
# df, and re-running this cell would then feed the previous 'y' back in as if it
# were a continuous input. Dropping any existing 'y' makes the cell idempotent.
df1 = df.drop(columns='y', errors='ignore')

# Create the y values (response) for each row: categorical columns first, the rest continuous
y_values = np.array([
    fake_objective_function(list(df1.iloc[i, :len(categories)]), df1.iloc[i, len(categories):])
    for i in range(len(df1))
])

# Add the fake y values as a new column to the copy (df itself is left untouched)
df1['y'] = y_values

print(df1)

    h1   h2        x1        x2        x3        x4          y
0  0.0  2.0 -0.561729  0.424159  0.416662  0.614814  12.331591
1  0.0  1.0  0.968700 -0.231780 -0.456543  0.338718   7.675713
2  1.0  2.0  0.492801 -0.071257  0.274009 -0.802539  19.704807
3  0.0  2.0  0.319719 -0.949339  0.806108  0.051825  17.022005
4  0.0  1.0 -0.667612 -0.577574 -0.297410  0.474079   6.471065
5  1.0  2.0  0.010305  0.791323  0.181799 -0.298815  17.800944
6  0.0  1.0 -0.124414  0.887285 -0.033701  0.847615   8.884543
7  1.0  2.0  0.649020  0.373001  0.790293 -0.160773  21.941324
8  0.0  1.0 -0.986401 -0.613304 -0.637784 -0.792919  14.840257
9  0.0  0.0 -0.318134  0.069725 -0.920371 -0.594758   3.014855


In [26]:
import random


# These definitions outside class are utility functions placed here, so it's all in one place.
def with_proba(epsilon):
    """Bernoulli trial: True with probability ``epsilon``, False otherwise.

    Draws one number from ``random.random()`` and compares it to ``epsilon``.

    Raises:
        AssertionError: if ``epsilon`` is outside [0, 1].
    """
    assert 0 <= epsilon <= 1, "epsilon must be between 0 and 1"
    uniform_sample = random.random()
    return uniform_sample < epsilon


"""
Created based on
https://jeremykun.com/2013/11/08/adversarial-bandits-and-the-exp3-algorithm/
"""
# draw: [float] -> int
# pick an index from the given list of floats proportionally
# to the size of the entry (i.e. normalize to a probability
# distribution and draw according to the probabilities).
def draw(weights):
    """Pick an index with probability proportional to its weight.

    A point is drawn uniformly in [0, sum(weights)] and the weights are subtracted
    from it in order; the first index at which the running value drops to zero or
    below is returned.

    Args:
        weights: non-empty sequence of non-negative numbers.

    Returns:
        int: the selected index.

    Raises:
        ValueError: if ``weights`` is empty.
    """
    if len(weights) == 0:
        raise ValueError("draw() needs at least one weight")

    choice = random.uniform(0, sum(weights))
    last_positive = None

    for choiceIndex, weight in enumerate(weights):
        choice -= weight
        if choice <= 0:
            return choiceIndex
        if weight > 0:
            last_positive = choiceIndex

    # Floating-point round-off can leave a tiny positive remainder after every
    # weight has been subtracted. Without this fallback the function would fall
    # off the end and return None; pick the last index that carries weight instead.
    return last_positive if last_positive is not None else len(weights) - 1

# distr: [float] -> (float)
# Normalize a list of floats to a probability distribution.  Gamma is an
# egalitarianism factor, which tempers the distribution toward being uniform as
# it grows from zero to one.
def distr(weights, gamma=0.0):
    """Turn weights into a probability tuple, mixed with a uniform distribution.

    Each entry is ``(1 - gamma) * w / sum(weights) + gamma / len(weights)``, so
    ``gamma=0`` gives the plain normalized weights and ``gamma=1`` gives the
    uniform distribution.

    Args:
        weights: sequence of numbers.
        gamma: mixing factor applied as in the formula above.

    Returns:
        tuple of floats, one per weight (empty tuple for empty input).
    """
    total = float(sum(weights))
    probabilities = []
    for weight in weights:
        exploit_share = (1.0 - gamma) * (weight / total)
        explore_share = gamma / len(weights)
        probabilities.append(exploit_share + explore_share)
    return tuple(probabilities)

def DepRound(weights_p, k=1, isWeights=True):
    r""" [[Algorithms for adversarial bandit problems with multiple plays, by T.Uchiya, A.Nakamura and M.Kudo, 2010](http://hdl.handle.net/2115/47057)] Figure 5 (page 15) is a very clean presentation of the algorithm.

    - Inputs: :math:`k < K` and weights_p :math:`= (p_1, \dots, p_K)` such that :math:`\sum_{i=1}^{K} p_i = k` (or :math:`= 1`).
    - Output: A subset of :math:`\{1,\dots,K\}` with exactly :math:`k` elements. Each action :math:`i` is selected with probability exactly :math:`p_i`.

    Implementation notes:

    - ``weights_p`` is copied into a float array; if its sum is not close to 1 it is divided by its sum.
    - ``isWeights`` is accepted for interface compatibility but is not used.
    - Pairs of fractional entries are drawn with ``np.random.choice`` and the rounding direction is decided by ``with_proba``, so results depend on both the NumPy and the ``random`` RNG state.
    - If only one fractional entry is left (this can only be floating-point residue such as ``1e-30``), it is snapped to the nearest of 0 and 1 instead of being paired with itself.

    Example:

    >>> import numpy as np; import random
    >>> np.random.seed(0); random.seed(0)  # for reproducibility!
    >>> K = 5
    >>> k = 2

    >>> weights_p = [ 2, 2, 2, 2, 2 ]  # all equal weights
    >>> DepRound(weights_p, k)
    [3, 4]
    >>> DepRound(weights_p, k)
    [3, 4]
    >>> DepRound(weights_p, k)
    [0, 1]

    >>> weights_p = [ 10, 8, 6, 4, 2 ]  # decreasing weights
    >>> DepRound(weights_p, k)
    [0, 4]
    >>> DepRound(weights_p, k)
    [1, 2]
    >>> DepRound(weights_p, k)
    [3, 4]

    >>> weights_p = [ 3, 3, 0, 0, 3 ]  # some weights are zero
    >>> DepRound(weights_p, k)
    [0, 4]
    >>> DepRound(weights_p, k)
    [0, 4]
    >>> DepRound(weights_p, k)
    [0, 4]
    >>> DepRound(weights_p, k)
    [0, 1]

    - See [[Gandhi et al, 2006](http://dl.acm.org/citation.cfm?id=1147956)] for the details.
    """
    # Always work on a private float copy: the entries are overwritten below.
    p = np.array(weights_p, dtype=float)
    K = len(p)
    # Checks
    assert k < K, f"Error: k = {k} should be < K = {K}."  # DEBUG
    if not np.isclose(np.sum(p), 1):
        p = p / np.sum(p)
    assert np.all(0 <= p) and np.all(p <= 1), f"Error: the weights (p_1, ..., p_K) should all be 0 <= p_i <= 1. Got {p}"  # DEBUG
    assert np.isclose(np.sum(p), 1), f"Error: the sum of weights p_1 + ... + p_K should =1. Got: {np.sum(p)}"  # DEBUG
    # Main loop
    possible_ij = [a for a in range(K) if 0 < p[a] < 1]
    while possible_ij:
        if len(possible_ij) == 1:
            # A lone fractional entry cannot be paired with a distinct partner.
            # Pairing it with itself (i == j) would only trip the i != j check,
            # so round the residue to the nearest integer value and stop.
            lone = possible_ij[0]
            p[lone] = 1.0 if p[lone] >= 0.5 else 0.0
            break
        # Choose distinct i, j with 0 < p_i, p_j < 1
        i, j = np.random.choice(possible_ij, size=2, replace=False)
        pi, pj = p[i], p[j]
        assert 0 <= pi <= 1, f"Error: pi = {pi} (with i = {i}) is not 0 <= pi <= 1."  # DEBUG
        assert 0 <= pj <= 1, f"Error: pj = {pj} (with j = {j}) is not 0 <= pj <= 1."  # DEBUG
        assert i != j, f"Error: i = {i} should be different from j = {j}."  # DEBUG

        # Set alpha, beta: the largest shifts that keep both entries inside [0, 1]
        alpha, beta = min(1 - pi, pj), min(pi, 1 - pj)
        proba = alpha / (alpha + beta)
        if with_proba(proba):  # with probability = proba = alpha/(alpha+beta)
            pi, pj = pi + alpha, pj - alpha
        else:            # with probability = 1 - proba = beta/(alpha+beta)
            pi, pj = pi - beta, pj + beta

        # Store
        p[i], p[j] = pi, pj
        # And update
        possible_ij = [a for a in range(K) if 0 < p[a] < 1]
        # Stop as soon as exactly K - k entries have been driven to (approximately) zero
        if len([a for a in range(K) if np.isclose(p[a], 0)]) == K - k:
            break
    # Final step: prefer the entries rounded to 1; otherwise take every non-zero entry
    subset = [a for a in range(K) if np.isclose(p[a], 1)]
    if len(subset) < k:
        subset = [a for a in range(K) if not np.isclose(p[a], 0)]
    assert len(subset) == k, f"Error: DepRound({weights_p}, {k}) is supposed to return a set of size {k}, but {subset} has size {len(subset)}..." # DEBUG
    return subset


In [27]:
def test_draw():
    """Smoke test: draw() returns a valid index for a three-element weight list."""
    result = draw([1, 2, 3])
    valid_indices = (0, 1, 2)
    if result not in valid_indices:
        raise AssertionError(f"Test failed. Result was: {result}")
    print("Test passed for draw function.")


test_draw()

Test passed for draw function.


In [28]:
def test_distr():
    """Check that distr() returns probabilities in [0, 1] that sum to 1."""
    import math  # local import keeps this cell runnable on its own

    weights = [2, 2, 2]
    result = distr(weights)
    total = sum(result)
    # Compare with a tolerance: a sum of floats is not guaranteed to be exactly 1.0
    assert math.isclose(total, 1.0, rel_tol=1e-9), f"Test failed. Sum of probabilities is: {total}"
    assert all(0 <= x <= 1 for x in result), f"Test failed. Result contains out-of-bound values: {result}"
    print("Test passed for distr function.")

test_distr()

Test passed for distr function.


In [29]:
def test_depround():
    """Smoke test: DepRound() returns exactly k indices for five equal weights."""
    k = 2
    result = DepRound([2] * 5, k)
    subset_size = len(result)
    assert subset_size == k, f"Test failed. Expected subset of size {k}, but got {len(result)}"
    print(f"Test passed for DepRound function. Result: {result}")


test_depround()

Test passed for DepRound function. Result: [1, 2]


In [30]:
def test_with_proba():
    """Smoke test: with_proba() returns a boolean for a valid probability."""
    result = with_proba(0.7)
    is_boolean = result in (True, False)
    assert is_boolean, f"Test failed. Result was: {result}"
    print("Test passed for with_proba function.")


test_with_proba()

Test passed for with_proba function.


In [31]:
import numpy as np
from scipy.optimize import minimize
class miniCoCaBO(): # combining baseBO and CoCaBO_base from Ru

     # Ru BaseBO and CoCaBO_base __init__  
    def __init__(self, objfn, initN, bounds, C, 
                  acq_type, kernel_mix=0.5, mix_lr=10, 
                  model_update_interval =10,
                  ard=False, rand_seed=42, debug=False,
                  batch_size=1, **kwargs):
        self.f = objfn  # function to optimise
        self.bounds = bounds  # function bounds
        self.batch_size = batch_size
        self.C = C  # no of categories
        self.C_list = C # Ensure compatibility with existing functions
        self.initN = initN  # no: of initial points
        self.nDim = len(self.bounds)  # dimension
        self.acq_type = acq_type # Acquisition function type
        self.rand_seed = rand_seed
        self.debug = debug
        self.saving_path = None
        self.kwargs = kwargs
        self.best_val_list =[]

        self.x_bounds = np.vstack([d['domain'] for d in self.bounds
                                   if d['type'] == 'continuous'])

        # Store the ht recommendations for each iteration
        self.ht_recommendations = []
        self.ht_hist_batch = []

        '''
        # Store the name of the alogrithm'
        self. policy = None'
        '''

        self.X=[]
        self.Y=[]

        # To check the best vals
        self.gp_bestvals = []
        self.ARD = ard

        # Keeping track of current interation helps control mix learning
        self.iteration = None

        self.model_hp = None
        self.default_cont_lengthscale = 0.2

        self.mix = kernel_mix

        if ((model_update_interval % mix_lr ==0) or (mix_lr % model_update_interval ==0)):
            self.mix_learn_rate = mix_lr
            self.model_update_interval = model_update_interval
        else:
            self.mix_learn_rate = min(mix_lr, model_update_interval)
            self.model_update_interval = min(mix_lr, model_update_interval)
        self.mix_used = 0.5

        self.name = "miniCoCaBO"

     #Ru BaseBO function   
    def my_func(self, Z):
        Z = np.atleast_2d(Z)
        if len(Z) == 1:
            X = Z[0, len(self.C):]
            ht_list = list(Z[0, :len(self.C)])
            return self.f(ht_list, X)
        else:
            f_vals = np.zeros(len(Z))
            for ii in range(len(Z)):
                X = Z[ii, len(self.C):]
                ht_list = list(Z[ii, :len(self.C)].astype(int))
                f_vals[ii] = self.f(ht_list, X)
            return f_vals 
    

    # Ru CoCaBO_base function. This function controls how the categorical weights (Wc)
    # are updated. If a certain category is overrepresented, alpha reduces its dominance,
    # and if a certain category is underrepresented, alpha increases its probability of being
    # selected in the next iteration. This is controlled by the gamma parameter.

    # Large gamma = explore, small gamma = exploit
    def estimate_alpha(self, batch_size, gamma, Wc, C):

        def single_evaluation(alpha):
            denominator = sum([alpha if val > alpha else val for idx, val in enumerate(Wc)])
            rightside = (1 / batch_size - gamma / C) / (1 - gamma)
            output = np.abs(alpha / denominator - rightside)

            return output

        x_tries = np.random.uniform(0, np.max(Wc), size=(100, 1))
        y_tries = [single_evaluation(val) for val in x_tries]
        # find x optimal for init
        # print(f'ytry_len={len(y_tries)}')
        idx_min = np.argmin(y_tries)
        x_init_min = x_tries[idx_min]

        res = minimize(single_evaluation, x_init_min, method='BFGS', options={'gtol': 1e-6, 'disp': False})
        if isinstance(res, float):
            return res
        else:
            return res.x
    # Ru CoCaBO_base function
    def compute_reward_for_all_cat_variable(self, ht_next_batch_list, batch_size):
        # Obtain the reward for each categorical variable: B x len(self.C_list)
        ht_batch_list_rewards = np.zeros((batch_size, len(self.C_list)))
        for b in range(batch_size):
            ht_next_list = ht_next_batch_list[b, :]

            for i in range(len(ht_next_list)):
                idices = np.where(self.data[0][:, i] == ht_next_list[i])
                ht_result = self.result[0][idices]
                ht_reward = np.max(ht_result * -1)
                ht_batch_list_rewards[b, i] = ht_reward
        return ht_batch_list_rewards
    # Ru CoCaBO_base function
    def update_weights_for_all_cat_var(self, Gt_ht_list, ht_batch_list, Wc_list, gamma_list,
                                        probabilityDistribution_list, batch_size, S0=None):
        for j in range(len(self.C_list)):
            Wc = Wc_list[j]
            C = self.C_list[j]
            gamma = gamma_list[j]
            probabilityDistribution = probabilityDistribution_list[j]
            # print(f'cat_var={j}, prob={probabilityDistribution}')

            if batch_size > 1:
                ht_batch_list = ht_batch_list.astype(int)
                Gt_ht = Gt_ht_list[:, j]
                mybatch_ht = ht_batch_list[:, j]  # 1xB
                for ii, ht in enumerate(mybatch_ht):
                    Gt_ht_b = Gt_ht[ii]
                    estimatedReward = 1.0 * Gt_ht_b / probabilityDistribution[ht]
                    if ht not in S0:
                        Wc[ht] *= np.exp(batch_size * estimatedReward * gamma / C)
            else:
                Gt_ht = Gt_ht_list[j]
                ht = ht_batch_list[j]  # 1xB
                estimatedReward = 1.0 * Gt_ht / probabilityDistribution[ht]
                Wc[ht] *= np.exp(estimatedReward * gamma / C)

        return Wc_list
    # Ru CoCaBO_base function
    def compute_prob_dist_and_draw_hts(self, Wc_list, gamma_list, batch_size):

        if batch_size > 1:
            ht_batch_list = np.zeros((batch_size, len(self.C_list)))
            probabilityDistribution_list = []

            for j in range(len(self.C_list)):
                Wc = Wc_list[j]
                gamma = gamma_list[j]
                C = self.C_list[j]
                # perform some truncation here
                maxW = np.max(Wc)
                temp = np.sum(Wc) * (1.0 / batch_size - gamma / C) / (1 - gamma)
                if gamma < 1 and maxW >= temp:
                    # find a threshold alpha
                    alpha = self.estimate_alpha(batch_size, gamma, Wc, C)
                    S0 = [idx for idx, val in enumerate(Wc) if val > alpha]
                else:
                    S0 = []
                # Compute the probability for each category
                probabilityDistribution = distr(Wc, gamma)

                # draw a batch here
                if batch_size < C:
                    mybatch_ht = DepRound(probabilityDistribution, k=batch_size)
                else:
                    mybatch_ht = np.random.choice(len(probabilityDistribution), batch_size, p=probabilityDistribution)

                # ht_batch_list size: len(self.C_list) x B
                ht_batch_list[:, j] = mybatch_ht[:]

                # ht_batch_list.append(mybatch_ht)
                probabilityDistribution_list.append(probabilityDistribution)

            return ht_batch_list, probabilityDistribution_list, S0

        else:
            ht_list = []
            probabilityDistribution_list = []
            for j in range(len(self.C_list)):
                Wc = Wc_list[j]
                gamma = gamma_list[j]
                # Compute the probability for each category
                probabilityDistribution = distr(Wc, gamma)
                # Choose a categorical variable at random
                ht = draw(probabilityDistribution)
                ht_list.append(ht)
                probabilityDistribution_list.append(probabilityDistribution)

            return ht_list, probabilityDistribution_list

