In [None]:
import sys
print(f'Using python version {sys.version}') # record the interpreter version in the notebook output

In [None]:
# Mount Google Drive (this import only exists inside Google Colab).
from google.colab import drive
drive.mount('/content/drive')
# The data-loading cell below opens the .npy files by bare filename,
# so the working directory has to be the folder that contains them.
%cd '/content/drive/MyDrive/247_project_data'

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

print(f'Using pytorch version {torch.__version__}')

In [None]:
# Report whether a CUDA device is visible, then build the torch.device
# that every tensor and model in this notebook is moved to.
cuda_available = torch.cuda.is_available()
print(cuda_available)
if cuda_available:
    device_id = 0  # integer index 0 is equivalent to 'cuda:0'
else:
    device_id = 'cpu'
device = torch.device(device_id)

## Prepare the data

In [None]:
# Load the pre-split arrays from the current working directory.
X_train_valid = np.load("X_train_valid.npy")
y_train_valid = np.load("y_train_valid.npy")
person_train_valid = np.load("person_train_valid.npy")
X_test = np.load("X_test.npy")
y_test = np.load("y_test.npy")
person_test = np.load("person_test.npy")

# Print every array's shape as a quick sanity check.
for shape_report, loaded_array in (
    ('Training/Valid data shape: {}', X_train_valid),
    ('Test data shape: {}', X_test),
    ('Training/Valid target shape: {}', y_train_valid),
    ('Test target shape: {}', y_test),
    ('Person train/valid shape: {}', person_train_valid),
    ('Person test shape: {}', person_test),
):
    print (shape_report.format(loaded_array.shape))

In [None]:
# Display the first training/validation sample as a quick look at the raw values.
X_train_valid[0]

In [None]:
# Convert to zero-based integer labels (integers are needed for categorical labels).
# The offset is taken from the training labels only and applied to BOTH sets, so a
# given class always maps to the same index even if the test set happens not to
# contain the lowest class.
label_offset = np.min(y_train_valid)
y_train_valid = (y_train_valid - label_offset).astype('int')
y_test = (y_test - label_offset).astype('int')

In [None]:
# Select the samples that belong to subject 1 (stored as person id 0).
# A boolean mask is used instead of slicing the first N rows, so the selection
# stays correct even when the samples are not sorted by subject.
# np.ravel lets this work whether the person arrays are shaped (N,) or (N, 1).
sub1_train_mask = np.ravel(person_train_valid) == 0
sub1_test_mask = np.ravel(person_test) == 0

# Kept for backward compatibility: these hold the NUMBER of subject-1 samples.
Subject1_train_index = np.count_nonzero(sub1_train_mask)
Subject1_test_index = np.count_nonzero(sub1_test_mask)

Sub1_train_X = X_train_valid[sub1_train_mask]
Sub1_train_y = y_train_valid[sub1_train_mask]
Sub1_test_X = X_test[sub1_test_mask]
Sub1_test_y = y_test[sub1_test_mask]
print('Sub1_train_X:',Sub1_train_X.shape)
print('Sub1_train_y:',Sub1_train_y.shape)
print('Sub1_test_X:',Sub1_test_X.shape)
print('Sub1_test_y:',Sub1_test_y.shape)

## Build the CNN

In [None]:
# inherits from nn.Module
class DeepConvNet(nn.Module):
    """Deep convolutional classifier: one temporal + one spatial convolution
    followed by three further temporal convolutions, each stage ending in a
    max-pool, and a final linear read-out.

    Args:
        input_shape: (n_channels, n_timepoints) of a single sample. The spatial
            convolution spans all ``n_channels`` rows and ``forward`` reshapes
            its input to this shape.
        n_temporal_filters: accepted so the signature matches ShallowConvNet;
            not used (filter counts are fixed at 25/50/100/200).
        n_spatial_filters: accepted so the signature matches ShallowConvNet;
            not used.
        n_classes: number of output logits.
    """

    def __init__(self, input_shape=(22, 1000), n_temporal_filters=40, n_spatial_filters=40, n_classes=4):
        super().__init__() # call __init__ method of superclass

        self.input_shape = input_shape
        self.n_classes = n_classes

        self.conv1 = nn.Conv2d(1, 25, kernel_size=(1,10), stride=1, padding=0)
        # The spatial kernel covers every input row, collapsing the height to 1.
        self.conv2 = nn.Conv2d(25, 25, kernel_size=(input_shape[0],1), stride=1, padding=0)
        self.maxpool1 = nn.MaxPool2d(kernel_size=(1,3), stride=3)
        self.conv3 = nn.Conv2d(25, 50, kernel_size=(1,10), stride=1, padding=0)
        self.maxpool2 = nn.MaxPool2d(kernel_size=(1,3), stride=3)
        self.conv4 = nn.Conv2d(50, 100, kernel_size=(1,10), stride=1, padding=0)
        self.maxpool3 = nn.MaxPool2d(kernel_size=(1,3), stride=3)
        self.conv5 = nn.Conv2d(100, 200, kernel_size=(1,10), stride=1, padding=0)
        self.maxpool4 = nn.MaxPool2d(kernel_size=(1,3), stride=3)
        # LazyLinear infers its input width on the first forward pass, so the
        # flattened feature count does not have to be computed by hand.
        self.dense = nn.LazyLinear(n_classes)

    # declaring a forward method also makes the instance a callable.
    # e.g.:
    # model = DeepConvNet()
    # out = model(x)
    def forward(self, x):
        """Return class logits of shape (batch_size, n_classes).

        ``x`` is viewed as (batch_size, 1, input_shape[0], input_shape[1]), so
        its element count must be a multiple of one sample's size.
        """
        x = x.view(-1, 1, self.input_shape[0], self.input_shape[1])
        #x = F.normalize(x)
        x = F.elu(self.conv1(x))
        x = F.elu(self.conv2(x))
        x = self.maxpool1(x)
        x = F.elu(self.conv3(x))
        x = self.maxpool2(x)
        x = F.elu(self.conv4(x))
        x = self.maxpool3(x)
        x = F.elu(self.conv5(x))
        x = self.maxpool4(x)
        x = x.view(x.shape[0], -1) # flatten the non-batch dimensions
        x = self.dense(x)
        return x
       

