<a href="https://colab.research.google.com/github/suresha97/MSc_Project/blob/main/CNN_Baseline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Workspace

In [None]:
#load packages
import scipy
from scipy import signal
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import time
from IPython import display
import copy 
import pickle

from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import sklearn.metrics
from sklearn import preprocessing
from sklearn.model_selection import StratifiedShuffleSplit

import torch
import torchvision
from torchvision import transforms
from torchvision.transforms import Normalize, Resize, ToTensor
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import PIL
from PIL import Image
import cv2

In [None]:
#Mount Google Drive (Colab only) so the .npy spectrograms and label CSVs under
#'/content/drive/My Drive/...' can be read by the loading cells below.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Load Data

## Spectrograms




In [None]:
#Zero-padded participant IDs '01' ... '23', as used in the spectrogram file names
participants = [f"{idx:02d}" for idx in range(1, 24)]

#Folder holding one .npy file of spectrograms per participant and per signal
spec_dir = '/content/drive/My Drive/PHYSIO/Generated_Specs'

ppg_full_list = []  # one array of PPG spectrograms per participant
resp_full_list = [] # one array of RESP spectrograms per participant

#Load spectrograms of the PPG and RESP signals for each participant
for part in participants:
  ppg_full_list.append(np.load('{}/PPG_28/ppg_spec_{}_28.npy'.format(spec_dir, part)))
  resp_full_list.append(np.load('{}/RESP_28/resp_spec_{}_28.npy'.format(spec_dir, part)))

#Stack the per-participant arrays along the first axis
#(the recorded output shows 23 participants and a stacked shape of (920, 28, 28))
print(len(ppg_full_list))
ppg_full_array = np.vstack(ppg_full_list)
print(ppg_full_array.shape)

print(len(resp_full_list))
resp_full_array = np.vstack(resp_full_list)
print(resp_full_array.shape)

23
(920, 28, 28)
23
(920, 28, 28)


## Labels

In [None]:
#Load the label table. Three alternative sources exist (K-means clusters, manual
#threshold clusters, normalised threshold clusters); only the manual-threshold
#file is loaded by the active line, into `labels_tdf`.
#NOTE(review): the "Run to Combine K-means Clusters" cell reads `labels_df`, which is
#only assigned by the commented-out lines here - uncomment one before running that cell.
#labels_df = pd.read_csv("/content/drive/My Drive/PHYSIO/Generated_Specs/all_labels.csv") #class_4 only in all_labels.csv
labels_tdf = pd.read_csv("/content/drive/My Drive/PHYSIO/Generated_Specs/all_labels_threshold.csv") 
#labels_df = pd.read_csv("/content/drive/My Drive/PHYSIO/Generated_Specs/threshold_norm.csv") 

In [None]:
#Preview the first rows of the threshold-label table
labels_tdf.head()

Unnamed: 0,val,aro,dom,lik,val3,aro3,dom3,lik3
0,1,1,1,1,2,2,2,2
1,1,1,1,1,2,2,2,2
2,1,1,1,1,2,2,2,2
3,1,1,1,1,1,2,2,2
4,1,0,1,1,2,0,2,2


In [None]:
#Print the class proportions of every label column for the current clustering method.
#Only the first 920 rows are considered (one per loaded spectrogram).
label_window = labels_tdf.iloc[0:920]
for column in label_window.columns:
  unique, counts = np.unique(label_window[column].to_numpy(), return_counts=True)
  #Divide by the number of rows actually counted so the proportions always sum to 1,
  #even if the table holds fewer than 920 rows
  frequencies = np.asarray((unique, counts/len(label_window))).T
  print(str(column))
  print(frequencies)
  print()

val
[[0.         0.37173913]
 [1.         0.62826087]]

aro
[[0.         0.36956522]
 [1.         0.63043478]]

dom
[[0.         0.37173913]
 [1.         0.62826087]]

lik
[[0.         0.29130435]
 [1.         0.70869565]]

val3
[[0.         0.29021739]
 [1.         0.3       ]
 [2.         0.40978261]]

aro3
[[0.         0.29130435]
 [1.         0.31413043]
 [2.         0.39456522]]

dom3
[[0.         0.30217391]
 [1.         0.33913043]
 [2.         0.35869565]]

lik3
[[0.         0.24673913]
 [1.         0.21086957]
 [2.         0.5423913 ]]



## Run to Combine K-means Clusters

In [None]:
#Merge the three K-means clusters of each dimension into two classes:
#clusters 0 and 1 become class 0, cluster 2 becomes class 1.
#NOTE(review): `labels_df` is not assigned by any active line of the label-loading
#cell (only `labels_tdf` is) - load the K-means CSV into `labels_df` before running this.
def _merge_clusters(cluster_ids):
  """Return a NEW array with cluster 1 relabelled as 0 and cluster 2 relabelled as 1.

  Working on a copy matters: `Series.to_numpy()` may hand back a view of the
  DataFrame's data, so editing it in place would silently change `labels_df`
  and make this cell give different results when it is run a second time.
  """
  merged = np.array(cluster_ids, copy=True)
  #Order matters: fold 1 into 0 first, then rename 2 to 1
  merged[merged == 1] = 0
  merged[merged == 2] = 1
  return merged

