In [None]:
# Mount Google Drive at /gdrive so that the files stored there can be accessed below.
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [None]:
# for importing from py scripts in the root directory (including subfolders)
import sys

# for function get_duration
import math
import time

# for function write_log
import logging

# for preprocessing Samek
import numpy as np
import glob
import os
import sys
import scipy.io.wavfile as wavf
import scipy.signal
import h5py
import json
import librosa
import multiprocessing
#import argparse # dont need this one

# for preprocessing adjustments
from urllib.request import urlopen
from io import BytesIO
from zipfile import ZipFile
# import shutil

# for reading hdf5 files
import h5py

# for model training
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras.losses import sparse_categorical_crossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow import keras
from tensorflow.keras import backend as k
import pickle
import tensorflow as tf


# for model evaluation
from tensorflow.keras.models import Model, load_model
import pandas as pd
from sklearn.metrics import accuracy_score, balanced_accuracy_score, cohen_kappa_score, precision_score, recall_score, f1_score, roc_auc_score, roc_curve
from sklearn.preprocessing import minmax_scale
import matplotlib.pyplot as plt
# for xai methods

# for generating plots
from PIL import Image,ImageOps
from keras.preprocessing.image import img_to_array,array_to_img
###################



In [None]:
# set root dir: the path to the project folder in your google drive
root_dir = '/gdrive/My Drive/XAI_spec_AudioMNIST/'
test_dir = '/gdrive/My Drive/Testing/'

# Central lookup of all folders/files used by the notebook.
# Order matters: every entry containing 'result' is created with create_directory
# in dict order, so a parent folder has to be listed before its subfolders.
paths = {
    'root': root_dir,
    'dataset': os.path.join(root_dir,'AudioMNIST-master'),
    'data': os.path.join(root_dir,'AudioMNIST-master/data'),
    'meta': os.path.join(root_dir,'AudioMNIST-master/data/audioMNIST_meta.txt'),
    'spectrograms': os.path.join(root_dir,'spectrograms'),
    'splits': os.path.join(root_dir,'splits'),
    'results': os.path.join(root_dir, 'results'),
    'models': os.path.join(root_dir, 'results/models'),
    'history': os.path.join(root_dir, 'results/history'),
    'evaluation': os.path.join(root_dir, 'results/evaluation'),
    # NOTE(review): create_outputs reads paths['predictions'], which was missing here (KeyError).
    # The location follows the layout of the other result folders - confirm where the
    # predictions_<label>_<split>.csv files are actually stored.
    'predictions': os.path.join(root_dir, 'results/predictions'),
    'xai': os.path.join(root_dir, 'results/xai'),
    'plots': os.path.join(root_dir, 'results/plots'),
    'plots_waveform': os.path.join(root_dir, 'results/plots/waveform'),
    'plots_spectrograms': os.path.join(root_dir, 'results/plots/spectrograms'),
    # NOTE(review): create_outputs also reads paths['plots_Grad-CAM']; named like the two
    # plot folders above - confirm.
    'plots_Grad-CAM': os.path.join(root_dir, 'results/plots/Grad-CAM'),
    'plots_loss': os.path.join(root_dir, 'results/plots/loss')
}

In [None]:
########## 1. function to write log about the events
def write_log(message):
    """Append a timestamped message to <root_dir>/events.log and echo it with print.

    A dedicated logger with its own file handler is used instead of
    logging.basicConfig: basicConfig does nothing when the root logger already
    has handlers (which is common in notebook environments), so the log file
    would silently never be written.

    Args:
        message: text of the event to record.
    """
    logger = logging.getLogger('XAI_spec_AudioMNIST.events')
    if not logger.handlers:
        # one-time setup; later calls reuse the handler created here
        handler = logging.FileHandler(os.path.join(root_dir,'events.log'), mode='a')
        handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        # the message is already echoed by print below, so keep it away from root handlers
        logger.propagate = False
    logger.info(message)
    print(message)


