In [None]:
import os
import time
import re
import random
import shutil
import json
from json import JSONEncoder
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import callbacks
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn.metrics import balanced_accuracy_score, f1_score, precision_score, confusion_matrix, classification_report
from sklearn.model_selection import KFold

from skimage.util import random_noise
from PIL import Image
import cv2
from pathlib import Path

from numpy import expand_dims
from keras.preprocessing.image import load_img
from keras.preprocessing.image import save_img
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import array_to_img
from keras.preprocessing.image import ImageDataGenerator

In [None]:
def split_train_test(base_dir, train_dir, test_dir, train_perc, test_perc, class_names, extensions):
  """Move the images found in ``base_dir`` into per-class train/test folders.

  The split is only performed when at least one of ``train_dir`` / ``test_dir``
  does not exist yet; if both exist the function returns without touching
  anything.

  Args:
    base_dir: Folder holding the un-split image files.
    train_dir: Destination root for training images (one sub-folder per class).
    test_dir: Destination root for test images (one sub-folder per class).
    train_perc: Unused; everything that is not sampled for test goes to train.
    test_perc: Percentage (0-100) of each class's files moved to the test set.
    class_names: Class name prefixes; a file belongs to a class when its name
      is ``<class_name><digits>[_digits...].<anything>``.
    extensions: Accepted filename suffixes (e.g. ``[".png", ".jpg"]``).
  """
  # Both folders present -> the split was already done.
  if os.path.exists(train_dir) and os.path.exists(test_dir):
    return

  # exist_ok avoids a FileExistsError when only one of the two folders exists.
  os.makedirs(train_dir, exist_ok=True)
  os.makedirs(test_dir, exist_ok=True)

  valid_suffixes = tuple(extensions)

  for class_name in class_names:
    # Files starting with the (literal) class name followed by a number.
    pattern = re.compile(r'^' + re.escape(class_name) + r'[0-9]+[_0-9]*\..*')

    # Sorted so that the seeded sample below does not depend on the
    # arbitrary order returned by os.listdir.
    filename_list = sorted(
        fn for fn in os.listdir(base_dir)
        if pattern.match(fn) and fn.endswith(valid_suffixes)
    )

    # Randomly choose the files for the test set (remaining ones go to train).
    random.seed(15)
    n_test = int(round(len(filename_list) * test_perc / 100))
    test_files = set(random.sample(filename_list, n_test))
    train_files = [fn for fn in filename_list if fn not in test_files]

    # One sub-folder per class inside both "train" and "test".
    train_class_dir = os.path.join(train_dir, class_name)
    test_class_dir = os.path.join(test_dir, class_name)
    os.makedirs(train_class_dir, exist_ok=True)
    os.makedirs(test_class_dir, exist_ok=True)

    for file in train_files:
      shutil.move(os.path.join(base_dir, file), os.path.join(train_class_dir, file))
    for file in test_files:
      shutil.move(os.path.join(base_dir, file), os.path.join(test_class_dir, file))

In [None]:
def transfer_images_between_folders(source, dest, filenames, class_labels, class_names):
  """Move the given files between two roots, keeping their class sub-folder.

  ``filenames`` and ``class_labels`` are walked in parallel; each label is an
  index into ``class_names`` and selects the sub-folder (under both ``source``
  and ``dest``) the file lives in.
  """
  for name, class_index in zip(filenames, class_labels):
    class_folder = class_names[class_index]
    origin = os.path.join(source, class_folder, name)
    target = os.path.join(dest, class_folder, name)
    shutil.move(origin, target)

In [None]:
def transfer_all_images_between_folders(source, dest, split_rate, class_names):
  """Apply ``transfer_between_folders`` to every class sub-folder.

  For each name in ``class_names`` the sub-folder ``<source>/<name>`` is
  handed to ``transfer_between_folders`` together with ``<dest>/<name>`` and
  ``split_rate``.
  """
  for name in class_names:
    class_source = source + '/' + name
    class_dest = dest + '/' + name
    transfer_between_folders(class_source, class_dest, split_rate)

In [None]:
def remove_all_images(folder, class_names):
  """Delete every entry inside each class sub-folder of ``folder``.

  The class sub-folders themselves are kept. ``os.remove`` is used, so the
  sub-folders are expected to contain files only.
  """
  for class_name in class_names:
    class_dir = os.path.join(folder, class_name)
    for entry in os.listdir(class_dir):
      os.remove(os.path.join(class_dir, entry))

In [None]:
def transfer_between_folders(source, dest, split_rate):
  """Move a random fraction of the entries in ``source`` to ``dest``.

  ``int(len(entries) * split_rate)`` entries are picked without replacement
  (via the global ``random`` state) and moved. An empty ``source`` is a no-op.
  """
  candidates = os.listdir(source)
  if len(candidates) == 0:
    # Nothing to move.
    return

  n_to_move = int(len(candidates) * split_rate)
  chosen_positions = random.sample(range(0, len(candidates)), n_to_move)
  for position in chosen_positions:
    name = str(candidates[position])
    shutil.move(os.path.join(source, name), os.path.join(dest, name))