labels_newk = {dim: _merge_clusters(labels_df[dim + '3'].to_numpy())
               for dim in ('val', 'aro', 'lik')}

#Revised class counts after combining clusters
lab_newk = pd.DataFrame.from_dict(labels_newk)
for column in lab_newk.columns:
  unique, counts = np.unique(lab_newk[column][0:920].to_numpy(), return_counts=True)
  frequencies = np.asarray((unique, counts)).T
  print(str(column))
  print(frequencies)
  print()

val
[[  0 563]
 [  1 357]]

aro
[[  0 641]
 [  1 279]]

lik
[[  0 536]
 [  1 384]]



# Prepare Data for Pytorch

In [None]:
#Name signal arrays and choose clustering type to train
data_X = ppg_full_array
data_X2 = resp_full_array
data_Y = labels_tdf['aro'][0:len(data_X)].to_numpy()

#data_Y = lab_newk['val'][0:len(data_X)].to_numpy()
#Label encoding for clusterins with quadrant names as labels
#le = preprocessing.LabelEncoder()
#data_Y = le.fit_transform(data_Y)
#print(data_Y[0:20])

image_size = data_X.shape[1]


## Run For Standardisation

In [None]:
#Convert arrays to tensor
all_x = torch.tensor(data_X)
all_x2 = torch.tensor(data_X2)

#Find the global mean and SD over all spectrograms of each signal
mean1 = all_x.mean().item()
std1 = all_x.std().item()
mean2 = all_x2.mean().item()
std2 = all_x2.std().item()

#Standardise the data to zero mean and unit standard deviation.
#Note: standardised values are NOT bounded to [-1, 1]; use the min-max cell for a bounded range.
transform = transforms.Compose([transforms.Normalize(mean1,std1)])
transform2 = transforms.Compose([transforms.Normalize(mean2,std2)])

#NOTE(review): the results are stored in all_x / all_x2, while the split cells below
#read data_X / data_X2 - assign them back if standardised inputs are intended.
all_x = transform(all_x)
all_x = all_x.numpy()

all_x2 = transform2(all_x2)
all_x2 = all_x2.numpy()

## Run for MinMax Scaling

In [None]:
#Global extrema of each signal's spectrograms
ppg_min, ppg_max = data_X.min(), data_X.max()
resp_min, resp_max = data_X2.min(), data_X2.max()

for extreme in (ppg_max, ppg_min, resp_max, resp_min):
  print(extreme)

#Min-max normalisation: rescale each signal so that all values lie between 0 and 1
ppg_range = ppg_max - ppg_min
resp_range = resp_max - resp_min
data_X = (data_X - ppg_min)/ppg_range
data_X2 = (data_X2 - resp_min)/resp_range

0.76465565
8.650511e-06
0.8332644
8.939816e-06


In [None]:
#Two-channel input: put the PPG and RESP arrays side by side on a new axis 1,
#turning two (N, H, W) arrays into one (N, 2, H, W) array
to_stack = [data_X, data_X2]
two_sig = np.stack(to_stack, axis=1)
print(two_sig.shape)

(920, 2, 28, 28)


## Data Splits - Single Signal

In [None]:
def data_split(data_feat,data_lab, channel):
  """Stratified train / validation / test split of one signal's spectrograms.

  60% of the samples go to training; the remaining 40% are halved into test and
  validation sets. Both splits are stratified on the labels and use random_state=42.
  The class counts of each subset are printed.

  Parameters:
    data_feat: feature array, one sample per row along axis 0.
    data_lab:  non-negative integer class labels aligned with data_feat.
    channel:   when equal to 1, every subset is reshaped to
               (n, 1, image_size, image_size) using the global `image_size`.

  Returns:
    X_train, y_train, X_val, y_val, X_test, y_test
  """
  #First cut: 60% training, 40% held out
  X_train, X_holdout, y_train, y_holdout = train_test_split(
      data_feat, data_lab, train_size=0.6, random_state=42, stratify=data_lab)
  print(f"Numbers of train instances by class: {np.bincount(y_train)}")

  #Second cut: split the held-out part 50/50 into test and validation
  X_test, X_val, y_test, y_val = train_test_split(
      X_holdout, y_holdout, train_size=0.5, random_state=42, stratify=y_holdout)
  print(f"Numbers of test instances by class: {np.bincount(y_test)}")
  print(f"Numbers of validation instances by class: {np.bincount(y_val)}")

  #Single-channel input: add the channel axis that the convolutional layers expect
  if channel == 1:
    X_train, X_val, X_test = (
        subset.reshape(len(subset), 1, image_size, image_size)
        for subset in (X_train, X_val, X_test))

  return X_train, y_train, X_val, y_val, X_test, y_test
  

X_train, y_train, X_val, y_val, X_test, y_test = data_split(data_X,data_Y, channel = 1)

