<a href="https://colab.research.google.com/github/wylhtydtm/Nematode-project/blob/master/CNN_8signals.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import tables
import numpy as np
import pandas as pd
from PIL import Image
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import matplotlib.pyplot as plt
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from sklearn import preprocessing
import time
import copy

In [None]:
!pip install livelossplot --quiet
from livelossplot import PlotLosses

In [None]:
class timeseries_dataset(Dataset):
    """Dataset of time-series windows stored in an HDF5 file.

    The file is expected to hold one group per split (``/<which_set>``)
    containing a ``labels`` table and a ``tw_data`` array that is indexed by
    ``ts_id`` along its first axis.  Only the label table is kept in memory;
    the signals are read from disk one sample at a time.

    Parameters
    ----------
    hdf5_filename : path-like
        Location of the HDF5 file.
    which_set : str, default 'train'
        Name of the group (split) to read from.
    transform : callable, optional
        Applied to the uint8-quantised, transposed 2-D array of a sample.
        When omitted the sample is returned as a float32 NumPy array.
    """

    def __init__(self, hdf5_filename, which_set='train', transform=None):
        self.fname = hdf5_filename
        self.set_name = which_set
        # Load the label table once; the file is closed again straight away.
        with tables.File(self.fname, 'r') as fid:
            tmp = pd.DataFrame.from_records(
                fid.get_node('/' + self.set_name)['labels'].read())
        self.label_info = tmp[
            ['imaging_plate_drug_concentration', 'MOA_group', 'ts_id']]
        self.transform = transform

    def __len__(self):
        """Number of samples, i.e. rows of the label table."""
        return len(self.label_info)

    def _read_timeseries(self, ts_id):
        """Read the raw 2-D ``tw_data`` entry for *ts_id* from disk."""
        # The file is opened per call, so no handle is held by the instance.
        with tables.File(self.fname, 'r') as fid:
            return fid.get_node(
                '/' + self.set_name + '/tw_data')[ts_id, :, :].copy()

    def __getitem__(self, index):
        """Return ``(signals, label)`` for the sample at position *index*.

        ``signals`` is the stored 2-D array cast to float32 and transposed,
        then passed through ``self.transform`` when one was given.  ``label``
        is the sample's ``MOA_group`` as a float32 tensor of shape (1, 1).
        """
        if torch.is_tensor(index):
            index = index.tolist()
        # Look the row up by position and follow its ts_id, so the mapping
        # stays correct even if label_info is ever reordered.
        row = self.label_info.iloc[index]
        ts_id = int(row['ts_id'])

        ts = self._read_timeseries(ts_id).astype(np.float32).T

        if self.transform:
            # NOTE(review): presumably the stored values are normalised to
            # [0, 1]; anything outside that range wraps in the uint8 cast.
            ts = (ts * 255).astype(np.uint8)
            ts = self.transform(ts)
            ts = ts.squeeze(0)

        labels = np.array(row['MOA_group'], dtype=np.float32).reshape(-1, 1)
        labels = torch.from_numpy(labels)

        # Return the processed signals (the raw array used to be returned
        # here, which silently discarded the transpose and the transform).
        return ts, labels

In [None]:
def imshow(inp, title=None):
    """Show a channels-first image tensor with matplotlib.

    The tensor is converted to NumPy and its axes are permuted from
    [channels, height, width] to [height, width, channels] before plotting.
    Axes are hidden, and ``title`` is drawn only when it is not None.
    """
    image_hwc = inp.numpy()
    image_hwc = image_hwc.transpose((1, 2, 0))

    plt.imshow(image_hwc)
    plt.axis("off")
    plt.subplots_adjust(wspace=0.02, hspace=0)

    has_title = title is not None
    if has_title:
        plt.title(title)

    # Short pause so the figure gets a chance to be drawn.
    plt.pause(0.001)

In [None]:
 hd = Path('/content/drive/My Drive')
 fname = hd / 'Timeseries_0708_8signals_normalizd.hdf'
 fname_2 = hd/'Timeseries_0708_8signals_normalizd.hdf'


In [None]:
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
batch_size = 5

In [None]:
tw_transform= transforms.ToTensor()

In [None]:
train_data = timeseries_dataset(fname, which_set='train',transform=tw_transform)
val_data = timeseries_dataset(fname, which_set='val',transform=tw_transform)

In [None]:
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size, num_workers=4)
val_loader = DataLoader(val_data, shuffle=True, batch_size=batch_size, num_workers=4)
    

In [None]:
ts_1 = train_data[0][0]
print(ts_1.shape)

In [None]:
i1,id= next(iter(train_loader))
print(i1.shape)


