In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sys
import matplotlib.pyplot as plt
from keras.datasets import mnist
from PIL import Image

### Define some related functions

In [None]:
# activation functions
def tanh(x):
    return np.tanh(x)

def tanh_prime(x):
    return 1 - np.tanh(x)**2

def ReLU(x):
    return np.maximum(x, 0)

def ReLU_prime(x):
    return np.heaviside(x, 1)

def linear(x):
    return x

def linear_prime(x):
    return 1


# error functions
def rmse(y_true, y_pred):
    return np.sqrt(np.mean((y_true - y_pred)**2))

def accuracy(y_true, y_pred):
    correct = 0.0
    for i in range(len(y_true)):
        if np.argmax(y_true[i]) == np.argmax(y_pred[i]):
            correct += 1
    return correct / len(y_true)

# loss functions (single example)
def se(y_true, y_pred):
    return (y_true - y_pred)**2

def se_prime(y_true, y_pred):
    return (-2) * (y_true - y_pred)

def softmax(logits):
    exp_logits = np.exp(logits)
    return exp_logits / np.sum(exp_logits)

def softmax_cross_entropy(y_true, y_pred):
    epsilon = 1e-12
    y_pred_softmax = softmax(y_pred)
    #y_pred_softmax = np.clip(y_pred_softmax, epsilon, 1. - epsilon)
    return -1 * np.sum(np.multiply(y_true, np.log(y_pred_softmax)))

def softmax_cross_entropy_prime(y_true, y_pred):
    y_pred_softmax = softmax(y_pred)
    return  y_pred_softmax - y_true

### Define several layers

In [None]:
# Define layers
class Layer:
    def __init__(self):
        self.input = None
        self.output = None

    def forward(self, input_data):
        raise NotImplementedError("Forward prop not implemented")

    def backprop(self, output_grad, learning_rate):
        raise NotImplementedError("Backward prop not implemented")

In [None]:
class ActivationLayer(Layer):
    def __init__(self, activation, activation_prime):
        super().__init__()
        self.activation = activation
        self.activation_prime = activation_prime

    def forward(self, input_data):
        self.input = input_data
        self.output = self.activation(self.input)
        return self.output

    def backprop(self, output_grad, learning_rate):
        return np.multiply(self.activation_prime(self.input), output_grad)

In [None]:
class FullyConnectedLayer(Layer):
    def __init__(self, input_size, output_size):
        super().__init__()
        self.weights = np.random.randn(input_size, output_size) / np.sqrt(input_size)  # Xavier initialization
        self.biases = np.zeros((1, output_size))

    def forward(self, input_data):
        self.input = input_data
        self.output = np.dot(input_data, self.weights) + self.biases
        return self.output

    def backprop(self, output_grad, learning_rate):
        input_grad = np.dot(output_grad, self.weights.T)
        weight_grad = np.dot(self.input.T, output_grad)
        bias_grad = np.sum(output_grad, axis=0, keepdims=True)

        self.weights -= learning_rate * weight_grad
        self.biases -= learning_rate * bias_grad

        return input_grad

In [None]:
class FlattenLayer(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input_data):
        self.input = input_data
        flattened_size = np.prod(input_data.shape)
        self.output = input_data.reshape(1, flattened_size)
        return self.output

    def backprop(self, output_grad, learning_rates):
        input_grad = output_grad.reshape(self.input.shape)
        return input_grad