#Turn every subset into a float tensor (torch.tensor does not track gradients by default)
(trainf_tensor, trainl_tensor,
 valf_tensor, vall_tensor,
 testf_tensor, testl_tensor) = (
    torch.tensor(subset, dtype=torch.float)
    for subset in (X_train, y_train, X_val, y_val, X_test, y_test))

#Serve the training data in shuffled mini-batches
batch_size = 50
trainset = torch.utils.data.TensorDataset(trainf_tensor, trainl_tensor)
trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, batch_size=batch_size)


Numbers of train instances by class: [204 348]
Numbers of test instances by class: [ 68 116]
Numbers of validation instances by class: [ 68 116]


In [None]:
#Sanity check of the training-set shape after the channel axis was added
print(X_train.shape)

(552, 1, 28, 28)


## Data Splits - Multi Signal

In [None]:

#Split by index so that both signals share exactly the same partitions; classes are stratified via StratifiedShuffleSplit.
def multi_data_split(data_feats1,data_feats2,data_labs):
  """Stratified train / validation / test split shared by two aligned signals.

  The split is computed once on sample indices and applied to both feature arrays,
  so row i of every returned PPG subset matches row i of the RESP subset.
  60% of the samples go to training; the remaining 40% are halved into test and
  validation sets. The subset sizes are printed.

  Parameters:
    data_feats1: feature array of the first signal, one sample per row.
    data_feats2: feature array of the second signal, aligned with data_feats1.
    data_labs:   class labels aligned with both feature arrays.

  Returns:
    X_train1, X_train2, X_val1, X_val2, X_test1, X_test2, y_train, y_val, y_test
    with every feature subset reshaped to (n, 1, image_size, image_size) using the
    global `image_size`.
  """
  data_labs = np.asarray(data_labs)

  #60/40 stratified split into training indices and held-out indices
  outer = StratifiedShuffleSplit(n_splits=1,test_size=0.4,train_size=0.6,random_state=42)
  train_index, holdout_index = next(outer.split(data_feats1, data_labs))
  holdout_index = np.asarray(holdout_index)
  print(len(train_index))
  print(len(holdout_index))

  #50/50 stratified split of the held-out samples. The splitter returns positions
  #WITHIN the held-out subset, so they must be mapped back through holdout_index
  #before indexing the full arrays; using them directly would pull validation/test
  #rows from the start of the dataset, overlapping the training set.
  holdout_labs = data_labs[holdout_index]
  inner = StratifiedShuffleSplit(n_splits=1,test_size=0.5,train_size=0.5,random_state=42)
  test_pos, val_pos = next(inner.split(np.zeros(len(holdout_index)), holdout_labs))
  test_index = holdout_index[test_pos]
  val_index = holdout_index[val_pos]
  print(len(test_index))
  print(len(val_index))

  def _take(features, index):
    #Select the rows and add the singleton channel axis expected by Conv2d
    picked = np.take(features, index, axis = 0)
    return picked.reshape(len(picked),1,image_size,image_size)

  X_train1, X_train2 = _take(data_feats1, train_index), _take(data_feats2, train_index)
  X_val1, X_val2 = _take(data_feats1, val_index), _take(data_feats2, val_index)
  X_test1, X_test2 = _take(data_feats1, test_index), _take(data_feats2, test_index)

  y_train = np.take(data_labs, train_index)
  y_val = np.take(data_labs, val_index)
  y_test = np.take(data_labs, test_index)

  return X_train1, X_train2, X_val1, X_val2, X_test1, X_test2, y_train, y_val, y_test


X_train_ppg, X_train_resp, X_val_ppg, X_val_resp, X_test_ppg, X_test_resp, y_train, y_val, y_test = multi_data_split(data_X,data_X2,data_Y)


#Float tensors for the first signal (PPG)
trainf_tensor1, valf_tensor1, testf_tensor1 = (
    torch.tensor(subset, dtype=torch.float)
    for subset in (X_train_ppg, X_val_ppg, X_test_ppg))

#Float tensors for the second signal (RESP)
trainf_tensor2, valf_tensor2, testf_tensor2 = (
    torch.tensor(subset, dtype=torch.float)
    for subset in (X_train_resp, X_val_resp, X_test_resp))

#Float tensors for the labels
#NOTE: this rebinds trainl_tensor / vall_tensor / testl_tensor and (below) trainloader,
#which the single-signal cell also defines
trainl_tensor, vall_tensor, testl_tensor = (
    torch.tensor(subset, dtype=torch.float)
    for subset in (y_train, y_val, y_test))

#Serve (ppg, resp, label) triples in shuffled mini-batches
batch_size = 32
trainset = torch.utils.data.TensorDataset(trainf_tensor1, trainf_tensor2, trainl_tensor)
trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, batch_size=batch_size)


552
368
184
184


In [None]:
#Number of batches with the current batch size
print(len(trainloader))
#NOTE: despite its name, `batchsize` holds the number of mini-batches per epoch;
#train_net reads this global to average the summed loss and accuracy of an epoch.
batchsize = len(trainloader)


