In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision import transforms
import torchvision.models as models

# One shared seed for both NumPy's and PyTorch's global random number generators.
seed = 7
np.random.seed(seed)
torch.manual_seed(seed)

# Request deterministic cuDNN algorithms and turn off the cuDNN auto-tuner.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [None]:
DATA_PATH = '../input/covid19-radiography-database/COVID-19_Radiography_Dataset'

# Fractions of the full dataset held out for testing and for validation.
TEST_RATIO = 0.1
VAL_RATIO = 0.3

# Every image is resized to 224x224 and converted to a tensor; no normalisation is applied.
TRANSFORM_IMG = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# ImageFolder assigns one class index per sub-directory of DATA_PATH.
dataset = ImageFolder(DATA_PATH, transform=TRANSFORM_IMG)
num_class = len(dataset.class_to_idx)

# Test/val sizes are rounded down; the training split receives the remainder.
test_size = int(len(dataset) * TEST_RATIO)
val_size = int(len(dataset) * VAL_RATIO)
train_size = len(dataset) - test_size - val_size
train_set, val_set, test_set = torch.utils.data.random_split(
    dataset,
    [train_size, val_size, test_size],
)

print('Dataset with other lung disease (lung opacity, viral pneumonia)')
print('Dataset classes')
print(dataset.class_to_idx)

print('\nDataset Size')
for _split_name, _split in (
    ('Dataset', dataset),
    ('Train set', train_set),
    ('Val set', val_set),
    ('Test set', test_set),
):
    print(_split_name + ': ' + str(len(_split)))

In [None]:
BATCH_SIZE = 128

# Only the training data is reshuffled every epoch. The metrics computed on the
# validation and test splits do not depend on sample order, so those loaders
# iterate in a fixed order.
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
def get_model(num_class):
    """Build a pretrained MobileNetV3-Small whose last layer outputs ``num_class`` logits.

    Args:
        num_class: number of output classes for the replaced final linear layer.

    Returns:
        The torchvision model with ``classifier[3]`` swapped for a freshly
        initialised ``nn.Linear``; every other layer keeps its pretrained weights.
    """
    # NOTE(review): `pretrained=True` is the older torchvision spelling; newer
    # releases prefer the `weights=` argument. Kept so the installed version keeps working.
    model = models.mobilenet_v3_small(pretrained=True, progress=True)

    # Take the input width from the layer being replaced instead of hard-coding it.
    old_head = model.classifier[3]
    model.classifier[3] = nn.Linear(in_features=old_head.in_features, out_features=num_class, bias=True)
    return model

