<a href="https://colab.research.google.com/github/mertege/Thesis_Experiments/blob/main/3_Spectrograms_CI4R_Datasets_24GHz_Multi_Class_LSTM_with_Attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# LSTM and GRU based architecure is running in Gürbüz dataset with only 24GHz.
# 3 LSTM and GRU blocks are used.
# Bidirectional(GRU) ve Bidirectional(LSTM) are running in parallel and attention applied seperately. Then, these paths are combined with concatenation. 
# Resolution is 402x512.
# It was run 3-times. Results are shared on below:
# Min test accuracy is 0.842, Max test accuracy is 0.855, Mean test accuracy is 0.850, std of test accuracy is 0.006.
# Time elapsed during training time: 322.539 sec
# Time elapsed during test time: 3.336 sec

In [2]:
from keras.models import Sequential
from tensorflow.keras import layers
from keras.models import Model
from keras.layers import Conv1D,Activation, Dropout, Flatten, Dense, Reshape, BatchNormalization, ConvLSTM1D, Normalization, Input, Conv2D, MaxPooling1D, AveragePooling1D, MaxPooling2D, Concatenate, GRU, LSTM, TimeDistributed, Bidirectional, ReLU
from keras.regularizers import l2, l1
import tensorflow as tf
import scipy.io
import numpy as np
import cv2
import matplotlib.pyplot as plt
import random
from numpy.random import seed
from sklearn.model_selection import KFold, StratifiedKFold
import time
from sklearn.metrics import precision_recall_fscore_support
from keras.callbacks import EarlyStopping
from numpy import array
from numpy import argmax
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from keras import backend as K 
import gc
import os

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# 24 GHz
bending_24GHz                 = np.load('/content/drive/MyDrive/bending_24GHz.npy')
label_bending_24GHz           = np.zeros((bending_24GHz.shape[0],1))
label_bending_24GHz[:]        = 1
crawling_24GHz                = np.load('/content/drive/MyDrive/crawling_24GHz.npy')
label_crawling_24GHz          = np.zeros((crawling_24GHz.shape[0],1))
label_crawling_24GHz[:]       = 2
kneeling_24GHz                = np.load('/content/drive/MyDrive/kneeling_24GHz.npy')
label_kneeling_24GHz          = np.zeros((kneeling_24GHz.shape[0],1))
label_kneeling_24GHz[:]       = 3
limping_with_RL_Stiff_24GHz   = np.load('/content/drive/MyDrive/limping with right stiff_24GHz.npy')
label_limping_with_RL_Stiff_24GHz           = np.zeros((limping_with_RL_Stiff_24GHz.shape[0],1))
label_limping_with_RL_Stiff_24GHz[:]        = 4
picking_up_an_object_24GHz    = np.load('/content/drive/MyDrive/picking up object_24GHz.npy')
label_picking_up_an_object_24GHz          = np.zeros((picking_up_an_object_24GHz.shape[0],1))
label_picking_up_an_object_24GHz[:]       = 5
scissors_gait_24GHz           = np.load('/content/drive/MyDrive/scissors gait_24GHz.npy')
label_scissors_gait_24GHz          = np.zeros((scissors_gait_24GHz.shape[0],1))
label_scissors_gait_24GHz[:]       = 6
short_steps_24GHz             = np.load('/content/drive/MyDrive/short steps_24GHz.npy')
label_short_steps_24GHz          = np.zeros((short_steps_24GHz.shape[0],1))
label_short_steps_24GHz[:]       = 7
sitting_24GHz                 = np.load('/content/drive/MyDrive/sitting_24GHz.npy')
label_sitting_24GHz          = np.zeros((sitting_24GHz.shape[0],1))
label_sitting_24GHz[:]       = 8
walking_away_from_Radar_24GHz = np.load('/content/drive/MyDrive/walking away from radar_24GHz.npy')
label_walking_away_from_Radar_24GHz         = np.zeros((walking_away_from_Radar_24GHz.shape[0],1))
label_walking_away_from_Radar_24GHz[:]       = 9
Walking_on_both_toes_24GHz    = np.load('/content/drive/MyDrive/walking on toes both_24GHz.npy')
label_Walking_on_both_toes_24GHz         = np.zeros((Walking_on_both_toes_24GHz.shape[0],1))
label_Walking_on_both_toes_24GHz[:]       = 10
Walking_towards_radar_24GHz   = np.load('/content/drive/MyDrive/walking towards radar_24GHz.npy')
label_Walking_towards_radar_24GHz          = np.zeros((Walking_towards_radar_24GHz.shape[0],1))
label_Walking_towards_radar_24GHz[:]       = 11

