In [40]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import os
import numpy as np
from PIL import Image
from torch.utils.data import Subset, random_split, ConcatDataset

In [34]:
# Define the CNN architecture
class CNN(nn.Module):
    def __init__(self, device, image_sizes= 299, kernel_size=3, max_pool_size=2, lr=0.001, epochs=3):
        """
        CNN contructor
        inputs:
        device - device in which the cnn will be executed
        kernel_size (hiperparameter) - this is the size that will be used in the convolution layers
            type: int, a single number will work with a grid of size n x n
        max_pool_size - this is the size of the pooling layer
        lr (hiperparameter) - the learning rate that will be used to train the cnn
        epochs (hiperparmeter) - the amount of iterations that will be used to train the cnn
        """
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=kernel_size, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=max_pool_size, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=kernel_size, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=max_pool_size, stride=2)
        self.fc1 = nn.Linear(32 * (image_sizes//(max_pool_size**2))**2, 128)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(128, 4)

        self.lr = lr
        self.epochs = epochs
        self.device = device


    def forward(self, x):
        """
        The forward function is used in the training to understand the infrastructure
        of the proposed model
        """
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        return x
    
    def train_cnn(self, train_dataset, train_labels):
        """
        Method to train the cnn based on the inputs:
        train_dataset - tensor with the information of the pixels of the images
        train_labels - tesnro with the category of each of the images frrom the train_dataset
        """
        # Set the loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.parameters(), self.lr)

        # Create a data loader for the training dataset
        train_loader = DataLoader(dataset=list(zip(train_dataset, train_labels)), batch_size=16, shuffle=True)

        self.train()
        for epoch in range(self.epochs):
            running_loss = 0.0
            for images, labels in train_loader:
                images = images.to(self.device)
                labels = labels.to(self.device)

                optimizer.zero_grad()

                outputs = self(images)
                loss = criterion(outputs, labels)

                loss.backward()
                optimizer.step()

            running_loss += loss.item() * images.size(0)

            epoch_loss = running_loss / len(train_dataset)
            print(f"Epoch [{epoch+1}/{self.epochs}], Loss: {epoch_loss:.4f}")

    def predict(self, test_dataset):
        predictions = []
        self.eval()  # Set the model to evaluation mode
        # Create a data loader for the training dataset
        dataloader = DataLoader(test_dataset, batch_size=16, shuffle=True)

        with torch.no_grad():  # Disable gradient computation for efficiency
            for inputs in dataloader:
                # Forward pass through the model to obtain predictions
                outputs = self(inputs)
                # Get the predicted class indices
                _, predicted = torch.max(outputs, 1)
                # Append the predicted class indices to the predictions list
                predictions.extend(predicted.tolist())
        
        return predictions


In [135]:
# load images from additional dataset
# https://www.kaggle.com/datasets/gibi13/pneumonia-covid19-image-dataset

# Define the input folder with the original images
input_folder = "images/additional_dataset/"

# Define the output folder to save the resized images
output_folder = "images/analysis_dataset/"

# Define the target size for the resized images
target_size = (299, 299)



categories = ['COVID', 'Viral_Pneumonia']

for category in categories:
    # Get the list of files in the input folder
    file_list = os.listdir(os.path.join(input_folder, category))
    # Iterate over each file in the input folder
    for file_name in file_list:
        # Construct the full path of the input image
        input_path = os.path.join(input_folder, category, file_name)
        
        # Open the image using PIL
        image = Image.open(input_path)
        
        # Resize the image to the target size
        resized_image = image.resize(target_size)
        
        # Construct the full path of the output image
        output_path = os.path.join(output_folder, category, file_name)
        
        # Save the resized image to the output folder
        resized_image.save(output_path)
        
        # Close the image
        image.close()

In [23]:
# delete all files in images/cropped_dataset

cropped_images_folder = 'images/cropped_dataset/'
categories = {'COVID': 0, 'Lung_Opacity': 1, 'Normal': 2, 'Viral_Pneumonia': 3}

for category in categories.keys():
    folder_path = os.path.join(cropped_images_folder, category)

    file_list = os.listdir(folder_path)

    for file_name in file_list:
        file_path = os.path.join(cropped_images_folder, category, file_name)
        os.remove(file_path)

In [24]:
# Feature engineering - crop images borders only for some images, ignore the rest

original_images_folder = 'images/analysis_dataset/'
cropped_images_folder = 'images/cropped_dataset/'

target_size = (250, 250)

categories = {'COVID': 0, 'Lung_Opacity': 1, 'Normal': 2, 'Viral_Pneumonia': 3}

category_amount = []

for category in categories.keys():
    folder_path = os.path.join(original_images_folder, category)
    image_files = os.listdir(folder_path)
    category_amount.append(len(image_files))

print("Amount of images in analysis dataset:")
print(f"\tCOVID\t\t\t : {category_amount[0]}\n \
        Lung_Opacity\t\t : {category_amount[1]}\n \
        Normal\t\t\t : {category_amount[2]}\n \
        Viral_Pneumonia\t : {category_amount[3]}\n")

max_training = min(category_amount)
print("Maximum amount of images to use in the training: ", min(category_amount))

for category in categories.keys():

    cat_files = os.listdir(os.path.join(original_images_folder, category))

    for i, file in enumerate(cat_files):

        if i == max_training: break

        # constructing image path
        input_path = os.path.join(original_images_folder, category, file)
        # Open the image
        image = Image.open(input_path)

        if image.size[0] < 299 or image.size[1] < 299:
            continue
        
        # get original image size and calculate borders
        width, height = image.size

        left = (width - target_size[0]) // 2
        upper = (height - target_size[1]) // 2
        right = left + target_size[0]
        lower = upper + target_size[1]

        # crop the image
        cropped_image = image.crop((left, upper, right, lower))
        # construct the new path
        output_path = os.path.join(cropped_images_folder, category, file)
        cropped_image.save(output_path)
        image.close()

Amount of images in analysis dataset:
	COVID			 : 4596
         Lung_Opacity		 : 6012
         Normal			 : 10192
         Viral_Pneumonia	 : 2857

Maximum amount of images to use in the training:  2857


In [55]:
parent_folder_path = 'images/cropped_dataset/'

categories = {'COVID': 0, 'Lung_Opacity': 1, 'Normal': 2, 'Viral_Pneumonia': 3}

arrays = []
category_amount = []

for category in categories.keys():
    folder_path = os.path.join(parent_folder_path, category)
    image_files = os.listdir(folder_path)
    category_amount.append(len(image_files))

max_training = min(category_amount)

for cat_folder, value in categories.items():

    folder_path = os.path.join(parent_folder_path, cat_folder)
    image_files = os.listdir(folder_path)


    for i, file_name in enumerate(image_files):

        if i >= max_training: break


        file_path = os.path.join(folder_path, file_name)
        # open image using PIL
        image = Image.open(file_path)
        # image to numpy array
        image_array = np.array(image)

        if image.size != (250, 250):
            print(file_path, " IS NOT 250X250, it is: ", image.size)
            continue

        if image_array.shape != (250, 250):
            image_array = np.dot(image_array[..., :3], [0.2989, 0.5870, 0.1140])

        arrays.append(image_array)

arrays = arrays/ np.max(arrays)
image_data = np.stack(arrays, axis=0)
image_data = image_data.reshape(len(image_data), 1, 250, 250)
image_data = torch.from_numpy(image_data).to(torch.float32)

print(image_data.shape)

# Stratify - get a random amount of values from
train_perc = 0.8

train_size = int(train_perc * max_training)
test_size = max_training - train_size

train_dataset = []
test_dataset = []
train_labels = []
test_labels = []
for category, value in categories.items():
    train_cat_dataset, test_cat_dataset = random_split(image_data[value*max_training:(value+1)*max_training], [train_size, test_size])
    train_dataset.append(train_cat_dataset)
    test_dataset.append(test_cat_dataset)
    train_cat_labels = [value] * train_size
    train_labels += train_cat_labels
    test_cat_labels = [value] * test_size
    test_labels += test_cat_labels

train_dataset = ConcatDataset(train_dataset)
test_dataset = ConcatDataset(test_dataset)
train_labels = torch.from_numpy(np.array(train_labels)).to(torch.long)
test_labels = torch.from_numpy(np.array(test_labels))
    




torch.Size([11428, 1, 250, 250])


In [56]:
# Create an instance of the CNN model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = CNN(epochs=4, device=device, image_sizes=250).to(device)

model.train_cnn(train_cat_dataset, train_labels)


Epoch [1/4], Loss: 0.0000
Epoch [2/4], Loss: 0.0000
Epoch [3/4], Loss: 0.0000
Epoch [4/4], Loss: 0.0000


In [57]:
predictions = model.predict(test_dataset)
print("Model Preductions: \n", predictions)
print("Real categories: \n", test_labels)

correct = 0
incorrect = 0
for i in range(len(predictions)):
    if predictions[i] == test_labels[i]:
        correct += 1
    else:
        incorrect += 1

print("Correct: ", correct, " Incorrect: ", incorrect)

Model Preductions: 
 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 