In [1]:
%%file src/__init__.py
# To reference the folder

Writing src/__init__.py


In [17]:
%%file src/svm.py
import sklearn.metrics
import matplotlib.pyplot as plt
import numpy as np
import scipy.linalg

class HuberSVM:
    def __init__(self, h=None):
        self.beta_vals = None
        self.cache = {}
        self.h = h
        
    def fit(self, X_train, y_train, lam=1, **config):
        """Fit the model
        """
        # Initialize
        n, d = X_train.shape
        beta_init = np.zeros(n)
        theta_init = np.zeros(n)
        K = None
        sigma = None
        order = None
        eta_init = None
        max_iter = None
        eps = None
        kernel_choice = None
        plot = False
        
        if 'kernel_choice' in config:
            kernel_choice = config['kernel_choice']
        else:
            kernel_choice = 'linear'
        
        if 'sigma' in config:
            sigma = config['sigma']
        elif sigma is None and kernel_choice=='rbf':
            # Set sigma based on pairwise distances.
            dists = sklearn.metrics.pairwise.pairwise_distances(X_train).reshape(-1)
            sigma = np.median(dists)
            
        if 'order' in config:
            order = config['order']
        elif order is None and kernel_choice=='poly':
            order = 2
        
        if 'max_iter' in config:
            max_iter = config['max_iter']
        else:
            max_iter = 50
            
        if 'eps' in config:
            eps = config['eps']
        else:
            eps = 1e-5
            
        if 'eta_init' in config:
            eta_init = config['eta_init']
            
        if 'plot' in config:
            plot = config['plot']
        
        # Main Loop
        K = self.gram(X_train, X_train, **{'kernel_choice': kernel_choice, 'sigma': sigma, 
                                           'order': order})
        if 'eta_init' not in config:
            # Set eta_init based on an upper bound on the Lipschitz constant.
            eta_init = 1 / scipy.linalg.eigh(2 / n * np.dot(K, K) + 2 * lam * K, eigvals=(n - 1, n - 1),
                                             eigvals_only=True)[0]
        self.beta_vals = self.fastgradalgo(beta_init, theta_init, K, y_train, lam, eta_init, max_iter, eps)
        if plot:
            ax = self.objective_plot(self.beta_vals, K, y_train, lam)
            self.cache['plot'] = ax
        self.cache['kernel_choice'] = kernel_choice
        self.cache['sigma'] = sigma
        self.cache['order'] = order
        self.cache['eta_init'] = eta_init
        self.cache['lambda'] = lam
    
    def gram(self, X, Z, **config):
        """
        Inputs: 
        - X: matrix with observations as rows
        - Z: Another matrix with observations as rows
        - Sigma: kernel bandwidth
        Output: Gram matrix
        """  
        if Z is None:
            Z = X
        if config['kernel_choice'] == 'rbf':
            return np.exp(-1/(2*config['sigma']**2)*((np.linalg.norm(X, axis=1)**2)[:, np.newaxis] + (np.linalg.norm(Z, axis=1)**2)[np.newaxis, :] - 2*np.dot(X, Z.T)))
        elif config['kernel_choice'] == 'linear':
            return X.dot(Z.T)
        elif config['kernel_choice'] == 'poly':
            return (X.dot(Z.T) + 1)**config['order']
        else:
            print("Kernel Not Implemented")
            return X
        
    def fastgradalgo(self, beta_init, theta_init, K, y, lam, eta_init, max_iter, eps):
        beta = beta_init
        theta = theta_init
        eta = eta_init
        grad_theta = self.grad(theta, K, y, lam)
        grad_beta = self.grad(beta, K, y, lam)
        beta_vals = beta
        iter = 0
        while iter < max_iter and np.linalg.norm(grad_beta) > eps:
            eta = self.bt_line_search(theta, K, y, lam, eta=eta)
            beta_new = theta - eta*grad_theta
            theta = beta_new + iter/(iter+3)*(beta_new-beta)
            grad_theta = self.grad(theta, K, y, lam)
            grad_beta = self.grad(beta, K, y, lam)
            beta = beta_new
            iter += 1
            if iter % 1 == 0:
                beta_vals = np.vstack((beta_vals, beta_new))
        return beta_vals
    
    def obj(self, beta, K, y, lam, h=0.5):
        """
        Inputs:
        - beta: Vector to be optimized
        - K: Gram matrix consisting of evaluations of the kernel k(x_i, x_j) for i,j=1,...,n
        - y: Labels y_1,...,y_n corresponding to x_1,...,x_n
        - lam: Penalty parameter lambda
        Output:
        - Value of the objective function at beta
        """
        if self.h is not None:
            h = self.h
        cost_vector = np.zeros(y.shape[0])
        t = K.dot(beta)
        yt = y * t
        cost_vector[yt < 1-h] = (1 - yt[yt < 1 - h])
        cost_vector[np.absolute(1 - yt) <= h] = (((1 + h - yt[np.absolute(1 - yt) <= h])**2) / (4 * h))
        return (1/y.shape[0]) * np.sum(cost_vector) + lam * beta.dot(K).dot(beta)

    def grad(self, beta, K, y, lam, h=0.5):
        """
        Inputs:
        - beta: Vector to be optimized
        - K: Gram matrix consisting of evaluations of the kernel k(x_i, x_j) for i,j=1,...,n
        - y: Labels y_1,...,y_n corresponding to x_1,...,x_n
        - lam: Penalty parameter lambda
        Output:
        - Value of the gradient at beta
        """
        if self.h is not None:
            h = self.h
        grad_matrix = np.zeros((y.shape[0], y.shape[0]))
        t = K.dot(beta)
        yt = y * t
        grad_matrix[yt < 1 - h] = (-y[:, np.newaxis] * K)[yt < 1 - h]
        grad_matrix[np.absolute(1 - yt) <= h] = (((-2 / (4 * h)) * (1 + h - yt))[:, np.newaxis] * (y[:, np.newaxis] * K))[np.absolute(1 - yt) <= h]
        return 1/y.shape[0] * np.sum(grad_matrix, axis=0) + 2 * lam * K.dot(beta)

    def bt_line_search(self, beta, K, y, lam, eta=1, alpha=0.5, betaparam=0.8, max_iter=100):
        grad_beta = self.grad(beta, K, y, lam)
        norm_grad_beta = np.linalg.norm(grad_beta)
        found_eta = 0
        iter = 0
        while found_eta == 0 and iter < max_iter:
            if self.obj(beta - eta * grad_beta, K, y, lam) < \
                            self.obj(beta, K, y, lam) - alpha * eta * norm_grad_beta ** 2:
                found_eta = 1
            elif iter == max_iter-1:
                raise ('Max number of iterations of backtracking line search reached')
            else:
                eta *= betaparam
                iter += 1
        return eta
    
    def objective_plot(self, betas, K, y, lam):
        num_points = np.size(betas, 0)
        objs = np.zeros(num_points)
        for i in range(0, num_points):
            objs[i] = self.obj(betas[i, :], K, y, lam)
        fig, ax = plt.subplots()
        ax.plot(np.array(range(num_points)), objs, c='red')
        ax.set_xlabel('Iteration')
        ax.set_ylabel('Objective value')
        ax.set_title('Objective value vs. iteration when lambda=' + str(lam))
        return ax