In [5]:
spectrogram_concat = np.concatenate((bending_24GHz,crawling_24GHz),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,kneeling_24GHz               ),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,limping_with_RL_Stiff_24GHz  ),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,picking_up_an_object_24GHz   ),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,scissors_gait_24GHz          ),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,short_steps_24GHz            ),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,sitting_24GHz                ),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,walking_away_from_Radar_24GHz),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,Walking_on_both_toes_24GHz   ),axis=0)
spectrogram_concat = np.concatenate((spectrogram_concat,Walking_towards_radar_24GHz  ),axis=0)

label_concat = np.concatenate((label_bending_24GHz,label_crawling_24GHz),axis=0)
label_concat = np.concatenate((label_concat,label_kneeling_24GHz               ),axis=0)
label_concat = np.concatenate((label_concat,label_limping_with_RL_Stiff_24GHz  ),axis=0)
label_concat = np.concatenate((label_concat,label_picking_up_an_object_24GHz   ),axis=0)
label_concat = np.concatenate((label_concat,label_scissors_gait_24GHz          ),axis=0)
label_concat = np.concatenate((label_concat,label_short_steps_24GHz            ),axis=0)
label_concat = np.concatenate((label_concat,label_sitting_24GHz                ),axis=0)
label_concat = np.concatenate((label_concat,label_walking_away_from_Radar_24GHz),axis=0)
label_concat = np.concatenate((label_concat,label_Walking_on_both_toes_24GHz   ),axis=0)
label_concat = np.concatenate((label_concat,label_Walking_towards_radar_24GHz  ),axis=0)

In [6]:
del bending_24GHz
del crawling_24GHz
del kneeling_24GHz
del limping_with_RL_Stiff_24GHz
del picking_up_an_object_24GHz
del scissors_gait_24GHz
del short_steps_24GHz
del sitting_24GHz
del walking_away_from_Radar_24GHz
del Walking_on_both_toes_24GHz
del Walking_towards_radar_24GHz


In [7]:
# One-Hot Encode Label
# integer encode
label_encoder = LabelEncoder()
integer_encoded = label_encoder.fit_transform(label_concat)
# binary encode
onehot_encoder = OneHotEncoder(sparse=False)
integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)
onehot_encoded_labels = onehot_encoder.fit_transform(integer_encoded)

  y = column_or_1d(y, warn=True)


In [8]:
print(integer_encoded[600,:])
print(onehot_encoded_labels[600,:])

[3]
[0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]


In [9]:
# Concat range-doppler data
range_doppler_concat = spectrogram_concat
range_doppler_concat_label = label_concat
# Shuffle concat range doppler
shuffle_indx = random.sample(range(0, range_doppler_concat.shape[0]), range_doppler_concat.shape[0]) # split validation data
range_doppler_concat_shuffle = range_doppler_concat[shuffle_indx,:,:,:]
range_doppler_concat_label_shuffle = range_doppler_concat_label[shuffle_indx,:]
# Concat range-doppler data
# spectrogram_concat = spectrogram_concat
spectrogram_concat_label = onehot_encoded_labels
# Shuffle concat range doppler
# spectrogram_concat_shuffle = spectrogram_concat[shuffle_indx,:,:,:]
spectrogram_concat_label_shuffle = spectrogram_concat_label[shuffle_indx,:]
# Third Spectrogram
# third_spectrogram_concat = spectrogram_concat
# third_spectrogram_concat_shuffle = third_spectrogram_concat[shuffle_indx,:,:,:]
del range_doppler_concat
del spectrogram_concat