12


# Define Evaluation Metrics

In [None]:

def accuracy_2(t_batch):
  """Arg-max accuracy (in percent) of the global `net` on one (images, labels) batch.

  The predicted class of each sample is the index of its largest output, so this
  is meant for networks that emit one score per class.

  Parameters:
    t_batch: pair (images, labels) of tensors.

  Returns:
    Percentage of samples whose predicted class equals the label.
  """
  images, labels = t_batch

  #One forward pass per batch; no gradients are needed for evaluation
  with torch.no_grad():
    outputs = net.forward(images)
  _, pred_labels = torch.max(outputs.data, 1)

  correct_pred = (pred_labels == labels).sum().item()
  total_pred = pred_labels.size(0)

  return (100 * correct_pred)/total_pred

#Accuracy function
def accuracy(features1,features2, labels,signal):
  """Thresholded accuracy (in percent) of the global `net` on one batch.

  The network is expected to emit one probability per sample; a sample is
  predicted as class 1 when that probability is >= 0.5 and as class 0 otherwise
  (a NaN output therefore counts as class 0 instead of being dropped).

  Parameters:
    features1: input batch of the first (or only) signal.
    features2: input batch of the second signal; ignored when signal == 'Single'.
    labels:    tensor of 0/1 labels, one per sample.
    signal:    'Single' to call net.forward(features1),
               'Multi' to call net.forward(features1, features2).

  Returns:
    Percentage of samples whose thresholded prediction equals the label.

  Raises:
    ValueError: if `signal` is neither 'Single' nor 'Multi'.
  """
  if signal == 'Single':
    forward_args = (features1,)
  elif signal == 'Multi':
    forward_args = (features1, features2)
  else:
    raise ValueError("signal must be 'Single' or 'Multi', got {!r}".format(signal))

  #Evaluation only: no gradient graph is needed
  with torch.no_grad():
    outputs = net.forward(*forward_args)

  #Vectorised threshold; reshape (N, 1) outputs to the (N,) shape of the labels
  preds = (outputs.data >= 0.5).reshape(labels.shape).int()

  #Find the number that are correct and the accuracy
  num_correct = (preds == labels.int()).sum().item()
  return num_correct/(labels.size()[0])*100


def F1(features,feats2,true_y,signal):
  """F1 score of the global `net` on one batch, computed with sklearn.

  Predictions are derived from the network output as follows:
    * one output column (a probability per sample): class 1 when the value is
      >= 0.5, the same rule used by `accuracy`;
    * several output columns (one score per class): index of the largest score.
  Taking an arg-max over a single column would always give 0, and a softmax over
  the batch dimension would mix samples, so neither is used.

  Parameters:
    features: input batch of the first (or only) signal.
    feats2:   input batch of the second signal; ignored when signal == 'Single'.
    true_y:   tensor of true labels, one per sample.
    signal:   'Single' or 'Multi', selecting how net.forward is called.

  Returns:
    The value of sklearn.metrics.f1_score(true labels, predictions).

  Raises:
    ValueError: if `signal` is neither 'Single' nor 'Multi'.
  """
  if signal == 'Single':
    forward_args = (features,)
  elif signal == 'Multi':
    forward_args = (features, feats2)
  else:
    raise ValueError("signal must be 'Single' or 'Multi', got {!r}".format(signal))

  #Evaluation only: no gradient graph is needed
  with torch.no_grad():
    scores = net.forward(*forward_args).data

  if scores.dim() > 1 and scores.size(1) > 1:
    preds = scores.argmax(dim=1)
  else:
    preds = (scores >= 0.5).reshape(-1).long()

  #F1 using sklearn metrics
  true_y = true_y.detach().numpy()
  return sklearn.metrics.f1_score(true_y, preds.numpy())

# Model Architectures

## Single Signal (Baseline CNN)

In [None]:

#Seed both random number generators: NumPy's seed alone does not influence
#PyTorch, which is what initialises the network weights and shuffles the DataLoader
np.random.seed(3)
torch.manual_seed(3)

# Module is a base class for all neural network modules
class Single_Net(nn.Module):
    """Baseline CNN for one single-channel spectrogram.

    Two blocks of (5x5 convolution -> 2x2 max-pooling -> sigmoid) are followed by
    two sigmoid-activated fully connected layers; the network emits one value in
    (0, 1) per sample.

    fcsize is the number of features left after the second block is flattened
    (18 * 4 * 4 = 288 for a 28 x 28 input).
    """

    def __init__(self,fcsize):
        super().__init__()
        self.fcsize = fcsize

        #Convolutional feature extractor
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=9, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=9, out_channels=18, kernel_size=5)

        #Fully connected classifier head
        self.fc1 = nn.Linear(in_features=self.fcsize, out_features=50)
        self.fc2 = nn.Linear(in_features=50, out_features=1)

    def forward(self, x):
        """Map a (N, 1, H, W) batch to a (N, 1) batch of sigmoid outputs."""
        #Both blocks share the same shape: convolution, pooling, then non-linearity
        for conv in (self.conv1, self.conv2):
            x = F.max_pool2d(conv(x), kernel_size=2, stride=2).sigmoid()

        #Flatten to (N, fcsize) and run the classifier head
        flat = x.view(-1, self.fcsize)
        hidden = self.fc1(flat).sigmoid()
        return self.fc2(hidden).sigmoid()

