In [1]:
from __future__ import absolute_import, division, print_function

import tensorflow as tf
import numpy as np
import gzip

tf.__version__

'2.4.0'

In [2]:
# MNIST dataset parameters.
NUM_CHANNELS = 1  # BW images
NUM_CLASSES = 10
PIXEL_DEPTH = 255  # pixel value [0,255]
DATA_TYPE = 'tf.float32'
VAL_SET_SIZE = 0  # create validation set from training set

# Training parameters.
learning_rate = 0.001
training_steps = 500
batch_size = 128
display_step = 10

# Network parameters.
SEED = 2
CONV1_DEEP = 32  # No.filters for 1st Conv layer.
CONV1_SIZE = 5  # size of 1st Conv layer.
CONV2_DEEP = 64  # No.filters for 2nd Conv layer.
CONV2_SIZE = 5  # size of 2nd Conv layer.
FC_SIZE = 512  # No. neurons for 1st FC layer.

# File parameters
DATA_PATH = 'mnist_'
TRAIN_IMAGES_PATH = 'train-images-idx3-ubyte.gz'
TRAIN_LABELS_PATH = 'train-labels-idx1-ubyte.gz'
TEST_IMAGES_PATH = 't10k-images-idx3-ubyte.gz'
TEST_LABELS_PATH = 't10k-labels-idx1-ubyte.gz'

In [3]:
# Prepare MNIST data.
# Functions for extracting data from files
def read32(bytestream):
    dt = np.dtype(np.uint32)
    dt = dt.newbyteorder('>')
    return np.frombuffer(bytestream.read(4), dtype=dt)[0]


def extract_images(filename):
    """ extract images into 4D uint8 tensor:
        - 32 bits magic number == 2051
        - 3x32 bits num_images, rows, cols
        - data, reshaped to 4D tensor [image index, y, x, channels]
    """
    print('Extracting image file', filename)
    with gzip.open(filename) as bytestream:
        magic = read32(bytestream)
        if magic != 2051:
            raise ValueError('Invalid magic number %d in MNIST image file: %s' % (magic, filename))
        num_images = read32(bytestream)
        rows = read32(bytestream)
        cols = read32(bytestream)
        buf = bytestream.read(rows * cols * num_images * NUM_CHANNELS)
        data = np.frombuffer(buf, dtype=np.uint8)
        data = data.reshape(num_images, rows, cols, NUM_CHANNELS)
    return data


def extract_labels(filename):
    """Extract the labels into uint8 tensor
        - 32 bits magic number == 2049
        - 32 bits num_labels
        - labels, 2D tensor [image index, y, x, channels]
    """
    print('Extracting label file', filename)
    with gzip.open(filename) as bytestream:
        magic = read32(bytestream)
        if magic != 2049:
            raise ValueError(
                'Invalid magic number %d in MNIST label file: %s' %
                (magic, filename))
        num_items = read32(bytestream)
        buf = bytestream.read(num_items)
        labels = np.frombuffer(buf, dtype=np.uint8)
        return labels

In [4]:
# Extract images/labels from given dataset
img_train = extract_images(DATA_PATH + TRAIN_IMAGES_PATH)
label_train = extract_labels(DATA_PATH + TRAIN_LABELS_PATH)
img_test = extract_images(DATA_PATH + TEST_IMAGES_PATH)
label_test = extract_labels(DATA_PATH + TEST_LABELS_PATH)

if VAL_SET_SIZE:
    img_validation = img_train[:VAL_SET_SIZE, ...]
    label_validation = label_train[:VAL_SET_SIZE]
    img_train = img_train[VAL_SET_SIZE:, ...]
    label_train = label_train[VAL_SET_SIZE:]
    img_validation = np.array(img_validation, np.float32)
    img_validation = (img_validation - PIXEL_DEPTH / 2) / PIXEL_DEPTH

