In [103]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import copy
%load_ext jupyternotify

The jupyternotify extension is already loaded. To reload it, use:
  %reload_ext jupyternotify


In [104]:
import torchvision
import torchvision.transforms as transforms
from torchvision import models

In [105]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


#Data 
0->Covid
1->No Covid

In [106]:
classes=['Covid','No Covid']
batch_size=4

In [107]:
# reshape all images to 64x64 and apply tensor transformation
dataset = torchvision.datasets.ImageFolder(root="./Full",transform=transforms.Compose([
                                                            transforms.ToTensor(),
                                                            transforms.Resize([227,227])
                                                            # transforms.Grayscale(num_output_channels=1)
                                                            ]))
# testset = torchvision.datasets.ImageFolder(root="./xray",train=False,transform=transforms.Compose([transforms.Resize([300,305]),transforms.ToTensor()]))
# testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=False)

In [108]:
print(len(dataset))
trainset,testset,valset=torch.utils.data.random_split(dataset,[round(0.8*len(dataset)),round(0.1*len(dataset)),round(0.1*len(dataset))],generator=torch.Generator().manual_seed(42))
trainloader=torch.utils.data.DataLoader(trainset,batch_size=batch_size,shuffle=True)
testloader=torch.utils.data.DataLoader(testset,batch_size=batch_size,shuffle=False)
valloader=torch.utils.data.DataLoader(valset,batch_size=batch_size,shuffle=False)

8088


In [109]:
print(len(dataset))
trainset,testset=torch.utils.data.random_split(dataset,[round(0.8*len(dataset)),round(0.2*len(dataset))],generator=torch.Generator().manual_seed(42))
trainloader=torch.utils.data.DataLoader(trainset,batch_size=4,shuffle=True)
testloader=torch.utils.data.DataLoader(testset,batch_size=4,shuffle=False)
print(len(trainset)/batch_size,len(testset)/batch_size)

8088
1617.5 404.5


In [110]:
dataiter=iter(trainloader)
images,labels=dataiter.next()
print(images.shape,labels)

torch.Size([4, 3, 227, 227]) tensor([1, 0, 0, 1])


In [111]:
def plot_img(img):
    npimg=img.numpy()
    plt.imshow(np.transpose(npimg,(1,2,0)))
    plt.show()

# Preparing The CNN

In [112]:
def evaluation(dataloader,model):
    model.eval()
    with torch.no_grad():
        total,correct=0,0
        for data in dataloader:
            inputs,labels=data
            inputs,labels=inputs.to(device),labels.to(device)
            outputs=model(inputs)
    #         print(outputs)
    #         print(outputs,labels)
            m = nn.Sigmoid()
            outputs=m(outputs)
            pred=outputs>=0.5
            pred=pred.flatten()
            total+=labels.size(0)
            # labels=torch.add(labels,-1)
            # print(pred,labels)
    #         print(list(map(lambda a: classes[a],pred)),list(map(lambda a: classes[a],labels)))
            correct+=(pred==labels).sum().item()
    print(correct,total)
    model.train()
    return 100*correct/total

In [113]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN,self).__init__()
        self.representation_network=nn.Sequential(
            nn.Conv2d(3,32,3), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size=2,stride=3),
            nn.Dropout(p=0.2),
            nn.Conv2d(32,32,3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=3),
            nn.Dropout(p=0.2),
            nn.Conv2d(32,64,3),
            nn.ReLU(),
            nn.Conv2d(64,64,3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=3),
            nn.Dropout(p=0.2),
        )
        self.classification_network=nn.Sequential(
            nn.Linear(3136,512),
            nn.ReLU(),
            nn.Linear(512,128),
            nn.ReLU(),
            nn.Linear(128,1),
#             nn.Sigmoid()
        )
    def forward(self,x):
#         print(x.shape)
        x=self.representation_network(x)
#         print(x.shape)
        # flattening of the vector=> same dimension of first index(batch size) , everythign else is flattened(-1)
        x=x.view(x.size(0),-1)
#        print(x.shape)
        x=self.classification_network(x)
#        print(x.shape)
        return x

In [None]:
def train(net,dataloader,epochs=15):
    loss_fn=nn.BCEWithLogitsLoss().to(device)
    opt=optim.Adam(params=net.parameters())
    for epoch in range(epochs):
        for i,data in enumerate(dataloader,0):
            inputs,labels=data
            inputs,labels=inputs.to(device),labels.to(device)
            opt.zero_grad()
            outputs=net(inputs)
            labels=labels.unsqueeze(-1)
            labels = labels.type_as(outputs)
    #         print(outputs)
            loss=loss_fn(outputs,labels)
            loss.backward()
            opt.step()
            torch.cuda.empty_cache()
            del inputs,labels,outputs

