<a href="https://colab.research.google.com/github/phuongnguyen99/Spring22_Porject_Math/blob/main/CNN_MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%tensorflow_version 2.x
import tensorflow as tf
# Ask TensorFlow for the name of the GPU device; stop the notebook if it is
# not the expected first GPU.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
  print('Found GPU at: {}'.format(device_name))
else:
  raise SystemError('GPU device not found')

Found GPU at: /device:GPU:0


In [None]:
import os
import tensorflow as tf
import numpy as np
import math
import timeit
import matplotlib.pyplot as plt

%matplotlib inline

# Toggle between the first GPU and the CPU.
USE_GPU = True
device = '/device:GPU:0' if USE_GPU else '/cpu:0'

# Constant to control how often we print when training models.
print_every = 100

print('Using device: ', device)

Using device:  /device:GPU:0


**Preprocessing MNIST dataset**

In [None]:
from keras.datasets import mnist
def load_mnist(num_training=59000, num_validation=1000, num_test=10000):
    """
    Load MNIST through tf.keras and return normalized train/val/test splits.

    Inputs:
    - num_training: number of leading training images kept for training
    - num_validation: number of images taken right after the training ones
      and used as the validation split
    - num_test: number of leading test images kept

    Returns:
    - Tuple (X_train, y_train, X_val, y_val, X_test, y_test). Images are
      converted to float32 and standardized with the mean and std of the
      training split; labels are flattened int32 arrays.
    """
    # Fetch the raw arrays and cast them to the dtypes used downstream.
    (train_images, train_labels), (test_images, test_labels) = \
        tf.keras.datasets.mnist.load_data()
    train_images = np.asarray(train_images, dtype=np.float32)
    train_labels = np.asarray(train_labels, dtype=np.int32).flatten()
    test_images = np.asarray(test_images, dtype=np.float32)
    test_labels = np.asarray(test_labels, dtype=np.int32).flatten()

    # Index ranges of the three splits; validation follows the training part.
    train_idx = range(num_training)
    val_idx = range(num_training, num_training + num_validation)
    test_idx = range(num_test)

    X_val, y_val = train_images[val_idx], train_labels[val_idx]
    X_train, y_train = train_images[train_idx], train_labels[train_idx]
    X_test, y_test = test_images[test_idx], test_labels[test_idx]

    # Standardize every split with statistics of the training split only
    # (a single mean/std over all images and pixels).
    mean_pixel = X_train.mean(axis=(0, 1, 2), keepdims=True)
    std_pixel = X_train.std(axis=(0, 1, 2), keepdims=True)
    X_train, X_val, X_test = (
        (split - mean_pixel) / std_pixel for split in (X_train, X_val, X_test)
    )

    return X_train, y_train, X_val, y_val, X_test, y_test

# Axis order of the image arrays: N, H, W = (0, 1, 2)
X_train, y_train, X_val, y_val, X_test, y_test = load_mnist()

# Report the shape of every split (plus the dtype of the training labels).
for _caption, *_details in [
    ('Train data shape: ', X_train.shape),
    ('Train labels shape: ', y_train.shape, y_train.dtype),
    ('Validation data shape: ', X_val.shape),
    ('Validation labels shape: ', y_val.shape),
    ('Test data shape: ', X_test.shape),
    ('Test labels shape: ', y_test.shape),
]:
    print(_caption, *_details)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Train data shape:  (59000, 28, 28)
Train labels shape:  (59000,) int32
Validation data shape:  (1000, 28, 28)
Validation labels shape:  (1000,)
Test data shape:  (10000, 28, 28)
Test labels shape:  (10000,)


**The images have no channel axis yet, so we add a trailing channel dimension (C = 1) to each split so that the convolutions operate on 4-D inputs (N, H, W, C).**

In [None]:
def _add_channel_axis(images):
    """
    Return `images` with a trailing channel axis of size 1: (N, H, W) -> (N, H, W, 1).

    Arrays that are not 3-D are returned unchanged, so running this cell more
    than once does not keep appending extra axes.
    """
    if images.ndim != 3:
        return images
    return images.reshape(images.shape + (1,))


X_train = _add_channel_axis(X_train)
X_val = _add_channel_axis(X_val)
X_test = _add_channel_axis(X_test)

In [None]:
class Dataset(object):
    """Iterable that yields (X_batch, y_batch) minibatches from paired arrays."""

    def __init__(self, X, y, batch_size, shuffle=False):
        """
        Construct a Dataset object to iterate over data X and labels y
        
        Inputs:
        - X: Numpy array of data, of any shape
        - y: Numpy array of labels, of any shape but with y.shape[0] == X.shape[0]
        - batch_size: Integer giving number of elements per minibatch
        - shuffle: (optional) Boolean, whether to shuffle the data on each epoch
        """
        assert X.shape[0] == y.shape[0], 'Got different numbers of data and labels'
        self.X, self.y = X, y
        self.batch_size, self.shuffle = batch_size, shuffle

    def __iter__(self):
        """
        Return an iterator over (X_batch, y_batch) tuples covering the data once.

        Each batch holds up to batch_size elements; the last one may be smaller.
        With shuffle enabled a new random order is drawn on every call, and the
        same order is applied to X and y so data and labels stay aligned.
        """
        N, B = self.X.shape[0], self.batch_size
        if not self.shuffle:
            # Sequential order: plain slices are enough.
            return iter((self.X[i:i+B], self.y[i:i+B]) for i in range(0, N, B))

        # Permute the indices and actually use them to pick each batch
        # (previously the permutation was computed but never applied).
        idxs = np.arange(N)
        np.random.shuffle(idxs)
        return iter((self.X[idxs[i:i+B]], self.y[idxs[i:i+B]])
                    for i in range(0, N, B))


