In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, confusion_matrix
from tqdm import tqdm
import gc
import torchvision.datasets as ds
import torchvision.transforms as transforms
import pickle

class Layer:
    def __init__(self):
        pass

    def forward(self, A):
        pass

    def backward(self, dZ):
        pass

class DenseLayer(Layer):
    def __init__(self, input_size, output_size, activation='relu'):
        self.W = np.random.randn(output_size, input_size) * np.sqrt(2/input_size) # He initialization
        self.b = np.zeros((output_size, 1))
        self.activation_function = ActivationLayer(activation)

    def forward(self, A):
        Z = self.W.dot(A) + self.b
        assert Z.shape == (self.W.shape[0], A.shape[1])
        return Z

    def backward(self, dZ, A_prev):
        m = A_prev.shape[1]
        dW = 1./m * np.dot(dZ, A_prev.T)
        db = 1./m * np.sum(dZ, axis=1, keepdims=True)
        dA_prev = np.dot(self.W.T, dZ)
        
        assert dA_prev.shape == A_prev.shape
        assert dW.shape == self.W.shape
        assert db.shape == self.b.shape
        
        return dA_prev, dW, db

class DropoutLayer(Layer):
    def __init__(self, rate):
        self.rate = rate
        self.mask = None

    def forward(self, A):
        self.mask = np.random.rand(*A.shape) > self.rate
        A = A * self.mask
        A /= (1 - self.rate)
        return A

    def backward(self, dA):
        dA = dA * self.mask
        dA /= (1 - self.rate)
        return dA

class ActivationLayer(Layer):
    def __init__(self, activation):
        self.activation = activation
        self.activation_cache = None

    def forward(self, Z):
        self.activation_cache = Z
        if self.activation == "relu":
            A = np.maximum(0, Z)
            assert A.shape == Z.shape
            return A
        elif self.activation == "sigmoid":
            A = 1 / (1 + np.exp(-Z))
            assert A.shape == Z.shape
            return A
        elif self.activation == "softmax":
            Z = np.clip(Z, -20, 20)
            e_x = np.exp(Z)
            A = e_x / np.sum(e_x, axis=0)
            assert A.shape == Z.shape
            return A + 1e-8

    def backward(self, dA):
        Z = self.activation_cache
        if self.activation == "relu":
            dZ = np.array(dA, copy=True)
            dZ[Z <= 0] = 0
            assert dZ.shape == Z.shape
            return dZ
        elif self.activation == "sigmoid":
            s = 1 / (1 + np.exp(-Z))
            dZ = dA * s * (1 - s)
            assert dZ.shape == Z.shape
            return dZ
        elif self.activation == "softmax":
            # For this case, dA = Y_true
            Z = np.clip(Z, -20, 20)
            e_x = np.exp(Z)
            s = e_x / np.sum(e_x, axis=0)
            dZ = s - dA
            assert dZ.shape == Z.shape
            return dZ
