<a href="https://colab.research.google.com/github/mertege/FMCW-Data-Classification-/blob/main/Range_Doppler_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Burada range-doppler dataları CNN için kullanıldı.
# Sadece 'fast' dataset 1 kat augmente ediliyor.
# 5 defa run edildi ve değerler aşağıda paylaşıldı.
# Mean test accuracy is 0.90, mean test f1 score is 0.88, max test accuracy is 0.92, max test f1 score is 0.91, 
# Min test accuracy is 0.85, min test f1 score is 0.83, std of test accuracy is 0.03, std of test f1 score is 0.03
# Time elapsed through all process: 1688.79, sec

In [1]:
from keras.models import Sequential
from tensorflow.keras import layers
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Activation, Dropout, Flatten, Dense, BatchNormalization, Normalization
import tensorflow as tf
import scipy.io
import numpy as np
import cv2
import matplotlib.pyplot as plt
import random
from numpy.random import seed
from sklearn.model_selection import KFold, StratifiedKFold
import time
from sklearn.metrics import precision_recall_fscore_support
from keras.callbacks import EarlyStopping

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
range_doppler_fast_resized = scipy.io.loadmat('/content/drive/MyDrive/range_doppler_fast_resized.mat')
range_doppler_fast_resized = range_doppler_fast_resized['range_doppler_fast_resized']
range_doppler_fast_resized = np.transpose(range_doppler_fast_resized, (2, 0, 1))
range_doppler_fast_label = scipy.io.loadmat('/content/drive/MyDrive/range_doppler_fast_label.mat')
range_doppler_fast_label = range_doppler_fast_label['range_doppler_fast_label']  

range_doppler_slow_resized = scipy.io.loadmat('/content/drive/MyDrive/range_doppler_slow_resized.mat')
range_doppler_slow_resized = range_doppler_slow_resized['range_doppler_slow_resized']
range_doppler_slow_resized = np.transpose(range_doppler_slow_resized, (2, 0, 1))
range_doppler_slow_label = scipy.io.loadmat('/content/drive/MyDrive/range_doppler_slow_label.mat')
range_doppler_slow_label = range_doppler_slow_label['range_doppler_slow_label']  

range_doppler_slow_pocket_resized = scipy.io.loadmat('/content/drive/MyDrive/range_doppler_slow_pocket_resized.mat')
range_doppler_slow_pocket_resized = range_doppler_slow_pocket_resized['range_doppler_slow_pocket_resized']
range_doppler_slow_pocket_resized = np.transpose(range_doppler_slow_pocket_resized, (2, 0, 1))
range_doppler_pocket_label = scipy.io.loadmat('/content/drive/MyDrive/range_doppler_pocket_label.mat')
range_doppler_pocket_label = range_doppler_pocket_label['range_doppler_pocket_label']  

In [4]:
range_doppler_concat = np.concatenate((range_doppler_fast_resized,range_doppler_slow_resized),axis=0)
range_doppler_concat = np.concatenate((range_doppler_concat,range_doppler_slow_pocket_resized),axis=0)
range_doppler_concat = range_doppler_concat[:,:,:,np.newaxis] 
range_doppler_concat_label = np.zeros((57*3,1))
range_doppler_concat_label[:57,:] = 1
# Shuffle concat range doppler
shuffle_indx = random.sample(range(0, range_doppler_concat.shape[0]), range_doppler_concat.shape[0]) # split validation data
range_doppler_concat_shuffle = range_doppler_concat[shuffle_indx,:,:,:]
range_doppler_concat_label_shuffle = range_doppler_concat_label[shuffle_indx,:]