In [None]:
def get_X_y(base_dir, class_labels):
  """List the files of every class folder together with their class index.

  Args:
    base_dir: Root folder containing one sub-folder per class.
    class_labels: Sub-folder names; the position of a name in this sequence
      is the integer label assigned to its files.

  Returns:
    ``(X, y)`` as NumPy arrays: ``X`` holds the bare filenames (no folder
    part) and ``y`` the matching class indices.
  """
  filenames = []
  targets = []

  for class_index, class_folder in enumerate(class_labels):
    entries = os.listdir(os.path.join(base_dir, class_folder))
    filenames.extend(entries)
    targets.extend([class_index] * len(entries))

  return np.asarray(filenames), np.asarray(targets)

In [None]:
def get_X_y_groups(base_dir, class_labels):
  """List files per class together with class index and a group identifier.

  The group identifier is built from the filename: the part before the first
  ``.`` is split on ``_`` and its 2nd and 3rd pieces are joined with ``_``.
  Filenames with fewer than three ``_``-separated pieces raise ``IndexError``.

  Args:
    base_dir: Root folder containing one sub-folder per class.
    class_labels: Sub-folder names; the position in this sequence is the
      integer label assigned to the files found inside.

  Returns:
    ``(X, y, groups)`` as NumPy arrays of filenames, class indices and group
    identifiers, all aligned by position.
  """
  filenames, targets, group_ids = [], [], []

  for class_index, class_folder in enumerate(class_labels):
    for filename in os.listdir(os.path.join(base_dir, class_folder)):
      stem_parts = filename.split(".")[0].split("_")
      group_id = stem_parts[1] + "_" + stem_parts[2]

      filenames.append(filename)
      targets.append(class_index)
      group_ids.append(group_id)

  return np.asarray(filenames), np.asarray(targets), np.asarray(group_ids)

In [None]:
def KFoldStratificationClassGroup(n_folds, groups, X, y):
  """Build K folds where every (class, group) pair is split evenly.

  Each subset of samples sharing the same class and the same group is split
  on its own with a shuffled ``KFold`` (``random_state=15``); the per-subset
  train/test indices of fold *k* are then concatenated into the overall fold
  *k*.

  Args:
    n_folds: Number of folds.
    groups: Array with the group identifier of each sample.
    X: Array of samples (only indexed, never inspected).
    y: Array with the class of each sample.

  Returns:
    ``(train_indexes, test_indexes)``: two lists of ``n_folds`` integer index
    arrays into ``X`` / ``y`` / ``groups``. Both lists are empty when there is
    nothing to split.

  Raises:
    ValueError: (from ``KFold``) when a non-empty (class, group) subset has
      fewer than ``n_folds`` samples.
  """
  y_arr = np.asarray(y)
  groups_arr = np.asarray(groups)

  # np.unique gives a sorted, run-independent iteration order, so the element
  # order inside each returned fold is reproducible.
  unique_groups = np.unique(groups_arr)
  unique_classes = np.unique(y_arr)

  train_indexes = []
  test_indexes = []

  for class_ in unique_classes:
    for group in unique_groups:

      # Positions of the samples belonging to this class AND this group.
      indexes_to_split = np.flatnonzero((y_arr == class_) & (groups_arr == group))

      # Not every group has to contain every class; an empty subset has
      # nothing to contribute (and KFold cannot split it).
      if indexes_to_split.size == 0:
        continue

      kf = KFold(n_splits=n_folds, shuffle=True, random_state=15)
      splits = kf.split(X[indexes_to_split], y_arr[indexes_to_split], groups_arr[indexes_to_split])

      for n_fold, (train_index, test_index) in enumerate(splits):
        if len(train_indexes) == n_folds and len(test_indexes) == n_folds:
          # Fold already started by a previous subset: extend it.
          train_indexes[n_fold] = np.append(train_indexes[n_fold], indexes_to_split[train_index])
          test_indexes[n_fold] = np.append(test_indexes[n_fold], indexes_to_split[test_index])
        else:
          # First subset processed: create the fold.
          train_indexes.append(indexes_to_split[train_index])
          test_indexes.append(indexes_to_split[test_index])

  return train_indexes, test_indexes

