In [None]:
import os
import sys

from google.colab import drive

# Mount Google Drive at /content/drive inside the Colab VM.
drive.mount('/content/drive')

# Project folder on the mounted drive: add it to the import search path and
# make it the working directory, so relative file names resolve inside it.
path_to_module = '/content/drive/MyDrive/247pj/'
sys.path.append(path_to_module)
os.chdir(path_to_module)

In [None]:
%matplotlib inline
import torch
import numpy as np

# Seed the NumPy and PyTorch random generators so that the noise augmentation,
# the random train/validation split and the weight initialisation start from
# the same state on every "Restart & Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# get the device, either cuda or cpu
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print('device type is {}'.format(device))

In [None]:
# Read the six dataset arrays from the current working directory, in the same
# order as before, and bind them to their notebook-level names.
(
    X_test,
    y_test,
    person_train_valid,
    X_train_valid,
    y_train_valid,
    person_test,
) = (
    np.load(npy_file)
    for npy_file in (
        "X_test.npy",
        "y_test.npy",
        "person_train_valid.npy",
        "X_train_valid.npy",
        "y_train_valid.npy",
        "person_test.npy",
    )
)

In [None]:
# Print one "<label>: <shape>" line per loaded array.
for _shape_template, _loaded_array in (
    ('Training/Valid data shape: {}', X_train_valid),
    ('Test data shape: {}', X_test),
    ('Training/Valid target shape: {}', y_train_valid),
    ('Test target shape: {}', y_test),
    ('Person train/valid shape: {}', person_train_valid),
    ('Person test shape: {}', person_test),
):
    print (_shape_template.format(_loaded_array.shape))

In [None]:
def normalize(array):
  """Standardise `array` along axis 0 (zero mean, unit standard deviation).

  The mean and standard deviation are computed over axis 0 and broadcast back
  over the input.  Entries whose standard deviation is exactly 0 (constant
  along axis 0) are divided by 1 instead, so they come out as 0 rather than
  NaN/inf.

  Args:
    array: NumPy array (or array-like accepted by np.mean / np.std).

  Returns:
    Array of the same shape as `array` holding the standardised values.
  """
  mean = np.mean(array, axis=0)
  std = np.std(array, axis=0)
  # Avoid 0/0 and x/0 for constant features: use a divisor of 1 there.
  safe_std = np.where(std == 0, 1.0, std)
  return (array - mean) / safe_std
#preprocess data
# Shift the raw label codes down by 769 (presumably the raw codes are 769..772,
# giving class indices 0..3 for the 4-way classifier -- TODO confirm).
# The subtraction is in place, so each array is only shifted while it still
# holds raw codes; re-running this cell must not shift the labels a second time.
LABEL_OFFSET = 769
if y_train_valid.size and y_train_valid.min() >= LABEL_OFFSET:
    y_train_valid -= LABEL_OFFSET
if y_test.size and y_test.min() >= LABEL_OFFSET:
    y_test -= LABEL_OFFSET

