# Lab 08-2: DNNs with TensorFlow
## Exercise: Predicting MNIST Digits

Prepare MNIST Dataset

In [112]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import time

# tf.config.set_visible_devices([], 'GPU')

# Load the original MNIST dataset
# MNIST dataset contains 60000 training images and 10000 test images of 28x28 pixels
# Each image has a hand-written digit
(X_train_org, y_train_num), (X_test_org, y_test) = tf.keras.datasets.mnist.load_data()

# flatten the last dimension for DNN
X_train_org = tf.reshape(X_train_org, (X_train_org.shape[0], -1))
X_test_org = tf.reshape(X_test_org, (X_test_org.shape[0], -1))

# Digits data has range of [0,255], which often lead too big exponential values
# so make them normal distribution of [0,1] with the sklearn package, or you can just divide them by 255
X_train = tf.cast(X_train_org / 255, dtype=tf.float32)
X_test = tf.cast(X_test_org / 255, dtype=tf.float32)

# Transform Nx1 Y vector to Nx10 answer vector, so that we can perform one-to-all classification
y_train = tf.one_hot(y_train_num, 10)

# Split training dataset into training and validation
X_val = X_train[50000:60000]
y_val = y_train[50000:60000]

X_train = X_train[:50000]
y_train = y_train[:50000]

n_classes = 10
n_features = 784

Predifined Functions

