In [119]:
import numpy as np
import pandas as pd
from numpy import linalg

# cvxopt QP solver
import cvxopt
from cvxopt import solvers, matrix
solvers.options['show_progress'] = False # Verbose quite

import scipy
from scipy.spatial.distance import pdist, cdist, squareform

class SVM:
    
    def __init__(self, C=1, kernel='rbf', gamma=0.1, mode='OVO', loss='hinge_loss', c=0, degree=2):
        self.C = C
        self.kernel = kernel # kernel_function 'rbf', 'linear'
        self.gamma = gamma # Kernel coefficient gamma for 'rbf'
        self.loss = loss
        self.mode = mode
        self.c = c # Intercept of the polynomial kernel
        self.degree = degree # Degree of the polynomial kernel
        self.alphas_ = [] # coefficients of the estimators

    def fit(self, X, y, ):
        
        self.X_train_ = X
        self.n_samples_ = y.shape[0] # n_samples
        self.classes_, self.repartition_ = np.unique(y, return_counts=True)
        self.classes_ = self.classes_.astype(int)
        self.n_classes_ = self.classes_.shape[0]
        self.repartition_ = self.repartition_ / self.n_classes_
        self.K_ = self.fit_kernel()
        
        
        if self.mode == 'OVA':
            
            for class_ in self.classes_:
                y_copy = y.copy()
                y_copy[y_copy != class_] = -1
                y_copy[y_copy == class_] = 1
                self.fit_dual(y_copy)
                
                # Solving the QP
                sol = solvers.qp(matrix(self.K_), matrix(self.p_), matrix(self.G_), matrix(self.h_))
                # Saving the solution
                self.alphas_.append(np.array(sol['x']).reshape(-1,))
    
        elif self.mode == 'OVO':
            
            masks_samples = [] # empty boolean list, True if the sample is in the current OVO comparison
    
            for class_plus in range(self.n_classes_):
                for class_minus in range(class_plus+1, 10):
                    y_copy = y.copy()
                    
                    # Mask select the samples which match the classes considered in the current OVO comparison
                    mask = (y_copy == class_plus) | (y_copy == class_minus)
                    masks_samples.append(mask) # saving the mask
                    y_copy = y_copy[mask]
                    y_copy[y_copy == class_minus] = -1
                    y_copy[y_copy == class_plus] = 1
                    
                    # Updating the size of the subproblem for the dual computation
                    # Important otherwise it would be built for n_samples = all the sample
                    self.n_samples_ = y_copy.shape[0]

                    this_K = self.K_[mask, :]
                    this_K = this_K[:, mask]
                    self.fit_dual(y_copy)
                    # Solvign the QP
                    sol = solvers.qp(matrix(this_K), matrix(self.p_), matrix(self.G_), matrix(self.h_))
                    # Saving the solution in list attibute alphas_
                    self.alphas_.append(np.array(sol['x']).reshape(-1,))
                    
            # Don't forget to update the n_samples attribute again
            self.n_samples_ = y.shape[0] 
            # Saving the masks as attributes
            self.masks_samples_ = np.array(masks_samples)
            
        else:
            raise Exception('mode should be OVA or OVO')
        return self
        
    def predict(self, X_test):
        
        self.K_test_ = self.fit_kernel_test(X_test)
        self.n_test_ = X_test.shape[0] # size of the test sample
        n = self.n_test_
        
        if self.mode == 'OVA':
            classes_res = np.empty((self.classes_.shape[0], n))

            for class_ in self.classes_:
                alpha = self.alphas_[class_]
                classes_res[class_] = np.dot(self.K_test_, alpha)
            y_pred = classes_res.argmax(axis=0)
            return y_pred
        
        elif self.mode == 'OVO':
            
            # calling OVO_estimators_masks() to built masks_estimators
            # Used in masks_estimators*K_test later
            # rmk : OVO_estimators_masks() is always the same, but cheap to compute
            self.masks_estimators_ = OVO_estimators_masks()            
            
            estimators_preds = [] # list of length n_estimators which contains the predictions
            # n_estimators = 45 in OVO : n_classes_(n_classes+1)/2
            
            for idx, mask in enumerate(self.masks_samples_):
                alpha = self.alphas_[idx]
                estimators_preds.append(np.dot(self.K_test_[:, mask], alpha))
            
            # Converting the result and taking the sign for prediction
            estimators_preds = np.sign(np.array(estimators_preds))
            
            # Choosing which class is predicted using the mask_matrix built with OVO_idx_class()
            classes_preds = [] # list of length n_classes_, contain the predictions for each class
            for mask in self.masks_estimators_:
                class_pred = estimators_preds.copy()
                class_pred = class_pred*mask.reshape(-1,1)
                class_pred[class_pred < 0] = 0
                class_pred = class_pred.sum(0)
                classes_preds.append(class_pred)
            
            classes_preds = np.array(classes_preds)
            
            # Argmax give the index of the row with the highest score
            # rows are ordered so the index corresponds to the class
            y_pred = classes_preds.argmax(0)
            
            return y_pred

    def fit_kernel(self):
        
        X = self.X_train_
        
        if self.kernel == 'rbf':
            pairwise_dists = squareform(pdist(X, 'euclidean'))
            K = scipy.exp(-self.gamma*pairwise_dists ** 2)
        
        elif self.kernel == 'linear':
            # In fact it's not a kernel
            K = np.dot(X, X.transpose())
        
        elif self.kernel == 'polynomial':
            K = (self.c + np.dot(X, X.transpose())) ** self.degree
                       
        else:
            raise Exception('the kernel must either be rbf or linear')
                       
        return K
                       
    def fit_kernel_test(self, X_test):
        # Compute the kernel gram matrix for the TEST set
            
        if self.kernel == 'rbf':
            pairwise_dists = cdist(X_test, self.X_train_)
            K = scipy.exp(-self.gamma*pairwise_dists ** 2)
        
        elif self.kernel == 'linear':
            K = np.dot(X_test, self.X_train_.transpose() + self.c)
                       
        elif self.kernel == 'polynomial':
            K = (self.c + np.dot(X_test, self.X_train_.transpose())) ** self.degree 
                       
        else:
            raise Exception('the kernel must either be rbf or linear')
        
        return K
    
    def fit_dual(self, y):
        
        if self.loss == 'hinge_loss':
            n = self.n_samples_
            diag_y = np.diag(y)
            self.p_ = (-y)
            self.Q_ = self.K_ # Quadratic matrix
            self.G_ = np.r_[diag_y, -diag_y] # Constraint matrix of size(2*n, n)
            self.h_ = np.r_[self.C*np.ones(n), np.zeros(n)]
                       
        elif self.loss == 'squared_hinge_loss':
            """
            in progress"""
            
        else:
            raise Exception('loss should be hinge loss or squared_hinge_loss')
        
        return self
                       
    def describe(self):
        # Describe the train set
        print(" The Data contain ", self.n_classes_, " classes, and the repartion is :  ", self.repartition_)