In [None]:
def print_metrics(y_true, y_pred, class_names, model_name):
  """Print classification metrics and show a normalized confusion matrix.

  Prints the balanced accuracy (as a percentage) and scikit-learn's
  classification report, then draws the row-normalized confusion matrix as a
  heatmap titled with ``model_name``.

  Args:
    y_true: True class labels.
    y_pred: Predicted class labels.
    class_names: Display names, used in the report and as tick labels.
    model_name: Text appended to the figure title.
  """
  accuracy = balanced_accuracy_score(y_true, y_pred)
  print(f"Balanced Accuracy (%)  : {accuracy*100:.2f}")

  print(classification_report(y_true, y_pred, target_names=class_names))

  # normalize="true": each row (true class) sums to 1.
  cm = confusion_matrix(y_true, y_pred, normalize = "true")

  fig, ax = plt.subplots(figsize=(5, 4.5), tight_layout=True)
  ax.set_title(f"Normalized Confusion Matrix - {model_name}", fontweight="bold", pad=20, fontsize=14)
  # Draw explicitly on `ax` rather than relying on the current-axes state.
  sns.heatmap(cm, annot=True, cmap=plt.cm.Blues, ax=ax)
  ax.set_xticklabels(class_names)
  ax.set_yticklabels(class_names)
  ax.set_ylabel('True label', fontweight="bold")
  ax.set_xlabel('Predicted label', fontweight="bold")
  # Rotate the tick labels and set their alignment.
  plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
          rotation_mode="anchor")
  plt.setp(ax.get_yticklabels(), rotation=45, ha="right",
          rotation_mode="anchor")
  plt.show()

In [None]:
def get_metrics(y_true, y_pred, class_names, groups, model_name):
  """Compute overall and per-group classification metrics.

  Args:
    y_true: True class labels.
    y_pred: Predicted class labels.
    class_names: Names passed to ``classification_report`` as ``target_names``.
    groups: Group identifier of each sample, aligned with ``y_true``.
    model_name: Unused.

  Returns:
    ``(report, balanced_acc, cm, report_groups, balanced_acc_groups,
    cm_groups)``. The first three cover all samples (report as a dict). The
    last three are lists with one entry per distinct group, in the iteration
    order of ``set(groups)``, or ``None`` when there are fewer than two
    distinct groups.
  """
  # Metrics over all samples. Balanced accuracy is computed separately
  # because classification_report does not provide it.
  balanced_acc = balanced_accuracy_score(y_true, y_pred)
  report = classification_report(y_true, y_pred, target_names=class_names, output_dict = True)
  cm = confusion_matrix(y_true, y_pred)

  # The order of this list defines the order of the per-group results; it is
  # not returned, so it must not be changed here.
  unique_groups = list(set(groups))

  if len(unique_groups) <= 1:
    return report, balanced_acc, cm, None, None, None

  y_true_arr = np.array(y_true)
  y_pred_arr = np.array(y_pred)

  report_groups = []
  balanced_acc_groups = []
  cm_groups = []

  # Same metrics again, restricted to the samples of one group at a time.
  # NOTE(review): target_names is passed without `labels`, so a group that is
  # missing one of the classes presumably makes classification_report fail -
  # confirm against the data actually used.
  for group in unique_groups:
    member_positions = [pos for pos, sample_group in enumerate(groups) if sample_group == group]
    y_true_group = y_true_arr[member_positions]
    y_pred_group = y_pred_arr[member_positions]

    balanced_acc_group = balanced_accuracy_score(y_true_group, y_pred_group)
    report_group = classification_report(y_true_group, y_pred_group, target_names=class_names, output_dict = True)
    cm_group = confusion_matrix(y_true_group, y_pred_group)

    report_groups.append(report_group)
    balanced_acc_groups.append(balanced_acc_group)
    cm_groups.append(cm_group)

  return report, balanced_acc, cm, report_groups, balanced_acc_groups, cm_groups

In [None]:
def plot_history(inc_history, base_model_name):
  """Plot accuracy and loss curves of a training run side by side.

  Args:
    inc_history: Object exposing a ``history`` dict (as returned by Keras
      ``fit``). The keys ``categorical_accuracy``, ``val_categorical_accuracy``,
      ``loss`` and ``val_loss`` are plotted when present.
    base_model_name: Text used as the figure title.
  """
  history = inc_history.history

  fig, (ax_acc, ax_loss) = plt.subplots(nrows=1, ncols=2, sharex=True)

  def _draw(ax, curves, title, ylabel):
    # Each curve carries its own label, so the legend stays correct even
    # when only one of the two series is available.
    for key, label in curves:
      if key in history:
        ax.plot(history[key], label=label)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('No. epoch')
    handles, _ = ax.get_legend_handles_labels()
    if handles:
      ax.legend(loc='upper left')

  _draw(ax_acc,
        [('categorical_accuracy', 'Train'), ('val_categorical_accuracy', 'Test')],
        'Model Accuracy', 'Accuracy')
  _draw(ax_loss,
        [('loss', 'Train'), ('val_loss', 'Test')],
        'Model Loss', 'Loss')

  fig.suptitle(base_model_name)
  plt.show()

