In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models

%matplotlib inline

## Dataset

In [2]:
%%capture
fashion_dataset = keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_dataset.load_data()

In [3]:
print(train_images.shape)
print(train_labels.shape)

(60000, 28, 28)
(60000,)


In [4]:
print("BEFORE NORMALIZING:")
print("IMAGES: ",train_images.max()) # RANGE: 0-255
print("LABELS: ",train_labels.max()) # RANGE: 0-9

BEFORE NORMALIZING:
IMAGES:  255
LABELS:  9


In [5]:
train_images = train_images / 255.0
test_images = test_images / 255.0

print("AFTER NORMALIZING:")
print("IMAGES: ",train_images.max()) # RANGE: 0-1
print("IMAGES: ",test_images.max()) # RANGE: 0-1

AFTER NORMALIZING:
IMAGES:  1.0
IMAGES:  1.0


In [6]:
# plt.figure(figsize=(2,2))
# plt.imshow(train_images[100])

In [7]:
# CONVERT (x,y) -> (x,y,c)
train_images = train_images.reshape(*train_images.shape, 1).astype(np.float32)
test_images = test_images.reshape(*test_images.shape, 1).astype(np.float32)
print(train_images.shape)
print(test_images.shape)

(60000, 28, 28, 1)
(10000, 28, 28, 1)


## Network

In [8]:
class ConvUnit(layers.Layer):
    def __init__(self, out_channels, kernel_size):              # "kernel_size" - 3 or (3,3) | "input_shape" passed to first block
        super().__init__()
        self.conv = layers.Conv2D(out_channels, kernel_size)
        self.bn   = layers.BatchNormalization()
        
    def call(self, input_tensor, training=False):               # "training" - depends on FIT or EVALUATE (BN or DROPOUT)
        t = self.conv(input_tensor)
        t = self.bn(t, training=training)                       # "training" - passed to BN or DROPOUT if present
        t = tf.nn.relu(t)                                       # custom ACTIVATION
        return t
    
class LinearUnit(layers.Layer):
    def __init__(self, out_size, activation):
        super().__init__()
        self.fc = layers.Dense(out_size, activation=activation) # layers ACTIVATION
    
    def call(self, input_tensor):
        t = self.fc(input_tensor)
        return t

In [9]:
class CustomModel(models.Model):
    def __init__(self, image_shape):
        super().__init__()
        self.conv1 = ConvUnit(16, 3)                       # activated - custom
        self.conv2 = ConvUnit(32, 3)
        self.fc1 = LinearUnit(128 , activation='relu')     # activated - layers
        self.out = LinearUnit(10  , activation='softmax')
        # GENERATE SUMMARY
        self.image_shape = image_shape
        self.build(input_shape=(None, *image_shape))
        
    def call(self, input_tensor, training=False):
        t = self.conv1(input_tensor, training=training)    # has BN
        t = self.conv2(t, training=training)
        t = layers.Flatten()(t)                            # FLATTEN returns callable
        t = self.fc1(t)
        t = self.out(t)
        return t
    
    def model(self):
        t = keras.Input(shape=self.image_shape)
        return keras.Model(inputs=[t], outputs=self.call(t))
        

## Default Training & Evaluation

In [10]:
# INIT MODEL
model = CustomModel(image_shape=(28,28,1))

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy', # from_logits = False (DEFAULT) ## EXPLORE ##
              metrics=['accuracy'])

In [11]:
model.model().summary()

Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
conv_unit (ConvUnit)         (None, 26, 26, 16)        224       
_________________________________________________________________
conv_unit_1 (ConvUnit)       (None, 24, 24, 32)        4768      
_________________________________________________________________
flatten (Flatten)            (None, 18432)             0         
_________________________________________________________________
linear_unit (LinearUnit)     (None, 128)               2359424   
_________________________________________________________________
linear_unit_1 (LinearUnit)   (None, 10)                1290      
Total params: 2,365,706
Trainable params: 2,365,610
Non-trainable params: 96
___________________________________________

In [14]:
# TRAIN - passes "training=True" to CALL
model.fit(train_images, train_labels, batch_size=64, epochs=1)



<tensorflow.python.keras.callbacks.History at 0x13481b130>

