#### 1. Import libraries

In [None]:
import torch
torch.set_default_dtype(torch.float64)

import math
import numpy
from numpy import linalg as LA
import pandas
from copy import deepcopy
from scipy import special
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, f1_score, average_precision_score
from sklearn.metrics import root_mean_squared_error, mean_squared_error, r2_score

import import_ipynb
import data_analysis
import utility_functions

#### 2. Binary linear classifiers

##### Definition of mixture models class

In [1]:
class Pac_bayes_clf:
    """Mixture of local linear binary classifiers around given points of interest.

    Each of the ``n_points`` points of interest owns a linear model
    (``W[i]``, ``MU[i]``, ``SIGMA[i]``) and a locality radius
    ``BETA[i] = K[i] / TAU[i]``.  One additional "external" model
    (``w_ext``, ``mu_ext``, ``sigma_ext``) serves the samples that fall in no
    locality.  The optimisation itself is delegated to
    ``utility_functions.minimize_binary_clf_gn``.  ``predict`` returns labels
    in {-1, 1}.
    """

    def __init__(self, learning_rate=0.01, lambda_reg=100, W_ini=None, w_ext_ini=None, K_ini=None, TAU_ini=None, K_p=None, TAU_p=None, max_iters=2000, class_weights=None, print_flag=False):
        self.lr = learning_rate                                    # step size at each iteration
        self.Lambda = lambda_reg                                   # regularization rate; risk_bound weights the KL term by 1 / Lambda
        self.W_ini, self.w_ext_ini = W_ini, w_ext_ini              # initial weight vectors of the local models and of the external model
        self.K_ini, self.TAU_ini = K_ini, TAU_ini                  # initial shape and rate of the locality parameter (beta = k / tau)
        self.K_p, self.TAU_p = K_p, TAU_p                          # prior shape and rate (gamma distribution parameters), numpy arrays
        self.max_iters = max_iters                                 # maximum number of optimisation iterations
        self.weights = class_weights                               # only used as a flag: anything but None enables 'balanced' class weights
        self.print_flag = print_flag                               # forwarded to the optimiser to print some infos
        self.W, self.MU, self.SIGMA = None, None, None             # parameters of the n local models (set by fit)
        self.w_ext, self.mu_ext, self.sigma_ext = None, None, None # parameters of the external model (set by fit)
        self.K, self.TAU, self.BETA = None, None, None             # locality parameters (set by fit), BETA = K / TAU
        self.weights_vet = None                                    # per-sample weights actually passed to the optimiser

    def fit(self, X, y, Mat_dist):
        """Estimate all model parameters from the training data.

        Args:
            X: numpy array of shape (n_samples, n_features).
            y: numpy array of labels, one per sample.
            Mat_dist: numpy array of distances whose first axis indexes the
                points of interest.

        ``K_p`` and ``TAU_p`` must be numpy arrays because they are converted
        with ``torch.from_numpy``.
        """
        # data shape and number of vicinity points
        n_samples, n_features = X.shape
        n_points = Mat_dist.shape[0]

        # compute class_weight
        # NOTE(review): the unweighted branch has shape (n_samples, 1) while the
        # weighted branch has the shape of y -- confirm the optimiser accepts both.
        if self.weights is None:
            self.weights_vet = numpy.ones((n_samples, 1))
        else:
            labels = numpy.unique(y)
            class_weights = compute_class_weight(class_weight='balanced', classes=labels, y=y)
            self.weights_vet = numpy.where(y == labels[0], class_weights[0], class_weights[1])

        # initialize parameters (all four initial values are redrawn when W_ini is missing)
        if self.W_ini is None:
            self.W_ini, self.w_ext_ini = torch.randn(n_points, n_features), torch.randn(n_features)
            self.K_ini, self.TAU_ini = 50 * torch.rand(n_points), 50 * torch.rand(n_points)

        # work on copies so the stored initial values stay reusable
        self.W, self.MU, self.SIGMA = deepcopy(self.W_ini), torch.zeros(n_points), torch.ones(n_points)
        self.w_ext, self.mu_ext, self.sigma_ext = deepcopy(self.w_ext_ini), torch.tensor([0.0]), torch.tensor([1.0])
        self.K, self.TAU = deepcopy(self.K_ini), deepcopy(self.TAU_ini)

        # concatenate (initialize parameters): (gaussian params, gamma priors, gamma params)
        starting_point = (
            (self.W, self.MU, self.SIGMA, self.w_ext, self.mu_ext, self.sigma_ext),
            (torch.from_numpy(self.K_p), torch.from_numpy(self.TAU_p)),
            (self.K, self.TAU),
        )

        # convert data to tensor
        X_, y_, Mat_dist_, self.weights_vet = torch.from_numpy(X), torch.from_numpy(y), torch.from_numpy(Mat_dist), torch.from_numpy(self.weights_vet)

        # minimized params
        gaussian_params, gamma_params = utility_functions.minimize_binary_clf_gn(starting_point, X_, y_, Mat_dist_, self.lr, self.Lambda, self.max_iters, self.weights_vet, self.print_flag)

        # get minimized params; presumably returned as numpy arrays / scalars since
        # predict and risk_bound apply numpy operations to them -- TODO confirm
        self.W, self.MU, self.SIGMA, self.w_ext, self.mu_ext, self.sigma_ext = gaussian_params
        self.K, self.TAU = gamma_params
        self.BETA = numpy.round(self.K / self.TAU, 6)

    def predict(self, X, Mat_dists):
        """Predict a label in {-1, 1} for every sample.

        A sample is handled by the closest point of interest whose locality
        contains it (``Mat_dists[i, j] <= BETA[i]``); samples contained in no
        locality are handled by the external model.

        Args:
            X: numpy array of shape (n_samples, n_features).
            Mat_dists: numpy array of shape (n_points, n_samples).
        """
        n_points = Mat_dists.shape[0]

        # stack the external model as an extra row (index n_points)
        W_params = numpy.concatenate((self.W, self.w_ext.reshape(1, self.w_ext.size)), axis=0)
        MU_params = numpy.concatenate((self.MU, numpy.array([self.mu_ext])), axis=0)
        approx = (numpy.matmul(X, W_params.T) + MU_params).T

        # get idx_W for each sample: localities that do not contain the sample are
        # masked with +inf so they can never win the argmin, whatever the distance scale
        inside = Mat_dists <= self.BETA.reshape(n_points, 1)
        idx_W = numpy.argmin(numpy.where(inside, Mat_dists, numpy.inf), axis=0)
        idx_W[~inside.any(axis=0)] = n_points

        selected = approx[idx_W, numpy.arange(idx_W.size)]
        return numpy.where(numpy.sign(selected) <= 0, -1, 1)

    def score(self, X, y, Mat_dists):
        """Return the accuracy of ``predict`` in percent, rounded to 4 decimals."""
        return numpy.round(accuracy_score(y, self.predict(X, Mat_dists))*100, 4)

    def risk_bound(self, X, y, Mat_dists):
        """Return ``(ER, ER + KL / Lambda)`` on the given data.

        ``ER`` is the empirical risk of the mixture and ``KL`` the sum of the
        Gaussian and gamma divergence terms computed below.

        Args:
            X: numpy array of shape (n_samples, n_features).
            y: numpy array of n_samples labels (multiplied with the margins,
                so presumably in {-1, 1} -- TODO confirm).
            Mat_dists: numpy array of shape (n_points, n_samples).
        """
        # get data shape
        n_points, n_samples = Mat_dists.shape

        # compute Theta: gamma survival function (rate TAU, shape K) evaluated at the distances
        Mat_Theta = special.gdtrc(self.TAU.reshape(n_points, 1), self.K.reshape(n_points, 1), Mat_dists)

        # Mat_loss_in and Loss_out
        norm_X_square = LA.norm(X, axis=1)**2
        margins_in = y.reshape(n_samples, 1) * (numpy.dot(X, self.W.T) + self.MU)
        Mat_loss_in = 1 - special.ndtr(margins_in / numpy.sqrt(self.SIGMA**2 + norm_X_square.reshape(n_samples, 1))).T
        margins_out = y * (numpy.dot(X, self.w_ext) + self.mu_ext)
        Loss_out = 1 - special.ndtr(margins_out / numpy.sqrt(self.sigma_ext**2 + norm_X_square))

        # compute empirical risk
        ER = numpy.mean(numpy.sum(Mat_Theta * Mat_loss_in, axis=0) + numpy.prod(1 - Mat_Theta, axis=0) * Loss_out)

        # compute KL divergence
        gaussian_kl_var = - math.log(numpy.prod(self.SIGMA) * self.sigma_ext) + 0.5 * (numpy.sum(self.SIGMA**2) + self.sigma_ext**2 - (n_points + 1))
        gaussian_kl = 0.5 * (numpy.sum(LA.norm(self.W, axis=1)**2, axis=0) + numpy.sum(self.MU**2, axis=0) + LA.norm(self.w_ext)**2 + self.mu_ext**2)
        gamma_kl = numpy.sum(
            (self.K - self.K_p) * special.psi(self.K)
            + special.gammaln(self.K_p) - special.gammaln(self.K)
            + self.K_p * numpy.log(self.TAU / self.TAU_p)
            + self.K * ((self.TAU_p - self.TAU) / self.TAU)
        )
        return ER, ER + (1 / self.Lambda) * (gaussian_kl_var + gaussian_kl + gamma_kl)
        