In [None]:
def _augmentation_dir_name(aug_params):
  """Return the results sub-folder name describing the augmentation setup."""
  parts = []
  if aug_params["augment_all"]:
    parts.append("Augmented all " + str(aug_params["n_images_all"]))
  if aug_params["augment_train"]:
    # The train factor is n_images_train (the same value written to the CSV).
    parts.append("Augmented train " + str(aug_params["n_images_train"]))
  return "_".join(parts) if parts else "No_augmentation"


def _augmentation_csv_fields(aug_params):
  """Return the four augmentation CSV fields as one comma-joined string.

  Order: FullAugmentation, FullAugFactor, TrainAugmentation, TrainAugFactor.
  A missing/empty ``aug_params`` is reported as "no augmentation".
  """
  if aug_params:
    augment_all = aug_params["augment_all"]
    augment_train = aug_params["augment_train"]
  else:
    augment_all = False
    augment_train = False
  all_factor = str(aug_params["n_images_all"]) if augment_all else "None"
  train_factor = str(aug_params["n_images_train"]) if augment_train else "None"
  return ",".join([str(augment_all), all_factor, str(augment_train), train_factor])


def write_to_csv(dir, results_fold, dataset_name, class_names, n_images_fold, base_model_name, model_params, aug_params, outer_fold_nr, groupsDict, testType):
  """Append the results of one set of folds to the result files on disk.

  Files are written under
  ``<dir>/Results/<model configuration>/[<augmentation setup>/]`` and are
  named after ``dataset_name``:

  * ``<dataset>.csv``                  - one row per fold and epoch;
  * ``<dataset>_Classific_Report.csv`` - one row per fold (all samples) plus,
    when per-subject results exist, one row per subject and fold;
  * ``<dataset>_Conf_Matrix.json``     - confusion matrices, appended to the
    ``cm_results`` list of the existing file (if any).

  Headers are only written when the CSV file does not exist yet.

  Args:
    dir: Root folder under which ``Results`` is created.
    results_fold: Dict with per-fold lists under ``classif_report``,
      ``balanced_acc``, ``inc_history``, ``times`` and ``cm``; optionally
      ``groups``, ``classif_report_groups``, ``balanced_acc_groups`` and
      ``cm_groups`` for per-subject results.
    dataset_name: Base name of the output files.
    class_names: Class names (CSV header columns and JSON labels).
    n_images_fold: Per-fold image counts, forwarded to
      ``get_report_results_str``.
    base_model_name: Name of the pre-trained model, written to every row.
    model_params: Dict with ``nr_hidden_neurons``, ``dropout_rate``,
      ``min_delta``, ``patience`` and ``optimizer`` (used for the folder name).
    aug_params: Dict with ``augment_all``, ``n_images_all``, ``augment_train``
      and ``n_images_train``; may be ``None``/empty for "no augmentation".
    outer_fold_nr: Outer fold number written to every row.
    groupsDict: Per-fold dicts with ``train`` / ``test`` subject lists, or
      ``None``.
    testType: Label written in the first column of every row.
  """
  csv_dir = os.path.join(dir, "Results")

  print(csv_dir)

  folder_name = "Neurons " + str(model_params["nr_hidden_neurons"]) + "_dropout " + str(model_params["dropout_rate"]) + "_min delta " + str(model_params["min_delta"]) + "_patience " + str(model_params["patience"]) + "_optimizer " + str(model_params["optimizer"])

  final_dir = os.path.join(csv_dir, folder_name)
  if aug_params:
    final_dir = os.path.join(final_dir, _augmentation_dir_name(aug_params))
  # Creates every missing level at once and tolerates existing folders.
  os.makedirs(final_dir, exist_ok=True)

  file_path = os.path.join(final_dir, dataset_name)
  print(file_path)
  filename1 = file_path + ".csv"
  filename2 = file_path + "_Classific_Report.csv"
  filename3 = file_path + "_Conf_Matrix.json"

  exists1 = os.path.exists(filename1)
  exists2 = os.path.exists(filename2)
  exists3 = os.path.exists(filename3)

  toAdd = _augmentation_csv_fields(aug_params)

  # Per-subject results are only present when the first fold has >1 subject.
  has_subject_results = ("groups" in results_fold) and results_fold["groups"] and (len(results_fold["groups"]) > 0) and (len(results_fold["groups"][0]) > 1)

  # CSV files
  with open(filename1, 'a') as writefile1, open(filename2, 'a') as writefile2:

    # Write headers (augmentation columns follow the order of `toAdd`).
    if not exists1:
      header1 = "Test_Type,n_subj_test,FullAugmentation,FullAugFactor,TrainAugmentation,TrainAugFactor,pre_trained_model,outer_fold,inner_fold,epoch,loss,categorical_acc,val_loss,val_categorical_acc,epoch_time (s),test_time (s)," +\
                "TrainSubjects,TestSubjects\n"
      writefile1.write(header1)

    if not exists2:
      header2 = "Test_Type,n_subj_test,FullAugmentation,FullAugFactor,TrainAugmentation,TrainAugFactor,pre_trained_model,subject,outer_fold,inner_fold,epochs,"
      for class_name in class_names:
        header2 += class_name + "_precision," + class_name + "_recall," + class_name + "_f1," + class_name + "_support,"
      header2 += "macro_avg_precision,macro_avg_recall,macro_avg_f1,macro_avg_support," +\
        "weighted_avg_precision,weighted_avg_recall,weighted_avg_f1,weighted_avg_support," +\
        "accuracy,balanced_accuracy," +\
        "train_time (s),predict_time (s),n_train_images,n_val_images,n_test_images,TrainSubjects,TestSubjects\n"
      writefile2.write(header2)

    # File 2 (folds): all samples
    data_str = get_report_results_str(results_fold["classif_report"], results_fold["balanced_acc"], results_fold["inc_history"], results_fold["times"],
                                      n_images_fold, class_names, "All", base_model_name, aug_params, toAdd, outer_fold_nr, groupsDict, testType)
    writefile2.write(data_str)

    # File 2 (folds): one block of rows per subject
    if has_subject_results:
      for subj_ind, subj_name in enumerate(results_fold["groups"][0]):
        print(subj_name)

        # Pick this subject's entry out of every fold.
        per_fold = list(zip(results_fold["classif_report_groups"], results_fold["balanced_acc_groups"]))
        classific_report = [fold_reports[subj_ind] for fold_reports, _ in per_fold]
        balanced_acc = [fold_accs[subj_ind] for _, fold_accs in per_fold]

        data_str = get_report_results_str(classific_report, balanced_acc, None, None, None, class_names, subj_name, base_model_name, aug_params, toAdd, outer_fold_nr, groupsDict, testType)
        writefile2.write(data_str)

    # File 1 (epochs)
    for fold_nr, fold in enumerate(results_fold["inc_history"]):
      fold_times = results_fold["times"][fold_nr]

      if groupsDict is not None:
        n_subj_test = str(len(groupsDict[fold_nr]["test"]))
        train_subjects = ";".join(groupsDict[fold_nr]["train"])
        test_subjects = ";".join(groupsDict[fold_nr]["test"])
      else:
        n_subj_test = str(0)
        train_subjects = ""
        test_subjects = ""

      for epoch in range(len(fold.history["loss"])):
        fields = [
            testType,
            n_subj_test,
            toAdd,
            base_model_name,
            str(outer_fold_nr),
            str(fold_nr + 1),
            str(epoch + 1),
            str(fold.history["loss"][epoch]),
            str(fold.history["categorical_accuracy"][epoch]),
            str(fold.history["val_loss"][epoch]),
            str(fold.history["val_categorical_accuracy"][epoch]),
            str(fold_times.epoch_times[epoch]),
            str(fold_times.test_times[epoch]),
            train_subjects,
            test_subjects,
        ]
        writefile1.write(",".join(fields) + "\n")

  # JSON file: load what is already there, append this run, write back.
  json_data = {"cm_results": []}
  if exists3:
    with open(filename3, "r") as readfile3:
      json_data = json.load(readfile3)

  cm_results = {"pre_trained_model": base_model_name, "labels": class_names, "cm_info": []}

  cm_info_all = {"subject": "All", "folds": [cm.tolist() for cm in results_fold["cm"]]}
  cm_results["cm_info"].append(cm_info_all)

  if has_subject_results:
    for group_ind, group in enumerate(results_fold["groups"][0]):
      cm_info_group = {
          "subject": group,
          "folds": [cm_groups_fold[group_ind].tolist() for cm_groups_fold in results_fold["cm_groups"]],
      }
      cm_results["cm_info"].append(cm_info_group)

  json_data["cm_results"].append(cm_results)

  with open(filename3, "w") as writefile3:
    json.dump(json_data, writefile3)