## Signal Fusion (Baseline CNN)

In [None]:
#np.random.seed(3)

# Module is a base class for all neural network modules
class Multi_Net(nn.Module):
    """Fusion CNN for two single-channel spectrograms.

    Both inputs pass through the SAME two blocks of (5x5 convolution -> 2x2
    max-pooling -> ReLU); the flattened features are concatenated and fed to three
    fully connected layers. The last layer emits two raw scores per sample (no
    activation is applied to them).

    fcsize is the number of features one signal has after flattening
    (10 * 4 * 4 = 160 for a 28 x 28 input).
    """

    def __init__(self, fcsize):
        super().__init__()
        self.fcsize = fcsize
        half = int(fcsize/2)

        #Convolutional layers, shared by both signals
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=5, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=5, out_channels=10, kernel_size=5)

        #Fully connected layers applied to the concatenated features
        self.fc1 = nn.Linear(self.fcsize*2, self.fcsize)
        self.fc2 = nn.Linear(self.fcsize, half)
        self.fc3 = nn.Linear(half, 2)

    def forward(self, x, x2):
        """Map two (N, 1, H, W) batches to one (N, 2) batch of scores."""
        def conv_features(signal):
            #Two conv blocks followed by flattening to (N, fcsize)
            signal = F.relu(F.max_pool2d(self.conv1(signal), 2))
            signal = F.relu(F.max_pool2d(self.conv2(signal), 2))
            return signal.view(-1, self.fcsize)

        #Concatenate the features extracted from both spectrograms
        x_combined = torch.cat((conv_features(x), conv_features(x2)), dim=1)

        #Hidden FC layers with ReLU, then the prediction layer
        for hidden_layer in (self.fc1, self.fc2):
            x_combined = F.relu(hidden_layer(x_combined))
        return self.fc3(x_combined)

# Training Function

In [None]:

# Train the CNN
def train_net(train_set, no_epochs, lr, m, opt, classes, signal='Single'):
    """Train the global `net` with binary cross-entropy, then evaluate it on the test set.

    The function works on notebook globals: it optimises `net`, calls `accuracy`
    for the running training accuracy and evaluates on `testf_tensor` /
    `testl_tensor` ('Single') or `testf_tensor1` / `testf_tensor2` / `testl_tensor`
    ('Multi'). It prints per-epoch loss and accuracy, the test metrics, and plots
    the training-loss curve.

    Parameters:
      train_set: sized iterable of mini-batches (e.g. a DataLoader) yielding
                 (inputs, labels) for 'Single' or (inputs, inputs2, labels) for 'Multi'.
      no_epochs: number of passes over train_set.
      lr:        learning rate.
      m:         momentum; only used when opt == 'SGD'.
      opt:       'ADAM' or 'SGD'.
      classes:   accepted for compatibility with existing calls; not used.
      signal:    'Single' or 'Multi', selecting how net.forward is called.

    Returns:
      (test accuracy in percent, weighted test F1 score)

    Raises:
      ValueError: if `opt` or `signal` is not one of the supported values.

    NOTE(review): BCELoss needs one probability in [0, 1] per sample. Multi_Net as
    defined in this notebook returns two raw scores per sample, so it cannot be
    trained with this loss as-is - confirm the intended loss for the fusion model.
    """
    if signal not in ('Single', 'Multi'):
        raise ValueError("signal must be 'Single' or 'Multi', got {!r}".format(signal))

    #Define the loss and optimiser
    loss_func = nn.BCELoss()

    if opt == 'ADAM':
      optimizer = optim.Adam(net.parameters(), lr = lr)
    elif opt == 'SGD':
      optimizer = optim.SGD(net.parameters(), lr = lr, momentum = m)
    else:
      raise ValueError("opt must be 'ADAM' or 'SGD', got {!r}".format(opt))

    #Validation-based model selection is not active in this function, so these keep
    #their initial values and are only echoed in the summary printed after training
    best_val_loss = 10000
    best_epoch = 0

    #Average over the batches actually served by train_set rather than a notebook global
    n_batches = len(train_set)
    losses_train = []

    # Loop over the number of epochs
    for epoch in range(no_epochs):

        #Initialise loss and acc
        current_loss = 0.0
        current_accuracy = 0.0

        # Loop over each mini-batch
        for batch_index, training_batch in enumerate(train_set, 0):

            if signal == 'Single':
              inputs, labels = training_batch
              inputs2 = '_'   # placeholder second input expected by accuracy()
              model_inputs = (inputs,)
            else:
              inputs, inputs2, labels = training_batch
              model_inputs = (inputs, inputs2)

            #Initialise parameter gradients
            optimizer.zero_grad()

            #Forward pass
            outputs = net.forward(*model_inputs)

            #BCELoss requires input and target of identical shape:
            #the labels are (N,) while a single-output network returns (N, 1)
            loss = loss_func(outputs, labels.view_as(outputs))

            #Backward pass
            loss.backward()
            optimizer.step()

            #Add loss
            current_loss += loss.item()

            #Add accuracy to the overall accuracy
            current_accuracy += accuracy(inputs, inputs2, labels, signal=signal)

        print('[Epoch: %d Batch: %5d] loss: %.3f, acc: %.3f' %
                    (epoch + 1, batch_index+1, current_loss/n_batches, current_accuracy/n_batches))
        losses_train.append(current_loss/n_batches)

    print("------------------------------------------------------")
    print('Training has finished')
    print('Best Epoch :',best_epoch,'Best Validation Loss :', best_val_loss)

    #Find test set predictions with the trained network (no gradients needed)
    with torch.no_grad():
        if signal == 'Single':
          outputs = net.forward(testf_tensor)
        else:
          outputs = net.forward(testf_tensor1, testf_tensor2)

    #Class 1 when the predicted probability is >= 0.5, class 0 otherwise
    preds = (outputs.data >= 0.5).reshape(-1).int().numpy()
    true_y = testl_tensor.detach().numpy()

    #Test accuracy
    num_correct = int((preds == true_y.astype(int)).sum())
    test_ac = num_correct/(testl_tensor.size()[0])
    print('Test Accuracy: ',test_ac*100)

    #Test F1 score
    F1_test = sklearn.metrics.f1_score(true_y, preds, average = 'weighted')
    print('Test F1 Score :', F1_test)
    print("All :",sklearn.metrics.classification_report(true_y, preds))
    print("Confusion Matrix :")
    print(sklearn.metrics.confusion_matrix(true_y, preds))

    #Plot learning curves
    fig = plt.figure(figsize=plt.figaspect(0.2))
    ax1 = fig.add_subplot(1, 2, 1)
    ax1.plot(losses_train,'r')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()

    return test_ac*100, F1_test