##### Choice of Lambda

In [None]:
def build_mixture_clf(X_train, X_val, y_train, y_val, Mat_dist_train, Mat_dist_val, T=10, lr=0.01, lambda_param=1000, K_p=None, TAU_p=None, max_iters=2000, class_weights=None, print_flag=False):
    """Run ``T`` randomly initialised fits and keep the best initialisation.

    A restart replaces the current best one only when both its validation
    empirical risk and its validation risk bound are strictly lower.

    Returns:
        ``(val_score, Initial_parameters)`` where ``Initial_parameters`` is the
        tuple ``(W_ini, w_ext_ini, K_ini, TAU_ini)`` of the retained restart and
        ``val_score`` its validation accuracy.

    Raises:
        RuntimeError: if no restart was accepted (``T < 1`` or every restart
            produced a non-comparable, e.g. NaN, risk).
    """
    old_val_ER, old_val_risk = 1e10, 1e10
    val_score, Initial_parameters = None, None
    for _ in range(T):
        mixture_clf = Pac_bayes_clf(learning_rate=lr, lambda_reg=lambda_param, K_p=K_p, TAU_p=TAU_p, max_iters=max_iters, class_weights=class_weights, print_flag=print_flag)
        mixture_clf.fit(X_train, y_train, Mat_dist_train)
        new_val_ER, new_val_risk = mixture_clf.risk_bound(X_val, y_val, Mat_dist_val)

        if (new_val_ER < old_val_ER) and (new_val_risk < old_val_risk):
            Initial_parameters = (mixture_clf.W_ini, mixture_clf.w_ext_ini, mixture_clf.K_ini, mixture_clf.TAU_ini)
            val_score, old_val_ER, old_val_risk = mixture_clf.score(X_val, y_val, Mat_dist_val), new_val_ER, new_val_risk

    # previously this fell through to an UnboundLocalError on the return line
    if Initial_parameters is None:
        raise RuntimeError(f"build_mixture_clf: none of the {T} random restarts produced a usable validation risk")
    return val_score, Initial_parameters