In [10]:
# ---------------- Augmente and shuffle (train and test) data data ----------------
def mixup_augmentation(range_doppler_training_data, labels, repeat_of_mixup, alpha, beta):
    batch_size = range_doppler_training_data.shape[0]
    concat_images_range_doppler = np.zeros((batch_size*(repeat_of_mixup+1),range_doppler_training_data.shape[1],range_doppler_training_data.shape[2],range_doppler_training_data.shape[3]))
    concat_label = np.zeros((batch_size*(repeat_of_mixup+1),labels.shape[1]))

    # integer_training_labels = np.argmax(labels, axis=1)
    # np.where(integer_training_labels == 0)

    if repeat_of_mixup == 0:
      concat_images_range_doppler = range_doppler_training_data
      concat_label = labels      
    else:
      for ii in range(repeat_of_mixup):
        # shuffle train dataset
        shuffle_indx_1 = random.sample(range(0, range_doppler_training_data.shape[0]), range_doppler_training_data.shape[0]) # split validation data
        range_doppler_training_data_shuffled_1 = range_doppler_training_data[shuffle_indx_1,:,:,:]
        labels_shuffled_1 = labels[shuffle_indx_1,:]

        shuffle_indx_2 = random.sample(range(0, range_doppler_training_data.shape[0]), range_doppler_training_data.shape[0]) # split validation data
        range_doppler_training_data_shuffled_2 = range_doppler_training_data[shuffle_indx_2,:,:,:]
        labels_shuffled_2 = labels[shuffle_indx_2,:]

        # shuffle_indx_3 = random.sample(range(0, range_doppler_training_data.shape[0]), range_doppler_training_data.shape[0]) # split validation data
        # range_doppler_training_data_shuffled_3 = range_doppler_training_data[shuffle_indx_3,:,:,:]
        # labels_shuffled_3 = labels[shuffle_indx_3,:]

        # shuffle_indx_4 = random.sample(range(0, range_doppler_training_data.shape[0]), range_doppler_training_data.shape[0]) # split validation data
        # range_doppler_training_data_shuffled_4 = range_doppler_training_data[shuffle_indx_4,:,:,:]
        # labels_shuffled_4 = labels[shuffle_indx_4,:]

        # Sample lambda and reshape it to do the mixup
        # ll = np.random.beta(alpha, beta, (batch_size,1,1,1))
        gaussian_mean = 0.2
        gaussian_std = 0.02
        ll = np.random.normal(gaussian_mean, gaussian_std, (batch_size,1,1,1))
        x_l = np.reshape(ll, (batch_size,1,1,1))
        y_l = np.reshape(ll, (batch_size,1))
      
        ll_2 = np.random.normal(gaussian_mean, gaussian_std, (batch_size,1,1,1))
        x_l_2 = np.reshape(ll_2, (batch_size,1,1,1))
        y_l_2 = np.reshape(ll_2, (batch_size,1))

        ll_3 = np.random.normal(gaussian_mean, gaussian_std, (batch_size,1,1,1))
        x_l_3 = np.reshape(ll_3, (batch_size,1,1,1))
        y_l_3 = np.reshape(ll_3, (batch_size,1))

        
        # Perform mixup on both images and labels by combining a pair of images/labels
        # images_mixup_range_doppler = range_doppler_training_data_shuffled_1 * x_l + range_doppler_training_data_shuffled_2 * x_l_2 +\
        # range_doppler_training_data_shuffled_3 * x_l_3 + range_doppler_training_data_shuffled_4 * (1 - x_l - x_l_2 - x_l_3)
        # labels_mixup = labels_shuffled_1 * y_l + labels_shuffled_2 * y_l_2  + labels_shuffled_3 * y_l_3 + labels_shuffled_4 * (1 - y_l - y_l_2 - y_l_3)
        
        images_mixup_range_doppler = range_doppler_training_data_shuffled_1 * x_l + range_doppler_training_data_shuffled_2 * (1 - x_l)
        labels_mixup = labels_shuffled_1 * y_l + labels_shuffled_2 * (1 - y_l)

        concat_images_range_doppler[ii*batch_size:(ii+1)*batch_size,:,:,:] = images_mixup_range_doppler
        concat_label[ii*batch_size:(ii+1)*batch_size,:] = labels_mixup

      concat_images_range_doppler[repeat_of_mixup*batch_size:,:,:,:] = range_doppler_training_data
      concat_label[repeat_of_mixup*batch_size:,:] = labels
    return (concat_images_range_doppler, concat_label)
