# Imports

In [None]:
import random

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict
import seaborn as sns
from sklearn.metrics import confusion_matrix
from torchvision.datasets import ImageFolder
from torch.utils.data import ConcatDataset
from torch.utils.data import DataLoader, Dataset, random_split
from PIL import Image
from google.colab import drive


# Seed the PyTorch (CPU and CUDA) and NumPy generators with one shared value.
seed_value = 44
for _seed_rng in (torch.manual_seed, torch.cuda.manual_seed_all, np.random.seed):
    _seed_rng(seed_value)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Mount Google Drive; the dataset and checkpoint paths in this notebook are under /gdrive.
drive.mount('/gdrive')

# Classifier

In [None]:
class Classifier(nn.Module):
    """Fully convolutional image classifier.

    The network is a stem convolution, four Conv-BatchNorm-ReLU stages that
    each double the channel count, and a final 4x4 convolution that emits one
    channel per class. Every convolution uses stride 2; for a 128x128 input
    the last feature map is 1x1, so ``forward`` returns ``(N, num_classes)``
    logits.

    Args:
        channels_img: Number of channels of the input images.
        features_d: Channel count produced by the stem convolution.
        num_classes: Number of output logits.
        img_size: Stored on the instance as ``img_size``; the layers
            themselves do not read it.
    """

    def __init__(self, channels_img, features_d, num_classes, img_size):
        super().__init__()
        self.img_size = img_size

        # Stem: plain convolution + ReLU (no batch norm on the first layer).
        layers = [
            nn.Conv2d(channels_img, features_d, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
        ]

        # Four down-sampling stages: features_d -> 2x -> 4x -> 8x -> 16x.
        width = features_d
        for _ in range(4):
            layers.append(self._block(width, width * 2, 4, 2, 1))
            width = width * 2

        # Head: un-padded 4x4 convolution producing the class logits.
        layers.append(
            nn.Conv2d(width, num_classes, kernel_size=4, stride=2, padding=0)
        )
        self.net = nn.Sequential(*layers)

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        """Return a bias-free Conv2d followed by BatchNorm2d and ReLU."""
        conv = nn.Conv2d(
            in_channels, out_channels, kernel_size, stride, padding, bias=False
        )
        norm = nn.BatchNorm2d(out_channels, affine=True)
        return nn.Sequential(conv, norm, nn.ReLU())

    def forward(self, x):
        """Run the network and drop the two trailing singleton spatial axes."""
        logits = self.net(x)
        logits = logits.squeeze(-1)
        return logits.squeeze(-1)


In [None]:
def initialize_weights(model):
    """Initialise convolution and batch-norm parameters of ``model`` in place.

    Convolution and transposed-convolution kernels are drawn from N(0, 0.02).
    Batch-norm scales are drawn from N(1, 0.02) and their shifts are set to
    zero: a scale centred on 0 would multiply every normalised activation by
    roughly zero at the start of training. Batch-norm layers created with
    ``affine=False`` have no learnable parameters and are skipped.

    Args:
        model: Any ``nn.Module``; all of its sub-modules are visited.
    """
    for m in model.modules():
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
            nn.init.normal_(m.weight, 0.0, 0.02)
        elif isinstance(m, nn.BatchNorm2d) and m.weight is not None:
            nn.init.normal_(m.weight, 1.0, 0.02)
            nn.init.zeros_(m.bias)

# Hyperparameters

In [None]:
# Optimisation settings.
NUM_EPOCHS = 15          # passes over the training loader
BATCH_SIZE = 64          # samples per mini-batch
LEARNING_RATE = 0.0001   # learning rate handed to Adam

# Data / model geometry.
IMAGE_SIZE = 128         # images are resized to IMAGE_SIZE x IMAGE_SIZE
CHANNELS_IMG = 1         # single-channel (grayscale) input
NUM_CLASSES = 2          # number of classifier outputs

# Training on real data

In [None]:
# Baseline experiment: train the classifier on the real images only.
model = Classifier(CHANNELS_IMG, 128, NUM_CLASSES, IMAGE_SIZE).to(device)
initialize_weights(model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Preprocessing: grayscale -> fixed size -> tensor in [0, 1] -> normalised with
# mean 0.5 / std 0.5 per channel.
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.5 for _ in range(CHANNELS_IMG)], [0.5 for _ in range(CHANNELS_IMG)]
        ),
    ]
)