########## 2. function to create a directory
def create_directory(directory_path):
    """Create directory_path (including missing parent folders) if it does not exist.

    A log entry is written only when the folder was actually created; nothing
    happens if it already exists.

    Args:
        directory_path: path of the folder to create.
    """
    if os.path.isdir(directory_path):
        return
    # makedirs instead of mkdir: also works when a parent folder is still missing
    os.makedirs(directory_path, exist_ok=True)
    write_log('Created folder: '+directory_path)

########## 3. function to calculate the time difference between two timepoints (from package 'time'); returns the duration as a string in the format HH:MM:SS
def get_duration(start_time,end_time):
    """Format the time span between two timestamps as 'HH:MM:SS'.

    Args:
        start_time: first timestamp in seconds (e.g. from time.time()).
        end_time: second timestamp in seconds.

    Returns:
        The absolute difference, rounded to whole seconds, as a string with
        zero-padded hours, minutes and seconds. The order of the two
        timestamps does not matter.
    """
    total_seconds = abs(int(round(end_time - start_time)))
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return ':'.join(str(part).zfill(2) for part in (hours, minutes, seconds))

########### 4. function to read spectrograms/labels and return np.arrays ready for training/evaluation/xai methods
def read_spectrograms_hdf5(label,split_index,split_type,resize_factor=1,reshape=False,img_width=227, img_height=227, img_num_channels=1):
  """Load all spectrograms and labels of one split into numpy arrays.

  The hdf5 file paths are read (one per line, blank lines are ignored) from
  <paths['splits']>/AlexNet_<label>_<split_index>_<split_type>.txt.
  From every hdf5 file the entry data[0][0] is used as spectrogram and
  label[0][label_index] as target value.

  Args:
    label: 'gender' selects label column 1, any other value selects column 0.
    split_index: index of the split, used to build the name of the txt file.
    split_type: name of the split part used in the file name and the log messages (e.g. 'test').
    resize_factor: all spectrogram values are divided by this number.
    reshape: if True, x is reshaped to (n, img_width, img_height, img_num_channels).
    img_width, img_height, img_num_channels: target dimensions for the reshape;
      img_width and img_height also define the shape of x if the split is empty.

  Returns:
    Tuple (x, y) of float64 arrays: x holds the spectrograms (first axis = file index),
    y holds one label per file.
  """
  write_log('Started reading '+str(split_type)+' data ...')
  start_time = time.time()
  label_index = 1 if label == 'gender' else 0
  # read txt with current split paths; blank lines (e.g. at the end of the file) are dropped
  path_to_split_paths = os.path.join(paths['splits'],'AlexNet_'+str(label)+'_'+str(split_index)+'_'+str(split_type)+'.txt')
  with open(path_to_split_paths, 'r') as text_file:
    split_paths = [line.strip() for line in text_file if line.strip()]
  nb_files = len(split_paths)
  x = None # target array for spectrograms, allocated once the shape of the first spectrogram is known
  y = np.zeros(nb_files) # target array for labels
  for index, cur_path in enumerate(split_paths):
    # the context manager closes the file even if reading fails
    with h5py.File(cur_path, 'r') as f:
      x_cur = f['data'][...][0][0]
      y_cur = f['label'][...][0][label_index]
    if x is None:
      x = np.zeros((nb_files,) + np.shape(x_cur))
    x[index] = x_cur
    y[index] = y_cur
  if x is None:
    # empty split: return empty arrays instead of failing
    x = np.zeros((0, img_width, img_height))
  x = x/resize_factor
  if reshape:
    x = x.reshape((len(x), img_width, img_height, img_num_channels))
  write_log('Finished reading '+str(split_type)+' data in '+get_duration(start_time,time.time()))
  return x,y

# make sure every folder whose path contains 'result' exists before results are written
for path, directory in paths.items():
  if 'result' not in directory:
    continue
  create_directory(directory)