def lambda_validation_clf(X_train, X_val, y_train, y_val, Mat_dist_train, Mat_dist_val, lr=0.01, K_p=None, TAU_p=None, max_iters=2000, class_weights=None):
    """Pick the lambda (and matching initial parameters) with the best validation score.

    Candidate lambdas are ``numpy.arange(1, 5.25, 0.25) * n_train / len(K_p)``.
    Each candidate is evaluated with ``build_mixture_clf`` using
    ``10 * len(K_p)`` random restarts; ties go to the later candidate.  The
    search stops early once the progress bar reports more than 1800 seconds.

    Returns:
        ``(lambda_param, Initial_parameters)`` of the retained candidate.
    """
    n_poi = len(K_p)
    T = 10 * n_poi
    old_val_score = 0
    lambda_params = numpy.arange(1, 5.25, 0.25) * (1 / n_poi) * X_train.shape[0]

    pbar = tqdm(desc="Tuning Lambda ("+str(T)+" random restarts for each lambda) : ", total=len(lambda_params), position=0)

    for lambda_param in lambda_params:
        # seed the restarts from the candidate itself so each lambda is reproducible
        torch.manual_seed(lambda_param)
        new_val_score, new_Initial_parameters = build_mixture_clf(
            X_train, X_val, y_train, y_val, Mat_dist_train, Mat_dist_val,
            T=T, lr=lr, lambda_param=lambda_param, K_p=K_p, TAU_p=TAU_p,
            max_iters=max_iters, class_weights=class_weights, print_flag=False,
        )
        if new_val_score >= old_val_score:
            best_lambda = lambda_param
            Initial_parameters = new_Initial_parameters
            old_val_score = new_val_score

        pbar.update(1)

        # time budget: give up on the remaining candidates after 30 minutes
        if pbar.format_dict["elapsed"] > 1800:
            break

    pbar.close()
    return best_lambda, Initial_parameters
    

##### Mixture Program

In [None]:
def Mixture_clf(data, target_name, X0, synthetic_data_flag=False, train_size=0.75, lr=0.01, lambda_param=100, max_iters=2000, lambda_validation=False, times=1, return_flag='simple'):
    """Train and evaluate the mixture of local classifiers over repeated random splits.

    For each of the ``times`` splits the data is divided into stratified
    train / validation / test sets, encoded (unless ``synthetic_data_flag`` is
    set), the Euclidean distances to the points of interest are computed,
    initial parameters are selected by random restarts (together with lambda
    when ``lambda_validation`` is true), and a final model is fitted and
    scored.  Fitted parameters and the summary table are averaged over the
    splits.

    Args:
        data: dataset handed to ``data_analysis.uncouping_x_y_clf``.
        target_name: name of the target, forwarded to the same helper.
        X0: ``pandas.DataFrame`` with one row per point of interest.
        synthetic_data_flag: when ``False`` the features go through
            ``data_analysis.data_processing``; otherwise raw values are used.
        train_size: training fraction; the remainder is split in half into
            validation and test sets.
        lr, max_iters: forwarded to ``Pac_bayes_clf``.
        lambda_param: regularisation rate; replaced by the tuned value when
            ``lambda_validation`` is true.
        lambda_validation: tune lambda with ``lambda_validation_clf``.
        times: number of random splits (must be >= 1).
        return_flag: ``'simple'`` returns the 10-tuple below; any other value
            additionally returns the arrays of the last split.

    Returns:
        ``X0_enc, lambda_param, W, w_ext, MU, mu_ext, K, TAU, BETA, summary``
        and, when ``return_flag != 'simple'``, also ``X_train_enc, X_test_enc,
        Mat_dist_train, Mat_dist_test, y_train, y_test, y_train_preds,
        y_test_preds`` from the last split.

    Raises:
        ValueError: if ``times`` is smaller than 1.
    """
    # given point of interest X0 (center of vicinity data or specific point)
    assert isinstance(X0, pandas.DataFrame)
    if times < 1:
        raise ValueError("times must be at least 1")

    # uncouping X and y
    X, y = data_analysis.uncouping_x_y_clf(data.copy(), target_name)
    class_weights = None
    print('***************** Mixtures of transparent local models with known points of interest *****************')
    print(f'Training_set = {round((train_size * 100))}%, Validation_set = {round(((1 - train_size)/2) * 100)}%, Test_set = {round(((1 - train_size)/2) * 100)}%, class_weights = {class_weights}, lambda_validation = {lambda_validation}, times = {times}')

    All_acc_score = []
    for i in tqdm(numpy.arange(times), desc="For Random Data Split = "+str(times)+" …", total=times, position=0):
        # split the dataset X into the training set X_train and temporary set X_temp
        X_train, X_temp, y_train, y_temp = train_test_split(X, y, train_size = train_size, stratify=y, random_state=i)
        # split the dataset X_temp into the validation set X_val and testing set X_test
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, train_size = 0.5, stratify=y_temp, random_state=0)
        X_train, X_val, X_test = data_analysis.reset_index_data(data_1=X_train, data_2=X_val, data_3=X_test, data_4=None)

        # define gamma_priors
        K_p, TAU_p = 2.0 * numpy.ones(X0.shape[0]), (1 / 10) * numpy.ones(X0.shape[0])

        # data encoding (target encoding for category variables) and scaling (example : 'TargetEncoder', 'OrdinalEncoder', etc...)
        if synthetic_data_flag==False:
            X_train_enc, X_val_enc, X_test_enc, X0_enc = data_analysis.data_processing(xtrain=X_train.copy(), ytrain=y_train.copy(), xtest_1=X_val.copy(), xtest_2=X_test.copy(), xtest_3=X0.copy(), check_multicollinearity=True)
            # keep the *encoded* points of interest: the distances below must be
            # computed in the same feature space as the encoded samples
            # (the raw X0 values were used here before, discarding X0_enc)
            X_train_enc, X_val_enc, X_test_enc, X0_enc = X_train_enc.values.copy(), X_val_enc.values.copy(), X_test_enc.values.copy(), X0_enc.values.copy()
        else:
            X_train_enc, X_val_enc, X_test_enc, X0_enc = X_train.values.copy(), X_val.values.copy(), X_test.values.copy(), X0.values.copy()

        # get set of vicinity distance (example : 'euclidean', 'gower', etc...)
        # broadcasting yields one row per point of interest and one column per sample
        X0_centers = X0_enc.reshape(X0_enc.shape[0], 1, X0_enc.shape[1])
        Mat_dist_train = LA.norm(X_train_enc - X0_centers, axis=2)
        Mat_dist_val = LA.norm(X_val_enc - X0_centers, axis=2)
        Mat_dist_test = LA.norm(X_test_enc - X0_centers, axis=2)

        # finding the best lambda by cross validation on data
        if lambda_validation == True:
            lambda_param, Initial_parameters = lambda_validation_clf(X_train_enc.copy(), X_val_enc.copy(), y_train.copy(), y_val.copy(), Mat_dist_train, Mat_dist_val, lr=lr, K_p=K_p, TAU_p=TAU_p, max_iters=max_iters, class_weights=class_weights)
        else :
            torch.manual_seed(lambda_param)
            Acc_score, Initial_parameters = build_mixture_clf(X_train_enc.copy(), X_val_enc.copy(), y_train.copy(), y_val.copy(), Mat_dist_train, Mat_dist_val, T=50, lr=lr, lambda_param=lambda_param, K_p=K_p, TAU_p=TAU_p, max_iters=max_iters, class_weights=class_weights, print_flag=False)

        # fitting model from the retained initial parameters
        mixture_clf = Pac_bayes_clf(learning_rate=lr, lambda_reg=lambda_param, W_ini=Initial_parameters[0], w_ext_ini=Initial_parameters[1], K_ini=Initial_parameters[2], TAU_ini=Initial_parameters[3], K_p=K_p.copy(), TAU_p=TAU_p.copy(), max_iters=max_iters, class_weights=class_weights, print_flag=False)
        mixture_clf.fit(X_train_enc, y_train, Mat_dist_train)

        # prediction
        y_train_preds = mixture_clf.predict(X_train_enc, Mat_dist_train)
        y_val_preds = mixture_clf.predict(X_val_enc, Mat_dist_val)
        y_test_preds = mixture_clf.predict(X_test_enc, Mat_dist_test)

        # compute risk for each dataset: rows = train / val / test, columns = (ER, bound)
        risk_bound_set = []
        risk_bound_set.append(mixture_clf.risk_bound(X_train_enc, y_train, Mat_dist_train))
        risk_bound_set.append(mixture_clf.risk_bound(X_val_enc, y_val, Mat_dist_val))
        risk_bound_set.append(mixture_clf.risk_bound(X_test_enc, y_test, Mat_dist_test))
        risk_bound_set = numpy.round(numpy.array(risk_bound_set), 4)

        # get summary
        summary_random = results_summary_clf(y_train, y_val, y_test, y_train_preds, y_val_preds, y_test_preds, risk_bound_set[:,0], risk_bound_set[:,1])
        All_acc_score.append(summary_random['Accuracy'].values)

        # accumulate the parameters of every split (averaged after the loop)
        if (i == 0):
            W_random_state, MU_random_state, SIGMA_random_state = mixture_clf.W, mixture_clf.MU, mixture_clf.SIGMA
            w_ext_random_state, mu_ext_random_state, sigma_ext_random_state = mixture_clf.w_ext, mixture_clf.mu_ext, mixture_clf.sigma_ext
            K_random_state, TAU_random_state, BETA_random_state = mixture_clf.K, mixture_clf.TAU, mixture_clf.BETA
            summary_random_state = summary_random
        else:
            W_random_state += mixture_clf.W
            MU_random_state += mixture_clf.MU
            SIGMA_random_state += mixture_clf.SIGMA
            w_ext_random_state += mixture_clf.w_ext
            mu_ext_random_state += mixture_clf.mu_ext
            sigma_ext_random_state += mixture_clf.sigma_ext
            K_random_state += mixture_clf.K
            TAU_random_state += mixture_clf.TAU
            BETA_random_state += mixture_clf.BETA
            summary_random_state += summary_random

        print(f'W = {mixture_clf.W}, MU = {numpy.round(mixture_clf.MU, 6)}, lambda_param = {lambda_param}')
        print(f'w_ext = {mixture_clf.w_ext}, mu_ext = {round(mixture_clf.mu_ext, 6)}')
        print(f'SIGMA = {numpy.round(mixture_clf.SIGMA, 6)}, sigma_ext = {round(mixture_clf.sigma_ext, 6)}')

    # average over the random splits
    W, MU, SIGMA = (W_random_state / times), (MU_random_state / times), (SIGMA_random_state / times)
    w_ext, mu_ext, sigma_ext = (w_ext_random_state / times), (mu_ext_random_state / times), (sigma_ext_random_state / times)
    K, TAU, BETA = (K_random_state / times), (TAU_random_state / times), (BETA_random_state / times)
    summary = (summary_random_state / times).astype('float64')
    summary['Std_accuracy'] = numpy.std(numpy.array(All_acc_score, dtype='float64'), axis=0) # add column for accuracy standard deviation
    summary = summary.round(4)

    if return_flag=='simple':
        return X0_enc, lambda_param, W, w_ext, MU, mu_ext, K, TAU, BETA, summary
    else :
        return X0_enc, lambda_param, W, w_ext, MU, mu_ext, K, TAU, BETA, summary, X_train_enc, X_test_enc, Mat_dist_train, Mat_dist_test, y_train.copy(), y_test.copy(), y_train_preds.copy(), y_test_preds.copy()
        

