In [1]:
import numpy as np
from numpy import linalg as LA

from sklearn.datasets import load_iris
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn import preprocessing

from scipy.spatial.distance import cdist
from scipy.linalg import eigh


### Implementering kernels and do tests

In [2]:
def rbf_kernel(X, sigma, diag=1):
    """"Basic SVM with predefined kernel matrix"""
    
    N = X.shape[0]
    K = np.zeros((N,N))

    for i in range(N):
        for j in range(N):
            if i == j:
                K[i,j] = diag
            else:
                x_i = X[i, :].reshape(1,-1)
                x_j = X[j, :].reshape(1,-1)

                K[i, j] = np.exp(-cdist(x_i, x_j, 'sqeuclidean') / (sigma ** 2)) # euclidean?
    return K


def make_D_matrix(K):
    K_sum = np.sum(K, axis=1)
    D = np.diag(K_sum)
    return D


def make_L_matrix(K, D):
    D_temp = np.diag( np.diag(D) ** -0.5 )
    L = D_temp @ K @ D_temp
    return L


def step_transfer(L, k=2):
    w, v = eigh(L)
    lambda_cut = w[-k]
    
    w = np.where(w >= lambda_cut, 1, 0)
    L_hat = np.dot(v, np.dot(np.diag(w), v.T))
    D_hat = np.diag(1/np.diag(L_hat))
    K_hat = D_hat**(1/2) @ L_hat @ D_hat**(1/2)
    
    return L_hat, D_hat, K_hat


def linear_step_transfer(L, k=2):
    w, v = eigh(L)
    lambda_cut = w[-k]
    w = np.where(w >= lambda_cut, w, 0)
    
    L_hat = np.dot(v, np.dot(np.diag(w), v.T))
    D_hat = np.diag(1/np.diag(L_hat))
    K_hat = D_hat**(1/2) @ L_hat @ D_hat**(1/2)

    return L_hat, D_hat, K_hat


def polynomial_transfer(L, D, K, t):
    L_hat = L ** t
    D_hat = np.diag(1/np.diag(L_hat))
    K_hat = D_hat**(1/2) @ D**(1/2) @ (LA.inv(D) @ K)**t @ D**(1/2) @ D_hat**(1/2)
    K_hat = preprocessing.scale(K_hat)

    return L_hat, D_hat, K_hat


def apply_transfer_func(L, D, K, hyperparams, type="linear"):
    """hyperparams: k for step and linear_step, t for polynomial"""
    if type == "linear":
        return L, D, K
    if type == "step":
        k = hyperparams['k']
        return step_transfer(L, k)
    if type == "linear_step":
        k = hyperparams['k']
        return linear_step_transfer(L)
    if type == "polynomial":
        t = hyperparams['t']
        return polynomial_transfer(L, D, K, t)
        
    raise ValueError("wrong argument")

    
def accuracy(t, y):
    val = 0.0
    N = len(t)
    for i in range(N):
        if t[i] == y[i]:
            val += 1
    
    return val / N



In [3]:
def test_svm(X, Y, tf_fun, C=1, sigma=1, **kwargs):
    """Test SVM one time"""
    
    # Shuffle data
    np.random.seed(40)
    n_sample = len(X)
    order = np.random.permutation(n_sample)
    X = X[order]
    Y = Y[order].astype(np.float)
    
    # Make Kernel
    K = rbf_kernel(X, sigma)
    D = make_D_matrix(K)
    L = make_L_matrix(K, D)
    
    L, D, K = apply_transfer_func(L, D, K, kwargs, tf_fun)
    
    # Remove data without labels
    K_train = K[:70,:70]
    Y_train = Y[:70]
    
    K_test = K[70:100,:70]
    
    # Apply to SVM
    clf = SVC(kernel="precomputed", C=C)
    clf.fit(K_train, Y_train)
    
    y_pred = clf.predict(K_test)
    print("accuracy:", accuracy(y_pred, Y[70:100]))

    