In [None]:
class ConvolutionalLayer(Layer):
    def __init__(self, filters, kernel_size):
        super().__init__()
        self.num_filters = filters
        self.kernel_size = kernel_size
        self.weights = None
        self.bias = None


    def initialize(self, input_shape):
        num_channels = input_shape[0]
        self.weights = np.random.randn(self.num_filters, num_channels, self.kernel_size, self.kernel_size)
        self.bias = np.zeros((self.num_filters, 1))


    def forward(self, input_data):
        self.input = input_data
        self.initialize(self.input.shape)
        num_channels, input_height, input_width = input_data.shape
        output_height = input_height - self.kernel_size + 1
        output_width = input_width - self.kernel_size + 1
        self.output = np.zeros((self.num_filters, output_height, output_width))

        for f in range(self.num_filters):
            for i in range(output_height):
                for j in range(output_width):
                    receptive_field = input_data[:, i: i + self.kernel_size, j:j + self.kernel_size]
                    self.output[f, i, j] = np.sum(receptive_field * self.weights[f]) + self.bias[f]

        return self.output


    def backprop(self, output_grad, learning_rate):
        num_channels, input_height, input_width = self.input.shape
        num_filters, output_height, output_width = self.output.shape

        weight_grad = np.zeros_like(self.weights)
        bias_grad = np.zeros_like(self.bias)
        input_grad = np.zeros_like(self.input)

        for f in range(num_filters):
            for i in range(output_height):
                for j in range(output_width):
                    receptive_field = self.input[:, i: i + self.kernel_size, j: j + self.kernel_size]
                    weight_grad[f] += receptive_field * output_grad[f, i, j]
                    bias_grad[f] += output_grad[f, i, j]
                    input_grad[:, i: i + self.kernel_size, j: j + self.kernel_size] += self.weights[f] * output_grad[f, i, j]

        self.weights -= learning_rate * weight_grad
        self.bias -= learning_rate * bias_grad

        return input_grad

In [None]:
class MaxPoolingLayer(Layer):
    def __init__(self, pool_size):
       super().__init__()
       self.pool_size = pool_size


    def forward(self, input_data):
        self.input = input_data
        num_channels, input_height, input_width = input_data.shape
        pool_height = input_height // self.pool_size
        pool_width = input_width // self.pool_size
        self.output = np.zeros((num_channels, pool_height, pool_width))

        for c in range(num_channels):
            for i in range(pool_height):
                for j in range(pool_width):
                    receptive_field = input_data[c, i*self.pool_size : (i+1)*self.pool_size, j*self.pool_size : (j+1)*self.pool_size]
                    self.output[c, i, j] = np.max(receptive_field)

        return self.output


    def backprop(self, output_grad, learning_rate):
        num_channels, input_height, input_width = self.input.shape
        pool_height = input_height // self.pool_size
        pool_width = input_width // self.pool_size
        input_grad = np.zeros_like(self.input)

        for c in range(num_channels):
            for i in range(pool_height):
                for j in range(pool_width):
                    receptive_field = self.input[c, i*self.pool_size : (i+1)*self.pool_size, j*self.pool_size : (j+1)*self.pool_size]
                    max_value = np.max(receptive_field)
                    mask = (receptive_field == max_value)
                    input_grad[c, i*self.pool_size : (i+1)*self.pool_size, j*self.pool_size : (j+1)*self.pool_size] = mask * output_grad[c, i, j]

        return input_grad

### Define neural network

In [None]:
class Network:
    def __init__(self, loss, loss_prime):
        self.layers = []
        self.loss_func = loss
        self.loss_prime = loss_prime

    def add_layer(self, layer):
        self.layers.append(layer)

    def predict(self, input_data):
        output = input_data
        for layer in self.layers:
            output = layer.forward(output)
        return output

    def train(self, x_train, y_train, epoch_num, lr):
        sample_size = len(x_train)
        error_list = []
        print("==== Start training ====")

        for epoch in range(epoch_num):
            err = 0
            output_list = []
            correct = 0

            # Shuffle the training data and labels
            shuffled_indices = np.random.permutation(sample_size)
            x_train_shuffled = x_train[shuffled_indices]
            y_train_shuffled = y_train[shuffled_indices]

            for i in range(sample_size):
                output = x_train_shuffled[i]

                # Reshape input if it is 1D
                if output.ndim == 1:
                    output = output.reshape(1, -1)

                # Forward propagation
                for layer in self.layers:
                    output = layer.forward(output)

                output_list.append(output)
                err += self.loss_func(y_train_shuffled[i], output)
                print(f"{epoch+1}-{i}th sample")
                print(f"true: {np.argmax(y_train_shuffled[i])}, pred: {np.argmax(output)}")

                # Backpropagation
                grad = self.loss_prime(y_train_shuffled[i], output)
                #print(grad)
                for layer in reversed(self.layers):
                    grad = layer.backprop(grad, lr)

            err /= sample_size
            error_list.append(err)

            print('epoch %d/%d   loss=%f' % (epoch + 1, epoch_num, err))

        return error_list


