In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import transforms, datasets, models, utils
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import os
import time
from PIL import Image
from sklearn import metrics

In [None]:
# transforms of images
image_transforms = {
    "train": transforms.Compose([
        transforms.RandomResizedCrop(size=300, scale=(0.8, 1.1)),
        transforms.RandomRotation(degrees=10),
        transforms.ColorJitter(0.4, 0.4, 0.4),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ]),
    "val": transforms.Compose([
        transforms.RandomResizedCrop(size=300, scale=(0.8, 1.1)),
        transforms.RandomRotation(degrees=10),
        transforms.ColorJitter(0.4, 0.4, 0.4),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ]),
    "test": transforms.Compose([
        transforms.RandomResizedCrop(size=300, scale=(0.8, 1.1)),
        transforms.RandomRotation(degrees=10),
        transforms.ColorJitter(0.4, 0.4, 0.4),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ]),
}

In [None]:
BATCH_SIZE = 128
LR = 0.01
EPOCH = 100

In [None]:
train_dir = "./Train/"
val_dir = "./Val/"
test_dir = "./Test"

In [None]:
datasets = {
    "train": torchvision.datasets.ImageFolder(train_dir, transform=image_transforms["train"]),
    "val": torchvision.datasets.ImageFolder(val_dir, transform=image_transforms["val"]),
    "test": torchvision.datasets.ImageFolder(test_dir, transform=image_transforms["test"])
}

In [None]:
dataloaders = {
    "train": DataLoader(datasets["train"], batch_size=BATCH_SIZE, shuffle=True),
    "val": DataLoader(datasets["val"], batch_size=BATCH_SIZE, shuffle=True),
    "test": DataLoader(datasets["test"], batch_size=BATCH_SIZE, shuffle=True)
}

In [None]:
dataloaders["train"].dataset.classes

In [None]:
# a self-defined pooling layer
class AdaptiveConcataPool2d(nn.Module):
    def __init__(self, size=None):
        super(AdaptiveConcataPool2d, self).__init__()
        size = size or (1, 1)
        self.avgPooling = nn.AdaptiveAvgPool2d(size)
        self.maxPooling = nn.AdaptiveMaxPool2d(size)

    def forward(self, x):
        return torch.cat([self.maxPooling(x), self.avgPooling(x)], dim=1)

In [None]:
# transfer learning: ResNet50
def get_model():
    model = models.resnet50(pretrained=True)
    for param in model.parameters():
        param.requires_grad = False
    model.avgpool = AdaptiveConcataPool2d()
    model.fc = nn.Sequential(
        nn.Flatten(),
        nn.BatchNorm1d(4096),
        nn.Dropout(0.5),
        nn.Linear(4096, 512),
        nn.ReLU(),
        nn.BatchNorm1d(512),
        nn.Dropout(0.5),
        nn.Linear(512, 2),
        nn.LogSoftmax(dim=1)
    )
    return model

In [None]:
# process of training and validating 
def train_val(model, device, train_loader, val_loader, optimizer, criterion, epoch):
    model.train()
    total_loss = 0.0
    val_loss = 0.0
    val_max_accuracy = 0.0
    val_acc = 0.0
    for batch_id, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()*images.size(0)
    train_loss = total_loss/len(train_loader.dataset)

    model.eval()
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, pred = torch.max(outputs, dim=1)
            correct = pred.eq(labels.view_as(pred))
            accuracy = torch.mean(correct.type(torch.FloatTensor))
            val_acc += accuracy.item()*images.size(0)

        val_loss = val_loss/len(val_loader.dataset)
        val_acc = val_acc/len(val_loader.dataset)

    return train_loss, val_loss, val_acc

In [None]:
# process of testing
def test(model, device, test_loader, criterion, epoch):
    model.eval()
    total_loss = 0.0
    correct = 0.0
    t = torch.tensor([]).to(device)
    l = torch.tensor([]).to(device)
    with torch.no_grad():
        for batch_id, (images, labels) in enumerate(test_loader):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs, dim=1)
            l = torch.cat((l, labels.view_as(predicted)), 0)
            t = torch.cat((t, predicted), 0)    
            
        accuracy = metrics.accuracy_score(l.tolist(), t.tolist())
        precision = np.mean(metrics.precision_score(l.tolist(), t.tolist(), average=None), axis=0)
        recall = np.mean(metrics.recall_score(l.tolist(), t.tolist(), average=None), axis=0)
        f1 = np.mean(metrics.f1_score(l.tolist(), t.tolist(), average=None), axis=0)
    
        avg_loss = total_loss/len(test_loader.dataset)
        t = torch.tensor([]).to(device)
        l = torch.tensor([]).to(device)
        
        return total_loss, accuracy, precision, recall, f1

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device.type)

In [None]:
model = get_model().to(device)
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)

In [None]:
def train_epochs(model, device, dataloaders, criterion, optimizer, epochs):
    print("{0:>15}|{1:>15}|{2:>15}|{3:>15}|{4:>15}".format(
        "Epoch", "Accuracy", "Precision", "Recall", "F1"))
    best_loss = np.inf

    for epoch in range(epochs):
        train_loss, val_loss, val_acc = train_val(
            model, device, dataloaders["train"], dataloaders["val"], optimizer, criterion, epoch)
        test_loss, acc, pre, rec, fon = test(
            model, device, dataloaders["test"], criterion, epoch)
        if test_loss < best_loss:
            best_loss = test_loss
        print("{0:>15}|{1:>15}|{2:>15}|{3:>15}|{4:>15}".format(epoch, acc, pre, rec, fon))

In [None]:
train_epochs(model, device, dataloaders, criterion, optimizer, EPOCH)