In [113]:
def create_mini_batches(X, y, batch_size=64):
    mini_batches = []
    data = tf.concat([X, y], axis=-1)
    tf.random.shuffle(data)
    n_classes = y.shape[1]
    n_minibatches = (data.shape[0] // batch_size)
  
    for i in range(n_minibatches):
        mini_batch = data[i * batch_size:(i + 1)*batch_size, :]
        X_mini = mini_batch[:, :-n_classes]
        Y_mini = mini_batch[:, -n_classes:]
        mini_batches.append((X_mini, Y_mini))
    
    if data.shape[0] % batch_size != 0:
        mini_batch = data[n_minibatches * batch_size:data.shape[0]]
        X_mini = mini_batch[:, :-n_classes]
        Y_mini = mini_batch[:, -n_classes:]
        mini_batches.append((X_mini, Y_mini))
        
    return mini_batches

## Network Definition with TensorFlow Core (Low Level)
In this exercise, do not use keras layers.

In [114]:
class tfDense(tf.Module):

    def __init__(self, n_out, n_in, activation='relu', rate=0.0, batchnorm=False, name=None):
        super().__init__(name=name)
        self.activation = activation
        self.rate = rate
        self.batchnorm = batchnorm
        # variables for Dense layer
        self.w = tf.Variable(tf.zeros([n_in, n_out]), name='w')             # weight
        self.b = tf.Variable(tf.zeros([n_out]), name='b')                   # bias
        # variables for batch normalization
        self.mn = tf.Variable(tf.zeros([n_out]), trainable=False, name='m') # running mean for BN
        self.va = tf.Variable(tf.ones([n_out]), trainable=False, name='v')  # running variance for BN
        self.gm = tf.Variable(tf.ones([n_out]), name='s')                   # gamma (=scale) for BN
        self.bt = tf.Variable(tf.zeros([n_out]), name='o')                  # beta (=offset) for BN
        self.mm = 0.9                                                       # momentum parameter for BN

    def __call__(self, x, training=False):
        ### START CODE HERE ###
        
        x = tf.matmul(x, self.w) + self.b       # linear prediction
        
        # batch normalization
        if self.batchnorm:
            tmp_mn, tmp_va = self.mn, self.va
            if training:
                new_mn, new_va = tf.nn.moments(x, axes=[0])    # find mean and variance; check tf.nn.moments
                self.mn = self.mm * self.mn + (1 - self.mm) * new_mn           # update running mean
                self.va = self.mm * self.va + (1 - self.mm) * new_va           # update running variance
                tmp_mn, tmp_va = new_mn, new_va
            x = tf.nn.batch_normalization(x, tmp_mn, tmp_va, self.gm, self.bt, 1e-8)                     # batch_normalization function; check tf.nn.batch_normalization
        
        # activation function
        if self.activation=='sigmoid':
            x = tf.math.sigmoid(x)
        elif self.activation=='softmax':
            x = tf.nn.softmax(x)
        elif self.activation=='relu':
            x = tf.nn.relu(x)
        else: 
            print('activation type error')
        
        # dropout
        x = tf.nn.dropout(x, rate=self.rate)                         # dropout is only used in training mode; check tf.nn.dropout

        ### END CODE HERE ###
        return x

Create a DNN model

In [115]:
# define network
n_hd1 = 100
n_hd2 = 60
n_hd3 = 30

l1 = tfDense(n_out=n_hd1, n_in=n_features, activation='relu', rate=0.5, batchnorm=False)
l2 = tfDense(n_out=n_hd2, n_in=n_hd1, activation='relu', rate=0.0, batchnorm=True)
l3 = tfDense(n_out=n_hd3, n_in=n_hd2, activation='relu', rate=0.0, batchnorm=True)
l4 = tfDense(n_out=n_classes, n_in=n_hd3, activation='softmax', rate=0.0, batchnorm=False)

vars = [l1.w, l1.b, l2.w, l2.b, l2.gm, l2.bt, l3.w, l3.b, l3.gm, l3.bt, l4.w, l4.b]

Define Training Functions

In [116]:
# def my_forward(l1, l2, l3, l4, X_in, y_true, training=False):
# def my_backward(l1, l2, l3, l4, X_in, y_true):
# We are going to use GradientTape, so no more forward & backward definition 

def my_loss(l1, l2, l3, l4, X_in, y_true, training=False):
    ### START CODE HERE ###

    # calculate loss
    a_1 = l1.__call__(X_in, training=training)                       # first layer
    a_2 = l2.__call__(a_1, training=training)                       # second layer
    a_3 = l3.__call__(a_2, training=training)                       # third layer
    a_4 = l4.__call__(a_3, training=training)                       # last layer
    loss = -tf.reduce_mean(y_true * tf.math.log(a_4))                      # calculate loss

    # calculate accuracy; correct prediction over total prediction
    cmp = tf.where(y_true)[:,1] == tf.argmax(a_4, axis=1)                       # is it correct?
    acc = tf.reduce_mean(tf.cast(cmp, dtype=tf.float16))                       # how many prediction is correct?

    ### END CODE HERE ###
    return loss, acc
    
def my_predict(l1, l2, l3, l4, X_in):
    ### START CODE HERE ###

    a_1 = l1.__call__(X_in)                       # first layer prediction
    a_2 = l2.__call__(a_1)                       # second layer prediction
    a_3 = l3.__call__(a_2)                       # third layer prediction
    a_4 = l4.__call__(a_3)                       # last layer prediction
    pred = tf.argmax(a_4, axis=1)                      # determine class

    ### END CODE HERE ###
    return pred

Initialize Weights

In [117]:
def my_initializer(lyr, pdf='normal'):
    w_shape = lyr.w.shape            # (i,c)
    fan_in, fan_out = w_shape

    if pdf=='he_normal':
        lyr.w = tf.random.normal(w_shape) * tf.sqrt(2/fan_in)
    elif pdf=='xavier_normal':
        lyr.w = tf.random.normal(w_shape) * tf.sqrt(2/(fan_out + fan_in))
    elif pdf=='normal':
        lyr.w = tf.random.normal(w_shape)
    else:
        print('initializer error')

    return

# Weights are initialized to...
my_initializer(l1, pdf='he_normal')
my_initializer(l2, pdf='he_normal')
my_initializer(l3, pdf='he_normal')
my_initializer(l4, pdf='he_normal')

### Deep Neural Network Using GradientTape

In [118]:
alpha = 0.001
opt = tf.optimizers.Adam(alpha)

n_epochs = 30

for epoch in range(n_epochs):

    start = time.time()    
    loss_J = 0
    mini_batches = create_mini_batches(X_train, y_train, batch_size=64)

    for mini_batch in mini_batches:
        X_mini, y_mini = mini_batch
        mb_len = X_mini.shape[0]

        # Forward Prediction Path
        with tf.GradientTape() as tape:
            loss, acc = my_loss(l1, l2, l3, l4, X_mini, y_mini, training=True)

        # Backward Gradient Path
        grad = tape.gradient(loss, vars)
        opt.apply_gradients([(grd, var) for (grd, var) in zip(grad, vars)
                             if grd is not None])

        loss_J += loss
    
    loss_J = loss_J / (X_train.shape[0]/64)

    # Just to show progress
    if ((epoch+1)%2==0):
        loss_V, acc_V = my_loss(l1, l2, l3, l4, X_val, y_val, training=False)
        tf.print('Epoch: %4d' % (epoch+1), 'Elapsed_t: %4.2fs' % (time.time()-start), 'loss: %10.8f' % (loss_J),
                 '- val_loss: %10.8f' % (loss_V), 'val_acc: %10.8f' % (acc_V))

Epoch:    2 Elapsed_t: 11.30s loss: 0.18529400 - val_loss: 0.17627473 val_acc: 0.41015625


KeyboardInterrupt: ignored

Network Evaluation

In [None]:
from sklearn.metrics import accuracy_score

y_prd = my_predict(l1, l2, l3, l4, X_test)

print(y_test[0:10])
print(y_prd[0:10].numpy())

accuracy_score(y_prd, y_test)


Test Prediction

In [None]:
idx = np.random.randint(X_test_org.shape[0])
plt.matshow(tf.reshape(X_test_org[idx], (28,28)))
plt.gray()
plt.show()

X_in = tf.expand_dims(X_test[idx],0)

y_pred = my_predict(l1, l2, l3, l4, X_in)

print('My prediction is ' + str(y_pred[0].numpy()))
print('Actual number is ' + str(y_test[idx]))