In [16]:
# ---------------- Augmente and shuffle (train and test) data data ----------------
data_augmentation = tf.keras.Sequential([
  layers.RandomFlip("horizontal_and_vertical"),
  layers.RandomRotation(0.2),
])
def split_and_augmentation_of_training(range_doppler_concat_shuffle_train,range_doppler_concat_label_shuffle_train, augmentation_enable):
  # ---------------- Parameters ----------------
  repeat_of_augmentation_for_fast = 1
  repeat_of_augmentation_for_slow = np.floor(repeat_of_augmentation_for_fast/2)
  repeat_of_augmentation_for_slow = int(repeat_of_augmentation_for_slow)
  slow_size_of_validation = 20
  fast_size_of_validation = slow_size_of_validation/2
  fast_size_of_validation = int(fast_size_of_validation)
  slow_indexes = np.where(range_doppler_concat_label_shuffle_train == 0)[0]
  fast_indexes = np.delete(range(0, range_doppler_concat_label_shuffle_train.shape[0]), slow_indexes)
  slow_spectrograms_train_val = range_doppler_concat_shuffle_train[slow_indexes,:,:,:]
  fast_spectrograms_train_val = range_doppler_concat_shuffle_train[fast_indexes,:,:,:]
  # ---------------- Seperate Validation From Training Data ----------------
  # -- Slow --
  randomlist_for_validation_indx = random.sample(range(0, slow_spectrograms_train_val.shape[0]), slow_size_of_validation) # split validation data
  randomlist_for_train_indx = np.delete(range(0, slow_spectrograms_train_val.shape[0]), randomlist_for_validation_indx) # split training data
  slow_spectrograms_train = slow_spectrograms_train_val[randomlist_for_train_indx,:,:,:]
  slow_spectrograms_val = slow_spectrograms_train_val[randomlist_for_validation_indx,:,:,:]
  slow_label_val = np.zeros((slow_size_of_validation,1))
  size_of_samples_slow = slow_spectrograms_train.shape[0]
  # -- Fast -- 
  randomlist_for_validation_indx = random.sample(range(0, fast_spectrograms_train_val.shape[0]), fast_size_of_validation) # split validation data
  randomlist_for_train_indx = np.delete(range(0, fast_spectrograms_train_val.shape[0]), randomlist_for_validation_indx) # split training data
  fast_spectrograms_train = fast_spectrograms_train_val[randomlist_for_train_indx,:,:,:]
  fast_spectrograms_val = fast_spectrograms_train_val[randomlist_for_validation_indx,:,:,:]
  fast_label_val = np.ones((fast_size_of_validation,1))
  size_of_samples_fast = fast_spectrograms_train.shape[0]
  # -- Concat Fast and Slow Data at Validation Dataset -- 
  validation_spectrograms = np.concatenate((fast_spectrograms_val,slow_spectrograms_val),axis=0)
  validation_labels = np.concatenate((fast_label_val,slow_label_val),axis=0)
  if augmentation_enable == True: 
    # ---------------- Augmente Train Data for Fast ----------------
    augmented_image_fast = np.zeros((size_of_samples_fast*repeat_of_augmentation_for_fast,fast_spectrograms_train.shape[1],fast_spectrograms_train.shape[2],1))
    spectrograms_fast_label = np.ones((size_of_samples_fast*(repeat_of_augmentation_for_fast+1),1))
    for jj in range(repeat_of_augmentation_for_fast):
      for ii in range(size_of_samples_fast):
        augmented_image_fast[size_of_samples_fast*jj+ii,:,:,:] = data_augmentation(fast_spectrograms_train[ii,:,:,:])
    augmented_image_fast = np.concatenate((augmented_image_fast,fast_spectrograms_train),axis=0)   
    # ---------------- Augmente Train Data for Slow ----------------
    augmented_image_slow = np.zeros((size_of_samples_slow*repeat_of_augmentation_for_slow,slow_spectrograms_train.shape[1],slow_spectrograms_train.shape[2],1))
    spectrograms_slow_label = np.zeros((size_of_samples_slow*(repeat_of_augmentation_for_slow+1),1))
    if repeat_of_augmentation_for_slow == 0:
      augmented_image_slow = slow_spectrograms_train
    else:
      for kk in range(repeat_of_augmentation_for_slow):
        for ii in range(size_of_samples_slow):
          augmented_image_slow[size_of_samples_slow*kk+ii,:,:,:] = data_augmentation(slow_spectrograms_train[ii,:,:,:])
      augmented_image_slow = np.concatenate((augmented_image_slow,slow_spectrograms_train),axis=0)    
  else:
    augmented_image_fast = fast_spectrograms_train
    augmented_image_slow = slow_spectrograms_train
    spectrograms_fast_label = np.ones((size_of_samples_fast,1))
    spectrograms_slow_label = np.zeros((size_of_samples_slow,1))
  return (augmented_image_fast,augmented_image_slow,spectrograms_fast_label,spectrograms_slow_label,validation_spectrograms,validation_labels)


