In [None]:
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import init
from torch.utils.data import TensorDataset, DataLoader
import random
from scipy import io
import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
BATCH_SIZE = 128


In [None]:
def get_device():
    return 'cuda' if torch.cuda.is_available() else 'cpu'

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

In [None]:
def choose_scheme(path, scheme):
  filedir = os.listdir(path)
  x_train, x_test, y_train, y_test = [], [], [], []
  x_fine_tune, y_fine_tune = [], []
  # trial_class0, trial_class1, 
  # 將第一個subject 1固定作為testing subject
  subject_t = "BCIC_S01_T.mat"
  subject_e = "BCIC_S01_E.mat"
  x_train = torch.Tensor(io.loadmat(os.path.join(path, subject_t))['x_train']).unsqueeze(1)
  y_train = torch.Tensor(io.loadmat(os.path.join(path, subject_t))['y_train']).view(-1).long()
  x_test = torch.Tensor(io.loadmat(os.path.join(path, subject_e))['x_test']).unsqueeze(1)
  y_test = torch.Tensor(io.loadmat(os.path.join(path, subject_e))['y_test']).view(-1).long()
  len_x = x_train.size()[0]

  # 依序將其他subject的data讀入
  for filename in filedir:
    #print(filename)
    if filename in {subject_t, subject_e}:
      continue
    elif filename.endswith('E.mat'):
      x = torch.Tensor(io.loadmat(os.path.join(path, filename))['x_test']).unsqueeze(1)
      y = torch.Tensor(io.loadmat(os.path.join(path, filename))['y_test']).view(-1).long()
      x_train = torch.cat([x_train, x])
      y_train = torch.cat([y_train, y])
    elif filename.endswith('T.mat'):
      x = torch.Tensor(io.loadmat(os.path.join(path, filename))['x_train']).unsqueeze(1)
      y = torch.Tensor(io.loadmat(os.path.join(path, filename))['y_train']).view(-1).long()
      x_train = torch.cat([x_train, x])
      y_train = torch.cat([y_train, y])

  # choose real training and testing data based on scheme
  if scheme == 'individual': # indiviual will access only training session from subject1
    return [x_train[:len_x], y_train[:len_x], x_test, y_test]
  elif scheme == 'dependent': # dependent scheme collect all data except for the testing sessions from sub1
    return [x_train, y_train, x_test, y_test]
  elif scheme == 'independent': # independent scheme collect all data except for testing and training session from sub1
    return [x_train[len_x:], y_train[len_x:], x_test, y_test]
  elif scheme == 'fine-tune': # the data used on fine tune is the same as individual scheme
    return [x_train[:len_x], y_train[:len_x], x_test, y_test]
  else:
    raise ValueError('unexpected scheme, enter other scheme again')
  
def get_dataloader(data_path, scheme):
  data = []
  data = choose_scheme(data_path, scheme)

  x_train, y_train, x_test, y_test = data

  print("x_train shape: ", x_train.size())
  print("y_train shape: ", y_train.size())
  print("x_test shape: ", x_test.size())
  print("y_test shape: ", y_test.size())

  # 存成tensordataset
  train_dataset = TensorDataset(x_train, y_train)
  test_dataset = TensorDataset(x_test, y_test)
  # 包成dataloader
  train_dl = DataLoader(
      dataset = train_dataset,
      batch_size = BATCH_SIZE,
      shuffle = True,
      num_workers = 0   
  )
  test_dl = DataLoader(
      dataset = test_dataset,
      batch_size = len(test_dataset),
      shuffle = False,
      num_workers = 0   
  )
  return [train_dl, test_dl]

In [None]:
## 4 CNN model
class basic_CNN(nn.Module):
    def __init__(self, filter_size):
        super(basic_CNN, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 50, (1, filter_size), padding=(0, 25), bias=False),
            nn.BatchNorm2d(50)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(50, 50, (22, 1), groups=50, bias=False),
            nn.BatchNorm2d(50),
            nn.ELU(),
            nn.MaxPool2d((1, 3), stride=3),
            nn.Dropout(0.5)
        )
    def forward(self, input):
        x = self.conv1(input)
        x = self.conv2(x) #shape of x:(batch,50,1,196)
        return x

