In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torch.utils.data as torch_data
import torch.nn.functional as F
from torchsummary import summary
import json, os
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score


%matplotlib inline

In [4]:
# split the data along the axis with shape 70
# into 23 slices of width 3, ignore the last layer

class MriData2d_3(torch.utils.data.Dataset):
    def __init__(self, X, y):
        super(MriData2d, self).__init__()
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y).long()
    
    def __len__(self):
        return self.X.shape[0] * 23
    
    def __getitem__(self, idx):
        i = idx // 23
        j = idx % 23
        return self.X[i,:,3*j:3*j+3,:].permute(1, 0, 2), self.y[i]

In [None]:
# split the data along the axis with shape 70
# into 14 slices of width 5
class MriData2d_5(torch.utils.data.Dataset):
    def __init__(self, X, y):
        super(MriData2d, self).__init__()
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y).long()
    
    def __len__(self):
        return self.X.shape[0] * 14
    
    def __getitem__(self, idx):
        i = idx // 14
        j = idx % 14
        return self.X[i,:,5*j:5*j+5,:].permute(1, 0, 2), self.y[i]

In [None]:
class hidden(nn.Module):
    def __init__(self, c_in, c_out):
        super(hidden, self).__init__()
        self.conv_block = nn.Sequential(
        nn.Conv2d(c_in, c_out, 3),
        nn.BatchNorm2d(c_out),
        nn.ReLU(),
        nn.MaxPool2d(2)
        )
    def forward(self, x):
        return self.block(x)

In [5]:
class hidden_res(nn.Module):
    def __init__(self, c_in, c_out):
        super(hidden, self).__init__()
        self.conv_block = nn.Sequential(
        nn.Conv2d(c_in, c_out, 3),
        nn.BatchNorm2d(c_out)
        )
        self.act_block = nn.Sequential(
        nn.ReLU(),
        nn.MaxPool2d(2)
        )
    def forward(self, x):
        return self.act_block(x + self.conv_block(x))

In [10]:
class MriNet2d_3(nn.Module):
    def __init__(self, c):
        super(MriNet2d, self).__init__()
        self.hidden_layers = nn.Sequential(
            nn.Conv2d(3, c, 3, padding=1),
            hidden(c, c), 
            hidden(c, 2*c), 
            hidden(2*c, 4*c)
        )
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(p=0.2)
        self.linear = nn.Linear(4*c*5*5, 2)
        
    def forward(self, x):
        x = self.hidden_layers(x)
        x = self.flatten(x)
        x = self.dropout(x)
        x = self.linear(x)
        
        return x
        

In [None]:
class MriNet2d_5(nn.Module):
    def __init__(self, c):
        super(MriNet2d, self).__init__()
        self.hidden_layers = nn.Sequential(
            nn.Conv2d(5, c, 3, padding=1),
            hidden(c, c), 
            hidden(c, 2*c), 
            hidden(2*c, 4*c)
        )
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(p=0.2)
        self.linear = nn.Linear(4*c*5*5, 2)
        
    def forward(self, x):
        x = self.hidden_layers(x)
        x = self.flatten(x)
        x = self.dropout(x)
        x = self.linear(x)
        
        return x
        

In [None]:
class MriNet2d_res3(nn.Module):
    def __init__(self, c):
        super(MriNet2d, self).__init__()
        self.hidden_layers = nn.Sequential(
            nn.Conv2d(3, c, 3, padding=1),
            hidden_res(c, c), 
            hidden_res(c, 2*c), 
            hidden_res(2*c, 4*c)
        )
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(p=0.2)
        self.linear = nn.Linear(4*c*5*5, 2)
        
    def forward(self, x):
        x = self.hidden_layers(x)
        x = self.flatten(x)
        x = self.dropout(x)
        x = self.linear(x)
        
        return x

In [None]:
class MriNet2d_res5(nn.Module):
    def __init__(self, c):
        super(MriNet2d, self).__init__()
        self.hidden_layers = nn.Sequential(
            nn.Conv2d(5, c, 3, padding=1),
            hidden_res(c, c), 
            hidden_res(c, 2*c), 
            hidden_res(2*c, 4*c)
        )
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(p=0.2)
        self.linear = nn.Linear(4*c*5*5, 2)
        
    def forward(self, x):
        x = self.hidden_layers(x)
        x = self.flatten(x)
        x = self.dropout(x)
        x = self.linear(x)
        
        return x

In [15]:
def get_accuracy(net, data_loader):
    net.eval()
    correct = 0
    for data, target in data_loader:
        data = data.to(device)
        target = target.to(device)

        out = net(data)
        pred = out.data.max(1)[1] # get the index of the max log-probability
        correct += pred.eq(target.data).cpu().sum()
        del data, target
    accuracy = 100. * correct / len(data_loader.dataset)
    return accuracy.item()

def get_loss(net, data_loader):
    net.eval()
    loss = 0 
    for data, target in data_loader:
        data = data.to(device)
        target = target.to(device)

        out = net(data)
        loss += criterion(out, target).item()*len(data)

        del data, target

    return loss / len(data_loader.dataset)