def OVO_estimators_masks():
    # DIFFICULT
    # Build a matrix with 1 and 0 for the OVO prediction
    masks = []
    for n in range(10):    
        mask = []
        for k in range(10):
            for j in range(k+1, 10):
                if (k == n) | (j==n):
                    if j==n:
                        mask.append(-1)
                    else:
                        mask.append(1)
                else:
                    mask.append(0)
        mask = np.array(mask)
        masks.append(mask)
    masks = np.array(masks)
    return masks


In [4]:
%%time
df_X_train = pd.read_csv('Xtr.csv', header=None, usecols=np.arange(3072))
df_X_test = pd.read_csv('Xte.csv', header=None, usecols=np.arange(3072))
df_y_train = pd.read_csv('Ytr.csv')


X_train = np.array(df_X_train, dtype=float)
X_test = np.array(df_X_test, dtype=float)
y_train = np.array(df_y_train['Prediction'], dtype=float)

CPU times: user 4.69 s, sys: 88 ms, total: 4.78 s
Wall time: 4.77 s


In [121]:
### TEST PART ###

# Amount of data used for train and for test
n_train = 1000
n_test = n_train/10 

# Splitting to train and val set
X = X_train[-n_train:]
y = y_train[-n_train:]

X_val = X_train[:n_test]
y_val = y_train[:n_test]

# Grid for hyper parameters to be tested
cs = [0] 
degrees = [2, 3, 4]

# Running part

# Results storage
scores = []
y_preds = []
for c in cs:
    for degree in degrees:
        print(degree)
        svm = SVM(mode ='OVO', kernel='polynomial')
        svm.fit(X, y)
        y_pred = svm.predict(X_val)
        y_preds.append(y_pred)
        score = np.mean(y_pred == y_val)
        scores.append(score)
        print(score)



2
0.19
3
0.19
4
0.19


In [112]:
scores

[0.29999999999999999, 0.29999999999999999, 0.29999999999999999]

Short test with n=2000, kernel = poly, mode = OVA
* degree = 4 c = 1, accuracy 16%
* degree = 3 c = 1, accuracy 20% 
* degree = 2 c = 1, accuracy 20%



short test with n=4500 n_test = 500
* degree = 2 c = 0 accuracy 32%!