##### Performance measurement

In [None]:
def results_summary_clf(ytrain_true, yval_true, ytest_true, ytrain_pred, yval_pred, ytest_pred, Gibbs_risk_set, risk_bound_set):
    """Build the classification report for the training, validation and testing sets.

    Accuracy, F1 and average precision are expressed in percent and rounded
    to 2 decimals; ``Gibbs_risk_set`` and ``risk_bound_set`` are indexed in the
    order train, validation, test and copied as given.

    Returns:
        ``pandas.DataFrame`` with one row per set and the columns
        ``Accuracy, F1_score, Precision_score, Gibbs_risk, Risk_bound``.
    """
    set_names = ['Training set', 'Validation set', 'Testing set']
    metric_names = ['Accuracy', 'F1_score', 'Precision_score', 'Gibbs_risk', 'Risk_bound']
    report = pandas.DataFrame(index=set_names, columns=metric_names)

    splits = (
        (ytrain_true, ytrain_pred),
        (yval_true, yval_pred),
        (ytest_true, ytest_pred),
    )

    # same five cells for every set, filled row by row
    for pos, (set_name, (truth, pred)) in enumerate(zip(set_names, splits)):
        report.loc[set_name, metric_names[0]] = round(accuracy_score(truth, pred)*100, 2)
        report.loc[set_name, metric_names[1]] = round(f1_score(truth, pred)*100, 2)
        report.loc[set_name, metric_names[2]] = round(average_precision_score(truth, pred)*100, 2)
        report.loc[set_name, metric_names[3]] = Gibbs_risk_set[pos]
        report.loc[set_name, metric_names[4]] = risk_bound_set[pos]

    return report