def vertical_mixup_augmentation(range_doppler_training_data, labels, repeat_of_mixup, alpha, beta):
    batch_size = range_doppler_training_data.shape[0]
    concat_images_range_doppler = np.zeros((batch_size*(repeat_of_mixup+1),range_doppler_training_data.shape[1],range_doppler_training_data.shape[2],range_doppler_training_data.shape[3]))
    concat_label = np.zeros((batch_size*(repeat_of_mixup+1),labels.shape[1]))

    # integer_training_labels = np.argmax(labels, axis=1)
    # np.where(integer_training_labels == 0)

    if repeat_of_mixup == 0:
      concat_images_range_doppler = range_doppler_training_data
      concat_label = labels      
    else:
      for ii in range(repeat_of_mixup):
        # shuffle train dataset
        shuffle_indx_1 = random.sample(range(0, range_doppler_training_data.shape[0]), range_doppler_training_data.shape[0]) # split validation data
        range_doppler_training_data_shuffled_1 = range_doppler_training_data[shuffle_indx_1,:,:,:]
        labels_shuffled_1 = labels[shuffle_indx_1,:]

        shuffle_indx_2 = random.sample(range(0, range_doppler_training_data.shape[0]), range_doppler_training_data.shape[0]) # split validation data
        range_doppler_training_data_shuffled_2 = range_doppler_training_data[shuffle_indx_2,:,:,:]
        labels_shuffled_2 = labels[shuffle_indx_2,:]

        images_mixup_range_doppler = np.zeros((range_doppler_training_data.shape))

        gaussian_mean = 0.15
        gaussian_std = 0.02
        ll_1 = np.random.normal(gaussian_mean, gaussian_std, (batch_size,1))
        vertical_resolution = range_doppler_training_data_shuffled_1.shape[2]
        index_number_1 = int(vertical_resolution*ll_1[0])
        index_number_2 = vertical_resolution - index_number_1
        shuffle_indx_1 = random.sample(range(0, vertical_resolution), index_number_1) # split validation data
        shuffle_indx_2 = np.delete(range(0, vertical_resolution), shuffle_indx_1) # split training data

        images_mixup_range_doppler[:,:,shuffle_indx_1,0] = range_doppler_training_data_shuffled_1[:,:,shuffle_indx_1,0]
        images_mixup_range_doppler[:,:,shuffle_indx_2,0] = range_doppler_training_data_shuffled_2[:,:,shuffle_indx_2,0]
        
        # images_mixup_range_doppler = range_doppler_training_data_shuffled_1 * x_l + range_doppler_training_data_shuffled_2 * (1 - x_l)

        labels_mixup = labels_shuffled_1 * ll_1 + labels_shuffled_2 * (1 - ll_1)

        concat_images_range_doppler[ii*batch_size:(ii+1)*batch_size,:,:,:] = images_mixup_range_doppler
        concat_label[ii*batch_size:(ii+1)*batch_size,:] = labels_mixup

      concat_images_range_doppler[repeat_of_mixup*batch_size:,:,:,:] = range_doppler_training_data
      concat_label[repeat_of_mixup*batch_size:,:] = labels
    return (concat_images_range_doppler, concat_label)
