In [1]:
import numpy as np
import scipy
from scipy import linalg
import qpsolvers
from qpsolvers import solve_qp
import scipy.sparse as sp

# SSVM UTILS 

def SquareDist(X1, X2):
    n = X1.shape[0]
    m = X2.shape[0]

    sq1 = np.sum(X1 * X1, axis=1)
    sq2 = np.sum(X2 * X2, axis=1)
    
    D = np.outer(sq1, np.ones(m)) + np.outer(np.ones(n), sq2) - 2 * np.dot(X1, X2.T)
    
    return D

def KernelMatrix(X1, X2, kernel, param):

    if kernel == 'linear':
        K = np.matmul(X1, X2.T)
    elif kernel == 'polynomial':
        K = (1 + np.matmul(X1, X2.T)) ** param
    elif kernel == 'gaussian':
        K = np.exp(-1 / (2 * param**2) * SquareDist(X1, X2))
    else:
        raise ValueError("Invalid kernel type")
    
    return K

# SSVM TRAINING 

def is_positive_definite(matrix):
    # Check if the matrix is symmetric
    if not np.allclose(matrix, matrix.T):
        return False

    # Calculate eigenvalues
    eigenvalues, _ = np.linalg.eig(matrix)

    # Check if all eigenvalues are greater than zero
    if np.all(eigenvalues > 0):
        return True
    else:
        return False

def SSVM_Train(Xtr, Ytr, kernel, param, tau, eta, solver = "osqp"):
    
    n = Xtr.shape[0]
    
    Ytr = Ytr.squeeze() # Ytr must be 1-dimensional (n,)
    
    tau = np.sort(tau)
    m = tau.shape[1]

    K = KernelMatrix(Xtr, Xtr, kernel, param)
    D = np.diag(Ytr)

    H1 = D @ K @ D #@: same of np.matmul(a,b)
    O = np.zeros((n, n * m))
    OO = np.zeros((n * m, n * m))

    H = 1 * eta * np.block([[H1, O], [O.T, OO]])
    f = np.concatenate((np.ones(n), np.zeros(n * m)))

    lb = np.zeros(n * (m + 1))
    ub = np.ones(n * (m + 1))
    ub[:n] = np.inf

    for k in range(m):
        ub[n * (k+1):n * (k + 2)] = 0.5 * ((1 - 2 * tau[:, k]) * Ytr + 1)
        
    A1 = np.eye(n)
    A2Cell = [-np.ones((1, m))] * n  # Creating a list of length n, each element being an array of shape (1, m)
    A2 = scipy.linalg.block_diag(*A2Cell)  # Creating a block diagonal matrix using the list of arrays

    Aeq1 = np.hstack((A1, A2))
    beq1 = np.zeros(n)

    A3 = np.zeros((m, n))
    
    A4Cell = [Ytr] * m
    A4 = scipy.linalg.block_diag(*A4Cell)

    Aeq2 = np.hstack((A3, A4))
    beq2 = np.zeros(m)
    
    Aeq = np.vstack((Aeq1, Aeq2))
    beq = np.hstack((beq1, beq2))
    
    if not is_positive_definite(H):
    
        H = 0.5*(H+H.T)

    H = sp.csc_matrix(H)
    
    Aeq = sp.csc_matrix(Aeq)
        
    sol = solve_qp(H, -f, None, None, Aeq, beq, lb, ub, solver = solver, eps_abs=1e-6)
        
    if sol is not None and sol.any():
        alpha_bar = sol[:n]
    else:
        print("No solution found!")
        return None
    
    return alpha_bar

def offset(Xtr, Ytr, alpha, kernel, param, eta, tau):
    
    Ytr = Ytr.squeeze() # Ytr must be 1-dimensional (n,)
    
    if  tau.shape[1] > 1:
        print('Cannot compute the offset, there must be only 1 tau!')
        return
    else:
        C = 0.5 * ((1 - 2 * tau) * Ytr + 1).reshape(1,Xtr.shape[0])
        thr = 0.001

        ind = np.where((alpha - (0 + thr) > 0) & (alpha - (C - thr) < 0))[1]
       
        X_SV = Xtr[ind]
        Y_SV = Ytr[ind]

        if kernel == 'linear':
            w = -eta * (alpha.T @ np.diag(Ytr)) @ Xtr     
            bii = X_SV@w.T + Y_SV.reshape(-1,1)

        else:
            K = KernelMatrix(Xtr, X_SV, kernel, param)
            D = np.diag(Ytr)
            bii = -eta * (K.T @ (D @ alpha)) + Y_SV
            
        b = np.random.choice(bii.flatten(), 1)
            
        return b





In [2]:
from sklearn.base import BaseEstimator, ClassifierMixin
import pandas as pd
import cvxopt
from cvxopt import matrix, solvers

import joblib
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.model_selection import train_test_split