class CNN1(nn.Module):
    def __init__(self):
        super(CNN1, self).__init__()
        self.basic_cnn = basic_CNN(30)
        self.fc = nn.Linear(5000,1024) # sliding:5000 normal:9700 
        self.classifier = nn.Linear(1024, 4)

    def forward(self, input):
        x = self.basic_cnn(input)
        # print(f"latent size: {x.size()}")
        x = torch.flatten(x,1)
        latent = self.fc(x)
        output = self.classifier(latent)

        return latent, output

    
class CNN2(nn.Module):
    def __init__(self):
        super(CNN2, self).__init__()
        self.basic_cnn = basic_CNN(25)
        self.conv_block = nn.Sequential(
            nn.Conv2d(50, 100, (1,10), padding=(0,5), bias=False),
            nn.BatchNorm2d(100),
            nn.ELU(),
            nn.MaxPool2d((1,3), stride=3),
            nn.Dropout(0.5)
        )
        self.fc = nn.Linear(6500,1024)
        self.classifier = nn.Linear(1024, 4)

    def forward(self, input):
        x = self.basic_cnn(input)
        x = self.conv_block(x)
        # print(f"latent size: {x.size()}")
        x = torch.flatten(x,1)
        latent = self.fc(x)
        output = self.classifier(latent)

        return latent, output

class CNN3(nn.Module):
    def __init__(self):
        super(CNN3, self).__init__()
        self.basic_cnn = basic_CNN(20)
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(50, 100, (1,10), padding=(0,5), bias=False),
            nn.BatchNorm2d(100),
            nn.ELU(),
            nn.MaxPool2d((1,3), stride=3),
            nn.Dropout(0.5)
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(100, 100, (1,10), padding=(0,5), bias=False),
            nn.BatchNorm2d(100),
            nn.ELU(),
            nn.MaxPool2d((1,3), stride=3),
            nn.Dropout(0.5)
        )
        self.fc = nn.Linear(2200, 1024)
        self.classifier = nn.Linear(1024, 4)

    def forward(self, input):
        x = self.basic_cnn(input)
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        # print(f"latent size: {x.size()}")
        x = torch.flatten(x,1)
        latent = self.fc(x)
        output = self.classifier(latent)
        return latent, output

class CNN4(nn.Module):
    def __init__(self):
        super(CNN4, self).__init__()
        self.basic_cnn = basic_CNN(10)
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(50, 100, (1,10), padding=(0,5), bias=False),
            nn.BatchNorm2d(100),
            nn.ELU(),
            nn.MaxPool2d((1,3), stride=3),
            nn.Dropout(0.5)
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(100, 100, (1,10), padding=(0,5), bias=False),
            nn.BatchNorm2d(100),
            nn.ELU(),
            nn.MaxPool2d((1,3), stride=3),
            nn.Dropout(0.5)
        )
        self.fc = nn.Linear(700, 1024)
        self.classifier = nn.Linear(1024, 4)
        
    def forward(self, input):
        x = self.basic_cnn(input)
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block2(x)
        # print(f"latent size: {x.size()}")
        x = torch.flatten(x,1)
        latent = self.fc(x)
        output = self.classifier(latent)
        return latent, output

class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(4096,50)
        self.drop1 = nn.Dropout(0.5)
        self.elu = nn.ELU()
        self.fc2 = nn.Linear(50,50)
        self.drop2 = nn.Dropout(0.5)
        #self.softmax = nn.LogSoftmax(dim=1)
        self.classifier = nn.Linear(50,4)

        self.layers = nn.Sequential(self.fc1,self.drop1,self.elu,self.fc2,self.drop2,self.classifier)
        self.init_weights()

    def init_weights(self):
        init.xavier_uniform_(self.fc1.weight)
        init.xavier_uniform_(self.fc2.weight)

    def forward(self, input):
        '''x = self.fc1(input)
        x = self.fc2(x)
        output = self.classifier(x)'''
        output = self.layers(input)
        dummy = 0
        return dummy, output

class AE(nn.Module):
    def __init__(self):
        super(AE, self).__init__()
        self.encode = nn.Linear(4096,100)
        self.decode = nn.Linear(100,4096)
        self.classifier = nn.Linear(4096,4)

    def forward(self, input):
        x = self.encode(input)
        output = self.decode(x)
        with torch.no_grad():
            pred = self.classifier(output)

        return pred, output