In [15]:
# EVAL - passes "training=False" to CALL
test_loss, test_accuracy = model.evaluate(test_images, test_labels)

print("ACC: ",test_accuracy)

ACC:  0.8751999735832214


## Custom Training & Evaluation

In [23]:
class CustomTrainer(models.Model):
    '''
    PROPERTIES:
        self.trainable_variables = trainable parameters in model [autodetected]
        self.compiled_loss = "callable loss" passed to trainer.compile()
        self.compiled_metrics = "callable metrics" passed to trainer.compile()
        self.metrics = metric results stored in model
        self.optimizer = "callable optimizer" passed to trainer.compile()
        
    OVERRIDE:
        def compile(): - can be used to overrider trainer.compile() and store loss,optimizer,metrics
        def train_step(): - to use in trainer.fit()
        def test_step(): - to use in trainer.evaluate()
    '''
    def __init__(self, model):
        super().__init__()
        self.model = model
    
    # FIT
    def train_step(self, data):
        x, y = data
        
        # FORWARD PROP, LOSS CALC with GRAPH TRACKING
        with tf.GradientTape() as tape:
            y_pred = model(x, training=True)
            loss = self.compiled_loss(y, y_pred)                  # "loss" from trainer.compile()
            
        # GRAD CALCULATE & BACKPROP 
        gradients = tape.gradient(loss, self.trainable_variables) # grad of "loss" wrt "network_weights"
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        
        # RETURN METRICS TO PRINT
        self.compiled_metrics.update_state(y, y_pred)             # "metrics" from trainer.compile()
        return {metric.name: metric.result() for metric in self.metrics}
        
        
    # EVALUATE
    def test_step(self, data):
        x, y = data
        
        y_pred = model(x, training=False)
        
        # RETURN METRICS TO PRINT
        self.compiled_metrics.update_state(y, y_pred)             # "metrics" from trainer.compile()
        return {metric.name: metric.result() for metric in self.metrics}

In [24]:
# INIT MODEL
model = CustomModel(image_shape=(28,28,1))

trainer = CustomTrainer(model)

trainer.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy', # from_logits = False (DEFAULT) ## EXPLORE ##
                metrics=['accuracy'])

In [25]:
trainer.fit(train_images, train_labels, batch_size=64, epochs=1)



<tensorflow.python.keras.callbacks.History at 0x1325ee5b0>

In [26]:
test_loss, test_accuracy = trainer.evaluate(test_images, test_labels)

print("ACC: ",test_accuracy)

ACC:  0.8878999948501587


## Custom Training Loop

In [10]:
BATCH_SIZE = 64
NUM_EPOCHS = 2
LEARNING_RATE = 0.001

In [11]:
# PREPARE TRAIN AND TEST DATA
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(BATCH_SIZE)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_dataset = test_dataset.batch(BATCH_SIZE)

In [12]:
# INIT MODEL
model = CustomModel(image_shape=(28,28,1))

# INIT TRAIN UTILS
optimizer = keras.optimizers.Adam(lr=LEARNING_RATE)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)
accuracy_metric = keras.metrics.SparseCategoricalAccuracy()

In [15]:
# TRAIN RUN
losses, accuracies = [], []

for epoch in tqdm(range(NUM_EPOCHS)):
    
    epoch_loss = 0
    batch_loss_accumulator = 0
    
    for batch_idx, batch in enumerate(train_dataset):
        x, y = batch
        
        # FORWARD PROP, LOSS CALC with GRAPH TRACKING
        with tf.GradientTape() as tape:
            y_preds = model(x, training=True)
            loss = loss_fn(y, y_preds)
            
        # GRAD CALCULATE & BACKPROP
        gradients = tape.gradient(loss, model.trainable_weights)
        optimizer.apply_gradients(zip(gradients, model.trainable_weights))
        
        # UPDATE LOSS ACCUMULATOR & METRICS
        batch_loss_accumulator += loss.numpy()
        accuracy_metric.update_state(y, y_preds)
        
    epoch_loss = batch_loss_accumulator / len(train_dataset)
    epoch_accuracy = accuracy_metric.result().numpy()
    accuracy_metric.reset_states()
    
    losses.append(epoch_loss)
    accuracies.append(epoch_accuracy)
    
    print("EPOCH: ",epoch,"\tACCURACY",round(epoch_accuracy,2),"\tLOSS: ",round(epoch_loss,2))

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=2.0), HTML(value='')))

