In [1]:
from google.colab import drive

# Mount Google Drive at /content/drive; the trained parameters are later written to
# (and read back from) a folder under /content/drive/MyDrive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import json
import os

import numpy as np
import requests

In [3]:

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable, *args, **kwargs):
        """No-op stand-in for ``tqdm.tqdm`` used when tqdm is not installed.

        Accepts (and ignores) any progress-bar options so that every call that is
        valid for the real ``tqdm`` also works with this fallback, and simply
        returns ``iterable`` unchanged.
        """
        return iterable
    print('**** Could not import tqdm. Please install tqdm for download progressbars! (pip install tqdm) ****')

# Python 2 compatibility: alias `input` to `raw_input` where that name exists.
# On Python 3 `raw_input` is undefined, the lookup raises NameError, and the
# built-in `input` is left untouched.
try:
    input = raw_input
except NameError:
    pass

# Catalogue of downloadable files, nested as
#   dataset title -> file-format title -> list of file URLs.
# `traverse_dict` walks this structure until it reaches a list and downloads it.
download_dict = {
    '1) Kuzushiji-MNIST (10 classes, 28x28, 70k examples)': {
        # Same layout as the original MNIST distribution (gzip-compressed idx files).
        '1) MNIST data format (ubyte.gz)':
            ['http://codh.rois.ac.jp/kmnist/dataset/kmnist/train-images-idx3-ubyte.gz',
             'http://codh.rois.ac.jp/kmnist/dataset/kmnist/train-labels-idx1-ubyte.gz',
             'http://codh.rois.ac.jp/kmnist/dataset/kmnist/t10k-images-idx3-ubyte.gz',
             'http://codh.rois.ac.jp/kmnist/dataset/kmnist/t10k-labels-idx1-ubyte.gz'],
        # NumPy archives; this is the format loaded with np.load later in the notebook.
        '2) NumPy data format (.npz)':
            ['http://codh.rois.ac.jp/kmnist/dataset/kmnist/kmnist-train-imgs.npz',
             'http://codh.rois.ac.jp/kmnist/dataset/kmnist/kmnist-train-labels.npz',
             'http://codh.rois.ac.jp/kmnist/dataset/kmnist/kmnist-test-imgs.npz',
             'http://codh.rois.ac.jp/kmnist/dataset/kmnist/kmnist-test-labels.npz'],
    }
}

# Download a list of files
def download_list(url_list):
    """Download every URL in ``url_list`` into the current working directory.

    Each file is stored under the last path component of its URL and streamed
    to disk in 1 KB chunks with a progress bar.

    Args:
        url_list: iterable of URL strings.

    Raises:
        requests.HTTPError: if the server answers with an error status, so an
            error page is never silently saved as if it were a dataset file.
    """
    for url in url_list:
        path = url.split('/')[-1]
        # The context manager releases the streamed connection even on failure;
        # the timeout keeps a stalled server from hanging the notebook forever.
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()

            # Content-Length is optional (e.g. chunked transfer encoding).
            content_length = r.headers.get('content-length')
            if content_length is None:
                total_chunks = None
                print('Downloading {} - size unknown'.format(path))
            else:
                total_length = int(content_length)
                total_chunks = int(total_length / 1024) + 1
                print('Downloading {} - {:.1f} MB'.format(path, (total_length / 1024000)))

            # Open the target only after the request succeeded so a failed
            # download does not leave an empty file behind.
            with open(path, 'wb') as f:
                for chunk in tqdm(r.iter_content(chunk_size=1024), total=total_chunks, unit="KB"):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
    print('All dataset files downloaded!')

def traverse_dict(d):
    """Descend through nested dicts, always taking the first key, and download the list found.

    ``d`` is either a list of URLs (handed straight to ``download_list``) or a
    dict whose values are further dicts / lists of the same form.
    """
    node = d
    # Keep following the first option of each level until a URL list is reached.
    while not isinstance(node, list):
        first_key = list(node.keys())[0]
        node = node[first_key]
    download_list(node)

# Download the four KMNIST .npz files (train/test images and labels) into the working directory.
traverse_dict(download_dict['1) Kuzushiji-MNIST (10 classes, 28x28, 70k examples)']['2) NumPy data format (.npz)'])


Downloading kmnist-train-imgs.npz - 18.0 MB


100%|██████████| 17954/17954 [00:13<00:00, 1290.40KB/s]


Downloading kmnist-train-labels.npz - 0.0 MB


100%|██████████| 30/30 [00:00<00:00, 206.69KB/s]


Downloading kmnist-test-imgs.npz - 3.0 MB


100%|██████████| 3008/3008 [00:03<00:00, 971.24KB/s]


Downloading kmnist-test-labels.npz - 0.0 MB


100%|██████████| 6/6 [00:00<00:00, 13919.15KB/s]

All dataset files downloaded!





In [4]:
def _load_npz_array(path, key='arr_0'):
    """Return the array stored under ``key`` in the ``.npz`` archive at ``path``.

    ``np.load`` returns a lazily-read ``NpzFile`` that keeps the file open; the
    ``with`` block closes it once the array has been read into memory.
    """
    with np.load(path) as archive:
        return archive[key]


# Images and integer class labels of the KMNIST train / test splits.
X_train = _load_npz_array('kmnist-train-imgs.npz')
y_train = _load_npz_array('kmnist-train-labels.npz')

X_test = _load_npz_array('kmnist-test-imgs.npz')
y_test = _load_npz_array('kmnist-test-labels.npz')

print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

(60000, 28, 28)
(60000,)
(10000, 28, 28)
(10000,)


# Architecture

1. **Input Layer**:
   - The input layer is responsible for passing the input data to the subsequent layers.


2. **Convolutional Layer**:
   - This layer performs convolution operations on the input data using learnable filters (kernels).

3. **Pooling Layer**:
   - The pooling layer reduces the spatial dimensions of the feature maps generated by the convolutional layer.

4. **ReLU Layer (Rectified Linear Unit)**:
   - The ReLU layer introduces non-linearity into the network by applying the ReLU activation function to the feature maps.

5. **Reshaping Layer**:
   - The reshaping layer reshapes the output of the preceding layers into a format suitable for feeding into fully connected layers.

6. **Fully Connected (Linear) Layers**:
   - These layers consist of neurons that are fully connected to all neurons in the previous layer.
   
7. **Softmax Layer**:
   - It computes the probabilities of each class given the input and ensures that the sum of these probabilities is 1.

8. **Loss Function (Cross Entropy)**:
   - The cross-entropy loss function is used to measure the difference between the predicted probability distribution and the actual distribution (one-hot encoded labels).

9. **Accuracy Calculation**:
    - The accuracy module calculates the accuracy of the model predictions by comparing the predicted class labels with the true class labels.

10. **Training Loop**:
    - The training loop runs for multiple epochs, where each epoch consists of iterations over batches of training data. In each iteration, forward pass, backward pass (backpropagation), and optimization (applying SGD) are performed to update the parameters of the network.
    - Learning rate adjustments based on performance thresholds are also implemented to improve convergence and accuracy.


In [5]:

class Neural_Network:
    """Sequential container that chains layer objects.

    Every layer is expected to provide ``forward_pass(x, saved_weights=...)``,
    ``backprop(grad)``, ``applying_sgd()``, ``change_alpha()`` and
    ``saving_params()``.
    """

    def __init__(self, Network):
        # Ordered collection of layers, first layer first.
        self.Network = Network

    def forward_pass(self, X):
        """Feed ``X`` through every layer in order and return the last output."""
        activation = X
        for layer in self.Network:
            activation = layer.forward_pass(activation, saved_weights=None)
        return activation

    def backprop(self, Y):
        """Propagate ``Y`` backwards: each layer receives what the layer after it returned."""
        signal = Y
        for layer in self.Network[::-1]:
            signal = layer.backprop(signal)

    def applying_sgd(self):
        """Ask every layer to apply its gradient-descent update."""
        for layer in self.Network:
            layer.applying_sgd()

    def change_alpha(self):
        """Ask every layer to adjust its learning rate."""
        for layer in self.Network:
            layer.change_alpha()

    def saving_params(self):
        """Return a list holding each layer's ``saving_params()`` result, in layer order."""
        return [layer.saving_params() for layer in self.Network]

    def predict(self, X, saved_params):
        """Forward ``X`` through the layers, handing layer ``i`` the entry ``saved_params[i]``."""
        result = X
        for idx, layer in enumerate(self.Network):
            weights = saved_params[idx]
            result = layer.forward_pass(result, saved_weights=weights)
        return result




# Accuracy

In [6]:
class accuracy:
    """Fraction of rows whose highest-scoring column equals the given label."""

    def __init__(self):
        pass

    def value(self, out, Y):
        """Return the mean of ``argmax(out, axis=1) == Y``.

        The predicted class indices are also kept on ``self.out``.
        """
        predicted = np.argmax(out, axis=1)
        self.out = predicted
        hits = predicted == Y
        return np.mean(hits)


# SoftMax

1. `forward_pass(z)`: computes `a = softmax(z)` for every row of `z` and returns `a`.
2. `backprop(actual_y)`: returns the gradient `a - expansion(actual_y)`.
3. `expansion(actual_y)`: returns the one-hot encoding of `actual_y`.

In [7]:
class softmax:
    """Row-wise softmax output layer.

    ``backprop`` returns ``a - one_hot(Y)``, i.e. the gradient of the
    cross-entropy loss with respect to the logits fed into ``forward_pass``.
    The layer has no trainable parameters.
    """

    def __init__(self):
        pass

    def expansion(self, t, num_classes=10):
        """Return the one-hot encoding of the 1-D integer label vector ``t``.

        Args:
            t: array of shape ``(n,)`` holding integer class indices.
            num_classes: width of the encoding (defaults to 10 as before).

        Returns:
            Float array of shape ``(n, num_classes)``.
        """
        labels = np.asarray(t)
        (n,) = labels.shape
        one_hot = np.zeros((n, num_classes))
        one_hot[np.arange(n), labels] = 1
        return one_hot

    def forward_pass(self, z, saved_weights=None):
        """Return the softmax of every row of the 2-D array ``z``.

        ``saved_weights`` is accepted for interface compatibility and ignored.
        """
        self.z = z
        # Subtracting the row maximum leaves the result unchanged mathematically
        # but keeps np.exp from overflowing to inf (which produced NaN rows).
        shifted = z - np.max(z, axis=1, keepdims=True)
        exp_z = np.exp(shifted)
        self.a = exp_z / np.sum(exp_z, axis=1, keepdims=True)
        return self.a

    def backprop(self, Y):
        """Return ``a - one_hot(Y)`` using the activations of the last forward pass."""
        # Size the encoding from the activations instead of assuming 10 classes.
        y = self.expansion(Y, num_classes=self.a.shape[1])
        self.grad = self.a - y
        return self.grad

    def applying_sgd(self):
        pass

    def change_alpha(self):
        pass

    def saving_params(self):
        return None


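# Linear Layer

1. `X` is the input (the previous layer's output) and `grad_previous` is the gradient arriving from the next layer.
2. `forward_pass(X)` returns `X · Theta + bias`.
3. `backprop(grad_previous)` computes the gradients with respect to `Theta`, `bias` and `X`, and returns the gradient with respect to `X`.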



In [8]:
class Linear_Layer:
    """Fully connected layer computing ``X @ Theta + bias``.

    Args:
        in_dim: number of input features.
        out_dim: number of output features.
        alpha: learning rate used by ``applying_sgd``.
    """

    def __init__(self, in_dim, out_dim, alpha=0.01):
        self.alpha = alpha
        # He-style scaling. The previous scheme divided the weights by their own
        # random sum, which can be arbitrarily close to zero (or negative) and
        # therefore produced unpredictable, occasionally huge, initial weights.
        self.Theta = np.random.randn(in_dim, out_dim) * np.sqrt(2.0 / max(in_dim, 1))
        self.bias = np.zeros(out_dim)

    def forward_pass(self, X, saved_weights=None):
        """Return ``X @ Theta + bias``.

        If ``saved_weights`` is given it must be a ``(Theta, bias)`` pair; it
        replaces the layer's current parameters before the computation.
        """
        if saved_weights is not None:
            # Parameters restored from JSON arrive as nested lists; store them
            # as arrays so backprop / SGD keep working afterwards.
            self.Theta = np.asarray(saved_weights[0], dtype=float)
            self.bias = np.asarray(saved_weights[1], dtype=float)

        self.X = X
        self.z = np.matmul(X, self.Theta) + self.bias
        return self.z

    def backprop(self, grad_previous):
        """Store batch-averaged parameter gradients and return the gradient w.r.t. the input."""
        t = self.X.shape[0]  # batch size
        self.grad = np.matmul(self.X.transpose(), grad_previous) / t
        self.grad_bias = grad_previous.sum(axis=0) / t
        # The input gradient is passed on un-averaged; each layer averages its own
        # parameter gradients over the batch.
        self.grad_a = np.matmul(grad_previous, self.Theta.transpose())
        return self.grad_a

    def applying_sgd(self):
        """Take one gradient-descent step using the gradients from the last ``backprop``."""
        self.Theta = self.Theta - (self.alpha * self.grad)
        self.bias = self.bias - (self.alpha * self.grad_bias)

    def change_alpha(self):
        """Divide the learning rate by 10."""
        self.alpha = self.alpha / 10

    def saving_params(self):
        """Return the current ``(Theta, bias)`` pair."""
        return (self.Theta, self.bias)

# Flatten

In [9]:
class reshaping:
    """Flatten layer: collapses every axis after the batch axis into one.

    The layer has no trainable parameters.
    """

    def __init__(self):
        pass

    def forward_pass(self, data, saved_weights=None):
        """Return ``data`` reshaped to ``(batch, features)``.

        Works for any input rank (previously hard-coded to 4-D). The original
        shape is remembered for ``backprop``. ``saved_weights`` is ignored.
        """
        self.data_shape = data.shape
        # Explicit feature count (rather than -1) so empty batches reshape too.
        n_features = int(np.prod(self.data_shape[1:]))
        self.flatten = data.reshape(self.data_shape[0], n_features)
        return self.flatten

    def backprop(self, data):
        """Reshape the incoming gradient back to the shape seen in ``forward_pass``."""
        return data.reshape(self.data_shape)

    def applying_sgd(self):
        pass

    def change_alpha(self):
        pass

    def saving_params(self):
        return None



# ReLU

In [10]:

class relu:
    """Element-wise rectified linear unit; holds no trainable parameters."""

    def __init__(self):
        pass

    def forward_pass(self, z, saved_weights=None):
        """Remember ``z`` and return it with negative entries replaced by 0."""
        self.z = z
        rectified = np.maximum(0, z)
        return rectified

    def derivative(self, a):
        """Return 1 where ``a`` is strictly positive and 0 elsewhere."""
        is_positive = a > 0
        return np.where(is_positive, 1, 0)

    def backprop(self, grad_previous):
        """Let the incoming gradient through only where the stored input was positive."""
        gate = self.derivative(self.z)
        return grad_previous * gate

    def applying_sgd(self):
        pass

    def change_alpha(self):
        pass

    def saving_params(self):
        return None

# Cross Entropy

In [11]:

class cross_entropy:
    """Categorical cross-entropy between predicted probabilities and integer labels."""

    def __init__(self):
        pass

    def expansion(self, t, num_classes=10):
        """Return the one-hot encoding of the integer labels ``t`` (10 classes by default)."""
        return np.eye(num_classes)[t]

    def loss(self, A, Y):
        """Return the mean cross-entropy ``-mean_i log A[i, Y[i]]``.

        Args:
            A: array of shape ``(n, classes)`` with predicted probabilities
               (e.g. the softmax output).
            Y: integer class labels of shape ``(n,)``.

        The previous implementation combined per-class binary losses with a
        log-sum-exp, which is not the cross-entropy, and evaluated
        ``log(0) * 0`` (NaN) as soon as a probability reached exactly 0 or 1.
        """
        probs = np.asarray(A, dtype=float)
        exp_Y = self.expansion(Y, num_classes=probs.shape[1])
        # Clip away exact zeros so log() stays finite.
        log_probs = np.log(np.clip(probs, 1e-12, 1.0))
        per_sample = -np.sum(exp_Y * log_probs, axis=1)
        return np.mean(per_sample)

In [12]:

class pooling:
    """Max-pooling over the last two axes of a ``(batch, channels, height, width)`` array.

    Args:
        pool_size: ``(height, width)`` of the pooling window.
        strides: ``(vertical, horizontal)`` step; defaults to ``pool_size``.
    """

    def __init__(self, pool_size=(2, 2), strides=None):
        self.pool_height, self.pool_width = pool_size
        self.strides = pool_size if strides is None else strides

    def forward_pass(self, input_data, saved_weights=None):
        """Return the maximum of every pooling window.

        ``saved_weights`` is accepted for interface compatibility and ignored.
        """
        # Kept so backprop can find where each maximum came from.
        self.input_data = input_data
        self.input_data_shape = input_data.shape
        batch_size, input_channels, input_height, input_width = input_data.shape
        output_height = (input_height - self.pool_height) // self.strides[0] + 1
        output_width = (input_width - self.pool_width) // self.strides[1] + 1
        self.output = np.zeros((batch_size, input_channels, output_height, output_width))

        # Visit every output cell. The output size already accounts for the
        # stride; dividing by it again (as before) left most cells at zero.
        for i in range(output_height):
            top = i * self.strides[0]
            for j in range(output_width):
                left = j * self.strides[1]
                window = input_data[:, :, top:top + self.pool_height, left:left + self.pool_width]
                self.output[:, :, i, j] = window.max(axis=(2, 3))
        return self.output

    def backprop(self, grad_previous):
        """Route each output gradient to the input position that held the window maximum.

        Returns an array with the shape of the last forward input; positions that
        were not a window maximum receive 0. Ties go to the first maximum in
        row-major order.
        """
        batch_size, input_channels, output_height, output_width = grad_previous.shape
        grad_input = np.zeros(self.input_data_shape)
        cell_ids = np.arange(self.pool_height * self.pool_width)

        for i in range(output_height):
            top = i * self.strides[0]
            for j in range(output_width):
                left = j * self.strides[1]
                # The maximum must be located in the *input* window; the previous
                # code searched the pooled output instead.
                window = self.input_data[:, :, top:top + self.pool_height, left:left + self.pool_width]
                flat = window.reshape(batch_size, input_channels, self.pool_height * self.pool_width)
                winners = flat.argmax(axis=2)
                mask = (cell_ids == winners[:, :, None]).reshape(window.shape)
                # += so overlapping windows (stride < pool size) accumulate.
                grad_input[:, :, top:top + self.pool_height, left:left + self.pool_width] += (
                    mask * grad_previous[:, :, i, j][:, :, None, None]
                )
        return grad_input

    def applying_sgd(self):
        pass

    def change_alpha(self):
        pass

    def saving_params(self):
        return None





# Convolutional Layer

In [13]:
class Convolutional_Layer:
    """2-D convolution over single-channel images of shape ``(batch, height, width)``.

    Args:
        filter_dim: side length of the square filters.
        stride: step between neighbouring filter positions.
        pad: number of zero rows/columns added on every side of the image.
        alpha: learning rate used by ``applying_sgd``.
        num_of_filters: number of filters (output channels).
    """

    def __init__(self, filter_dim=3, stride=1, pad=1, alpha=0.01, num_of_filters=1):
        self.filter_dim = filter_dim
        self.n_filters = num_of_filters
        self.stride = stride
        self.pad = pad
        self.alpha = alpha
        # He-style scaling. The previous scheme divided the filters by their sum
        # over the filter axis, which turns a single filter into all ones and
        # otherwise divides by values that may be close to zero.
        fan_in = max(filter_dim * filter_dim, 1)
        self.filter = np.random.randn(self.n_filters, self.filter_dim, self.filter_dim) * np.sqrt(2.0 / fan_in)
        self.bias = np.zeros((self.n_filters, 1))

    def convolving(self, X, fil, dimen_x, dimen_y):
        """Apply every filter in ``fil`` to one padded image ``X``.

        Returns an array of shape ``(n_filters, dimen_x, dimen_y)`` including the bias.
        """
        fil = np.asarray(fil)
        z = np.zeros((self.n_filters, dimen_x, dimen_y))
        for i in range(dimen_x):
            top = i * self.stride  # honour the stride (was ignored before)
            for ii in range(dimen_y):
                left = ii * self.stride
                patch = X[top:top + self.filter_dim, left:left + self.filter_dim]
                # All filters at once: sum over the two spatial axes of the patch.
                z[:, i, ii] = np.tensordot(fil, patch, axes=([1, 2], [0, 1])) + self.bias[:, 0]
        return z

    def forward_pass(self, X, saved_weights=None):
        """Zero-pad ``X`` and convolve it with every filter.

        If ``saved_weights`` is given it must be a ``(filter, bias)`` pair; it
        replaces the layer's parameters first.

        Returns an array of shape ``(batch, n_filters, out_height, out_width)``.
        """
        if saved_weights is not None:
            # Parameters restored from JSON arrive as nested lists.
            self.filter = np.asarray(saved_weights[0], dtype=float)
            self.bias = np.asarray(saved_weights[1], dtype=float)

        self.X = np.pad(X, ((0, 0), (self.pad, self.pad), (self.pad, self.pad)), 'constant', constant_values=0)
        (d, p, t) = self.X.shape
        dimen_x = (p - self.filter_dim) // self.stride + 1
        dimen_y = (t - self.filter_dim) // self.stride + 1
        self.z = np.zeros((d, self.n_filters, dimen_x, dimen_y))

        # Loop over the few filter offsets instead of over every image / pixel:
        # each offset contributes (shifted image) * (one weight per filter).
        for a in range(self.filter_dim):
            for b in range(self.filter_dim):
                window = self.X[:, a:a + dimen_x * self.stride:self.stride,
                                b:b + dimen_y * self.stride:self.stride]
                self.z += window[:, None, :, :] * self.filter[:, a, b].reshape(1, -1, 1, 1)
        self.z += self.bias.reshape(1, -1, 1, 1)

        return self.z

    def backprop(self, grad_z):
        """Compute batch-averaged gradients for the filters and biases.

        ``grad_z`` has shape ``(batch, n_filters, out_height, out_width)``.

        Returns an all-zero array of shape ``(batch, out_height, out_width)``:
        the gradient with respect to the input is not implemented, so this layer
        can only be used as the first layer of a network.
        """
        (d, n_filters, p, t) = grad_z.shape
        batch = max(d, 1)

        self.grads = np.zeros((d, p, t))

        self.grad_filter = np.zeros((self.n_filters, self.filter_dim, self.filter_dim))
        for i in range(self.filter_dim):
            for ii in range(self.filter_dim):
                # Input pixels that were multiplied by weight (i, ii) for each output cell.
                window = self.X[:, i:i + p * self.stride:self.stride,
                                ii:ii + t * self.stride:self.stride]
                self.grad_filter[:, i, ii] = np.tensordot(grad_z, window, axes=([0, 2, 3], [0, 1, 2]))
        # Average over the batch, like Linear_Layer does; the previous code
        # divided by the spatial size instead, tying the step size to image size.
        self.grad_filter = self.grad_filter / batch

        self.grad_bias = grad_z.sum(axis=(0, 2, 3)).reshape(self.bias.shape) / batch

        return self.grads

    def applying_sgd(self):
        """Take one gradient-descent step using the gradients from the last ``backprop``."""
        self.filter = self.filter - (self.alpha * self.grad_filter)
        self.bias = self.bias - (self.alpha * self.grad_bias)

    def change_alpha(self):
        """Divide the learning rate by 10."""
        self.alpha = self.alpha / 10

    def saving_params(self):
        """Return the current ``(filter, bias)`` pair."""
        return (self.filter, self.bias)

In [None]:
# Seed NumPy so the random weight initialisation is the same on every
# Restart & Run All.
np.random.seed(0)

# NOTE: despite their names, X_testing / Y_testing hold the *training* split.
# Pixel values are scaled from [0, 255] to [0, 1].
X_testing = X_train / 255
Y_testing = y_train

al = 0.2                          # initial learning rate shared by all layers
N_FILTERS = 3
BATCH_SIZE = 6000
LR_THRESHOLDS = (80, 85, 90, 95)  # batch accuracies (%) at which the learning rate is cut

# The 3x3 convolution with pad=1, stride=1 keeps the image size and the 2x2
# pooling halves it, so the flattened feature count is filters * (H/2) * (W/2)
# = 3 * 14 * 14 = 588. The previous hard-coded 392 did not match and made the
# first matrix product fail.
flat_dim = N_FILTERS * (X_testing.shape[1] // 2) * (X_testing.shape[2] // 2)

complete_NN = Neural_Network([
    Convolutional_Layer(alpha=al, num_of_filters=N_FILTERS, pad=1),
    pooling(),
    relu(),
    reshaping(),
    Linear_Layer(flat_dim, 100, alpha=al),
    relu(),
    Linear_Layer(100, 10, alpha=al),
    softmax()
])
CE = cross_entropy()

acc = accuracy()
epochs = 10
done = 0  # number of learning-rate reductions applied so far
for i in range(epochs):
    k = 0
    # Full batches only; a trailing partial batch would be skipped.
    for ii in range(BATCH_SIZE, X_testing.shape[0] + 1, BATCH_SIZE):
        X_batch = X_testing[k:ii, :, :]
        Y_batch = Y_testing[k:ii]

        out = complete_NN.forward_pass(X_batch)
        print("epoch:{} \t batch: {} \t loss: \t {}".format(i+1, int(ii/BATCH_SIZE), CE.loss(out, Y_batch)), end="\t")
        accuracy_val = acc.value(out, Y_batch)*100
        print("accuracy: {}".format(accuracy_val))

        # Each threshold triggers at most once, in order; several may trigger in
        # the same step if the accuracy jumps past more than one of them.
        while done < len(LR_THRESHOLDS) and accuracy_val >= LR_THRESHOLDS[done]:
            complete_NN.change_alpha()
            done += 1

        complete_NN.backprop(Y_batch)
        complete_NN.applying_sgd()
        k = ii


In [None]:
# The network was trained on pixels scaled to [0, 1], so the test images must be
# scaled the same way before inference (raw 0-255 values were passed before).
out_1 = complete_NN.forward_pass(X_test / 255)
print("The accuracy on test set is {}".format(acc.value(out_1, y_test)*100))



---



In [None]:
# Collect the current parameters of every layer, in layer order
# (layers without weights contribute None).
saved_params = complete_NN.saving_params()
# One entry per layer of the network.
len(saved_params)

In [None]:
# Function to convert NumPy arrays to a serializable format
def convert_array(arr):
    """Return ``arr`` as nested Python lists if it is a NumPy array, else unchanged.

    Used as the ``default`` hook of ``json.dumps``, which calls it for every
    object the JSON encoder cannot serialise by itself.
    """
    if isinstance(arr, np.ndarray):
        return arr.tolist()
    return arr

# Layers are stored as a JSON array: null for parameter-free layers and a
# two-element array [weights, bias] for trainable ones.
serialized_data = json.dumps(saved_params, default=convert_array)

# Define the file path in Google Drive
file_path = '/content/drive/MyDrive/Colab Notebooks/Saved_Models/mnist_japanese_cnn.txt'

# Create the target folder on first use; open() alone fails if it is missing.
os.makedirs(os.path.dirname(file_path), exist_ok=True)

# Write the serialized data to the file in Google Drive
with open(file_path, 'w') as file:
    file.write(serialized_data)

print("Data saved successfully to Google Drive.")


In [None]:
def restore_layer_params(entry):
    """Convert one saved layer entry back into the form the layers expect.

    ``None`` (a layer without parameters) is returned unchanged; any other entry
    is a sequence of nested lists and becomes a tuple of float NumPy arrays.
    """
    if entry is None:
        return None
    return tuple(np.asarray(values, dtype=float) for values in entry)

# Define the file path in Google Drive
file_path = '/content/drive/MyDrive/Colab Notebooks/Saved_Models/mnist_japanese_cnn.txt'

# Read the serialized data from the file
with open(file_path, 'r') as file:
    serialized_data = file.read()

# Deserialize the JSON data. json.loads only calls `object_hook` for JSON
# objects (dicts); the file contains arrays and nulls only, so the hook used
# before never ran and the parameters stayed plain lists. Convert explicitly.
loaded_params = [restore_layer_params(entry) for entry in json.loads(serialized_data)]

print("Data loaded successfully from Google Drive.")


In [None]:
# Sanity check: number of restored layer entries (compare with len(saved_params) above).
len(loaded_params)

In [None]:
# Scale the test images like the training images (pixels / 255) before inference.
# Note: predict() installs the loaded parameters into the layers of complete_NN.
X_test_scaled = X_test / 255
out2 = complete_NN.predict(X_test_scaled, loaded_params)
print("The testing loss is {}".format(CE.loss(out2, y_test)))
print("The accuracy on test set is {}".format(acc.value(out2, y_test)*100))



---



---

