<a href="https://colab.research.google.com/github/lucasfldmn/twrds_unbiased_anns/blob/main/notebooks/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparations

## Imports

In [1]:
%tensorflow_version 2.x
import io
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
import scipy.stats as stats
import datetime
import pickle
import functools
import json
import datetime
import gc
from google.colab import files

## GitHub

First, we clone the existing repository so that we can use the functions defined in it.

In [2]:
# Git identity and credentials used to clone the project repository.
from getpass import getpass
git_user = "lucasfldmn"
git_mail = "lucfeldmann23@googlemail.com"
# Prompt for the password at run time instead of hard-coding it in the notebook
git_password = getpass('Git Password:')

# `$name` inside a `!` shell line is replaced by IPython with the Python variable's value
!git config --global user.email '$git_mail'
!git config --global user.name '$git_user'

# NOTE(review): the password is interpolated into the clone URL, i.e. it is passed
# on the git command line — consider a token or credential helper instead.
# -q silences git's progress output.
!git clone https://$git_user:$git_password\@github.com/lucasfldmn/twrds_unbiased_anns/ -q

KeyboardInterrupt: ignored

# Data Generator

## Functions for Drawing

These functions are implemented in the repo and can easily be imported

In [None]:
from twrds_unbiased_anns.src.data.shapes import make_square, make_circle

## Creation of Sample Array

Now we create a NumPy array that holds all information about the sample (color, shape, and size of every item). The array can be stored and reloaded in order to reproduce the sample data.

In [None]:
def draw_from_truncated_normal_distribution(n_samples, mean, stddev = 10, lower = 1, upper = 100):
  """Draw rounded values from a normal distribution truncated to [lower, upper].

  Args:
    n_samples: Number of values to draw.
    mean: Mean of the underlying (untruncated) normal distribution.
    stddev: Standard deviation of the underlying normal distribution; must be > 0.
    lower: Smallest value that can be drawn (default 1).
    upper: Largest value that can be drawn (default 100).

  Returns:
    Float array of shape (n_samples, 1) with the draws rounded to whole numbers.

  Raises:
    ValueError: If stddev is not positive or lower is not smaller than upper.
  """
  if stddev <= 0:
    raise ValueError("stddev must be positive, got {}".format(stddev))
  if lower >= upper:
    raise ValueError("lower ({}) must be smaller than upper ({})".format(lower, upper))
  # truncnorm takes its clipping bounds in units of standard deviations around the mean
  lower_std = (lower - mean) / stddev
  upper_std = (upper - mean) / stddev
  # Randomly sample
  samples = stats.truncnorm.rvs(lower_std, upper_std, loc = mean, scale = stddev, size = n_samples)
  # Column vector so the result can be stacked next to the color/shape columns
  return np.reshape(samples.round(), (n_samples, 1))

def _build_group_rows(n_rows, is_white, is_square, mean, stddev):
  """Return an (n_rows, 3) array of [color flag, shape flag, size] rows for one group."""
  color = np.full((n_rows, 1), is_white, dtype = bool) # True = white, False = colorful
  shape = np.full((n_rows, 1), is_square, dtype = bool) # True = square, False = circle
  size = draw_from_truncated_normal_distribution(n_rows, mean = mean, stddev = stddev)
  return np.hstack((color, shape, size))