In [None]:
# visualizing images
i = 0
plt.figure(figsize=(10,10))
plt.subplot(221), plt.plot(train_data[i][0]), plt.title(train_data[i][1])
plt.subplot(222), plt.plot(train_data[i+25][0]), plt.title(train_data[i+25][1])
plt.subplot(223), plt.plot(train_data[i+50][0]), plt.title(train_data[i+50][1])
plt.subplot(224), plt.plot(train_data[i+75][0]), plt.title(train_data[i+75][1])

In [None]:
images, labels= next(iter(train_loader)) 
out = torchvision.utils.make_grid(images,nrow=5)
images.shape

In [None]:
plt.figure(figsize=(40,60))
plt.imshow(images[0].squeeze(0).numpy())
plt.show()

In [None]:
ts_2=val_data[0][0]
plt.figure(figsize=(40,60))
plt.imshow(ts_2.squeeze(0).numpy())
plt.show()

In [None]:
print(val_data[0][1])

In [None]:
class ConvNet(nn.Module):
    """1-D CNN: conv -> batch-norm -> ReLU -> max-pool -> conv -> dropout -> linear.

    Parameters
    ----------
    in_channels : int, default 8
        Number of channels of the input, which is laid out as
        (batch, in_channels, length).
    n_classes : int, default 12
        Number of output logits.
    n_flat_features : int, default 33200
        Size of the flattened convolutional output, i.e. 80 times the length
        left after the second convolution.  With the layer settings below the
        default corresponds to an input length of 836 or 837.
    dropout : float, default 0.5
        Dropout probability applied before the linear layer.

    The defaults reproduce the original fixed architecture, and the module
    names (``conv_layers``, ``drop_out``, ``fc_layers``) are unchanged so
    existing ``state_dict`` checkpoints still load.
    """

    def __init__(self, in_channels=8, n_classes=12, n_flat_features=33200,
                 dropout=0.5):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv1d(in_channels, 40, kernel_size=5, stride=1, padding=1),
            nn.BatchNorm1d(40),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(40, 80, kernel_size=5, stride=1, padding=1),
        )
        self.drop_out = nn.Dropout(dropout)
        self.fc_layers = nn.Sequential(nn.Linear(n_flat_features, n_classes))

    def forward(self, x):
        """Return the class logits, shape (batch, n_classes)."""
        x = self.conv_layers(x)       # convolutional feature extractor
        x = self.drop_out(x)          # regularise the feature maps
        x = x.view(x.shape[0], -1)    # flatten everything but the batch axis
        x = self.fc_layers(x)         # linear classifier
        return x

learning_rate = 0.0001
epochs = 100

cnn = ConvNet().to(device) # to instantiate model
criterion = torch.nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(cnn.parameters(), lr= learning_rate)

In [None]:
dataloaders = {
    "train": train_loader,
    "validation": val_loader
}
dataset_sizes = {'train':len(train_loader.dataset), 'validation':len(val_loader.dataset)}

In [None]:
def train_model(model, criterion, optimiser, epochs, verbose=True, tag='Loss/Train'):
    """Train *model* and return it loaded with its best validation weights.

    Every epoch runs a 'train' phase followed by a 'validation' phase over
    the module-level ``dataloaders``; per-sample averages are computed with
    the module-level ``dataset_sizes``, batches are moved to the module-level
    ``device``, and the epoch metrics are plotted with ``PlotLosses``.

    Parameters
    ----------
    model : torch.nn.Module
        Network to optimise (modified in place).
    criterion : callable
        Loss taking ``(logits, integer_labels)``.
    optimiser : torch.optim.Optimizer
        Optimiser already bound to the model's parameters.
    epochs : int
        Number of epochs to run.
    verbose : bool, default True
        Print the loss of every training batch.
    tag : str
        Not used by the function; kept for backward compatibility.

    Returns
    -------
    The same *model*, with the weights of the epoch that reached the highest
    validation accuracy.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    liveloss = PlotLosses()
    for epoch in range(epochs):
        logs = {}
        print('Epoch{}/{}'.format(epoch, epochs-1))
        print('-' * 15)

        # Each epoch has a training and a validation phase.
        for phase in ['train', 'validation']:
            is_training = phase == 'train'
            if is_training:
                model.train()
            else:
                model.eval()

            # Plain Python numbers: no autograd history is kept and nothing
            # device-resident is handed to the plotting code.
            running_loss = 0.0
            running_corrects = 0

            for index, (inputs, labels) in enumerate(dataloaders[phase]):
                inputs, labels = inputs.to(device), labels.to(device)
                labels = labels.view(-1).long()  # flat class indices

                # Gradients are only needed while training; validation then
                # runs without building the autograd graph.
                with torch.set_grad_enabled(is_training):
                    prediction = model(inputs)
                    prediction = prediction.view(labels.size(0), -1)
                    loss = criterion(prediction, labels)

                if is_training:
                    if verbose:
                        print('Epoch:', epoch, '\tBatch:', index, '\tLoss', loss.item())
                    optimiser.zero_grad()
                    loss.backward()
                    optimiser.step()

                _, pred = torch.max(prediction, dim=1)
                # Weight the batch loss by batch size so the epoch figure is
                # a per-sample average even when the last batch is smaller.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (pred == labels).sum().item()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            prefix = 'val_' if phase == 'validation' else ''
            logs[prefix + 'loss'] = epoch_loss
            logs[prefix + 'accuracy'] = epoch_acc

            # Remember the weights of the best validation epoch so far.
            if phase == 'validation' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        liveloss.update(logs)
        liveloss.send()

    time_elapse = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapse // 60, time_elapse % 60))
    print('Best Val Acc: {}'.format(best_acc))
    model.load_state_dict(best_model_wts)
    return model

Training

In [None]:
# Fit on the 8-signal normalised data; `cnn` is rebound to the returned model.
cnn = train_model(
    model=cnn,
    criterion=criterion,
    optimiser=optimiser,
    epochs=epochs,
)

In [None]:
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or NaN if the denominator is 0."""
    if denominator == 0:
        return float("nan")
    return float(numerator) / float(denominator)


