In [None]:
# Nahrání potřebných knihoven

import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

In [None]:
"""
V této buňce dochazí k načtení a předzpracování data. Proces probíhá následovně:
1- Data se načtou do tabulky
2- Předefinují se označení pro účel použití SVM
3- Výběr datasetu, který se skládá ze 40% pozitivních a 60% negativních pozorování
4- Promíchání datasetu
5- Znormování prvků datasetu na interval [0,1]
6- Rozdělení na trénovací a testovací množinu
"""



digit_mnist_path = 'Digits-MNIST/train.csv'
fashion_mnist_path = 'Fashion-MNIST/fashion-mnist_train.csv'

pdf_digits = pd.read_csv(digit_mnist_path)
pdf_fashion = pd.read_csv(fashion_mnist_path)

pdf_digits['label'] = np.where(pdf_digits['label']==0,1,-1)
pdf_fashion['label'] = np.where(pdf_fashion['label']==0,1,-1)

test_size = 0.2
SEED = 22
np.random.seed(SEED)

def sample_and_shuffle(df):
    df_p = df[df['label']==1]
    df_n = df[df['label']==-1]
    df_n_sample = df_n.sample(int(1.5*len(df_p.index)))

    df_new = df_p.append(df_n_sample)
    df_new_shuffled = shuffle(df_new)

    return df_new_shuffled

pdf_digits = sample_and_shuffle(pdf_digits)
pdf_fashion = sample_and_shuffle(pdf_fashion)

# pdf_digits = shuffle(pdf_digits)
# pdf_fashion = shuffle(pdf_fashion)

x_dig, y_dig =  pdf_digits[pdf_digits.columns[1:]].to_numpy()/255, pdf_digits['label'].to_numpy(),
x_fash, y_fash = pdf_fashion[pdf_fashion.columns[1:]].to_numpy()/255, pdf_fashion['label'].to_numpy()

x_dig_train, x_dig_test, y_dig_train, y_dig_test = train_test_split(x_dig, y_dig, test_size = test_size)
x_fash_train, x_fash_test, y_fash_train, y_fash_test = train_test_split(x_fash, y_fash, test_size = test_size)

