<a href="https://colab.research.google.com/github/nihermann/IANNwTF26/blob/master/Kopie_von_hw06.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [937]:
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras import Model
from tensorflow.keras import layers
from tensorflow.keras.regularizers import l2
import numpy as np
import matplotlib.pyplot as plt
import time
import datetime
import os
from tensorflow.keras.layers import Conv2D, MaxPool2D, BatchNormalization, Dropout, GlobalAveragePooling2D, Dense, AveragePooling2D, Activation,Concatenate
from tensorflow.keras.activations import relu, softmax
from tensorflow.keras.regularizers import L2

In [938]:
training_raw, (test_img, test_labels) = tf.keras.datasets.cifar10.load_data()

In [939]:
# split test data again into test and validation data
test_raw = (test_img[5000:], test_labels[5000:])
validation_raw = (test_img[:5000], test_labels[:5000])

# Parameters

In [940]:
# these parameters are used to create a list of indices to later select 
# the right manipulated dataset for training in each epoch
NO_PAIRING = 0
WITH_PAIRING = 1
SEQUENTIAL_PARING = lambda pairing, no_pairing: [WITH_PAIRING]*pairing + [NO_PAIRING]*no_pairing

In [941]:
## HYPERPARAMETERS
# Data
BATCH_SIZE = 64
SHUFFLE_BUFFER = 50000 # the large shuffle buffer will be very important later!
CACHE_PATH = "/content/drive/MyDrive/"
SAVE_PATH = "/content/drive/MyDrive/"
SAVING_THRESHOLD = 0.4 # minimum accuracy before saving the model

# Training
# initialize the Training phases sequentially
PHASECONTROLLER = [NO_PAIRING]*2 + [WITH_PAIRING]*8 + SEQUENTIAL_PARING(pairing=8, no_pairing=2)*10 + [NO_PAIRING]*5
EPOCHS = len(PHASECONTROLLER)
LEARNING_RATE = 0.00005

# loss
LOSS_FUNCTION = tf.keras.losses.categorical_crossentropy
# optimizer
OPTIMIZER = tf.keras.optimizers.Adam(LEARNING_RATE)

# Datapipeline

In [942]:
if not os.path.exists(CACHE_PATH[:-1]):
    print("lol")
    os.makedirs(CACHE_PATH[:-1])

In [943]:
os.listdir(CACHE_PATH[:-1])

['Colab Notebooks',
 'NI_Lecture_Template (2).gslides',
 'TfNN',
 'Models',
 '.data-00000-of-00001',
 '.index']

In [944]:
def blend(img1, img2, blending_factor=0.5, prob=0.8):
    '''
    IN:
        img1: image with shape (h,w,c)
        img2: imgae with shape (h,w,c)
        blending_factor: float in range [0:1], intensity of img1 in the new img
        prob: float in range[0:1], probability of success
    OUT:
        img: result of blending img1 and img2 by blending_factor
    '''
    if tf.random.uniform([1]) > prob or blending_factor > 1 or blending_factor < 0:
        return img1
    
    return (img1*blending_factor) + img2*(1-blending_factor)

In [945]:
def data_augmentation(dataset, horizontal_flip=True, sample_pairing=True):
    '''
    Possible Augmentations:
        horizontal_flip: randomly flips imgs of the dataset horizontally.
        sample_pairing: blends two random img together. Twist: there is a 25% chance to skip the whole augmentation.
    '''
    AUTOTUNE = tf.data.experimental.AUTOTUNE

    if horizontal_flip:
        dataset = dataset.map(lambda img, label: (tf.image.random_flip_left_right(img), label),
                              num_parallel_calls=AUTOTUNE)

    if sample_pairing and tf.random.uniform([1]) < 0.75:
        # first split the dataset into its img and label
        ds_images = dataset.map(lambda img, label: img)
        ds_labels = dataset.map(lambda img, label: label)

        # shuffle the img dataset and store it seperately. For better results the buffersize should be big
        # enough such that also pictures of different classes will be paired later.
        shuffled_ds = ds_images.shuffle(SHUFFLE_BUFFER)

        # zip all back together
        merged_ds = shuffled_ds.zip((ds_images, shuffled_ds, ds_labels))

        # blend two random imgs (pixlewise) with equal(50%:50%) strenght together and use the label of the first one
        dataset = merged_ds.map(lambda img1, img2, label1: (blend(img1, img2, blending_factor=0.5, prob=0), label1),
                                num_parallel_calls=AUTOTUNE)
        
    return dataset