def create_sample_array(n_samples, white_square, white_circle, colorful_square, colorful_circle):
  """Create a shuffled sample array covering the four color/shape groups.

  Args:
    n_samples: Total number of samples the group percentages refer to.
    white_square, white_circle, colorful_square, colorful_circle: One sequence
      per group holding [percentage of n_samples, mean size, stddev of size].

  Returns:
    Array with one row per sample and the columns [color flag, shape flag, size],
    where the color flag is 1 for white and the shape flag is 1 for square.
    Rows are shuffled in place with np.random.shuffle before returning.

  Note:
    Group sizes are rounded independently, so they may not add up to n_samples.
  """
  # (group parameters, is_white, is_square) — order fixes the order of the random draws
  group_specs = (
    (white_square, True, True),
    (white_circle, True, False),
    (colorful_square, False, True),
    (colorful_circle, False, False),
  )

  # Calculate number of samples for each group
  group_sizes = [round(n_samples * spec[0] / 100) for spec, _, _ in group_specs]

  # Build the rows of every group with the shared helper
  group_rows = [
    _build_group_rows(n_rows, is_white, is_square, mean = spec[1], stddev = spec[2])
    for n_rows, (spec, is_white, is_square) in zip(group_sizes, group_specs)
  ]

  # Stack all together
  samples = np.vstack(group_rows)

  # Shuffle array
  np.random.shuffle(samples)

  # Return result
  return samples

## Creation of TensorFlow Generator

In [None]:
import imageio

def convert_sample_to_np_array(sample, color_choices = None):
  """Render one sample row as an image array.

  Args:
    sample: Indexable row where sample[0] is truthy for white (otherwise colorful),
      sample[1] is truthy for square (otherwise circle) and sample[2] is the size.
    color_choices: Optional sequence of color names to pick from for colorful
      samples. When omitted, the notebook-level variable `colors` is used, which
      must have been assigned before this function is called.

  Returns:
    Tuple of (image array as read by imageio, sample size).
  """
  # Get sample color
  if sample[0]:
    sample_color = 'white'
  else:
    # Fall back to the global only when the caller did not supply a palette
    palette = colors if color_choices is None else color_choices
    sample_color = np.random.choice(palette)

  # Get size of sample
  sample_size = sample[2]

  # Call shape generator based on sample shape
  if sample[1]:
    shape = make_square(color = sample_color, size = sample_size)
  else:
    shape = make_circle(color = sample_color, size = sample_size)

  # Open image as numpy array
  shape_array = imageio.imread(shape)

  # Return numpy array and size
  return shape_array, sample_size

def convert_sample_to_tensor(sample):
  """Turn one sample row into a (scaled image tensor, sample size) pair."""
  pixels, size = convert_sample_to_np_array(sample)
  # Build an int32 tensor from the image array, then scale the values by 1/255
  scaled = tf.convert_to_tensor(pixels, dtype=tf.int32) / 255
  return scaled, size

def make_tf_dataset(samples, divide_by_255 = True):
  """Build a tf.data.Dataset of (image, size) pairs from a sample array.

  Args:
    samples: Iterable of sample rows accepted by convert_sample_to_np_array.
    divide_by_255: When True (default), the image tensor is divided by 255 to
      normalize the values. When False, the int32 image tensor is used as is.

  Returns:
    Dataset created with from_tensor_slices over (image tensor, target tensor),
    where the targets are the sample sizes as float32.
  """
  # Iterate over samples and create list of numpy image arrays and list of targets
  images = []
  targets = []
  for sample in samples:
    # Convert sample to numpy array of the image
    image_array, sample_size = convert_sample_to_np_array(sample)
    images.append(image_array)
    targets.append(sample_size)

  # Convert both to tensors
  img_tensor = tf.convert_to_tensor(images, dtype=tf.int32)
  target_tensor = tf.convert_to_tensor(targets, dtype=tf.float32)

  # Normalize only when requested (the flag used to be ignored)
  if divide_by_255:
    img_tensor = img_tensor / 255

  # Convert to tensorflow dataset and return results
  return tf.data.Dataset.from_tensor_slices((img_tensor, target_tensor))



# CNN Setup

## Simple CNN

In [None]:
from tensorflow.keras.layers import Layer, Conv2D, MaxPool2D, Flatten, Dense