In [None]:
def get_report_results_str(report_folds, balanced_acc_folds, inc_history_fold, times_fold, n_images_fold, class_names, group_name, base_model_name, aug_params, toAdd, outer_fold_nr, groupsDict, testType):
  """Format one CSV row per fold for the classification-report file.

  Every row always has the same number of columns, in this order: test type,
  number of test subjects, the four augmentation fields in ``toAdd``, model
  name, subject, outer fold, inner fold, epochs, four metrics per class,
  macro and weighted averages, accuracy, balanced accuracy, train/predict
  times, train/val/test image counts, train subjects, test subjects.
  Information that is not available is written as an empty field.

  Args:
    report_folds: Per-fold ``classification_report`` dicts.
    balanced_acc_folds: Per-fold balanced accuracies.
    inc_history_fold: Per-fold history objects (``.history['loss']`` gives
      the number of epochs) or ``None``.
    times_fold: Per-fold objects with ``train_times`` / ``predict_times``
      lists, or ``None``.
    n_images_fold: Dict with per-fold lists under ``train``, ``train_aug``,
      ``val`` and ``test``, or ``None``.
    class_names: Class names; keys into each report dict.
    group_name: Value of the ``subject`` column.
    base_model_name: Value of the ``pre_trained_model`` column.
    aug_params: Augmentation settings; when ``augment_train`` is ``True`` the
      ``train_aug`` image count is reported instead of ``train``.
    toAdd: Pre-formatted, comma-separated augmentation fields.
    outer_fold_nr: Outer fold number, or ``None`` (empty field).
    groupsDict: Per-fold dicts with ``train`` / ``test`` subject lists, or
      ``None``.
    testType: Value of the first column.

  Returns:
    The rows concatenated into one string, each terminated by ``"\\n"``.
  """
  metric_keys = ("precision", "recall", "f1-score", "support")
  use_augmented_count = aug_params is not None and "augment_train" in aug_params and aug_params["augment_train"] is True

  rows = []

  for fold_nr, report in enumerate(report_folds):
    fields = [testType]

    if groupsDict is not None:
      fields.append(str(len(groupsDict[fold_nr]["test"])))
    else:
      fields.append(str(0))

    fields += [toAdd, base_model_name, group_name]

    # Always emit the outer-fold column so the row stays aligned with the header.
    fields.append(str(outer_fold_nr) if outer_fold_nr is not None else "")
    fields.append(str(fold_nr + 1))

    if inc_history_fold is not None:
      fields.append(str(len(inc_history_fold[fold_nr].history['loss'])))
    else:
      fields.append("")

    for section in list(class_names) + ["macro avg", "weighted avg"]:
      fields += [str(report[section][key]) for key in metric_keys]

    fields += [str(report["accuracy"]), str(balanced_acc_folds[fold_nr])]

    if times_fold is not None:
      fields += [str(times_fold[fold_nr].train_times[0]), str(times_fold[fold_nr].predict_times[0])]
    else:
      fields += ["", ""]

    if n_images_fold is not None:
      train_key = "train_aug" if use_augmented_count else "train"
      fields += [str(n_images_fold[train_key][fold_nr]),
                 str(n_images_fold["val"][fold_nr]),
                 str(n_images_fold["test"][fold_nr])]
    else:
      fields += ["", "", ""]

    if groupsDict is not None:
      fields += [";".join(groupsDict[fold_nr]["train"]), ";".join(groupsDict[fold_nr]["test"])]
    else:
      fields += ["", ""]

    rows.append(",".join(fields) + "\n")

  return "".join(rows)