In [None]:
class Classifier(Network):
    def __init__(self, loss, loss_prime):
        super().__init__(loss, loss_prime)

    def predict(self, input_data):
        sample_size = len(input_data)
        result = []

        for i in range(sample_size):
            output = input_data[i]

            for layer in self.layers:
                output = layer.forward(output)

            output = softmax(output)  # Apply softmax activation
            result.append(output)

        return np.array(result)

In [5]:
# Function for one-hot encoding
def one_hot_encoding(labels, num_classes):
    length = len(labels)
    encoded_labels = np.zeros((length, num_classes))

    for i in range(length):
        encoded_labels[i, int(labels[i])] = 1

    return encoded_labels

### Data preprocessing

In [6]:
# Load the data
(x_train, y_train), (x_test, y_test) = mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [7]:
# Filter the training dataset to include only 2000 samples for each digit
# Filter the training dataset to include only digits 0 to 4
selected_digits = [0, 1, 2, 3, 4]
filtered_x_train = []
filtered_y_train = []

num_samples_per_digit = 2000

for digit in selected_digits:
    digit_indices = np.where(y_train == digit)[0][:num_samples_per_digit]
    filtered_x_train.append(x_train[digit_indices])
    filtered_y_train.append(y_train[digit_indices])

filtered_x_train = np.concatenate(filtered_x_train, axis=0)
filtered_y_train = np.concatenate(filtered_y_train, axis=0)

In [8]:
# Filter the testing dataset to include only 500 samples for each digit
num_samples_per_digit = 500
filtered_x_test = []
filtered_y_test = []

for digit in selected_digits:
    digit_indices = np.where(y_test == digit)[0][:num_samples_per_digit]
    filtered_x_test.append(x_test[digit_indices])
    filtered_y_test.append(y_test[digit_indices])

filtered_x_test = np.concatenate(filtered_x_test, axis=0)
filtered_y_test = np.concatenate(filtered_y_test, axis=0)

In [9]:
# Expand one dimnesion for channel
x_train, x_test = np.expand_dims(filtered_x_train, 1), np.expand_dims(filtered_x_test, 1)
y_train, y_test = one_hot_encoding(filtered_y_train, 5), one_hot_encoding(filtered_y_test, 5)