In [None]:
class ShallowConvNet(nn.Module):
    """Shallow convolutional classifier: temporal convolution, spatial
    convolution, squaring, average pooling, logarithm, linear read-out.

    Args:
        input_shape: (n_channels, n_timepoints) of a single sample.
        n_temporal_filters: output channels of the temporal convolution.
        n_spatial_filters: output channels of the spatial convolution.
        n_classes: number of output logits.
        use_BN: apply batch normalisation after each convolution when True.
        log_eps: lower bound applied to the pooled activations before the
            logarithm, so an all-zero pooling window yields log(log_eps)
            instead of -inf.
    """

    def __init__(self, input_shape=(22, 1000), n_temporal_filters=40, n_spatial_filters=40, n_classes=4, use_BN = False, log_eps=1e-6):
        super().__init__()

        self.input_shape = input_shape
        self.n_temporal_filters = n_temporal_filters
        self.n_spatial_filters = n_spatial_filters
        self.n_classes = n_classes
        self.use_BN = use_BN
        self.log_eps = log_eps

        self.temporal_convolution = nn.Conv2d(1, n_temporal_filters, (1, 25))
        self.spatial_convolution = nn.Conv2d(n_temporal_filters, n_spatial_filters, (input_shape[0], 1))
        self.average_pool = nn.AvgPool2d((1, 75), stride=(1, 15))
        # The batch-norm layers are always created (even when use_BN is False)
        # so the state_dict layout does not depend on the flag.
        self.BN_temporal = nn.BatchNorm2d(num_features = n_temporal_filters)
        self.BN_spatial = nn.BatchNorm2d(num_features = n_spatial_filters)

        # Width after the temporal conv is W0 - 25 + 1; the pool (kernel 75,
        # stride 15) then leaves 1 + (width - 75) // 15 positions per filter.
        self.n_dense_features = n_spatial_filters*(1 + ((input_shape[1] - 25 + 1) - 75) // 15)
        self.dense = nn.Linear(self.n_dense_features, n_classes)
        self.elu = nn.ELU()
        #self.dropout = nn.Dropout()

    def forward(self, x):
        """Return class logits of shape (batch_size, n_classes)."""
        h = x
        h = h.view(-1, 1, self.input_shape[0], self.input_shape[1]) # view as (batch_size, 1, input_shape[0], input_shape[1])
        h = self.temporal_convolution(h) # (batch_size, 1, H0, W0) -> (batch_size, n_temporal_filters, H0, W0 - 25 + 1)
        if self.use_BN:
            h = self.BN_temporal(h)
        h = self.elu(h)

        h = self.spatial_convolution(h) # (batch_size, n_temporal_filters, H0, W0 - 25 + 1) -> (batch_size, n_spatial_filters, 1, W0 - 25 + 1)
        if self.use_BN:
            h = self.BN_spatial(h)
        h = self.elu(h)
        h = torch.pow(h, 2.0)
        h = self.average_pool(h) # (batch_size, n_spatial_filters, 1, W0 - 25 + 1) -> (batch_size, n_spatial_filters, 1, 1 + ((W0 - 25 + 1) - 75)//15)
        # The pooled squares are >= 0 and can be exactly 0; clamp so the log stays finite.
        h = torch.log(torch.clamp(h, min=self.log_eps))
        h = h.view(h.shape[0], -1) # flatten the non-batch dimensions
        h = self.dense(h) # (batch_size, self.n_dense_features) -> (batch_size, n_classes)
        #h = self.dropout(h)

        return h


In [None]:
# torchsummary is deprecated. Use torchinfo instead (https://github.com/TylerYep/torchinfo).
!pip install torchinfo

In [None]:
from torchinfo import summary
# torchinfo takes the input size through `input_size`, and it must include the
# batch dimension; other keyword arguments are forwarded to the model's
# forward(). The model is built for the same 600-sample crop it is summarised with.
print(summary(ShallowConvNet(input_shape=(22, 600), use_BN = True), input_size=(1, 22, 600)))
#print(summary(DeepConvNet(), input_size=(1, 22, 1000)))

## Build Dataset

In [None]:
from torchvision import transforms, utils
class AddGaussianNoise(object):
    """Transform that adds Gaussian noise to a tensor.

    Args:
        mean: mean of the added noise.
        std: standard deviation of the added noise.
    """

    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        """Return ``tensor`` plus noise of the same shape.

        The noise is drawn with ``randn_like`` so it is created on the same
        device and with the same dtype as ``tensor``; this keeps the transform
        usable on tensors that already live on the GPU.
        """
        return tensor + torch.randn_like(tensor) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

# Augmentation pipeline: random erasing (erased region filled with 0) followed
# by additive Gaussian noise.
random_erasing = transforms.RandomErasing(
    p=.99,
    scale=(.02,.08),
    ratio=(.025,.026),
    value=0,
)
gaussian_noise = AddGaussianNoise(mean=0., std=1.)
transformations = transforms.Compose([random_erasing, gaussian_noise])

In [None]:
class MyDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing samples ``X`` with labels ``Y``.

    NumPy inputs are converted to tensors (float32 samples, int64 labels);
    anything else is stored unchanged. An optional ``transform`` is applied to
    each sample when it is fetched; labels are never transformed.
    """

    def __init__(self, X, Y, transform = None):
        self.transform = transform
        self.X = torch.FloatTensor(X) if isinstance(X, np.ndarray) else X  # 32-bit float
        self.Y = torch.LongTensor(Y) if isinstance(Y, np.ndarray) else Y   # integer type

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        sample, label = self.X[index], self.Y[index]
        if not self.transform:
            return sample, label
        return self.transform(sample), label

In [None]:
# Split the data into training and validation sets
from sklearn.model_selection import train_test_split
# Only the first 600 time samples of every trial are kept.
# random_state pins the shuffle so the same train/validation partition is
# produced on every run (this cell executes before the RNG seeding cell).
X_train_np, X_val_np, y_train_np, y_val_np = train_test_split(
    X_train_valid[:,:,0:600], y_train_valid, test_size=0.2, random_state=12345)
X_train = torch.FloatTensor(X_train_np).to(device)
X_val = torch.FloatTensor(X_val_np).to(device)
y_train = torch.LongTensor(y_train_np).to(device)
y_val = torch.LongTensor(y_val_np).to(device)

In [None]:
batch_size = 256

# Training batches are reshuffled each time the loader is iterated;
# validation batches keep a fixed order.
train_dataset = MyDataset(X_train, y_train, transform=None)
val_dataset = MyDataset(X_val, y_val)

train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(
    dataset=val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# (number of training samples, number of training batches per pass over the loader)
len(train_dataset), len(train_dataloader)

## Train a Model

In [None]:
# choose which model to use
# Seed torch before construction: layer weights are drawn from torch's global
# RNG at construction time, so without this the initial weights differ between runs.
torch.manual_seed(12345)
model = ShallowConvNet(input_shape=(22, 600),use_BN = True).to(device)
#model = DeepConvNet().to(device)

In [None]:
import tqdm

# Seed every random number generator used by this notebook with the same value.
for seed_rng in (torch.manual_seed, random.seed, np.random.seed):
    seed_rng(12345)

# Optimiser and loss used by the training loop.
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay = 0.05)
celoss = nn.CrossEntropyLoss()

# Metric histories filled in by the training loop.
loss_hist, val_loss_hist, acc_hist = [], [], []
# Starts with a 0 entry so max(val_acc_hist) is defined before the first epoch.
val_acc_hist = [0]

In [None]:
num_epochs = 50


def _evaluate(net, dataloader, loss_fn=None):
    """Run ``net`` over ``dataloader`` in eval mode without gradients.

    Returns ``(accuracy, batch_losses)``: accuracy is a plain Python float and
    batch_losses holds one float per batch (empty when ``loss_fn`` is None).
    """
    n_samples = 0
    n_correct = 0
    batch_losses = []
    net.eval()
    with torch.no_grad():
        for x, y in dataloader:
            out = net(x)
            if loss_fn is not None:
                batch_losses.append(loss_fn(out, y).item())
            n_samples += len(y)
            # .item() keeps the counter a Python int, so the accuracies stored
            # in the history lists (and later in the checkpoint) are plain floats.
            n_correct += (out.argmax(dim=1) == y).sum().item()
    return n_correct / n_samples, batch_losses


pbar = tqdm.tqdm(
    range(num_epochs), position=0, leave=True,
    bar_format='{l_bar}{bar:30}{r_bar}',
)
for epoch in pbar:
    # One optimisation pass over the training set; the loss is logged per batch.
    model.train()
    for x, y in train_dataloader:
        optimizer.zero_grad()

        out = model(x)
        loss = celoss(out, y)

        loss.backward()
        optimizer.step()
        loss_hist.append(loss.item())

    # Training accuracy, measured in eval mode after the epoch's updates.
    train_acc, _ = _evaluate(model, train_dataloader)
    acc_hist.append(train_acc)

    # Validation accuracy plus per-batch validation losses.
    val_acc, val_batch_losses = _evaluate(model, val_dataloader, celoss)
    val_loss_hist.extend(val_batch_losses)
    # Save the weights whenever validation accuracy beats every earlier epoch.
    # val_acc_hist must be non-empty here, otherwise max() raises.
    if val_acc > max(val_acc_hist):
        torch.save(model.state_dict(), 'train_best33.pt')
    val_acc_hist.append(val_acc)

    pbar.set_postfix({'acc': acc_hist[-1], 'val_acc': val_acc_hist[-1]})

In [None]:
# Validation accuracy of the most recent epoch (last entry appended by the training loop).
print(val_acc_hist[-1])

In [None]:
# Training and validation loss on one log-scaled y-axis. The two curves have a
# different number of logged steps, so validation gets its own x-axis on top.
fig = plt.figure(dpi=100)
#plt.subplot(1,2,1)
fig.subplots_adjust(wspace=1)

train_ax = fig.add_subplot(1, 1, 1)
train_ax.plot(loss_hist, label='training')
train_ax.set_yscale('log')
train_ax.set_ylabel('loss')
train_ax.legend(loc='lower left')
train_ax.set_xlabel('step (training)')

val_ax = train_ax.twiny()
val_ax.plot(val_loss_hist, 'r', label='validation')
val_ax.set_xlabel('step (validation)')
val_ax.legend(loc='upper left')


# plt.subplot(1,2,2)
# plt.subplots_adjust(wspace=1)
# plt.plot(acc_hist, label='training')
# plt.ylabel('acc')
# plt.legend(loc='lower left')
# plt.xlabel('step (training)')
# plt.twiny()
# plt.plot(val_acc_hist, 'r', label='validation')
# plt.xlabel('step (validation)')
# plt.legend(loc='upper left')
# plt.grid

In [None]:
#Test
# Crop to the first 600 time samples, matching the training/validation crop.
# torch.as_tensor accepts both NumPy arrays and tensors, so re-running this
# cell after X_test / y_test have already been converted still works.
X_test = torch.as_tensor(X_test[:,:,0:600], dtype=torch.float32, device=device)
y_test = torch.as_tensor(y_test, dtype=torch.long, device=device)
Test_dataset = MyDataset(X_test,y_test)
Test_dataloader = torch.utils.data.DataLoader(Test_dataset, batch_size, shuffle=False)

In [None]:
# Evaluate on the held-out test set.
# NOTE(review): this scores the weights currently held by `model` (the state
# after the last training epoch), not the checkpoint written to
# 'train_best33.pt' - confirm that is the intended model to report.
test_loss_hist = []
test_acc_hist = []
ns = 0 # number of samples
nc = 0 # number of correct outputs
model.eval()
with torch.no_grad():
    for x, y in Test_dataloader:
        out = model(x)
        loss = celoss(out, y)
        test_loss_hist.append(loss.item())
        ns += len(y)
        # Reduce on the tensor's own device and pull out one Python int, instead
        # of copying every batch of predictions to the CPU and through NumPy.
        nc += (out.argmax(dim=1) == y).sum().item()
test_acc_hist.append(nc/ns)
print('Test ACC:',test_acc_hist[-1])

## Save the model

In [None]:

# Bundle the weights, optimiser state and metric histories into a single file.
# The accuracy histories are cast to plain Python floats so the checkpoint holds
# no NumPy scalar objects, which torch.load(weights_only=True) refuses to
# unpickle by default.
checkpoint = {
    'model': model.state_dict(),
    'optimizer': optimizer.state_dict(),
    'loss_hist': loss_hist,
    'acc_hist': [float(acc) for acc in acc_hist],
    'val_loss_hist': val_loss_hist,
    'val_acc_hist': [float(acc) for acc in val_acc_hist],
}
torch.save(checkpoint, 'shallowconvnet33_checkpoint.pt')