# Convert to float32.
img_train, img_test = np.array(img_train, np.float32), np.array(img_test, np.float32)
img_train, img_test = (img_train - PIXEL_DEPTH / 2) / PIXEL_DEPTH, (img_test - PIXEL_DEPTH / 2) / PIXEL_DEPTH

# Use tf.data API to shuffle and batch data.
train_data = tf.data.Dataset.from_tensor_slices((img_train, label_train))
train_data = train_data.repeat().shuffle(5000).batch(batch_size).prefetch(1)

Extracting image file mnist_train-images-idx3-ubyte.gz
Extracting label file mnist_train-labels-idx1-ubyte.gz
Extracting image file mnist_t10k-images-idx3-ubyte.gz
Extracting label file mnist_t10k-labels-idx1-ubyte.gz


In [5]:
# Initialize the network
# Create some wrappers for simplicity.
def conv_2d(x, weights, bias, strides=1):
    # Conv2D wrapper, with bias and relu activation.
    x = tf.nn.conv2d(x, weights, strides=[1, strides, strides, 1], padding='SAME')
    x = tf.nn.bias_add(x, bias)
    return tf.nn.relu(x)


def maxpool_2d(x, k=2):
    # MaxPool2D wrapper.
    return tf.nn.max_pool(x, ksize=[1, k, k, 1], strides=[1, k, k, 1], padding='SAME')

