In [None]:
import numpy as np
import data
import time
import tqdm


In [None]:
import idx2numpy
import numpy as np
import os
import pickle


In [None]:
import argparse
import sm_network
import data
import image
import numpy as np
from sklearn.decomposition import PCA

In [None]:
def load_data(data_directory, train = True):
    if train:
        images = idx2numpy.convert_from_file(os.path.join(data_directory, 'train-images.idx3-ubyte'))
        labels = idx2numpy.convert_from_file(os.path.join(data_directory, 'train-labels.idx1-ubyte'))
    else:
        images = idx2numpy.convert_from_file(os.path.join(data_directory, 't10k-images.idx3-ubyte'))
        labels = idx2numpy.convert_from_file(os.path.join(data_directory, 't10k-labels.idx1-ubyte'))

    vdim = images.shape[1] * images.shape[2]
    vectors = np.empty([images.shape[0], vdim])
    for imnum in range(images.shape[0]):
        imvec = images[imnum, :, :].reshape(vdim, 1).squeeze()
        vectors[imnum, :] = imvec

    return vectors, labels

In [None]:
def z_score_normalize(X, u = None, sd = None):
    """
    Performs z-score normalization on X.

    f(x) = (x - μ) / σ
        where
            μ = mean of x
            σ = standard deviation of x

    Parameters
    ----------
    X : np.array
        The data to z-score normalize
    u (optional) : np.array
        The mean to use when normalizing
    sd (optional) : np.array
        The standard deviation to use when normalizing

    Returns
    -------
        Tuple:
            Transformed dataset with mean 0 and stdev 1
            Computed statistics (mean and stdev) for the dataset to undo z-scoring.
    """

    u = np.mean(X, axis = 1)
    sd = np.std(X, axis = 1)
    X_normalized = np.copy(X)
    for i in range(len(X)):
        X_normalized[i] = (X[i] - u[i])/sd[i]


    return X_normalized, u, sd


def min_max_normalize(X, _min = None, _max = None):
    """
    Performs min-max normalization on X.

    f(x) = (x - min(x)) / (max(x) - min(x))

    Parameters
    ----------
    X : np.array
        The data to min-max normalize
    _min (optional) : np.array
        The min to use when normalizing
    _max (optional) : np.array
        The max to use when normalizing

    Returns
    -------
        Tuple:
            Transformed dataset with all values in [0,1]
            Computed statistics (min and max) for the dataset to undo min-max normalization.
    """
    X_min = np.zeros(len(X))
    X_max = np.zeros(len(X))
    X_normalized = np.zeros((len(X), len(X[0])))

    for i in range(len(X)):
        X_min[i] = np.min(X[i])
        X_max[i] = np.max(X[i])
        X_normalized[i] = (X[i] - X_min[i]) / (X_max[i] - X_min[i])

    return X_normalized, X_min, X_max

def onehot_encode(y):
    """
    Performs one-hot encoding on y.

    Ideas:
        NumPy's `eye` function

    Parameters
    ----------
    y : np.array
        1d array (length n) of targets (k)

    Returns
    -------
        2d array (shape n*k) with each row corresponding to a one-hot encoded version of the original value.
    """
    diagonals = np.eye(10)
    y_encoded = np.zeros((len(y), 10))

    for i in range(len(y)):
        y_encoded[i] = diagonals[y[i]]

    return y_encoded




def onehot_decode(y):
    """
    Performs one-hot decoding on y.

    Ideas:
        NumPy's `argmax` function

    Parameters
    ----------
    y : np.array
        2d array (shape n*k) with each row corresponding to a one-hot encoded version of the original value.

    Returns
    -------
        1d array (length n) of targets (k)
    """
    y_decoded = np.argmax(y, axis = 1)

    return y_decoded

