In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds
import numpy as np
import time

In [None]:
def test(model, test_data, loss_function):
    # test over complete test data

    test_accuracy_aggregator = []
    test_loss_aggregator = []

    for (input, target) in test_data:
        prediction = model(input)
        sample_test_loss = loss_function(target, prediction)
        sample_test_accuracy =  np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
        sample_test_accuracy = np.mean(sample_test_accuracy)
        test_loss_aggregator.append(sample_test_loss.numpy())
        test_accuracy_aggregator.append(np.mean(sample_test_accuracy))

    test_loss = np.mean(test_loss_aggregator)
    test_accuracy = np.mean(test_accuracy_aggregator)

    return test_loss, test_accuracy

def investigate(images, labels):

    # rescaling images to a uniform size
    # not keeping aspect ration
    # images contain cropped photos of cells with black paddings
    image = tf.image.resize(images, (128,128))

    #normalization
    image /= 255 # normalize between 0 and 1
    
    # one_hot encoding labels
    label = tf.one_hot(labels, 2)
    print(label)
    return image, label
    
def train_step(model, input, target, loss_function, optimizer):
    with tf.GradientTape() as tape:
        prediction = model(input)
        loss = loss_function(target, prediction)
        gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

In [None]:
from tensorflow.keras.layers import Layer

class Model(Layer):

    def __init__(self):
    
        # 3 conv/pool blocks with 2 conv layers each. 2 dense hidden layers.

        # init super class
        super(Model, self).__init__()

        # define layers
        self.conv_layer_1 = tf.keras.layers.Conv2D(filters=16, kernel_size=3, padding='same', activation='relu', input_shape=(128,128,3))
        self.conv_layer_1_2 = tf.keras.layers.Conv2D(filters=16, kernel_size=3, padding='same', activation='relu')
        self.max_pool_layer_1 = tf.keras.layers.MaxPool2D()

        self.conv_layer_2 = tf.keras.layers.Conv2D(filters=32, kernel_size=3, padding='same', activation='relu')
        self.conv_layer_2_2 = tf.keras.layers.Conv2D(filters=32, kernel_size=3, padding='same', activation='relu')
        self.norm_layer_2 = tf.keras.layers.BatchNormalization()
        self.max_pool_layer_2 = tf.keras.layers.MaxPool2D()

        self.conv_layer_3 = tf.keras.layers.Conv2D(filters=64, kernel_size=3, padding='same', activation='relu')
        self.conv_layer_3_2 = tf.keras.layers.Conv2D(filters=64, kernel_size=3, padding='same', activation='relu')
        self.norm_layer_3 = tf.keras.layers.BatchNormalization()
        self.max_pool_layer_3 = tf.keras.layers.MaxPool2D()
        
        self.mlp_input = tf.keras.layers.GlobalAveragePooling2D()
        self.hidden_layer_1 = tf.keras.layers.Dense(units=128, activation=tf.keras.activations.sigmoid)
        self.hidden_layer_2 = tf.keras.layers.Dense(units=64, activation=tf.keras.activations.sigmoid)
        self.output_layer = tf.keras.layers.Dense(units=2, activation=tf.keras.activations.softmax)

    @tf.function
    def call(self, x):

        x = self.conv_layer_1(x)
        x = self.conv_layer_1_2(x)
        x = self.max_pool_layer_1(x)
        
        x = self.conv_layer_2(x)
        x = self.conv_layer_2_2(x)
        x = self.norm_layer_2(x)
        x = self.max_pool_layer_2(x)
        
        x = self.conv_layer_3(x)
        x = self.conv_layer_3_2(x)
        x = self.norm_layer_3(x)
        x = self.max_pool_layer_3(x)       
        
        x = self.mlp_input(x)
        x = self.hidden_layer_1(x)
        x = self.hidden_layer_2(x)
        x = self.output_layer(x)
        
        return x

In [None]:
# hyperparameters
num_epochs = 25
learning_rate = 0.00008
running_average_factor = 0.95
BATCH_SIZE = 12

tf.keras.backend.clear_session()

# since there is not test split, we take 20% of the train data as our test split
# distribution is arbitrary
train_dataset, train_info = tfds.load('malaria', split='train[:80%]', shuffle_files=True, as_supervised=True, with_info=True)
test_dataset, test_info = tfds.load('malaria', split='train[:20%]', shuffle_files=True, as_supervised=True, with_info=True)


train_dataset = train_dataset.shuffle(buffer_size=10)
train_dataset = train_dataset.map(investigate, num_parallel_calls=4)
# caching does not work in my case (to few memory)
#train_dataset = train_dataset.cache()
train_dataset = train_dataset.batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(1)


test_dataset = test_dataset.shuffle(buffer_size=10)
test_dataset = test_dataset.map(investigate, num_parallel_calls=4)
#test_dataset = test_dataset.cache()
test_dataset = test_dataset.batch(BATCH_SIZE)
test_dataset = test_dataset.prefetch(1)

[1mDownloading and preparing dataset malaria/1.0.0 (download: 337.08 MiB, generated: Unknown size, total: 337.08 MiB) to /root/tensorflow_datasets/malaria/1.0.0...[0m


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Dl Completed...', max=1.0, style=Progre…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Dl Size...', max=1.0, style=ProgressSty…

HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Extraction completed...', max=1.0, styl…









HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

Shuffling and writing examples to /root/tensorflow_datasets/malaria/1.0.0.incomplete1EEPTV/malaria-train.tfrecord


HBox(children=(FloatProgress(value=0.0, max=27558.0), HTML(value='')))

[1mDataset malaria downloaded and prepared to /root/tensorflow_datasets/malaria/1.0.0. Subsequent calls will reuse this data.[0m
Tensor("one_hot:0", shape=(2,), dtype=float32)
Tensor("one_hot:0", shape=(2,), dtype=float32)


In [None]:
# init lists for visualization
train_losses = []
test_losses = []
test_accuracies = []


# init model
model = Model()

# init loss
cross_entropy_loss = tf.keras.losses.BinaryCrossentropy()

# init optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate)


start = time.time()

print("")
print("Number of Epochs: " + str(num_epochs))



# train for number of epochs
for epoch in range(num_epochs):
    print('Epoch:___' + str(epoch))

    train_dataset = train_dataset.shuffle(buffer_size=128)
    test_dataset = test_dataset.shuffle(buffer_size=128)

    running_average = 0
    for (data, target) in train_dataset:
        train_loss = train_step(model, data, target, cross_entropy_loss, optimizer)
        running_average = running_average_factor * running_average  + (1 - running_average_factor) * train_loss
    train_losses.append(running_average)

    test_loss, test_accuracy = test(model, test_dataset, cross_entropy_loss)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    if epoch == 0:
        end = time.time()
        end = end - start
        end = end * num_epochs - 1
        print("")
        print("Estimated training time: " + str(int(end / 60)) + " mins.")
        print("")


end = time.time()
print("")
print("Time elapsed: " + str(int((end - start)/60)) + " mins.")
print("")


Number of Epochs: 25
Epoch:___0

Estimated training time: 19 mins.

Epoch:___1


In [None]:
# plot loss and accuracy over epochs
plt.figure()
line1, = plt.plot(train_losses)
line2, = plt.plot(test_losses)
plt.xlabel("Training steps")
plt.ylabel("Loss")
plt.legend((line1,line2),("training","test"))
plt.show()

plt.figure()
line1, = plt.plot(test_accuracies)
plt.xlabel("Training steps")
plt.ylabel("Accuracy")
plt.show()