Overwriting src/svm.py


In [27]:
%%file examples/sim_demo.py
import numpy as np
from matplotlib.colors import ListedColormap
import matplotlib.pyplot as plt
import sklearn.datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import sys
sys.path.insert(0, '../')
from src.svm import HuberSVM

def sim_demo(verbose=True, plot_contour=False):
    def evaluate(beta, X_train, X_test, y_test, kernel, **kwargs):
        n_test = len(y_test)
        y_pred = np.zeros(n_test)
        y_vals = np.zeros(n_test)
        for i in range(n_test):
            y_vals[i] = np.dot(kernel(X_train, X_test[i, :].reshape(1, -1), **kwargs).reshape(-1), beta)
        y_pred = np.sign(y_vals)
        return np.mean(y_pred != y_test), y_vals  # return error and values from before applying cutoff
    
    def evaluate_plot(betas, kernel, plot_contour, **kwargs):
        error, test_values = evaluate(betas[-1, :], X_train, X_test, y_test, kernel, **kwargs)
        print('Misclassification error when lambda =', kwargs['lambda'], ':', error)
        ax = None
        if plot_contour:
            Zs = np.c_[xx.ravel(), yy.ravel()]
            Z = evaluate(betas[-1, :], X_train, Zs, [0]*len(Zs), kernel, **kwargs)[1]
            # Put the result into a color plot
            Z = Z.reshape(xx.shape)
            ax = plt.subplot()
            ax.contourf(xx, yy, Z, cmap=cm, alpha=.8)

            # Plot also the training points
            ax.scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=cm_bright,
                       edgecolors='k')
            # and testing points
            ax.scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=cm_bright,
                       edgecolors='k', alpha=0.2)

            ax.set_xlim(xx.min(), xx.max())
            ax.set_ylim(yy.min(), yy.max())
            ax.set_xticks(())
            ax.set_yticks(())
        return error, test_values, ax
        
    X, y = sklearn.datasets.make_circles(noise=0.2, factor=0.5, random_state=1)
    X = StandardScaler().fit_transform(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.4, random_state=42)
    y_train = 2*y_train - 1
    y_test = 2*y_test - 1
    if verbose:
        print('Number of training examples:', X_train.shape[0])
        print('Number of test examples:', X_test.shape[0])

    x_min, x_max = X[:, 0].min() - .5, X[:, 0].max() + .5
    y_min, y_max = X[:, 1].min() - .5, X[:, 1].max() + .5
    h = .02  # step size in the mesh
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                         np.arange(y_min, y_max, h))

    cm = plt.cm.RdBu
    cm_bright = ListedColormap(['#FF0000', '#0000FF'])
    if plot_contour:
        # Plot the training points
        ax = plt.subplot()
        ax.scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=cm_bright,
                   edgecolors='k')
        # and testing points
        ax.scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=cm_bright, alpha=0.1,
                   edgecolors='k')
        ax.set_xlim(xx.min(), xx.max())
        ax.set_ylim(yy.min(), yy.max())
        ax.set_xticks(())
        ax.set_yticks(())

    mysvm = HuberSVM()
    mysvm.fit(X_train, y_train, lam=0.8, **{'plot': True, 'kernel_choice': 'rbf', 'sigma': 1, 'max_iter': 10})
    beta_vals = mysvm.beta_vals
    train_cache = mysvm.cache
    errors, test_vals, contour_ax = evaluate_plot(beta_vals, mysvm.gram, plot_contour, **train_cache)
    plt.show()
    