def run_test_svm():
    iris = load_iris()
    X = iris.data[:,:2]
    y = iris.target

    X = X[y != 0]
    y = y[y != 0]
    
    test_svm(X, y, "linear_step", k=2)

    
run_test_svm()


accuracy: 0.6666666666666666


In [4]:
def load_data():
    iris = load_iris()
    X = iris.data[:,:2]
    y = iris.target

    X = X[y != 0]
    y = y[y != 0]
    
    n_sample = len(X)
    order = np.random.permutation(n_sample)
    X = X[order]
    y = y[order].astype(np.float)
    
    return X, y

def k_fold_svm_error(X_train, y_train, C, sigma, tf_fun="linear"):
    """Compute error mean and std using k-fold cross validation with k=10 using SVM with specified kernel type"""
    errors = []

    # used for indexing in loop
    fold_size = int(X_train.shape[0] / 10)

    for fold_n in range(10):
        # splits training data into 3 separate arrays
        x_splits = np.vsplit(X_train, [fold_n * fold_size, fold_n * fold_size + fold_size])

        # middle set is current validation set
        x_validation_set = x_splits[1]
        # merge first and second array from split to get training set
        x_training_set = np.vstack((x_splits[0], x_splits[2]))

        # do same thing for y labels
        y_splits = np.split(y_train, [fold_n * fold_size, fold_n * fold_size + fold_size])
        y_validation_set = y_splits[1]
        y_training_set = np.append(y_splits[0], y_splits[2])

        # get error for current fold
        errors.append(
            get_svm_error(
                x_training_set,
                x_validation_set,
                y_training_set,
                y_validation_set,
                C,
                sigma,
                tf_fun,
            )
        )
        
    errors = np.array(errors)   
    return errors.mean(), errors.std()


def get_svm_error(x_training_set, x_validation_set, y_training_set, y_validation_set, C, sigma, type="linear"):    
    N_train = x_training_set.shape[0]
    X_t = np.concatenate([x_training_set, x_validation_set])

    K = rbf_kernel(X_t, sigma)
    D = make_D_matrix(K)
    L = make_L_matrix(K, D)
    
    L, D, K = apply_transfer_func(L, D, K, {}, type)
    
    K_train = K[:N_train, :N_train]
    K_val = K[N_train:, :N_train]
    
    clf = SVC(kernel="precomputed", C=C)
    clf.fit(K_train, y_training_set)
    
    y_pred = clf.predict(K_val)
    
    err = 1 - accuracy(y_pred, y_validation_set)
    return err

def find_hyperparameters(X_train, y_train, c_range, sigma_range):    
    lowest_error = 1.0
    best_parameter_values = [0, 0]
    
    iters = len(c_range) * len(sigma_values)
    i = 0

    for c in c_values:
        for sigma in sigma_values:
            current_error,_ = k_fold_svm_error(X_train, y_train, c, sigma)

            # print("c: {}\t sigma: {}\t error: {}".format(c, sigma, current_error))
            if (current_error < lowest_error):
                lowest_error = current_error
                best_parameter_values = c, sigma
            
            if i % 4 == 0:
                print("{} / {}".format(i, iters))
            i += 1
    
    return best_parameter_values[0], best_parameter_values[1], lowest_error

In [6]:


# split data for finding hyperparams and for computing model error
np.random.seed(40)
# load data
X, y = load_data()

X, X_test, y, y_test = train_test_split(X, y, test_size=0.2)

# find optimal hyperparams
c_values = [0.01, 0.1, 1, 10, 100, 1000]
sigma_values = [0.01, 0.1, 1, 10, 100, 1000]
c, sigma, _ = find_hyperparameters(X, y, c_values, sigma_values)
print("c: {}\t sigma: {}".format(c, sigma))



0 / 36
4 / 36
8 / 36
12 / 36
16 / 36
20 / 36
24 / 36
28 / 36
32 / 36
c: 0.1	 sigma: 1


## Experiments
Here under we can start to run experiment with the hyperparams we found above