def shuffle(dataset):
    """
    Shuffle dataset.

    Make sure that corresponding images and labels are kept together.
    Ideas:
        NumPy array indexing
            https://numpy.org/doc/stable/user/basics.indexing.html#advanced-indexing

    Parameters
    ----------
    dataset
        Tuple containing
            Images (X)
            Labels (y)

    Returns
    -------
        Tuple containing
            Images (X)
            Labels (y)
    """
    X, y = dataset
    indices = np.random.permutation(len(X))
    X_shuffled = X[indices]
    y_shuffled = y[indices]

    return X_shuffled, y_shuffled

def append_bias(X):
    """
    Append bias term for dataset.

    Parameters
    ----------
    X
        2d numpy array with shape (N,d)

    Returns
    -------
        2d numpy array with shape ((N+1),d)
    """
    X_new = np.insert(X, 0, 1, axis = 1)
    return X_new

def generate_minibatches(dataset, batch_size=64):
    X, y = dataset
    l_idx, r_idx = 0, batch_size
    while r_idx < len(X):
        yield X[l_idx:r_idx], y[l_idx:r_idx]
        l_idx, r_idx = r_idx, r_idx + batch_size

    yield X[l_idx:], y[l_idx:]

def generate_k_fold_set(dataset, k = 5):
    X, y = dataset
    if k == 1:
        yield (X, y), (X[len(X):], y[len(y):])
        return

    order = np.random.permutation(len(X))

    fold_width = len(X) // k

    l_idx, r_idx = 0, fold_width

    for i in range(k):
        train = np.concatenate([X[order[:l_idx]], X[order[r_idx:]]]), np.concatenate([y[order[:l_idx]], y[order[r_idx:]]])
        validation = X[order[l_idx:r_idx]], y[order[l_idx:r_idx]]
        yield train, validation
        l_idx, r_idx = r_idx, r_idx + fold_width

In [None]:
parser = argparse.ArgumentParser(description = 'CSE151B PA1')
parser.add_argument('--batch-size', type = int, default = 512,
        help = 'input batch size for training (default: 1)')
parser.add_argument('--epochs', type = int, default = 100,
        help = 'number of epochs to train (default: 100)')
parser.add_argument('--learningrate', type = float, default = 0.01,
        help = 'learning rate (default: 0.001)')
parser.add_argument('--z-score', dest = 'normalization', action='store_const',
        default = data.min_max_normalize, const = data.z_score_normalize,
        help = 'use z-score normalization on the dataset, default is min-max normalization')
parser.add_argument('--k-folds', type = int, default = 5,
        help = 'number of folds for cross-validation')
parser.add_argument('--p', type = int, default = 100,
        help = 'number of principal components')

hyperparameters, unknown = parser.parse_known_args()

In [None]:
train_images, train_labels = data.load_data('')
test_images, test_labels = data.load_data('', train = False)

In [None]:
train_images, train_labels = data.shuffle((train_images, train_labels))

In [None]:
train_images, train_mean, train_std = data.z_score_normalize(train_images)
test_images, test_mean, test_std = data.z_score_normalize(test_images)

In [None]:
pca = PCA(hyperparameters.p)
pca.fit(train_images)
train_images = pca.transform(train_images)
test_images = pca.transform(test_images)

In [None]:
train_labels = data.onehot_encode(train_labels)
test_labels = data.onehot_encode(test_labels)

In [None]:
train_images = data.append_bias(train_images)
test_images = data.append_bias(test_images)

In [None]:
def softmax_regression(hyperparemeters):
    network = Network(hyperparameters, softmax, multiclass_cross_entropy, 10)
    accuracy = []
    val_loss = np.zeros((10,100))
    fold_idx = 0
    for train_set, val_set in data.generate_k_fold_set((train_images,train_labels), 10):

        loss = np.Inf

        for epoch in range(100): #hyperparameters.epoch

            # stochastic gradient descent on every mini-batch
            for minibatch in data.generate_minibatches(train_set, 512):
                network.train(minibatch)

            # test performance on validation
            average_loss, acc = network.test(val_set)
            accuracy.append(acc)
            val_loss[fold_idx][epoch] = average_loss

            if average_loss > loss:
                break
            loss = average_loss

        fold_idx += 1


    val_loss = np.sum(val_loss, axis = 0)

    for i in range(100):
        print(i,'th epoch average validation loss','   ',val_loss[i])


    return accuracy