#### 3. Linear regression

##### Definition of mixture models class

In [None]:
class Pac_bayes_reg:
    """Mixture of local linear regressors around given points of interest.

    Each of the ``n_points`` points of interest owns a linear model
    (``W[i]``, ``RHO[i]``, ``MU[i]``, ``SIGMA[i]``) and a locality radius
    ``BETA[i] = K[i] / TAU[i]``.  One additional "external" model
    (``w_ext``, ``rho_ext``, ``mu_ext``, ``sigma_ext``) serves the samples that
    fall in no locality.  The target is divided by its standard deviation
    before optimisation, which is delegated to
    ``utility_functions.minimize_reg_gn``; predictions are scaled back.
    """

    def __init__(self, learning_rate=3, lambda_reg=0.1, K_ini=None, TAU_ini=None, K_p=None, TAU_p=None, max_iters=1000, weights=None, print_flag=False):
        self.lr = learning_rate                                    # step size at each iteration
        self.Lambda = lambda_reg                                   # regularization rate; risk_bound weights the KL term by 1 / Lambda
        self.K_ini, self.TAU_ini = K_ini, TAU_ini                  # initial shape and rate of the locality parameter (beta = k / tau)
        self.K_p, self.TAU_p = K_p, TAU_p                          # prior shape and rate (gamma distribution parameters), numpy arrays
        self.max_iters = max_iters                                 # maximum number of optimisation iterations
        self.weights_vet = weights                                 # optional per-sample weights (defaults to ones in fit)
        self.print_flag = print_flag                               # forwarded to the optimiser to print some infos
        self.W, self.RHO, self.MU, self.SIGMA = None, None, None, None                 # parameters of the n local models (set by fit)
        self.w_ext, self.rho_ext, self.mu_ext, self.sigma_ext = None, None, None, None # parameters of the external model (set by fit)
        self.K, self.TAU, self.BETA = None, None, None                                 # locality parameters (set by fit), BETA = K / TAU
        self.y_std = None                                                              # standard deviation of the training target (set by fit)

    def fit(self, X, y, Mat_dist):
        """Estimate all model parameters from the training data.

        Args:
            X: numpy array of shape (n_samples, n_features).
            y: numpy array of targets, one per sample.
            Mat_dist: numpy array of distances whose first axis indexes the
                points of interest.

        ``K_p`` and ``TAU_p`` must be numpy arrays because they are converted
        with ``torch.from_numpy``.
        """
        # data shape and number of vicinity points
        n_samples, n_features = X.shape
        n_points = Mat_dist.shape[0]
        self.y_std = numpy.std(y)

        # compute weight
        self.weights_vet = numpy.ones(n_samples) if self.weights_vet is None else self.weights_vet

        # initialize parameters
        if self.K_ini is None:
            self.K_ini, self.TAU_ini = 50 * torch.rand(n_points), 50 * torch.rand(n_points)

        # work on copies so the stored initial values stay reusable
        self.K, self.TAU = deepcopy(self.K_ini), deepcopy(self.TAU_ini)

        self.W, self.RHO = torch.zeros(n_points, n_features), torch.ones(n_points)
        self.MU, self.SIGMA = torch.zeros(n_points), torch.ones(n_points)

        self.w_ext, self.rho_ext = torch.zeros(n_features), torch.tensor([1.0])
        self.mu_ext, self.sigma_ext = torch.tensor([0.0]), torch.tensor([1.0])

        # concatenate (initialize parameters): (gaussian params, gamma priors, gamma params)
        starting_point = (
            (self.W, self.RHO, self.MU, self.SIGMA, self.w_ext, self.rho_ext, self.mu_ext, self.sigma_ext),
            (torch.from_numpy(self.K_p), torch.from_numpy(self.TAU_p)),
            (self.K, self.TAU),
        )

        # convert data to tensor (the target is standardised by its standard deviation)
        X_, y_, Mat_dist_ = torch.from_numpy(X), torch.from_numpy(y / self.y_std), torch.from_numpy(Mat_dist)
        # as_tensor also accepts the tensor left in place by a previous fit,
        # whereas from_numpy raised a TypeError on the second call
        self.weights_vet = torch.as_tensor(self.weights_vet)

        # minimized params
        gaussian_params, gamma_params = utility_functions.minimize_reg_gn(starting_point, X_, y_, Mat_dist_, self.lr, self.Lambda, self.max_iters, self.weights_vet, self.print_flag)

        # get minimized params; presumably returned as numpy arrays / scalars since
        # predict and risk_bound apply numpy operations to them -- TODO confirm
        self.W, self.RHO, self.MU, self.SIGMA, self.w_ext, self.rho_ext, self.mu_ext, self.sigma_ext = gaussian_params
        self.K, self.TAU = gamma_params
        self.BETA = numpy.round(self.K / self.TAU, 6)

    def predict(self, X, Mat_dists):
        """Predict the target of every sample, in the original target scale.

        A sample is handled by the closest point of interest whose locality
        contains it (``Mat_dists[i, j] <= BETA[i]``); samples contained in no
        locality are handled by the external model.

        Args:
            X: numpy array of shape (n_samples, n_features).
            Mat_dists: numpy array of shape (n_points, n_samples).
        """
        n_points = Mat_dists.shape[0]

        # stack the external model as an extra row (index n_points)
        W_params = numpy.concatenate((self.W, self.w_ext.reshape(1, self.w_ext.size)), axis=0)
        MU_params = numpy.concatenate((self.MU, numpy.array([self.mu_ext])), axis=0)
        approx = ( (numpy.matmul(X, W_params.T) + MU_params) * self.y_std ).T

        # get idx_W for each sample: localities that do not contain the sample are
        # masked with +inf so they can never win the argmin, whatever the distance scale
        inside = Mat_dists <= self.BETA.reshape(n_points, 1)
        idx_W = numpy.argmin(numpy.where(inside, Mat_dists, numpy.inf), axis=0)
        idx_W[~inside.any(axis=0)] = n_points

        return approx[idx_W, numpy.arange(idx_W.size)]

    def score(self, X, y, Mat_dists):
        """Return the R2 score of ``predict``, rounded to 6 decimals."""
        return numpy.round(r2_score(y_true=y, y_pred=self.predict(X, Mat_dists)), 6)

    def risk_bound(self, X, y, Mat_dists):
        """Return ``(ER, ER + KL / Lambda)`` on the given data.

        ``ER`` is the empirical risk of the mixture and ``KL`` the sum of the
        Gaussian and gamma divergence terms computed below.

        Args:
            X: numpy array of shape (n_samples, n_features).
            y: numpy array of n_samples targets in the original scale.
            Mat_dists: numpy array of shape (n_points, n_samples).
        """
        n_points, n_samples = Mat_dists.shape
        n_features = X.shape[1]

        # compute Theta: gamma survival function (rate TAU, shape K) evaluated at the distances
        Mat_Theta = special.gdtrc(self.TAU.reshape(n_points, 1), self.K.reshape(n_points, 1), Mat_dists)

        # Mat_loss_in and Loss_out
        norm_X_square = LA.norm(X, axis=1)**2
        Mat_loss_in = (norm_X_square.reshape(n_samples, 1) * self.RHO**2 + self.SIGMA**2 + ((numpy.dot(X, self.W.T) + self.MU) * self.y_std - y.reshape(n_samples, 1))**2).T
        Loss_out = norm_X_square * self.rho_ext**2 + self.sigma_ext**2 + ((numpy.dot(X, self.w_ext) + self.mu_ext) * self.y_std - y)**2

        # compute empirical risk
        ER = numpy.mean(numpy.sum(Mat_Theta * Mat_loss_in, axis=0) + numpy.prod(1 - Mat_Theta, axis=0) * Loss_out)

        # compute KL divergence
        gaussian_kl = - n_features * math.log(numpy.prod(self.RHO) * self.rho_ext) - math.log(numpy.prod(self.SIGMA) * self.sigma_ext) + 0.5 * (n_features * (numpy.sum(self.RHO**2) + self.rho_ext**2) + (numpy.sum(self.SIGMA**2) + self.sigma_ext**2) - ((n_points + 1) * n_features + (n_points + 1)))
        gaussian_kl += 0.5 * (numpy.sum(LA.norm(self.W, axis=1)**2, axis=0) + numpy.sum(self.MU**2, axis=0) + LA.norm(self.w_ext)**2 + self.mu_ext**2)
        gamma_kl = numpy.sum(
            (self.K - self.K_p) * special.psi(self.K)
            + special.gammaln(self.K_p) - special.gammaln(self.K)
            + self.K_p * numpy.log(self.TAU / self.TAU_p)
            + self.K * ((self.TAU_p - self.TAU) / self.TAU)
        )
        return ER, ER + (1 / self.Lambda) * (gaussian_kl + gamma_kl)
        