In [57]:
def run_iris_experiment_2():
    




def run_iris_experiment():
    sigma = 1
    c = 1
    tf_fun = "polynomial"
    kwargs = {}
    kwargs["k"] = 2
    kwargs["t"] = 3
    perc_val = 0.2
    perc_label = 0.4
    perc_unlabel = 1 - perc_label - perc_val
    assert(perc_val + perc_label + perc_unlabel == 1)
    
    # kwargs["k"] for step and linear_step
    # kwards["t"] for polynomial
    
    errors = []

    np.random.seed(40)
    # load data
    iris = load_iris()
    X = iris.data #X = iris.data[:,:2]
    y = iris.target

    X = X[y != 0]
    y = y[y != 0]
    
    n_sample = -1
    n_train = -1
    n_unlabel = -1
    n_val = -1
    
    # time for experiment
    for test_no in range(100):
        if test_no % 20 == 0:
            print("{} / {}".format(test_no, 100))
            
        # randomize
        n_sample = len(X)
        order = np.random.permutation(n_sample)
        X = X[order]
        y = y[order].astype(np.float)


        # Calculate split
        n_train = int(n_sample * perc_label)
        n_unlabel = int(n_sample * perc_unlabel)
        n_val = int(n_sample * perc_val)

        idx_train_end = n_train + 1
        idx_val_start = n_train + n_unlabel

        K = rbf_kernel(X, sigma)
        D = make_D_matrix(K)
        L = make_L_matrix(K, D)

        L, D, K = apply_transfer_func(L, D, K, kwargs, tf_fun)

        K_train = K[:idx_train_end, :idx_train_end]
        Y_train = y[:idx_train_end]

        K_val = K[idx_val_start:, :idx_train_end]

        clf = SVC(kernel="precomputed", C=c)
        clf.fit(K_train, Y_train)

        y_pred = clf.predict(K_val)
        acc = accuracy(y_pred, y[idx_val_start:])
        err = 1 - acc

        errors.append(err)
    
    errors = np.array(errors)

    mean_err = errors.mean()
    std_err = errors.std()
    
    results = """
    model: {}
    data: {}
    datapoints: {}
    datapoints (label/unlabel/val): {} {} {}
    perc_label: {}
    kwargs: {}
    sigma: {}
    c: {}
    error: {:.4} ({:.4})
    """.format(
        tf_fun,
        "iris (all features)",
        n_sample,
        n_train, n_unlabel, n_val,
        perc_label,
        kwargs,
        sigma,
        c,
        mean_err, std_err
    )
    print(results)
    
run_iris_experiment()

0 / 100
20 / 100
40 / 100
60 / 100
80 / 100

    model: polynomial
    data: iris (all features)
    datapoints: 100
    datapoints (label/unlabel/val): 40 40 20
    perc_label: 0.4
    kwargs: {'k': 2, 't': 3}
    sigma: 1
    c: 1
    error: 0.0735 (0.04973)
    