# Resize the data
x_train_trans, x_test_trans = np.zeros((len(x_train), 1, 14, 14)), np.zeros((len(x_test), 1, 14, 14))
for idx in range(len(x_train)):
    for i in range(0, 28, 2):
        for j in range(0, 28, 2):
            avg = (int(x_train[idx, 0, i, j])+int(x_train[idx, 0, i+1, j])+int(x_train[idx, 0, i, j+1])+int(x_train[idx, 0, i+1, j+1])) // (2*2)
            x_train_trans[idx, 0, i//2, j//2] = avg

for idx in range(len(x_test)):
    for i in range(0, 28, 2):
        for j in range(0, 28, 2):
            avg = (int(x_test[idx, 0, i, j])+int(x_test[idx, 0, i+1, j])+int(x_test[idx, 0, i, j+1])+int(x_test[idx, 0, i+1, j+1])) // (2*2)
            x_test_trans[idx, 0, i//2, j//2] = avg

In [10]:
# Normalize the data
x_train_trans, x_test_trans = x_train_trans / 255, x_test_trans / 255

### Flatten the images and try ordinary DNN first

In [None]:
# Build the model
dnn_scratch = Classifier(softmax_cross_entropy, softmax_cross_entropy_prime)
#dnn_scratch.add_layer(ConvolutionalLayer(16, 3))
#dnn_scratch.add_layer(MaxPoolingLayer(2))
dnn_scratch.add_layer(FullyConnectedLayer(196, 16))
dnn_scratch.add_layer(ActivationLayer(ReLU, ReLU_prime))
dnn_scratch.add_layer(FullyConnectedLayer(16, 5))

In [None]:
X_train = x_train_trans.reshape(len(x_train_trans), -1)

# Fit the model
error = dnn_scratch.train(X_train, y_train, 10, 5e-4)

[1;30;43m串流輸出內容已截斷至最後 5000 行。[0m
true: 2, pred: 2
10-7501th sample
true: 1, pred: 1
10-7502th sample
true: 4, pred: 4
10-7503th sample
true: 1, pred: 1
10-7504th sample
true: 4, pred: 4
10-7505th sample
true: 4, pred: 4
10-7506th sample
true: 1, pred: 1
10-7507th sample
true: 2, pred: 2
10-7508th sample
true: 2, pred: 2
10-7509th sample
true: 4, pred: 4
10-7510th sample
true: 0, pred: 0
10-7511th sample
true: 2, pred: 2
10-7512th sample
true: 1, pred: 1
10-7513th sample
true: 3, pred: 3
10-7514th sample
true: 0, pred: 0
10-7515th sample
true: 2, pred: 2
10-7516th sample
true: 4, pred: 4
10-7517th sample
true: 4, pred: 4
10-7518th sample
true: 0, pred: 0
10-7519th sample
true: 0, pred: 0
10-7520th sample
true: 1, pred: 1
10-7521th sample
true: 0, pred: 0
10-7522th sample
true: 4, pred: 4
10-7523th sample
true: 3, pred: 3
10-7524th sample
true: 1, pred: 1
10-7525th sample
true: 1, pred: 1
10-7526th sample
true: 1, pred: 1
10-7527th sample
true: 1, pred: 1
10-7528th sample
true: 0, pred

In [None]:
X_test = x_test_trans.reshape(len(x_test_trans), -1)
y_test_pred = dnn_scratch.predict(X_test)
acc_rate = accuracy(y_test, y_test_pred)
print('Test accuracy:', acc_rate)

Test accuracy: 0.9628


### Try to build a CNN model

In [None]:
# Build the CNN model
cnn_scratch = Classifier(softmax_cross_entropy, softmax_cross_entropy_prime)
cnn_scratch.add_layer(ConvolutionalLayer(16, 3))
cnn_scratch.add_layer(MaxPoolingLayer(2))
cnn_scratch.add_layer(FlattenLayer())
cnn_scratch.add_layer(FullyConnectedLayer(576, 16))
cnn_scratch.add_layer(ActivationLayer(ReLU, ReLU_prime))
cnn_scratch.add_layer(FullyConnectedLayer(16, 5))

In [None]:
# Fit the model
error = cnn_scratch.train(x_train_trans, y_train, 10, 5e-4)

[1;30;43m串流輸出內容已截斷至最後 5000 行。[0m
true: 2, pred: 2
10-7501th sample
true: 3, pred: 3
10-7502th sample
true: 0, pred: 0
10-7503th sample
true: 3, pred: 3
10-7504th sample
true: 3, pred: 3
10-7505th sample
true: 2, pred: 2
10-7506th sample
true: 4, pred: 1
10-7507th sample
true: 4, pred: 4
10-7508th sample
true: 4, pred: 4
10-7509th sample
true: 3, pred: 3
10-7510th sample
true: 4, pred: 4
10-7511th sample
true: 0, pred: 0
10-7512th sample
true: 0, pred: 0
10-7513th sample
true: 0, pred: 3
10-7514th sample
true: 4, pred: 4
10-7515th sample
true: 2, pred: 2
10-7516th sample
true: 3, pred: 3
10-7517th sample
true: 2, pred: 2
10-7518th sample
true: 1, pred: 1
10-7519th sample
true: 2, pred: 2
10-7520th sample
true: 2, pred: 0
10-7521th sample
true: 0, pred: 0
10-7522th sample
true: 2, pred: 2
10-7523th sample
true: 0, pred: 0
10-7524th sample
true: 0, pred: 0
10-7525th sample
true: 3, pred: 3
10-7526th sample
true: 4, pred: 4
10-7527th sample
true: 1, pred: 1
10-7528th sample
true: 4, pred

In [None]:
y_test_pred_c = cnn_scratch.predict(x_test_trans)
acc_rate_c = accuracy(y_test, y_test_pred_c)
print('Test accuracy:', acc_rate_c)

Test accuracy: 0.8644


## Also try PyTorch and conpare the result

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

DNN

In [30]:
# Define the neural network model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(196, 16)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(16, 5)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Prepare the data
X_train = torch.tensor(x_train_trans, dtype=torch.float32)
Y_train = torch.tensor(y_train, dtype=torch.float32)
X_test = torch.tensor(x_test_trans, dtype=torch.float32)
Y_test = torch.tensor(y_test, dtype=torch.float32)

# Create data loaders
train_dataset = TensorDataset(X_train, Y_train)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)

model = Net()

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.0005)

epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(epochs):
    print('Epoch', epoch + 1)
    model.train()

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        # Flatten the data
        data = data.view(data.size(0), -1)

        # Forward pass
        optimizer.zero_grad()
        output = model(data)
        loss = loss_fn(output, torch.argmax(target, dim=1))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        if batch_idx % 100 == 0:
            print('Batch', batch_idx, 'Loss:', loss.item())

Epoch 1
Batch 0 Loss: 1.3809449672698975
Batch 100 Loss: 1.7081371545791626
Batch 200 Loss: 1.8613947629928589
Batch 300 Loss: 1.3616857528686523
Batch 400 Loss: 1.496755599975586
Batch 500 Loss: 1.2656413316726685
Batch 600 Loss: 1.7245572805404663
Batch 700 Loss: 1.4351884126663208
Batch 800 Loss: 1.5370246171951294
Batch 900 Loss: 1.6805005073547363
Batch 1000 Loss: 1.4564623832702637
Batch 1100 Loss: 1.8486007452011108
Batch 1200 Loss: 1.6718838214874268
Batch 1300 Loss: 1.7504541873931885
Batch 1400 Loss: 1.3937500715255737
Batch 1500 Loss: 1.2902774810791016
Batch 1600 Loss: 1.766621470451355
Batch 1700 Loss: 1.6540074348449707
Batch 1800 Loss: 1.3343671560287476
Batch 1900 Loss: 1.3276315927505493
Batch 2000 Loss: 1.395817756652832
Batch 2100 Loss: 1.3686556816101074
Batch 2200 Loss: 1.7217211723327637
Batch 2300 Loss: 1.4502910375595093
Batch 2400 Loss: 1.5354878902435303
Batch 2500 Loss: 1.690089464187622
Batch 2600 Loss: 1.3905677795410156
Batch 2700 Loss: 1.451620101928711
B

In [31]:
# Evaluate the model
model.eval()
with torch.no_grad():
    x_test_flat = X_test.view(X_test.size(0), -1)
    logits = model(x_test_flat)
    predicted_labels = torch.argmax(logits, dim=1)
    acc = (predicted_labels == torch.argmax(Y_test, dim=1)).float().mean()
    print('Test Accuracy:', acc.item())

Test Accuracy: 0.9624000191688538


### CNN

In [59]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers

# Rearrange dimensions of input data
x_train_cnn = tf.transpose(x_train_trans, [0, 2, 3, 1])
x_test_cnn = tf.transpose(x_test_trans, [0, 2, 3, 1])

# Define the model architecture
model = models.Sequential()
model.add(layers.Conv2D(16, (3, 3), activation='linear', input_shape=(14, 14, 1)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(5, activation='softmax'))

sgd_optimizer = optimizers.SGD(learning_rate=0.0005)
model.compile(optimizer=sgd_optimizer,
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=['accuracy'])

model.summary()

# Train
history = model.fit(x_train_cnn, y_train, epochs=10, batch_size=8)


Model: "sequential_13"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_13 (Conv2D)          (None, 12, 12, 16)        160       
                                                                 
 max_pooling2d_12 (MaxPoolin  (None, 6, 6, 16)         0         
 g2D)                                                            
                                                                 
 flatten_12 (Flatten)        (None, 576)               0         
                                                                 
 dense_24 (Dense)            (None, 64)                36928     
                                                                 
 dense_25 (Dense)            (None, 5)                 325       
                                                                 
Total params: 37,413
Trainable params: 37,413
Non-trainable params: 0
_________________________________________________

In [60]:
# Evaluate the model on the test data
test_loss, test_accuracy = model.evaluate(x_test_cnn, y_test)
print('Test accuracy:', test_accuracy)

Test accuracy: 0.9527999758720398