##### Choice of Lambda

In [None]:
def build_mixture_reg(X_train, X_val, y_train, y_val, Mat_dist_train, Mat_dist_val, T=10, lr=0.01, lambda_param=1000, K_p=None, TAU_p=None, max_iters=1000, weights=None, print_flag=False):
    """Run ``T`` randomly initialised fits and keep the best initialisation.

    A restart replaces the current best one only when both its validation
    empirical risk and its validation risk bound are strictly lower.

    Returns:
        ``(val_score, Initial_parameters)`` where ``Initial_parameters`` is the
        tuple ``(K_ini, TAU_ini)`` of the retained restart and ``val_score``
        its validation R2 score.

    Raises:
        RuntimeError: if no restart was accepted (``T < 1`` or every restart
            produced a non-comparable, e.g. NaN, risk).
    """
    old_val_ER, old_val_risk = 1e10, 1e10
    val_score, Initial_parameters = None, None
    for _ in range(T):
        mixture_reg = Pac_bayes_reg(learning_rate=lr, lambda_reg=lambda_param, K_p=K_p, TAU_p=TAU_p, max_iters=max_iters, weights=weights, print_flag=print_flag)
        mixture_reg.fit(X_train, y_train, Mat_dist_train)
        new_val_ER, new_val_risk = mixture_reg.risk_bound(X_val, y_val, Mat_dist_val)

        if (new_val_ER < old_val_ER) and (new_val_risk < old_val_risk):
            Initial_parameters = (mixture_reg.K_ini, mixture_reg.TAU_ini)
            val_score, old_val_ER, old_val_risk = mixture_reg.score(X_val, y_val, Mat_dist_val), new_val_ER, new_val_risk

    # previously this fell through to an UnboundLocalError on the return line
    if Initial_parameters is None:
        raise RuntimeError(f"build_mixture_reg: none of the {T} random restarts produced a usable validation risk")
    return val_score, Initial_parameters

def lambda_validation_reg(X_train, X_val, y_train, y_val, Mat_dist_train, Mat_dist_val, lr=0.01, K_p=None, TAU_p=None, max_iters=1000, weights=None):
    """Pick the lambda (and matching initial parameters) with the best validation score.

    Candidate lambdas are ``numpy.arange(1, 5.25, 0.25) * n_train / len(K_p)``.
    Each candidate is evaluated with ``build_mixture_reg`` using
    ``10 * len(K_p)`` random restarts; ties go to the later candidate.  The
    search stops early once the progress bar reports more than 1800 seconds.

    Returns:
        ``(lambda_param, Initial_parameters)`` of the retained candidate.
    """
    n_poi = len(K_p)
    T = 10 * n_poi
    old_val_score = -1e10
    lambda_params = numpy.arange(1, 5.25, 0.25) * (1 / n_poi) * X_train.shape[0]

    pbar = tqdm(desc="Tuning Lambda ("+str(T)+" random restarts for each lambda) : ", total=len(lambda_params), position=0)

    for lambda_param in lambda_params:
        # seed the restarts from the candidate itself so each lambda is reproducible
        torch.manual_seed(lambda_param)
        new_val_score, new_Initial_parameters = build_mixture_reg(
            X_train, X_val, y_train, y_val, Mat_dist_train, Mat_dist_val,
            T=T, lr=lr, lambda_param=lambda_param, K_p=K_p, TAU_p=TAU_p,
            max_iters=max_iters, weights=weights, print_flag=False,
        )
        if new_val_score >= old_val_score:
            best_lambda = lambda_param
            Initial_parameters = new_Initial_parameters
            old_val_score = new_val_score

        pbar.update(1)

        # time budget: give up on the remaining candidates after 30 minutes
        if pbar.format_dict["elapsed"] > 1800:
            break

    pbar.close()
    return best_lambda, Initial_parameters