In [None]:
def train_model(model, train_dl, test_dl, device, config):
  # set the optimizer and corresponding loss function
  optimizer = getattr(optim, config['optimizer'])(model.parameters(), lr=config['lr'], weight_decay=0.0001)
  criterion = nn.CrossEntropyLoss()
  #lr_scheduler = optim.lr_scheduler.ExponentialLR(optimizer, 0.99)
  record = {
      'train_loss': [],
      'train_acc': [],
      'val_loss': [],
      'val_acc': []
      }
  
  max_acc = 0
  print('\n training start')
  # start training process
  for epoch in range(config['epoch']):
    model.train()
    train_loss = 0
    train_acc = 0
    for x_train, y_train in train_dl:
      x_train, y_train = x_train.to(device), y_train.to(device)
      optimizer.zero_grad()
      _, out = model(x_train) # model prediction
      loss = criterion(out, y_train) # calculate loss
      pred = torch.argmax(out, axis = 1) # max out the prediction
      train_loss += loss.detach().cpu().item()
      train_acc += (pred==y_train).sum().item() # count the corrected classified casese

      loss.backward() # derive backpropagation
      optimizer.step() # update model
    
    # save the model which produce maximum validation accuracy
    val_acc = val_model(model, test_dl, device)
    if(val_acc>max_acc):
      max_acc = val_acc
      torch.save(model.state_dict(), f"{config['save_path']+config['model']}_{config['scheme']}_{config['lr']}_{BATCH_SIZE}")

    record['train_loss'].append(train_loss/len(train_dl))
    record['train_acc'].append(train_acc/len(train_dl.dataset))
    record['val_acc'].append(val_acc)
      
    #lr_scheduler.step()
    if (epoch+1)%10 == 0:
      print(f'epoch: {epoch+1}, training acc: {train_acc/len(train_dl.dataset)} val_acc: {val_acc}')
  
  return record

def val_model(model, test_dl, device):
  model.eval()
  # print('test start')
  acc = 0
  with torch.no_grad():
    for x_test, y_test in test_dl:
      x_test, y_test = x_test.to(device), y_test.to(device)
      _, pred = model(x_test)
      pred = torch.argmax(pred, axis = 1)
      acc += (pred==y_test).sum().item()
  acc /= len(test_dl.dataset)
 
  return acc

def test_model(model, test_dl, device):
  model.eval()
  # print('test start')
  acc = 0
  latent = []
  output = []
  with torch.no_grad():
    for x_test, y_test in test_dl:
      x_test, y_test = x_test.to(device), y_test.to(device)
      vector, pred = model(x_test)
      pred = torch.argmax(pred, axis = 1)
      output.append(pred)
      latent.append(vector)
      acc += (pred==y_test).sum().item()
  acc /= len(test_dl.dataset)
  print(f"testing accuracy: {acc*100}%")
 
  return latent, output

In [None]:
def main():
  device = get_device()
  set_seed(33)
  file_dir = '/content/gdrive/Shareddrives/BCI/BCI_data'
  scheme = 'independent' # individual, dependent, independent, fine-tune
  dl = get_dataloader(file_dir, scheme)

  if scheme == 'fine-tune':
    tune_dl, test_dl, _ = dl  
    config = {
        'epoch': 300,
        'optimizer': 'Adam',
        'lr': 0.001,
        'model': 'CNN1',
        'scheme': scheme,
        'save_path': '/content/gdrive/Shareddrives/BCI/CNN_model/'
    }
    # os.makedirs('/content/gdrive/MyDrive/model', exist_ok=True)
    model = CNN1().to(device) # CNN1, CNN2, CNN3, CNN4
    model.load_state_dict(torch.load('/content/gdrive/Shareddrives/BCI/CNN_model/CNN1_independent_0.001_128')) # 要改load model的檔名
    #for name, param in model.named_parameters():
    #  if name != 'classifier.weight' and name != 'classifier.bias':
    #    param.requires_grad = False
    loss_record = train_model(model, tune_dl, test_dl, device, config)
  else:
    train_dl, test_dl, _ = dl
    model = CNN1().to(device)  # CNN1, CNN2, CNN3, CNN4
    config = {
        'epoch': 500,
        'optimizer': 'Adam',
        'lr': 0.0001,
        'scheme': scheme,
        'model': 'CNN1',
        'save_path': '/content/gdrive/Shareddrives/BCI/CNN_model/sliding/'
    }
    # os.makedirs('/content/gdrive/MyDrive/model', exist_ok=True)
    loss_record = train_model(model, train_dl, test_dl, device, config)

  # plot confusion matrix