# Train Models

## Train Single Net

In [None]:
#Hyper-parameters for the single-signal baseline
#NOTE(review): this cell relies on trainloader, testf_tensor, testl_tensor and batchsize
#as produced for the single-signal split; the multi-signal split cell rebinds trainloader
#and testl_tensor, so re-run the single-signal split cell right before this one.

# Set a number of epochs
no_epochs = 300

# Set the learning rate
lr = 0.5

# Set the momentum
momentum = 0.9

#Number of independent training runs whose metrics are averaged
n_runs = 1

#Original note: "490 for 28, pad = 2" - presumably the flattened feature size when the
#convolutions use padding=2 on 28x28 inputs; TODO confirm
accuracies_test = []
f1s = []

#Train the network n_runs times and find the average metrics
for i in range(n_runs):
  print("Iteration : ",i+1)
  net = Single_Net(288)
  test_accuracy,testf1 = train_net(trainloader, no_epochs, lr, momentum, opt = 'SGD', classes = 'binary', signal = 'Single')
  accuracies_test.append(test_accuracy)
  f1s.append(testf1)

avg_acc = sum(accuracies_test)/len(accuracies_test)
print("Accuracies :", accuracies_test)
print("Average test accuracy after {} run(s) : {}%".format(n_runs, round(avg_acc,2)))
print("Average test F1 Score :", sum(f1s)/len(f1s))

## Train Multi Net

In [None]:
#Fusion model: both spectrograms are fed to Multi_Net and trained together
#Number of epochs, learning rate and momentum
no_epochs, lr, momentum = 500, 0.001, 0.9

#Metrics collected over the repeated runs
accuracies_test, f1s = [], []

#Train the network 10 times and find the average metrics
for run in range(1, 11):
  print("Iteration : ",run)
  net = Multi_Net(160)
  run_metrics = train_net(trainloader, no_epochs, lr, momentum, opt = 'SGD', classes = 'binary', signal = 'Multi')
  test_accuracy, testf1 = run_metrics
  accuracies_test.append(test_accuracy)
  f1s.append(testf1)

avg_acc = sum(accuracies_test)/len(accuracies_test)
avg_f1 = sum(f1s)/len(f1s)
print("Accuracies :", accuracies_test)
print("Average test accuracy after 10 runs : {}%".format(round(avg_acc,2)))
print("Average test F1 Score :", avg_f1)

# LOSO Cross Validation

In [None]:
#Zero-padded participant IDs '01' ... '23', as used in the spectrogram file names
participants = [f"{idx:02d}" for idx in range(1, 24)]

#Folder holding one .npy file of spectrograms per participant and per signal
spec_dir = '/content/drive/My Drive/PHYSIO/Generated_Specs'

ppg_full_list = []  # one array of PPG spectrograms per participant
resp_full_list = [] # one array of RESP spectrograms per participant