def split_and_augmentation_of_training(range_doppler_concat_shuffle_train,range_doppler_concat_label_shuffle_train,\
                                       repeat_of_mixup, augmentation_enable):
  # ---------------- Parameters ----------------
  # to split train and validation with same distribution
  alpha = 0.1
  beta = 5
  dummy_label = np.zeros((range_doppler_concat_shuffle_train.shape[0],1))
  for randomlist_for_train_indx, randomlist_for_validation_indx in kfold.split(range_doppler_concat_shuffle_train,dummy_label):   
    randomlist_for_validation_indx
  # size_of_validation = int(0.2*range_doppler_concat_shuffle_train.shape[0])
  # # Split validation
  # randomlist_for_validation_indx = random.sample(range(0, range_doppler_concat_shuffle_train.shape[0]), size_of_validation) # split validation data
  # randomlist_for_train_indx = np.delete(range(0, range_doppler_concat_shuffle_train.shape[0]), randomlist_for_validation_indx) # split training data
  # get validation data
  spectrogram_validation_labels = range_doppler_concat_label_shuffle_train[randomlist_for_validation_indx,:]
  range_doppler_validation_data = range_doppler_concat_shuffle_train[randomlist_for_validation_indx,:,:,:]
  # get training data
  spectrogram_training_labels = spectrogram_concat_label_shuffle_train[randomlist_for_train_indx,:]
  range_doppler_training_data = range_doppler_concat_shuffle_train[randomlist_for_train_indx,:,:,:]

  (range_doppler_augmented_image,spectrograms_label)=\
   mixup_augmentation(range_doppler_training_data, spectrogram_training_labels, repeat_of_mixup, alpha, beta)

  # (range_doppler_augmented_image,spectrograms_label)=\
  #  vertical_mixup_augmentation(range_doppler_training_data, spectrogram_training_labels, repeat_of_mixup, alpha, beta)

  return (range_doppler_augmented_image,spectrograms_label,\
     range_doppler_validation_data, spectrogram_validation_labels)



def normalize_inputs(range_doppler_concat_shuffle, normalize_inputs_enable):
  # ---------------- Normalize Inputs ----------------
  if normalize_inputs_enable == True:
    layer = Normalization(axis=None)
    layer.adapt(range_doppler_concat_shuffle)
    range_doppler_concat_shuffle = layer(range_doppler_concat_shuffle)
  else:
    range_doppler_concat_shuffle = range_doppler_concat_shuffle
  return(range_doppler_concat_shuffle)


In [11]:
range_doppler_concat_shuffle_new = np.zeros((range_doppler_concat_shuffle.shape[0],402,512,1))
for ii in range(range_doppler_concat_shuffle.shape[0]):
  aa = range_doppler_concat_shuffle[ii,:,:,0]
  range_doppler_concat_shuffle_new[ii,:,:,0] = cv2.resize(aa, (range_doppler_concat_shuffle_new.shape[2], range_doppler_concat_shuffle_new.shape[1]),interpolation = cv2.INTER_CUBIC)
range_doppler_concat_shuffle = range_doppler_concat_shuffle_new
del aa
del range_doppler_concat_shuffle_new

In [12]:
normalize_inputs_enable = 1
range_doppler_concat_shuffle = normalize_inputs(range_doppler_concat_shuffle, normalize_inputs_enable)
range_doppler_concat_shuffle = np.float32(range_doppler_concat_shuffle)

In [13]:
n_features = range_doppler_concat_shuffle.shape[1]
n_steps = range_doppler_concat_shuffle.shape[2]
range_doppler_concat_shuffle = np.transpose(range_doppler_concat_shuffle, axes = (0,2,1,3)) 

In [14]:
from keras.layers import *
from keras.models import *
from keras import backend as K

class attention(Layer):
    
    def __init__(self, return_sequences=True):
        self.return_sequences = return_sequences
        super(attention,self).__init__()
        
    def build(self, input_shape):
        
        self.W=self.add_weight(name="att_weight", shape=(input_shape[-1],1),
                               initializer="normal")
        self.b=self.add_weight(name="att_bias", shape=(input_shape[1],1),
                               initializer="zeros")
        
        super(attention,self).build(input_shape)
        
    def call(self, x):
        
        e = K.tanh(K.dot(x,self.W)+self.b)
        a = K.softmax(e, axis=1)
        # a = K.sigmoid(e)
        output = x*a
        
        if self.return_sequences:
            return output
        
        return K.sum(output, axis=1)

In [15]:
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, x)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    return x + res

In [16]:

