In [5]:
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
from PIL import Image            
import os
import imageio                    
from IPython.display import display
from IPython.display import display, Image
from sklearn.gaussian_process import GaussianProcessRegressor 
from sklearn.gaussian_process.kernels import Matern      
import plotly.graph_objects as go

from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

In [12]:
# Acquisition function
def prob_i(x, gp_model, best_y):
    """_summary_

    Args:
        x (_type_): _description_
        gp_model (_type_): _description_
        best_y (_type_): _description_

    Returns:
        _type_: _description_
    """    
    if len(x.shape)==1 or x.shape[1]==1:
        y_pred, y_std = gp_model.predict(x.reshape(-1, 1), return_std=True)
    else:
        y_pred, y_std = gp_model.predict(x, return_std=True)
    z = (y_pred - best_y) / y_std
    pi = norm.cdf(z)
    return pi

def expected_i(x, gp_model, best_y):
    """_summary_

    Args:
        x (_type_): _description_
        gp_model (_type_): _description_
        best_y (_type_): _description_

    Returns:
        _type_: _description_
    """    
    if len(x.shape)==1 or x.shape[1]==1:
        y_pred, y_std = gp_model.predict(x.reshape(-1, 1), return_std=True)
    else:
        y_pred, y_std = gp_model.predict(x, return_std=True)
    z = (y_pred - best_y) / y_std
    ei = (y_pred - best_y) * norm.cdf(z) + y_std * norm.pdf(z)
    return ei

In [11]:
def smc(x,y,N,k_dim,T, var):
    '''Parameters:
    
    - x = sample of x
    - y = sample of y
    - N = number of samples for smc
    - k_dim = number of hyperparameters
    - T = 
    - var = T/100
    
    Output: theta_best = optimized hyperparameters'''
    
    #inizialization
    theta = np.zeros((N,k_dim,T))
    theta[:,:,0] = np.full((N,k_dim), 10) #to avoid negative theta we start far from 0
    w = np.zeros((N,T))
    
    
    for t in range(1, T):
        for i in range(N):
            for j in range(k_dim):
                theta[i,j,t] = np.random.normal(loc=theta[i,j,t-1], scale=var[j])
            
            
            #compute weights (gp needs to be computed with each set of hyperpars)
            if k_dim==2:
                kernel = (theta[i,0,t]**2) * Matern(length_scale=theta[i,1,t], nu=1.5)
                gp = GaussianProcessRegressor(kernel=kernel, optimizer=None, alpha=1e-5)
                gp.fit(x.reshape(-1, 1),y)
            elif k_dim==3:
                kernel = (theta[i,0,t]**2) * Matern(length_scale=[theta[i,1,t],theta[i,2,t]], nu=1.5)
                gp = GaussianProcessRegressor(kernel=kernel, optimizer=None, alpha=1e-5)
                gp.fit(x,y)
                
            w[i,t] = np.exp(gp.log_marginal_likelihood()*1e-3) #scaled to avoid underflow; will be normalized, so no worry

        #normalize weights
        w[:,t]/=np.sum(w[:,t])
        
        '''start resampling'''
        #print(w[:,t])
        #resample with replacement:
        for i in range(N):
            index = np.random.choice(N, size=1, p=w[:,t])
            theta[i,:,t] = theta[index,:,t]
        
    theta_best = np.mean(theta[:,:,T-1], axis=0)
    return theta_best

1D E BRANIN