##### Mixture Program

In [None]:
def Mixture_reg(data, target_name, X0, synthetic_data_flag=False, train_size=0.75, lr=0.01, lambda_param=1e-3, max_iters=1000, lambda_validation=False, times=1, return_flag='simple'):
    """Train and evaluate the mixture of local regressors over repeated random splits.

    For each of the ``times`` splits the data is divided into train /
    validation / test sets, encoded (unless ``synthetic_data_flag`` is set),
    the Euclidean distances to the points of interest are computed, initial
    parameters are selected by random restarts (together with lambda when
    ``lambda_validation`` is true), and a final model is fitted and scored.
    Fitted parameters and the summary table are averaged over the splits.

    Args:
        data: dataset handed (as a copy) to ``data_analysis.uncouping_x_y_reg``.
        target_name: name of the target, forwarded to the same helper.
        X0: ``pandas.DataFrame`` with one row per point of interest.
        synthetic_data_flag: when ``False`` the features go through
            ``data_analysis.data_processing``; otherwise raw values are used.
        train_size: training fraction; the remainder is split in half into
            validation and test sets.
        lr, max_iters: forwarded to ``Pac_bayes_reg``.
        lambda_param: regularisation rate; replaced by the tuned value when
            ``lambda_validation`` is true.
        lambda_validation: tune lambda with ``lambda_validation_reg``.
        times: number of random splits (must be >= 1).
        return_flag: ``'simple'`` returns the 10-tuple below; any other value
            additionally returns the arrays of the last split.

    Returns:
        ``X0_enc, lambda_param, W, w_ext, MU, mu_ext, K, TAU, BETA, summary``
        and, when ``return_flag != 'simple'``, also ``X_train_enc, X_test_enc,
        Mat_dist_train, Mat_dist_test, y_train, y_test, y_train_preds,
        y_test_preds`` from the last split.

    Raises:
        ValueError: if ``times`` is smaller than 1.
    """
    # given point of interest X0 (center of vicinity data or specific point)
    assert isinstance(X0, pandas.DataFrame)
    if times < 1:
        raise ValueError("times must be at least 1")

    # uncouping X and y reg (on a copy, like the classification driver, so the caller's data is untouched)
    X, y = data_analysis.uncouping_x_y_reg(data.copy(), target_name)
    weights = None
    print('***************** Mixtures of transparent local models with known points of interest *****************')
    print(f'Training_set = {round((train_size * 100))}%, Validation_set = {round(((1 - train_size)/2) * 100)}%, Test_set = {round(((1 - train_size)/2) * 100)}%, weights = {weights}, lambda_validation = {lambda_validation}, times = {times}')

    for i in tqdm(numpy.arange(times), desc="For Random Data Split = "+str(times)+" …", total=times, position=0):
        # split the dataset X into the training set X_train and temporary set X_temp
        X_train, X_temp, y_train, y_temp = train_test_split(X, y, train_size = train_size, random_state=i)
        # split the dataset X_temp into the validation set X_val and testing set X_test
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, train_size = 0.5, random_state=0)
        X_train, X_val, X_test = data_analysis.reset_index_data(data_1=X_train, data_2=X_val, data_3=X_test, data_4=None)

        # define gamma_priors
        K_p, TAU_p = 2.0 * numpy.ones(X0.shape[0]), (1 / 10) * numpy.ones(X0.shape[0])

        # data encoding (target encoding for category variables) and scaling (example : 'TargetEncoder', 'OrdinalEncoder', etc...)
        if synthetic_data_flag==False:
            X_train_enc, X_val_enc, X_test_enc, X0_enc = data_analysis.data_processing(xtrain=X_train.copy(), ytrain=y_train.copy(), xtest_1=X_val.copy(), xtest_2=X_test.copy(), xtest_3=X0.copy(), check_multicollinearity=True)
            # keep the *encoded* points of interest: the distances below must be
            # computed in the same feature space as the encoded samples
            # (the raw X0 values were used here before, discarding X0_enc)
            X_train_enc, X_val_enc, X_test_enc, X0_enc = X_train_enc.values.copy(), X_val_enc.values.copy(), X_test_enc.values.copy(), X0_enc.values.copy()
        else:
            X_train_enc, X_val_enc, X_test_enc, X0_enc = X_train.values.copy(), X_val.values.copy(), X_test.values.copy(), X0.values.copy()

        # get set of vicinity distance (example : 'euclidean', 'gower', etc...)
        # broadcasting yields one row per point of interest and one column per sample
        X0_centers = X0_enc.reshape(X0_enc.shape[0], 1, X0_enc.shape[1])
        Mat_dist_train = LA.norm(X_train_enc - X0_centers, axis=2)
        Mat_dist_val = LA.norm(X_val_enc - X0_centers, axis=2)
        Mat_dist_test = LA.norm(X_test_enc - X0_centers, axis=2)

        # finding the best lambda by cross validation on data
        if lambda_validation == True:
            lambda_param, Initial_parameters = lambda_validation_reg(X_train_enc.copy(), X_val_enc.copy(), y_train.copy(), y_val.copy(), Mat_dist_train, Mat_dist_val, lr=lr, K_p=K_p, TAU_p=TAU_p, max_iters=max_iters, weights=weights)
        else :
            torch.manual_seed(lambda_param)
            R2_score, Initial_parameters = build_mixture_reg(X_train_enc.copy(), X_val_enc.copy(), y_train.copy(), y_val.copy(), Mat_dist_train, Mat_dist_val, T=50, lr=lr, lambda_param=lambda_param, K_p=K_p, TAU_p=TAU_p, max_iters=max_iters, weights=weights, print_flag=False)

        # fitting model from the retained initial parameters
        mixture_reg = Pac_bayes_reg(learning_rate=lr, lambda_reg=lambda_param, K_ini=Initial_parameters[0], TAU_ini=Initial_parameters[1], K_p=K_p.copy(), TAU_p=TAU_p.copy(), max_iters=max_iters, weights=weights, print_flag=False)
        mixture_reg.fit(X_train_enc, y_train, Mat_dist_train)

        # prediction
        y_train_preds = mixture_reg.predict(X_train_enc, Mat_dist_train)
        y_val_preds = mixture_reg.predict(X_val_enc, Mat_dist_val)
        y_test_preds = mixture_reg.predict(X_test_enc, Mat_dist_test)

        # compute risk for each dataset: rows = train / val / test, columns = (ER, bound)
        risk_bound_set = []
        risk_bound_set.append(mixture_reg.risk_bound(X_train_enc, y_train, Mat_dist_train))
        risk_bound_set.append(mixture_reg.risk_bound(X_val_enc, y_val, Mat_dist_val))
        risk_bound_set.append(mixture_reg.risk_bound(X_test_enc, y_test, Mat_dist_test))
        risk_bound_set = numpy.round(numpy.array(risk_bound_set), 4)

        # get summary
        summary_random = results_summary_reg(y_train, y_val, y_test, y_train_preds, y_val_preds, y_test_preds, risk_bound_set[:,0], risk_bound_set[:,1])

        # accumulate the parameters of every split (averaged after the loop)
        if (i == 0):
            W_random_state, RHO_random_state, MU_random_state, SIGMA_random_state = mixture_reg.W, mixture_reg.RHO, mixture_reg.MU, mixture_reg.SIGMA
            w_ext_random_state, rho_ext_random_state, mu_ext_random_state, sigma_ext_random_state = mixture_reg.w_ext, mixture_reg.rho_ext, mixture_reg.mu_ext, mixture_reg.sigma_ext
            K_random_state, TAU_random_state, BETA_random_state = mixture_reg.K, mixture_reg.TAU, mixture_reg.BETA
            summary_random_state = summary_random
        else:
            W_random_state += mixture_reg.W
            RHO_random_state += mixture_reg.RHO
            MU_random_state += mixture_reg.MU
            SIGMA_random_state += mixture_reg.SIGMA
            w_ext_random_state += mixture_reg.w_ext
            rho_ext_random_state += mixture_reg.rho_ext
            mu_ext_random_state += mixture_reg.mu_ext
            sigma_ext_random_state += mixture_reg.sigma_ext
            K_random_state += mixture_reg.K
            TAU_random_state += mixture_reg.TAU
            BETA_random_state += mixture_reg.BETA
            summary_random_state += summary_random

        print(f'W = {mixture_reg.W}, MU = {numpy.round(mixture_reg.MU, 6)}, lambda_param = {lambda_param}')
        print(f'w_ext = {mixture_reg.w_ext}, mu_ext = {round(mixture_reg.mu_ext, 6)}')
        print(f'RHO = {mixture_reg.RHO}, SIGMA = {numpy.round(mixture_reg.SIGMA, 6)}')
        print(f'rho_ext = {mixture_reg.rho_ext}, sigma_ext = {round(mixture_reg.sigma_ext, 6)}')

    # average over the random splits
    W, RHO, MU, SIGMA = (W_random_state / times), (RHO_random_state / times), (MU_random_state / times), (SIGMA_random_state / times)
    w_ext, rho_ext, mu_ext, sigma_ext = (w_ext_random_state / times), (rho_ext_random_state / times), (mu_ext_random_state / times), (sigma_ext_random_state / times)
    K, TAU, BETA = (K_random_state / times), (TAU_random_state / times), (BETA_random_state / times)
    summary = (summary_random_state / times).astype('float64')
    summary = summary.round(4)

    print(f'*********** END ***********')
    print(f'W = {W}, MU = {numpy.round(MU, 6)}')
    print(f'w_ext = {w_ext}, mu_ext = {round(mu_ext, 6)}')
    print(f'RHO = {RHO}, SIGMA = {numpy.round(SIGMA, 6)}')
    print(f'rho_ext = {rho_ext}, sigma_ext = {round(sigma_ext, 6)}')

    if return_flag=='simple':
        return X0_enc, lambda_param, W, w_ext, MU, mu_ext, K, TAU, BETA, summary
    else :
        return X0_enc, lambda_param, W, w_ext, MU, mu_ext, K, TAU, BETA, summary, X_train_enc, X_test_enc, Mat_dist_train, Mat_dist_test, y_train.copy(), y_test.copy(), y_train_preds.copy(), y_test_preds.copy()
        