In [None]:
class TimeHistory(keras.callbacks.Callback):
  """Keras callback that records wall-clock durations.

  After use, the instance holds four lists of durations in seconds:

  * ``epoch_times``   - one entry per training epoch;
  * ``train_times``   - one entry per training run;
  * ``test_times``    - one entry per completed test/evaluation pass;
  * ``predict_times`` - one entry per prediction pass.

  NOTE(review): Keras presumably also fires the test hooks for the validation
  pass of each epoch during training - confirm that ``test_times`` has the
  per-epoch layout the result writer expects.
  """

  def __init__(self):
    # Let the base class set up its own state before adding ours.
    super().__init__()
    self.epoch_times = []
    self.train_times = []
    self.test_times = []
    self.predict_times = []

  # The first positional argument of the epoch hooks is named `batch` for
  # backward compatibility; it is not used.
  def on_epoch_begin(self, batch, logs=None):
    self.epoch_time_start = time.time()

  def on_epoch_end(self, batch, logs=None):
    self.epoch_times.append(time.time() - self.epoch_time_start)

  def on_train_begin(self, logs=None):
    self.train_time_start = time.time()

  # The base class declares the *_end hooks below with a single `logs`
  # argument. `batch` is kept (now optional) so existing positional calls keep
  # working while a call passing only `logs=` no longer fails.
  def on_train_end(self, batch=None, logs=None):
    self.train_times.append(time.time() - self.train_time_start)

  def on_test_begin(self, logs=None):
    self.test_time_start = time.time()

  def on_test_end(self, batch=None, logs=None):
    self.test_times.append(time.time() - self.test_time_start)

  def on_predict_begin(self, logs=None):
    self.predict_time_start = time.time()

  def on_predict_end(self, batch=None, logs=None):
    self.predict_times.append(time.time() - self.predict_time_start)