In [15]:
# from Classes import BlackBox
# forse meglio fare un file con la classe?
class BlackBox:
    def __init__(self):
        """_summary_
        """        
        #Branin
        self.a = 1.0
        self.b = 5.1 / (4.0 * (np.pi ** 2))
        self.c = 5.0 / np.pi
        self.r = 6.0
        self.s = 10.0
        self.t = 1 / (8.0 * np.pi)
        
        #hartman 3d
        self.alpha = np.array([1.0, 1.2, 3.0, 3.2], dtype=float)
        
        self.P = 10**(-4) * np.array([[3689, 1170, 2673],
                 [4699, 4387, 7470],
                 [1091, 8732, 5547],
                 [381, 5743, 8828]],dtype=float)
        
        self.A = np.array([[3.0, 10, 30],
                 [0.1, 10, 35],
                 [3.0, 10, 30],
                 [0.1, 10, 35]], dtype=float)

        
    @staticmethod        
    def simple_func(x: np.ndarray)-> np.ndarray:
        y = np.sin(x) + np.cos(2*x)
        return y


    @staticmethod
    def branin(x1: np.ndarray,
               x2: np.ndarray,
                a: float, 
                b: float, 
                c: float, 
                r: float, 
                s: float, 
                t: float) -> np.ndarray:
        Hb = np.zeros_like(x1, dtype=float)
        Hb = a * (x2 - b * (x1 ** 2) + c * x1 - r) ** 2 + s * (1 - t) * np.cos(x1) + s
        return Hb


    @staticmethod
    def compute_y_values(xx: np.ndarray, alpha: np.ndarray, A, P)-> float:
        '''
        input: 
            xx = c(x1, x2, x3)
        
        parameters:
            self.alpha
            self.P
            self.A
        '''
        outer = 0.0
        for ii in range(4):
            inner = 0
            for jj in range(3):
                xj = xx[jj]
                Aij = A[ii, jj]
                Pij = P[ii, jj]
                inner += Aij * (xj - Pij) ** 2
            new = alpha[ii] * np.exp(-inner)
            outer += new
        y = -outer
        return y
    
    @classmethod
    def hartmann3d(cls, x1: np.ndarray, 
                   x2: np.ndarray,
                   x3: np.ndarray, 
                   n: int, 
                   alpha: np.ndarray, 
                   A: np.ndarray, 
                   P: np.ndarray)->np.ndarray:
        
        y_values = np.zeros((n, n, n))
        for i, x1_val in enumerate(x1):
            for j, x2_val in enumerate(x2):
                for k, x3_val in enumerate(x3):
                    y = cls.compute_y_values( np.array([x1_val, x2_val, x3_val]), alpha, A, P)
                    y_values[i, j, k] = y
        return y_values

In [16]:
def sampling(matriX: np.ndarray, n_sample: int, a: float, b: float, c: float, r: float, s: float, t: float )-> tuple[np.ndarray, np.ndarray]:
    """_summary_

    Args:
        matriX (np.ndarray): _description_
        n_sample (int): _description_
        a (float): _description_
        b (float): _description_
        c (float): _description_
        r (float): _description_
        s (float): _description_
        t (float): _description_

    Returns:
        tuple[np.ndarray, np.ndarray]: _description_
    """
    s_x = np.zeros((n_sample,2), dtype=float)
    s_x[:,0] = np.random.choice(matriX[:,0], n_sample)
    s_x[:,1] = np.random.choice(matriX[:,1], n_sample)
    
    s_z = np.zeros_like(s_x, dtype=float)
    s_z = my_blackbox.branin(s_x[:,0], s_x[:,1], a=a, b=b, c=c, r=r, s=s, t=t)

    return s_x, s_z

PLOT FUNCTIONS

In [3]:
def plot_1D(matriX, my_blackbox, improv, y_pred, y_std, sample_x, sample_y):
    plt.plot(matriX, my_blackbox.simple_func(matriX), color='orange', label='Black Box Function', linewidth=2, alpha=0.7)
    plt.plot(matriX, 10*improv, color='red', linestyle='dashed', label='10 x Surrogate Function', alpha=0.8)
    plt.fill_between(matriX, y_pred - 2*y_std, y_pred + 2*y_std, color='blue', alpha=0.2)
    plt.plot(matriX, y_pred, color='blue', label='Gaussian Process', alpha=0.7, linewidth=2)
    plt.scatter(sample_x, sample_y, color='red', label='Previous Points')



def plot_MLP(matriX, y_pred, y_std, sample_x, sample_y):
    """_summary_

    Args:
        matriX (_type_): _description_
        y_pred (_type_): _description_
        y_std (_type_): _description_
        sample_x (_type_): _description_
        sample_y (_type_): _description_
    """    ''''''
    plt.fill_between(matriX, y_pred - 2*y_std, y_pred + 2*y_std, color='blue', alpha=0.2)
    plt.plot(matriX, y_pred, color='blue', label='Gaussian Process', alpha=0.7, linewidth=2)
    plt.scatter(sample_x, sample_y, color='red', label='Previous Points')


def plot_3d_surface_variance(matriX, y_values, y_std):
    """_summary_

    Args:
        matriX (_type_): _description_
        y_values (_type_): _description_
        y_std (_type_): _description_
    """    
    y_values_reshaped = y_values.reshape(matriX.shape(0))
    y_std_reshaped = y_std.reshape(matriX.shape(0))

    fig = go.Figure(data=[go.Surface(x=matriX[:,0], y=matriX[:,1], z=y_values_reshaped, colorscale='Viridis', name='ypred')])

    fig.add_trace(go.Surface(x=matriX[:,0], y=matriX[:,1], z=y_values_reshaped + y_std_reshaped, colorscale='Viridis',showscale=False, opacity=0.6, name='ypred + y_std'))
    fig.add_trace(go.Surface(x=matriX[:,0], y=matriX[:,1], z=y_values_reshaped - y_std_reshaped, colorscale='Viridis',showscale=False, opacity=0.6, name='ypred - y_std'))

    # Set layout
    fig.update_layout(scene=dict(
                        xaxis_title='X1',
                        yaxis_title='X2',
                        zaxis_title='Acquisition function'))
    
    # Show plot
    fig.show()