In [None]:
# Třída model reprezentující algoritmy SSGD a PSSGD
class Model:
    params = np.array([])
    reg = False
    proxy = False

    alpha = 0
    beta = 0
    gamma = 0

    def __init__(self, reg = False, proxy = False, alpha = 1, beta = 2, gamma = 0.1):
        self.reg = reg
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma

        if proxy:
            self.reg = False
            self.proxy = True

    def initialize_params(self, data):
        self.params = np.random.normal(0,1,len(data[0])+1)

    # Posloupnost iteračních kroků 1/k
    def learning_rates(self, k):
        return 1/k
    
    # Explicitní řešení proximálního operátoru pro regularizaci MCP
    def proximal_operator(self):
        return np.where(
            np.abs(self.params) >self.alpha*self.beta, 
            self.params, 
            np.where(
                np.abs(self.params)<= self.alpha,
                0,
                np.sign(self.params)*(np.abs(self.params)-self.alpha)/(1-1/self.beta)
            )
        )

    # Subgradient ztrátové funkce SVM + L2 normy. V případě metody SSGD s regularizací přičítá subgradient funkce MCP.
    def subgradient(self, data, target, batch_size):
        sample_idxs = np.random.randint(np.shape(data)[0], size=batch_size)
        x_batch, y_batch  = data[sample_idxs,:], target[sample_idxs]

        loss_grad = np.zeros(1+len(data[0]))
        for i in range(np.shape(x_batch)[0]):
            x = x_batch[i]
            y = y_batch[i]
            hinge_loss = 1-y*(np.dot(self.params[1:], x)-self.params[0])
            loss_grad += np.append(np.where(hinge_loss>0, y, 0), np.where(hinge_loss>0, -y*x, 0))
        loss_grad = loss_grad/batch_size + self.gamma*self.params

        if self.reg:
            return loss_grad+self.gamma*self.regularization_subgradient()
        else:
            return loss_grad

    # Počítá subgradient MCP
    def regularization_subgradient(self):
        return np.where(
            np.abs(self.params) <= self.alpha*self.beta, 
            self.alpha*np.sign(self.params) - self.params/self.beta, 
            0
        )

    # Počítá MCP pro parametry modelu
    def regularization_loss(self):
        return np.where(
            np.abs(self.params) <= self.alpha*self.beta,
            self.alpha*np.abs(self.params) - np.square(self.params)/(2*self.beta),
            self.alpha*self.beta**2/2
        ).sum()

    # Predikční funkce. Pozorováním přiřazuje kategorie.
    def predict(self, data):
        f = lambda x: np.dot(self.params[1:], x)- self.params[0]
        predictions = np.apply_along_axis(f, 1, data)
        predictions = np.where(predictions < 0, -1, 1)
        return predictions

    # Spočítá celkovou ztrátu modelu
    def loss(self, data, target):
        data_join = np.concatenate((data, target.reshape((-1,1))), 1)
        hinge_loss = lambda x: np.maximum(0, 1-x[-1]*(np.dot(self.params[1:], x[:-1])-self.params[0]))
        loss = np.apply_along_axis(hinge_loss, 1, data_join).mean()
        if self.reg or self.proxy:
            # return loss + self.gamma*self.regularization_loss()
            return loss + np.linalg.norm(self.params)/2
        else:
            return loss + np.linalg.norm(self.params)/2

    # Spočítá přesnost klasifikátoru na daném datasetu.
    def accuracy(self, data, target):
        predictions = self.predict(data)
        acc = (predictions==target).mean()
        return acc

    # Spočítá řídkost parametrů. Z praktického hlediska se jako nulové parametry berou ty které jsou blízko nule.
    def sparsity(self):
        return (np.isclose(self.params,0, atol=1e-6)).mean()

    # Metoda, která optimalizuje model na daných datech. Jsou k dispozici různé parametry optimalizace.
    def train(self, data, target, n_iter = 2*10**5, batch_size = 1, verbose = True):
        self.initialize_params(data)

        for i in range(1, n_iter+1):
            self.params  = self.params - self.learning_rates(i)*self.subgradient(data, target, batch_size)
            if self.proxy:
                self.params = self.proximal_operator()

            if verbose and i%10000 == 0:
                l= self.loss(data, target)
                acc = self.accuracy(data, target)
                spars = self.sparsity()
                print(f'Iterace {i}: Ztráta: {l}, přesnost: {acc}, řídkost: {spars}')

    # Sbírá data v průběhu trénování. Vrací data pro výkres grafů. Relativně pomalá metoda.
    def training_data(self, data, target, n_iter = 10**5, batch_size = 10, interval = 10):
        self.initialize_params(data)

        loss_data, accuracy_data, sparsity_data = np.array([]), np.array([]), np.array([])
        idxs =  np.array([])
        for i in range(1, n_iter+1):
            self.params  = self.params - self.learning_rates(i)*self.subgradient(data, target, batch_size)
            if self.proxy:
                self.params = self.proximal_operator()

            if (i-1)%interval == 0 or (i-1)<=10:
                l= self.loss(data, target)
                acc = self.accuracy(data, target)
                spars = self.sparsity()

                idxs = np.append(idxs, i)
                loss_data = np.append(loss_data, l)
                accuracy_data = np.append(accuracy_data, acc)
                sparsity_data = np.append(sparsity_data, spars)
        return loss_data, accuracy_data, sparsity_data, idxs