In [946]:
def build_pipeline(data, batchsize, shuffle_buffer, horizontal_flip=False, sample_pairing=False):
    ds = tf.data.Dataset.from_tensor_slices(data)
    ds = ds.map(lambda img, label: (2*(img/255)-1, # normalize the color values between [-1:1]
                                    tf.reshape(tf.one_hot(label, 10), shape=(-1,)))) # one hot the labels
    
    # cache the preprocessing for better performance
    ds = ds.cache(CACHE_PATH)

    # if one of the data augmentations is choosen, apply them.
    if horizontal_flip or sample_pairing:
        ds = data_augmentation(ds, horizontal_flip, sample_pairing)

    ds = ds.shuffle(buffer_size=shuffle_buffer)
    ds = ds.batch(batchsize)

    ds = ds.prefetch(tf.data.experimental.AUTOTUNE)

    return ds

# ResNet

In [947]:
class ResidualBlock(tf.keras.layers.Layer):
  '''Class for implementing a Residual Block to create a ResNet'''
  
  def __init__(self,channels, num_layer_iterations = 3,input_dimension = 32):
    super(ResidualBlock,self).__init__()

    self.bottleneck = Conv2D(filters= channels, kernel_size=(1,1), activation= relu)
    conv_layer = Conv2D(filters = channels, kernel_size = 3, padding = "same", kernel_regularizer= L2()) #save the default for the convolutional layer in a variable for to keep the list comp. short
    self.block_layers = [layer for layer in range(num_layer_iterations-1) for layer in (conv_layer, BatchNormalization()) ] #list comprehension -> building a list by adding a convolutional layers and batchnormalization (alternating) for the wanted number of blocks    
    self.block_layers.append(Conv2D(filters = input_dimension, kernel_size = 1, padding = "same", kernel_regularizer= L2()))
    self.block_layers.append(BatchNormalization())
    self.block_layers.append(Activation('relu'))
   
  @tf.function
  def call(self,x,training=True):
    x = self.bottleneck(x)#bottleneck layer with kernel_size of 1,1
    y = x #save the input
    for layer in self.block_layers: #iterate trough all layers in the block_layers list
      x = layer(x,training = training)
    return x + y #add the original input to the transformed input

In [948]:
class ResNet(Model):
  '''Class for implementing a Residual Network'''
  def __init__(self, block_num = 6, block_channels  = 32, num_layer_iterations = 3):
    super(ResNet,self).__init__()        
 
    self.conv_1 = Conv2D(filters= 32, kernel_size= 3, padding= 'same',  kernel_regularizer= L2())
    self.batch_1 = BatchNormalization()
    self.activation_1 = Activation('relu')
    
    self.residual_blocks = [block for block in range(block_num) for block in (ResidualBlock(channels = block_channels, num_layer_iterations = num_layer_iterations),MaxPool2D(padding='same'))]
    
    self.conv_2 = Conv2D(filters= 32, kernel_size= 3, padding= 'same', activation= relu, kernel_regularizer= L2())
    self.average = GlobalAveragePooling2D()
    self.dense = Dense(units= 10, activation= softmax)

  @tf.function
  def call(self, input, training = True):
    x = self.conv_1(input)
    x = self.batch_1(x,training = training)
    x = self.activation_1(x)
    for layer in self.residual_blocks:
      x = layer(x,training = training)
    x = self.conv_2(x)
    x = self.average(x)
    x = self.dense(x)
    return x

