In [23]:
import numpy as np 
from sklearn.datasets import fetch_openml
import os

In [24]:
# Cache the dataset in a 'datasets' folder located in the parent of the current working directory.
data_path = os.path.join(os.path.dirname(os.getcwd()), 'datasets')

# as_frame=False asks for plain NumPy arrays. Without it, recent scikit-learn releases
# return a pandas DataFrame/Series for this dataset, and positional indexing such as
# X[idxs] (used when building the train/val/test split) no longer selects rows.
X, y = fetch_openml('mnist_784', version=1, return_X_y=True,
                    data_home=data_path, as_frame=False)
print('data:', X.shape, ',', 'labels:', y.shape)

data: (70000, 784) , labels: (70000,)


In [30]:
# Build a small, class-balanced train / validation / test split from (X, y).

# Work on plain NumPy arrays so the positional indexing below is valid even when
# the loader returned pandas objects.
X_arr = np.asarray(X)
y_arr = np.asarray(y)

unique_labels = np.unique(y_arr)
num_classes = len(unique_labels)
num_features = X_arr.shape[1] # taken from the data instead of hard-coding 784

n_train_pc = 20 # number of train samples per class
n_val_pc = 1 # number of validation samples per class
n_test_pc = 1 # number of test samples per class
n_total_pc = n_train_pc + n_val_pc + n_test_pc # samples drawn per class

n_train = n_train_pc * num_classes
n_val = n_val_pc * num_classes
n_test = n_test_pc * num_classes

# One slab per class: (class, sample-within-class, feature)
train_data = np.zeros((num_classes, n_train_pc, num_features))
train_labels = np.zeros((num_classes, n_train_pc))

val_data = np.zeros((num_classes, n_val_pc, num_features))
val_labels = np.zeros((num_classes, n_val_pc))

test_data = np.zeros((num_classes, n_test_pc, num_features))
test_labels = np.zeros((num_classes, n_test_pc))

for l_idx, l in enumerate(unique_labels):
    # Positions of all samples of class `l`; ravel() keeps this 1-D even when
    # there is a single match (squeeze would collapse it to a 0-d array).
    idxs = np.argwhere(y_arr == l).ravel()
    # Draw all samples for this class at once, without replacement, so the
    # three splits cannot share a sample.
    idxs = np.random.choice(idxs, n_total_pc, replace=False)

    train_sel = idxs[:n_train_pc]
    val_sel = idxs[n_train_pc:n_train_pc + n_val_pc]
    test_sel = idxs[n_train_pc + n_val_pc:]

    train_data[l_idx] = X_arr[train_sel]
    train_labels[l_idx] = y_arr[train_sel]

    val_data[l_idx] = X_arr[val_sel]
    val_labels[l_idx] = y_arr[val_sel]

    test_data[l_idx] = X_arr[test_sel]
    test_labels[l_idx] = y_arr[test_sel]

# ravel the data ---------------------------------------------------------------
# The builtin `int` replaces the `np.int` alias, which NumPy has removed.
train_data = train_data.reshape(-1, num_features)
train_labels = np.ravel(train_labels).astype(int)

val_data = val_data.reshape(-1, num_features)
val_labels = np.ravel(val_labels).astype(int)

test_data = test_data.reshape(-1, num_features)
test_labels = np.ravel(test_labels).astype(int)

# shuffle the data -------------------------------------------------------------
# After the reshape the samples are grouped by class; apply one shared random
# order to data and labels so the pairs stay aligned.
train_idxs = np.arange(len(train_data))
np.random.shuffle(train_idxs) # shuffles in place
train_data = train_data[train_idxs]
train_labels = train_labels[train_idxs]

val_idxs = np.arange(len(val_data))
np.random.shuffle(val_idxs)
val_data = val_data[val_idxs]
val_labels = val_labels[val_idxs]

test_idxs = np.arange(len(test_data))
np.random.shuffle(test_idxs)
test_data = test_data[test_idxs]
test_labels = test_labels[test_idxs]

#-----------------------------------------