class SimpleCNN(tf.keras.Model):
    """Small convolutional network that ends in a single-unit dense layer.

    Three zero-padded 3x3 ReLU convolutions (32, 64 and 64 filters) with 2x2 max
    pooling after the first two, then a flatten step and dense layers with
    100, 20 and 1 units.
    """

    def __init__(self):
        super().__init__()
        # All layers in forward order; call() applies them one after another
        self.network_layers = [
            # Convolution / pooling stack
            Conv2D(filters = 32, kernel_size = (3, 3), padding = "same", activation = "relu", input_shape = (360, 360, 4)),
            MaxPool2D(pool_size = (2, 2)),
            Conv2D(filters = 64, kernel_size = (3, 3), padding = "same", activation = "relu"),
            MaxPool2D(pool_size = (2, 2)),
            Conv2D(filters = 64, kernel_size = (3, 3), padding = "same", activation = "relu"),
            # Collapse the feature maps into a flat vector
            Flatten(),
            # Dense head that funnels the flat vector into a single value
            Dense(units = 100, activation = "relu"),
            Dense(units = 20, activation = "relu"),
            Dense(units = 1),
        ]

    def call(self, x):
        """Pass x through every layer in order and return the final output."""
        result = x
        for stage in self.network_layers:
            result = stage(result)
        return result

## ResNet

TODO

## SENet (Squeeze Excitation)

TODO

# Model Evaluation

After training the model on different samples, we want to evaluate its performance on a separate dataset that contains no bias at all. We then check the averages per group combination to see whether there is any bias in the model.

## Setup of Evaluation Sample

In [None]:
def create_eval_samples(n_samples):
  """Create the evaluation sample: four equally sized groups with the same mean size.

  Args:
    n_samples: Total number of samples the 25 % group shares refer to.

  Returns:
    List of (rows, label) tuples in the order white_square, white_circle,
    colorful_square, colorful_circle. Each `rows` array has the columns
    [color flag, shape flag, size].
  """
  # (label, percentage of samples, mean size, is_white, is_square)
  group_specs = [
    ("white_square", 25, 50, True, True),
    ("white_circle", 25, 50, True, False),
    ("colorful_square", 25, 50, False, True),
    ("colorful_circle", 25, 50, False, False),
  ]

  labeled_groups = []
  for label, percentage, mean_size, is_white, is_square in group_specs:
    # Number of samples in this group
    n_group = round(n_samples * percentage / 100)
    color = np.full((n_group, 1), is_white, dtype = bool) # True = white
    shape = np.full((n_group, 1), is_square, dtype = bool) # True = square
    # Sizes use the default standard deviation of the sampling helper
    size = draw_from_truncated_normal_distribution(n_group, mean_size)
    labeled_groups.append((np.hstack((color, shape, size)), label))

  return labeled_groups

## Perform Evaluation

For each group, we calculate the total squared error, the average error, the average prediction, and the average actual size after feeding all images in the sample to the model.

In [None]:
def evaluate_performance(group_sample, model):
  """Feed every sample of one group to the model and summarize the errors.

  Args:
    group_sample: Iterable of sample rows accepted by convert_sample_to_tensor.
    model: Callable that takes a [1, 360, 360, 4] image tensor and returns a
      single predicted value (possibly wrapped in extra length-1 axes).

  Returns:
    Tuple (total_squared_distance, avg_distance, avg_prediction, avg_actual),
    where the distance is actual minus prediction for each sample.
  """
  # Feed sample to model and store targets and prediction
  actual = []
  prediction = []
  for single_sample in group_sample:
    # Convert to tensor
    shape_tensor, target_size = convert_sample_to_tensor(single_sample)
    # Reshape the tensor into a batch of one image
    shape_tensor = tf.reshape(shape_tensor, [1,360,360,4])
    # Feed to model
    output = model(shape_tensor)
    # Store target and prediction as plain scalars. Keeping the raw model output
    # (e.g. shape (1, 1)) would make `actual - prediction` broadcast to an
    # all-pairs matrix instead of an element-wise difference.
    actual.append(float(target_size))
    prediction.append(float(np.asarray(output).reshape(-1)[0]))

  # Calculate performance metrics based on target and prediction (both 1-D)
  actual = np.array(actual, dtype = float)
  prediction = np.array(prediction, dtype = float)
  diff = actual - prediction
  # Squared total distance
  total_squared_distance = np.sum(np.square(diff))
  # Average distance
  avg_distance = np.mean(diff)
  # Average prediction
  avg_prediction = np.mean(prediction)
  # Average actual
  avg_actual = np.mean(actual)
  return total_squared_distance, avg_distance, avg_prediction, avg_actual

