In [None]:
cd msg_gan

In [None]:
import os
import math
import torch
import pickle
import torch.nn as nn
from sklearn.model_selection import KFold, train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
from torchvision import transforms
import torchvision.utils as vutils
from MSG_GAN.GAN import *
from MSG_GAN.CustomLayers import *
from MSG_GAN.utils import *
import numpy as np
import matplotlib.pyplot as plt
from torch.nn.functional import avg_pool2d 

In [None]:
depth = 6

latent_size = 512

batch_size = 32

n_classes = 4

epochs = 20

lr = 3e-4

gpu_parallelize = True
num_workers = 2

image_size = 128

images_dir = 'msg_gan/data'
model_dir = '/output files'
samples_dir = '/output files'
#ACMSGGAN Discriminator checkpoint file
model_path = '/models/GAN_DIS_1450.pth'

#model_path = '/content/drive/MyDrive/Colab Notebooks/Brain Tumor Classification/GAN_DIS_650.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') 

In [None]:
dataset = ImageFolder(root=images_dir,
                           transform=transforms.Compose([
                               transforms.Resize(image_size),
                               transforms.CenterCrop(image_size),
                               transforms.Grayscale(1),
                               transforms.ToTensor(),
                               transforms.Normalize((0.5,), (0.5,)),
                           ]))

In [None]:
train_size = int(0.9 * len(dataset))
test_size = (len(dataset) - train_size) 
print(train_size, test_size)

In [None]:
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size], generator=torch.manual_seed(3))

In [None]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

In [None]:
for b, (images, labels) in enumerate(train_loader):
    break

class_names = ['Glioma','Menignioma', 'No Tumor', 'Pituitary']
print('Label:', labels.numpy())
print('Class: ', *np.array([class_names[i] for i in labels]))

# Print the images
im = vutils.make_grid(images, nrow=8)  # the default nrow is 8
plt.figure(figsize=(10,4))
plt.imshow(np.transpose(im.numpy(), (1, 2, 0)));

In [None]:
model = CustomDiscriminator(depth, latent_size, n_classes=n_classes).to(device)
model = nn.DataParallel(model)
model.load_state_dict(torch.load(model_path, map_location=device))