# DenseNet

In [949]:
class TransitionLayer(layers.Layer):
  '''class for implementing a Transition layer'''

  def __init__(self,num_filters = 32):
    super(TransitionLayer,self).__init__()
    self.layer_list = [

      Conv2D(filters = num_filters, kernel_size = 1 , padding = 'same', kernel_regularizer= L2()),             
      BatchNormalization(),
      Activation('relu'),
      AveragePooling2D(strides=(2,2))
      
    ]
  
  @tf.function
  def call(self, input, training = True):
    for layer in self.layer_list:
      input = layer(input)
    return input

In [950]:
class DenseBlock(layers.Layer):
  ''''Class for implementing a DenseBlock'''

  def __init__(self,num_blocks = 3,block_length = 10):
    super(DenseBlock,self).__init__()
    
    self.layer_list = []
    for i in range(num_blocks):
      self.layer_list.append(Conv2D(filters= block_length, kernel_size=(3,3), padding= 'same', kernel_regularizer= L2()))
      self.layer_list.append(BatchNormalization())
    
    self.concat = tf.keras.layers.Concatenate()

  @tf.function
  def call(self, input, training= False):
    for layer in self.layer_list:
      output = layer(input,training = training)
      input = self.concat([input, output])
    return input


In [951]:
class DenseNet(tf.keras.Model):
  '''Class dor implementing a DenseNet'''

  def __init__(self,num_blocks = 5, block_length =3,growth_rate = 10):
    super(DenseNet,self).__init__()

    self.conv_1 = Conv2D(filters= (growth_rate * 2),input_shape=(BATCH_SIZE, 32, 32, 3), kernel_size= 3, padding= 'same', kernel_regularizer= L2())
    self.batch_1 = BatchNormalization()
    self.activation_1 = Activation('relu')

    #make a list with alternating between a Denseblock and a TransitionLayer (one less than the wanten number of blocks as the last layer should not be a transitional one)
    self.block_list = [block for block in range(num_blocks-1) for block in (DenseBlock(block_length,growth_rate), TransitionLayer(num_blocks * growth_rate))]
    self.block_list.append(DenseBlock(block_length,growth_rate))

    self.average = GlobalAveragePooling2D()
    self.dense = Dense(units= 10, activation= softmax)

  @tf.function
  def call(self, input, training = False):
    print("a")
    x = self.conv_1(input)
    x = self.batch_1(x,training = training)
    x = self.activation_1(x)
    for layer in self.block_list:
      x = layer(x,training = training)
    x = self.average(x)
    x = self.dense(x)
    return x

# Training Functions