class Network:
    def __init__(self):
        self.layers = []
        self.cache = []
        self.costs = []
        self.train_acc = []
        self.test_acc = []
        self.test_costs = []
        self.test_f1_scores = []
        self.linestyle = '-'
        # For adam optimizer
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.epsilon = 1e-8
        self.t = 0
        self.optimizer = None

    def add(self, layer):
        self.layers.append(layer)

    def forward(self, X, training=True):
        A = X
        self.cache = [(None, X)]
        for layer in self.layers:
            if isinstance(layer, DropoutLayer) and training:
                A = layer.forward(A)
            elif isinstance(layer, DenseLayer):
                Z = layer.forward(A)
                A = layer.activation_function.forward(Z)
                self.cache.append((Z, A))
        assert A.shape == (26, X.shape[1])
        return A + 1e-8

    def backward(self, Y, AL):
        gradients = {}
        Y = Y.reshape(AL.shape)
        L = len(self.layers)
        dA_prev = Y  # Assuming last layer uses softmax
        self.cache.pop()
        for i in reversed(range(L)):
            layer = self.layers[i]
            if isinstance(layer, DropoutLayer):
                dA_prev = layer.backward(dA_prev)
            elif isinstance(layer, DenseLayer):
                self.t += 1
                Z, A_prev = self.cache.pop()
                dZ = layer.activation_function.backward(dA_prev)
                dA_prev, dW, db = layer.backward(dZ, A_prev)
                if self.optimizer == "adam":
                    gradients["vdW" + str(i+1)] = self.beta1 * gradients.get("vdW" + str(i+1), np.zeros_like(dW)) + (1 - self.beta1) * dW
                    gradients["vdb" + str(i+1)] = self.beta1 * gradients.get("vdb" + str(i+1), np.zeros_like(db)) + (1 - self.beta1) * db
                    gradients["sdW" + str(i+1)] = self.beta2 * gradients.get("sdW" + str(i+1), np.zeros_like(dW)) + (1 - self.beta2) * dW**2
                    gradients["sdb" + str(i+1)] = self.beta2 * gradients.get("sdb" + str(i+1), np.zeros_like(db)) + (1 - self.beta2) * db**2
                    vdW_corrected = gradients["vdW" + str(i+1)] / (1 - self.beta1**self.t)
                    vdb_corrected = gradients["vdb" + str(i+1)] / (1 - self.beta1**self.t)
                    sdW_corrected = gradients["sdW" + str(i+1)] / (1 - self.beta2**self.t)
                    sdb_corrected = gradients["sdb" + str(i+1)] / (1 - self.beta2**self.t)
                    dW = vdW_corrected / (np.sqrt(sdW_corrected) + self.epsilon)
                    db = vdb_corrected / (np.sqrt(sdb_corrected) + self.epsilon)
                gradients["dW" + str(i+1)] = dW
                gradients["db" + str(i+1)] = db
        return gradients

    def update_parameters(self, gradients):
        for i, layer in enumerate(self.layers):
            if isinstance(layer, DenseLayer):
                layer.W -= self.learning_rate * gradients["dW" + str(i+1)]
                layer.b -= self.learning_rate * gradients["db" + str(i+1)]

    def train(self, X_train, y_train, epochs, learning_rate=0.1, batch_size=64, valid_split=0.2, optimizer=None, decay_rate=0, linestyle='-'):
        self.linestyle = linestyle
        self.learning_rate = learning_rate
        fixed_lr = learning_rate
        self.optimizer = optimizer
        X_train, y_train, X_valid, y_valid = self.train_test_split(X_train, y_train, valid_split=valid_split)
        num_examples = X_train.shape[1]
        num_batches = num_examples // batch_size
        for epoch in range(epochs):
            train_cost = 0
            self.learning_rate = fixed_lr * np.exp(-decay_rate*epoch)
            for j in tqdm(range(0, num_examples, batch_size)):
                start = j
                end = min(j+batch_size, num_examples)
                X_batch = X_train[:, start:end]
                y_batch = y_train[:, start:end]
                output = self.forward(X_batch)
                cost = self.compute_cost(output, y_batch)
                train_cost += cost
                gradients = self.backward(y_batch, output)
                self.update_parameters(gradients)

            # Compute cost and accuracy
            test_cost = self.compute_cost(self.forward(X_valid), y_valid)
            train_cost /= num_batches
            _, train_acc = self.predict(X_train, y_train)
            test_preds, test_acc = self.predict(X_valid, y_valid)
            test_f1 = f1_score(np.argmax(y_valid, axis=0), test_preds, average='macro')
            self.costs.append(train_cost)
            self.test_costs.append(test_cost)
            self.train_acc.append(train_acc)
            self.test_acc.append(test_acc)
            self.test_f1_scores.append(test_f1)
            
            print(f'Epoch: {epoch+1}/{epochs}, train_cost: {train_cost:.5f}, test_cost: {test_cost:.5f}, train_acc: {train_acc:.5f}, test_acc: {test_acc:.5f}')
            
    def predict(self, X, y):
        m = X.shape[1]   # X = (768, m)
        # y = (26, m)
        y_hat = self.forward(X, training=False)
        p = np.argmax(y_hat, axis=0)
        y = np.argmax(y, axis=0)
        correct = np.sum(p == y)
        # print(f"train examples: {m}, correctly predicted: {correct}, accuracy: {correct/m}")
        return p, correct/m

    @staticmethod
    def compute_cost(AL, Y):
        m = Y.shape[1]
        cost = (-1./m) * np.sum(Y * np.log(AL))
        cost = np.squeeze(cost)
        assert cost.shape == ()
        return cost
    
    def train_test_split(self, X, y, valid_split=0.2):
        # Number of examples
        num_examples = X.shape[1]
        # Splitting the data
        valid_split = int(num_examples * valid_split)
        X_train = X[:, valid_split:]
        y_train = y[:, valid_split:]
        X_valid = X[:, :valid_split]
        y_valid = y[:, :valid_split]

        return X_train, y_train, X_valid, y_valid
    
    def plot_cost(self):
        plt.figure(1)
        plt.plot(np.squeeze(self.costs), linestyle=self.linestyle, color='C0')
        plt.plot(np.squeeze(self.test_costs), linestyle=self.linestyle, color='C1')
        # plt.legend(['train', 'test'], loc='upper right')
        # plt.ylabel('cost')
        # plt.xlabel('epoch')
        # plt.title("Learning rate = " + str(self.learning_rate))
        # plt.savefig('figures/cost.png', dpi=300, bbox_inches='tight')
        # plt.show()
        
    def plot_acc(self):
        plt.figure(2)
        plt.plot(np.squeeze(self.train_acc), linestyle=self.linestyle, color='C0')
        plt.plot(np.squeeze(self.test_acc), linestyle=self.linestyle, color='C1')
        # plt.legend(['train', 'test'], loc='upper left')
        # plt.ylabel('accuracy')
        # plt.xlabel('epoch')
        # plt.title("Learning rate = " + str(self.learning_rate))
        # plt.savefig('figures/acc.png', dpi=300, bbox_inches='tight')
        # plt.show()

    def plot_f1(self):
        plt.figure(3)
        plt.plot(np.squeeze(self.test_f1_scores), linestyle=self.linestyle, color='C3')
        # plt.ylabel('Validation Macro F1 score')
        # plt.xlabel('epoch')
        # plt.title("Learning rate = " + str(self.learning_rate))
        # plt.savefig('figures/f1.png', dpi=300, bbox_inches='tight')
        # plt.show()

    def save_model(self, filepath):
        self.cache.clear()
        self.costs.clear()
        self.train_acc.clear()
        self.test_acc.clear()
        self.test_costs.clear()
        self.test_f1_scores.clear()
        for l in self.layers:
            if isinstance(l, DenseLayer):
                l.activation_function.activation_cache = None
            elif isinstance(l, DropoutLayer):
                l.mask = None
        gc.collect()
        with open(filepath, 'wb') as f:
            pickle.dump(self, f)
        
    
