In [1]:
import tensorflow as tf
import numpy as np

from tensorflow.keras.datasets import mnist

![title](images/CNN.png)

In [43]:
class ConvLayer(tf.keras.layers.Layer):
    """2-D convolution followed by bias, activation and max-pooling.

    The layer applies, in order: ``tf.nn.conv2d`` (``SAME`` padding), a
    per-output-channel bias, ``activation``, and ``tf.nn.max_pool``
    (``SAME`` padding). Inputs are expected in the default channels-last
    layout of ``tf.nn.conv2d``: ``(batch, height, width, input_channels)``.

    Args:
        activation: Callable applied element-wise to the biased convolution
            output (e.g. ``tf.nn.relu``).
        input_channels: Number of channels of the incoming tensor.
        output_channels: Number of filters, i.e. channels of the output.
        window_size: Two-element sequence with the kernel height and width.
        pool_size: Two-element sequence with the pooling window height and width.
        filt_stride: Two-element sequence with the convolution strides.
        pool_stride: Two-element sequence with the pooling strides.
        initializer: Kernel initializer. ``None`` (the default) creates a
            fresh He-normal initializer for this layer.
    """

    def __init__(self, activation, input_channels, output_channels, window_size, pool_size, filt_stride, pool_stride,
                 initializer=None):
        super().__init__()
        # A default written in the signature is evaluated only once, so every
        # layer would share the very same initializer object. Build one per
        # layer instead.
        if initializer is None:
            initializer = tf.keras.initializers.he_normal()
        self.initializer = initializer
        self.activation = activation
        self.input_channels = input_channels
        self.output_channels = output_channels
        self.window_size = window_size
        self.pool_size = pool_size
        self.filt_stride = filt_stride
        self.pool_stride = pool_stride

        # Kernel layout required by tf.nn.conv2d: (height, width, in, out).
        self.w = self.add_weight(
            shape=(window_size[0], window_size[1], input_channels, output_channels),
            initializer=self.initializer,
            trainable=True,
        )
        # One bias per output channel, starting at zero.
        self.b = self.add_weight(
            shape=(output_channels,),
            initializer=tf.zeros_initializer(),
            trainable=True,
        )

    def call(self, inputs):
        """Run convolution -> bias -> activation -> max-pool on ``inputs``."""
        # Strides/window sizes are given per (batch, height, width, channel);
        # batch and channel are never strided or pooled over.
        filt_stride = [1, self.filt_stride[0], self.filt_stride[1], 1]
        out_layer = tf.nn.conv2d(inputs, self.w, filt_stride, padding='SAME')
        # The bias broadcasts over the trailing (channel) axis.
        out_layer = out_layer + self.b
        out_layer = self.activation(out_layer)
        pool_shape = [1, self.pool_size[0], self.pool_size[1], 1]
        pool_strides = [1, self.pool_stride[0], self.pool_stride[1], 1]
        out_layer = tf.nn.max_pool(out_layer, ksize=pool_shape, strides=pool_strides, padding='SAME')
        return out_layer

* O primeiro argumento é a função de ativação
* O segundo é o número de canais de entrada (input channels); para a primeira camada convolucional ele é 1, pois corresponde ao único canal de uma imagem em escala de cinza
* O terceiro é o número de canais de saída, que será 32. Ou seja, haverá 32 janelas deslizantes (convoluções)
* Para a segunda camada, a entrada serão os 32 canais gerados na primeira e a saída será de 64 canais
* O quarto é o tamanho da janela usada para a convolução, nesse caso será uma janela de 5x5
* O quinto é o tamanho da janela do pooling (2x2), que é o responsável por diminuir as features e generalizar mais o modelo
* o sexto e o sétimo são strides para a camada de convolução e pooling, são eles que dizem como a janela deslizante irá se movimentar.
* o último é um inicializador de pesos

In [44]:
# Network: two conv/pool blocks (1 -> 32 -> 64 channels), then a dense head
# that outputs 10 raw logits (no softmax; the loss applies it).
model = tf.keras.Sequential()
model.add(ConvLayer(activation=tf.nn.relu, input_channels=1, output_channels=32,
                    window_size=[5, 5], pool_size=[2, 2],
                    filt_stride=[1, 1], pool_stride=[2, 2]))
model.add(ConvLayer(activation=tf.nn.relu, input_channels=32, output_channels=64,
                    window_size=[5, 5], pool_size=[2, 2],
                    filt_stride=[1, 1], pool_stride=[2, 2]))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(
    300,
    activation=tf.nn.relu,
    kernel_initializer=tf.keras.initializers.he_normal(),
))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Dense(10, activation=None))