def make_gif_MLP(folder_path, frames,name):
    """_summary_

    Args:
        folder_path (_type_): _description_
        frames (_type_): _description_
        name (_type_): _description_
    """    
    gif_path = os.path.join(folder_path, name)

    with imageio.get_writer(gif_path, mode='I', duration=0.5) as writer:
        for frame in frames:
            image = imageio.imread(os.path.join(folder_path, frame))
            writer.append_data(image)



NEURAL NETWORK

In [17]:
#For the 2D function:
def blackbox_mlp_2D(theta, X_train,y_train, X_test,y_test):
    '''Parameters:
        - theta: 2D array
          - theta[0] : learning rate
          - theta[1] : batch size
    '''
    if len(theta)==2:
        mlp = MLPClassifier(max_iter=100, alpha=1e-4, solver='sgd',
                        tol=1e-4, random_state=2072380, learning_rate_init=theta[0], hidden_layer_sizes=(20,20),\
                            batch_size=int(theta[1])).fit(X_train,y_train)
        mlp.predict(X_test)
    
    if len(theta)==2:
        mlp = MLPClassifier(max_iter=100, 
                            alpha=1e-4, 
                            solver='sgd',
                            tol=1e-4, 
                            random_state=2072380, 
                            learning_rate_init=theta[0], 
                            hidden_layer_sizes=(20,20)).fit(X_train,y_train)
        mlp.predict(X_test) 
    return mlp.score(X_test,y_test)

In [20]:
def optimize_MLP_2d(matriX,
                    y,
                    x1,
                    x2,
                    optimizer, 
                    num_iterations, 
                    acquisition, 
                    N, 
                    T, 
                    var, 
                    k_dim, 
                    folder_path:str, 
                    name_gif:str):
    """_summary_

    Args:
        matriX (_type_): _description_
        y (_type_): _description_
        x1 (_type_): _description_
        x2 (_type_): _description_
        optimizer (_type_): _description_
        num_iterations (_type_): _description_
        acquisition (_type_): _description_
        N (_type_): _description_
        T (_type_): _description_
        var (_type_): _description_
        k_dim (_type_): _description_
    """    
    frames = []
    plt.figure(figsize=(10, 6))

    theta_best = [1.0]*matriX.shape(1)#initial hyperpars if we use smc

    if (optimizer=="fmin_l_bfgs_b"):
        gp_model = GaussianProcessRegressor(kernel=kernel, alpha=1e-5, optimizer= "fmin_l_bfgs_b")
    # if it's smc it will be initialized after
    #eventually add here also vihola condition
        
    for i in range(num_iterations):
        if (i%10==0):
            print('Iteration number : ', i)
        
        #Update of kernel (automatic if optimizer = lbfgs)
        if (optimizer=='smc'):
            #Manually update kernel hyperparameters
            kernel = theta_best[0]**2 * Matern(length_scale=theta_best[1:], nu=1.5)
            gp_model = GaussianProcessRegressor(kernel=kernel, alpha=1e-5, optimizer= None)
            
        # Fit the Gaussian process model to the sampled points
        gp_model.fit(x,y)
    
 
        y_pred, y_std = gp_model.predict(matriX, return_std=True)
    
        best_idx = np.argmax(y)
        best_x = x[best_idx]
        best_y = y[best_idx]
    
        
        if acquisition==0:
            improv = expected_i(matriX,gp_model,best_y)
        else:
            improv = prob_i(matriX,gp_model,best_y)
            
        
        if i < num_iterations - 1:
            new_x = matriX[np.argmax(improv)].reshape(-1,2)  # Select the next point based on
            new_y = blackbox_mlp_2D(new_x[0])
            x = np.concatenate((x, new_x))
            y = np.append(y, new_y)
        
        if (optimizer=="smc"):
            #Optimize hyperpars with smc
            theta_best = smc(x,y,N,k_dim,T,var)
        
                # Save frame
        if matriX.shape(1)==1:
            filename = f"frame_{i}.png"
            plt.savefig(os.path.join(folder_path, filename))
            frames.append(filename)
            plt.clf()  # Clear current figure
    


    if matriX.shape(1)==1:
    # Create the GIF using the frames saved in the specified folder
        make_gif_MLP(folder_path, frames, name_gif)
        # Remove the saved frames
        for frame_file in frames:
            os.remove(os.path.join(folder_path, frame_file))
    if matriX.shape(1)==2:
        # Final plot
        plot_3d_surface_variance(x1, x2, y_pred,y_std)

    
    #Final data
    x_2d = x
    y_2d = y
        
    print('Optimized theta: ', gp_model.kernel_)
    return(x_2d,y_2d)