def normalize_inputs(range_doppler_concat_shuffle_test, validation_spectrograms, augmented_image, normalize_inputs_enable):
  # ---------------- Normalize Inputs ----------------
  if normalize_inputs_enable == True:
    layer = Normalization(axis=None)
    layer.adapt(range_doppler_concat_shuffle_test)
    range_doppler_concat_shuffle_test = layer(range_doppler_concat_shuffle_test)
    layer = Normalization(axis=None)
    layer.adapt(validation_spectrograms)
    validation_spectrograms = layer(validation_spectrograms)
    layer = Normalization(axis=None)
    layer.adapt(augmented_image)
    augmented_image = layer(augmented_image)
  else:
    (range_doppler_concat_shuffle_test, validation_spectrograms, augmented_image) = (range_doppler_concat_shuffle_test, validation_spectrograms, augmented_image)
  return(range_doppler_concat_shuffle_test, validation_spectrograms, augmented_image)

In [None]:
t = time.time()
# ---------- Parameters ----------------
augmentation_enable = True
normalize_inputs_enable = True
num_folds = 5
kfold = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state = None) # random_state = 1 ile split run'dan run'a sabit.
test_accuracy_per_run = []
f1_score_per_run = []
epoch_number = 100
batch_size = 32
dense_size = 32
dropout_prob_cnn = 0.1
dropout_prob_dense = 0.5
number_of_repeat = 5
for repeat_run_number in range(number_of_repeat):
  test_accuracy_per_fold = []
  f1_score_per_fold = []
  for train, test in kfold.split(range_doppler_concat_shuffle,range_doppler_concat_label_shuffle):   
    randomlist_for_validation_indx = test
    randomlist_for_train_indx = train
    # test data
    range_doppler_concat_shuffle_test = range_doppler_concat_shuffle[randomlist_for_validation_indx,:,:,:]
    range_doppler_concat_label_shuffle_test = range_doppler_concat_label_shuffle[randomlist_for_validation_indx,:]
    #train data
    range_doppler_concat_shuffle_train = range_doppler_concat_shuffle[randomlist_for_train_indx,:,:,:]
    range_doppler_concat_label_shuffle_train = range_doppler_concat_label_shuffle[randomlist_for_train_indx,:]
      # ---------------- Split labels to equal them during augmentation for Validation ----------------
    (augmented_image_fast,augmented_image_slow,spectrograms_fast_label,spectrograms_slow_label,validation_spectrograms,validation_labels)  = split_and_augmentation_of_training(range_doppler_concat_shuffle_train,range_doppler_concat_label_shuffle_train, augmentation_enable)
    # ---------------- Concat Validation and Slow ----------------
    augmented_image = np.concatenate((augmented_image_fast,augmented_image_slow),axis=0)
    range_doppler_concat_label_shuffle_concat = np.concatenate((spectrograms_fast_label,spectrograms_slow_label),axis=0)

    (range_doppler_concat_shuffle_test, validation_spectrograms, augmented_image) = normalize_inputs(range_doppler_concat_shuffle_test, validation_spectrograms, augmented_image, normalize_inputs_enable)

    # ---------------- Neural Network Architecture ----------------
    model = Sequential()

    initializer = tf.keras.initializers.HeNormal()

    model.add(Conv2D(4, (3, 3), input_shape=augmented_image.shape[1:],kernel_initializer=initializer))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(dropout_prob_cnn))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(8, (3, 3), input_shape=augmented_image.shape[1:],kernel_initializer=initializer))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(dropout_prob_cnn))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # model.add(Conv2D(16, (3, 3),kernel_initializer=initializer))
    # model.add(BatchNormalization())
    # model.add(Activation('relu'))
    # model.add(Dropout(dropout_prob_cnn))
    # model.add(MaxPooling2D(pool_size=(2, 2)))


    # model.add(Conv2D(64, (3, 3)))
    # model.add(BatchNormalization())
    # model.add(Activation('relu'))
    # model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())  
    model.add(Dense(dense_size,kernel_initializer=initializer))
    model.add(Activation('relu'))
    model.add(Dropout(dropout_prob_dense))


    model.add(Dense(1,kernel_initializer=initializer))
    model.add(Activation('sigmoid'))

    print(model.summary())
    # ---------------- Compile and Fit ----------------
    model.compile(loss='binary_crossentropy',
                  optimizer='rmsprop',
                  metrics=['accuracy'])
    
    earlyStopping = EarlyStopping(monitor='val_loss', patience=35, verbose=0,restore_best_weights=True, mode='min')
    # earlyStopping = EarlyStopping(monitor='val_accuracy', patience=15, verbose=0,restore_best_weights=True, mode='max')
    history = model.fit((augmented_image),(range_doppler_concat_label_shuffle_concat),
                    epochs=epoch_number,
                    batch_size=batch_size,
                    shuffle = True,
                    callbacks=[earlyStopping],
                    validation_data = ((validation_spectrograms) , (validation_labels)))
    tf.keras.models.load_model
    test_loss, test_accuracy  = model.evaluate((range_doppler_concat_shuffle_test),(range_doppler_concat_label_shuffle_test),
                  batch_size=batch_size)
    # ---------------- Get Test Results ----------------
    y_test_predicted = model.predict((range_doppler_concat_shuffle_test), batch_size=batch_size)
    # ----- Binarize y_test_predicted values -----
    y_test_predicted_binary = np.zeros(y_test_predicted.size)
    for ii in range(y_test_predicted.size):
      if y_test_predicted[ii] < 0.5:
        y_test_predicted_binary[ii] = 0
      else:
        y_test_predicted_binary[ii] = 1
    
    test_precision, test_recall, test_f1_score, support = precision_recall_fscore_support(range_doppler_concat_label_shuffle_test, y_test_predicted_binary, average='macro')

    test_accuracy_per_fold.append(test_accuracy)
    f1_score_per_fold.append(test_f1_score)

  test_accuracy_per_run.append(sum(test_accuracy_per_fold)/num_folds)
  f1_score_per_run.append(sum(f1_score_per_fold)/num_folds)