#   model = EEGNet().to(device)
#   model.load_state_dict(torch.load(f"{config['save_path']+config['model']}_{config['scheme']}_{config['lr']}_{BATCH_SIZE}")) # 要改load model的檔名
#   plot_confusion_matrix(model, test_dl, device)
  
def plot_confusion_matrix(model, test_dl, device):
  pred = test_model(model, test_dl, device)
  _, y_test = next(iter(test_dl))
  cm = confusion_matrix(y_test, pred[0].cpu(), normalize = 'pred')
  disp = ConfusionMatrixDisplay(confusion_matrix=cm)
  disp.plot()
  plt.plot()

In [None]:
if __name__ =="__main__":
  main()

x_train shape:  torch.Size([13824, 1, 22, 281])
y_train shape:  torch.Size([13824])
x_validation shape:  torch.Size([864, 1, 22, 281])
y_validation shape:  torch.Size([864])
x_test shape:  torch.Size([864, 1, 22, 281])
y_test shape:  torch.Size([864])

 training start
epoch: 10, training acc: 0.4117476851851852 val_acc: 0.39699074074074076
epoch: 20, training acc: 0.45970775462962965 val_acc: 0.45949074074074076
epoch: 30, training acc: 0.46997974537037035 val_acc: 0.5046296296296297
epoch: 40, training acc: 0.4908854166666667 val_acc: 0.45601851851851855
epoch: 50, training acc: 0.5056423611111112 val_acc: 0.4722222222222222
epoch: 60, training acc: 0.5109953703703703 val_acc: 0.49421296296296297
epoch: 70, training acc: 0.5214120370370371 val_acc: 0.4722222222222222
epoch: 80, training acc: 0.5211950231481481 val_acc: 0.4791666666666667
epoch: 90, training acc: 0.5343605324074074 val_acc: 0.4756944444444444
epoch: 100, training acc: 0.5410879629629629 val_acc: 0.4710648148148148
epoc

In [None]:
def val_model_CCNN(model, test_dl, device):
  model.eval()
  # print('test start')
  acc = 0
  with torch.no_grad():
    for x_test, y_test in test_dl:
      x_test, y_test = x_test.to(device), y_test.to(device)
      pred, feature_pred = model(x_test)
      pred = torch.argmax(pred, axis = 1)
      acc += (pred==y_test).sum().item()
  acc /= len(test_dl.dataset)
 
  return acc