EPOCH:  0 	ACCURACY 0.91 	LOSS:  0.25
EPOCH:  1 	ACCURACY 0.93 	LOSS:  0.19



In [27]:
# TEST RUN
batch_loss_accumulator = 0

for batch in tqdm(train_dataset):
    x, y = batch
    
    y_preds = model(x, training=False)
    # LOSS
    loss = loss_fn(y, y_preds)
    batch_loss_accumulator += loss.numpy()
    # ACCURACY
    accuracy_metric.update_state(y, y_preds)
    
test_loss = batch_loss_accumulator / len(train_dataset)
test_accuracy = accuracy_metric.result().numpy()

print("ACCURACY",round(test_accuracy,2),"\tLOSS: ",round(test_loss,2))

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=938.0), HTML(value='')))


ACCURACY 0.93 	LOSS:  0.21


## Custom Training Loop without Eager

In [20]:
BATCH_SIZE = 64
NUM_EPOCHS = 2
LEARNING_RATE = 0.001

In [21]:
# PREPARE TRAIN AND TEST DATA
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(BATCH_SIZE)

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_dataset = test_dataset.batch(BATCH_SIZE)

In [22]:
# INIT MODEL
model = CustomModel(image_shape=(28,28,1))

# INIT TRAIN UTILS
optimizer = keras.optimizers.Adam(lr=LEARNING_RATE)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)
accuracy_metric = keras.metrics.SparseCategoricalAccuracy()

In [23]:
# EAGER DISABLED TRAIN STEP
@tf.function
def train_step(x,y):
    # FORWARD PROP, LOSS CALC with GRAPH TRACKING
    with tf.GradientTape() as tape:
        y_preds = model(x, training=True)
        loss = loss_fn(y, y_preds)

    # GRAD CALCULATE & BACKPROP
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    
    return loss, y_preds

In [24]:
# TRAIN RUN
losses, accuracies = [], []

for epoch in tqdm(range(NUM_EPOCHS)):
    
    epoch_loss = 0
    batch_loss_accumulator = 0
    
    for batch_idx, batch in enumerate(train_dataset):
        x, y = batch
        
        # EAGER DISABLED TRAIN STEP
        loss, y_preds = train_step(x,y)
        
        # UPDATE LOSS ACCUMULATOR & METRICS
        batch_loss_accumulator += loss.numpy()
        accuracy_metric.update_state(y, y_preds)
        
    epoch_loss = batch_loss_accumulator / len(train_dataset)
    epoch_accuracy = accuracy_metric.result().numpy()
    accuracy_metric.reset_states()
    
    losses.append(epoch_loss)
    accuracies.append(epoch_accuracy)
    
    print("EPOCH: ",epoch,"\tACCURACY",round(epoch_accuracy,2),"\tLOSS: ",round(epoch_loss,2))

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=2.0), HTML(value='')))

EPOCH:  0 	ACCURACY 0.86 	LOSS:  0.43
EPOCH:  1 	ACCURACY 0.91 	LOSS:  0.25



In [25]:
# EAGER DISABLED TEST STEP
@tf.function
def test_step(x,y):
    y_preds = model(x, training=False)
    loss = loss_fn(y, y_preds)
    
    return loss, y_preds

In [29]:
# TEST RUN
batch_loss_accumulator = 0

for batch in tqdm(train_dataset):
    x, y = batch
    
    # EAGER DISABLED EST STEP
    loss, y_preds = test_step(x,y)
    
    batch_loss_accumulator += loss.numpy()
    # ACCURACY
    accuracy_metric.update_state(y, y_preds)
    
test_loss = batch_loss_accumulator / len(train_dataset)
test_accuracy = accuracy_metric.result().numpy()

print("ACCURACY",round(test_accuracy,2),"\tLOSS: ",round(test_loss,2))

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=938.0), HTML(value='')))


ACCURACY 0.93 	LOSS:  0.21