In [None]:
accuracy = softmax_regression(hyperparameters)

0 th epoch average validation loss     3.2096920132850375
1 th epoch average validation loss     3.1064907419204633
2 th epoch average validation loss     0.41435568157292674
3 th epoch average validation loss     0.3896866078735272
4 th epoch average validation loss     0.37348268253531136
5 th epoch average validation loss     0.36187001098168087
6 th epoch average validation loss     0.35306542932462404
7 th epoch average validation loss     0.3461214394772091
8 th epoch average validation loss     0.3404827786787959
9 th epoch average validation loss     0.33580001541781185
10 th epoch average validation loss     0.33184100456695037
11 th epoch average validation loss     0.32844480738527726
12 th epoch average validation loss     0.3254959469752722
13 th epoch average validation loss     0.3229091841030912
14 th epoch average validation loss     0.3206200855706869
15 th epoch average validation loss     0.31857895003532966
16 th epoch average validation loss     0.316746765445906


In [None]:
def sigmoid(a):
    """
    Compute the sigmoid function.

    f(x) = 1 / (1 + e ^ (-x))

    Parameters
    ----------
    a
        The internal value while a pattern goes through the network
    Returns
    -------
    float
       Value after applying sigmoid (z from the slides).
    """
    return 1/(1 + np.exp(-a))




In [None]:
b

In [None]:
def softmax(a):
    """
    Compute the softmax function.

    f(x) = (e^x) / Σ (e^x)

    Parameters
    ----------
    a
        The internal value while a pattern goes through the network
    Returns
    -------
    float
       Value after applying softmax (z from the slides).
    """
    x = np.exp(a)
    return x/np.sum(x, axis = 1, keepdims = True)



In [None]:
def binary_cross_entropy(y, t):
    """
    Compute binary cross entropy.

    L(x) = t*ln(y) + (1-t)*ln(1-y)

    Parameters
    ----------
    y
        The network's predictions
    t
        The corresponding targets
    Returns
    -------
    float
        binary cross entropy loss value according to above definition
    """
    pass



In [None]:
def multiclass_cross_entropy(y, t):
    """
    Compute multiclass cross entropy.

    L(x) = - Σ (t*ln(y))

    Parameters
    ----------
    y
        The network's predictions
    t
        The corresponding targets
    Returns
    -------
    float
        multiclass cross entropy loss value according to above definition
    """
    entropy = np.multiply(np.log(y), t)

    return - np.sum(entropy)