print(f'Mean test accuracy is {"{:.2f}".format(sum(test_accuracy_per_run)/number_of_repeat)}, mean test f1 score is {"{:.2f}".format(sum(f1_score_per_run)/number_of_repeat)}, \
max test accuracy is {"{:.2f}".format(max(test_accuracy_per_run))}, max test f1 score is {"{:.2f}".format(max(f1_score_per_run))}, \
min test accuracy is {"{:.2f}".format(min(test_accuracy_per_run))}, min test f1 score is {"{:.2f}".format(min(f1_score_per_run))}, \
std of test accuracy is {"{:.2f}".format(np.std(test_accuracy_per_run, axis=0))}, std of test f1 score is {"{:.2f}".format(np.std(f1_score_per_run, axis=0))}')
elapsed = time.time() - t
print(f'Time elapsed through all process: {"{:.2f}".format(elapsed)}, sec')

In [19]:
print(f'Mean test accuracy is {"{:.2f}".format(sum(test_accuracy_per_run)/number_of_repeat)}, mean test f1 score is {"{:.2f}".format(sum(f1_score_per_run)/number_of_repeat)}, \
max test accuracy is {"{:.2f}".format(max(test_accuracy_per_run))}, max test f1 score is {"{:.2f}".format(max(f1_score_per_run))}, \
min test accuracy is {"{:.2f}".format(min(test_accuracy_per_run))}, min test f1 score is {"{:.2f}".format(min(f1_score_per_run))}, \
std of test accuracy is {"{:.2f}".format(np.std(test_accuracy_per_run, axis=0))}, std of test f1 score is {"{:.2f}".format(np.std(f1_score_per_run, axis=0))}')
print(f'Time elapsed through all process: {"{:.2f}".format(elapsed)}, sec')

Mean test accuracy is 0.90, mean test f1 score is 0.88, max test accuracy is 0.92, max test f1 score is 0.91, min test accuracy is 0.85, min test f1 score is 0.83, std of test accuracy is 0.03, std of test f1 score is 0.03
Time elapsed through all process: 1688.79, sec


In [None]:
# Test Amaçlı
model.predict(augmented_image)

In [None]:
# Test Amaçlı
model.predict(range_doppler_concat_shuffle_test)