<a href="https://colab.research.google.com/github/magikerwin1993/Image-Clustering-tf2/blob/main/mnist_projector_with_custom_loop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import tensorflow as tf

# Record the TensorFlow version this notebook was executed with
print(f'TF version: {tf.__version__}')

# Training Classifier

## Parameters

In [None]:
# Hyperparameters shared by the input pipeline and the training loop
LEARNING_RATE = 0.01  # step size handed to the Adam optimizer
BATCH_SIZE = 128      # number of examples per batch
NUM_EPOCHS = 5        # full passes over the training set

## Data Preparation (using tf.data.Dataset)

In [None]:
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Scales pixel values by 1/255 and appends a trailing channel axis, which
# the conv layers expect
x_train = (x_train / 255.0)[..., np.newaxis]
x_test = (x_test / 255.0)[..., np.newaxis]

print(x_train.shape)
print(x_test.shape)

In [None]:
def create_dataset(x, y, shuffle):
  """Builds a batched `tf.data.Dataset` from in-memory arrays.

  Args:
    x: Array of inputs; the first axis indexes examples.
    y: Array of labels aligned with `x` along the first axis.
    shuffle: If True, individual examples are shuffled before batching.

  Returns:
    A dataset yielding `(x, y)` batches of at most `BATCH_SIZE` examples.
  """
  dataset = tf.data.Dataset.from_tensor_slices((x, y))
  if shuffle:
    # Shuffle individual examples *before* batching, with a buffer that holds
    # the whole dataset. Shuffling after `batch` only permutes whole batches,
    # so every batch would contain the same examples in every epoch.
    dataset = dataset.shuffle(buffer_size=len(x))
  dataset = dataset.batch(batch_size=BATCH_SIZE)
  return dataset

train_dataset = create_dataset(x_train, y_train, shuffle=True)
eval_dataset = create_dataset(x_test, y_test, shuffle=False)

In [None]:
# Sanity check: pull a single batch and show the tensor shapes
for batch_x, batch_y in train_dataset.take(1):
  print(batch_x.shape, batch_y.shape)

## Model Definition (using Functional API)

In [None]:
def create_classifier(input_shape, name):
  """Builds a small CNN classifier with the Keras functional API.

  Two strided conv layers are followed by two dense layers (each with
  dropout) and a 10-way softmax output.

  Args:
    input_shape: Shape of a single input example, without the batch axis.
    name: Name given to the returned model.

  Returns:
    A `tf.keras.Model` mapping the input tensor to softmax probabilities.
  """
  layers = tf.keras.layers
  inputs = layers.Input(input_shape, name='input')

  # Layers are listed in the order they are applied
  stack = [
      layers.Conv2D(filters=8, kernel_size=3, strides=2, activation='relu',
                    name='conv1'),
      layers.Conv2D(filters=16, kernel_size=3, strides=2, activation='relu',
                    name='conv2'),
      layers.Flatten(),
      layers.Dense(128, activation='relu', name='fc1'),
      layers.Dropout(0.2),
      layers.Dense(64, activation='relu', name='fc2'),
      layers.Dropout(0.2),
      layers.Dense(10, activation='softmax', name='fc3'),
  ]

  outputs = inputs
  for layer in stack:
    outputs = layer(outputs)
  return tf.keras.Model(inputs=[inputs], outputs=[outputs], name=name)

classifier = create_classifier(input_shape=(28, 28, 1), name='classifier')

In [None]:
# Prints a summary of the classifier defined above
classifier.summary()

## Training (using tf.GradientTape)

In [None]:
# The classifier's last layer applies softmax, so the loss receives
# probabilities rather than logits (from_logits=False).
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
# NOTE: Keras metric objects are stateful; calling one updates a running
# average that persists across calls until the metric is explicitly reset.
acc_fn = tf.keras.metrics.SparseCategoricalAccuracy()
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

In [None]:
@tf.function
def train_step(x, y):
  """Runs one optimization step on a single batch.

  Args:
    x: Batch of input images.
    y: Batch of integer class labels, one per image.

  Returns:
    A `(loss, accuracy)` pair of scalar tensors computed on this batch only.
  """
  with tf.GradientTape() as tape:
    # Forward pass
    pred = classifier(x, training=True)
    loss = loss_fn(y, pred)

  # Calculates gradients with respect to every trainable variable
  grad = tape.gradient(loss, classifier.trainable_variables)
  # Back propagation
  optimizer.apply_gradients(zip(grad, classifier.trainable_variables))

  # Accuracy of this batch alone. A stateful Keras metric would return a
  # running average over every batch it has ever seen, which is not what
  # the caller averages per epoch.
  correct = tf.equal(tf.argmax(pred, axis=-1), tf.cast(y, tf.int64))
  acc = tf.reduce_mean(tf.cast(correct, tf.float32))
  return loss, acc

@tf.function
def eval_step(x, y):
  """Evaluates the classifier on a single batch without updating weights.

  Args:
    x: Batch of input images.
    y: Batch of integer class labels, one per image.

  Returns:
    A `(loss, accuracy)` pair of scalar tensors computed on this batch only.
  """
  # Forward pass in inference mode: training=False disables the dropout
  # layers so the evaluation numbers are deterministic.
  pred = classifier(x, training=False)
  loss = loss_fn(y, pred)

  # Accuracy of this batch alone, computed once and without touching any
  # stateful metric.
  correct = tf.equal(tf.argmax(pred, axis=-1), tf.cast(y, tf.int64))
  acc = tf.reduce_mean(tf.cast(correct, tf.float32))
  return loss, acc