# ---------- Parameters ----------------
augmentation_enable = True
normalize_inputs_enable = True
num_folds = 5
kfold = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state = None) # random_state = 1 ile split run'dan run'a sabit.
test_accuracy_per_run = []
f1_score_per_run = []
# Parameters
epoch_number = 100
batch_size = 32
repeat_of_mixup = 0
number_of_repeat = 3
unit_number_of_lstm = 128
# Regularizers
lstm_dropout_rate = 0.1
recurrent_dropout_rate = 0.0
recurrent_regularizer_value = l1(0.0000)
n_classes = 128
for repeat_run_number in range(number_of_repeat):
  test_accuracy_per_fold = []
  f1_score_per_fold = []
  for train, test in kfold.split(range_doppler_concat_shuffle,range_doppler_concat_label_shuffle):   
    gc.collect()
    K.clear_session()

    randomlist_for_test_indx = test
    randomlist_for_train_indx = train
    # test data
    spectrogram_concat_label_shuffle_test = spectrogram_concat_label_shuffle[randomlist_for_test_indx,:]
    range_doppler_concat_shuffle_test = range_doppler_concat_shuffle[randomlist_for_test_indx,:,:,:]
    #train data
    spectrogram_concat_label_shuffle_train = spectrogram_concat_label_shuffle[randomlist_for_train_indx,:]
    range_doppler_concat_shuffle_train = range_doppler_concat_shuffle[randomlist_for_train_indx,:,:,:]
    # ---------------- Split labels to equal them during augmentation for Validation ----------------
    (range_doppler_augmented_image,spectrogram_concat_label_shuffle_concat,\
     validation_range_doppler, spectrogram_validation_labels)  =\
      split_and_augmentation_of_training(range_doppler_concat_shuffle_train,\
                                         spectrogram_concat_label_shuffle_train,\
                                         repeat_of_mixup, augmentation_enable)


    def lstm_encoder_network_1(input_shape):
      input = Input(shape=input_shape)
      x = Bidirectional(GRU(unit_number_of_lstm, return_sequences=True, dropout = lstm_dropout_rate,
                             recurrent_dropout= recurrent_dropout_rate, kernel_regularizer=recurrent_regularizer_value))(input)                    
      x = AveragePooling1D(pool_size=2, strides=None)(x)
      x = Bidirectional(GRU(int(unit_number_of_lstm), return_sequences=True, dropout = int(lstm_dropout_rate),
                             recurrent_dropout= recurrent_dropout_rate, kernel_regularizer=recurrent_regularizer_value))(x) 
      x = AveragePooling1D(pool_size=2, strides=None)(x)      
      x = Bidirectional(GRU(int(unit_number_of_lstm), return_sequences=True, dropout = int(lstm_dropout_rate),
                             recurrent_dropout= recurrent_dropout_rate, kernel_regularizer=recurrent_regularizer_value))(x) 
      x = AveragePooling1D(pool_size=2, strides=None)(x)                          
      x = tf.transpose(x, perm=[0,2,1])
      x = attention(return_sequences=True)(x)
      x = tf.transpose(x, perm=[0,2,1])        
      x = GlobalAveragePooling1D()(x)
      return Model(input, x)


    def lstm_encoder_network_2(input_shape):
      input = Input(shape=input_shape)
      x = Bidirectional(LSTM(unit_number_of_lstm, return_sequences=True, dropout = lstm_dropout_rate,
                             recurrent_dropout= recurrent_dropout_rate, kernel_regularizer=recurrent_regularizer_value))(input)                            
      x = AveragePooling1D(pool_size=2, strides=None)(x)
      x = Bidirectional(LSTM(int(unit_number_of_lstm), return_sequences=True, dropout = int(lstm_dropout_rate),
                             recurrent_dropout= recurrent_dropout_rate, kernel_regularizer=recurrent_regularizer_value))(x) 
      x = AveragePooling1D(pool_size=2, strides=None)(x)      
      x = Bidirectional(LSTM(int(unit_number_of_lstm), return_sequences=True, dropout = int(lstm_dropout_rate),
                             recurrent_dropout= recurrent_dropout_rate, kernel_regularizer=recurrent_regularizer_value))(x) 
      x = AveragePooling1D(pool_size=2, strides=None)(x)  
      x = tf.transpose(x, perm=[0,2,1])
      x = attention(return_sequences=True)(x)
      x = tf.transpose(x, perm=[0,2,1])        
      x = GlobalAveragePooling1D()(x)
      return Model(input, x)
  
    def decoder_for_concat(input_shape):
      input = Input(shape=input_shape)
      x = BatchNormalization()(input)
      # x = Dense(512)(x)
      # x = BatchNormalization()(x)
      # x = Activation('LeakyReLU')(x)
      # x = Dropout(0.25)(x)
      x = Dense(64)(x)
      x = BatchNormalization()(x)
      x = Activation('LeakyReLU')(x)
      x = Dropout(0.15)(x)
      x = Dense(11, activation="softmax")(x)
      return Model(input, x)

    input_shape = range_doppler_concat_shuffle.shape[1:3]

    base_network_lstm_1 = lstm_encoder_network_1(input_shape)
    lstm_input_1  = Input(shape=input_shape)
    processed_lstm_1  = base_network_lstm_1(lstm_input_1)

    base_network_lstm_2 = lstm_encoder_network_2(input_shape)
    processed_lstm_2  = base_network_lstm_2(lstm_input_1)

    concat_layer = Concatenate()([processed_lstm_1, processed_lstm_2])

    base_decoder_network = decoder_for_concat((concat_layer.shape[1]))
    out = base_decoder_network(concat_layer)

    model = Model(inputs=[lstm_input_1], outputs=[out])

    print(model.summary()) 

    # ---------------- Compile and Fit ----------------
    t = time.time()
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    
    earlyStopping = EarlyStopping(monitor='val_loss', patience=13, verbose=0,restore_best_weights=True, mode='min')
    history = model.fit((range_doppler_augmented_image),(spectrogram_concat_label_shuffle_concat),
                    epochs=epoch_number,
                    batch_size=batch_size,
                    shuffle = True,
                    verbose = 0,
                    callbacks=[earlyStopping],
                    validation_data = ((validation_range_doppler) , (spectrogram_validation_labels)))
    elapsed = time.time() - t
    tf.keras.models.load_model
    test_loss, test_accuracy  = model.evaluate([range_doppler_concat_shuffle_test],\
                                               [spectrogram_concat_label_shuffle_test],
                  batch_size=batch_size)
    test_accuracy_per_fold.append(test_accuracy)
    print(test_accuracy_per_fold)
    # f1_score_per_fold.append(test_f1_score)
  test_accuracy_per_run.append(sum(test_accuracy_per_fold)/num_folds)
  # f1_score_per_run.append(sum(f1_score_per_fold)/num_folds)
  print(test_accuracy_per_run)
  # print(f1_score_per_run)

