In [12]:
import torch
import torch.nn as nn 
from torchvision import  datasets, models, transforms 
from torch.utils.data import DataLoader, WeightedRandomSampler
import numpy as np 
import matplotlib.pyplot as plt 
from PIL import Image
import os 
from collections import Counter
import torch.optim as optim
from torchvision.models import VGG16_Weights
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, ConfusionMatrixDisplay, classification_report

In [13]:
transform = transforms.Compose([
    
    transforms.Grayscale(num_output_channels=3),
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(p=0.5),  
    transforms.RandomRotation(10), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) #* these value matches for the grayscale images
])

In [14]:
data_dir = './Datasets/LungXRays-grayscale'
train_dataset = datasets.ImageFolder(os.path.join(data_dir, 'train'), transform=transform)
val_dataset = datasets.ImageFolder(os.path.join(data_dir, 'val'), transform=transform)
test_dataset = datasets.ImageFolder(os.path.join(data_dir, 'test'), transform=transform)
print(f'training data: {len(train_dataset)}')
print(f'validation data: {len(val_dataset)}')
print(f'test data: {len(test_dataset)}')


training data: 4861
validation data: 1620
test data: 1625


In [15]:
#TODO  check for the distribuation classes
def get_class_distribution(dataset): 
    counter_pr_class = Counter([dataset.classes[label] for _, label in dataset.imgs])
    return counter_pr_class   
train_distribution = get_class_distribution(train_dataset)
print(train_distribution)


Counter({'COVID19': 1218, 'Pneumonia': 1218, 'Tuberculosis': 1218, 'Normal': 1207})


In [16]:
##* datasets loader

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True); 
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

dataiter = iter(train_loader)
imgs, labels = next(dataiter)
print(imgs.shape)
print(torch.min(imgs), torch.max(imgs))
print(f'images shapes: {imgs.shape}')

torch.Size([32, 3, 128, 128])
tensor(-1.) tensor(1.)
images shapes: torch.Size([32, 3, 128, 128])


In [17]:
#* pretrained model for vgg16 
num_classes = len(train_dataset.classes)
vgg16_model = models.vgg16(weights=VGG16_Weights.IMAGENET1K_V1)
vgg16_model.classifier[6] = nn.Linear(vgg16_model.classifier[6].in_features, num_classes)

In [18]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model = vgg16_model.to(device) 
criterion = nn.CrossEntropyLoss();  
optimizer = optim.Adam(model.parameters(), lr=0.0001, )
#optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)
num_epoch = 20

In [19]:
#* we need to train the module 

def train_model(model, train_loader, criterion, optimizer, device):
    

        #* we sting the model into train mode
    model.train(); 
    
    learning_loss = 0.0; 
    correct = 0; 
    total = 0; 
    
    for inputs, labels in train_loader: 
        
        inputs, labels = inputs.to(device), labels.to(device)
        #* starting with the gradient from  the intial point 0 
        optimizer.zero_grad()
        outputs = model(inputs); 
        loss = criterion(outputs, labels); 
        loss.backward();
        optimizer.step(); 
        learning_loss += loss.item(); 
        _, predict = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predict == labels).sum().item(); 
    
    learning_loss_result = learning_loss / total if total > 0 else 0
    train_acc = 100 * correct /total if total > 0 else 0
    return learning_loss_result, train_acc
                

In [20]:
### evaluation the model

def validation(model, val_loader, criterion, device ): 
    model.eval(); 
    running_loss = 0.0; 
    total = 0; 
    correct = 0; 
    
    with torch.no_grad(): 
        for inputs, labels in val_loader: 
            inputs, labels = inputs.to(device), labels.to(device); 
            outputs = model(inputs); 
            loss = criterion(outputs, labels); 
            running_loss += loss.item(); 
            _,predict = torch.max(outputs, 1); 
            
            total  += labels.size(0); 
            correct  += (predict == labels).sum().item()
            
    result_running_loss = running_loss / total if total > 0 else 0 
    result_val_acc = 100* correct / total if total > 0 else 0; 
    return  result_running_loss, result_val_acc; 

In [21]:
#* class for the early stopping 
class EarlyStopping:
    def __init__(self, patience =5, delta = 0, path='best.pt'):
        self.patience = patience
        self.counter = 0
        self.best_loss = float('inf')
        self.early_stop = False
        self.delta = delta
        self.path = path
    
    def __call__(self, val_loss, model):
        if val_loss < self.best_loss - self.delta:
            
            #* here we keep track for if val decreases and save the best model
            print(f'Validation loss started decreasing from: {self.best_loss:.4f} to {val_loss:.4f} saves best model..')
            self.best_loss = val_loss 
            self.Save_best_model(model)
            self.counter = 0
        else: 
            self.counter += 1 
            print(f'Early stopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
    
    def Save_best_model(self, model): 
        torch.save(model.state_dict(), self.path)
                     

In [22]:
earlyStopping = EarlyStopping(patience=5,  path='best_vgg16.pth')

train_losses, train_accs, val_losses, val_accs =[], [], [], []
for epoch in range(num_epoch):
    
    train_loss, train_acc = train_model(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validation(model, val_loader, criterion, device)
    train_losses.append(train_loss); 
    train_accs.append(train_acc); 
    val_losses.append(val_loss)
    val_accs.append(val_acc); 

    print(f'Epoch {epoch+1}/{num_epoch}: Train Loss: {train_loss:.4f} Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
    
    earlyStopping(val_loss, model)
    if earlyStopping.early_stop: 
        print('Note early stopping is triggered no need more training')
        break

RuntimeError: Adaptive pool MPS: output sizes must be divisible by input sizes. Non-divisible input sizes are not implemented on MPS device yet. For now, you can manually transfer tensor to cpu in this case. Please refer to [this issue](https://github.com/pytorch/pytorch/issues/96056)

In [None]:
vgg16_model.load_state_dict(torch.load('best_vgg16.pth', map_location=device, weights_only=True))
model = vgg16_model.to(device)