In [None]:
def data_prep(X,y,sub_sample,average,noise,trim=500,noise_std=0.5,rng=None):
    """Augment a set of trials by max-pooling, averaging and subsampling in time.

    X is first trimmed to its first `trim` samples along the last axis.  The
    following views, each containing every trial in the original order, are
    then stacked along axis 0:

      1. the maximum over consecutive windows of `sub_sample` samples;
      2. the mean over consecutive windows of `average` samples, with Gaussian
         noise added;
      3. for each offset i in range(sub_sample), the strided slice
         X[:, :, i::sub_sample], with Gaussian noise added when `noise` is
         truthy.

    The labels `y` are repeated once per stacked view, so row j and rows
    j + len(X), j + 2*len(X), ... of the result all belong to input trial j.

    NOTE(review): the averaged view always receives noise, regardless of
    `noise`; this matches the previous behaviour -- confirm it is intended.

    Args:
        X: 3-D array indexed as (trial, channel, time).
        y: 1-D array of labels, one per trial.
        sub_sample: window size / stride for max-pooling and subsampling.
        average: window size for averaging; must equal `sub_sample` so that
            all views share the same time length.
        noise: whether to add Gaussian noise to the subsampled views.
        trim: number of leading time samples to keep (default 500).
        noise_std: standard deviation of the Gaussian noise (default 0.5).
        rng: optional object with a `normal(loc, scale, size)` method (e.g.
            np.random.RandomState or np.random.Generator); defaults to the
            global `np.random` state.

    Returns:
        (total_X, total_y): total_X has (2 + sub_sample) * len(X) rows and a
        time axis of length (trimmed length) // sub_sample; total_y has one
        label per row of total_X.

    Raises:
        ValueError: if `sub_sample`/`average` are not positive, differ from
            each other, or do not divide the trimmed time length.
    """
    if sub_sample < 1 or average < 1:
        raise ValueError('sub_sample and average must be positive integers')
    if average != sub_sample:
        # The pooled, averaged and subsampled views can only be stacked when
        # they end up with the same time length.
        raise ValueError(
            'average ({}) must equal sub_sample ({})'.format(average, sub_sample))

    noise_source = np.random if rng is None else rng

    # Trimming the data (sample,22,1000) -> (sample,22,500)
    X = X[:,:,0:trim]
    print('Shape of X after trimming:',X.shape)

    if X.shape[2] % sub_sample != 0:
        raise ValueError(
            'trimmed time length {} is not divisible by sub_sample {}'.format(
                X.shape[2], sub_sample))

    # Maxpooling the data (sample,22,500) -> (sample,22,500/sub_sample)
    X_max = np.max(X.reshape(X.shape[0], X.shape[1], -1, sub_sample), axis=3)
    print('Shape of X after maxpooling:',X_max.shape)

    # Averaging + noise
    X_average = np.mean(X.reshape(X.shape[0], X.shape[1], -1, average),axis=3)
    X_average = X_average + noise_source.normal(0.0, noise_std, X_average.shape)
    total_X = np.concatenate((X_max, X_average), axis=0)
    print('Shape of X after averaging+noise and concatenating:',total_X.shape)

    # Subsampling: one strided view per starting offset
    views = [total_X]
    for i in range(sub_sample):
        X_subsample = X[:, :, i::sub_sample]
        if noise:
            X_subsample = X_subsample + noise_source.normal(0.0, noise_std, X_subsample.shape)
        else:
            # Adding 0.0 keeps the dtype promotion of the noisy path.
            X_subsample = X_subsample + 0.0
        views.append(X_subsample)

    # Single concatenation instead of growing the array inside the loop.
    total_X = np.concatenate(views, axis=0)
    total_y = np.hstack([y] * (2 + sub_sample))

    print('Shape of X after subsampling and concatenating:',total_X.shape)
    return total_X,total_y

In [None]:
## Preprocessing the dataset

X_train_valid_prep,y_train_valid_prep = data_prep(X_train_valid,y_train_valid,2,2,True)
X_test_prep,y_test_prep = data_prep(X_test,y_test,2,2,True)

print(X_train_valid_prep.shape)
print(y_train_valid_prep.shape)
print(X_test_prep.shape)
print(y_test_prep.shape)



## Random splitting of the data

# data_prep stacks whole augmented copies of the trial set along axis 0, so rows
# j, j + n_raw, j + 2*n_raw, ... are all views of raw trial j.  Drawing
# validation rows independently would put views of one trial on both sides of
# the split and can inflate validation accuracy, so whole trials are held out
# and every augmented view of a held-out trial goes to the validation set.
N_VALID = 1500  # target number of augmented validation rows
# (a single-subject subset of 944 augmented rows previously used 150 instead)

n_raw = X_train_valid.shape[0]
n_total = X_train_valid_prep.shape[0]
n_copies = n_total // n_raw
assert n_copies * n_raw == n_total, 'augmented set is not a whole number of copies'

