# 1. Packages

In [1]:
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
%matplotlib inline

In [3]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms

In [4]:
# Enable IPython's autoreload extension (mode 2) so imported .py modules are
# re-imported when they change, without restarting the kernel.
%load_ext autoreload
%autoreload 2
from google.colab import drive
import os
import sys
# Mount Google Drive inside the Colab VM at /content/drive.
drive.mount("/content/drive")
# Project folder name, relative to the "MyDrive" root.
GOOGLE_DRIVE_PATH_AFTER_MYDRIVE = "DL_FINAL"
# NOTE(review): this is a *relative* path ("drive/MyDrive/..."), so it only
# resolves while the working directory is the parent of the mount point
# (presumably /content, Colab's default) — confirm before calling os.chdir.
GOOGLE_DRIVE_PATH = os.path.join("drive", "MyDrive", GOOGLE_DRIVE_PATH_AFTER_MYDRIVE)
# Listing the folder doubles as a quick check that the mount succeeded.
print(os.listdir(GOOGLE_DRIVE_PATH))

# Add to sys so we can import .py files.
sys.path.append(GOOGLE_DRIVE_PATH)
print(f"google drive = {GOOGLE_DRIVE_PATH}")

Mounted at /content/drive
['pathmnist.npz', 'breastmnist.npz', 'pneumoniamnist.npz', 'Evaluating Generative Models for Medical Images.pptx', 'organsmnist.npz', 'bloodmnist.npz', 'Final project.pdf', 'wgan', 'Copy of Conditioned GAN for Medical Image.ipynb', 'wgan_new', 'FID.ipynb', 'Untitled1.ipynb', 'Conditioned GAN for Medical Image.ipynb']
google drive = drive/MyDrive/DL_FINAL


Check GPU status

In [5]:
# Select the compute device once; fall back to the CPU when CUDA is unavailable.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Tell the user whether a GPU runtime is attached.
print(
    "Good to go!"
    if DEVICE.type == "cuda"
    else "Please set GPU via Edit -> Notebook Settings."
)

Good to go!


# 2. Dataset Loading
https://github.com/MedMNIST/MedMNIST


Requirements: one color dataset and two greyscale datasets.

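Color: BloodMNIST (the dataset actually loaded and trained on below; PathMNIST is also available in the Drive folder)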

Greyscale: PneumoniaMNIST + BreastMNIST


Details of the dataset: https://www.nature.com/articles/s41597-022-01721-8

In [6]:
# Locations of the MedMNIST .npz archives inside the project folder on Drive.
# Despite the "_dir" suffix each value is the path of a single file.
# NOTE: "breatmnist_dir" (sic) keeps its spelling because later cells use it.
(
    pathmnist_dir,
    pneumoniamnist_dir,
    breatmnist_dir,
    bloodmnist_dir,
    organsmnist_dir,
) = (
    os.path.join(GOOGLE_DRIVE_PATH, archive_name)
    for archive_name in (
        "pathmnist.npz",
        "pneumoniamnist.npz",
        "breastmnist.npz",
        "bloodmnist.npz",
        "organsmnist.npz",
    )
)

#### Custom Dataset

In [7]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
import PIL.Image as Image

class MedMNISTDataset(Dataset):
    """Map-style dataset over one split of a MedMNIST ``.npz`` archive.

    The archive must contain ``<split>_images`` and ``<split>_labels`` arrays
    for the ``train``, ``val`` and ``test`` splits.

    Args:
        data_dir: Path to the ``.npz`` file (a file path, despite the name).
        split: ``'train'`` or ``'val'``; any other value selects the test
            split (kept for backward compatibility with existing callers).
        transform: Optional callable applied to the PIL image in
            ``__getitem__``.
    """

    def __init__(self, data_dir, split='train', transform=None):
        self.data_dir = data_dir
        self.transform = transform

        # Anything that is not 'train'/'val' falls back to the test split.
        prefix = split if split in ('train', 'val') else 'test'

        # Load data from npz file. The context manager closes the underlying
        # zip handle; indexing materialises the arrays in memory first.
        with np.load(data_dir) as data:
            self.images = data[f'{prefix}_images']
            # Fixed: the 'val' split used to read 'val_images' here, so the
            # validation "labels" were actually image arrays.
            self.labels = data[f'{prefix}_labels']

    def __len__(self):
        """Return the number of images in the selected split."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is a PIL image, or whatever ``transform`` returns when a
        transform was given; the label is a plain Python ``int``.
        """
        image = Image.fromarray(self.images[idx])
        # .item() extracts the single class id without relying on the
        # deprecated implicit conversion of a 1-element array to a scalar.
        label = int(np.asarray(self.labels[idx]).item())

        if self.transform:
            image = self.transform(image)

        return image, label

#### bloodmnist_dir

In [8]:

# BloodMNIST: build the three splits and wrap each one in a DataLoader.
dataset_dir = bloodmnist_dir

# ToTensor followed by Normalize(mean=0.5, std=0.5); the single-element
# mean/std tuples are applied to every channel.
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)

# One dataset object per split, constructed in train / val / test order.
train_dataset, val_dataset, test_dataset = (
    MedMNISTDataset(dataset_dir, split=split_name, transform=transform)
    for split_name in ('train', 'val', 'test')
)

# Only the training loader shuffles; evaluation order stays fixed.
bloodmnist_train_loader = DataLoader(train_dataset, shuffle=True, batch_size=128)
bloodmnist_val_loader = DataLoader(val_dataset, shuffle=False, batch_size=128)
bloodmnist_test_loader = DataLoader(test_dataset, shuffle=False, batch_size=128)

# Report the split sizes, one per line.
print(
    f"len of train_dataset = {len(train_dataset)}",
    f"len of val_dataset = {len(val_dataset)}",
    f"len of test_dataset = {len(test_dataset)}",
    sep="\n",
)

len of train_dataset = 11959
len of val_dataset = 1712
len of test_dataset = 3421


In [9]:
# Pull a single training batch to confirm the tensor layout.
dataiter = iter(bloodmnist_train_loader)
images, labels = next(dataiter)
# BloodMNIST batches print as [N, 3, 28, 28] (colour images), so dim 2 is the image height.
img_size = images.shape[2]
print(images.shape)

torch.Size([128, 3, 28, 28])


#### pneumoniamnist_dir

In [10]:

# PneumoniaMNIST: build the three splits and wrap each one in a DataLoader.
dataset_dir = pneumoniamnist_dir

# ToTensor followed by Normalize(mean=0.5, std=0.5).
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)

# Datasets, constructed in train / val / test order.
train_dataset, val_dataset, test_dataset = (
    MedMNISTDataset(dataset_dir, split=split_name, transform=transform)
    for split_name in ('train', 'val', 'test')
)

# Dataloaders: shuffle the training split only.
pneu_train_loader = DataLoader(train_dataset, shuffle=True, batch_size=128)
pneu_val_loader = DataLoader(val_dataset, shuffle=False, batch_size=128)
pneu_test_loader = DataLoader(test_dataset, shuffle=False, batch_size=128)

# Report the split sizes, one per line.
print(
    f"len of train_dataset = {len(train_dataset)}",
    f"len of val_dataset = {len(val_dataset)}",
    f"len of test_dataset = {len(test_dataset)}",
    sep="\n",
)

len of train_dataset = 4708
len of val_dataset = 524
len of test_dataset = 624


In [11]:
# Pull a single training batch to confirm the tensor layout.
dataiter = iter(pneu_train_loader)
images, labels = next(dataiter)
# PneumoniaMNIST batches print as [N, 1, 28, 28] (greyscale), so dim 2 is the image height.
img_size = images.shape[2]
print(images.shape)

torch.Size([128, 1, 28, 28])


#### BreastMNIST (`breatmnist_dir`)

In [12]:

# BreastMNIST: build the three splits and wrap each one in a DataLoader.
dataset_dir = breatmnist_dir
transform = transforms.Compose([
    transforms.ToTensor(),
    # Normalize takes (mean, std[, inplace]). The previous call passed a third
    # positional tuple (0.5,), which was bound to `inplace` and silently
    # switched on in-place normalisation; only mean and std are intended.
    transforms.Normalize((0.5,), (0.5,))
])

# Create datasets
train_dataset = MedMNISTDataset(dataset_dir, split='train', transform=transform)
val_dataset = MedMNISTDataset(dataset_dir, split='val', transform=transform)
test_dataset = MedMNISTDataset(dataset_dir, split='test', transform=transform)

# Create dataloaders (only the training split is shuffled)
breast_train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
breast_val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)
breast_test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)
print(f"len of train_dataset = {len(train_dataset)}")
print(f"len of val_dataset = {len(val_dataset)}")
print(f"len of test_dataset = {len(test_dataset)}")

len of train_dataset = 546
len of val_dataset = 78
len of test_dataset = 156


In [13]:
# Pull a single training batch to confirm the tensor layout.
dataiter = iter(breast_train_loader)
images, labels = next(dataiter)
# BreastMNIST batches print as [N, 1, 28, 28] (greyscale), so dim 2 is the image height.
img_size = images.shape[2]
print(images.shape)

torch.Size([128, 1, 28, 28])


# 3. Implement and train a class-conditional GAN on each dataset

#### Discriminator

In [14]:
class Discriminator(nn.Module):
    """Class-conditional critic.

    The label is embedded as an ``img_size x img_size`` plane and stacked onto
    the image as one extra channel before the convolutional stack, which
    reduces the input to a single score per sample.

    Args:
        channels_img: Number of channels in the input images.
        features_d: Base width of the convolutional stack.
        num_classes: Number of distinct class labels.
        img_size: Height/width of the (square) input images.
    """

    def __init__(self, channels_img, features_d, num_classes, img_size):
        super().__init__()
        self.img_size = img_size

        # Spatial sizes in the comments below assume 28x28 inputs.
        # +1 input channel for the label plane.
        stem = nn.Conv2d(
            channels_img + 1, features_d, kernel_size=4, stride=2, padding=1
        )  # 28 -> 14
        down_a = self._block(features_d, features_d * 2, 4, 2, 1)  # 14 -> 7
        down_b = self._block(features_d * 2, features_d * 4, 5, 1, 0)  # 7 -> 3
        head = nn.Conv2d(
            features_d * 4, 1, kernel_size=3, stride=1, padding=0
        )  # 3 -> 1

        self.disc = nn.Sequential(stem, nn.LeakyReLU(0.2), down_a, down_b, head)
        self.embed = nn.Embedding(num_classes, img_size * img_size)

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        """Bias-free Conv2d -> affine InstanceNorm2d -> LeakyReLU(0.2)."""
        conv = nn.Conv2d(
            in_channels, out_channels, kernel_size, stride, padding, bias=False
        )
        norm = nn.InstanceNorm2d(out_channels, affine=True)
        return nn.Sequential(conv, norm, nn.LeakyReLU(0.2))

    def forward(self, x, labels):
        """Score images ``x`` conditioned on ``labels``."""
        batch = labels.shape[0]
        # Reshape the flat embedding into a single-channel image-sized plane.
        label_plane = self.embed(labels).view(batch, 1, self.img_size, self.img_size)
        # N, C+1, H, W
        return self.disc(torch.cat([x, label_plane], dim=1))

#### Generator

In [15]:
class Generator(nn.Module):
    """Class-conditional generator.

    A learned label embedding is concatenated to the noise vector along the
    channel axis, then upsampled by transposed convolutions and squashed with
    ``tanh``.

    Args:
        channels_noise: Number of channels of the input noise (N x C x 1 x 1).
        channels_img: Number of channels of the generated images.
        features_g: Base width of the transposed-convolution stack.
        num_classes: Number of distinct class labels.
        img_size: Stored on the module; not used by the layers themselves.
        embed_size: Dimensionality of the label embedding.
    """

    def __init__(self, channels_noise, channels_img, features_g, num_classes, img_size, embed_size):
        super().__init__()
        self.img_size = img_size

        # Input: N x (channels_noise + embed_size) x 1 x 1
        up_a = self._block(channels_noise + embed_size, features_g * 8, 3, 1, 0)  # 1 -> 3
        up_b = self._block(features_g * 8, features_g * 4, 3, 2, 0)  # 3 -> 7
        up_c = self._block(features_g * 4, features_g * 2, 4, 2, 1)  # 7 -> 14
        to_image = nn.ConvTranspose2d(
            features_g * 2, channels_img, kernel_size=4, stride=2, padding=1
        )  # 14 -> 28

        # Output: N x channels_img x 28 x 28, values in [-1, 1] after tanh.
        self.net = nn.Sequential(up_a, up_b, up_c, to_image, nn.Tanh())
        # The label embedding lives in the latent space, next to the noise.
        self.embed = nn.Embedding(num_classes, embed_size)

    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        """Bias-free ConvTranspose2d -> affine InstanceNorm2d -> ReLU."""
        deconv = nn.ConvTranspose2d(
            in_channels, out_channels, kernel_size, stride, padding, bias=False
        )
        norm = nn.InstanceNorm2d(out_channels, affine=True)
        return nn.Sequential(deconv, norm, nn.ReLU())

    def forward(self, x, labels):
        """Generate images from noise ``x`` conditioned on ``labels``."""
        # Append two singleton spatial axes: (N, embed_size) -> (N, embed_size, 1, 1).
        label_vec = self.embed(labels)[:, :, None, None]
        return self.net(torch.cat([x, label_vec], dim=1))

### Weight Initialization

In [16]:
def initialize_weights(model):
    """Re-draw selected layer weights from N(0, 0.02), DCGAN-style, in place.

    Only ``Conv2d``, ``ConvTranspose2d`` and ``BatchNorm2d`` modules are
    touched; biases and every other module type keep their defaults.

    NOTE(review): ``InstanceNorm2d`` is not in the matched types, so the
    normalisation layers of the models in this notebook are left untouched.
    """
    targeted_types = (nn.Conv2d, nn.ConvTranspose2d, nn.BatchNorm2d)
    matching_layers = filter(
        lambda module: isinstance(module, targeted_types), model.modules()
    )
    for layer in matching_layers:
        nn.init.normal_(layer.weight.data, mean=0.0, std=0.02)

In [17]:
# Gradient Penalty:
def gradient_penalty(critic, real, fake, labels, device="cpu"):
    """Compute the WGAN-GP gradient penalty for a conditional critic.

    Real and fake images are mixed with a per-sample random coefficient, the
    critic is evaluated on the mix, and the penalty is the mean of
    ``(||d critic / d mix||_2 - 1) ** 2`` over the batch.

    Args:
        critic: Callable ``critic(images, labels)`` returning one score per sample.
        real: Batch of real images, shape ``(N, C, H, W)``.
        fake: Batch of generated images with the same shape as ``real``.
        labels: Class labels passed through to ``critic``.
        device: Device on which the mixing coefficients are created; it must
            match the device of ``real`` and ``fake``.

    Returns:
        Scalar tensor holding the penalty; it stays attached to the autograd
        graph so it can be added to the critic loss.
    """
    batch_size, _, _, _ = real.shape  # also enforces a 4-D input

    # One mixing coefficient per sample, created directly on the target device;
    # the (N, 1, 1, 1) shape broadcasts over C, H, W without an explicit repeat.
    epsilon = torch.rand((batch_size, 1, 1, 1), device=device)
    interpolated_images = real * epsilon + fake * (1 - epsilon)  # interpolate

    # autograd.grad needs the input to require grad. Previously this held only
    # when `fake` was still attached to the generator graph; detached inputs
    # raised a RuntimeError.
    if not interpolated_images.requires_grad:
        interpolated_images.requires_grad_(True)

    # Critic scores on the interpolated images.
    mixed_scores = critic(interpolated_images, labels)

    # Gradient of the scores with respect to the interpolated images.
    # create_graph=True keeps the result differentiable for the critic update.
    gradient = torch.autograd.grad(
        inputs=interpolated_images,
        outputs=mixed_scores,
        grad_outputs=torch.ones_like(mixed_scores),
        create_graph=True,
        retain_graph=True,
    )[0]

    # Flatten per sample; reshape (not view) also accepts non-contiguous gradients.
    gradient = gradient.reshape(gradient.shape[0], -1)
    gradient_norm = gradient.norm(2, dim=1)
    return torch.mean((gradient_norm - 1) ** 2)

### Model Training

### PneumoniaMNIST

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
# Hyperparameters and setup for the conditional WGAN-GP run on PneumoniaMNIST.
device = "cuda" if torch.cuda.is_available() else "cpu"
LEARNING_RATE = 1e-4
BATCH_SIZE = 64  # NOTE: not referenced below; the batch size comes from pneu_train_loader
IMG_SIZE = 28
CHANNELS_IMG = 1
NUM_CLASSES = 2
GEN_EMBEDDING = 100
Z_DIM = 100
NUM_EPOCHS = 100
FEATURES_CRITIC = 16
FEATURES_GEN = 16
CRITIC_ITERATIONS = 5  # critic updates per generator update
LAMBDA_GP = 10  # weight of the gradient penalty in the critic loss
train_loader = pneu_train_loader

gen = Generator(Z_DIM, CHANNELS_IMG, FEATURES_GEN, NUM_CLASSES, IMG_SIZE, GEN_EMBEDDING).to(device)
critic = Discriminator(CHANNELS_IMG, FEATURES_CRITIC, NUM_CLASSES, IMG_SIZE).to(device)
initialize_weights(gen)
initialize_weights(critic)

# Initialize the optimizers (Adam with betas=(0.0, 0.9) for both networks)
opt_gen = optim.Adam(gen.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))
opt_critic = optim.Adam(critic.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))

# Counts logging events; incremented below but not read anywhere in this cell.
step = 0

gen.train()
critic.train()

for epoch in range(NUM_EPOCHS):
    # The class labels ARE used: both generator and critic are conditioned on them.
    for batch_idx, (real, labels) in enumerate(tqdm(train_loader)):
        real = real.to(device)
        cur_batch_size = real.shape[0]
        labels = labels.to(device)
        # Train Critic: max E[critic(real)] - E[critic(fake)]
        # equivalent to minimizing the negative of that
        for _ in range(CRITIC_ITERATIONS):
            # Fresh noise for every critic step; the real batch and labels are reused.
            noise = torch.randn(cur_batch_size, Z_DIM, 1, 1).to(device)

            fake = gen(noise, labels)
            # print(f"noise shape = {real.shape}, labels shape = {labels.shape}")
            critic_real = critic(real, labels).reshape(-1)
            critic_fake = critic(fake, labels).reshape(-1)
            gp = gradient_penalty(critic, real, fake, labels, device=device)
            loss_critic = (
                -(torch.mean(critic_real) - torch.mean(critic_fake)) + LAMBDA_GP * gp
            )
            critic.zero_grad()
            # retain_graph=True: `fake` from the last iteration is reused for the
            # generator update below, so its graph must survive this backward pass.
            loss_critic.backward(retain_graph=True)
            opt_critic.step()

        # Train Generator: max E[critic(gen_fake)] <-> min -E[critic(gen_fake)]
        gen_fake = critic(fake, labels).reshape(-1)
        loss_gen = -torch.mean(gen_fake)
        gen.zero_grad()
        loss_gen.backward()
        opt_gen.step()

        # Every 10th batch (except batch 0): print the losses and save one sample image pair.
        # Nothing is written to TensorBoard in this cell.
        if batch_idx % 10 == 0 and batch_idx > 0:
            print(
                f"Epoch [{epoch}/{NUM_EPOCHS}] Batch {batch_idx}/{len(train_loader)} \
                  Loss D: {loss_critic:.4f}, loss G: {loss_gen:.4f}"
            )

            with torch.no_grad():
                # Re-generate with the just-updated generator, same noise and labels.
                fake = gen(noise, labels)

                # Only the first image of each batch is kept.
                img_grid_real = torchvision.utils.make_grid(real[0], normalize=True)
                img_grid_fake = torchvision.utils.make_grid(fake[0], normalize=True)

                GOOGLE_DRIVE_PATH_fake = '/content/drive/My Drive/DL_FINAL/wgan/pneumnist_fake/'
                os.makedirs(GOOGLE_DRIVE_PATH_fake, exist_ok=True)
                GOOGLE_DRIVE_PATH_real = '/content/drive/My Drive/DL_FINAL/wgan/pneumnist_real/'
                os.makedirs(GOOGLE_DRIVE_PATH_real,exist_ok=True)
                # The file name depends only on the epoch, so a later save within the
                # same epoch overwrites the earlier one.
                torchvision.utils.save_image(img_grid_real, os.path.join(GOOGLE_DRIVE_PATH_real,f"wgan_epoch_{epoch+1}.jpg"))

                torchvision.utils.save_image(img_grid_fake, os.path.join(GOOGLE_DRIVE_PATH_fake,f"wgan_epoch_{epoch+1}.jpg"))

            step += 1

### BreastMNIST

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
# Hyperparameters and setup for the conditional WGAN-GP run on BreastMNIST.
device = "cuda" if torch.cuda.is_available() else "cpu"
LEARNING_RATE = 5e-4
BATCH_SIZE = 64  # NOTE: not referenced below; the batch size comes from breast_train_loader
IMG_SIZE = 28
CHANNELS_IMG = 1
NUM_CLASSES = 2
GEN_EMBEDDING = 100
Z_DIM = 100
NUM_EPOCHS = 100
FEATURES_CRITIC = 64
FEATURES_GEN = 64
CRITIC_ITERATIONS = 5  # critic updates per generator update
LAMBDA_GP = 10  # weight of the gradient penalty in the critic loss
train_loader = breast_train_loader

gen = Generator(Z_DIM, CHANNELS_IMG, FEATURES_GEN, NUM_CLASSES, IMG_SIZE, GEN_EMBEDDING).to(device)
critic = Discriminator(CHANNELS_IMG, FEATURES_CRITIC, NUM_CLASSES, IMG_SIZE).to(device)
initialize_weights(gen)
initialize_weights(critic)

# Initialize the optimizers (Adam with betas=(0.0, 0.9) for both networks)
opt_gen = optim.Adam(gen.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))
opt_critic = optim.Adam(critic.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))

step = 0  # kept for parity with the other training cells; not used here

# Output folders for the sample images. They do not change during training,
# so they are created once instead of on every batch.
GOOGLE_DRIVE_PATH_fake = '/content/drive/My Drive/DL_FINAL/wgan/breast_fake/'
GOOGLE_DRIVE_PATH_real = '/content/drive/My Drive/DL_FINAL/wgan/breast_real/'
os.makedirs(GOOGLE_DRIVE_PATH_fake, exist_ok=True)
os.makedirs(GOOGLE_DRIVE_PATH_real, exist_ok=True)

gen.train()
critic.train()

for epoch in range(NUM_EPOCHS):
    # Both networks are conditioned on the class labels of each batch.
    last_batch_idx = len(train_loader) - 1
    for batch_idx, (real, labels) in enumerate(tqdm(train_loader)):
        real = real.to(device)
        cur_batch_size = real.shape[0]
        labels = labels.to(device)
        # Train Critic: max E[critic(real)] - E[critic(fake)]
        # equivalent to minimizing the negative of that
        for _ in range(CRITIC_ITERATIONS):
            # Fresh noise for every critic step; the real batch and labels are reused.
            noise = torch.randn(cur_batch_size, Z_DIM, 1, 1).to(device)

            fake = gen(noise, labels)
            critic_real = critic(real, labels).reshape(-1)
            critic_fake = critic(fake, labels).reshape(-1)
            gp = gradient_penalty(critic, real, fake, labels, device=device)
            loss_critic = (
                -(torch.mean(critic_real) - torch.mean(critic_fake)) + LAMBDA_GP * gp
            )
            critic.zero_grad()
            # retain_graph=True: `fake` from the last iteration is reused for the
            # generator update below, so its graph must survive this backward pass.
            loss_critic.backward(retain_graph=True)
            opt_critic.step()

        # Train Generator: max E[critic(gen_fake)] <-> min -E[critic(gen_fake)]
        gen_fake = critic(fake, labels).reshape(-1)
        loss_gen = -torch.mean(gen_fake)
        gen.zero_grad()
        loss_gen.backward()
        opt_gen.step()

        # Print losses occasionally. This loader has fewer than 11 batches, so
        # the "every 10th batch" rule alone never fired; the last batch of each
        # epoch is logged as well so progress is visible.
        if (batch_idx % 10 == 0 and batch_idx > 0) or batch_idx == last_batch_idx:
            print(
                f"Epoch [{epoch}/{NUM_EPOCHS}] Batch {batch_idx}/{len(train_loader)} \
                  Loss D: {loss_critic:.4f}, loss G: {loss_gen:.4f}"
            )

    # Save one real/fake sample pair per epoch. The file name depends only on
    # the epoch, so saving after every batch just overwrote the same file; the
    # images written here (from the final batch) are the ones that survived.
    with torch.no_grad():
        # Re-generate with the updated generator, same noise and labels.
        fake = gen(noise, labels)

        # Only the first image of the batch is kept.
        img_grid_real = torchvision.utils.make_grid(real[0], normalize=True)
        img_grid_fake = torchvision.utils.make_grid(fake[0], normalize=True)
        torchvision.utils.save_image(img_grid_real, os.path.join(GOOGLE_DRIVE_PATH_real,f"wgan_epoch_{epoch+1}.jpg"))
        torchvision.utils.save_image(img_grid_fake, os.path.join(GOOGLE_DRIVE_PATH_fake,f"wgan_epoch_{epoch+1}.jpg"))

### BloodMNIST

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
# Hyperparameters and setup for the conditional WGAN-GP run on BloodMNIST.
device = "cuda" if torch.cuda.is_available() else "cpu"
LEARNING_RATE = 1e-4 # todo
BATCH_SIZE = 64 # todo  (NOTE: not referenced below; the batch size comes from bloodmnist_train_loader)
IMG_SIZE = 28
CHANNELS_IMG = 3
NUM_CLASSES = 8
GEN_EMBEDDING = 100
Z_DIM = 100
NUM_EPOCHS = 100
FEATURES_CRITIC = 128 # 16, 64, 128
FEATURES_GEN = 128 # 16, 64, 128
CRITIC_ITERATIONS = 5  # critic updates per generator update
LAMBDA_GP = 10  # weight of the gradient penalty in the critic loss
train_loader = bloodmnist_train_loader
gen = Generator(Z_DIM, CHANNELS_IMG, FEATURES_GEN, NUM_CLASSES, IMG_SIZE, GEN_EMBEDDING).to(device)
critic = Discriminator(CHANNELS_IMG, FEATURES_CRITIC, NUM_CLASSES, IMG_SIZE).to(device)
initialize_weights(gen)
initialize_weights(critic)

# Adam with betas=(0.0, 0.9) for both networks
opt_gen = optim.Adam(gen.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))
opt_critic = optim.Adam(critic.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))

step = 0  # kept for parity with the other training cells; not used here

# Output folders for the sample images. They do not change during training,
# so they are created once instead of on every batch.
GOOGLE_DRIVE_PATH_fake = '/content/drive/My Drive/DL_FINAL/wgan/blood_fake/'
GOOGLE_DRIVE_PATH_real = '/content/drive/My Drive/DL_FINAL/wgan/blood_real/'
os.makedirs(GOOGLE_DRIVE_PATH_fake, exist_ok=True)
os.makedirs(GOOGLE_DRIVE_PATH_real, exist_ok=True)

gen.train()
critic.train()

for epoch in range(NUM_EPOCHS):
    # Both networks are conditioned on the class labels of each batch.
    for batch_idx, (real, labels) in enumerate(tqdm(train_loader)):
        real = real.to(device)
        cur_batch_size = real.shape[0]
        labels = labels.to(device)
        # Train Critic: max E[critic(real)] - E[critic(fake)]
        # equivalent to minimizing the negative of that
        for _ in range(CRITIC_ITERATIONS):
            # Fresh noise for every critic step; the real batch and labels are reused.
            noise = torch.randn(cur_batch_size, Z_DIM, 1, 1).to(device)

            fake = gen(noise, labels)
            critic_real = critic(real, labels).reshape(-1)
            critic_fake = critic(fake, labels).reshape(-1)
            gp = gradient_penalty(critic, real, fake, labels, device=device)
            loss_critic = (
                -(torch.mean(critic_real) - torch.mean(critic_fake)) + LAMBDA_GP * gp
            )
            critic.zero_grad()
            # retain_graph=True: `fake` from the last iteration is reused for the
            # generator update below, so its graph must survive this backward pass.
            loss_critic.backward(retain_graph=True)
            opt_critic.step()

        # Train Generator: max E[critic(gen_fake)] <-> min -E[critic(gen_fake)]
        gen_fake = critic(fake, labels).reshape(-1)
        loss_gen = -torch.mean(gen_fake)
        gen.zero_grad()
        loss_gen.backward()
        opt_gen.step()

        # Print losses every 10th batch (batch 0 excluded).
        if batch_idx % 10 == 0 and batch_idx > 0:
            print(
                f"Epoch [{epoch}/{NUM_EPOCHS}] Batch {batch_idx}/{len(train_loader)} \
                  Loss D: {loss_critic:.4f}, loss G: {loss_gen:.4f}"
            )

    # Save one real/fake sample pair per epoch. The file name depends only on
    # the epoch, so saving after every batch just overwrote the same file; the
    # images written here (from the final batch) are the ones that survived.
    with torch.no_grad():
        # Re-generate with the updated generator, same noise and labels.
        fake = gen(noise, labels)

        # Only the first image of the batch is kept.
        img_grid_real = torchvision.utils.make_grid(real[0], normalize=True)
        img_grid_fake = torchvision.utils.make_grid(fake[0], normalize=True)
        torchvision.utils.save_image(img_grid_real, os.path.join(GOOGLE_DRIVE_PATH_real,f"wgan_epoch_{epoch+1}.jpg"))
        torchvision.utils.save_image(img_grid_fake, os.path.join(GOOGLE_DRIVE_PATH_fake,f"wgan_epoch_{epoch+1}.jpg"))

### Reference:
https://github.com/aladdinpersson/Machine-Learning-Collection/tree/master/ML/Pytorch/GANs