In [1]:
import os
from torchvision import transforms
from Dataset import Dataset
from data_loading import open_preprocess_photos, open_preprocess_photos_flip
from typing import List

# Model

In [2]:
from torchvision.models import resnet34, ResNet34_Weights
import torch

pretrained = resnet34(weights=ResNet34_Weights.IMAGENET1K_V1)

pretrained.fc = torch.nn.Identity()

class Model(torch.nn.Module):
    def __init__(self, pretrained):
        super(Model, self).__init__()
        self.pretrained = pretrained
        self.linear1 = torch.nn.Linear(512, 256)
        self.linear2 = torch.nn.Linear(256, 1)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        x = self.pretrained(x)
        x = self.linear1(x)
        x = torch.nn.functional.relu(x)
        x = self.linear2(x)
        x = self.sigmoid(x)
        return x

model = Model(pretrained)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Data

In [3]:
normal_dir: str = r'../chest_xray/train/NORMAL'
pneumo_dir: str = r'../chest_xray/train/PNEUMONIA'

assert os.path.exists(normal_dir) and os.path.isdir(normal_dir), "Normal dir isn't found or isn't a directory"
assert os.path.exists(pneumo_dir) and os.path.isdir(pneumo_dir), "Pneumonia dir isn't found or isn't a directory"

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
transform = transforms.Compose([
    transforms.ToTensor(),
    normalize,
])

normal = open_preprocess_photos_flip(normal_dir, transform, (224, 224))
pneumonia = open_preprocess_photos(pneumo_dir, transform, (224, 224))

dataset_train = Dataset(normal, pneumonia[:len(normal)], 0, 1, 64)

In [4]:
normal_dir_test: str = r'../chest_xray/test/NORMAL'
pneumo_dir_test: str = r'../chest_xray/test/PNEUMONIA'

assert os.path.exists(normal_dir_test) and os.path.isdir(normal_dir_test), "Normal dir isn't found or isn't a directory"
assert os.path.exists(pneumo_dir_test) and os.path.isdir(pneumo_dir_test), "Pneumonia dir isn't found or isn't a directory"

normal_test = open_preprocess_photos_flip(normal_dir_test, transform, (224, 224))
pneumonia_test = open_preprocess_photos(pneumo_dir_test, transform, (224, 224))

dataset_test = Dataset(normal_test, pneumonia_test[:len(normal)], 0, 1, 64)

In [6]:
def calc_acc(y_true: torch.Tensor, y_pred: torch.Tensor, threshold: float) -> float:
  assert y_true.shape == y_pred.shape, "Accuracy calculation received two different sized tensors"
  y_pred_mean = []

  for pred in y_pred:
    if pred < threshold:
      y_pred_mean.append(0)
    else:
      y_pred_mean.append(1)

  nb_correct = 0

  for i in range(len(y_pred_mean)):
    if y_pred_mean[i] == y_true[i]:
      nb_correct += 1

  return nb_correct / len(y_pred_mean)

In [7]:
def evaluate(model: Model, dataset: Dataset) -> tuple:
    model.eval()
    correct = 0
    total = 0
    loss = 0

    with torch.no_grad():
        for inputs, labels in dataset:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            predicted = torch.round(outputs)

            total += labels.size(0)
            correct += (predicted == labels.unsqueeze(-1)).sum().item()

            loss += criterion(outputs, labels.unsqueeze(-1)).item()

    accuracy = correct / total
    average_loss = loss / total
    model.train()
    
    return accuracy, average_loss

In [16]:
def train(model, optim, criterion, epochs, dataset, verbose: bool = True) -> List[list]:
    # initialize indicator lists for later use
    losses = []
    mean_accs = []
    len_dataset = len(dataset)
    
    for epoch in range(epochs):
        # init indicators
        epoch_loss = 0
        sum_acc = 0
        
        for inputs, labels in dataset:
            # load data and move to GPU
            inputs = inputs.clone().to(device)
            labels = labels.clone().to(device).unsqueeze(-1)
            
            optim.zero_grad()
            outputs = model(inputs)
            
            loss = criterion(outputs, labels)
            loss.backward()
            optim.step()
            
            # calculate indicators
            epoch_loss += loss.item()
            sum_acc += calc_acc(labels, outputs, 0.5)
            
            del inputs, labels
        
        # shuffle dataset
        dataset.shuffle(epoch)
        
        # append indicators to indicator lists
        epoch_mean_acc = sum_acc / len_dataset
        losses.append(epoch_loss)
        mean_accs.append(epoch_mean_acc)
        
        #test_acc, test_loss = evaluate(model, test)
        test_acc, test_loss = 0, 0
        
        # print data
        if verbose:
            print(f"Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_mean_acc:.4f}, Test Accuracy: {test_acc:.4f}, Test Loss: {test_loss:.4f}")
    return [losses, mean_accs]

In [17]:
lr=1e-3
momentum=0.9
batch_size=256
epoch=5

dataset_train.change_batch_size(batch_size, 0)
optimizer = torch.optim.SGD(model.parameters(), lr, momentum)
criterion = torch.nn.BCELoss()

In [18]:
losses, accs = train(model, optimizer, criterion, epoch, dataset_train)

Epoch [1/5], Loss: 0.8548, Accuracy: 0.9849, Test Accuracy: 0.0000, Test Loss: 0.0000
Epoch [2/5], Loss: 0.3255, Accuracy: 0.9948, Test Accuracy: 0.0000, Test Loss: 0.0000
Epoch [3/5], Loss: 0.1262, Accuracy: 0.9991, Test Accuracy: 0.0000, Test Loss: 0.0000
Epoch [4/5], Loss: 0.1036, Accuracy: 0.9993, Test Accuracy: 0.0000, Test Loss: 0.0000
Epoch [5/5], Loss: 0.0446, Accuracy: 0.9998, Test Accuracy: 0.0000, Test Loss: 0.0000