In [None]:
def write_host_specs(host_specs, dataset_name, model_params, aug_params, base_dir="/content/drive/MyDrive/"):
  """Append the host specifications to ``<dataset_name>_Host_Specs.txt``.

  The file lives under
  ``<base_dir>/csv/<model configuration>/[<augmentation setup>/]``; missing
  folders are created. Each entry of ``host_specs`` is written as one
  ``key: value`` line.

  Args:
    host_specs: Mapping of specification name to value.
    dataset_name: Prefix of the output filename.
    model_params: Dict with ``nr_hidden_neurons``, ``dropout_rate``,
      ``min_delta``, ``patience`` and ``optimizer`` (used for the folder name).
    aug_params: Dict with ``augment_all``, ``n_images_all``, ``augment_train``
      and ``n_images_train``; when ``None``/empty no augmentation sub-folder
      is used.
    base_dir: Root folder; defaults to the previously hard-coded Drive path.
  """
  csv_dir = os.path.join(base_dir, "csv")

  folder_name = "Neurons " + str(model_params["nr_hidden_neurons"]) + "_dropout " + str(model_params["dropout_rate"]) + "_min delta " + str(model_params["min_delta"]) + "_patience " + str(model_params["patience"]) + "_optimizer " + str(model_params["optimizer"])
  final_dir = os.path.join(csv_dir, folder_name)

  if aug_params:
    name_parts = []
    if aug_params["augment_all"]:
      name_parts.append("Augmented all " + str(aug_params["n_images_all"]))
    if aug_params["augment_train"]:
      # The train augmentation factor is n_images_train, not n_images_all.
      name_parts.append("Augmented train " + str(aug_params["n_images_train"]))
    aug_dir_name = "_".join(name_parts) if name_parts else "No_augmentation"
    final_dir = os.path.join(final_dir, aug_dir_name)

  # Creates the "csv" parent as well; os.mkdir would fail when it is missing.
  os.makedirs(final_dir, exist_ok=True)

  filename1 = os.path.join(final_dir, dataset_name + "_Host_Specs") + ".txt"
  with open(filename1, 'a') as writefile1:
    for key, value in host_specs.items():
      # str() so that numeric specs (core counts, memory sizes) are accepted.
      writefile1.write(str(key) + ": ")
      writefile1.write(str(value) + "\n")



In [None]:
def augment_data_classic_noises(source, dest, class_names, n_new):
  """Create noisy copies of every image, class by class.

  For each image in ``<source>/<class>`` ``n_new`` copies are written to
  ``<dest>/<class>`` as ``<stem>_aug_<k><suffix>``. Each copy receives three
  noise layers, each drawn at random from gaussian, salt, pepper and poisson
  noise. Files that OpenCV cannot read are reported and skipped.

  Args:
    source: Root folder with one sub-folder of images per class.
    dest: Root folder with one (existing) sub-folder per class for the output.
    class_names: Names of the class sub-folders.
    n_new: Number of noisy copies per image.

  Returns:
    Total number of entries found in the ``dest`` class folders afterwards
    (this includes anything that was already there).
  """
  noise_types = ["gaussian", "salt", "pepper", "poisson"]
  n_layers = 3  # number of noise layers stacked on each copy

  count = 0

  for class_name in class_names:
    files = os.listdir(os.path.join(source, class_name))

    for filename in files:
      file_path = os.path.join(source, class_name, filename)
      img = cv2.imread(file_path)
      if img is None:
        # cv2.imread signals an unreadable / non-image file by returning None.
        print("Skipping unreadable image: " + file_path)
        continue

      p = Path(filename)

      for i in range(n_new):
        noise_img = img

        for _ in range(n_layers):
          noise = random.choice(noise_types)

          # random_noise returns a floating-point image on the range [0, 1].
          if noise == "salt" or noise == "pepper":
            amount = random.uniform(0.001,0.003)
            noise_img_array = random_noise(noise_img, mode=noise, amount=amount)
          elif noise == "gaussian":
            var = random.uniform(0.008,0.01)
            noise_img_array = random_noise(noise_img, mode=noise, var=var)
          else:
            noise_img_array = random_noise(noise_img, mode=noise)

          # Keep the NOISY result (scaled back to 8-bit) as input for the
          # next layer and for saving.
          noise_img = np.clip(np.rint(noise_img_array * 255), 0, 255).astype('uint8')

        # Save the new image to the matching class folder in the destination.
        new_filename = str(p.stem) + "_aug_" + str(i+1) + str(p.suffix)
        new_file_path = os.path.join(dest, class_name, new_filename)
        cv2.imwrite(new_file_path, noise_img)

    count += len(os.listdir(os.path.join(dest, class_name)))

  return count