In [114]:
net = CNN()
# net=torch.load("./coronaCNN.pt")
# net.load_state_dict(torch.load("./coronaCNN_State.pt"))
opt=optim.Adam(params=net.parameters())
net=net.to(device)

In [115]:
print(net(images.to(device)).shape)

torch.Size([4, 1])


In [116]:
%%notify -m "Cell has completed exec"
loss_arr=[]
loss_epoch=[]
epochs=5
best_model_loss,best_model=1000000,net.state_dict()
n_iters=np.ceil(len(trainset)/batch_size)
for epoch in range(epochs):
    loss_this_epoch=0
    loss_arr=[]
    for i,data in enumerate(trainloader,0):
        inputs,labels=data
        inputs,labels=inputs.to(device),labels.to(device)
        opt.zero_grad()
        outputs=net(inputs)
        labels=labels.unsqueeze(-1)
        labels = labels.type_as(outputs)
#         print(outputs)
        loss=loss_fn(outputs,labels)
        loss_this_epoch+=loss.item()
        loss_arr.append(loss.item())
        loss.backward()
        opt.step()
        torch.cuda.empty_cache()
        del inputs,labels,outputs
        if i%100 == 0:
            print(f'iteration {i}/{n_iters}, Loss={sum(loss_arr)} ')
#             plt.plot(loss_arr)
#             plt.show()
    if loss_this_epoch < best_model_loss:
            best_model_loss=loss_this_epoch
            best_model=copy.deepcopy(net.state_dict())
    loss_epoch.append(loss_this_epoch)
    print(f'Epoch: {epoch}/{epochs}, Loss={loss_this_epoch}')
    plt.plot(loss_epoch)
    plt.show()

NameError: name 'loss_fn' is not defined

<IPython.core.display.Javascript object>

In [None]:
%%notify -m "completed"
print(evaluation(testloader,net))

In [None]:
torch.save(net.state_dict(),"./coronaCNN_State.pt")
torch.save(net,"./coronaCNN.pt")

Test Accuracy and K-Fold Cross Validation

## Cross Validation

In [14]:
from sklearn.model_selection import KFold

In [15]:
def reset_weights(m):
    '''
    Try resetting model weights to avoid
    weight leakage.
    '''
    for layer in m.children():
        if hasattr(layer, 'reset_parameters'):
            print(f'Reset trainable parameters of layer = {layer}')
            layer.reset_parameters()

In [16]:
def k_fold_cv(model,dataset,loss_function,k_folds=5,epochs=10):
    kfold = KFold(n_splits=k_folds, shuffle=True)
    # Initialize optimizer
    results = {}
    for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset)): 
        print(f'FOLD {fold}')
        print('--------------------------------')

        # Sample elements randomly from a given list of ids, no replacement.
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
        test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)
        # Define data loaders for training and testing data in this fold
        trainloader = torch.utils.data.DataLoader(
              dataset, 
              batch_size=batch_size, sampler=train_subsampler)
        testloader = torch.utils.data.DataLoader(
              dataset,
              batch_size=batch_size, sampler=test_subsampler)

        # Init the neural network
        network = model
        network.apply(reset_weights)
        optimizer = optim.Adam(network.parameters())
        # Run the training loop for defined number of epochs
        for epoch in range(0, epochs):

            # Print epoch
            print(f'Starting epoch {epoch+1}')

            # Set current loss value
            current_loss = 0.0

            # Iterate over the DataLoader for training data
            for i, data in enumerate(trainloader, 0):

                # Get inputs
                inputs, targets = data
                inputs = inputs.to(device)
                targets = targets.to(device)
                # Zero the gradients
                optimizer.zero_grad()

                # Perform forward pass
                outputs = network(inputs)
                targets = targets.unsqueeze(-1)
                targets = targets.type_as(outputs)
                # Compute loss
                loss = loss_function(outputs, targets)

                # Perform backward pass
                loss.backward()

                # Perform optimization
                optimizer.step()

                # Print statistics
                current_loss += loss.item()
                if i % 500 == 499:
                    print('Loss after mini-batch %5d: %.3f' %
                      (i + 1, current_loss / 500))
                    current_loss = 0.0

        # Process is complete.
        print('Training process has finished. Saving the trained model.')
        save_path = f'./CNN-fold-{fold}.pth'
        torch.save(network, save_path)

        # Evaluation for this fold
        correct, total = 0, 0
        with torch.no_grad():

            # Iterate over the test data and generate predictions
            for i, data in enumerate(testloader, 0):
                # Get inputs
                inputs, targets = data
                inputs,targets=inputs.to(device),targets.to(device)
                # Generate outputs
                outputs = network(inputs)
                m = nn.Sigmoid()
                outputs=m(outputs)
                pred=outputs>=0.5
                pred=pred.flatten()
                # Set total and correct
                total += targets.size(0)
                correct += (pred == targets).sum().item()

            # Print accuracy
            print('Accuracy for fold %d: %d %%' % (fold, 100.0 * correct / total))
            print('--------------------------------')
            results[fold] = 100.0 * (correct / total)

    # Print fold results
    print(f'K-FOLD CROSS VALIDATION RESULTS FOR {k_folds} FOLDS')
    print('--------------------------------')
    sum = 0.0
    for key, value in results.items():
        print(f'Fold {key}: {value} %')
        sum += value
    print(f'Average: {sum/len(results.items())} %')