# Provede několik experimentů na daných modelech a datech.
def perform_experiments(model, data_train, target_train, data_test, target_test, n_iter = 10**5, batch_size = 1, n_experiments = 10):
    losses_train, accuracies_train = np.array([]), np.array([])
    losses_test, accuracies_test = np.array([]), np.array([])
    sparsities = np.array([])

    for i in range(1, n_experiments+1):
        np.random.seed(i)
        model.train(data_train, target_train, n_iter, batch_size, False)

        losses_train = np.append(losses_train, model.loss(data_train, target_train)) 
        accuracies_train = np.append(accuracies_train, model.accuracy(data_train, target_train))

        losses_test = np.append(losses_test, model.loss(data_test, target_test)) 
        accuracies_test = np.append(accuracies_test, model.accuracy(data_test, target_test))

        sparsities = np.append(sparsities, model.sparsity())

        print(f'Experiment číslo {i} ukončen.')
    np.random.seed(SEED)
    
    return pd.DataFrame(
        {
            "Sparsity": sparsities,
            "Train loss": losses_train,
            "Test loss": losses_test,
            "Train accuracy": accuracies_train,
            "Test accuracy": accuracies_test
        },
        index = [f"Experiment {i}" for i in range(1, n_experiments+1)]
    )

# Metoda vytvořující grafy vývoje ztrátové funkce, přesnosti a řídkosti klasifikátoru.
def plot_training_data(models, data, target, fig_names, n_iter = 10**5, batch_size = 10, interval = 10):
    losses, accuracies, sparsities = [], [], []
    idxs = []
    for model in models:
        x, y, z, w = model.training_data(data, target, n_iter, batch_size, interval)
        losses.append(x), accuracies.append(y), sparsities.append(z)
        idxs.append(w)
    
    plot_loss(losses, idxs[0], fig_names[0])
    plot_accuracy(accuracies, idxs[1], fig_names[1])
    plot_sparsity(sparsities, idxs[2], fig_names[2])

# Tři metody vytvářející grafy pro různé charakteristiky trénování.
def plot_loss(data, idxs, fig_name):
    fig, ax = plt.subplots()
    labels = ['SSGD', 'SSGD s regularizací', 'PSSGD']
    for y, l in zip(data, labels):
        ax.plot(idxs, y, label = l)
    ax.legend(fontsize=12)
    ax.set_xscale('log')
    ax.xaxis.set_tick_params(width=2.5)
    ax.yaxis.set_tick_params(width=2.5)
    plt.xticks(fontsize=15)
    plt.yticks(fontsize=15)
    plt.xlabel("Počet iterací", fontsize=15)
    plt.ylabel('Ztrátová funkce', fontsize=15)
    plt.tight_layout()
    plt.savefig(fig_name, format='pdf')

def plot_accuracy(data, idxs, fig_name):
    fig, ax = plt.subplots()
    labels = ['SSGD', 'SSGD s regularizací', 'PSSGD']
    for y, l in zip(data, labels):
        ax.plot(idxs, y, label = l)
    ax.legend(fontsize=12)
    ax.set_xscale('log')
    ax.xaxis.set_tick_params(width=2.5)
    ax.yaxis.set_tick_params(width=2.5)
    plt.xticks(fontsize=15)
    plt.yticks(fontsize=15)
    plt.xlabel("Počet iterací", fontsize=15)
    plt.ylabel('Přesnost', fontsize=15)
    plt.tight_layout()
    plt.savefig(fig_name, format='pdf')

def plot_sparsity(data, idxs, fig_name):
    fig, ax = plt.subplots()
    labels = ['SSGD', 'SSGD s regularizací', 'PSSGD']
    for y, l in zip(data, labels):
        ax.plot(idxs, y, label = l)
    ax.legend(fontsize=12)
    ax.set_xscale('log')
    ax.xaxis.set_tick_params(width=2.5)
    ax.yaxis.set_tick_params(width=2.5)
    plt.xticks(fontsize=15)
    plt.yticks(fontsize=15)
    plt.xlabel("Počet iterací", fontsize=15)
    plt.ylabel('Řídkost', fontsize=15)
    plt.tight_layout()
    plt.savefig(fig_name, format='pdf')