In [None]:
class Network:
    def __init__(self, hyperparameters, activation, loss, out_dim):
        """
        Perform required setup for the network.

        Initialize the weight matrix, set the activation function, save hyperparameters.

        You may want to create arrays to save the loss values during training.

        Parameters
        ----------
        hyperparameters
            A Namespace object from `argparse` containing the hyperparameters
        activation
            The non-linear activation function to use for the network
        loss
            The loss function to use while training and testing
        """
        self.hyperparameters = hyperparameters
        self.activation = activation
        self.loss = loss

        self.weights = np.zeros((hyperparameters.p + 1, out_dim))

    def forward(self, X):
        """
        Apply the model to the given patterns

        Use `self.weights` and `self.activation` to compute the network's output

        f(x) = σ(w*x)
            where
                σ = non-linear activation function
                w = weight matrix

        Make sure you are using matrix multiplication when you vectorize your code!

        Parameters
        ----------
        X
            Patterns to create outputs for
        """
        return self.activation(X.dot(self.weights))



    def __call__(self, X):
        return self.forward(X)

    def train(self, minibatch):
        """
        Train the network on the given minibatch

        Use `self.weights` and `self.activation` to compute the network's output
        Use `self.loss` and the gradient defined in the slides to update the network.

        Parameters
        ----------
        minibatch
            The minibatch to iterate over

        Returns
        -------
        tuple containing:
            average loss over minibatch
            accuracy over minibatch
        """
        train_images, train_labels = minibatch
        B = len(train_labels)
        sm_scores = self.activation(train_images @ self.weights)
        #avarage_loss = self.loss(sm_scores, train_labels) / B
        predict_labels = np.argmax(sm_scores, axis = 1)
        #accuracy = sum(predict_labels == train_labels)/B
        gradient = train_images.T @ (train_labels - sm_scores)
        self.weights += hyperparameters.learningrate * gradient/ B

        #return (avarage_loss,accuracy)

    def test(self, minibatch):
        """
        Test the network on the given minibatch

        Use `self.weights` and `self.activation` to compute the network's output
        Use `self.loss` to compute the loss.
        Do NOT update the weights in this method!

        Parameters
        ----------
        minibatch
            The minibatch to iterate over

        Returns
        -------
            tuple containing:
                average loss over minibatch
                accuracy over minibatch
        """
        test_images, test_labels = minibatch
        B = len(test_labels)
        sm_scores = self.activation(test_images @ self.weights)
        decoded_test_labels = onehot_decode(test_labels)
        avarage_loss = self.loss(sm_scores, test_labels) / B
        predict_labels = np.argmax(sm_scores, axis = 1)


        accuracy = sum(predict_labels == decoded_test_labels)/B

        return (avarage_loss,accuracy)

In [None]:
    def forward(self, X):
        """
        Apply the model to the given patterns

        Use `self.weights` and `self.activation` to compute the network's output

        f(x) = σ(w*x)
            where
                σ = non-linear activation function
                w = weight matrix

        Make sure you are using matrix multiplication when you vectorize your code!

        Parameters
        ----------
        X
            Patterns to create outputs for
        """
        return self.activation(X.dot(self.weights))



    def __call__(self, X):
        return self.forward(X)

In [None]:
    def train(self, minibatch):
        """
        Train the network on the given minibatch

        Use `self.weights` and `self.activation` to compute the network's output
        Use `self.loss` and the gradient defined in the slides to update the network.

        Parameters
        ----------
        minibatch
            The minibatch to iterate over

        Returns
        -------
        tuple containing:
            average loss over minibatch
            accuracy over minibatch
        """
        train_images, train_labels = minibatch
        B = len(train_labels)
        sm_scores = self.activation(train_images @ self.weights)
        #avarage_loss = self.loss(sm_scores, train_labels) / B
        predict_labels = np.argmax(sm_scores, axis = 1)
        #accuracy = sum(predict_labels == train_labels)/B
        gradient = X.T @ (train_images - sm_scores)
        self.weights -= hyperparameters.learningrate * gradient/ B

        #return (avarage_loss,accuracy)

In [None]:
a

In [None]:
    def test(self, minibatch):
        """
        Test the network on the given minibatch

        Use `self.weights` and `self.activation` to compute the network's output
        Use `self.loss` to compute the loss.
        Do NOT update the weights in this method!

        Parameters
        ----------
        minibatch
            The minibatch to iterate over

        Returns
        -------
            tuple containing:
                average loss over minibatch
                accuracy over minibatch
        """
        test_images, test_labels = minibatch
        B = len(test_labels)
        sm_scores = self.activation(test_images @ self.weights)
        avarage_loss = self.loss(sm_scores, test_labels) / B
        predict_labels = np.argmax(sm_scores, axis = 1)

        accuracy = sum(predict_labels == test_labels)/B

        return (avarage_loss,accuracy)