if __name__=='__main__':
    sim_demo(verbose=True, plot_contour=True)

Overwriting examples/sim_demo.py


In [38]:
%%file examples/real_demo.py
from tqdm import tqdm
import itertools
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import mode
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_digits

import sys
sys.path.insert(0, '../')
from src.svm import HuberSVM
    
def real_demo(verbose=True, plot_progress=True):
    def filter_pair(label_pair, x, y):
        """ Filter a multi-class dataset into binary dataset given a label pair.
        """
        mask = np.isin(y, label_pair)
        x_bin, y_bin = x[mask].copy(), y[mask].copy()
        y_bin[y_bin==label_pair[0]] = 1.0
        y_bin[y_bin==label_pair[1]] = -1.0
        return x_bin, y_bin

    def evaluate(beta, X_train, X_test, kernel, **kwargs):
        n_test = X_test.shape[0]
        y_pred = np.zeros(n_test)
        y_vals = np.zeros(n_test)
        for i in range(n_test):
            y_vals[i] = np.dot(kernel(X_train, X_test[i, :].reshape(1, -1), **kwargs).reshape(-1), beta)
        return y_vals

    def train_predict(X_train, y_train, X_test, y_test, lam, method='ovo', **config):
        error = None
        label_list = np.unique(y_train)
        if method == 'ovo':
            pred_list = []
            label_pair_list = list(itertools.combinations(label_list, 2))
            for label_pair in tqdm(label_pair_list):
                X_train_bin, y_train_bin = filter_pair(label_pair, X_train, y_train)
                mylinearsvm = HuberSVM()
                mylinearsvm.fit(X_train_bin, y_train_bin, lam, **config)
                beta_vals, train_cache = mylinearsvm.beta_vals, mylinearsvm.cache
                if config['plot']:
                    plt.show(train_cache['plot'])
                scores = evaluate(beta_vals[-1, :], X_train_bin, X_test, mylinearsvm.gram, **train_cache)
                y_pred_bin = np.zeros_like(y_test) + label_pair[-1]
                y_pred_bin[scores >= 0] = label_pair[0]
                pred_list.append(y_pred_bin)
            test_preds = np.array([mode(pi).mode[0] for pi in np.array(pred_list, dtype=np.int64).T])
            error = np.mean(test_preds != y_test)
        elif method == 'ovr':
            score_list = []
            for label in tqdm(label_list):
                y_train_bin = np.zeros_like(y_train) - 1
                y_train_bin[y_train == label] = 1
                mylinearsvm = HuberSVM()
                mylinearsvm.fit(X_train, y_train_bin, lam, **config)
                beta_vals, train_cache = mylinearsvm.beta_vals, mylinearsvm.cache
                if config['plot']:
                    plt.show(train_cache['plot'])
                scores = evaluate(beta_vals[-1, :], X_train, X_test, mylinearsvm.gram, **train_cache)
                score_list.append(scores)
            test_preds = np.argmax(np.stack(score_list, axis=1), axis=1)
            error = np.mean(test_preds != y_test)
        else:
            print("Method Not Implemented")
        return error, beta_vals
    
    print("Download the dataset...")
    digits = load_digits() 
    num_images = digits.images.shape[0]
    X_train, X_test, y_train, y_test = train_test_split(digits.images.reshape(num_images, -1), digits.target, test_size=.4, random_state=42)
    #Standardizing the data
    scaler = StandardScaler()
    scaler.fit(X_train)
    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)
    if verbose:
        print('Number of training examples:', X_train.shape)
        print('Number of test examples:', X_test.shape)
    lambda_list = [1]
    for l in lambda_list:
        error, beta_vals = train_predict(X_train, y_train, X_test, y_test, lam=l, method='ovr', **{'kernel_choice': 'linear', 'sigma': 1, 'plot': plot_progress, 'max_iter': 50})
        print("Lambda = %0.4f. Misclassification Error = %0.4f" %(l, error))

if __name__=='__main__':
    real_demo(verbose=True, plot_progress=False)

Overwriting examples/real_demo.py