In [952]:
@tf.function
def forward_step(model, img, label ,loss_func, optimizer, training=True):
    with tf.GradientTape() as tape:
        prediction = model(img, training)
        loss = loss_func(label, prediction) + tf.reduce_sum(model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # calc accuracy of the batch
    sample_test_accuracy = tf.argmax(label, axis=1) == tf.argmax(prediction, axis=1)
    sample_test_accuracy = tf.reduce_mean(tf.cast(sample_test_accuracy, tf.float32))  

    return tf.reduce_mean(loss), sample_test_accuracy

#@tf.function
def test(model, test_data, loss_function):
    # test over complete test data

    test_accuracy_aggregator = []
    test_loss_aggregator = []

    for (data, target) in test_data:
        prediction = model(data, False)
        ## Calc Loss and its partial mean
        sample_test_loss = loss_function(target, prediction)
        test_loss_aggregator.append(np.mean(sample_test_loss.numpy()))
        
        ## Calc Accuracy and its partial mean
        sample_test_accuracy = np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
        sample_test_accuracy = np.mean(sample_test_accuracy)
        test_accuracy_aggregator.append(sample_test_accuracy)

    test_loss = np.mean(test_loss_aggregator)
    test_accuracy = np.mean(test_accuracy_aggregator)

    return test_loss, test_accuracy

# thanks to Cornelius who shared this function which I adjusted a bit
def estimateTime(epoch, time_passed, number_of_epochs):
    """
    estimates the time it will take until the network is done with training.
    """
    epoch += 1
    time_per_epoch = time_passed / epoch
    number_of_epochs_left = number_of_epochs - epoch
    time_sec_remaining = number_of_epochs_left * time_per_epoch
    time_remain = str(datetime.timedelta(seconds=time_sec_remaining))
    return time_remain

# def save_best_weights(accuracy, best_so_far):
#     '''saves the best model if acc > preset threshold and if it is the best one so far'''
#     global SAVING_THRESHOLD
#     if (accuracy > SAVING_THRESHOLD) and best_so_far:
#         model.save(SAVE_PATH + str(int(accuracy*10000)))
#         SAVING_THRESHOLD = accuracy
#         return True
#     return False

# Training

In [953]:
training_ds = [None, None]
training_ds[0] = build_pipeline(training_raw, BATCH_SIZE, SHUFFLE_BUFFER, horizontal_flip=True)
training_ds[1] = build_pipeline(training_raw, BATCH_SIZE, SHUFFLE_BUFFER, horizontal_flip=True, sample_pairing=True)
test_ds = build_pipeline(test_raw, BATCH_SIZE, SHUFFLE_BUFFER)
validation_ds = build_pipeline(validation_raw, BATCH_SIZE, SHUFFLE_BUFFER)

In [None]:
tf.keras.backend.clear_session()

startTime = time.time()
print(f"Initialize, {datetime.timedelta(seconds=startTime)}")

# Initialize NN
#shift between ResNet and DenseNet
#model = ResNet(block_num= 5,block_channels=32,num_layer_iterations=4)
model = DenseNet()

# lists for vizualisation
training_losses = []
training_accuracies = []
test_losses = []
test_accuracies = []

# let's test our Model how it performes on the test dataset before learning
test_loss, test_accuracy = test(model, test_ds, LOSS_FUNCTION)
test_losses.append(test_loss)
test_accuracies.append(test_accuracy)

# check how our Model performs on training dataset before learning
training_loss, training_accuracy = test(model, training_ds[0], LOSS_FUNCTION)
training_losses.append(training_loss)
training_accuracies.append(training_accuracy)

print(f"Start Training for {EPOCHS} Epochs", f"Training Loss: {np.round(float(training_loss), 3)}, Training Accuracy: {np.round(training_accuracy*100, 3)}%, Test Loss: {np.round(float(test_loss),3)}, Test Accuracy: {np.round(test_accuracy*100, 3)}%", sep="\n")

startTime = time.time()
# We train for before specified epochs.
for epoch in range(EPOCHS):
    if epoch % 5 == 0:
        print("-"*20)
    print('Epoch: __ ', (epoch + 1))#, ", Pairing = " + str(bool(PHASECONTROLLER[epoch])))

    # perform a training step with each entry of our data pipeline and record loss and accuracy
    t_acc_accumulator = []
    for (data, label) in training_ds[PHASECONTROLLER[epoch]]:
        training_loss, training_accuracy = forward_step(model, data, label, LOSS_FUNCTION, OPTIMIZER)
        t_acc_accumulator.append(training_accuracy)

    # save loss and accuracy
    training_losses.append(training_loss)
    training_accuracies.append(np.mean(t_acc_accumulator))

    # check how our Model performs after one learning epoch on our test dataset and record Loss and Acc. as well
    test_loss, test_accuracy = test(model, test_ds, LOSS_FUNCTION)
    test_losses.append(test_loss)
    
    # check if we have a new best score and save the model if it is good enough
    # if save_best_weights(test_accuracy, np.max(test_accuracies) < test_accuracy):
    #     print(f"CONGRATS, NEW BEST ACCURACY OF {np.round(test_accuracy, 3)}. MODEL WAS SAVED!")
    
    test_accuracies.append(test_accuracy)
    print(f"Training Loss: {np.round(float(training_loss), 3)}, Training Accuracy: {np.round(training_accuracies[epoch]*100, 3)}%, Test Loss: {np.round(float(test_loss), 3)}, Test Accuracy: {np.round(test_accuracy*100, 3)}%, Finish in: {str(estimateTime(epoch, time.time() - startTime, EPOCHS))[:7]}")

Initialize, 18605 days, 22:48:31.287508
a
a
a
Start Training for 115 Epochs
Training Loss: 2.308, Training Accuracy: 10.146%, Test Loss: 2.308, Test Accuracy: 10.107%
--------------------
Epoch: __  1
a
a
Training Loss: 8.301, Training Accuracy: 10.146%, Test Loss: 2.278, Test Accuracy: 13.054%, Finish in: 0:16:41
Epoch: __  2
Training Loss: 8.002, Training Accuracy: 26.167%, Test Loss: 2.3, Test Accuracy: 16.535%, Finish in: 0:11:36
Epoch: __  3
Training Loss: 7.697, Training Accuracy: 36.669%, Test Loss: 2.303, Test Accuracy: 18.038%, Finish in: 0:10:17
Epoch: __  4
Training Loss: 7.38, Training Accuracy: 40.724%, Test Loss: 2.172, Test Accuracy: 24.031%, Finish in: 0:09:36
Epoch: __  5
Training Loss: 7.702, Training Accuracy: 45.609%, Test Loss: 1.81, Test Accuracy: 34.949%, Finish in: 0:09:09
--------------------
Epoch: __  6
Training Loss: 7.439, Training Accuracy: 47.943%, Test Loss: 1.518, Test Accuracy: 45.767%, Finish in: 0:08:50
Epoch: __  7
Training Loss: 7.179, Training Acc

# Visualization

In [None]:
# taken from Tensorflow_Intro.ipynb

# Visualize accuracy and loss for training and test data. 
# One plot training and test loss.
# One plot training and test accuracy.
plt.figure()
line1, = plt.plot(training_losses)
line2, = plt.plot(test_losses)
plt.xlabel("Training steps")
plt.ylabel("Loss")
plt.legend((line1,line2),("training","test"))
plt.show()

plt.figure()
line1, = plt.plot(training_accuracies)
line2, = plt.plot(test_accuracies)
plt.xlabel("Training steps")
plt.ylabel("Accuracy")
plt.legend((line1,line2),("training", "test"))
plt.show()
print(f"Max Accuracy: {np.round(np.max(test_accuracies)*100, 3)}%")

In [None]:
# # load and compile our best model
# model = tf.keras.models.load_model(SAVE_PATH+str(int(SAVING_THRESHOLD*10000)))
# model.compile()

# test their performance once again
_, test_acc = test(model, validation_ds, LOSS_FUNCTION)
_, train_acc = test(model, training_ds[0], LOSS_FUNCTION)

labels = 'Correct', 'Wrong'
explode = (0, 0.1)

# cook some cakes
fig1, ax1 = plt.subplots()
ax1.pie([train_acc, 1-train_acc], explode=explode, labels=labels, autopct='%1.1f%%',
        shadow=True, startangle=45)
ax1.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle.
plt.title("Accuracy on Train Dataset", fontdict={"fontsize":20})
plt.show()

fig1, ax1 = plt.subplots()
ax1.pie([test_acc, 1-test_acc], explode=explode, labels=labels, autopct='%1.1f%%',
        shadow=True, startangle=45)
ax1.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle.
plt.title("Accuracy on Test Dataset", fontdict={"fontsize":20})
plt.show()