# Model Comparison

## Comparison Configuration

In [None]:
def load_configs_from_file(filepath):
  """Read a run configuration from a JSON file.

  Args:
    filepath: Path of the JSON file. It must contain a "general" object with
      the keys "eval_sample_filename", "eval_sample_size" and
      "repeats_per_model", plus a top-level "configs" entry.

  Returns:
    Tuple (configs, eval_sample_filename, n_eval_samples, repeats_per_model).
  """
  with open(filepath, 'r') as config_file:
    parsed = json.load(config_file)
  general = parsed["general"]
  # Pull the general settings in a fixed order, then the per-model configurations
  eval_name, eval_size, repeats = (
    general[key]
    for key in ("eval_sample_filename", "eval_sample_size", "repeats_per_model")
  )
  return parsed["configs"], eval_name, eval_size, repeats

In [None]:
# Name of this run; the config file is expected at configs/<run_name>.json
run_name = "std_dev_check" # Also name of the config file

# Current date as DD-MM-YYYY, used as suffix for the run directory and file names
date_str = datetime.datetime.today().strftime("%d-%m-%Y")

# Directory that collects all artifacts of this run
run_dir = "/content/twrds_unbiased_anns/runs/" + run_name + "_" + date_str

# Load the model configurations and general settings from the JSON file
config_filename = run_name + ".json"
configs, eval_sample_filename, n_eval_samples, repeats_per_model = load_configs_from_file("/content/twrds_unbiased_anns/configs/" + config_filename)
eval_sample_filename = eval_sample_filename.format(date_str) # Update filename to include current date

# Start from an empty run directory.
# NOTE: rm -rf deletes the results of any earlier run with the same name and date.
!rm -rf $run_dir
!mkdir $run_dir

## Training + Eval Loop

In [None]:
# Clear session once and then every time before a new model is trained
tf.keras.backend.clear_session()

# Collects one result entry per trained model
results = []

# Create evaluation sample and save it -> Will be used for all configurations
eval_samples = create_eval_samples(n_eval_samples)
# Pickle the labeled groups into the run directory so the evaluation data can be reloaded
with open(run_dir + "/" + eval_sample_filename, 'wb') as filehandle:
    pickle.dump(eval_samples, filehandle)

# TensorBoard setup
%load_ext tensorboard
# NOTE(review): this removes ./logs in the working directory, which is not the
# logdir passed to TensorBoard below unless the working directory is run_dir — confirm.
!rm -rf ./logs/ 
%tensorboard --logdir $run_dir/logs/gradient_tape/

