**Pneumonia**

Pneumonia is an inflammatory condition of the lung primarily affecting the small air sacs known as alveoli. Typical symptoms include some combination of productive or dry cough, chest pain, fever, and trouble breathing. Severity is variable.


Pneumonia is usually caused by infection with viruses or bacteria, and less commonly by other microorganisms, certain medications, and conditions such as autoimmune diseases. Risk factors include other lung diseases (such as cystic fibrosis, COPD, and asthma), diabetes, heart failure, a history of smoking, a poor ability to cough (for example following a stroke), and a weak immune system. Diagnosis is often based on the symptoms and a physical examination. A chest X-ray, blood tests, and a culture of the sputum may help confirm the diagnosis.

In [1]:
# Install (or upgrade, -U) the kaggle command-line client, with quiet output (-q).
!pip install -U -q kaggle
# Create the ~/.kaggle directory if it does not exist yet (-p).
# NOTE(review): the recorded output below ("The syntax of the command is
# incorrect.") looks like a Windows cmd error -- presumably this line needs a
# POSIX shell; confirm on the target machine.
!mkdir -p ~/.kaggle

The syntax of the command is incorrect.


In [2]:
# Open the Colab file-upload dialog; a later cell copies `kaggle.json` from the
# working directory, so that is the file to upload here.
# NOTE(review): `google.colab` is presumably only importable inside Google
# Colab -- the recorded ModuleNotFoundError below is consistent with a run
# somewhere else.
from google.colab import files
files.upload()

ModuleNotFoundError: No module named 'google'

In [None]:
# Copy the uploaded kaggle.json into the ~/.kaggle/ directory created above.
!cp kaggle.json ~/.kaggle/

In [None]:
# Download the paultimothymooney/chest-xray-pneumonia dataset with the kaggle CLI.
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

In [None]:
# Extract the downloaded archive into input/ (-q: quiet).
!unzip -q 'chest-xray-pneumonia' -d 'input/'

In [None]:
# Extract input/chest_xray into input/ as well -- presumably a nested archive
# shipped inside the first one; TODO confirm against the downloaded files.
!unzip -q 'input/chest_xray' -d 'input/'

In [None]:
import torchvision
from torchvision import transforms,models,datasets
import torch
import numpy as np
import os 
import matplotlib.pyplot as plt
import torch.optim as optim
import torch.nn as nn
from glob import glob
import os
from collections import OrderedDict
from PIL import Image
import seaborn as sns

In [None]:
# Split name -> folder. The dataset's own 'val' folder is very small, so the
# roles are swapped: 'val' serves as the test split and 'test' as validation.
data_dir = dict(
    train='./input/chest_xray/train',
    test='./input/chest_xray/val',
    valid='./input/chest_xray/test',
)

In [None]:
# Collect the image paths of every split (layout: <split>/<class>/<file>)
train_files, valid_files, test_files = (
    np.array(glob(os.path.join(data_dir[split], '*/*')))
    for split in ('train', 'valid', 'test')
)

# Report the size of each split
print('There are %d train images.' % len(train_files))
print('There are %d validation images.' % len(valid_files))
print('There are %d test images.' % len(test_files))

In [None]:
# Check how the training images are distributed over the two classes
train_files_nor, train_files_pne = (
    np.array(glob(os.path.join(data_dir['train'], pattern)))
    for pattern in ('NORMAL/*', 'PNEUMONIA/*')
)

# Report the number of images per class
print('There are %d Normal train images.' % len(train_files_nor))
print('There are %d Pneumonia train images.' % len(train_files_pne))

In [None]:
#replicate the normal class images to balance the classes
# Two scratch directories each receive a copy of the NORMAL training images
# (-n: do not overwrite a file that already exists at the destination).