## if using MLP to fusion
def train_fusion_model(model, train_dl, test_dl, device, config):
  # set the optimizer and corresponding loss function
  optimizer = getattr(optim, config['optimizer'])(model["fusion model"].parameters(), lr=config['lr'], weight_decay=0.0001)
  if config['model'] == 'MCNN':
    criterion = nn.CrossEntropyLoss()
  elif config['model'] == 'CCNN':
    criterion = nn.MSELoss()
  #lr_scheduler = optim.lr_scheduler.ExponentialLR(optimizer, 0.99)
  record = {
      'train_loss': [],
      'train_acc': [],
      'val_loss': [],
      'val_acc': []
      }
  
  max_acc = 0
  print('\n training start')
  # start training process
  for epoch in range(config['epoch']):
    model["fusion model"].train()
    train_loss = 0
    train_acc = 0
    loss = 0
    print("epoch: ", epoch)
    for x_train, y_train in train_dl:
      x_train, y_train = x_train.to(device), y_train.to(device)
      optimizer.zero_grad()
      latent1, _ = model['CNN1'](x_train) # model prediction
      latent2, _ = model['CNN2'](x_train)
      latent3, _ = model['CNN3'](x_train)
      latent4, _ = model['CNN4'](x_train)
      feat_vec = torch.cat([latent1, latent2, latent3, latent4], 1)
      _, out = model['fusion model'](feat_vec)
      if config['model'] == 'MCNN':
        loss = criterion(out, y_train) # calculate loss
        pred = torch.argmax(out, axis = 1) # max out the prediction
      elif config['model'] == 'CCNN':
        loss = criterion(out, feat_vec) # calculate loss
        pred = torch.argmax(_, axis = 1) # max out the prediction
      train_loss += loss.detach().cpu().item()
      train_acc += (pred==y_train).sum().item() # count the corrected classified casese

      loss.backward() # derive backpropagation
      optimizer.step() # update model

    for x_test, y_test in test_dl:
      x_test, y_test = x_test.to(device), y_test.to(device)
      latent1, _ = model['CNN1'](x_test) # model prediction
      latent2, _ = model['CNN2'](x_test)
      latent3, _ = model['CNN3'](x_test)
      latent4, _ = model['CNN4'](x_test)
      feat_vec_val = torch.cat([latent1, latent2, latent3, latent4], 1)
      val_dataset = TensorDataset(feat_vec_val, y_test)
    
    del latent1, latent2, latent3, latent4, feat_vec_val

    val_dl = DataLoader(
        dataset = val_dataset,
        batch_size = len(val_dataset),
        shuffle = False,
        num_workers = 0   
    )

    # save the model which produce maximum validation accuracy
    if config['model'] == 'MCNN':
      val_acc = val_model(model['fusion model'], val_dl, device)
    elif config['model'] == 'CCNN':
      val_acc = val_model_CCNN(model['fusion model'], val_dl, device)
    
    ## free tensor dataset
    del val_dataset, val_dl
    torch.cuda.empty_cache()

    if(val_acc>max_acc):
      max_acc = val_acc
      torch.save(model['fusion model'].state_dict(), f"{config['save_path']+config['model']}_{config['scheme']}_{config['lr']}_{BATCH_SIZE}")

    record['train_loss'].append(train_loss/len(train_dl))
    record['train_acc'].append(train_acc/len(train_dl.dataset))
    record['val_acc'].append(val_acc)
      
    #lr_scheduler.step()
    if (epoch+1)%10 == 0:
      print(f'epoch: {epoch+1}, training acc: {train_acc/len(train_dl.dataset)} val_acc: {val_acc}')
  
  return record


**For training fusion model**

In [None]:
def val_model(model, test_dl, device):
  model['fusion model'].eval()
  # print('test start')
  acc = 0
  with torch.no_grad():
        for x_test, y_test in test_dl:
            x_test, y_test = x_test.to(device), y_test.to(device)
            latent1, _ = model['CNN1'](x_test) # model prediction
            latent2, _ = model['CNN2'](x_test)
            latent3, _ = model['CNN3'](x_test)
            latent4, _ = model['CNN4'](x_test)
            feat_vec = torch.cat([latent1, latent2, latent3, latent4], 1)
            _, pred= model['fusion model'](feat_vec)
            pred = pred.mean(0)
            pred = torch.argmax(pred)
            acc += (pred==y_test[0]).item()

  acc /= len(test_dl.dataset)/18
 
  return acc