def measure_performance(predictions, labels):
    """Print accuracy, precision, recall and F1 score for binary predictions.

    Both inputs are converted to boolean arrays and flattened, so any
    non-zero value counts as the positive class.  For multi-class labels this
    means the scores describe "class 0 versus everything else".

    Parameters
    ----------
    predictions, labels : array-like
        Predicted and true values with the same number of elements.

    Raises
    ------
    ValueError
        If the two inputs do not contain the same number of elements.

    A metric whose denominator is zero (e.g. precision when nothing was
    predicted positive) is printed as ``nan``.  The function returns None.
    """
    # go logical for ease; flattening keeps (n,) and (n, 1) inputs from
    # broadcasting against each other
    predictions = np.asarray(predictions).astype(bool).ravel()
    labels = np.asarray(labels).astype(bool).ravel()
    if predictions.shape != labels.shape:
        raise ValueError(
            "predictions and labels must have the same number of elements, "
            f"got {predictions.size} and {labels.size}")

    # confusion-matrix counts
    tp = int(np.logical_and(predictions, labels).sum())
    tn = int(np.logical_and(~predictions, ~labels).sum())
    fp = int(np.logical_and(predictions, ~labels).sum())
    fn = int(np.logical_and(~predictions, labels).sum())

    accuracy = _safe_ratio(tp + tn, predictions.size)
    print(f"accuracy = {accuracy}")
    precision = _safe_ratio(tp, tp + fp)
    print(f"precision = {precision}")
    recall = _safe_ratio(tp, tp + fn)
    print(f"recall = {recall}")
    f1 = _safe_ratio(2 * tp, 2 * tp + fp + fn)
    print(f"F1 score = {f1}")
    return


In [None]:
from sklearn.metrics import classification_report
labels = []
predictions = []

with torch.no_grad():
    for images, labs in val_loader:
        images = images.to(device)
        preds = cnn(images)
        preds = torch.argmax(preds, axis=1)
        predictions.append(preds)
        labels.append(labs)
        
#concatenate accumulators into np arrays for ease of use
predictions = np.concatenate(predictions, axis=0)
labels = np.concatenate(labels, axis=0).squeeze()
print(classification_report(labels, predictions))

# measure performance
print("\nPerformance on validation data")
measure_performance(predictions, labels)

In [None]:
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(labels, predictions)
print(cm)

In [None]:
def calc_accuracy(model, dataloader):
    """Return the percentage of samples in *dataloader* classified correctly.

    The model is evaluated in eval mode without gradient tracking, and its
    previous train/eval mode is restored afterwards.  Batches are moved to
    the device the model's parameters live on (CPU if it has none).

    Parameters
    ----------
    model : torch.nn.Module
        Classifier returning one row of logits per sample.
    dataloader : iterable
        Yields ``(inputs, labels)`` batches and exposes ``.dataset`` whose
        length is the total number of samples.

    Returns
    -------
    float
        Accuracy in percent; 0.0 when the dataset is empty.
    """
    num_examples = len(dataloader.dataset)
    if num_examples == 0:
        return 0.0

    try:
        model_device = next(model.parameters()).device
    except StopIteration:
        model_device = torch.device("cpu")

    was_training = model.training
    model.eval()
    num_correct = 0
    try:
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs = inputs.to(model_device)
                # reshape(-1) (not squeeze) keeps a 1-D tensor for a batch
                # that holds a single sample
                labels = labels.to(model_device).reshape(-1)
                predictions = torch.argmax(model(inputs), dim=1)
                num_correct += (predictions == labels).sum().item()
    finally:
        model.train(was_training)

    return num_correct / num_examples * 100


# Report the final accuracy on both splits, training split first.
for caption, split_loader in (
    ('Train Accuracy:', train_loader),
    ('Validation Accuracy:', val_loader),
):
    print(caption, calc_accuracy(cnn, split_loader))