In [6]:
# Trainable weights and biases for 2 conv layers and FC layers
# Using TruncatedNormal for initialization
weight_initializer = tf.initializers.TruncatedNormal(stddev=0.1, seed=SEED)
weights = {
    # Conv Layer 1: 5x5 conv, 1 input, 32 filters (MNIST has 1 color channel only).
    'conv1_w': tf.Variable(
        weight_initializer([CONV1_SIZE, CONV1_SIZE, NUM_CHANNELS, CONV1_DEEP])),
    # Conv Layer 2: 5x5 conv, 32 inputs, 64 filters.
    'conv2_w': tf.Variable(
        weight_initializer([CONV2_SIZE, CONV2_SIZE, CONV1_DEEP, CONV2_DEEP])),
    # FC Layer 1: 7*7*64 inputs, 512 units.
    'fc_w': tf.Variable(
        weight_initializer([img_test.shape[1] // 4 * img_test.shape[2] // 4 * CONV2_DEEP, FC_SIZE])),
    # FC Out Layer: 512 inputs, 10 units (total number of classes)
    'out_w': tf.Variable(weight_initializer([FC_SIZE, NUM_CLASSES])),
}

biases = {
    'conv1_b': tf.Variable(tf.zeros([CONV1_DEEP])),
    'conv2_b': tf.Variable(tf.zeros([CONV2_DEEP])),
    'fc_b': tf.Variable(tf.zeros([FC_SIZE])),
    'out_b': tf.Variable(tf.zeros([NUM_CLASSES]))
}

In [7]:
# Build the model.
def conv_net(x):
    # Input: a batch of 28x28 grayscale images
    x = tf.reshape(x, [-1, 28, 28, NUM_CHANNELS])

    # Conv layer. Output shape: 28x28x32 ('SAME' padding)
    conv1 = conv_2d(x, weights['conv1_w'], biases['conv1_b'])
    # Max Pooling. Output shape: 14x14x32
    conv1 = maxpool_2d(conv1, k=2)

    # Conv layer. Output shape: 14x14x64 ('SAME' padding)
    conv2 = conv_2d(conv1, weights['conv2_w'], biases['conv2_b'])
    # Max Pooling. Output shape: 7x7x64
    conv2 = maxpool_2d(conv2, k=2)

    # Reshape conv2 output to fit FC layer input, Output shape: [-1, 7*7*64].
    fc1 = tf.reshape(conv2, [-1, weights['fc_w'].get_shape().as_list()[0]])

    # Fc layer, Output shape: [-1, 1024].
    fc1 = tf.add(tf.matmul(fc1, weights['fc_w']), biases['fc_b'])
    # Apply ReLU to fc1 output for non-linearity.
    fc1 = tf.nn.relu(fc1)

    # Fully connected layer, Output shape: [-1, 10].
    out = tf.add(tf.matmul(fc1, weights['out_w']), biases['out_b'])
    # Apply softmax to normalize the logits to a probability distribution.
    return tf.nn.softmax(out)

In [8]:
# Optimization process.

# Cross-Entropy loss function.
def cross_entropy(y_pred, y_true):
    # Encode label to a one hot vector.
    y_true = tf.one_hot(y_true, depth=NUM_CLASSES)
    # Clip prediction values to avoid log(0) error.
    y_pred = tf.clip_by_value(y_pred, 1e-9, 1.)
    # Compute cross-entropy.
    return tf.reduce_mean(-tf.reduce_sum(y_true * tf.math.log(y_pred)))


# Accuracy metric.
def accuracy(y_pred, y_true):
    # Predicted class is the index of highest score in prediction vector (i.e. argmax).
    correct_prediction = tf.equal(tf.argmax(y_pred, 1), tf.cast(y_true, tf.int64))
    return tf.reduce_mean(tf.cast(correct_prediction, tf.float32), axis=-1)
optimizer = tf.optimizers.Adam(learning_rate)


def run_optimization(x, y):
    # Wrap computation inside a GradientTape for automatic differentiation.
    with tf.GradientTape() as g:
        pred = conv_net(x)
        loss = cross_entropy(pred, y)

    # Compute gradients and update W and b following gradients
    trainable_variables = list(weights.values()) + list(biases.values())
    gradients = g.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

In [9]:
# Run training for the given number of steps.
train_losses = []
# val_losses = []

for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
    # Run the optimization to update W and b values.
    run_optimization(batch_x, batch_y)

    if step % display_step == 0:
        pred = conv_net(batch_x)
        loss = cross_entropy(pred, batch_y)
        acc = accuracy(pred, batch_y)
        print("step: %i, loss: %f, accuracy: %f" % (step/display_step, loss, acc))
        train_losses.append(loss)

        # Test model on validation set.
        # pred = conv_net(img_validation)
        # val_loss = cross_entropy(pred, label_validation)
        # print("Validation Accuracy: %f" % val_loss)
        # val_losses.append(val_loss/VAL_SET_SIZE)

step: 1, loss: 141.361572, accuracy: 0.695312
step: 2, loss: 59.312401, accuracy: 0.898438
step: 3, loss: 42.479141, accuracy: 0.867188
step: 4, loss: 42.982277, accuracy: 0.906250
step: 5, loss: 25.200092, accuracy: 0.945312
step: 6, loss: 21.710499, accuracy: 0.968750
step: 7, loss: 37.268360, accuracy: 0.906250
step: 8, loss: 16.409821, accuracy: 0.968750
step: 9, loss: 34.755798, accuracy: 0.945312
step: 10, loss: 16.114361, accuracy: 0.953125
step: 11, loss: 10.829327, accuracy: 0.968750
step: 12, loss: 8.173870, accuracy: 0.976562
step: 13, loss: 15.448557, accuracy: 0.968750
step: 14, loss: 19.054787, accuracy: 0.945312
step: 15, loss: 16.696077, accuracy: 0.960938
step: 16, loss: 13.700947, accuracy: 0.960938
step: 17, loss: 11.703341, accuracy: 0.976562
step: 18, loss: 12.377934, accuracy: 0.984375
step: 19, loss: 13.658243, accuracy: 0.976562
step: 20, loss: 7.029793, accuracy: 0.984375
step: 21, loss: 18.095743, accuracy: 0.976562
step: 22, loss: 14.569902, accuracy: 0.97656

In [10]:
# Test model.
pred = conv_net(img_test)
print("Test Accuracy: %f" % accuracy(pred, label_test))

Test Accuracy: 0.986700