In [130]:
def get_bow_data(train_size, unlabel_size, test_size):   
    x_train_bow = np.load('Embeddings/x_train_bow.npy').item().toarray()
    x_test_bow = np.load('Embeddings/x_test_bow.npy').item().toarray()

    #x_train_bow = np.load('Embeddings/x_train_trans_FT.npy')
    #x_test_bow = np.load('Embeddings/x_test_trans_FT.npy')
    
    y_train_text = np.load('Embeddings/y_train.npy')
    y_test_text = np.load('Embeddings/y_test.npy')
    
    y_train = np.zeros(y_train_text.shape)
    
    y_train[y_train_text == "positive"] = 0
    y_train[y_train_text == "negative"] = 1
    y_train[y_train_text == "neutral"] = 2
    
    y_test = np.zeros(y_test_text.shape)
        
    y_test[y_test_text == "positive"] = 0
    y_test[y_test_text == "negative"] = 1
    y_test[y_test_text == "neutral"] = 2
    
    X_test = x_test_bow[y_test != 2]
    y_test = y_test[y_test != 2]
    
    X = x_train_bow[y_train != 2]
    y = y_train[y_train != 2]
    
    # X positive = 1891
    # X negative = 7342
    
    n_sample = len(X)
    order = np.random.permutation(n_sample)
    X = X[order]
    y = y[order].astype(np.float)
    
    order = np.random.permutation(len(X_test))
    X_test = X_test[order]
    y_test = y_test[order].astype(np.float)
    
    
    X_train_pos_red = X[y == 0][:train_size // 2]
    y_train_pos_red = y[y == 0][:train_size // 2]
    X_train_neg_red = X[y == 1][:train_size // 2]
    y_train_neg_red = y[y == 1][:train_size // 2]
    
    X_test_red = X_test[:test_size] 
    y_test_red = y_test[:test_size]
    
    X_unlabel_pos = X[y == 0][train_size // 2: unlabel_size // 2 + train_size // 2]
    y_unlabel_pos = y[y == 0][train_size // 2: unlabel_size // 2 + train_size // 2]
    X_unlabel_neg = X[y == 1][train_size // 2: unlabel_size // 2 + train_size // 2]
    y_unlabel_pos = y[y == 1][train_size // 2: unlabel_size // 2 + train_size // 2]
    

    X_red = np.concatenate([X_train_pos_red, X_train_neg_red, X_unlabel_pos, X_unlabel_neg, X_test_red])
    y_red = np.concatenate([y_train_pos_red, y_train_neg_red])
    
    print(X_red.shape)
    return X_red, y_red, y_test_red

# ans = get_bow_data(10, 190, 20)


In [131]:
def run_BoW_experiment():
    sigma = 1
    c = 1
    tf_fun = "step"
    kwargs = {}
    kwargs["k"] = 2
    kwargs["t"] = 2
    perc_val = 0.2
    perc_label = 0.1
    perc_unlabel = 1 - perc_label - perc_val
    assert(perc_val + perc_label + perc_unlabel == 1)
    
    # kwargs["k"] for step and linear_step
    # kwards["t"] for polynomial
    
    errors = []

    np.random.seed(40)
    
    train_size = 20
    unlabel_size = 180
    test_size = 40

    n_sample = -1
    n_train = -1
    n_unlabel = -1
    n_val = -1
    
    # time for experiment
    for test_no in range(10):
        if test_no % 20 == 0:
            print("{} / {}".format(test_no, 100))
            
        
        X_red, y_red, y_test_red = get_bow_data(train_size, unlabel_size, test_size)
        
        K = rbf_kernel(X_red, sigma)
        D = make_D_matrix(K)
        L = make_L_matrix(K, D)

        L, D, K = apply_transfer_func(L, D, K, kwargs, tf_fun)
        
        K_train = K[:train_size, :train_size]
        K_test = K[train_size + unlabel_size:, :train_size]        

        #clf = SVC(kernel="precomputed", C=c)
        #clf.fit(K_train, y_red)
        
        clf = SVC(gamma='auto')
        clf.fit(X, y)
        
        y_pred = clf.predict(K_test)
        
        
        acc = accuracy(y_pred, y_test_red)
        err = 1 - acc
        
        print(err)
        
        errors.append(err)
    
    errors = np.array(errors)

    mean_err = errors.mean()
    std_err = errors.std()
    
    results = """
    model: {}
    data: {}
    datapoints: {}
    datapoints (label/unlabel/val): {} {} {}
    perc_label: {}
    kwargs: {}
    sigma: {}
    c: {}
    error: {:.4} ({:.4})
    """.format(
        tf_fun,
        "Bag of words",
        n_sample,
        n_train, n_unlabel, n_val,
        perc_label,
        kwargs,
        sigma,
        c,
        mean_err, std_err
    )
    print(results)
    
run_BoW_experiment()




0 / 100
(240, 12965)


ValueError: X.shape[1] = 20 should be equal to 2, the number of features at training time