#Load spectrograms of the PPG and RESP signals for each participant
for part in participants:
  ppg_full_list.append(np.load('{}/PPG_28/ppg_spec_{}_28.npy'.format(spec_dir, part)))
  resp_full_list.append(np.load('{}/RESP_28/resp_spec_{}_28.npy'.format(spec_dir, part)))

#Stack the per-participant arrays along the first axis
#(the recorded output shows 23 participants and a stacked shape of (920, 28, 28))
print(len(ppg_full_list))
ppg_full_array = np.vstack(ppg_full_list)
print(ppg_full_array.shape)

print(len(resp_full_list))
resp_full_array = np.vstack(resp_full_list)
print(resp_full_array.shape)

23
(920, 28, 28)
23
(920, 28, 28)


In [None]:
#Preview the first rows of the threshold-label table
labels_tdf.head()

Unnamed: 0,val,aro,dom,lik,val3,aro3,dom3,lik3
0,1,1,1,1,2,2,2,2
1,1,1,1,1,2,2,2,2
2,1,1,1,1,2,2,2,2
3,1,1,1,1,1,2,2,2
4,1,0,1,1,2,0,2,2


In [None]:
#Labels for the LOSO experiment: the 'val' column for the first 920 rows, i.e. one
#label per loaded spectrogram. No augmentation is performed in this cell.
aug_labs = []  # not referenced by any later cell visible in this notebook
data_Y = labels_tdf['val'].to_numpy()[0:920]

5520


In [None]:


# Train the CNN
def train_net(train_set, no_epochs, lr, m, opt, classes, signal='Single'):
    """Train the global `net` with binary cross-entropy, then evaluate it on the test set.

    The function works on notebook globals: it optimises `net`, calls `accuracy`
    for the running training accuracy and evaluates on `testf_tensor` /
    `testl_tensor` ('Single') or `testf_tensor1` / `testf_tensor2` / `testl_tensor`
    ('Multi'). It prints per-epoch loss and accuracy, the test metrics, and plots
    the training-loss curve.

    Parameters:
      train_set: sized iterable of mini-batches (e.g. a DataLoader) yielding
                 (inputs, labels) for 'Single' or (inputs, inputs2, labels) for 'Multi'.
      no_epochs: number of passes over train_set.
      lr:        learning rate.
      m:         momentum; only used when opt == 'SGD'.
      opt:       'ADAM' or 'SGD'.
      classes:   accepted for compatibility with existing calls; not used.
      signal:    'Single' or 'Multi', selecting how net.forward is called.

    Returns:
      (test accuracy in percent, weighted test F1 score)

    Raises:
      ValueError: if `opt` or `signal` is not one of the supported values.

    NOTE(review): BCELoss needs one probability in [0, 1] per sample. Multi_Net as
    defined in this notebook returns two raw scores per sample, so it cannot be
    trained with this loss as-is - confirm the intended loss for the fusion model.
    """
    if signal not in ('Single', 'Multi'):
        raise ValueError("signal must be 'Single' or 'Multi', got {!r}".format(signal))

    #Define the loss and optimiser
    loss_func = nn.BCELoss()

    if opt == 'ADAM':
      optimizer = optim.Adam(net.parameters(), lr = lr)
    elif opt == 'SGD':
      optimizer = optim.SGD(net.parameters(), lr = lr, momentum = m)
    else:
      raise ValueError("opt must be 'ADAM' or 'SGD', got {!r}".format(opt))

    #No validation set is tracked here, so these keep their initial values and are
    #only echoed in the summary printed after training
    best_val_loss = 10000
    best_epoch = 0

    #Average over the batches actually served by train_set rather than a notebook global
    n_batches = len(train_set)
    losses_train = []

    # Loop over the number of epochs
    for epoch in range(no_epochs):

        #Initialise loss and acc
        current_loss = 0.0
        current_accuracy = 0.0

        # Loop over each mini-batch
        for batch_index, training_batch in enumerate(train_set, 0):

            if signal == 'Single':
              inputs, labels = training_batch
              inputs2 = '_'   # placeholder second input expected by accuracy()
              model_inputs = (inputs,)
            else:
              inputs, inputs2, labels = training_batch
              model_inputs = (inputs, inputs2)

            #Initialise parameter gradients
            optimizer.zero_grad()

            #Forward pass
            outputs = net.forward(*model_inputs)

            #BCELoss requires input and target of identical shape:
            #the labels are (N,) while a single-output network returns (N, 1)
            loss = loss_func(outputs, labels.view_as(outputs))

            #Backward pass
            loss.backward()
            optimizer.step()

            #Add loss
            current_loss += loss.item()

            #Add accuracy to the overall accuracy
            current_accuracy += accuracy(inputs, inputs2, labels, signal=signal)

        print('[Epoch: %d Batch: %5d] loss: %.3f, acc: %.3f' % (epoch + 1, batch_index+1, current_loss/n_batches, current_accuracy/n_batches))
        losses_train.append(current_loss/n_batches)

    print("------------------------------------------------------")
    print('Training has finished')
    print('Best Epoch :',best_epoch,'Best Validation Loss :', best_val_loss)

    #Find test set predictions with the trained network (no gradients needed)
    with torch.no_grad():
        if signal == 'Single':
          outputs = net.forward(testf_tensor)
        else:
          outputs = net.forward(testf_tensor1, testf_tensor2)

    #Class 1 when the predicted probability is >= 0.5, class 0 otherwise
    preds = (outputs.data >= 0.5).reshape(-1).int().numpy()
    true_y = testl_tensor.detach().numpy()

    #Test accuracy
    num_correct = int((preds == true_y.astype(int)).sum())
    test_ac = num_correct/(testl_tensor.size()[0])
    print('Test Accuracy: ',test_ac*100)

    #Test F1 score
    F1_test = sklearn.metrics.f1_score(true_y, preds, average = 'weighted')
    print('Test F1 Score :', F1_test)
    print("All :",sklearn.metrics.classification_report(true_y, preds))
    print("Confusion Matrix :")
    print(sklearn.metrics.confusion_matrix(true_y, preds))

    #Plot learning curves
    fig = plt.figure(figsize=plt.figaspect(0.2))
    ax1 = fig.add_subplot(1, 2, 1)
    ax1.plot(losses_train,'r')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()

    return test_ac*100, F1_test

