In [1]:
# TODO:
# GRAD-CAM
# data visualization DONE
# LSTM based model DONE
# standardization and normalization (either based on training or individually) DONE
# image based CNN IN PROGRESS
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import shutil
import os
import pickle
from scipy.signal import stft

In [2]:
# CNN class
# TODO: some hardcoded numbers
class CNN(nn.Module):
    def __init__(self, input_channels=1, conv_channels=(3, 10), kernel=(5, 5), dropout=0.0):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, conv_channels[0], kernel[0])
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(conv_channels[0], conv_channels[1], kernel[1])
        self.fc1 = nn.Linear(58*conv_channels[1], 120) # 5: 58, 7: 
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 2)
        self.dropout1d = nn.Dropout(p=dropout)
        self.dropout2d = nn.Dropout2d(p=dropout)
        self.batchnorm1 = nn.BatchNorm2d(input_channels)
        self.batchnorm2 = nn.BatchNorm2d(conv_channels[0])

    def forward(self, x):
        x = self.batchnorm1(x)
        x = self.pool(F.relu(self.conv1(x)))
        # print(x.shape)
        # x = self.dropout2d(x)
        x = self.batchnorm2(x)
        x = self.pool(F.relu(self.conv2(x)))
        # print(x.shape)
        # x = self.dropout2d(x)
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        # print(x.shape)
        x = F.relu(self.fc1(x))
        x = self.dropout1d(x)
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [3]:
from swec_utils import training_model
def grid_search(epochs, device, weights, train_loader, val_loader, dtype, kernel_sizes, n_filters, dropouts, save_path, plot_results=False):
    sensitivity = []
    specificity = []
    auc = []
    f1 = []
    pr_auc = []
    for k in kernel_sizes:
        for f in n_filters:
            for d in dropouts:
                print(f"Training model with kernel size = {k}, filter_sizes = {f}, dropout = {d}.")
                model = CNN(input_channels=next(iter(train_loader))[0].shape[1], conv_channels=f, kernel=(k, k), dropout=d)
                model.to(numpy_to_torch_dtype_dict[dtype])
                t_res, v_res = training_model(model, epochs, device, weights, train_loader, val_loader, dtype, plot_results=plot_results)
                sensitivity.append(v_res.sen[-1])
                specificity.append(v_res.spe[-1])
                auc.append(v_res.auc[-1])
                f1.append(v_res.f1[-1])
                pr_auc.append(v_res.pr_auc[-1])
                if (save_path):
                    print("Saving model")
                    save_path += 'CNN_k' + str(k) + '_f' + str(f[0]) + '_' + str(f[1]) + '_d' + str(d).replace('.', '')
                    torch.save(model.state_dict(), save_path)
    
    return sensitivity, specificity, auc, f1, pr_auc

In [4]:
def channel_selection(subject_path, seq_duration, sph, pil, distance, dtype, n_channels, **kwargs):
    seed = 0
    results_list = []
    for channel in range(n_channels):
        print(f"Channel {channel}.")
        segments, labels, fs = load_data(subject_path, seq_duration, sph, pil, distance, channels=[channel], dtype=dtype)

        segments_t = np.transpose(segments, (0, 2, 1))
        print(segments_t.shape)
        print(segments_t.dtype)
        Zxx = np.zeros((segments_t.shape[0], segments_t.shape[1], 129, 21), dtype=dtype)

        _, _, Zxx = stft(segments_t, fs=fs)

        BATCH = 512
        EPOCHS = 250
        use_cuda = True
        device = torch.device("cuda" if use_cuda else "cpu")

        X, X_test, Y, Y_test = train_test_split(Zxx, labels, test_size=0.2, stratify=labels, random_state=seed)

        X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2, stratify=Y, random_state=seed)

        weights = torch.tensor(compute_class_weight(class_weight='balanced', classes=np.unique(Y_train), y=Y_train), dtype=dtype, device=device)
        # weights = torch.tensor([1, 1], dtype=torch.float32, device=device)

        # convert dataset to Dataloader
        train_dataset = STFTDataset(X_train, Y_train)
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH)
        val_dataset = STFTDataset(X_val, Y_val)
        val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH)

        results = grid_search(EPOCHS, device, weights, train_loader, val_loader, dtype, **kwargs, save_path='')
        results_list.append(results)

    return results_list

In [5]:
def copy_subject(src, dst):
    src_files = os.listdir(src)
    for file_name in src_files:
        full_file_name = os.path.join(src, file_name)
        if os.path.isfile(full_file_name):
            shutil.copy(full_file_name, dst)
            print(file_name)

subject = 'ID03'
src = 'D:/research/swec/' + subject
dst = './swec/' + subject
data_path = './swec/' + subject + '/data.p'
if (not os.path.exists(data_path)):
    if (len(os.listdir(dst)) == 0):
        copy_subject(src, dst)

In [6]:
# 1: 4
# 2: 7
# 3: 2
from swec_utils import load_data
subject_path = './swec/' + subject + '/' + subject
seq_duration = 5
sph = 300
pil = 3600
distance = 3600 * 24 * 2

dtype = np.float16

if (os.path.exists(data_path)):
    print(f"Data already processed at {data_path}")
    data = pickle.load(open(data_path, 'rb'))
    Zxx, labels = data