In [None]:
# Evaluate every trained model on its test split.
# For each model one csv with the metrics and one csv with the confusion matrix is written
# to paths['evaluation']; models for which both files already exist are skipped.
write_log('Model evaluation started ...')
# create list of trained models
model_names = os.listdir(paths['models'])
# iterate models
for model_name in model_names:
  # get label and split from the model name (<net>_<label>_<split>.h5).
  # splitext removes the extension as a suffix; rstrip(".h5") would strip single characters
  # and also remove a trailing "5" or "h" of the name itself.
  name_parts = os.path.splitext(model_name)[0].split('_')
  if len(name_parts) != 3:
    write_log('Skipped '+model_name+': file name does not follow <net>_<label>_<split>')
    continue
  net,label,split_index = name_parts
  # set saving paths for the results
  evaluation_path = os.path.join(paths['evaluation'],'evaluation_'+label+'_'+str(split_index)+'.csv')
  confusion_matrix_path = os.path.join(paths['evaluation'],'confusionMatrix_'+label+'_'+str(split_index)+'.csv')
  # check if model was already evaluated
  if os.path.isfile(evaluation_path) and os.path.isfile(confusion_matrix_path):
    write_log('Model for '+label+' '+str(split_index)+' already evaluated')
    continue
  write_log('Current model: '+label+' '+str(split_index))
  # load current model
  cur_model = load_model(os.path.join(paths['models'],model_name))
  # read according test data
  x_test, y_test = read_spectrograms_hdf5(label,split_index,'test',resize_factor=256,reshape=True)
  # run the class predictions and get the probabilities
  start_time = time.time()
  pred_probas = cur_model.predict(x_test, verbose=1)
  predictions = np.argmax(pred_probas,axis=1)
  # calculate evaluation metrics (only if the file is missing)
  if not os.path.isfile(evaluation_path):
    # built directly from the values; the np.float alias used before was removed from NumPy
    evaluation = pd.DataFrame(
      [{
        'label': label,
        'split': split_index,
        'accuracy': accuracy_score(y_test, predictions),
        'balanced accuracy': balanced_accuracy_score(y_test, predictions),
        'precision': precision_score(y_test, predictions, average='weighted'),
        'recall': recall_score(y_test, predictions, average='weighted'),
        'kappa': cohen_kappa_score(y_test, predictions)
      }],
      columns=['label','split','accuracy','balanced accuracy','precision','recall','kappa']
    )
    evaluation.to_csv(evaluation_path, index=False)
  # calculate confusion matrix (only if the file is missing)
  if not os.path.isfile(confusion_matrix_path):
    confusion_matrix = pd.crosstab(y_test, predictions, rownames=['Actual'], colnames=['Predicted'])
    # NOTE(review): index=False drops the 'Actual' class labels from the csv - confirm this is intended
    confusion_matrix.to_csv(confusion_matrix_path, index= False)
  write_log('Evaluation of model '+label+' '+str(split_index)+' finished in '+str(get_duration(start_time,time.time())))
  # the data would pile up in the RAM over the models, so release it before the next iteration
  k.clear_session()
  del x_test, y_test
write_log('All evaluations finished!')

Model evaluation started ...
Current model: gender 1
Started reading test data ...


KeyboardInterrupt: ignored

In [None]:
def check_inputs(selected_outputs,selected_labels,selected_splits,selected_mode,selected_nb_examples,selected_scale,selected_show):
  """Validate the parameters of create_outputs.

  Args:
    selected_outputs: iterable with any of 'waveform', 'spectrograms', 'Grad-CAM'.
    selected_labels: iterable with any of 'gender', 'digit'.
    selected_splits: iterable with split indices out of 0..4.
    selected_mode: list with exactly one of 'examples', 'all', 'single'.
    selected_nb_examples: integer from 1 to 5.
    selected_scale: True or False.
    selected_show: True or False.

  Returns:
    True if all parameters are valid. Otherwise the reason is logged with
    write_log and False is returned.
  """
  # set valid inputs
  valid_outputs = ['waveform','spectrograms','Grad-CAM']
  valid_labels = ['gender','digit']
  valid_splits = [0,1,2,3,4]
  valid_mode = ['examples','all','single']
  valid_nb_examples = list(range(1, 6)) # the end value of range is excluded --> [1,2,3,4,5]
  valid_scale = [False,True]
  valid_show = [False,True]
  # check validity of the selected inputs
  if any(selected_output not in valid_outputs for selected_output in selected_outputs):
    write_log('Interrupted due to incorrect input parameters in selected outputs!')
    return False
  if any(selected_label not in valid_labels for selected_label in selected_labels):
    write_log('Interrupted due to incorrect input parameters in selected labels!')
    return False
  if any(selected_split not in valid_splits for selected_split in selected_splits):
    write_log('Interrupted due to incorrect input parameters in selected splits!')
    return False
  if selected_splits == [4] and selected_labels == ['gender']:
    write_log('Interrupted due to incorrect input parameters: Model gender 4 does not exist!')
    return False
  # the length is checked first so that an empty list is reported instead of raising an IndexError
  if len(selected_mode) != 1 or selected_mode[0] not in valid_mode:
    write_log('Interrupted due to incorrect input parameters in selected mode!')
    return False
  if selected_nb_examples not in valid_nb_examples:
    write_log('Interrupted due to incorrect input parameters in selected nb examples!')
    return False
  if selected_scale not in valid_scale:
    write_log('Interrupted due to incorrect input parameters in selected scale!')
    return False
  if selected_show not in valid_show:
    write_log('Interrupted due to incorrect input parameters in selected show!')
    return False
  # explicit success value (the function used to fall through and return None)
  return True