In [None]:
#Cut the stacked spectrograms and labels into 23 equal parts - one per participant
n_subjects = 23
sub_splits_ppg, sub_splits_resp, sub_splits_labs = (
    np.split(stacked, n_subjects)
    for stacked in (ppg_full_array, resp_full_array, data_Y))

for per_subject in (sub_splits_resp, sub_splits_labs):
  print(np.asarray(per_subject).shape)

#Subject identifiers double as positions in the per-subject lists
subjects = list(range(n_subjects))
print(subjects)

#Spectrogram side length, read from the (subjects, trials, H, W) stack
image_size = np.asarray(sub_splits_ppg).shape[2]
print(image_size)

(23, 40, 28, 28)
(23, 40)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
28


In [None]:
#Set a number of epochs
no_epochs = 300

# Set the learning rate
lr = 0.5

# Set the momentum
momentum = 0.9

accuracies_test = []
f1s = []

#Choose signal to train on
#NOTE(review): the per-subject splits are cut from ppg_full_array / resp_full_array as
#loaded, not from the min-max scaled data_X / data_X2 - confirm that unscaled input is intended.
dat = sub_splits_resp

#Stack the per-subject lists once instead of on every fold
all_feats = np.asarray(dat)              # (subjects, trials, H, W)
all_labs = np.asarray(sub_splits_labs)   # (subjects, trials)
subject_ids = np.asarray(subjects)

#Leave-one-subject-out: train one model per participant, holding that participant out
for s in subjects:
  #Training folds: every subject except s, flattened to one sample axis
  train_inds = np.where(subject_ids != s)[0]
  train_feats = np.vstack(np.take(all_feats, train_inds, axis = 0))
  train_labs = np.take(all_labs, train_inds, axis = 0).reshape(train_feats.shape[0])

  #Test fold: the held-out subject
  test_feats = np.take(all_feats, s, axis = 0)
  test_labs = np.take(all_labs, s, axis = 0).reshape(test_feats.shape[0])

  #Reshape train and test arrays to specify the number of input channels
  train_feats = train_feats.reshape(len(train_feats),1,image_size,image_size)
  test_feats = test_feats.reshape(len(test_feats),1,image_size,image_size)

  #Convert to torch tensors (testf_tensor / testl_tensor are read by train_net as globals)
  trainf_tensor = torch.tensor(train_feats, dtype=torch.float)
  trainl_tensor = torch.tensor(train_labs, dtype=torch.float)
  testf_tensor = torch.tensor(test_feats, requires_grad=False, dtype=torch.float)
  testl_tensor = torch.tensor(test_labs, requires_grad=False, dtype=torch.float)
  print(trainf_tensor.shape)
  print(trainl_tensor.shape)
  print(testf_tensor.shape)
  print(testl_tensor.shape)

  #Batchify training data using trainloader
  batch_size = 50
  trainset = torch.utils.data.TensorDataset(trainf_tensor, trainl_tensor)
  trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
  #Number of batches per epoch (global used by train_net for averaging)
  batchsize = len(trainloader)
  print(batchsize)

  print("Subject : ",s+1)
  net = Single_Net(288)
  test_accuracy,testf1 = train_net(trainloader, no_epochs, lr, momentum, opt = 'SGD', classes = 'binary', signal = 'Single')
  accuracies_test.append(test_accuracy)
  f1s.append(testf1)

#Print average metrics across all participants
avg_acc = sum(accuracies_test)/len(accuracies_test)
print("Accuracies :", accuracies_test)
print("Average test accuracy across {} held-out subjects : {}%".format(len(accuracies_test), round(avg_acc,2)))
print("Average test F1 Score :", sum(f1s)/len(f1s))