In [45]:
def get_batch(x, y, size):
    """Draw a random mini-batch from paired sample/label arrays.

    Indices are drawn uniformly from ``[0, len(y))`` with ``np.random.randint``,
    i.e. with replacement, so a sample may appear more than once in a batch.

    Args:
        x: Array of samples; the first axis indexes samples and any number of
            trailing axes is allowed.
        y: Array of labels aligned with ``x`` along the first axis.
        size: Number of samples to draw (anything accepted as the ``size``
            argument of ``np.random.randint``).

    Returns:
        Tuple ``(x_batch, y_batch)`` holding the selected samples and labels.
    """
    idxs = np.random.randint(0, len(y), size)
    # Index only the leading axis so inputs of any rank work; for 3-D input
    # this is identical to x[idxs, :, :].
    return x[idxs], y[idxs]

In [48]:
def loss_fn(logits, labels):
    """Return the batch-mean softmax cross-entropy.

    Args:
        logits: Unnormalised class scores passed as ``logits`` to
            ``tf.nn.softmax_cross_entropy_with_logits``.
        labels: Target distribution per example (the caller in this notebook
            passes one-hot vectors), passed as ``labels`` to the same op.

    Returns:
        Scalar tensor: the per-example losses averaged with ``tf.reduce_mean``.
    """
    per_example_loss = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=logits)
    return tf.reduce_mean(per_example_loss)

In [85]:
optimizer = tf.keras.optimizers.Adam()
iterations = 1000
batch_size = 32
train_writer = tf.summary.create_file_writer("tf_vis/cnn_mnist")

(x_train, y_train), (x_test, y_test) = mnist.load_data()
# Scale pixel values from [0, 255] to [0, 1].
x_train = x_train / 255.0
x_test = x_test / 255.0
# Input data is not trainable state, so a plain float32 tensor is enough
# (no tf.Variable). A trailing channel axis of size 1 is added for the
# convolution layers.
x_test = tf.cast(x_test, tf.float32)
x_test = tf.reshape(x_test, (-1, 28, 28, 1))

for i in range(iterations):
    batch_x, batch_y = get_batch(x_train, y_train, batch_size)

    # Convert the NumPy batch to tensors; wrapping them in tf.Variable would
    # allocate a new variable on every iteration for no benefit.
    batch_y = tf.cast(batch_y, tf.int32)
    batch_y_one_hot = tf.one_hot(batch_y, 10)

    # get the images in the right format: float32, (batch, 28, 28, 1)
    batch_x = tf.cast(batch_x, tf.float32)
    batch_x = tf.reshape(batch_x, (batch_size, 28, 28, 1))

    with tf.GradientTape() as tape:
        # training=True is required for the Dropout layer to be active;
        # without it the model is fitted with dropout switched off.
        logits = model(batch_x, training=True)
        loss = loss_fn(logits, batch_y_one_hot)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    if i % 50 == 0:
        # Train accuracy uses the logits of the training pass above, so it is
        # measured on the current batch only and with dropout active.
        max_idxs = tf.argmax(logits, axis=1)
        train_acc = np.mean(max_idxs.numpy() == batch_y.numpy())

        # Test accuracy is computed in inference mode on the full test set.
        test_logits = model(x_test, training=False)
        max_idxs = tf.argmax(test_logits, axis=1)
        test_acc = np.mean(max_idxs.numpy() == y_test)
        print(f"Iter: {i}, loss={loss:.3f}, train accuracy={train_acc * 100:.3f}%, test accuracy={test_acc * 100:.3f}%")
        with train_writer.as_default():
            tf.summary.scalar('loss', loss, step=i)
            tf.summary.scalar('train_accuracy', train_acc, step=i)
            tf.summary.scalar('test_accuracy', test_acc, step=i)

# Make sure every buffered summary reaches disk before the cell finishes.
train_writer.flush()

# determine the test accuracy
logits = model(x_test, training=False)
max_idxs = tf.argmax(logits, axis=1)
acc = np.mean(max_idxs.numpy() == y_test)
print("Final test accuracy is {:.2f}%".format(acc * 100))

Iter: 0, loss=0.170, train accuracy=96.875%, test accuracy=98.820%
Iter: 50, loss=0.091, train accuracy=96.875%, test accuracy=98.640%
Iter: 100, loss=0.001, train accuracy=100.000%, test accuracy=98.750%
Iter: 150, loss=0.010, train accuracy=100.000%, test accuracy=97.870%
Iter: 200, loss=0.013, train accuracy=100.000%, test accuracy=98.420%
Iter: 250, loss=0.625, train accuracy=90.625%, test accuracy=98.030%
Iter: 300, loss=0.160, train accuracy=96.875%, test accuracy=98.570%
Iter: 350, loss=0.213, train accuracy=96.875%, test accuracy=98.700%
Iter: 400, loss=0.062, train accuracy=96.875%, test accuracy=98.740%
Iter: 450, loss=0.010, train accuracy=100.000%, test accuracy=98.240%
Iter: 500, loss=0.023, train accuracy=100.000%, test accuracy=98.900%
Iter: 550, loss=0.037, train accuracy=96.875%, test accuracy=98.690%
Iter: 600, loss=0.054, train accuracy=96.875%, test accuracy=98.570%
Iter: 650, loss=0.118, train accuracy=96.875%, test accuracy=98.680%
Iter: 700, loss=0.075, train acc