## if using MLP to fusion
def train_fusion_model(model, train_dl, test_dl, device, config):
  # set the optimizer and corresponding loss function
  optimizer = getattr(optim, config['optimizer'])(model["fusion model"].parameters(), lr=config['lr'], weight_decay=0.0001)
  if config['model'] == 'MCNN':
    criterion = nn.CrossEntropyLoss()
  elif config['model'] == 'CCNN':
    criterion = nn.MSELoss()
  
  model['CNN1'].eval()
  model['CNN2'].eval()
  model['CNN3'].eval()
  model['CNN4'].eval()

  record = {
      'train_loss': [],
      'train_acc': [],
      'val_loss': [],
      'val_acc': []
      }
  
  max_acc = 0
  print('\n training start')
  # start training process
  for epoch in range(config['epoch']):
    model["fusion model"].train()
    train_loss = 0
    train_acc = 0
    loss = 0
    print("epoch: ", epoch)
    for x_train, y_train in train_dl:
      x_train, y_train = x_train.to(device), y_train.to(device)
      optimizer.zero_grad()
      with torch.no_grad():
        latent1, _ = model['CNN1'](x_train) # model prediction
        latent2, _ = model['CNN2'](x_train)
        latent3, _ = model['CNN3'](x_train)
        latent4, _ = model['CNN4'](x_train)
        feat_vec = torch.cat([latent1, latent2, latent3, latent4], 1)
      _, out = model['fusion model'](feat_vec)
      if config['model'] == 'MCNN':
        loss = criterion(out, y_train) # calculate loss
        pred = torch.argmax(out, axis = 1) # max out the prediction
      elif config['model'] == 'CCNN':
        loss = criterion(out, feat_vec) # calculate loss
        pred = torch.argmax(_, axis = 1) # max out the prediction
      train_loss += loss.detach().cpu().item()
      train_acc += (pred==y_train).sum().item() # count the corrected classified casese

      loss.backward() # derive backpropagation
      optimizer.step() # update model
    
    del latent1, latent2, latent3, latent4, feat_vec, out

    val_acc = val_model(model, test_dl, device)
    
    ## free tensor dataset
    torch.cuda.empty_cache()

    if(val_acc>max_acc):
      max_acc = val_acc
      torch.save(model['fusion model'].state_dict(), f"{config['save_path']+config['model']}_{config['scheme']}_{config['lr']}_{BATCH_SIZE}_test16")

    record['train_loss'].append(train_loss/len(train_dl))
    record['train_acc'].append(train_acc/len(train_dl.dataset))
    record['val_acc'].append(val_acc)
      
    #lr_scheduler.step()
    if (epoch+1)%10 == 0:
      print(f'epoch: {epoch+1}, training acc: {train_acc/len(train_dl.dataset)} val_acc: {val_acc}')
  
  return record


In [None]:
def train_fusion_model_test(model, train_dl, test_dl, device, config):
  # set the optimizer and corresponding loss function
  optimizer = getattr(optim, config['optimizer'])(model["fusion model"].parameters(), lr=config['lr'], weight_decay=0.0001)
  if config['model'] == 'MCNN':
    criterion = nn.CrossEntropyLoss()
  elif config['model'] == 'CCNN':
    criterion = nn.MSELoss()
    #train_dl.batch = 1
  #lr_scheduler = optim.lr_scheduler.ExponentialLR(optimizer, 0.99)
  record = {
      'train_loss': [],
      'train_acc': [],
      'val_loss': [],
      'val_acc': []
      }
  
  max_acc = 0
  print('\n training start')
  # start training process
  for epoch in range(config['epoch']):
    model["fusion model"].train()
    train_loss = 0
    train_acc = 0
    loss = 0
    print("epoch: ", epoch)
    for x_train, y_train in train_dl:
      x_train, y_train = x_train.to(device), y_train.to(device)
      optimizer.zero_grad()
      latent1, _ = model['CNN1'](x_train) # model prediction
      latent2, _ = model['CNN2'](x_train)
      latent3, _ = model['CNN3'](x_train)
      latent4, _ = model['CNN4'](x_train)
      feat_vec = torch.cat([latent1, latent2, latent3, latent4], 1)
      _, out = model['fusion model'](feat_vec)
      if config['model'] == 'MCNN':
        loss = criterion(out, y_train) # calculate loss
        pred = torch.argmax(out, axis = 1) # max out the prediction
      elif config['model'] == 'CCNN':
        loss = criterion(out, feat_vec) # calculate loss
        pred = torch.argmax(_, axis = 1) # max out the prediction
      train_loss += loss.detach().cpu().item()
      train_acc += (pred==y_train).sum().item() # count the corrected classified casese

      loss.backward() # derive backpropagation
      optimizer.step() # update model

    '''# validation
    for x_test, y_test in test_dl:
      x_test, y_test = x_test.to(device), y_test.to(device)
      latent1, _ = model['CNN1'](x_test) # model prediction
      latent2, _ = model['CNN2'](x_test)
      latent3, _ = model['CNN3'](x_test)
      latent4, _ = model['CNN4'](x_test)
      feat_vec_val = torch.cat([latent1, latent2, latent3, latent4], 1)
      val_dataset = TensorDataset(feat_vec_val, y_test)
    
    del latent1, latent2, latent3, latent4, feat_vec_val

    val_dl = DataLoader(
        dataset = val_dataset,
        batch_size = len(val_dataset),
        shuffle = False,
        num_workers = 0   
    )

    # save the model which produce maximum validation accuracy
    if config['model'] == 'MCNN':
      val_acc = val_model(model['fusion model'], val_dl, device)
    elif config['model'] == 'CCNN':
      val_acc = val_model_CCNN(model['fusion model'], val_dl, device)
    
    ## free tensor dataset
    del val_dataset, val_dl
    torch.cuda.empty_cache()

    if(val_acc>max_acc):
      max_acc = val_acc
      torch.save(model['fusion model'].state_dict(), f"{config['save_path']+config['model']}_{config['scheme']}_{config['lr']}_{BATCH_SIZE}")'''

    record['train_loss'].append(train_loss/len(train_dl))
    record['train_acc'].append(train_acc/len(train_dl.dataset))
    #record['val_acc'].append(val_acc)
    if train_acc > max_acc:
      torch.save(model['fusion model'].state_dict(), f"{config['save_path']+config['model']}_{config['scheme']}_{config['lr']}_{BATCH_SIZE}")
      max_acc = train_acc
    
    #lr_scheduler.step()
    if (epoch+1)%10 == 0:
      print(f'epoch: {epoch+1}, training loss: {train_loss/len(train_dl)} training acc: {train_acc/len(train_dl.dataset)}')
  
  return record