def extract_data_and_labels(dataset):
    # Initialize lists to store data and labels
    data = []
    labels = []

    # Loop through the dataset
    for image_tensor, label in tqdm(dataset):
        # Flatten the image tensor and convert to numpy array
        flattened_image = image_tensor.numpy().flatten()
        data.append(flattened_image)
        labels.append(label)

    # Convert lists to numpy arrays
    return np.array(data), np.array(labels)


def load_data(train=True):
    print("Loading dataset for " + ("training" if train else "testing") + "...")
    dataset = ds.EMNIST(root='./data',
                        split='letters',
                        train=train,
                        transform=transforms.ToTensor(),
                        download=True)
    X, y = extract_data_and_labels(dataset)
    
    X = X.T
    y = y - 1
    y_oh = np.eye(26)[y.astype('int32')]
    y_oh = y_oh.T
    return X, y_oh, y
  


In [2]:
X_train, y_train, y_train_raw = load_data(train=True)
X_test, y_test, y_test_raw = load_data(train=False)
print(X_train.shape)
# Now shuffle the training set
m_train = X_train.shape[1]
np.random.seed(1)
permutation = list(np.random.permutation(m_train))
X_train = X_train[:, permutation]
y_train = y_train[:, permutation]
y_train_raw = y_train_raw[permutation]

Loading dataset for training...
Downloading https://www.itl.nist.gov/iaui/vip/cs_links/EMNIST/gzip.zip to ./data/EMNIST/raw/gzip.zip


100%|██████████| 561753746/561753746 [00:18<00:00, 30123954.59it/s]


Extracting ./data/EMNIST/raw/gzip.zip to ./data/EMNIST/raw


100%|██████████| 124800/124800 [00:17<00:00, 7332.40it/s]


Loading dataset for testing...


100%|██████████| 20800/20800 [00:02<00:00, 7706.44it/s]


(784, 124800)


In [4]:
import os
savefolder = 'fig_model1'
os.mkdir(savefolder)


In [6]:
!ls

data  fig_model1