def _count_output(output_name, was_created, created_counter, existed_counter):
  """Book the result of one plot function: True counts as created, False as already existing."""
  # '==' instead of 'is' so that numpy booleans are counted as well; other values (e.g. None) are ignored
  if was_created == True:
    created_counter[output_name] += 1
  elif was_created == False:
    existed_counter[output_name] += 1

def create_outputs(selected_outputs,selected_labels,selected_splits,selected_mode='single',selected_nb_examples=3,selected_scale=True,selected_show=False):
  """Create the selected plots for the test files of the selected models.

  All models in paths['models'] (named <net>_<label>_<split>.h5) are visited; models whose
  label or split was not selected are skipped before they are loaded.

  Args:
    selected_outputs: list with any of 'waveform', 'spectrograms', 'Grad-CAM'.
    selected_labels: list with any of 'gender', 'digit'.
    selected_splits: list of split indices (integers).
    selected_mode: 'single' (stop after the first processed file), 'examples' (at most
      selected_nb_examples files per digit and gender) or 'all'; either as plain string or
      as one-element list.
    selected_nb_examples: number of examples per digit and gender.
    selected_scale: passed to create_waveform_plot as scale.
    selected_show: passed to the plot functions as show.

  Returns:
    None. The numbers of created/already existing plots are written to the log.
  """
  write_log('Started creating outputs ...')
  # a plain string (like the default 'single') is wrapped into a list: check_inputs and the
  # mode comparisons below work on one-element lists, so the default used to be rejected
  if isinstance(selected_mode, str):
    selected_mode = [selected_mode]
  # handle wrong inputs
  valid_inputs = check_inputs(selected_outputs,selected_labels,selected_splits,selected_mode,selected_nb_examples,selected_scale,selected_show)
  if valid_inputs is False:
    return
  # key of the destination folder in the paths variable for each output
  output_types = {'waveform': 'plots_waveform', 'spectrograms': 'plots_spectrograms', 'Grad-CAM': 'plots_Grad-CAM'}
  # model and predictions are only used for the Grad-CAM plots
  grad_cam_selected = 'Grad-CAM' in selected_outputs
  # read meta to get gender label later
  with open(paths['meta']) as meta_file:
    metaData = json.load(meta_file)
  # iterate existing models
  for model_name in os.listdir(paths['models']):
    # splitext removes the extension as a suffix; rstrip(".h5") would strip single characters
    # and also remove a trailing "5" or "h" of the name itself
    name_parts = os.path.splitext(model_name)[0].split('_')
    if len(name_parts) != 3:
      write_log('Skipped '+model_name+': file name does not follow <net>_<label>_<split>')
      continue
    net,label,split_index = name_parts
    write_log('Current Model: '+label+'_'+str(split_index))
    # skip current model, if label or split was not selected
    if label not in selected_labels or int(split_index) not in selected_splits:
      write_log('Model not selected!')
      continue
    # load the model only when it is really needed (loading is expensive)
    model = load_model(os.path.join(paths['models'],model_name)) if grad_cam_selected else None
    # set path to txt file which contains the spectrogram paths of the current test split
    path_to_split_paths = os.path.join(paths['splits'],net+'_'+str(label)+'_'+str(split_index)+'_test'+'.txt')
    # read txt
    split_paths = get_split_paths(path_to_split_paths)
    # set counters for breaks
    file_counter = 0
    example_counter_male = [0]*10
    example_counter_female = [0]*10
    # set counters to track how many outputs were generated/existed
    created_counter = {'waveform': 0, 'spectrograms': 0, 'Grad-CAM': 0}
    existed_counter = {'waveform': 0, 'spectrograms': 0, 'Grad-CAM': 0}
    # read predictions of current model (they only appear in the Grad-CAM file names)
    if grad_cam_selected:
      predictions = pd.read_csv(os.path.join(paths['predictions'],'predictions_'+label+'_'+str(split_index)+'.csv'))
    # iterate files in testsplit
    for filepath in split_paths:
      # if single mode is selected stop after the first file
      if file_counter > 0:
        break
      # infer sample info from the file name (<net>_<digit>_<participant>_<repetition>.hdf5);
      # the extension is removed as a suffix so that e.g. repetition 5 is not cut off
      _, dig, vp, rep = os.path.splitext(os.path.basename(filepath))[0].split('_')
      # get gender
      gender = metaData[vp]['gender']
      # if example mode is selected skip the file if enough examples exist for the current digit
      example_counter = example_counter_male if gender == 'male' else example_counter_female
      if example_counter[int(dig)] == int(selected_nb_examples):
        continue
      # get prediction of current spectrogram
      cur_prediction = None
      if grad_cam_selected:
        cur_prediction = predictions[(predictions['digit'] == int(dig)) & (predictions['participant'] == int(vp)) & (predictions['repetition'] == int(rep))]
        cur_prediction = cur_prediction.iloc[0]['predicted']
      # set saving paths of the selected outputs
      dst = {}
      for output_name in selected_outputs:
        if output_name == 'Grad-CAM':
          # XAI plots differ between models: additional folder per model and prediction in the file name
          additional_folder = 'label_'+str(label)+'_split_'+str(split_index)
          additional_prediction = '_predicted_'+str(cur_prediction)
        else:
          additional_folder = '' # no additional folder for waveform and spectrograms
          additional_prediction = ''
        dst[output_name] = os.path.join(paths[output_types[output_name]],additional_folder,str(vp),str(gender)+'_'+str(dig)+'_'+str(vp)+'_'+str(rep)+additional_prediction+'.png')
      # set waveform source path
      src_waveform = os.path.join(paths['data'],str(vp),str(dig)+'_'+str(vp)+'_'+str(rep)+'.wav')
      # read spectrogram from hdf5 file
      cur_spectrogram_data,_ = read_single_spectrogram_hdf5(filepath)
      # call functions to create outputs
      if 'waveform' in selected_outputs:
        count = create_waveform_plot(src_waveform,dst['waveform'],scale=selected_scale,show=selected_show)
        _count_output('waveform', count, created_counter, existed_counter)
      if 'spectrograms' in selected_outputs:
        count = create_spectrogram_plot(cur_spectrogram_data,dst['spectrograms'],show=selected_show)
        _count_output('spectrograms', count, created_counter, existed_counter)
      if grad_cam_selected:
        count = create_single_grad_cam(cur_spectrogram_data,dst['Grad-CAM'],model,show=selected_show)
        _count_output('Grad-CAM', count, created_counter, existed_counter)
      if selected_mode == ['single']:
        file_counter += 1
      if selected_mode == ['examples']:
        example_counter[int(dig)] += 1
    # log the summary of the current model
    for output_name, log_name in (('waveform','Waveform'),('spectrograms','Spectrograms'),('Grad-CAM','Grad-CAM')):
      if output_name in selected_outputs:
        write_log(log_name+': created '+str(created_counter[output_name])+', existed '+str(existed_counter[output_name]))
  write_log('Finished creating outputs!')