In [None]:
# 如果要使用4個train好的CNN model可以用下面當作範例
device = get_device()
set_seed(33)
scheme = 'independent' # individual, dependent, independent, fine-tune
file_dir = '/content/gdrive/Shareddrives/BCI/BCI_data'
dl = get_dataloader(file_dir, scheme)
train_dl, test_dl = dl
config = {
        'epoch': 500,
        'optimizer': 'Adam',
        'lr': 0.0001,
        'scheme': scheme,
        'model': 'CCNN',
        'save_path': '/content/gdrive/Shareddrives/BCI/CNN_model/'
     }
CNN1 = CNN1().to(device)
CNN1.load_state_dict(torch.load(f"/content/gdrive/Shareddrives/BCI/CNN_model/CNN1_independent_0.0001_128", map_location=torch.device(device)))
CNN2 = CNN2().to(device)
CNN2.load_state_dict(torch.load(f"/content/gdrive/Shareddrives/BCI/CNN_model/CNN2_independent_0.0001_128", map_location=torch.device(device)))
CNN3 = CNN3().to(device)
CNN3.load_state_dict(torch.load(f"/content/gdrive/Shareddrives/BCI/CNN_model/CNN3_independent_0.0001_128", map_location=torch.device(device)))
CNN4 = CNN4().to(device)
CNN4.load_state_dict(torch.load(f"/content/gdrive/Shareddrives/BCI/CNN_model/CNN4_independent_0.0001_128", map_location=torch.device(device)))
fusion_model = AE().to(device)
model = {"CNN1": CNN1,
      "CNN2": CNN2,
      "CNN3": CNN3,
      "CNN4": CNN4,
      "fusion model":fusion_model}

record = train_fusion_model_test(model, train_dl, test_dl, device, config)



Window Slide

In [None]:
# Dataloader
# 依據不同的scheme組成dataset
sliding_timestep = 1
def window_slide(x, y):
  length = x.size()[3]
  new_length = int(length/2)
  stride = int(sliding_timestep * (length/4))
  x_new = []
  y_new = []
  for i in range(0, length-new_length+1, stride):
    t_tmp = x[:, :, :, i:i+new_length]
    if x_new == []:
      x_new = t_tmp
      y_new = y
    else:
      x_new = torch.cat((x_new, t_tmp), 0)
      y_new = torch.cat((y_new, y))      
  return x_new, y_new