##### Performance measurement

In [None]:
def results_summary_reg(ytrain_true, yval_true, ytest_true, ytrain_pred, yval_pred, ytest_pred, Gibbs_risk_set, risk_bound_set):
    """Build the regression report for the training, validation and testing sets.

    R2, RMSE and MSE are rounded to 4 decimals; ``Gibbs_risk_set`` and
    ``risk_bound_set`` are indexed in the order train, validation, test and
    copied as given.

    Returns:
        ``pandas.DataFrame`` with one row per set and the columns
        ``R2_score, RMSE, MSE, Gibbs_risk, Risk_bound``.
    """
    set_names = ['Training set', 'Validation set', 'Testing set']
    metric_names = ['R2_score', 'RMSE', 'MSE', 'Gibbs_risk', 'Risk_bound']
    report = pandas.DataFrame(index=set_names, columns=metric_names)

    splits = (
        (ytrain_true, ytrain_pred),
        (yval_true, yval_pred),
        (ytest_true, ytest_pred),
    )

    # same five cells for every set, filled row by row
    for pos, (set_name, (truth, pred)) in enumerate(zip(set_names, splits)):
        report.loc[set_name, metric_names[0]] = round(r2_score(truth, pred), 4)
        report.loc[set_name, metric_names[1]] = round(root_mean_squared_error(truth, pred), 4)
        report.loc[set_name, metric_names[2]] = round(mean_squared_error(truth, pred), 4)
        report.loc[set_name, metric_names[3]] = Gibbs_risk_set[pos]
        report.loc[set_name, metric_names[4]] = risk_bound_set[pos]

    return report