In [None]:
def augment_data_shift(source, dest, class_names, n_new):
  """Create horizontally shifted copies of every image, class by class.

  For each image in ``<source>/<class>`` ``n_new`` shifted copies are written
  to ``<dest>/<class>`` as ``<stem>_aug_<k><suffix>``. Areas uncovered by the
  shift are filled with the constant value 0.

  Args:
    source: Root folder with one sub-folder of images per class.
    dest: Root folder with one (existing) sub-folder per class for the output.
    class_names: Names of the class sub-folders.
    n_new: Number of shifted copies per image.

  Returns:
    Total number of entries found in the ``dest`` class folders afterwards
    (this includes anything that was already there).
  """
  count = 0

  for class_name in class_names:
    files = os.listdir(os.path.join(source, class_name))

    for filename in files:
      # Load the image and turn it into a batch holding a single sample.
      file_path = os.path.join(source, class_name, filename)
      img = load_img(file_path)
      data = img_to_array(img)
      samples = expand_dims(data, 0)

      # NOTE(review): a list is passed as width_shift_range; presumably Keras
      # treats it as a set of candidate shifts rather than an interval -
      # confirm this is the intended behaviour.
      datagen = ImageDataGenerator(width_shift_range=[-0.5,0.5], fill_mode = "constant", cval = 0)
      it = datagen.flow(samples, batch_size=1)

      p = Path(filename)

      for i in range(n_new):
        # The builtin next() works on every iterator; the `.next()` method is
        # not available on all Keras versions.
        batch = next(it)
        augmented_img = array_to_img(batch[0])

        # Save the augmented image to the matching class folder in dest.
        new_filename = str(p.stem) + "_aug_" + str(i+1) + str(p.suffix)
        new_file_path = os.path.join(dest, class_name, new_filename)
        save_img(new_file_path, augmented_img)

    count += len(os.listdir(os.path.join(dest, class_name)))

  return count

In [None]:
def _noise_block_gradient(n_rows, n_cols):
  """Return an ``(n_rows, n_cols)`` block whose rows ramp 1 -> 255 -> 1.

  The outermost rows and columns are divided by 4 so the block fades out at
  its border.
  """
  half = n_cols // 2
  if n_cols % 2 == 0:
    rising = np.linspace(1, 255, half)
    falling = np.flipud(rising)
  else:
    # Odd width: the rising half gets the extra (middle) column.
    rising = np.linspace(1, 255, half + 1)
    falling = np.linspace(255, 1, half)

  gradient = np.tile(np.append(rising, falling, axis=0), (n_rows, 1))

  # Attenuate the border rows, then the border columns.
  gradient[0] = gradient[0] / 4
  gradient[n_rows - 1] = gradient[n_rows - 1] / 4
  gradient[:, 0] = gradient[:, 0] / 4
  gradient[:, n_cols - 1] = gradient[:, n_cols - 1] / 4
  return gradient


def augment_data_noise_blocks(source, dest, class_names, n_new):
  """Create copies of every image with random gradient blocks added.

  For each image in ``<source>/<class>`` ``n_new`` copies are written to
  ``<dest>/<class>`` as ``<stem>_aug_<k><suffix>``. Each copy gets between 1
  and 5 rectangular blocks (at most 1/8 of the image height/width each), with
  random position and random intensity factor, added to the pixel values.
  Images must load as 3-dimensional arrays (height, width, channels).

  Args:
    source: Root folder with one sub-folder of images per class.
    dest: Root folder with one (existing) sub-folder per class for the output.
    class_names: Names of the class sub-folders.
    n_new: Number of augmented copies per image.

  Returns:
    Total number of entries found in the ``dest`` class folders afterwards
    (this includes anything that was already there).
  """
  count = 0

  for class_name in class_names:
    files = os.listdir(os.path.join(source, class_name))

    for filename in files:
      file_path = os.path.join(source, class_name, filename)
      img = np.array(Image.open(file_path))
      height, width, _ = img.shape

      p = Path(filename)

      for i in range(n_new):
        img_to_add = np.zeros(img.shape)  # noise layer, same size as the image

        nrOfBlocks = random.randrange(1,6)

        for _ in range(nrOfBlocks):
          # Random block size, then the matching gradient block.
          blk_rows = random.randrange(1, round((height/8) + 1))
          blk_cols = random.randrange(1, round((width/8) + 1))
          gradientImage = _noise_block_gradient(blk_rows, blk_cols)

          # Random block centre, kept away from the image border.
          blkCenterRow = random.randrange(0 + blk_rows, height - blk_rows)
          blkCenterColumn = random.randrange(0 + blk_cols, width - blk_cols)

          intensityRandomFactor = random.random()

          # The painted area spans half a block on each side of the centre.
          half_rows = blk_rows // 2
          half_cols = blk_cols // 2
          row_start = blkCenterRow - half_rows
          col_start = blkCenterColumn - half_cols
          n_rows = 2 * half_rows
          n_cols = 2 * half_cols

          # Gradient cell (r, c) goes to pixel (row_start + r, col_start + c)
          # on every channel. Using the offset directly (instead of a
          # remainder by the start position) stays correct for blocks close
          # to the top/left edge.
          patch = gradientImage[:n_rows, :n_cols] * intensityRandomFactor
          img_to_add[row_start:row_start + n_rows, col_start:col_start + n_cols] = patch[:, :, np.newaxis]

        noise_img = array_to_img(img + np.array(img_to_add))

        new_filename = str(p.stem) + "_aug_" + str(i+1) + str(p.suffix)
        new_file_path = os.path.join(dest, class_name, new_filename)

        save_img(new_file_path, noise_img)

    count += len(os.listdir(os.path.join(dest, class_name)))

  return count