# Minibatch iterators for the three splits; only the training one asks for shuffling.
train_dset = Dataset(X=X_train, y=y_train, batch_size=50, shuffle=True)
val_dset = Dataset(X=X_val, y=y_val, batch_size=64, shuffle=False)
test_dset = Dataset(X=X_test, y=y_test, batch_size=64, shuffle=False)

In [None]:
# Sanity check: show the shapes of the first seven training minibatches (t = 0..6).
for t, (x, y) in zip(range(7), train_dset):
    print(t, x.shape, y.shape)

0 (50, 28, 28, 1) (50,)
1 (50, 28, 28, 1) (50,)
2 (50, 28, 28, 1) (50,)
3 (50, 28, 28, 1) (50,)
4 (50, 28, 28, 1) (50,)
5 (50, 28, 28, 1) (50,)
6 (50, 28, 28, 1) (50,)


We use a CNN to learn the MNIST data set with the following architecture: <br>
**(CONV-RELU-BATCH-DROPOUT-MAXPOOL)*2 - FULLY CONNECTED - SOFTMAX**

In [None]:
class CustomConvNet(tf.keras.Model):
    """
    Convolutional classifier with the layout
    (CONV-RELU-BATCHNORM-DROPOUT-MAXPOOL) x 2 - FLATTEN - DENSE - SOFTMAX.

    Inputs to the constructor:
    - channel_1: number of filters of the first (3x3) convolution
    - channel_2: number of filters of the second (5x5) convolution
    - num_classes: number of units of the final dense layer
    """

    def __init__(self, channel_1, channel_2, num_classes):
        super(CustomConvNet, self).__init__()

        # Variance-scaling initializer with scale=2.0, shared by the conv and dense kernels.
        initializer = tf.initializers.VarianceScaling(scale=2.0)

        # Stage 1: 3x3 convolution + ReLU, batch norm, dropout, 2x2 max-pool.
        self.conv1 = tf.keras.layers.Conv2D(filters=channel_1, kernel_size=(3, 3), strides=(1, 1),
                                            activation='relu', padding='valid',
                                            kernel_initializer=initializer)
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.drop_out1 = tf.keras.layers.Dropout(rate=0.2)
        self.max_pool1 = tf.keras.layers.MaxPool2D(pool_size=(2, 2))

        # Stage 2: 5x5 convolution + ReLU, batch norm, dropout, 2x2 max-pool.
        self.conv2 = tf.keras.layers.Conv2D(filters=channel_2, kernel_size=(5, 5), strides=(1, 1),
                                            activation='relu', padding='valid',
                                            kernel_initializer=initializer)
        self.bn2 = tf.keras.layers.BatchNormalization()
        self.drop_out2 = tf.keras.layers.Dropout(rate=0.2)
        self.max_pool2 = tf.keras.layers.MaxPool2D(pool_size=(2, 2))

        # Classifier head: flatten -> dense -> softmax (the model outputs probabilities).
        self.fc = tf.keras.layers.Dense(num_classes, kernel_initializer=initializer)
        self.flatten = tf.keras.layers.Flatten()
        self.softmax = tf.keras.layers.Softmax()

    def call(self, input_tensor, training=False):
        """
        Forward pass.

        Inputs:
        - input_tensor: batch of images with layout (N, H, W, C)
        - training: whether the call happens during training; forwarded to the
          batch-norm and dropout layers, whose behaviour depends on it

        Returns:
        - Tensor of softmax outputs with num_classes entries per example
        """
        # Zero-pad height and width by 2 on each side before the 3x3 'valid' convolution.
        x = tf.pad(input_tensor, [[0, 0], [2, 2], [2, 2], [0, 0]], 'CONSTANT')
        x = self.conv1(x)
        x = self.bn1(x, training=training)
        x = self.drop_out1(x, training=training)
        x = self.max_pool1(x)

        # Zero-pad height and width by 1 on each side before the 5x5 'valid' convolution.
        x = tf.pad(x, [[0, 0], [1, 1], [1, 1], [0, 0]], 'CONSTANT')
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = self.drop_out2(x, training=training)
        x = self.max_pool2(x)

        x = self.flatten(x)
        x = self.fc(x)
        x = self.softmax(x)

        return x

# Reporting / schedule constants.
print_every = 700
num_epochs = 10

# Filter counts of the two convolutions and the number of digit classes.
channel_1 = 32
channel_2 = 16
num_classes = 10

def model_init_fn():
    """Build a fresh CustomConvNet from the module-level channel_1, channel_2 and num_classes."""
    return CustomConvNet(channel_1, channel_2, num_classes)

def optimizer_init_fn():
    """Create a new Adam optimizer with a learning rate of 1e-3."""
    return tf.keras.optimizers.Adam(learning_rate=1e-3)


**Results: Loss and accuracy**

In [None]:
# Build the model and optimizer through the notebook's factory functions so the
# hyper-parameters are defined in one place only.
model = model_init_fn()
model.compile(optimizer_init_fn(),
              loss='sparse_categorical_crossentropy',  # the model outputs softmax probabilities and labels are integer ids
              metrics=[tf.keras.metrics.sparse_categorical_accuracy])

# Train for 5 epochs (num_epochs is not used by this cell), validating on the
# held-out validation split after each epoch.
model.fit(X_train, y_train, batch_size=50, epochs=5, validation_data=(X_val, y_val))

# Last expression of the cell: the evaluation result on the test split.
model.evaluate(X_test, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


[0.03392622619867325, 0.9900000095367432]