print(f'Mean test accuracy is {"{:.3f}".format(sum(test_accuracy_per_run)/number_of_repeat)},\
 max test accuracy is {"{:.3f}".format(max(test_accuracy_per_run))},\
 min test accuracy is {"{:.3f}".format(min(test_accuracy_per_run))}, \
 std of test accuracy is {"{:.3f}".format(np.std(test_accuracy_per_run, axis=0))}.')

print(f'Time elapsed through all process: {"{:.2f}".format(elapsed)}, sec')

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 512, 402)]   0           []                               
                                                                                                  
 model (Functional)             (None, 256)          1001792     ['input_2[0][0]']                
                                                                                                  
 model_1 (Functional)           (None, 256)          1332544     ['input_2[0][0]']                
                                                                                                  
 concatenate (Concatenate)      (None, 512)          0           ['model[0][0]',                  
                                                                  'model_1[0][0]']          

In [17]:
t_test = time.time()
y_test_predicted = model.predict((range_doppler_concat_shuffle_test[0:1,:,:,:]))
test_elapsed = time.time() - t_test 
print(f'Time elapsed during test time: {"{:.2f}".format(test_elapsed)}, sec')

Time elapsed during test time: 3.34, sec


In [18]:
print(f'Min test accuracy is {"{:.3f}".format(min(test_accuracy_per_run))}, \
Max test accuracy is {"{:.3f}".format(max(test_accuracy_per_run))}, \
Mean test accuracy is {"{:.3f}".format(sum(test_accuracy_per_run)/number_of_repeat)}, \
std of test accuracy is {"{:.3f}".format(np.std(test_accuracy_per_run, axis=0))}.')
print(f'Time elapsed during training time: {"{:.3f}".format(elapsed)} sec')
print(f'Time elapsed during test time: {"{:.3f}".format(test_elapsed)} sec')

Min test accuracy is 0.842, Max test accuracy is 0.855, Mean test accuracy is 0.850, std of test accuracy is 0.006.
Time elapsed during training time: 322.539 sec
Time elapsed during test time: 3.336 sec