# First generating the training and validation indices using random splitting
valid_trials = np.random.choice(n_raw, N_VALID // n_copies, replace=False)
ind_valid = np.concatenate([valid_trials + copy * n_raw for copy in range(n_copies)])
ind_train = np.setdiff1d(np.arange(n_total), ind_valid)

# Creating the training and validation sets using the generated indices
(x_train, x_valid) = X_train_valid_prep[ind_train], X_train_valid_prep[ind_valid] 
(y_train, y_valid) = y_train_valid_prep[ind_train], y_train_valid_prep[ind_valid]
print('Shape of training set:',x_train.shape)
print('Shape of validation set:',x_valid.shape)
print('Shape of training labels:',y_train.shape)
print('Shape of validation labels:',y_valid.shape)

In [None]:
from torch.utils.data import DataLoader, TensorDataset

def _as_dataset(features, labels):
    """Wrap NumPy features/labels as a TensorDataset of float inputs and long targets."""
    inputs = torch.from_numpy(features).float()
    targets = torch.from_numpy(labels).type(torch.LongTensor)
    return TensorDataset(inputs, targets)


batch_size = 64
trainset = _as_dataset(x_train, y_train)
valset = _as_dataset(x_valid, y_valid)
testset = _as_dataset(X_test_prep, y_test_prep)

# Only the training loader shuffles; every loader uses the same batch size and
# two worker processes.
_loader_options = dict(batch_size=batch_size, num_workers=2)
trainloader = DataLoader(trainset, shuffle=True, **_loader_options)
valloader = DataLoader(valset, shuffle=False, **_loader_options)
testloader = DataLoader(testset, shuffle=False, **_loader_options)

In [None]:
!pip install conformer

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from conformer import ConformerBlock

In [None]:
class swap021(nn.Module):
  """Layer that reorders a 3-D tensor's axes from (0, 1, 2) to (0, 2, 1)."""

  def forward(self, x):
    # Keep axis 0 in place and exchange the last two axes.
    axis_order = (0, 2, 1)
    return x.permute(*axis_order)
class swap102(nn.Module):
  """Layer that reorders a 3-D tensor's axes from (0, 1, 2) to (1, 0, 2)."""

  def forward(self, x):
    # Exchange the first two axes and keep the last one in place.
    axis_order = (1, 0, 2)
    return x.permute(*axis_order)
class swap120(nn.Module):
  """Layer that reorders a 3-D tensor's axes from (0, 1, 2) to (1, 2, 0)."""

  def forward(self, x):
    # Rotate the axes left: the old axis 0 becomes the last axis.
    axis_order = (1, 2, 0)
    return x.permute(*axis_order)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from conformer import ConformerBlock
class Classifier(nn.Module):
  """Two Conv1d stages, one Conformer block and a linear head with 4 outputs.

  Shape walk-through for inputs of shape (batch, 22, 250), i.e. 22 channels and
  the 250 time steps that data_prep(..., 2, 2, ...) produces from 500 samples:

    conv/pool stage 1 -> (batch, 44, 84)
    conv/pool stage 2 -> (batch, 88, 28)
    swap021           -> (batch, 28, 88)   sequence of 28 steps, 88 features
    ConformerBlock    -> (batch, 28, 88)
    swap021           -> (batch, 88, 28)
    AvgPool1d(7)      -> (batch, 88, 4)
    Flatten + Linear  -> (batch, 352) -> (batch, 4)

  The ConformerBlock of the `conformer` package takes (batch, seq, dim) input.
  It was previously fed (time, batch, dim), which made its attention and
  convolution run across the samples of a batch instead of across time, so a
  sample's prediction depended on which other samples shared its batch.
  """
  def __init__(self):
    super().__init__()
    self.net = nn.Sequential(

        nn.Conv1d(22, 44, 10, padding='same'),
        nn.ELU(),
        nn.MaxPool1d(3, padding=1),
        nn.BatchNorm1d(44),
        nn.Dropout(0.6),
        nn.Conv1d(44, 88, 10, padding='same'),
        nn.ELU(),
        nn.MaxPool1d(3, padding=1),
        nn.BatchNorm1d(88),
        nn.Dropout(0.6),
        # (batch, 88, time) -> (batch, time, 88): batch stays on axis 0.
        swap021(),
        # No-op placeholder where the batch/time swap used to be; it keeps the
        # positions (and therefore the state_dict keys) of the layers below
        # unchanged, so existing checkpoints still load.
        nn.Identity(),
        ConformerBlock(conv_kernel_size=10, dim=88, attn_dropout = 0.3, heads = 1,
                ff_dropout = 0.3, conv_dropout = 0.3),
        # (batch, time, 88) -> (batch, 88, time) for the 1-D pooling below.
        swap021(),
        nn.AvgPool1d(7),
        nn.Flatten(),
        nn.Linear(352, 4)

    )

    self.criterion = nn.CrossEntropyLoss()

  def forward(self, x):
    """Return the 4 class scores for each sample in `x`."""
    return self.net(x)
  def cal_loss(self, pred, target):
    """Cross-entropy loss between the scores `pred` and the class indices `target`."""
    return self.criterion(pred, target)

model = Classifier().to(device)

In [None]:
import torch.optim as optim
import matplotlib.pyplot as plt

PATH_loss = './attention_loss.pth'

def train(tr_set, model, device, val_set=None, n_epochs=80, lr=0.001):
    """Train `model` with Adam and checkpoint the lowest-validation-loss weights.

    After every epoch the model is evaluated with `test` on the validation
    loader and on `tr_set`; whenever the validation loss improves, the model's
    state_dict is saved to PATH_loss.  When training ends, the loss and
    accuracy curves for both sets are plotted.

    Args:
        tr_set: iterable of (inputs, targets) batches used for training and
            for the per-epoch training metrics.
        model: module providing `cal_loss(pred, target)`.
        device: device that each batch is moved to.
        val_set: iterable of validation batches; defaults to the notebook-level
            `valloader`.
        n_epochs: number of passes over `tr_set` (default 80).
        lr: Adam learning rate (default 0.001).

    Returns:
        None.
    """
    if val_set is None:
        # Fall back to the validation loader defined at notebook level.
        val_set = valloader

    # Setup optimizer
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_record = {'train':[], 'val':[]}
    acc_record = {'train':[], 'val':[]}
    # Start at +inf so the first epoch always writes a checkpoint, however
    # large its validation loss is.
    best_loss = float('inf')
    for epoch in range(n_epochs):
        model.train()
        for x, y in tr_set:
            optimizer.zero_grad()
            x, y = x.to(device), y.to(device)
            pred = model(x)
            loss = model.cal_loss(pred, y)
            loss.backward()
            optimizer.step()

        acc_val, loss_val = test(val_set, model, device)
        # Evaluate on the loader that was actually passed in.
        acc_train, loss_train = test(tr_set, model, device)
        acc_record['val'].append(acc_val)
        loss_record['val'].append(loss_val)
        acc_record['train'].append(acc_train)
        loss_record['train'].append(loss_train)
        print('Finished training after {} epochs'.format(epoch+1))
        print('acc on validation set:{}'.format(acc_val))
        if loss_val < best_loss:
            best_loss = loss_val
            # Save:
            torch.save(model.state_dict(), PATH_loss) # save state_dict

    # Loss curves on top, accuracy curves below.
    fig, (ax_loss, ax_acc) = plt.subplots(2, 1, figsize=(15, 12))
    ax_loss.set_title('loss')
    ax_loss.plot(loss_record['train'], '-', label='train')
    ax_loss.plot(loss_record['val'], '-', label='val')
    ax_loss.set_xlabel('Epoch')
    ax_loss.legend(loc='upper right')
    ax_acc.set_title('Accuracy')
    ax_acc.plot(acc_record['train'], '-', label='train')
    ax_acc.plot(acc_record['val'], '-', label='val')
    ax_acc.set_xlabel('Epoch')
    ax_acc.legend(loc='lower right')
    plt.show()

In [None]:
def test(tt_set, model, device):
    correct = total = loss = 0
    model.eval()
    for x, y in tt_set:
      x, y = x.to(device), y.to(device)
      with torch.no_grad():
        pred = model(x)
        loss += model.cal_loss(pred, y).item() * y.size(0)
        _, predicted = torch.max(pred.data, 1)
        total += y.size(0)
        correct += (predicted == y).sum().item()
    acc = correct / total
    loss /= total
    return acc, loss

In [None]:
# Fit the notebook-level model on the training loader.
train(tr_set=trainloader, model=model, device=device)

In [None]:
# Rebuild the architecture and restore the weights that train() saved at the
# epoch with the lowest validation loss.
best_model_loss = Classifier()
# map_location lets a checkpoint written on a GPU runtime load on a CPU-only one.
best_model_loss.load_state_dict(torch.load(PATH_loss, map_location=device)) # load state_dict
best_model_loss.to(device)
best_model_loss.eval() # sets model in evaluation (inference) mode

In [None]:
# (accuracy, loss) on the test loader: first for the restored checkpoint, then
# for the model as it stands at the end of training.
for _evaluated_model in (best_model_loss, model):
    print(test(testloader, _evaluated_model, device))