!mkdir './temp1'
!mkdir './temp2'
!cp -n ./input/chest_xray/train/NORMAL/* ./temp1
!cp -n ./input/chest_xray/train/NORMAL/* ./temp2

In [None]:
#%cd content
# List the first scratch directory to check that the copy worked.
!ls ./temp1/

In [None]:
def rename(directory, pre):
    """Rename every entry in *directory* to ``<pre><index>.jpg``.

    Entries are numbered from 0 in sorted-name order, so the result is
    reproducible between runs.

    The rename happens in two passes (original name -> unique temporary
    name -> final name). Renaming straight to the final name can silently
    overwrite a file that has not been processed yet, for example when the
    directory already holds ``<pre><index>.jpg`` files from an earlier run.

    Args:
        directory: Path of the directory whose entries are renamed. A
            trailing path separator is optional.
        pre: Prefix placed in front of the running index.

    Raises:
        OSError: If the directory cannot be listed or a rename fails.
    """
    # Local import so the block does not rely on a file-level import.
    import uuid

    # Random token keeps the temporary names clear of any existing entry.
    token = uuid.uuid4().hex

    staged = []
    for index, filename in enumerate(sorted(os.listdir(directory))):
        temporary = os.path.join(directory, ".{}-{}.tmp".format(token, index))
        os.rename(os.path.join(directory, filename), temporary)
        staged.append(temporary)

    for index, temporary in enumerate(staged):
        os.rename(temporary, os.path.join(directory, pre + str(index) + ".jpg"))



In [None]:
# Give the first scratch copy new names of the form c1<index>.jpg --
# presumably so the files do not collide with the original names when they
# are copied back into the training folder.
directory1='./temp1/'
rename(directory1,'c1')
# NOTE(review): `dest` is not referenced by any other visible cell.
dest='./input/chest_xray/train/NORMAL/'


In [None]:
# Copy the renamed duplicates into the NORMAL training folder
# (-n: never overwrite an existing file).
!cp -n ./temp1/* ./input/chest_xray/train/NORMAL/

In [None]:
# Same again for the second scratch copy, this time with the c2 prefix.
directory2='./temp2/'
rename(directory2,'c2')


In [None]:
# Copy the second set of duplicates into the NORMAL training folder.
!cp -n ./temp2/* ./input/chest_xray/train/NORMAL/

In [None]:
# Re-check the class distribution of the training split after the duplication
train_files_nor, train_files_pne = (
    np.array(glob(os.path.join(data_dir['train'], pattern)))
    for pattern in ('NORMAL/*', 'PNEUMONIA/*')
)

# Report the number of images per class
print('There are %d Normal train images.' % len(train_files_nor))
print('There are %d Pneumonia train images.' % len(train_files_pne))

In [None]:
# Clean up: delete the scratch copies, then remove the two temp directories.
!rm ./temp1/* ./temp2/*
!rmdir ./temp1
!rmdir ./temp2 

In [None]:
# Data transforms

# Training pipeline: random crop / flip / rotation, then tensor conversion
# and per-channel normalisation.
transform_data = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation / test pipeline: fixed resize only, with the same normalisation.
transform_test = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


In [None]:
# Build one ImageFolder dataset per split; only the training split gets the
# augmenting transform.
train_data = datasets.ImageFolder(root=data_dir['train'], transform=transform_data)
valid_data = datasets.ImageFolder(root=data_dir['valid'], transform=transform_test)
test_data = datasets.ImageFolder(root=data_dir['test'], transform=transform_test)


In [None]:
# Wrap the image datasets (with their transforms) in batch loaders.
# Train and validation batches are shuffled; test batches keep dataset order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_data, batch_size=32, shuffle=True, num_workers=0)
valid_loader = torch.utils.data.DataLoader(
    dataset=valid_data, batch_size=32, shuffle=True, num_workers=0)
test_loader = torch.utils.data.DataLoader(
    dataset=test_data, batch_size=32, shuffle=False, num_workers=0)


In [None]:
# The dataset provides class_to_idx (class name -> label index), but labels
# and predictions are indices, so build the reverse lookup
# (label index -> class name).
# The mapping is built directly, without assigning to `_`, because `_` is the
# name IPython uses for the last cell result.
cat_to_name = {idx: name for name, idx in valid_data.class_to_idx.items()}

In [None]:
def showimage(data_loader, number_images, cat_to_name,
              mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Plot the first images of one batch in a two-row grid, titled by class.

    Args:
        data_loader: Iterable yielding ``(images, labels)`` batches, where
            ``images`` is a tensor laid out as (batch, channel, height, width)
            and ``labels`` holds integer class indices.
        number_images: How many images to show. Capped at the batch size so
            an over-large request no longer raises ``IndexError``.
        cat_to_name: Mapping from label index to the class name used as title.
        mean: Per-channel mean that was subtracted when the images were
            normalised. The default matches the ``Normalize`` transform used
            in this notebook.
        std: Per-channel standard deviation used by that normalisation.
    """
    # The builtin next() works on any iterator; the iterator's own .next()
    # method is not available on current DataLoader iterators.
    images, labels = next(iter(data_loader))
    images = images.numpy()  # convert images to numpy for display
    labels = labels.tolist()

    shown = min(number_images, len(images))
    # add_subplot needs integer grid sizes; round up so an odd count still fits.
    n_cols = max(1, (shown + 1) // 2)

    channel_mean = np.asarray(mean)
    channel_std = np.asarray(std)

    fig = plt.figure(figsize=(number_images, 4))
    for idx in range(shown):
        ax = fig.add_subplot(2, n_cols, idx + 1, xticks=[], yticks=[])
        # (C, H, W) -> (H, W, C). A bare np.transpose() reverses all axes and
        # would give (W, H, C), i.e. a transposed picture.
        img = np.transpose(images[idx], (1, 2, 0))
        # Undo the normalisation so that imshow receives values in [0, 1].
        img = np.clip(img * channel_std + channel_mean, 0.0, 1.0)
        ax.imshow(img)
        ax.set_title(cat_to_name[labels[idx]])
        

# Preview six images from one validation batch
showimage(data_loader=valid_loader, number_images=6, cat_to_name=cat_to_name)

In [None]:
# Load the pretrained model
model = models.densenet121(pretrained=True)

# Freeze the pretrained weights so that only the new classifier is trained
for param in model.parameters():
    param.requires_grad_(False)

num_ftrs = model.classifier.in_features

# New classification head: three hidden blocks (Linear -> ReLU -> Dropout),
# narrowing 512 -> 256 -> 32, followed by a 2-way output layer.
model.classifier = nn.Sequential(
    *[layer
      for n_in, n_out in ((num_ftrs, 512), (512, 256), (256, 32))
      for layer in (nn.Linear(n_in, n_out), nn.ReLU(), nn.Dropout(0.25))],
    nn.Linear(32, 2),
)


In [None]:
# check if CUDA is available
use_cuda = torch.cuda.is_available()

# Define the device: the first GPU when CUDA is usable, otherwise the CPU.
# `device` is reused by later cells, so it must not name a GPU that is absent.
device = torch.device('cuda:0' if use_cuda else 'cpu')

if use_cuda:
    print('GPU is  available :)   Training on GPU ...')
else:
    print('GPU is not available :(  Training on CPU ...')

# Put the model on the selected device (a no-op when it is already there).
model = model.to(device)

In [None]:
# Loss function: cross entropy, moved to the selected device.
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

# Optimiser: Adam over the model parameters with a learning rate of 1e-3.
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


In [None]:
# Tell PIL to load image files that are truncated instead of failing on them,
# so a single damaged file does not abort a training run.
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True 


def train(n_epochs, model, optimizer, criterion, use_cuda, save_path):
    """Train *model* and checkpoint it whenever the validation loss improves.

    Batches are read from the module-level ``train_loader`` and
    ``valid_loader``.

    Args:
        n_epochs: Number of passes over ``train_loader``.
        model: Network to optimise; updated in place.
        optimizer: Optimiser that steps the model's trainable parameters.
        criterion: Callable ``criterion(output, target)`` returning a scalar
            loss tensor.
        use_cuda: When true, every batch is moved to the GPU before use.
        save_path: File that receives ``model.state_dict()`` each time the
            validation loss is less than or equal to the best seen so far.

    Returns:
        The model as it stands after the final epoch. This is not
        necessarily the best checkpoint; load ``save_path`` for that.
    """
    # Best validation loss so far. float('inf') replaces np.Inf, which was
    # removed in NumPy 2.0.
    valid_loss_min = float('inf')

    for epoch in range(1, n_epochs + 1):
        # Running means of the per-batch losses for this epoch.
        train_loss = 0.0
        valid_loss = 0.0

        ###################
        # train the model #
        ###################
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            if use_cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            # Incremental mean; .item() yields a plain float, so no tensor
            # is kept alive between batches.
            train_loss += (loss.item() - train_loss) / (batch_idx + 1)

        ######################
        # validate the model #
        ######################
        model.eval()
        # No gradients are needed for validation; disabling autograd avoids
        # building a graph for every batch.
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(valid_loader):
                if use_cuda:
                    data, target = data.cuda(), target.cuda()
                output = model(data)
                loss = criterion(output, target)
                valid_loss += (loss.item() - valid_loss) / (batch_idx + 1)

        # print training/validation statistics
        print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
            epoch,
            train_loss,
            valid_loss
            ))

        # Save the model if the validation loss has not increased.
        if valid_loss <= valid_loss_min:
            print('Validation loss decreased ({:.6f} -->{:.6f}). Saving Model..'.format(
            valid_loss_min,
            valid_loss))
            torch.save(model.state_dict(), save_path)
            valid_loss_min = valid_loss

    # return trained model
    return model


In [None]:
 
# Train for 40 epochs; the checkpoint with the lowest validation loss goes to model.pt
model = train(
    n_epochs=40,
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    use_cuda=use_cuda,
    save_path='model.pt',
)


In [None]:
# Restore the checkpoint saved during training, i.e. the weights that reached
# the lowest validation loss.
best_state = torch.load('model.pt')
model.load_state_dict(best_state)

In [None]:
def test(model, criterion, use_cuda, loader=None):
    """Evaluate *model* on a loader and print the mean loss and the accuracy.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        criterion: Callable ``criterion(output, target)`` returning a scalar
            loss tensor.
        use_cuda: When true, every batch is moved to the GPU before use.
        loader: Iterable of ``(data, target)`` batches. Defaults to the
            module-level ``valid_loader``, which is what this function has
            always evaluated.

    NOTE(review): the default ``valid_loader`` is the same loader that
    ``train`` uses to pick the saved checkpoint, so the printed "test"
    figures are not measured on held-out data. ``test_loader`` is defined in
    this notebook but never used; pass ``loader=test_loader`` for a held-out
    estimate.
    """
    if loader is None:
        loader = valid_loader

    # monitor test loss and accuracy
    test_loss = 0.
    correct = 0
    total = 0

    model.eval()
    # Evaluation needs no gradients; skip building the autograd graph.
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(loader):
            # move to GPU
            if use_cuda:
                data, target = data.cuda(), target.cuda()
            # forward pass: compute predicted outputs by passing inputs to the model
            output = model(data)
            # running mean of the per-batch losses
            loss = criterion(output, target)
            test_loss += (loss.item() - test_loss) / (batch_idx + 1)
            # predicted class = index of the largest output score
            pred = output.argmax(dim=1)
            # compare predictions to true label
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += data.size(0)

    print('Test Loss: {:.6f}\n'.format(test_loss))

    # Guard the division so an empty loader reports 0% instead of raising.
    accuracy = 100. * correct / total if total else 0.
    print('\nTest Accuracy: %2d%% (%2d/%2d)' % (
        accuracy, correct, total))




In [None]:
# Evaluate the restored model and print its loss and accuracy
test(model=model, criterion=criterion, use_cuda=use_cuda)