In [20]:
import torch
import torch.nn as nn
import numpy as np
import torch.optim as optim
from torchvision import transforms, datasets, models, utils
from torch.utils.tensorboard import SummaryWriter
import time
import random
import numpy as np
from torchsummary import summary
from torch.utils.data import DataLoader

In [6]:
class AdaptiveConcatPool2d(nn.Module):
    def __init__(self, sz=None):
        super().__init__()
        sz = sz or (1,1)
        self.ap = nn.AdaptiveAvgPool2d(sz)
        self.mp = nn.AdaptiveMaxPool2d(sz)
    def forward(self, x): return torch.cat([self.mp(x), self.ap(x)], 1)

In [None]:
def get_model():
    model = models.resnet50(pretrained=True)
    
    for param in model.parameters():
        param.requires_grad = False
    
    model.avgpool = AdaptiveConcatPool2d()
    model.fc = nn.Sequential(
        nn.Flatten(),
        nn.BatchNorm1d(4096),
        nn.Dropout(0.5),
        nn.Linear(4096, 512),
        nn.ReLU(),
        nn.BatchNorm1d(512),
        nn.Dropout(p=0.5),
        nn.Linear(512, 2),
        nn.LogSoftmax(dim=1)
    )
    return model

In [None]:
def train(model, device, train_loader, criterion, optimizer, epoch, writer):
    model.train()
    
    total_loss = 0
    
    for batch_id, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        preds = model(data)
        loss = criterion(preds, target)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        
    writer.add_scalar('Train Loss', total_loss/len(train_loader), epoch)
    writer.flush()
    
    return total_loss/len(train_loader)

In [8]:
def test(model, device, test_loader, criterion, epoch, writer):
    model.eval()
    
    total_loss, correct = 0, 0
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += criterion(output, target).item()
            pred = output.data.max(1)[1]
            correct += pred.eq(target.data).cpu().sum()
            
            misclassified_images(pred, writer, target, data, output, epoch)
            
    total_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    
    writer.add_scalar('Test Loss', total_loss, epoch)
    writer.add_scalar('Accuracy', accuracy, epoch)
    writer.flush()
    
    return total_loss, accuracy

In [9]:
inv_normalize = transforms.Normalize(
        mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225],
        std=[1/0.229, 1/0.224, 1/0.255]
    )

In [11]:
def misclassified_images(pred, writer, target, data, output, epoch, count=10):
    misclassified = (pred != target.data)
    for index, image_tensor in enumerate(data[misclassified][:count]):
        img_name = '{}->Predict-{}x{}-Actual'.format(
                epoch,
                LABEL[pred[misclassified].tolist()[index]],
                LABEL[target.data[misclassified].tolist()[index]], 
            )
        
        writer.add_image(img_name, inv_normalize(image_tensor), epoch)

In [12]:
image_transforms = {
    'train':
    transforms.Compose([
        transforms.RandomResizedCrop(size=300, scale=(0.8, 1.1)),
        transforms.RandomRotation(degrees=10),
        transforms.ColorJitter(0.4, 0.4, 0.4),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=256),  # Image net standards
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])  # Imagenet standards
    ]),
    'val':
    transforms.Compose([
        transforms.Resize(size=300),
        transforms.CenterCrop(size=256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test':
    transforms.Compose([
        transforms.Resize(size=300),
        transforms.CenterCrop(size=256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [None]:
datadir = '../input/chest-xray-pneumonia/chest_xray/chest_xray/'
traindir = datadir + 'train/'
validdir = datadir + 'test/'
testdir = datadir + 'val/'

In [None]:
model_path = "model.pth"

In [13]:
batch_size = 128

In [None]:
data = {
    'train':
    datasets.ImageFolder(root=traindir, transform=image_transforms['train']),
    'val':
    datasets.ImageFolder(root=validdir, transform=image_transforms['val']),
    'test':
    datasets.ImageFolder(root=testdir, transform=image_transforms['test'])
}

In [None]:
dataloaders = {
    'train': DataLoader(data['train'], batch_size=batch_size, shuffle=True),
    'val': DataLoader(data['val'], batch_size=batch_size, shuffle=True),
    'test': DataLoader(data['test'], batch_size=batch_size, shuffle=True)
}

In [None]:
LABEL = dict((v,k) for k,v in data['train'].class_to_idx.items())

In [15]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
model = get_model().to(device)

In [17]:
criterion = nn.NLLLoss()

In [None]:
optimizer = optim.Adam(model.parameters())

In [2]:
PATH_to_log_dir = 'logdir/'

In [None]:
def tb_writer():
    timestr = time.strftime("%Y%m%d_%H%M%S")
    writer = SummaryWriter(PATH_to_log_dir + timestr)
    return writer

In [None]:
writer = SummaryWriter(PATH_to_log_dir + timestr)
dataiter = iter(dataloaders['train'])
images, labels = dataiter.next()
grid = utils.make_grid(images[:32])
writer.add_image('Input grid', grid, 0)
writer.close()

In [None]:
print('{0:>20} | {1:>20} | {2:>20} | {3:>20}  |'.format('Epoch','Training Loss','Test Loss', 'Accuracy'))
for epoch in range(0, 10):
    train_loss = train(model, device, dataloaders['train'], criterion, optimizer, epoch, writer)
    test_loss, accuracy = test(model, device, dataloaders['val'], criterion, epoch, writer)
    print('{0:>20} | {1:>20} | {2:>20} | {3:>20}% |'.format(epoch,train_loss,test_loss, accuracy))
    writer.close()