# ImageFolder derives the integer class labels from the sub-directory names.
train_data_dir = '/gdrive/MyDrive/chest_xray/train'
test_data_dir = '/gdrive/MyDrive/chest_xray/test'
train_data = datasets.ImageFolder(train_data_dir, transform=transform)
test_data = datasets.ImageFolder(test_data_dir, transform=transform)
trainloader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
testloader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)

# Mean training loss of every epoch, kept for the loss plot.
losses = []
# Make sure BatchNorm runs in training mode even if this cell is re-run after
# an evaluation cell.
model.train()
for epoch in range(NUM_EPOCHS):
    running_loss = 0.0
    for imgs, labels in trainloader:
        imgs = imgs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Epoch summary. The original hung this off a `for ... else`, which always
    # runs because the loop never breaks; plain statements say the same thing.
    avg_loss = running_loss / len(trainloader)
    print(f"Epoch [{epoch + 1}/{NUM_EPOCHS}] - Loss: {avg_loss:.4f}")
    losses.append(avg_loss)

# Results

In [None]:
# Plot the per-epoch mean training loss and export the figure as EPS.
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
loss_ax.plot(losses)
loss_ax.set_xlabel("Numer epoki")
loss_ax.set_ylabel("Wartość funkcji straty")
loss_fig.savefig('classifier_loss.eps', format='eps', dpi=1200)
plt.show()

In [None]:
# Evaluate the classifier trained on real data against the real test set.
model.eval()
correct = 0
total = 0

true_labels = []
predicted_labels = []

with torch.no_grad():
    for imgs, labels in testloader:
        imgs = imgs.to(device)
        labels = labels.to(device)

        outputs = model(imgs)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        true_labels.extend(labels.cpu().tolist())
        predicted_labels.extend(predicted.cpu().tolist())

accuracy = 100 * correct / total
print(f"Accuracy of the classifier on test data: {accuracy:.2f}%")

# Rows are true labels, columns are predictions. `labels=[0, 1]` pins the
# matrix to 2x2 even if one class never occurs, so the indexing below is safe.
conf_mat = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

# Class 1 is treated as the positive class.
TP = conf_mat[1, 1]
TN = conf_mat[0, 0]
FP = conf_mat[0, 1]
FN = conf_mat[1, 0]

precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1 = 2 * (precision * recall) / (precision + recall)
specificity = TN / (TN + FP)

print(f"Precision: {precision:.4f}")
print(f"Recall (Sensitivity): {recall:.4f}")
print(f"Specificity: {specificity:.4f}")
print(f"F1-Score: {f1:.4f}")
print("Confusion Matrix:")
# Fixed: the original printed `conf_matrix`, which is not defined at this point.
print(conf_mat)

