Congratulations on completing the last module of this learning path! In this module, you learned about basic TensorFlow concepts such as tensors, variables, automatic differentiation, eager execution, and graph execution. You then used those concepts to re-implement the model, training, testing, and prediction code from module 1, but this time at a lower level, using TensorFlow. You're now prepared to customize your code in case you ever need more flexibility than Keras offers.

Here's the complete training, testing, and prediction code, for your future reference:

In [1]:
import gzip
import numpy as np
import tensorflow as tf
from typing import Tuple
import time
import requests
from PIL import Image


class NeuralNetwork(tf.keras.Model):
  def __init__(self):
    super(NeuralNetwork, self).__init__()
    initializer = tf.keras.initializers.GlorotUniform()
    self.W1 = tf.Variable(initializer(shape=(784, 20)))
    self.b1 = tf.Variable(tf.zeros(shape=(20,)))
    self.W2 = tf.Variable(initializer(shape=(20, 10)))
    self.b2 = tf.Variable(tf.zeros(shape=(10,)))

  def call(self, x: tf.Tensor) -> tf.Tensor:
    x = tf.reshape(x, [-1, 784])
    x = tf.matmul(x, self.W1) + self.b1
    x = tf.nn.relu(x)
    x = tf.matmul(x, self.W2) + self.b2
    return x


labels_map = {
    0: 'T-Shirt',
    1: 'Trouser',
    2: 'Pullover',
    3: 'Dress',
    4: 'Coat',
    5: 'Sandal',
    6: 'Shirt',
    7: 'Sneaker',
    8: 'Bag',
    9: 'Ankle Boot',
  }


def read_images(path: str, image_size: int, num_items: int) -> np.ndarray:
  with gzip.open(path, 'rb') as file:
    data = np.frombuffer(file.read(), np.uint8, offset=16)
    data = data.reshape(num_items, image_size, image_size)
  return data


def read_labels(path: str, num_items: int) -> np.ndarray:
  with gzip.open(path, 'rb') as file:
    data = np.frombuffer(file.read(num_items + 8), np.uint8, offset=8)
    data = data.astype(np.int64)
  return data


def get_data(batch_size: int) -> Tuple[tf.data.Dataset, tf.data.Dataset]:
  image_size = 28
  num_train = 60000
  num_test = 10000

  training_images = read_images('data/FashionMNIST/raw/train-images-idx3-ubyte.gz', image_size, num_train)
  test_images = read_images('data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz', image_size, num_test)
  training_labels = read_labels('data/FashionMNIST/raw/train-labels-idx1-ubyte.gz', num_train)
  test_labels = read_labels('data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz', num_test)

  # (training_images, training_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()

  train_dataset = tf.data.Dataset.from_tensor_slices((training_images, training_labels))
  test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))

  train_dataset = train_dataset.map(lambda image, label: (float(image) / 255.0, label))
  test_dataset = test_dataset.map(lambda image, label: (float(image) / 255.0, label))

  train_dataset = train_dataset.batch(batch_size).shuffle(500)
  test_dataset = test_dataset.batch(batch_size).shuffle(500)

  return (train_dataset, test_dataset)


@tf.function
def fit_one_batch(X: tf.Tensor, y: tf.Tensor, model: tf.keras.Model, loss_fn: tf.keras.losses.Loss, 
optimizer: tf.keras.optimizers.Optimizer) -> Tuple[tf.Tensor, tf.Tensor]:
  with tf.GradientTape() as tape:
    y_prime = model(X, training=True)
    loss = loss_fn(y, y_prime)

  grads = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  return (y_prime, loss)


def fit(dataset: tf.data.Dataset, model: tf.keras.Model, loss_fn: tf.keras.losses.Loss, 
optimizer: tf.optimizers.Optimizer) -> None:
  batch_count = len(dataset)
  loss_sum = 0
  correct_item_count = 0
  current_item_count = 0
  print_every = 100

  for batch_index, (X, y) in enumerate(dataset):
    (y_prime, loss) = fit_one_batch(X, y, model, loss_fn, optimizer)

    y = tf.cast(y, tf.int64)
    correct_item_count += (tf.math.argmax(y_prime, axis=1) == y).numpy().sum()

    batch_loss = loss.numpy()
    loss_sum += batch_loss
    current_item_count += len(X)

    if ((batch_index + 1) % print_every == 0) or ((batch_index + 1) == batch_count):
      batch_accuracy = correct_item_count / current_item_count * 100
      print(f'[Batch {batch_index + 1:>3d} - {current_item_count:>5d} items] accuracy: {batch_accuracy:>0.1f}%, loss: {batch_loss:>7f}')