In [None]:
# Získání grafu pro průběh optimalizace na Digit MNIST.
plot_training_data(
    models= [Model(), Model(reg= True),  Model(proxy= True, alpha=0.1, beta=1.2)],
    data= x_dig,
    target= y_dig,
    fig_names= ['plots/loss_plot_fig.pdf', 'plots/acc_plot_fig.pdf', 'plots/spars_plot_fig.pdf'],
    n_iter= 10**6,
    batch_size= 25,
    interval= 100
)

In [None]:
# Získání grafu pro průběh optimalizace na Fashion MNIST.
plot_training_data(
    models= [Model(), Model(reg= True),  Model(proxy= True, alpha=0.1, beta=1.2)],
    data= x_fash,
    target= y_fash,
    fig_names= ['plots/loss_plot_fash.pdf', 'plots/acc_plot_fash.pdf', 'plots/spars_plot_fash.pdf'],
    n_iter= 10**6,
    batch_size= 25,
    interval= 100
)

In [None]:
# Následující bloky dělají experimenty pro jednotlivé metody a datasety diskutované v bakalářské práci. Celkem jich je 6.
digit_exp = perform_experiments(
    model= Model(),
    data_train= x_dig_train,
    target_train= y_dig_train,
    data_test= x_dig_test,
    target_test= y_dig_test,
    n_iter= 10**6,
    batch_size= 25,
    n_experiments= 1
)
digit_exp.to_csv('results/digit_exp.csv')
print(digit_exp.mean())
print(digit_exp.std())

In [None]:
digit_reg_exp = perform_experiments(
    model= Model(reg= True),
    data_train= x_dig_train,
    target_train= y_dig_train,
    data_test= x_dig_test,
    target_test= y_dig_test,
    n_iter= 10**6,
    batch_size= 25,
    n_experiments= 1
)
digit_reg_exp.to_csv('results/digit_reg_exp.csv')
print(digit_reg_exp.mean())
print(digit_reg_exp.std())

In [None]:
fashion_exp = perform_experiments(
    model= Model(),
    data_train= x_fash_train,
    target_train= y_fash_train,
    data_test= x_fash_test,
    target_test= y_fash_test,
    n_iter= 10**6,
    batch_size= 25,
    n_experiments= 1
)
fashion_exp.to_csv('results/fashion_exp.csv')
print(fashion_exp.mean())
print(fashion_exp.std())


In [None]:
fashion_reg_exp = perform_experiments(
    model= Model(reg= True),
    data_train= x_fash_train,
    target_train= y_fash_train,
    data_test= x_fash_test,
    target_test= y_fash_test,
    n_iter= 10**6,
    batch_size= 25,
    n_experiments= 1
)
fashion_reg_exp.to_csv('results/fashion_reg_exp.csv')
print(fashion_reg_exp.mean())
print(fashion_reg_exp.std())


In [None]:
digit_proxy_exp = perform_experiments(
    model= Model(proxy= True, alpha=0.1, beta=1.1),
    data_train= x_dig_train,
    target_train= y_dig_train,
    data_test= x_dig_test,
    target_test= y_dig_test,
    n_iter= 10**6,
    batch_size= 25,
    n_experiments= 1
)
digit_proxy_exp.to_csv('results/digit_proxy.csv')
print(digit_proxy_exp.mean())
print(digit_proxy_exp.std())

In [None]:
fashion_proxy_exp = perform_experiments(
    model= Model(proxy= True, alpha=0.1, beta=1.1),
    data_train= x_fash_train,
    target_train= y_fash_train,
    data_test= x_fash_test,
    target_test= y_fash_test,
    n_iter= 10**6,
    batch_size= 25,
    n_experiments= 1
)
fashion_proxy_exp.to_csv('results/fashion_proxy.csv')
print(fashion_proxy_exp.mean())
print(fashion_proxy_exp.std())