def train(epochs, net, criterion, optimizer, train_loader, val_loader, scheduler=None, verbose=True):
    train_loss_list = []
    val_loss_list = []
    train_acc_list = []
    val_acc_list = []

    train_loss_list.append(get_loss(net, train_loader))
    val_loss_list.append(get_loss(net, val_loader))
    train_acc_list.append(get_accuracy(net, train_loader))
    val_acc_list.append(get_accuracy(net, val_loader))
    if verbose:
        print('Epoch {:02d}/{} || Loss:  Train {:.4f} | Validation {:.4f}'.format(0, epochs, train_loss_list[-1], val_loss_list[-1]))

    net.to(device)
    for epoch in range(1, epochs+1):
        net.train()
        for X, y in train_loader:
            # Perform one step of minibatch stochastic gradient descent
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            out = net(X)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()
            
        
        # define NN evaluation, i.e. turn off dropouts, batchnorms, etc.
        net.eval()
        for X, y in val_loader:
            # Compute the validation loss
            X, y = X.to(device), y.to(device)
            out = net(X)
         
        if scheduler is not None:
            scheduler.step()
        
        
        train_loss_list.append(get_loss(net, train_loader))
        val_loss_list.append(get_loss(net, val_loader))
        train_acc_list.append(get_accuracy(net, train_loader))
        val_acc_list.append(get_accuracy(net, val_loader))
        
        train_loss_list = np.array(train_loss_list)
        val_loss_list = np.array(val_loss_list)
        train_acc_list = np.array(train_acc_list)
        val_acc_list = np.array(val_acc_list)

        freq = 1
        if verbose and epoch%freq==0:
            print('Epoch {:02d}/{} || Loss:  Train {:.4f} | Validation {:.4f} || Accuracy:  Train {:.4f} | Validation {:.4f}'.format(epoch, epochs, train_loss_list[-1], val_loss_list[-1], train_acc_list[-1], val_acc_list[-1]))
        
    return train_loss_list, val_loss_list, train_acc_list, val_acc_list
            
            

In [1]:
def init_model(name, d, c):
    if name == 'conv':
        if d == 3:
            return MriNet2d_3(c)
        elif d == 5:
            return MriNet2d_5(c)
        else:
            print('Wrong d')
    elif name == 'res':
        if d == 3:
            return MriNet2d_res3(c)
        elif d == 5:
            return MriNet2d_res5(c)
        else:
            print('Wrong d')
    else:
            print('Wrong name')

In [2]:
def dataset_class(d):
    if d == 3:
        return MriData2d_3
    elif d == 5:
        return MriData2d_5
    else:
        print('Wrong d')

In [None]:
def cross_val_experiment(name, d, c=32):
    X, y = np.load('../data/tensors.npy'), np.load('../data/labels.npy')
    i = 0
    results = {}
    for j in range(5):
        results[j] = {}
        
    for train_idx, test_idx StratifiedKFold().split(X, y):
        model = init_model(name, d, c)
        dataset_type = dataset_class(d)
        train_dataset = dataset_type(X[train_idx], y[train_idx])
        test_dataset = dataset_type(X[test_idx], y[test_idx])
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
        val_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False)
        criterion = nn.CrossEntropyLoss().to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=3e-3)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[5, 15, 25], gamma=0.5)
        print(f'Fold {i} out of 5')
        train_losses, val_losses, train_accs, val_accs =  train(35, model, criterion, optimizer, train_loader,
                                                                val_loader, scheduler=None, verbose=False)
        results[i]['train_loss'] = train_losses
        results[i]['val_loss'] = val_losses
        results[i]['train_acc'] = train_accs
        results[i]['val_acc'] = val_accs
        if d == 3:
            features_dim = 23
        elif d == 5:
            features_dim = 14
        else:
            print('Wrong d')
            
        new_features = np.zeros((len(test_idx), features_dim))
        for idx in range(517*features_dim):
            i = idx // features_dim
            j = idx % features_dim
            x = X[test_idx][i,:,5*j:5*j+5,:].permute(1, 0, 2).unsqueeze(0)
            y = model(x).squeeze()
            ind = np.argmax(y.detach().numpy())
            new_features[i, j] = ind
        y_pred = np.mean(new_features, axis=1)
        y_pred = np.where(y_pred>0.5, 1, 0)
        acc = accuracy_score(y[test_idx] y_pred)
        results[i]['test_acc_fin'] = acc
        
        new_features = np.zeros((len(train_idx), features_dim))
        for idx in range(517*features_dim):
            i = idx // features_dim
            j = idx % features_dim
            x = X[train_idx][i,:,5*j:5*j+5,:].permute(1, 0, 2).unsqueeze(0)
            y = model(x).squeeze()
            ind = np.argmax(y.detach().numpy())
            new_features[i, j] = ind
        y_pred = np.mean(new_features, axis=1)
        y_pred = np.where(y_pred>0.5, 1, 0)
        acc = accuracy_score(y[train_idx] y_pred)
        results[i]['train_acc_fin'] = acc
        i += 1
        
    return results 

In [None]:
c = 32
d = 3
#d = 5
model = 'conv'
#model = 'res'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

results = cross_val_experiment(name, d, c)
with open(f'{name}_{d}_2d_cv_results.json', 'w') as fp:
    json.dump(fp, results)