@tf.function
def evaluate_one_batch(X: tf.Tensor, y: tf.Tensor, model: tf.keras.Model, 
loss_fn: tf.keras.losses.Loss) -> Tuple[tf.Tensor, tf.Tensor]:
  y_prime = model(X, training=False)
  loss = loss_fn(y, y_prime)

  return (y_prime, loss)


def evaluate(dataset: tf.data.Dataset, model: tf.keras.Model, 
loss_fn: tf.keras.losses.Loss) -> Tuple[float, float]:
  batch_count = len(dataset)
  loss_sum = 0
  correct_item_count = 0
  current_item_count = 0

  for (X, y) in dataset:
    (y_prime, loss) = evaluate_one_batch(X, y, model, loss_fn)

    correct_item_count += (tf.math.argmax(y_prime, axis=1).numpy() == y.numpy()).sum()
    loss_sum += loss.numpy()
    current_item_count += len(X)

  average_loss = loss_sum / batch_count
  accuracy = correct_item_count / current_item_count
  return (average_loss, accuracy)


def training_phase():
  learning_rate = 0.1
  batch_size = 64
  epochs = 5

  (train_dataset, test_dataset) = get_data(batch_size)

  model = NeuralNetwork()

  loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
  optimizer = tf.optimizers.SGD(learning_rate)

  print('\nFitting:')
  t_begin = time.time()
  for epoch in range(epochs):
    print(f'\nEpoch {epoch + 1}\n-------------------------------')
    fit(train_dataset, model, loss_fn, optimizer)
  t_elapsed = time.time() - t_begin
  print(f'\nTime per epoch: {t_elapsed / epochs :>.3f} sec' )

  print('\nEvaluating:')
  (test_loss, test_accuracy) = evaluate(test_dataset, model, loss_fn)
  print(f'Test accuracy: {test_accuracy * 100:>0.1f}%, test loss: {test_loss:>8f}')

  model.save_weights('outputs/weights')


@tf.function
def predict(model: tf.keras.Model, X: np.ndarray) -> tf.Tensor:
  y_prime = model(X, training=False)
  probabilities = tf.keras.layers.Softmax(axis=1)(y_prime)
  predicted_indices = tf.math.argmax(input=probabilities, axis=1)
  return predicted_indices


def inference_phase():
  print('\nPredicting:')

  model = NeuralNetwork()
  model.load_weights('outputs/weights')

  url = 'https://raw.githubusercontent.com/MicrosoftDocs/tensorflow-learning-path/main/intro-keras/predict-image.png'

  with Image.open(requests.get(url, stream=True).raw) as image:
    X = np.asarray(image, dtype=np.float32).reshape((-1, 28, 28)) / 255.0

  predicted_vector = model.predict(X)
  predicted_index = np.argmax(predicted_vector)
  predicted_name = labels_map[predicted_index]

  print(f'Predicted class: {predicted_name}')


training_phase()
inference_phase()



Fitting:

Epoch 1
-------------------------------
[Batch 100 -  6400 items] accuracy: 62.9%, loss: 0.675659
[Batch 200 - 12800 items] accuracy: 68.4%, loss: 0.427568
[Batch 300 - 19200 items] accuracy: 71.7%, loss: 0.630323
[Batch 400 - 25600 items] accuracy: 73.5%, loss: 0.448896
[Batch 500 - 31968 items] accuracy: 74.9%, loss: 0.473284
[Batch 600 - 38368 items] accuracy: 75.8%, loss: 0.420568
[Batch 700 - 44768 items] accuracy: 76.6%, loss: 0.495648
[Batch 800 - 51168 items] accuracy: 77.2%, loss: 0.811230
[Batch 900 - 57568 items] accuracy: 77.6%, loss: 0.714009
[Batch 938 - 60000 items] accuracy: 77.8%, loss: 0.662069

Epoch 2
-------------------------------
[Batch 100 -  6400 items] accuracy: 83.0%, loss: 0.513966
[Batch 200 - 12800 items] accuracy: 82.8%, loss: 0.350917
[Batch 300 - 19200 items] accuracy: 82.9%, loss: 0.503186
[Batch 400 - 25600 items] accuracy: 82.9%, loss: 0.464897
[Batch 500 - 32000 items] accuracy: 83.0%, loss: 0.417334
[Batch 600 - 38400 items] accuracy: 83