plt.figure(figsize=(10,7))
sns.heatmap(conf_mat, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Przewidziana etykieta')
plt.ylabel('Prawdziwa etykieta')
plt.savefig('classifier_conf_matrix.eps', format='eps', dpi=1200)
plt.show()

# Generator code

In [None]:
class Generator(nn.Module):
    """Class-conditional generator built from transposed convolutions.

    A label embedding is concatenated to the noise vector along the channel
    axis. Five ConvTranspose-BatchNorm-ReLU stages and a final transposed
    convolution with Tanh then turn the ``(N, C, 1, 1)`` input into an image
    with values in [-1, 1].

    Args:
        channels_noise: Channel count of the noise input.
        channels_img: Channel count of the generated image.
        features_g: Base width; the stages use 32x, 16x, 8x, 4x and 2x this.
        num_classes: Number of rows in the label embedding table.
        img_size: Stored on the instance as ``img_size``; the layers
            themselves do not read it.
        embed_size: Length of each label embedding vector.
    """

    def __init__(self, channels_noise, channels_img, features_g, num_classes,
                 img_size, embed_size):
        super().__init__()
        self.img_size = img_size

        # First stage expands the 1x1 input to 4x4 (stride 1, no padding).
        stages = [
            self._block(channels_noise + embed_size, features_g * 32, 4, 1, 0)
        ]
        # Remaining stages double the resolution and halve the width each time.
        multipliers = (32, 16, 8, 4, 2)
        for src, dst in zip(multipliers[:-1], multipliers[1:]):
            stages.append(self._block(features_g * src, features_g * dst, 4, 2, 1))

        # Output layer: one more 2x up-sampling, squashed to [-1, 1] by Tanh.
        stages.append(
            nn.ConvTranspose2d(
                features_g * 2, channels_img, kernel_size=4, stride=2, padding=1
            )
        )
        stages.append(nn.Tanh())

        self.net = nn.Sequential(*stages)
        self.embed = nn.Embedding(num_classes, embed_size)

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        """Return a bias-free ConvTranspose2d followed by BatchNorm2d and ReLU."""
        upconv = nn.ConvTranspose2d(
            in_channels, out_channels, kernel_size, stride, padding, bias=False
        )
        return nn.Sequential(upconv, nn.BatchNorm2d(out_channels), nn.ReLU())

    def forward(self, x, labels):
        """Generate images from noise ``x`` conditioned on integer ``labels``."""
        # (N, embed_size) -> (N, embed_size, 1, 1) so it can join the noise channels.
        label_map = self.embed(labels)[:, :, None, None]
        conditioned = torch.cat([x, label_map], dim=1)
        return self.net(conditioned)

In [None]:
# Generator construction settings; they are passed to Generator(...) when the
# checkpoint is loaded.
NOISE_DIM = 100        # channels of the noise input
GEN_EMBEDDING = 100    # length of the label embedding vector
FEATURES_GEN = 128     # base width of the generator

# Image geometry and class count (same values as used for the classifier).
IMAGE_SIZE = 128
CHANNELS_IMG = 1
NUM_CLASSES = 2

In [None]:
# Rebuild the generator and restore its trained weights.
loaded_gen = Generator(NOISE_DIM, CHANNELS_IMG, FEATURES_GEN, NUM_CLASSES, IMAGE_SIZE, GEN_EMBEDDING).to(device)
# map_location remaps the stored tensors onto the active device, so a
# checkpoint saved on a GPU can still be loaded on a CPU-only runtime.
generator_state = torch.load(
    '/gdrive/MyDrive/Magisterka/final_gan_model/generator_model.pth',
    map_location=device,
)
loaded_gen.load_state_dict(generator_state)
# Inference mode: BatchNorm uses its running statistics.
loaded_gen.eval()

# Checking classifier trained on real data results with generated data

In [None]:
def generate_images(generator, num_images, class_label):
    """Sample ``num_images`` images of one class from ``generator``.

    Args:
        generator: Callable taking ``(noise, labels)`` and returning a tensor.
        num_images: Number of images to sample in a single batch.
        class_label: Integer label every sample is conditioned on.

    Returns:
        The generator output as a NumPy array on the CPU.
    """
    # Noise has shape (num_images, NOISE_DIM, 1, 1); it is drawn on the CPU
    # and then moved to the active device.
    latent = torch.randn(num_images, NOISE_DIM, 1, 1).to(device)
    targets = torch.full((num_images,), class_label, dtype=torch.long).to(device)

    # No gradients are needed for sampling.
    with torch.no_grad():
        batch = generator(latent, targets)
    return batch.cpu().numpy()

# Sample the same number of synthetic images for each label; this notebook
# treats label 0 as "healthy" and label 1 as "pneumonia".
_images_per_class = 1000
healthy_images = generate_images(loaded_gen, _images_per_class, class_label=0)
pneumonia_images = generate_images(loaded_gen, _images_per_class, class_label=1)

In [None]:
# Check how the classifier trained on real data labels the generated images.
healthy_images_tensor = torch.tensor(healthy_images).to(device)
pneumonia_images_tensor = torch.tensor(pneumonia_images).to(device)

# Inference only: put BatchNorm in evaluation mode and skip autograd, so no
# graph is kept for the two large batches.
model.eval()
with torch.no_grad():
    healthy_predictions = torch.argmax(model(healthy_images_tensor), dim=1)
    pneumonia_predictions = torch.argmax(model(pneumonia_images_tensor), dim=1)

correct_healthy = (healthy_predictions == 0).sum().item()
correct_pneumonia = (pneumonia_predictions == 1).sum().item()

print(f"Poprawnie sklasyfikowane zdrowe obrazy: {correct_healthy}/{len(healthy_images)}")
print(f"Poprawnie sklasyfikowane obrazy z zapaleniem płuc: {correct_pneumonia}/{len(pneumonia_images)}")

# The label each image was generated for serves as its ground truth.
true_labels_healthy = [0] * len(healthy_images)
true_labels_pneumonia = [1] * len(pneumonia_images)

true_labels = true_labels_healthy + true_labels_pneumonia
predicted_labels = healthy_predictions.cpu().tolist() + pneumonia_predictions.cpu().tolist()

# `labels=[0, 1]` keeps the matrix 2x2 even if one class is never predicted.
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

# Fixed: these were read from `conf_mat`, the matrix of the previous
# experiment, so the metrics below described the wrong data.
TP = conf_matrix[1, 1]
TN = conf_matrix[0, 0]
FP = conf_matrix[0, 1]
FN = conf_matrix[1, 0]

precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1 = 2 * (precision * recall) / (precision + recall)
specificity = TN / (TN + FP)

print(f"Precision: {precision:.4f}")
print(f"Recall (Sensitivity): {recall:.4f}")
print(f"Specificity: {specificity:.4f}")
print(f"F1-Score: {f1:.4f}")
print("Confusion Matrix:")
print(conf_matrix)

plt.figure(figsize=(10,7))
sns.heatmap(conf_matrix, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Przewidziana etykieta')
plt.ylabel('Prawdziwa etykieta')
plt.savefig('classifier_conf_matrix_for_test_generated.eps', format='eps', dpi=1200)
plt.show()

# Training the classifier on generated + real data

In [None]:
# Second experiment: a fresh classifier trained on real + generated images.
model = Classifier(CHANNELS_IMG, 128, NUM_CLASSES, IMAGE_SIZE).to(device)
# Use the same initialisation as the real-data-only classifier so that the two
# experiments differ only in their training data.
initialize_weights(model)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Preprocessing: grayscale -> fixed size -> tensor in [0, 1] -> normalised with
# mean 0.5 / std 0.5 per channel.
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.5 for _ in range(CHANNELS_IMG)], [0.5 for _ in range(CHANNELS_IMG)]
        ),
    ]
)