print('train data:', train_data.shape, ',', 'train labels:', train_labels.shape)
print('val data:', val_data.shape, ',', 'val labels:', val_labels.shape)
print('test data:', test_data.shape, ',', 'test labels:', test_labels.shape)

train data: (200, 784) , train labels: (200,)
val data: (10, 784) , val labels: (10,)
test data: (10, 784) , test labels: (10,)


In [17]:
def train_two_layer_perceptron(X, y, X_val, y_val,
                               hidden_size, batch_size,
                               num_epochs, learning_rate,
                               learning_rate_decay, reg_factor):
    '''
    Train a two-layer perceptron (one ReLU hidden layer) with mini-batch gradient descent.

    Inputs:
        X: train data (N,m)
        y: train labels (N,). Predictions are arg-max column indices, so the
           reported accuracies are only meaningful when labels are the integers 0..c-1
        X_val: validation data, same number of features as X
        y_val: validation labels
        hidden_size: number of neurons in the hidden layer (h,)
        batch_size: number of samples per batch
        num_epochs: number of total epochs
        learning_rate: initial step size of the parameter update
        learning_rate_decay: factor the learning rate is multiplied by at every epoch
        reg_factor: regularization strength forwarded to run_perceptron
    Returns:
        params: dict with the trained 'W1' (m,h), 'b1' (h,), 'W2' (h,c), 'b2' (c,)
        history: tuple (loss_history, train_acc_history, val_acc_history);
                 loss_history has one entry per processed batch, the accuracy
                 histories have one entry per epoch, measured BEFORE that
                 epoch's updates (entry 0 is the untrained network)
    '''

    num_train = X.shape[0] # number of train samples
    # Run at least one batch per epoch, even when batch_size exceeds the number of
    # samples (the slice below then simply contains every sample).
    num_batches = max(1, num_train // batch_size) # number of batches

    input_size = X.shape[1] # number of features in the train data
    output_size = len(np.unique(y)) # number of classes

    # Initialize network parameters
    params = {}
    std = 1e-4 # standard deviation
    params['W1'] = std * np.random.randn(input_size, hidden_size) # (m,h)
    params['b1'] = np.zeros(hidden_size) # (h,)
    params['W2'] = std * np.random.randn(hidden_size, output_size) # (h,c)
    params['b2'] = np.zeros(output_size) # (c,)

    loss_history = []
    train_acc_history = []
    val_acc_history = []
    for e in range(num_epochs):

        # Evaluate one epoch -----------------------------------------------------------------------------
        # Accuracy is measured with the current parameters on the full train set
        # (no batch exists yet at the start of the first epoch) and on the validation set.
        train_acc = (_predict_labels(params, X) == y).mean()
        val_acc = (_predict_labels(params, X_val) == y_val).mean()

        train_acc_history.append(train_acc)
        val_acc_history.append(val_acc)

        print('Evaluate epoch:', e, 'out of:', num_epochs)
        print('train accuracy:', train_acc)
        print('validation accuracy:\n', val_acc)

        # Shuffle train data ----------------------------------------------------------------------------
        # Fancy indexing creates new arrays, so the caller's X and y are not reordered.
        idxs = np.arange(X.shape[0])
        np.random.shuffle(idxs)
        X = X[idxs]
        y = y[idxs]

        # Decay learning rate ---------------------------------------------------------------------------
        # Applied before the updates, so the first epoch already uses
        # learning_rate * learning_rate_decay.
        learning_rate *= learning_rate_decay

        # Train one epoch -------------------------------------------------------------------------------
        print('Train Epoch:', e, 'out of:', num_epochs)
        for b in range(num_batches):

            # Extract current batch
            X_batch = X[b*batch_size : (b+1)*batch_size]
            y_batch = y[b*batch_size : (b+1)*batch_size]

            # One forward pass and one backward pass through the whole network
            grads, loss = run_perceptron(params, X_batch, y_batch, reg_factor)
            loss_history.append(loss)

            # Update network's parameters
            params['W1'] -= learning_rate * grads['W1']
            params['b1'] -= learning_rate * grads['b1']
            params['W2'] -= learning_rate * grads['W2']
            params['b2'] -= learning_rate * grads['b2']

            if (b+1) % 100 == 0:
                print('Batch number:', b, 'out of:', num_batches)
                print('loss:\n', loss)

    history = (loss_history, train_acc_history, val_acc_history)

    return params, history


def _predict_labels(params, X):
    '''Return, for every row of X, the index of the highest output score under params.'''
    hidden = np.maximum(0, np.dot(X, params['W1']) + params['b1']) # (N,h), ReLU
    scores = np.dot(hidden, params['W2']) + params['b2'] # (N,c)
    return np.argmax(scores, axis=1)
            

def run_perceptron(params, X, y=None, reg_factor=0.0):
    '''
    Forward pass and, when labels are given, backward pass of the two-layer perceptron.

    Inputs:
        params: dict with 'W1' (m,h), 'b1' (h,), 'W2' (h,c), 'b2' (c,)
        X: data (N,m)
        y: integer class indices (N,), used to select the score column of the
           correct class; None requests a forward pass only
        reg_factor: L2 regularization strength applied to W1 and W2
    Returns:
        (scores, None) when y is None, scores being the raw output scores (N,c);
        no loss can be computed without labels
        (grads, total_loss) otherwise, where grads has the keys and shapes of
        params and total_loss = mean cross-entropy
                                + reg_factor * (sum(W1**2) + sum(W2**2))
    '''
    N = X.shape[0]

    # Unpack the parameters
    W1, b1 = params['W1'], params['b1']
    W2, b2 = params['W2'], params['b2']

    # Forward pass --------------------------------------------------------------------------------------
    # Compute hidden scores and apply ReLU activation function
    pre_act = np.dot(X, W1) + b1 # (N,h)
    H = np.maximum(0, pre_act) # apply ReLU

    # Compute output scores
    scores = np.dot(H, W2) + b2 # (N,c)

    # If the network is not being trained, just apply forward pass
    if y is None:
        return scores, None

    # Softmax cross-entropy, computed on scores shifted by the row maximum so
    # that exp() cannot overflow; the shift does not change the probabilities.
    shifted = scores - np.max(scores, axis=1, keepdims=True)
    exp_scores = np.exp(shifted)
    row_sums = np.sum(exp_scores, axis=1, keepdims=True) # (N,1)
    probs = exp_scores / row_sums # (N,c)
    log_probs = shifted - np.log(row_sums) # (N,c)

    rows = np.arange(N)
    loss = -np.mean(log_probs[rows, y]) # average over all data samples

    # L2 regularization
    reg = reg_factor*(np.sum(W1**2) + np.sum(W2**2))

    total_loss = loss + reg

    # Backward pass -------------------------------------------------------------------------------------
    # Partial-derivative of the mean cross-entropy w.r.t scores: softmax - one_hot(y)
    dscores = probs
    dscores[rows, y] -= 1
    dscores /= N # (N,c)

    # Partial-derivative of total loss with respect to hidden layer
    dH = np.dot(dscores, W2.T) # (N,h)
    dH[pre_act <= 0] = 0 # ReLU passes gradient only where its input was positive

    # Calculate gradients; d/dW of reg_factor*sum(W**2) is 2*reg_factor*W
    grads = {}
    grads['W1'] = np.dot(X.T, dH) + 2*reg_factor*W1
    grads['b1'] = np.sum(dH, axis=0)
    grads['W2'] = np.dot(H.T, dscores) + 2*reg_factor*W2
    grads['b2'] = np.sum(dscores, axis=0)

    return grads, total_loss


def evaluate(X, y, params):
    '''
    Run the network on X through a label-free call to run_perceptron.

    Inputs:
        X: data forwarded to run_perceptron
        y: labels; accepted but not used by the current body
        params: network parameters forwarded to run_perceptron
    Returns:
        None. The two values returned by run_perceptron are unpacked and discarded.

    NOTE(review): this function looks unfinished (no metric is computed from y
    and nothing is returned) - confirm the intended result before relying on it.
    '''
    forward_result = run_perceptron(params, X)
    scores, loss_value = forward_result

400