In [None]:
from matplotlib.lines import Line2D
from matplotlib.patches import Patch
plt.ioff()
learning_rates = [0.005, 0.001, 0.0005, 0.0001]
fig_prefixes = ['005', '001', '0005', '0001']
styles = ['--', '-.', '-', ':']
for lr, prefix, s in zip(learning_rates, fig_prefixes, styles):
    print("Using lr: " + str(lr))
    np.random.seed(1)
    model = Network()
    model.add(DenseLayer(input_size=X_train.shape[0], output_size=512))
    model.add(DropoutLayer(rate=0.2))
    model.add(DenseLayer(input_size=512, output_size=128))
    model.add(DropoutLayer(rate=0.2))
    model.add(DenseLayer(input_size=128, output_size=128))
    model.add(DropoutLayer(rate=0.2))
    model.add(DenseLayer(input_size=128, output_size=64))
    model.add(DropoutLayer(rate=0.2))
    model.add(DenseLayer(input_size=64, output_size=64))
    model.add(DropoutLayer(rate=0.2))
    model.add(DenseLayer(input_size=64, output_size=26, activation='softmax'))
    # Assume X_train and y_train are defined
    model.train(X_train, y_train, epochs=30, batch_size=1024, learning_rate=lr, valid_split=0.15, optimizer='adam', decay_rate=0.08, linestyle=s)
    model.plot_cost()
    model.plot_acc()
    model.plot_f1()

    y_hat, acc = model.predict(X_test, y_test)
    print(f'Test accuracy: {acc}')
    macro_f1 = f1_score(y_test_raw, y_hat, average='macro')
    print(f'Test macro F1 score: {macro_f1:.5f}')

    C_counts = confusion_matrix(y_test_raw, y_hat)
    C = confusion_matrix(y_test_raw, y_hat, normalize='true')
    plt.figure(4)
    fig = plt.figure(figsize=(10, 10))
    plt.imshow(C, 'Blues', vmax=0.1)
    # also show the numbers
    for i in range(26):
        for j in range(26):
            plt.text(j, i, f'{C_counts[i, j]}', horizontalalignment='center', verticalalignment='center', fontsize=8)
    plt.xticks(range(26), labels=[chr(i+97) for i in range(26)])
    plt.yticks(range(26), labels=[chr(i+97) for i in range(26)])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.savefig(f'{savefolder}/{prefix}_conf.png', dpi=300, bbox_inches='tight')

plt.figure(1)
legend_elements = [
                  Line2D([0], [0], color='C0', lw=1.5, label='Train'),
                  Line2D([0], [0], color='C1', lw=1.5, label='Validation'),
                  Line2D([0], [0], color='black', lw=1.5, label='0.005', linestyle=styles[0]),
                  Line2D([0], [0], color='black', lw=1.5, label='0.001', linestyle=styles[1]),
                  Line2D([0], [0], color='black', lw=1.5, label='0.0005', linestyle=styles[2]),
                  Line2D([0], [0], color='black', lw=1.5, label='0.0001', linestyle=styles[3])
          ]
plt.ylabel('cost')
plt.xlabel('epoch')
plt.title('Cost vs Epoch')
plt.legend(handles=legend_elements, loc='upper right')
plt.savefig(f'{savefolder}/cost.png', dpi=300, bbox_inches='tight')

plt.figure(2)
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.title('Accuracy vs Epoch')
plt.legend(handles=legend_elements, loc='lower right')
plt.savefig(f'{savefolder}/acc.png', dpi=300, bbox_inches='tight')


legend_elements = [
                  Line2D([0], [0], color='black', lw=1.5, label='0.005', linestyle=styles[0]),
                  Line2D([0], [0], color='black', lw=1.5, label='0.001', linestyle=styles[1]),
                  Line2D([0], [0], color='black', lw=1.5, label='0.0005', linestyle=styles[2]),
                  Line2D([0], [0], color='black', lw=1.5, label='0.0001', linestyle=styles[3])
          ]
plt.figure(3)
plt.xlabel('epoch')
plt.ylabel('macro-f1')
plt.title('Validation Macro-F1 vs Epoch')
plt.legend(handles=legend_elements, loc='lower right')
plt.savefig(f'{savefolder}/f1.png', dpi=300, bbox_inches='tight')


Using lr: 0.005


100%|██████████| 104/104 [00:14<00:00,  7.04it/s]


Epoch: 1/30, train_cost: 3.88958, test_cost: 3.23936, train_acc: 0.07115, test_acc: 0.07025


100%|██████████| 104/104 [00:17<00:00,  6.12it/s]


Epoch: 2/30, train_cost: 3.57497, test_cost: 3.51329, train_acc: 0.12623, test_acc: 0.11993


 70%|███████   | 73/104 [00:10<00:04,  7.31it/s]