def train(model, total_epoch, train_loader, val_loader):
    """Train ``model`` with Adam and cross-entropy loss, validating after every epoch.

    After each epoch the whole model object is written to
    ``./model1_<epoch>_<val_acc>.pt`` and a one-line summary is printed.

    Args:
        model: ``nn.Module`` mapping an input batch to per-class logits.
        total_epoch: number of passes over ``train_loader``.
        train_loader: iterable of ``(input_batch, target_batch)`` training pairs.
        val_loader: iterable of ``(input_batch, target_batch)`` validation pairs.

    Returns:
        Tuple ``(train_correct, val_correct, train_loss_minibatch_history,
        train_loss_epoch_history)``. The two counts are the numbers of correct
        predictions in the last epoch (0 when ``total_epoch`` is 0). The
        minibatch history holds the loss of every training step, and the epoch
        history holds the mean per-sample training loss of every epoch.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)

    loss_fn = nn.CrossEntropyLoss()
    opt = torch.optim.Adam(model.parameters())

    train_loss_epoch_history = []
    train_loss_minibatch_history = []

    # Defined up front so the return statement is valid even when total_epoch is 0.
    train_correct = 0
    val_correct = 0

    model.to(device)

    for epoch in range(total_epoch):
        model.train()  # training mode
        train_loss_sum, train_correct, train_seen = 0.0, 0, 0

        for input_batch, target_batch in train_loader:
            input_batch = input_batch.to(device)
            target_batch = target_batch.to(device)

            opt.zero_grad()
            predict_batch = model(input_batch)
            loss_batch = loss_fn(predict_batch, target_batch)
            loss_batch.backward()
            opt.step()

            batch_len = target_batch.size(0)
            batch_loss = loss_batch.item()
            # The loss is a batch mean; weight it by the batch size so that the
            # epoch figure is a true per-sample mean even if the last batch is smaller.
            train_loss_sum += batch_loss * batch_len
            train_seen += batch_len
            train_correct += (predict_batch.argmax(dim=1) == target_batch).sum().item()

            train_loss_minibatch_history.append(batch_loss)

        model.eval()  # inference mode for validation
        val_loss_sum, val_correct, val_seen = 0.0, 0, 0
        with torch.no_grad():
            for input_batch, target_batch in val_loader:
                input_batch = input_batch.to(device)
                target_batch = target_batch.to(device)

                predict_batch = model(input_batch)
                loss_batch = loss_fn(predict_batch, target_batch)

                batch_len = target_batch.size(0)
                val_loss_sum += loss_batch.item() * batch_len
                val_seen += batch_len
                val_correct += (predict_batch.argmax(dim=1) == target_batch).sum().item()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        train_loss_epoch = train_loss_sum / max(train_seen, 1)
        val_loss_epoch = val_loss_sum / max(val_seen, 1)
        train_loss_epoch_history.append(train_loss_epoch)

        train_acc = round(train_correct / max(train_seen, 1), 5)
        val_acc = round(val_correct / max(val_seen, 1), 5)

        # A checkpoint is written after every epoch; the validation accuracy is part of the file name.
        filename = './model1_' + str(epoch + 1) + '_' + str(val_acc) + '.pt'
        torch.save(model, filename)

        print(f'Epoch {epoch+1}/{total_epoch}  train_loss: {train_loss_epoch}  train_accuracy: {train_acc} val_loss: {val_loss_epoch} val_accuracy: {val_acc}')

    return train_correct, val_correct, train_loss_minibatch_history, train_loss_epoch_history

Train 4 Class Model

In [None]:
# Build the network with one output per class of `dataset` and show its layers.
model = get_model(num_class)
print(model)

In [None]:
# Train model
# train() saves the model to the working directory after every epoch and
# returns the last-epoch correct counts plus the training-loss histories.
NUM_EPOCH = 15

train_correct, val_correct, train_loss_minibatch_history, train_loss_epoch_history = train(model, NUM_EPOCH, train_loader, val_loader)


In [None]:
from matplotlib import pyplot

# Training loss per epoch; the x axis is the zero-based epoch index.
fig = pyplot.figure()
loss_axes = fig.add_subplot(1, 1, 1)

epoch_index = range(len(train_loss_epoch_history))
loss_axes.plot(epoch_index, train_loss_epoch_history)
loss_axes.set_title("Epoch vs Error")
loss_axes.set_xlabel("Epoch")
loss_axes.set_ylabel("Cross Entropy Loss")


In [None]:
def test(model, test_loader):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    #device = torch.device('cpu')

    #loss_fn = nn.CrossEntropyLoss() 
    #opt = torch.optim.Adam(model.parameters())

    model.to(device)
    model.eval()
    test_correct = 0
    actual_lung_opacity_predict_covid = 0 # actual 1 predict 0
    actual_viral_pneunomia_predict_covid = 0 # actual 3 predict 0
    
    # each class correct count
    correct0, correct1, correct2, correct3 = 0, 0, 0, 0
    total0, total1, total2, total3 = 0, 0, 0, 0
    
    with torch.no_grad():
        for input_batch, target_batch in test_loader:

            input_batch = input_batch.to(device) # move the input to selected device
            target_batch = target_batch.to(device) # move the target to selected device

            #opt.zero_grad() #zero the gradients
            predict_batch = model(input_batch) #forward
            #loss_batch = loss_fn(predict_batch,target_batch) #loss
            #loss_batch.backward() #backward

            #opt.step()#optimize i.e update weights

            #test_loss += loss_batch.item()

            predict = np.argmax(predict_batch.detach().cpu().numpy(), axis=1)
            test_correct += (predict == target_batch.detach().cpu().numpy()).sum()
            
            predict = predict.reshape((-1,1))
            target = target_batch.detach().cpu().numpy().reshape((-1,1))
            
            # calculate correct prediction for each class
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 0]
            total0 += (temp.shape[0])
            temp = temp[temp[:,0] == 0]
            correct0 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 1]
            total1 += (temp.shape[0])
            temp = temp[temp[:,0] == 1]
            correct1 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 2]
            total2 += (temp.shape[0])
            temp = temp[temp[:,0] == 2]
            correct2 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 3]
            total3 += (temp.shape[0])
            temp = temp[temp[:,0] == 3]
            correct3 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,0] == 0]
            temp = temp[temp[:,1] == 1]
            actual_lung_opacity_predict_covid += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,0] == 0]
            temp = temp[temp[:,1] == 3]
            actual_viral_pneunomia_predict_covid += (temp.shape[0])
            
    
    
    
    test_acc = round(test_correct/len(test_loader.dataset),5)
    acc0 = round(correct0/total0,5)
    acc1 = round(correct1/total1,5)
    acc2 = round(correct2/total2,5)
    acc3 = round(correct3/total3,5)
    
    print('Accuracy for the test set: ', test_acc )
    print('Accuracy for COVID: ', acc0)
    print('Accuracy for Lung_Opacity: ', acc1)
    print('Accuracy for Normal: ', acc2)
    print('Accuracy for Viral Pneumonia: ', acc3)
    return test_correct, actual_lung_opacity_predict_covid, actual_viral_pneunomia_predict_covid

Test 4 class model

In [None]:
# Rebind `model` to a saved 4-class model, placing its tensors on the CPU.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
model = torch.load('../input/model-test/model_4class.pt', map_location=torch.device('cpu'))

In [None]:
# Evaluate on the held-out test split; the two COVID false-positive counts are reported in the next cell.
test_correct, actual_lung_opacity_predict_covid, actual_viral_pneunomia_predict_covid = test(model, test_loader)

In [None]:
from collections import Counter

# Number of test samples per class index. Labels are read from the underlying
# ImageFolder's `targets` list through the split's indices, so no image has to
# be loaded and transformed just to learn its class.
test_classes = [test_set.dataset.targets[i] for i in test_set.indices]
num_sample = dict(Counter(test_classes))
print(num_sample)

# Share of class-1 / class-3 test samples that the model labelled as class 0 (COVID).
print(f'Actual lung opacity predict covid19 : {actual_lung_opacity_predict_covid}/{num_sample[1]} = {round(actual_lung_opacity_predict_covid/num_sample[1],5)}')
print(f'Actual viral peunomia predict covid19 : {actual_viral_pneunomia_predict_covid}/{num_sample[3]} = {round(actual_viral_pneunomia_predict_covid/num_sample[3],5)}')


### Dataset without other lung disease

In [None]:
DATA_PATH2 = '../input/assignment-dataset/COVID-19_Radiography_Dataset'

# Same hold-out fractions as used for the first dataset.
TEST_RATIO = 0.1
VAL_RATIO = 0.3

# Every image is resized to 224x224 and converted to a tensor; no normalisation is applied.
TRANSFORM_IMG = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# ImageFolder assigns one class index per sub-directory of DATA_PATH2.
dataset2 = ImageFolder(DATA_PATH2, transform=TRANSFORM_IMG)
num_class2 = len(dataset2.class_to_idx)

# Test/val sizes are rounded down; the training split receives the remainder.
test_size2 = int(len(dataset2) * TEST_RATIO)
val_size2 = int(len(dataset2) * VAL_RATIO)
train_size2 = len(dataset2) - test_size2 - val_size2
train_set2, val_set2, test_set2 = torch.utils.data.random_split(
    dataset2,
    [train_size2, val_size2, test_size2],
)

print('Dataset without other lung disease (lung opacity, viral pneunomia)')
print('Dataset classes')
print(dataset2.class_to_idx)

print('\nDataset Size')
for _split_name, _split in (
    ('Dataset', dataset2),
    ('Train set', train_set2),
    ('Val set', val_set2),
    ('Test set', test_set2),
):
    print(_split_name + ': ' + str(len(_split)))

In [None]:
# Only the training data is reshuffled every epoch; validation metrics do not
# depend on sample order, so the validation loader iterates in a fixed order.
train_loader2 = DataLoader(train_set2, batch_size=BATCH_SIZE, shuffle=True)
val_loader2 = DataLoader(val_set2, batch_size=BATCH_SIZE, shuffle=False)

Train 2 class model

In [None]:
# Rebind `model` to a fresh network with one output per class of `dataset2` and show its layers.
model = get_model(num_class2)
print(model)

In [None]:
# Train model
# train() saves the model to the working directory after every epoch and
# returns the last-epoch correct counts plus the training-loss histories.
NUM_EPOCH = 15

train_correct2, val_correct2, train_loss_minibatch_history2, train_loss_epoch_history2 = train(model, NUM_EPOCH, train_loader2, val_loader2)

In [None]:
from matplotlib import pyplot

# Training loss per epoch for the second model; the x axis is the zero-based epoch index.
fig = pyplot.figure()
loss_axes2 = fig.add_subplot(1, 1, 1)

epoch_index2 = range(len(train_loss_epoch_history2))
loss_axes2.plot(epoch_index2, train_loss_epoch_history2)
loss_axes2.set_title("Epoch vs Error")
loss_axes2.set_xlabel("Epoch")
loss_axes2.set_ylabel("Cross Entropy Loss")

Test 2 class model

In [None]:
# Rebind `model` to a saved 2-class model, placing its tensors on the CPU.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
model = torch.load('../input/model-test/model_2class.pt', map_location=torch.device('cpu'))

In [None]:
def test2(model, test_loader):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    #device = torch.device('cpu')

    #loss_fn = nn.CrossEntropyLoss() 
    #opt = torch.optim.Adam(model.parameters())

    model.to(device)
    model.eval()
    test_correct = 0
    actual_lung_opacity_predict_covid = 0 # actual 1 predict 0
    actual_viral_pneunomia_predict_covid = 0 # actual 3 predict 0
    
    # each class correct count
    correct0, correct1, correct2, correct3 = 0, 0, 0, 0
    total0, total1, total2, total3 = 0, 0, 0, 0
    
    with torch.no_grad():
        for input_batch, target_batch in test_loader:

            input_batch = input_batch.to(device) # move the input to selected device
            target_batch = target_batch.to(device) # move the target to selected device

            #opt.zero_grad() #zero the gradients
            predict_batch = model(input_batch) #forward
            #loss_batch = loss_fn(predict_batch,target_batch) #loss
            #loss_batch.backward() #backward

            #opt.step()#optimize i.e update weights

            #test_loss += loss_batch.item()

            predict = np.argmax(predict_batch.detach().cpu().numpy(), axis=1)
            predict[predict == 1] = 2
            test_correct += (predict == target_batch.detach().cpu().numpy()).sum()
            
            predict = predict.reshape((-1,1))
            target = target_batch.detach().cpu().numpy().reshape((-1,1))
            
            # calculate correct prediction for each class
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 0]
            total0 += (temp.shape[0])
            temp = temp[temp[:,0] == 0]
            correct0 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 1]
            total1 += (temp.shape[0])
            temp = temp[temp[:,0] == 1]
            correct1 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 2]
            total2 += (temp.shape[0])
            temp = temp[temp[:,0] == 2]
            correct2 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,1] == 3]
            total3 += (temp.shape[0])
            temp = temp[temp[:,0] == 3]
            correct3 += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,0] == 0]
            temp = temp[temp[:,1] == 1]
            actual_lung_opacity_predict_covid += (temp.shape[0])
            
            temp = np.concatenate((predict, target), axis=1)
            temp = temp[temp[:,0] == 0]
            temp = temp[temp[:,1] == 3]
            actual_viral_pneunomia_predict_covid += (temp.shape[0])
            
    
    test_acc = round(test_correct/len(test_loader.dataset),5)
    acc0 = round(correct0/total0,5)
    acc1 = round(correct1/total1,5)
    acc2 = round(correct2/total2,5)
    acc3 = round(correct3/total3,5)
    
    print('Accuracy for the test set: ', test_acc )
    print('Accuracy for COVID: ', acc0)
    print('Accuracy for Lung_Opacity: ', acc1)
    print('Accuracy for Normal: ', acc2)
    print('Accuracy for Viral Pneumonia: ', acc3)
    return test_correct, actual_lung_opacity_predict_covid, actual_viral_pneunomia_predict_covid

In [None]:
# Score the 2-class model on `test_loader`, i.e. the test split of the first (4-class) dataset.
# NOTE(review): `test_set2` is never evaluated, and `test_set` comes from a different
# dataset than `train_set2`; confirm that these test images were not part of the 2-class training data.
test_correct, actual_lung_opacity_predict_covid, actual_viral_pneunomia_predict_covid = test2(model, test_loader)

In [None]:
from collections import Counter

# Number of test samples per class index. Labels are read from the underlying
# ImageFolder's `targets` list through the split's indices, so no image has to
# be loaded and transformed just to learn its class.
test_classes = [test_set.dataset.targets[i] for i in test_set.indices]
num_sample = dict(Counter(test_classes))
print(num_sample)

# Share of class-1 / class-3 test samples that the model labelled as class 0 (COVID).
print(f'Actual lung opacity predict covid19 : {actual_lung_opacity_predict_covid}/{num_sample[1]} = {round(actual_lung_opacity_predict_covid/num_sample[1],5)}')
print(f'Actual viral peunomia predict covid19 : {actual_viral_pneunomia_predict_covid}/{num_sample[3]} = {round(actual_viral_pneunomia_predict_covid/num_sample[3],5)}')


Output Display

In [None]:
# Rebind `model` to the saved 4-class model (tensors on the CPU) for the sample display below.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
model = torch.load('../input/model-test/model_4class.pt', map_location=torch.device('cpu'))

In [None]:
from matplotlib import pyplot as plt
import itertools

# Unshuffled loader so the same batch is displayed on every run.
dataloader = DataLoader(test_set, batch_size = BATCH_SIZE, shuffle = False)

class_dict = {0:'COVID', 1:'Lung_Opacity', 2:'Normal', 3:'Viral Pneumonia'}

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Put the model on the same device as the inputs and switch to inference mode;
# without this, a CPU-loaded model fails on a GPU machine.
model.to(device)
model.eval()

# Use the sixth batch (index 5) of the test split.
input_batch, target_batch = next(itertools.islice(dataloader, 5, None))

# One forward pass over the whole batch; predictions are brought back to the
# CPU so they can be compared with `target_batch` and used as indices.
with torch.no_grad():
    predict_batch = model(input_batch.to(device)).argmax(dim=1).cpu()

# One panel per class, laid out row by row in class-index order.
fig, ax = plt.subplots(2, 2, figsize=(10, 10))
for class_idx, axis in zip(sorted(class_dict), ax.flat):
    title = 'Actual: ' + class_dict[class_idx]

    is_class = target_batch == class_idx
    correct_idx = torch.nonzero(is_class & (predict_batch == class_idx)).flatten()
    # Prefer a correctly classified sample; otherwise fall back to the first
    # sample of the class so that the panel still shows what the model predicted.
    candidate_idx = correct_idx if len(correct_idx) > 0 else torch.nonzero(is_class).flatten()

    if len(candidate_idx) == 0:
        # The class does not occur in this batch at all.
        axis.set_title(title + ' (not in this batch)')
        axis.axis('off')
        continue

    # Index into the batch itself, so the image shown is the one that was predicted.
    sample_idx = int(candidate_idx[0])
    axis.imshow(input_batch[sample_idx].permute(1, 2, 0).numpy())  # CHW -> HWC for imshow
    axis.set_title(title + ' Predict: ' + class_dict[int(predict_batch[sample_idx])])