train_data_dir = '/gdrive/MyDrive/chest_xray/train'
test_data_dir = '/gdrive/MyDrive/chest_xray/test'
train_data = datasets.ImageFolder(train_data_dir, transform=transform)
test_data = datasets.ImageFolder(test_data_dir, transform=transform)
testloader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)

# Drop the leading channel axis: (1, H, W) -> (H, W).
healthy_images = [img.squeeze(0) for img in healthy_images]
pneumonia_images = [img.squeeze(0) for img in pneumonia_images]


def _to_uint8_image(img):
    """Map a generator output in [-1, 1] to an 8-bit grayscale array in [0, 255]."""
    return np.clip((img + 1.0) * 127.5, 0, 255).round().astype(np.uint8)


# The generator ends in Tanh, so its pixels lie in [-1, 1]. ToPILImage does not
# undo that normalisation, so convert to 8-bit intensities explicitly before
# building the PIL images; `transform` then maps them back to [-1, 1].
_to_pil = transforms.ToPILImage()
healthy_pil_images = [_to_pil(_to_uint8_image(img)) for img in healthy_images]
pneumonia_pil_images = [_to_pil(_to_uint8_image(img)) for img in pneumonia_images]


class GeneratedDataset(Dataset):
    """Dataset over parallel in-memory sequences of images and labels.

    Args:
        images: Indexable collection of images.
        labels: Indexable collection of labels, aligned with ``images``.
        transform: Optional callable applied to an image when it is fetched.
    """

    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of stored images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` at ``idx``, transformed if a transform is set."""
        sample, target = self.images[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

# Wrap the generated images as datasets labelled 0 (healthy) and 1 (pneumonia).
generated_healthy_dataset = GeneratedDataset(healthy_pil_images, [0]*len(healthy_pil_images), transform)
generated_pneumonia_dataset = GeneratedDataset(pneumonia_pil_images, [1]*len(pneumonia_pil_images), transform)

# Real and generated samples are concatenated and shuffled together.
combined_train_dataset = ConcatDataset([train_data, generated_healthy_dataset, generated_pneumonia_dataset])
combined_trainloader = DataLoader(combined_train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)

# Mean training loss of every epoch, kept for the loss plot.
losses = []
# Make sure BatchNorm runs in training mode even if this cell is re-run after
# an evaluation cell.
model.train()
for epoch in range(NUM_EPOCHS):
    running_loss = 0.0
    for imgs, labels in combined_trainloader:
        imgs = imgs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Epoch summary. The original hung this off a `for ... else`, which always
    # runs because the loop never breaks; plain statements say the same thing.
    avg_loss = running_loss / len(combined_trainloader)
    print(f"Epoch [{epoch + 1}/{NUM_EPOCHS}] - Loss: {avg_loss:.4f}")
    losses.append(avg_loss)

# Results

In [None]:
# Plot the per-epoch mean training loss of the second run and export it as EPS.
loss_fig, loss_ax = plt.subplots(figsize=(10, 5))
loss_ax.plot(losses)
loss_ax.set_xlabel("Numer epoki")
loss_ax.set_ylabel("Wartość funkcji straty")
loss_fig.savefig('classifier_loss_2.eps', format='eps', dpi=1200)
plt.show()

In [None]:
# Evaluate the classifier trained on real + generated data on the real test set.
model.eval()
correct = 0
total = 0

true_labels = []
predicted_labels = []

with torch.no_grad():
    for imgs, labels in testloader:
        imgs = imgs.to(device)
        labels = labels.to(device)

        outputs = model(imgs)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        true_labels.extend(labels.cpu().tolist())
        predicted_labels.extend(predicted.cpu().tolist())

accuracy = 100 * correct / total
print(f"Accuracy of the classifier on test data: {accuracy:.2f}%")

# Rows are true labels, columns are predictions. `labels=[0, 1]` pins the
# matrix to 2x2 even if one class never occurs, so the indexing below is safe.
conf_mat = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

# Class 1 is treated as the positive class.
TP = conf_mat[1, 1]
TN = conf_mat[0, 0]
FP = conf_mat[0, 1]
FN = conf_mat[1, 0]

precision = TP / (TP + FP)
recall = TP / (TP + FN)
f1 = 2 * (precision * recall) / (precision + recall)
specificity = TN / (TN + FP)

print(f"Precision: {precision:.4f}")
print(f"Recall (Sensitivity): {recall:.4f}")
print(f"Specificity: {specificity:.4f}")
print(f"F1-Score: {f1:.4f}")
print("Confusion Matrix:")
# Fixed: the original printed `conf_matrix`, the stale matrix from the
# generated-image evaluation, not the one computed in this cell.
print(conf_mat)

plt.figure(figsize=(10,7))
sns.heatmap(conf_mat, annot=True, cmap='Blues', fmt='g')
plt.xlabel('Przewidziana etykieta')
plt.ylabel('Prawdziwa etykieta')
plt.savefig('classifier_conf_matrix_learned_on_real_and_generated.eps', format='eps', dpi=1200)
plt.show()