else:
    segments, labels, fs = load_data(subject_path, seq_duration, sph, pil, distance, channels=[], dtype=dtype)

    segments_t = np.transpose(segments, (0, 2, 1))
    print(segments_t.shape)
    print(segments_t.dtype)
    Zxx = np.zeros((segments_t.shape[0], segments_t.shape[1], 129, 21), dtype=dtype)

    for i in range(segments_t.shape[1]):
        print(i)
        _, _, Zxx[:, i, :] = stft(segments_t[:,i,:], fs=fs)
    print(Zxx.shape)

    data = (Zxx, labels)
    pickle.dump(data, open(data_path, 'wb'))

Data already processed at ./swec/ID03/data.p


In [7]:
from sklearn.preprocessing import MinMaxScaler
# u and s must be vectors of length data.shape[1]
def standardize(data, u, s):
    std_data = np.zeros(data.shape, dtype=np.float16)
    for i in range(data.shape[1]):
        std_data[:,i] = (data[:,i] - u[i]) / s[i]
    return std_data

def compute_u_s(data):
    u, s = [], []
    for i in range(data.shape[1]):
        u.append(np.mean(data[:,i].astype(np.float64)))
        s.append(np.std(data[:,i].astype(np.float64)))
    return u, s

In [8]:
from swec_utils import STFTDataset, numpy_to_torch_dtype_dict
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

BATCH = 512
EPOCHS = 100
use_cuda = True
device = torch.device("cuda" if use_cuda else "cpu")

X, X_test, Y, Y_test = train_test_split(Zxx, labels, test_size=0.2, stratify=labels, random_state=0)

del Zxx, labels

X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2, stratify=Y, random_state=0)

del X, Y

# standardize
# u, s = compute_u_s(X_train)
# X_train = standardize(X_train, u, s)
# X_val = standardize(X_val, u, s)

# normalize
# for i in range(X_train.shape[1]):
#     X_min = np.min(X_train[:,i])
#     X_max = np.max(X_train[:,i])
#     X_train[:,i] = (X_train[:,i] - X_min) / (X_max - X_min)
#     X_val[:,i] = (X_val[:,i] - X_min) / (X_max - X_min)

weights = torch.tensor(compute_class_weight(class_weight='balanced', classes=np.unique(Y_train), y=Y_train), dtype=numpy_to_torch_dtype_dict[dtype], device=device)

# convert dataset to Dataloader
train_dataset = STFTDataset(X_train, Y_train, dtype=dtype)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH)
val_dataset = STFTDataset(X_val, Y_val, dtype=dtype)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH)

save_path = 'models/' + subject + '/'
# save_path = '' # comment out this line if you want to save
results = grid_search(EPOCHS, device, weights, train_loader, val_loader, dtype, [5], [(5, 5)], [0.5], save_path=save_path, plot_results=True)

Training model with kernel size = 5, filter_sizes = (5, 5), dropout = 0.5.
Epoch 10.
Epoch 20.
Epoch 30.
Epoch 40.
Epoch 50.
Epoch 60.
Epoch 70.


In [None]:
from swec_utils import to_numpy

# convert dataset to Dataloader
test_dataset = STFTDataset(X_test, Y_test, dtype=dtype)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH)

# retrieve model from save
model = CNN(input_channels=next(iter(test_loader))[0].shape[1], kernel=(5, 5), conv_channels=(5, 5), dropout=0.5)
path = './models/' + subject + '/CNN_k5_f5_5_d05'
model.load_state_dict(torch.load(path))

model.to(numpy_to_torch_dtype_dict[dtype])
model.to(device)
model.eval()

pred_list = np.zeros((len(test_loader.dataset), 2))
true_list = np.zeros(len(test_loader.dataset))
i = 0
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)

        output = model(data)
        pred_list[i:i+len(data)] = to_numpy(output)
        true_list[i:i+len(data)] = to_numpy(target)
        i += len(data)

y_true = true_list
y_pred = pred_list

In [None]:
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score, roc_curve, precision_recall_curve, confusion_matrix, auc, average_precision_score, plot_precision_recall_curve
import numpy as np
import matplotlib.pyplot as plt
from swec_utils import evaluate_performance

print(y_true.shape)
print(y_pred.shape)

sensitivity, specificity, roc_auc, f1_score, pr_auc = evaluate_performance(y_pred, y_true)

print("Sensitivity: " + str(sensitivity))
print("Specificity: " + str(specificity))
print("F1 Score: " + str(f1_score))
print("AUC: " + str(roc_auc))
print("PR AUC: " + str(pr_auc))

In [None]:
# from pytorch_grad_cam import GradCAM
# from pytorch_grad_cam.utils.image import show_cam_on_image

# target_layer = next(model.children())
# cam = GradCAM(model=model, target_layer=target_layer, use_cuda=True)

# input_tensor = torch.tensor(X_test[0:1], dtype=numpy_to_torch_dtype_dict[dtype], device=device)
# target_category = torch.tensor(Y_test[0:1], dtype=torch.long, device=device)
# grayscale_cam = cam(input_tensor=input_tensor, target_category=target_category)

In [None]:
print(X_val[0,:,0])
print(X_val[0,:,-1])
print(X_val[-1,:,0])
print(X_val[-1,:,-1])
print(np.count_nonzero(Y_val == 1))

In [None]:
print(X_val[np.where(Y_val == 1)[0][0],:,0])
print(X_val[np.where(Y_val == 1)[0][0],:,-1])