In [None]:
def reset_weights(m):
    '''
    Try resetting model weights to avoid
    weight leakage.
    '''
    net.load_state_dict(torch.load("CNN4_originalstate.pt"))

In [17]:
%%notify -m "Completed"
net=CNN().to(device)
loss_fn=nn.BCEWithLogitsLoss().to(device)
data_set=trainset
k_fold_cv(net,data_set,loss_fn,epochs=15)

FOLD 0
--------------------------------
Reset trainable parameters of layer = Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1))
Reset trainable parameters of layer = Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1))
Reset trainable parameters of layer = Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
Reset trainable parameters of layer = Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
Reset trainable parameters of layer = Linear(in_features=3136, out_features=512, bias=True)
Reset trainable parameters of layer = Linear(in_features=512, out_features=128, bias=True)
Reset trainable parameters of layer = Linear(in_features=128, out_features=1, bias=True)
Starting epoch 1
Loss after mini-batch   500: 0.644
Loss after mini-batch  1000: 0.500
Starting epoch 2
Loss after mini-batch   500: 0.406
Loss after mini-batch  1000: 0.359
Starting epoch 3
Loss after mini-batch   500: 0.330
Loss after mini-batch  1000: 0.308
Starting epoch 4
Loss after mini-batch   500: 0.269
Loss after mini-batch  1

<IPython.core.display.Javascript object>

In [None]:
reset_weights(net)
train(net,trainloader)

In [18]:
evaluation(testloader,net)

1493 1618


92.27441285537701

In [19]:
torch.save(net,"./CNN4.pt")

In [8]:
def report(dataloader,model):
    y_true,y_pred=torch.tensor([]),torch.tensor([])
    model.eval()
    with torch.no_grad():
        total,correct=0,0
        for data in dataloader:
            inputs,labels=data
            inputs,labels=inputs.to(device),labels.to(device)
            outputs=model(inputs)
    #         print(outputs)
    #         print(outputs,labels)
            m = nn.Sigmoid()
            outputs=m(outputs)
            pred=outputs>=0.5
            pred=pred.flatten()
            y_true=torch.cat((y_true,copy.deepcopy(labels.cpu())),0)
            y_pred=torch.cat((y_pred,copy.deepcopy(pred.cpu())),0)
#             print(y_pred,y_true,y_pred==y_true,pred==labels)
            total+=labels.size(0)
            # labels=torch.add(labels,-1)
            # print(pred,labels)
    #         print(list(map(lambda a: classes[a],pred)),list(map(lambda a: classes[a],labels)))
            correct+=(pred==labels).sum().item()
#             print((pred==labels).sum())
    print("Accuracy: ",accuracy_score(y_true,y_pred))
    print("Precision: ",precision_score(y_true,y_pred))
    print("Recall: ",recall_score(y_true,y_pred))
    print("F1-Score: ",f1_score(y_true,y_pred))
    print("AUC: ",roc_auc_score(y_true,y_pred))
    print(correct,total)
#     print(y_true,y_pred)
    y_pred=y_pred.flatten()
    y_true=y_true.flatten()
#     print(classification_report(y_true, y_pred))
    
    model.train()
    return 100*correct/total

In [9]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

In [10]:
report(testloader,net)

Accuracy:  0.9227441285537701
Precision:  0.9179734620024126
Recall:  0.9303178484107579
F1-Score:  0.9241044323011537
AUC:  0.922658924205379
1493 1618


92.27441285537701