In [None]:
print("Discriminator Network Parameters:\n")
print(f"Total Parameters: {sum(p.numel() for p in model.parameters())}")
print(f"Total Trainable Parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

In [None]:
model.module.layers[0].aux = nn.Sequential(
    nn.Linear(512, 512),
    nn.LeakyReLU(0.2, True),
    model.module.layers[0].aux
)

for params in model.parameters():
    params.requires_grad = False

for params in model.module.layers[0].aux.parameters():
    params.requires_grad = True

model = model.to(device)

In [None]:
print("Discriminator Network Parameters (After Freezing for fine-tuning as classifier):\n")
print(f"Total Parameters: {sum(p.numel() for p in model.parameters())}")
print(f"Total Trainable Parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

In [None]:
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
os.makedirs(model_dir, exist_ok=True)

In [None]:
#Main 98.29
# Print
model_name = 'New_AC_MSGGAN_classifier_Train_Test_Split_dis500_lr_5e-6'
    
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
Q = math.floor(len(train_loader)/batch_size)
# scheduler = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=lr, max_lr=0.1, step_size_up=5, mode="exp_range", gamma=0.85, cycle_momentum=False)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=Q)
print('\nTraining the model')
b, test_b = 0, 0

training_losses = []
training_accuracies = []
test_loss = []
test_corr = []

start_time = time.time()

for epoch in range(epochs):
    e_start = time.time()

    model.train()

    running_loss = 0.0
    running_accuracy = 0.0
    tst_corr = 0.0

    for b, (X_train, y_train) in enumerate(train_loader):
        X_train, y_train = X_train.to(device), y_train.to(device)
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        X_train = [X_train] + [avg_pool2d(X_train, int(np.power(2, i)))
                                  for i in range(1, depth)]
        X_train = list(reversed(X_train))

        _, y_pred = model(X_train)

        y_pred = y_pred.view(-1, n_classes)

        loss = criterion(y_pred, y_train)

        predicted = torch.argmax(y_pred.data, dim=1).data
        batch_corr = (predicted == y_train).sum()
        running_accuracy += batch_corr
        
        loss.backward()
        
        optimizer.step()

        # scheduler.step()
        
        # print statistics
        running_loss += loss.item()

        if b % int(len(train_loader)/batch_size) == 0:
            print(f'Epoch: {epoch+1:2}  batch: {b+1:6} [{b+1:6}/{len(train_loader)}]  Loss: {loss.item():.6f}  Accuracy: {running_accuracy.item()*100/((batch_size ) * (b+1)):.6f}%')
        
    training_losses.append(loss.item())
    training_accuracies.append(running_accuracy.item()*100/((batch_size ) * (b+1)))

    print(f"Epoch {epoch+1} | Training Accuracy: {training_accuracies[-1]:.6f}% | Training Loss: {training_losses[-1]:.6f}")

    model.eval()
    b = 0

    with torch.no_grad():
        correct = 0
        labels = []
        pred = []

        new_y = 0.0

        # perform test set evaluation batch wise
        for b, (X, y) in enumerate(test_loader):
            b += 1
            # set label to use CUDA if available
            X, y = X.to(device), y.to(device)

            X = [X] + [avg_pool2d(X, int(np.power(2, i)))
                                  for i in range(1, depth)]
            X = list(reversed(X))

            labels.extend(y.view(-1).cpu().numpy())

            # perform forward pass
            _, y_val = model(X)
            y_val = y_val.view(-1, n_classes)

            # get argmax of predicted values, which is our label
            predicted = torch.argmax(y_val.data, dim=1).view(-1)

            # append predicted label
            pred.extend(predicted.cpu().numpy())

            # calculate loss
            loss = criterion(y_val, y)

            # increment correct with correcly predicted labels per batch
            correct += (predicted == y).sum()

        # append correct samples labels and losses
        test_corr.append(correct.item()*100/(b*(batch_size )))
        
        test_loss.append(loss.item())
            
    print(f"Validation accuracy: {test_corr[-1]:.6f}% | Validation Loss: {test_loss[-1]:.6f}")

print('Finished Training')

end_time = time.time() - start_time    

# print training summary
print("\nTraining Duration {:.2f} minutes".format(end_time/60))
print("GPU memory used : {} kb".format(torch.cuda.memory_allocated()))
print("GPU memory cached : {} kb".format(torch.cuda.memory_reserved()))

plot_loss(training_losses, test_loss) 
plot_accuracy(training_accuracies, test_corr) 

#torch.save(model.state_dict(), '/content/drive/My Drive/' + model_name + '.pt')
#torch.save(model.state_dict(), model_dir + '/' + model_name + '.pt')

In [None]:
test_images_dir = 'msg_gan/test_data'
test_dataset = ImageFolder(root=test_images_dir,
                           transform=transforms.Compose([
                               transforms.Resize(image_size),
                               transforms.CenterCrop(image_size),
                               transforms.Grayscale(1),
                               transforms.ToTensor(),
                               transforms.Normalize((0.5,), (0.5,)),
                           ]))
print(len(test_dataset))
new_test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

#Dis -100 & Moved one image number 719 from glioma to no tumor to check the results
model.eval()

b = 0
test_loss = []
test_corr = []

with torch.no_grad():
    correct = 0
    labels = []
    pred = []

    new_y = 0.0

    # perform test set evaluation batch wise
    for b, (X, y) in enumerate(new_test_loader):
        b += 1
        # set label to use CUDA if available
        X, y = X.to(device), y.to(device)

        X = [X] + [avg_pool2d(X, int(np.power(2, i)))
                              for i in range(1, depth)]
        X = list(reversed(X))

        labels.extend(y.view(-1).cpu().numpy())

        # perform forward pass
        _, y_val = model(X)
        y_val = y_val.view(-1, n_classes)

        # get argmax of predicted values, which is our label
        predicted = torch.argmax(y_val.data, dim=1).view(-1)

        # append predicted label
        pred.extend(predicted.cpu().numpy())

        # calculate loss
        loss = criterion(y_val, y)

        # increment correct with correcly predicted labels per batch
        correct += (predicted == y).sum()

    # append correct samples labels and losses
    test_corr.append(correct.item()*100/(b*(batch_size )))
    
    test_loss.append(loss.item())

print(f"Test Loss: {test_loss[-1]}")

labels = torch.Tensor(labels)
pred = torch.Tensor(pred)

print("Test Metrics: \n")

get_all_metrics(pred, labels)

if class_names:
    plot_confusion_matrix(pred, labels, class_names)
else:
    plot_confusion_matrix(pred, labels)