def choose_scheme(path, scheme):
  global sliding_timestep
  sliding_timestep = 1 # 改成移動的步幅
  filedir = os.listdir(path)
  x_train, x_test, y_train, y_test, x_validation, y_validation = [], [], [], [], [], []
  x_fine_tune, y_fine_tune = [], []
  # 將第一個subject 1固定作為testing subject
  subject_t = "BCIC_S01_T.mat"
  subject_e = "BCIC_S01_E.mat"
  x_train = torch.Tensor(io.loadmat(os.path.join(path, subject_t))['x_train']).unsqueeze(1)
  y_train = torch.Tensor(io.loadmat(os.path.join(path, subject_t))['y_train']).view(-1).long()
  x_test = torch.Tensor(io.loadmat(os.path.join(path, subject_e))['x_test']).unsqueeze(1)
  y_test = torch.Tensor(io.loadmat(os.path.join(path, subject_e))['y_test']).view(-1).long()
  x_train, y_train = window_slide(x_train, y_train)
  x_test, y_test = window_slide(x_test, y_test)
  x_validation = x_test
  y_validation = y_test
  len_x = x_train.size()[0]

  # 依序將其他subject的data讀入
  for filename in filedir:
    #print(filename)
    if filename in {subject_t, subject_e}:
      continue
    elif filename.endswith('E.mat'):
      x = torch.Tensor(io.loadmat(os.path.join(path, filename))['x_test']).unsqueeze(1)
      y = torch.Tensor(io.loadmat(os.path.join(path, filename))['y_test']).view(-1).long()
      x, y = window_slide(x, y)
      x_train = torch.cat([x_train, x])
      y_train = torch.cat([y_train, y])
      # if True: # condition
      #   x = torch.Tensor(io.loadmat(os.path.join(path, filename))['x_test']).unsqueeze(1)
      #   y = torch.Tensor(io.loadmat(os.path.join(path, filename))['y_test']).view(-1).long()
      #   x, y = window_slide(x, y)
      #   x_train = torch.cat([x_train, x])
      #   y_train = torch.cat([y_train, y])
      # else:
      #   x_val = torch.Tensor(io.loadmat(os.path.join(path, filename))['x_test']).unsqueeze(1)
      #   y_val = torch.Tensor(io.loadmat(os.path.join(path, filename))['y_test']).view(-1).long()
      #   x, y = window_slide(x_val, y_val)
      #   x_validation = torch.cat([x_validation, x_val])
      #   y_validation = torch.cat([y_validation, y_val])
      
    elif filename.endswith('T.mat'):
      x = torch.Tensor(io.loadmat(os.path.join(path, filename))['x_train']).unsqueeze(1)
      y = torch.Tensor(io.loadmat(os.path.join(path, filename))['y_train']).view(-1).long()
      x, y = window_slide(x, y)
      x_train = torch.cat([x_train, x])
      y_train = torch.cat([y_train, y])
      

  # choose real training and testing data based on scheme
  if scheme == 'individual': # indiviual will access only training session from subject1
    return [x_train[:len_x], y_train[:len_x], x_test, y_test, x_validation, y_validation]
  elif scheme == 'dependent': # dependent scheme collect all data except for the testing sessions from sub1
    return [x_train, y_train, x_test, y_test, x_validation, y_validation]
  elif scheme == 'independent': # independent scheme collect all data except for testing and training session from sub1
    return [x_train[len_x:], y_train[len_x:], x_test, y_test, x_validation, y_validation]
  elif scheme == 'fine-tune': # the data used on fine tune is the same as individual scheme
    return [x_train[:len_x], y_train[:len_x], x_test, y_test, x_validation, y_validation]
  else:
    raise ValueError('unexpected scheme, enter other scheme again')
  
def get_dataloader(data_path, scheme):
  data = []
  data = choose_scheme(data_path, scheme)

  x_train, y_train, x_test, y_test, x_validation, y_validation = data

  print("x_train shape: ", x_train.size())
  print("y_train shape: ", y_train.size())
  print("x_validation shape: ", x_validation.size())
  print("y_validation shape: ", y_validation.size())
  print("x_test shape: ", x_test.size())
  print("y_test shape: ", y_test.size())


  # 存成tensordataset
  train_dataset = TensorDataset(x_train, y_train)
  test_dataset = TensorDataset(x_test, y_test)
  validation_dataset = TensorDataset(x_validation, y_validation)
  # 包成dataloader
  train_dl = DataLoader(
      dataset = train_dataset,
      batch_size = BATCH_SIZE,
      shuffle = True,
      num_workers = 0   
  )
  test_dl = DataLoader(
      dataset = test_dataset,
      batch_size = len(test_dataset),
      shuffle = False,
      num_workers = 0   
  )
  validation_dl = DataLoader(
      dataset = validation_dataset,
      batch_size = BATCH_SIZE,
      shuffle = False,
      num_workers = 0   
  )
  return [train_dl, test_dl, validation_dl]