# Train and evaluate `repeats_per_model` models for every configuration.
# Each trained model contributes exactly one entry to `results`.
for config in configs:

  # Get name of current config
  config_name = config["name"]

  # Prepare and save sample
  dataset_size = config["dataset_size"]
  white_square = [config["perc_white_square"], config["mean_white_square"], config["stddev_white_square"]]
  white_circle = [config["perc_white_circle"], config["mean_white_circle"], config["stddev_white_circle"]]
  colorful_square = [config["perc_colorful_square"], config["mean_colorful_square"], config["stddev_colorful_square"]]
  colorful_circle = [config["perc_colorful_circle"], config["mean_colorful_circle"], config["stddev_colorful_circle"]]
  train_sample = create_sample_array(dataset_size, white_square, white_circle, colorful_square, colorful_circle)
  # Notebook-level palette that convert_sample_to_np_array picks colorful shapes from
  colors = config["colors"]
  sample_filename = run_dir + "/" + "sample_{}_{}".format(config_name, date_str)
  np.save(file = sample_filename, arr = train_sample)

  # Loop training for number of repeats
  for repeat in range(1, repeats_per_model + 1):

    # Clear keras session
    tf.keras.backend.clear_session()

    # Set random seed (drawn at random, so runs are not reproducible by seed)
    tf.random.set_seed(np.random.randint(1000,9999))

    # Create model
    model = SimpleCNN() # TODO select model based on text value in config (for now only simple CNN)
    model.compile(optimizer = config["optimizer"], loss = config["loss_function"])

    # Initialize loss function based on parameter (TODO, for now simple mse)
    loss_function = tf.keras.losses.MeanSquaredError()

    # Initialize optimizer based on parameter (TODO, for now simple Adam)
    optimizer = tf.keras.optimizers.Adam()

    # Prepare dataset for training
    train_dataset = make_tf_dataset(train_sample)
    train_dataset = train_dataset.shuffle(buffer_size = dataset_size)
    train_dataset = train_dataset.batch(config["batch_size"])

    # Set up tensorboard summary writer
    model_name = "model_{}_{}_{}".format(config_name, repeat, date_str)
    train_summary_writer = tf.summary.create_file_writer(run_dir + '/logs/gradient_tape/' + model_name + '/train')

    # Do training
    for epoch in range(config["n_epochs"]):
      for (img, target) in train_dataset:
        with tf.GradientTape() as tape:
          # Forward step
          output = model(img)
          # Compute loss
          loss = loss_function(target, output)
        # Compute gradient outside the recording context so the gradient
        # computation itself is not recorded on the tape
        gradients = tape.gradient(loss, model.trainable_variables)
        # Apply gradients
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Store loss (written once per batch, all batches of an epoch share the epoch index as step)
        with train_summary_writer.as_default():
            tf.summary.scalar('loss', loss, step=epoch)

    # Save trained model
    mode_filename = run_dir + "/" + model_name
    model.save(filepath = mode_filename)

    # Build a fresh result entry for this repeat. Appending `config` itself would
    # put the same dict into `results` for every repeat, so later repeats would
    # overwrite the values of earlier ones.
    result_row = dict(config)

    # Store repetition number
    result_row["repeat"] = repeat

    # Evaluate model and store results
    for (group_sample, label) in eval_samples:
      # Evaluate performance
      total_squared_distance, avg_distance, avg_prediction, avg_actual = evaluate_performance(group_sample, model)
      # Store results
      result_row["eval_" + label + "_sample_avg"] = avg_actual
      result_row["eval_" + label + "_prediction_avg"] = avg_prediction
      result_row["eval_" + label + "_avg_error"] = avg_distance
      result_row["eval_" + label + "_total_squared_error"] = total_squared_distance

    # Write everything to results
    result_row["model_iteration"] = model_name
    results.append(result_row)

    # Free memory
    del train_dataset
    del model

    # Do garbage collection explicitly
    gc.collect()

# Data Storage

In [None]:
# Create dataframe from results (one row per entry of the results list)
result_df = pd.DataFrame(results)
# Write dataframe to an Excel file inside the run directory
result_df.to_excel(run_dir + "/results.xlsx")
# Copy config json to run directory so the run is stored together with its settings
# (-b keeps a backup of an already existing destination file)
!cp -b /content/twrds_unbiased_anns/configs/$config_filename $run_dir/

In [None]:
# Set target zip name (named after the run, without the date suffix)
run_zip = run_name + ".zip"

# Zip the whole run folder recursively into /content
!zip -r /content/$run_zip $run_dir

In [None]:
# Download run zip through the Colab file helper
# (re-imported here so the cell also works on its own)
from google.colab import files
files.download("/content/{}".format(run_zip))