In [None]:
for epoch in range(NUM_EPOCHS):

  train_loss_lst, train_acc_lst = [], []
  eval_loss_lst, eval_acc_lst = [], []

  # Each epoch runs the training pass first, then the evaluation pass;
  # both collect one (loss, accuracy) pair per batch.
  phases = (
      (train_step, train_dataset, train_loss_lst, train_acc_lst),
      (eval_step, eval_dataset, eval_loss_lst, eval_acc_lst),
  )
  for step_fn, dataset, loss_lst, acc_lst in phases:
    for batch_x, batch_y in dataset:
      loss, acc = step_fn(batch_x, batch_y)
      loss_lst.append(loss)
      acc_lst.append(acc)

  # Averages the per-batch values collected during this epoch
  train_loss, train_acc = np.mean(train_loss_lst), np.mean(train_acc_lst)
  eval_loss, eval_acc = np.mean(eval_loss_lst), np.mean(eval_acc_lst)
  print(f'Epoch {epoch}, loss: {train_loss:.4f}, accuracy: {train_acc:.4f},'
        f' eval_loss: {eval_loss:.4f}, eval_accuracy: {eval_acc:.4f}')

## Evaluation

In [None]:
# One (loss, accuracy) pair per evaluation batch
eval_results = [eval_step(batch_x, batch_y) for batch_x, batch_y in eval_dataset]
eval_loss_lst = [batch_loss for batch_loss, _ in eval_results]
eval_acc_lst = [batch_acc for _, batch_acc in eval_results]

# Averages the per-batch values over the whole evaluation set
eval_loss = np.mean(eval_loss_lst)
eval_acc = np.mean(eval_acc_lst)
print('eval_loss:', eval_loss)
print('eval_acc:', eval_acc)

# Visualization using Embedding Projector

In [None]:
import os
from PIL import Image
from tensorboard.plugins import projector

# Load the TensorBoard notebook extension
%load_ext tensorboard

## Dataset for Visualization

In [None]:
# Number of leading test examples shown in the projector
VISUALIZATION_COUNT = 900

x_test_ = x_test[:VISUALIZATION_COUNT]
y_test_ = y_test[:VISUALIZATION_COUNT]
for subset in (x_test_, y_test_):
  print(subset.shape)

## Feature Extractor Definition

In [None]:
# Reuses the classifier's graph up to the 'fc2' layer, so the extractor
# outputs that layer's activations for a given input
fc2_output = classifier.get_layer('fc2').output
feature_extractor = tf.keras.Model(inputs=[classifier.input],
                                   outputs=[fc2_output])
print(feature_extractor.output.shape)

## Embedding Projector Setup

In [None]:
# Sets up a logs directory for Tensorboard.
# exist_ok=True makes the cell safe to re-run and avoids the race between
# checking for the directory and creating it.
log_dir='logs/mnist-embeddings'
os.makedirs(log_dir, exist_ok=True)

In [None]:
# Writes one class name per line to metadata.tsv, in the order of y_test_
classes = ['Zero', 'One', 'Two', 'Three', 'Four', 'Five', 'Six', 'Seven',
           'Eight', 'Nine']
metadata_file = os.path.join(log_dir, 'metadata.tsv')
with open(metadata_file, "w") as f:
  f.writelines("{}\n".format(classes[label]) for label in y_test_)


# Converts each normalized image back to an 8-bit grayscale PIL image.
# np.rint rounds to the nearest integer before the cast; a bare
# astype(np.uint8) truncates, so a product that lands just below an integer
# would come out one gray level too dark.
images_pil = [
    Image.fromarray(np.rint(x[..., 0] * 255).astype(np.uint8))
    for x in x_test_
]

# Saves sprite image: a square grid of tiles, filled row by row.
# Tile and grid sizes are derived from the data instead of being hard-coded.
tile_height, tile_width = x_test_.shape[1:3]
one_square_size = int(np.ceil(np.sqrt(len(images_pil))))
master_width = tile_width * one_square_size
master_height = tile_height * one_square_size
spriteimage = Image.new(
    mode='RGB',
    size=(master_width, master_height),
    color=(0, 0, 0)  # black background; RGB mode has no alpha channel
)

for count, image in enumerate(images_pil):
    div, mod = divmod(count, one_square_size)
    h_loc = tile_height * div
    w_loc = tile_width * mod
    spriteimage.paste(image, (w_loc, h_loc))

# The canvas is already RGB, so it can be written out directly
spriteimage.save(os.path.join(log_dir, 'sprite.jpg'))
spriteimage

In [None]:
# Runs the visualization subset through the feature extractor in inference
# mode and wraps the resulting activations in a variable so they can be
# written to a checkpoint.
features = feature_extractor(x_test_, training=False)
features_var = tf.Variable(features)
print(features_var.shape)

# Saves the variable under the key `embedding`; that key becomes part of
# the tensor name stored in the checkpoint.
checkpoint_prefix = os.path.join(log_dir, "embedding.ckpt")
checkpoint = tf.train.Checkpoint(embedding=features_var)
checkpoint.save(checkpoint_prefix)

In [None]:
# Describes the saved embedding to the projector plugin
config = projector.ProjectorConfig()

# The tensor name is the checkpoint key (`embedding`) suffixed by
# `/.ATTRIBUTES/VARIABLE_VALUE`. The metadata and sprite paths are given
# relative to the log directory.
embedding = config.embeddings.add(
    tensor_name="embedding/.ATTRIBUTES/VARIABLE_VALUE",
    metadata_path='metadata.tsv',
)
embedding.sprite.image_path = 'sprite.jpg'
embedding.sprite.single_image_dim.extend([28, 28])
projector.visualize_embeddings(log_dir, config)

## Visualization using Tensorboard

In [None]:
# Now run TensorBoard against the log data we just saved.
%tensorboard --logdir {log_dir}