class ScalableSVMClassifier(BaseEstimator, ClassifierMixin):
    
    def __init__(self, eta, kernel, param, tau, solver = 'osqp'):

        # these are the actual parameters required
        self.eta = eta
        self.kernel = kernel
        self.param = param
        self.tau = tau
        
        if solver not in qpsolvers.available_solvers:
            print(f"{solver} is not a supported solver")
        else:
            self.solver = solver
        
    def fit(self, X, y):

        #self.Xtr = X
        #self.Ytr = y
        # if method is not classic, we need to keep n_c calibration samples apart from the training process
        '''
        if self.method!="classic":
            n_c = int(np.ceil((7.47) / self.epsilon * np.log(1 / self.delta)))
            self.Xtr,self.Xcal,self.Ytr, self.Ycal = train_test_split(X,y,test_size = n_c,random_state = 12)
        else:
            self.Xtr = X
            self.Ytr = y
        '''
        self.Xtr = X
        self.Ytr = y
        
        self.classes_ = np.unique(y)
        #print(self.Xtr.shape)
        #print(self.Ytr.shape)
        self.alpha = SSVM_Train(self.Xtr, self.Ytr, self.kernel, self.param, self.tau, self.eta, solver = self.solver)
        self.b = offset(self.Xtr, self.Ytr, self.alpha, self.kernel, self.param, self.eta, self.tau)
        
        return self

    def FPcontrol(self, Xcal, Ycal, epsilon, method):

        """Given a method, computes the right scaling parameter b_eps """
        
         # if method and epsilon are None, classic SVM
        #self.b_eps = 0
        #self.epsilon = epsilon
        #self.delta = delta
        if method not in ["classic","cp","ps"]:
            print(f"{method} is not a supported method")
        else:
            self.method = method

        if self.method == "ps":

            self.b_eps = b_epsPS(self.Xtr,self.Ytr, Xcal, Ycal, self.b, self.alpha, self.kernel, self.param, self.eta, epsilon)

        elif self.method == "cp":

            self.b_eps = b_epsCP(self.Xtr,self.Ytr, Xcal, Ycal, self.b, self.alpha, self.kernel, self.param, self.eta, epsilon)

        elif self.method == "classic":

            self.b_eps = 0

        return self
    
    def get_params(self, deep=True):
    
        return {"alpha": self.alpha, "b": self.b, "b_eps": self.b_eps, "eta": self.eta,"kernel":self.kernel, "param": self.param,"tau": self.tau, "solver":self.solver} #,"method":self.method}#, "epsilon":self.epsilon, "delta":self.delta}

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self
    
    def predict(self, X):
        return -np.sign(self.decision_function(X)).reshape((len(X),))
    
    def decision_function(self, X):
        """X: test data """
        '''
        if self.method!="classic":
            self.b_eps = FPcontrol(self.Xtr,self.Ytr, self.Xcal, self.Ycal,self.method, self.b, self.alpha, self.kernel, self.param, self.eta, self.epsilon)
        else:
            self.b_eps = 0
        '''

        K = KernelMatrix(self.Xtr, X, self.kernel, self.param)

        return -self.b -self.eta*(np.matmul(K.T,np.matmul(np.diag(self.Ytr.squeeze()),self.alpha))) + self.b_eps 

In [3]:
datapath = "simulationVictorTopological/"
train = "train.csv"
test = "test.csv"
cal = "calib.csv"

dbtrain = pd.read_csv(datapath+train)
dbtest = pd.read_csv(datapath+test)
dbcal = pd.read_csv(datapath+cal)
dbtrain

Xtr = dbtrain[['meanEntropy', 'medianEntropy', 'stdsEntropy', 'iqrsEntropy']].values
Ytr = dbtrain[['output']].values
Ytr[Ytr>1] = -1
Ytr[Ytr==0] = 1

Xts = dbtest[['meanEntropy', 'medianEntropy', 'stdsEntropy', 'iqrsEntropy']].values
Yts = dbtest[['output']].values
Yts[Yts>1] = -1
Yts[Yts==0] = 1

Xcal = dbcal[['meanEntropy', 'medianEntropy', 'stdsEntropy', 'iqrsEntropy']].values
Ycal = dbcal[['output']].values
Ycal[Ycal>1] = -1
Ycal[Ycal==0] = 1

In [4]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

# Normalizar las variables predictoras
X_scaled = scaler.fit_transform(Xtr)


In [8]:
# Parameters settings. Alberto suggest to try changing parameters, use Gaussian kernel, normalize features and this things.
# con kernel gaussian y X_scaled it works. o solo con el gaussian
kernel = "gaussian"
param = 3 #that is the degree of the polinomial
tau = np.array([[0.5]])
eta = 0.3

epsilon = 0.1

import time
start_time = time.time()

model = ScalableSVMClassifier(eta, kernel, param, tau)

model = model.fit(Xtr,Ytr)
joblib.dump(model, datapath+"classicSVM_safeEff.sav")
end_time = time.time()

elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 48.68444585800171 seconds
