# Setting up GPU

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

In [None]:
!pip3 install --upgrade pip
!pip3 install einops

In [None]:
import locale
locale.getpreferredencoding = lambda: "UTF-8"

# Imports and Definitions

In [None]:
# Import of libraries
import random
import imageio
import numpy as np
from argparse import ArgumentParser

from tqdm.auto import tqdm
import matplotlib.pyplot as plt

import einops
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader

from torchvision.transforms import Compose, ToTensor, Lambda

# Setting reproducibility
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

## Getting device

If you are running this codebook from Google Colab, make sure you are using a GPU runtime. For non-pro users, typically a *Tesla T4* GPU is provided.

In [None]:
# Getting device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = torch.device("cpu")
print(f"Using device: {device}\t" + (f"{torch.cuda.get_device_name(0)}" if torch.cuda.is_available() else "CPU"))

## Execution options

Here's a few options you should set:

 - `no_train` specifies whether you want to skip the training loop and just use a pre-trained model. If you haven't trained a model already using this notebook, keep this as `False`. If you want to use a pre-trained model, load it in the colab filesystem.

- `batch_size`, `n_epochs` and `lr` are your typical training hyper-parameters. Notice that `lr=0.001` is the hyper-parameter used by the authors.


In [None]:
no_train = False
batch_size = 64
n_epochs = 50
lr = 0.001

# Utility functions

Following are two utility functions: `show_images` allows to display images in a square-like pattern with a custom title, while `show_fist_batch` simply shows the images in the first batch of a DataLoader object.

In [None]:
def show_images(images, title=""):
    """Shows the provided images as sub-pictures in a square"""

    # Converting images to CPU numpy arrays
    if type(images) is torch.Tensor:
        images = images.detach().cpu().numpy()

    # Defining number of rows and columns
    fig = plt.figure(figsize=(20, 20))
    rows = int(len(images) ** (1 / 2))
    cols = round(len(images) / rows)

    # Populating figure with sub-plots
    idx = 0
    for r in range(rows):
        for c in range(cols):
            fig.add_subplot(rows, cols, idx + 1)

            if idx < len(images):
                # print(images[idx][0])####
                # print(len(images[idx][0]))
                # print("#########")
                plt.imshow(images[idx][0], cmap="gray")
                idx += 1
    fig.suptitle(title, fontsize=30)

    # Showing the figure
    plt.show()

In [None]:
def show_first_batch(loader):
    for batch in loader:
        show_images(batch[0], "Images in the first batch")
        break

## Loading data


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
data_path = '/content/drive/MyDrive/AI_Research/train'

from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision.transforms import Compose, ToTensor, Lambda
from torch.utils.data import DataLoader

transform = Compose([
    transforms.Resize(256),
    transforms.CenterCrop(256),
    transforms.Grayscale(),
    ToTensor(),
    Lambda(lambda x: (x - 0.5) * 2) # Normalizing
])

dataset = ImageFolder(data_path, transform=transform)

loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Optionally, show a batch of regular images
show_first_batch(loader)

# Defining the DDPM module

We now proceed and define a DDPM PyTorch module. Since in principle the DDPM scheme is independent of the model architecture used in each denoising step, we define a high-level model that is constructed using a `network` parameter, as well as:

- `n_steps`: number of diffusion steps $T$;
- `min_beta`: value of the first $\beta_t$ ($\beta_1$);
- `max_beta`: value of the last  $\beta_t$ ($\beta_T$);
- `device`: device onto which the model is run;
- `image_chw`: tuple contining dimensionality of images.

The `forward` process of DDPMs benefits from a nice property: We don't actually need to slowly add noise step-by-step, but we can directly skip to whathever step $t$ we want using coefficients $\alpha_bar$.

For the `backward` method instead, we simply let the network do the job.

Note that in this implementation, $t$ is assumed to be a `(N, 1)` tensor, where `N` is the number of images in tensor `x`. We thus support different time-steps for multiple images.

In [None]:
# DDPM class
class MyDDPM(nn.Module):
    def __init__(self, network, n_steps=200, min_beta=10 ** -4, max_beta=0.02, device=None, image_chw=(1, 256, 256)):
        super(MyDDPM, self).__init__()
        self.n_steps = n_steps
        self.device = device
        self.image_chw = image_chw
        self.network = network.to(device)
        self.betas = torch.linspace(min_beta, max_beta, n_steps).to(
            device)  # Number of steps is typically in the order of thousands
        self.alphas = 1 - self.betas
        self.alpha_bars = torch.tensor([torch.prod(self.alphas[:i + 1]) for i in range(len(self.alphas))]).to(device)

    def forward(self, x0, t, eta=None):
        # Make input image more noisy (we can directly skip to the desired step)
        n, c, h, w = x0.shape
        a_bar = self.alpha_bars[t]

        if eta is None:
            eta = torch.randn(n, c, h, w).to(self.device)

        noisy = a_bar.sqrt().reshape(n, 1, 1, 1) * x0 + (1 - a_bar).sqrt().reshape(n, 1, 1, 1) * eta
        return noisy

    def backward(self, x, t):
        # Run each image through the network for each timestep t in the vector t.
        # The network returns its estimation of the noise that was added.
        return self.network(x, t)

## Visualizing forward and backward

Now that we have defined the high-level functioning of a DDPM model, we can already define some related utility functions.

In particular, we will be showing the forward process (which is independent of the denoising network) with the `show_forward` method.

We run the backward pass and generate new images with the `generate_new_images` method, but this time we will put more effort into the function and also make it such that a GIF image is created. Notice that in the paper (https://arxiv.org/pdf/2006.11239.pdf) by Ho et. al., two options are considered for $\sigma_t^2$:

- $\sigma_t^2$ = $\beta_t$
- $\sigma_t^2$ = $\frac{1 - \bar{\alpha_{t-1}}}{1 - \bar{\alpha_{t}}} \beta_t$

In this implementation, they are both a few line-comments away. However, the two terms are rougly always the same and little difference is noticeable. By default, I choose the first option out of simplicity.

In [None]:
def show_forward(ddpm, loader, device):
    # Showing the forward process
    for batch in loader:
        imgs = batch[0]

        show_images(imgs, "Original images")

        for percent in [0.25, 0.5, 0.75, 1]:
            show_images(
                ddpm(imgs.to(device),
                     [int(percent * ddpm.n_steps) - 1 for _ in range(len(imgs))]),
                f"DDPM Noisy images {int(percent * 100)}%"
            )
        break

In [None]:
def generate_new_images(ddpm, n_samples=16, device=None, frames_per_gif=100, gif_name="sampling.gif", c=1, h=256, w=256):
    """Given a DDPM model, a number of samples to be generated and a device, returns some newly generated samples"""
    frame_idxs = np.linspace(0, ddpm.n_steps, frames_per_gif).astype(np.uint)
    frames = []

    with torch.no_grad():
        if device is None:
            device = ddpm.device

        # Starting from random noise
        x = torch.randn(n_samples, c, h, w).to(device)

        for idx, t in enumerate(list(range(ddpm.n_steps))[::-1]):
            # Estimating noise to be removed
            time_tensor = (torch.ones(n_samples, 1) * t).to(device).long()
            # print(x.shape)
            eta_theta = ddpm.backward(x, time_tensor)

            alpha_t = ddpm.alphas[t]
            alpha_t_bar = ddpm.alpha_bars[t]

            # Partially denoising the image
            x = (1 / alpha_t.sqrt()) * (x - (1 - alpha_t) / (1 - alpha_t_bar).sqrt() * eta_theta)

            if t > 0:
                z = torch.randn(n_samples, c, h, w).to(device)

                # Option 1: sigma_t squared = beta_t
                beta_t = ddpm.betas[t]
                sigma_t = beta_t.sqrt()

                # Option 2: sigma_t squared = beta_tilda_t
                # prev_alpha_t_bar = ddpm.alpha_bars[t-1] if t > 0 else ddpm.alphas[0]
                # beta_tilda_t = ((1 - prev_alpha_t_bar)/(1 - alpha_t_bar)) * beta_t
                # sigma_t = beta_tilda_t.sqrt()

                # Adding some more noise like in Langevin Dynamics fashion
                x = x + sigma_t * z

            # Adding frames to the GIF
            if idx in frame_idxs or t == 0:
                # Putting digits in range [0, 255]
                normalized = x.clone()
                for i in range(len(normalized)):
                    normalized[i] -= torch.min(normalized[i])
                    normalized[i] *= 255 / torch.max(normalized[i])

                # Reshaping batch (n, c, h, w) to be a (as much as it gets) square frame
                frame = einops.rearrange(normalized, "(b1 b2) c h w -> (b1 h) (b2 w) c", b1=int(n_samples ** 0.5))
                frame = frame.cpu().numpy().astype(np.uint8)

                # Rendering frame
                frames.append(frame)

    with imageio.get_writer(gif_name, mode="I") as writer:
        for idx, frame in enumerate(frames):
            rgb_frame = np.repeat(frame, 3, axis=2)
            writer.append_data(rgb_frame)

            # Showing the last frame for a longer time
            if idx == len(frames) - 1:
                last_rgb_frame = np.repeat(frames[-1], 3, axis=2)
                for _ in range(frames_per_gif // 3):
                    writer.append_data(last_rgb_frame)

    return x

# UNet architecture

 So now we simply define an architecture that will be responsible of denoising the we should be good to go... Not so fast! While in principle that's true, we have to be careful to conditioning our model with the temporal information.

Remember that the only term of the loss function that we really care about is $||\epsilon - \epsilon_\theta(\sqrt{\bar{\alpha}_t}x_0 + \sqrt{1 - \bar{\alpha}_t}\epsilon, t)||^2$, where $\epsilon$ is some random noise and $\epsilon_\theta$ is the model's prediction of the noise. Now, $\epsilon_\theta$ is a function of both $x$ and $t$ and we don't want to have a distinct model for each denoising step (thousands of independent models), but instead we want to use a single model that takes as input the image $x$ and the scalar value indicating the timestep $t$.

To do so, in practice we use a sinusoidal embedding (function `sinusoidal_embedding`) that maps each time-step to a `time_emb_dim` dimension. These time embeddings are further mapped with some time-embedding MLPs (function `_make_te`) and added to tensors through the network in a channel-wise manner.

In [None]:
import torch
import torch.nn as nn

def sinusoidal_embedding(n, d):
    # Returns the standard positional embedding
    embedding = torch.zeros(n, d)
    wk = torch.tensor([1 / 10_000 ** (2 * j / d) for j in range(d)])
    wk = wk.reshape((1, d))
    t = torch.arange(n).reshape((n, 1))
    embedding[:,::2] = torch.sin(t * wk[:,::2])
    embedding[:,1::2] = torch.cos(t * wk[:,::2])

    return embedding

class MyBlock(nn.Module):
    def __init__(self, shape, in_c, out_c, kernel_size=3, stride=1, padding=1, activation=None, normalize=True):
        super(MyBlock, self).__init__()
        self.ln = nn.LayerNorm(shape)
        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size, stride, padding)
        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size, stride, padding)
        self.activation = nn.SiLU() if activation is None else activation
        self.normalize = normalize

    def forward(self, x):
        out = self.ln(x) if self.normalize else x
        out = self.conv1(out)
        out = self.activation(out)
        out = self.conv2(out)
        out = self.activation(out)
        return out

class MyUNet(nn.Module):
    def __init__(self, n_steps=1000, time_emb_dim=100):
        super(MyUNet, self).__init__()

        # Sinusoidal embedding
        self.time_embed = nn.Embedding(n_steps, time_emb_dim)
        self.time_embed.weight.data = sinusoidal_embedding(n_steps, time_emb_dim)
        self.time_embed.requires_grad_(False)

        # First half
        self.te1 = self._make_te(time_emb_dim, 1)
        self.b1 = nn.Sequential(
            MyBlock((1, 256, 256), 1, 10),
            MyBlock((10, 256, 256), 10, 10),
            MyBlock((10, 256, 256), 10, 10),
        )
        self.down1 = nn.Conv2d(10, 10, 4, 2, 1)

        self.te2 = self._make_te(time_emb_dim, 10)
        self.b2 = nn.Sequential(
            MyBlock((10, 128, 128), 10, 20),
            MyBlock((20, 128, 128), 20, 20),
            MyBlock((20, 128, 128), 20, 20),
        )
        self.down2 = nn.Conv2d(20, 20, 4, 2, 1)

        self.te3 = self._make_te(time_emb_dim, 20)
        self.b3 = nn.Sequential(
            MyBlock((20, 64, 64), 20, 40),
            MyBlock((40, 64, 64), 40, 40),
            MyBlock((40, 64, 64), 40, 40),
        )
        self.down3 = nn.Conv2d(40, 40, 4, 2, 1) # New Line

        self.te4 = self._make_te(time_emb_dim, 40)
        self.b4 = nn.Sequential(
            MyBlock((40, 32, 32), 40, 80),
            MyBlock((80, 32, 32), 80, 80),
            MyBlock((80, 32, 32), 80, 80)
        ) # New Lines
        self.down4 = nn.Conv2d(80, 80, 4, 2, 1)

        self.te5 = self._make_te(time_emb_dim, 80)
        self.b5 = nn.Sequential(
            MyBlock((80, 16, 16), 80, 160),
            MyBlock((160, 16, 16), 160, 160),
            MyBlock((160, 16, 16), 160, 160)
        )
        self.down5 = nn.Sequential(
            nn.Conv2d(160, 160, 4, 2, 1),
            nn.SiLU(),
            nn.Conv2d(160, 160, 4, 2, 1)
        )

        # Bottleneck
        self.te_mid = self._make_te(time_emb_dim, 160)

        self.b_mid = nn.Sequential(
            MyBlock((160, 4, 4), 160, 80),
            MyBlock((80, 4, 4), 80, 80),
            MyBlock((80, 4, 4), 80, 160)
        ) # ############

        # Second half
        self.up1 = nn.Sequential(
            nn.ConvTranspose2d(160, 160, 4, 2, 1),
            nn.SiLU(),
            nn.ConvTranspose2d(160, 160, 4, 2, 1)
        )
        self.te6 = self._make_te(time_emb_dim, 320)
        self.b6 = nn.Sequential(
            MyBlock((320, 16, 16), 320, 160),
            MyBlock((160, 16, 16), 160, 80),
            MyBlock((80, 16, 16), 80, 80)
        )

        self.up2 = nn.ConvTranspose2d(80, 80, 4, 2, 1)
        self.te7 = self._make_te(time_emb_dim, 160)
        self.b7 = nn.Sequential(
            MyBlock((160, 32, 32), 160, 80),
            MyBlock((80, 32, 32), 80, 40),
            MyBlock((40, 32, 32), 40, 40)
        )

        self.up3 = nn.ConvTranspose2d(40, 40, 4, 2, 1)
        self.te8 = self._make_te(time_emb_dim, 80)
        self.b8 = nn.Sequential(
            MyBlock((80, 64, 64), 80, 40),
            MyBlock((40, 64, 64), 40, 20),
            MyBlock((20, 64, 64), 20, 20)
        )

        self.up4 = nn.ConvTranspose2d(20, 20, 4, 2, 1)
        self.te9 = self._make_te(time_emb_dim, 40)
        self.b9 = nn.Sequential(
            MyBlock((40, 128, 128), 40, 20),
            MyBlock((20, 128, 128), 20, 10),
            MyBlock((10, 128, 128), 10, 10)
        )

        self.up5 = nn.ConvTranspose2d(10, 10, 4, 2, 1)
        self.te_out = self._make_te(time_emb_dim, 20)
        self.b_out = nn.Sequential(
            MyBlock((20, 256, 256), 20, 10),
            MyBlock((10, 256, 256), 10, 10),
            MyBlock((10, 256, 256), 10, 10, normalize=False)
        )

        self.conv_out = nn.Conv2d(10, 1, 3, 1, 1)

    def forward(self, x, t):
        # x is (N, 2, 64, 64) (image with positional embedding stacked on channel dimension)
        t = self.time_embed(t)
        n = len(x)
        out1 = self.b1(x + self.te1(t).reshape(n, -1, 1, 1))  # (N, 10, 64, 64)
        out2 = self.b2(self.down1(out1) + self.te2(t).reshape(n, -1, 1, 1))  # (N, 20, 32, 32)
        out3 = self.b3(self.down2(out2) + self.te3(t).reshape(n, -1, 1, 1))  # (N, 40, 16, 16)
        #######
        out4 = self.b4(self.down3(out3) + self.te4(t).reshape(n, -1, 1, 1))  # (N, 40, 16, 16)
        out5 = self.b5(self.down4(out4) + self.te5(t).reshape(n, -1, 1, 1))  # (N, 40, 16, 16)


        out_mid = self.b_mid(self.down5(out5) + self.te_mid(t).reshape(n, -1, 1, 1))  # (N, 40, 8, 8)

        out6 = torch.cat((out5, self.up1(out_mid)), dim=1)  # (N, 80, 16, 16)
        out6 = self.b6(out6 + self.te6(t).reshape(n, -1, 1, 1))  # (N, 20, 16, 16)

        out7 = torch.cat((out4, self.up2(out6)), dim=1)  # (N, 40, 32, 32)
        out7 = self.b7(out7 + self.te7(t).reshape(n, -1, 1, 1))  # (N, 10, 32, 32)

        out8 = torch.cat((out3, self.up3(out7)), dim=1)  # (N, 40, 32, 32)
        out8 = self.b8(out8 + self.te8(t).reshape(n, -1, 1, 1))  # (N, 10, 32, 32)

        out9 = torch.cat((out2, self.up4(out8)), dim=1)  # (N, 40, 32, 32)
        out9 = self.b9(out9 + self.te9(t).reshape(n, -1, 1, 1))  # (N, 10, 32, 32)

        out = torch.cat((out1, self.up5(out9)), dim=1)  # (N, 20, 64, 64)
        out = self.b_out(out + self.te_out(t).reshape(n, -1, 1, 1))  # (N, 1, 64, 64)

        out = self.conv_out(out)

        return out

    def _make_te(self, dim_in, dim_out):
        return nn.Sequential(
            nn.Linear(dim_in, dim_out),
            nn.SiLU(),
            nn.Linear(dim_out, dim_out)
        )


# Instantiating the model


In [None]:
# Defining model
n_steps, min_beta, max_beta = 1000, 10 ** -4, 0.02  # Originally used by the authors
ddpm = MyDDPM(MyUNet(n_steps), n_steps=n_steps, min_beta=min_beta, max_beta=max_beta, device=device)

In [None]:
sum([p.numel() for p in ddpm.parameters()])

# Optional visualizations

In [None]:
# Optionally, load a pre-trained model that will be further trained
# ddpm.load_state_dict(torch.load(store_path, map_location=device))

In [None]:
# Optionally, show the diffusion (forward) process
show_forward(ddpm, loader, device)

In [None]:
# Optionally, show the denoising (backward) process
generated = generate_new_images(ddpm, gif_name="before_training.gif")
show_images(generated, "Images generated before training")

# Training loop

The training loop is fairly simple. With each batch of our dataset, we run the forward process on the batch. We use a different timesteps $t$ for each of the `N` images in our `(N, C, H, W)` batch tensor to guarantee more training stability. The added noise is a `(N, C, H, W)` tensor $\epsilon$.

Once we obtained the noisy images, we try to predict $\epsilon$ out of them with our network. We optimize with a simple Mean-Squared Error (MSE) loss.

In [None]:
def training_loop(ddpm, loader, n_epochs, optim, device, display=False, store_path="ddpm_model.pt",early_stopping_patience=3):
    mse = nn.MSELoss()
    best_loss = float("inf")
    n_steps = ddpm.n_steps
    loss_list = []
    early_stopping_counter = 0
    previous_loss = float("inf")

    for epoch in tqdm(range(n_epochs), desc=f"Training progress", colour="#00ff00"):
        epoch_loss = 0.0
        for step, batch in enumerate(tqdm(loader, leave=False, desc=f"Epoch {epoch + 1}/{n_epochs}", colour="#005500")):
            # Loading data
            x0 = batch[0].to(device)
            n = len(x0)

            # Picking some noise for each of the images in the batch, a timestep and the respective alpha_bars
            eta = torch.randn_like(x0).to(device)
            t = torch.randint(0, n_steps, (n,)).to(device)

            # Computing the noisy image based on x0 and the time-step (forward process)
            noisy_imgs = ddpm(x0, t, eta)

            # Getting model estimation of noise based on the images and the time-step
            eta_theta = ddpm.backward(noisy_imgs, t.reshape(n, -1))

            # Optimizing the MSE between the noise plugged and the predicted noise
            loss = mse(eta_theta, eta)
            optim.zero_grad()
            loss.backward()
            optim.step()

            epoch_loss += loss.item() * len(x0) / len(loader.dataset)

        loss_list.append(epoch_loss)


        # Display images generated at this epoch
        if display:
            show_images(generate_new_images(ddpm, device=device), f"Images generated at epoch {epoch + 1}")

        log_string = f"Loss at epoch {epoch + 1}: {epoch_loss:.3f}"

        # Storing the model
        if best_loss > epoch_loss:
            best_loss = epoch_loss
            torch.save(ddpm.state_dict(), store_path)
            log_string += " --> Best model ever (stored)"

        print(log_string)

    return loss_list

In [None]:
# Training
store_path = "/content/drive/MyDrive/AI_Research/256_DDPM_Xray/xray-256.pt"
if not no_train:
    loss_list = training_loop(ddpm, loader, n_epochs, optim=Adam(ddpm.parameters(), lr), device=device, store_path=store_path)

# Plot the training loss using Matplotlib.
print(loss_list)
plt.plot(range(1, len(loss_list) + 1), loss_list, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss over Epochs')
plt.legend()
plt.show()


# Testing the trained model

Time to check how well our model does. We re-store the best performing model according to our training loss and set it to evaluation mode. Finally, we display a batch of generated images and the relative obtained and nice GIF.

In [None]:
# Loading the trained model
best_model = MyDDPM(MyUNet(), n_steps=n_steps, device=device)
best_model.load_state_dict(torch.load(store_path, map_location=device))
best_model.eval()
print("Model loaded")

In [None]:
print("Generating new images")
generated = generate_new_images(
        best_model,
        n_samples=9,
        device=device,
        gif_name="xray.gif"
    )
show_images(generated, "Final result")

# Loading saved Best Model and Generate Images

In [None]:
# Load a pre-trained model that will be further trained
ddpm.load_state_dict(torch.load('/content/drive/MyDrive/AI_Research/256_DDPM_Xray/xray-256_50Epochs.pt', map_location=device))

In [None]:
generated = generate_new_images(
        ddpm,
        n_samples=9,
        device=device,
        gif_name="xray.gif"
    )
show_images(generated, "Final result")

# Saving Newly Generated Images

In [None]:
from PIL import Image
import os
import numpy as np

def save_images(images, save_path='/content/drive/MyDrive/AI_Research/256_DDPM_Xray/New_Images'):
    # Create the folder if it doesn't exist
    os.makedirs(save_path, exist_ok=True)

    # Converting images to CPU numpy arrays and handling the shape and data type
    if type(images) is torch.Tensor:
        images = images.detach().cpu().numpy()

    for i in range(len(images)):
        # Reshape the image to (256, 256) and normalize pixel values to [0, 1]
        image_data = np.squeeze(images[i])  # Remove the singleton dimensions if present
        image_data = (image_data - np.min(image_data)) / (np.max(image_data) - np.min(image_data))

        # Scale the pixel values from 0-1 to 0-255
        scaled_image = (image_data * 255).astype('uint8')

        # Create an image object from the numpy array
        image_obj = Image.fromarray(scaled_image, mode='L')  # 'L' mode for grayscale images

        # Save the image as a PNG file
        image_filename = f'image_{i}.png'
        image_path = os.path.join(save_path, image_filename)
        image_obj.save(image_path)

        print(f'Saved image {i} to {image_path}')


In [None]:
save_images(generated)

# Evaluation

Note: Make sure to create two seperate folders and include the same amount of images from positive and negative cases. In this case you will need 9 real positive images and real negative images in two separate folders.

In [None]:
# Set the folder paths

# Positive Cases
truth_folder_path = "<folder path where positive cases are stored>"

# Negative Cases
negative_folder_path = "<folder path where negative cases are stored>"

# Synthetically Generated Images
synthetic_folder_path = "folder path where newly generated images are stored"

## MSE

In [None]:
import os
import cv2
import numpy as np

def mse_between_folders(truth_folder, synthetic_folder):
    truth_images = os.listdir(truth_folder)
    synthetic_images = os.listdir(synthetic_folder)

    # Sort the images to ensure they match each other for comparison
    truth_images.sort()
    synthetic_images.sort()

    total_mse = 0.0

    for truth_img_name, synthetic_img_name in zip(truth_images, synthetic_images):
        truth_img_path = os.path.join(truth_folder, truth_img_name)
        synthetic_img_path = os.path.join(synthetic_folder, synthetic_img_name)

        # Read images using OpenCV (ensure you have OpenCV installed: pip install opencv-python)
        truth_img = cv2.imread(truth_img_path, cv2.IMREAD_GRAYSCALE)
        truth_img = cv2.resize(truth_img, (256, 256))

        synthetic_img = cv2.imread(synthetic_img_path, cv2.IMREAD_GRAYSCALE)
        synthetic_img = cv2.resize(synthetic_img, (256, 256))


        # Check if the images have the same dimensions
        if truth_img.shape != synthetic_img.shape:
            raise ValueError(f"The dimensions of {truth_img_name} and {synthetic_img_name} do not match.")

        # Calculate MSE between the two images
        mse = np.mean((truth_img - synthetic_img) ** 2)
        # print(mse)
        total_mse += mse

    # Calculate the average MSE over all images
    average_mse = total_mse / len(truth_images)

    return average_mse


In [None]:
# Between Generated images and real Pneumonia images
mse_value = mse_between_folders(truth_folder_path, synthetic_folder_path)
print("Average MSE:", mse_value)


In [None]:
# Between Generated images and Non Pneumonia images
mse_value = mse_between_folders(negative_folder_path, synthetic_folder_path)
print("Average MSE:", mse_value)

In [None]:
# Between Real Peunomia and Non Pneumonia images
mse_value = mse_between_folders(negative_folder_path, truth_folder_path)
print("Average MSE:", mse_value)

## Structural Similarity Index (SSIM)

SSIM is a perceptual metric that measures the structural similarity between two images. A higher SSIM value indicates a better similarity between the synthetic and ground truth images.

In [None]:
from skimage.metrics import structural_similarity as ssim
import cv2

def evaluate_ssim(truth_folder, synthetic_folder):
    truth_images = os.listdir(truth_folder)
    synthetic_images = os.listdir(synthetic_folder)

    # Sort the images to ensure they match each other for comparison
    truth_images.sort()
    synthetic_images.sort()

    total_ssim = 0.0

    for truth_img_name, synthetic_img_name in zip(truth_images, synthetic_images):
        truth_img_path = os.path.join(truth_folder, truth_img_name)
        synthetic_img_path = os.path.join(synthetic_folder, synthetic_img_name)

        # Read images using OpenCV (ensure you have OpenCV installed: pip install opencv-python)
        truth_img = cv2.imread(truth_img_path, cv2.IMREAD_GRAYSCALE)
        truth_img = cv2.resize(truth_img, (256, 256))

        synthetic_img = cv2.imread(synthetic_img_path, cv2.IMREAD_GRAYSCALE)
        synthetic_img = cv2.resize(synthetic_img, (256, 256))


        # Check if the images have the same dimensions
        if truth_img.shape != synthetic_img.shape:
            raise ValueError(f"The dimensions of {truth_img_name} and {synthetic_img_name} do not match.")

        # Calculate SSIM between the two images
        ssim_score = ssim(truth_img, synthetic_img, data_range=synthetic_img.max() - synthetic_img.min())
        total_ssim += ssim_score


    # Calculate the average SSIM over all images
    average_ssim = total_ssim / len(truth_images)

    return average_ssim


In [None]:
# Between Generated images and real Pneumonia images
ssim_score = evaluate_ssim(truth_folder_path, synthetic_folder_path)
print("Average SSIM:", ssim_score)

In [None]:
# Between Generated images and Non Pneumonia images
ssim_score = evaluate_ssim(negative_folder_path, synthetic_folder_path)
print("Average SSIM:", ssim_score)

In [None]:
# Between Real Peunomia and Non Pneumonia images
ssim_score = evaluate_ssim(negative_folder_path, truth_folder_path)
print("Average SSIM:", ssim_score)

## Peak Signal-to-Noise Ratio (PSNR)

In [None]:
from skimage.metrics import peak_signal_noise_ratio as psnr
import cv2

def evaluate_psnr(truth_folder, synthetic_folder):
    truth_images = os.listdir(truth_folder)
    synthetic_images = os.listdir(synthetic_folder)

    # Sort the images to ensure they match each other for comparison
    truth_images.sort()
    synthetic_images.sort()

    total_psnr = 0.0

    for truth_img_name, synthetic_img_name in zip(truth_images, synthetic_images):
        truth_img_path = os.path.join(truth_folder, truth_img_name)
        synthetic_img_path = os.path.join(synthetic_folder, synthetic_img_name)

        # Read images using OpenCV (ensure you have OpenCV installed: pip install opencv-python)
        truth_img = cv2.imread(truth_img_path, cv2.IMREAD_GRAYSCALE)
        truth_img = cv2.resize(truth_img, (256, 256))

        synthetic_img = cv2.imread(synthetic_img_path, cv2.IMREAD_GRAYSCALE)
        synthetic_img = cv2.resize(synthetic_img, (256, 256))


        # Check if the images have the same dimensions
        if truth_img.shape != synthetic_img.shape:
            raise ValueError(f"The dimensions of {truth_img_name} and {synthetic_img_name} do not match.")

        # Calculate SSIM between the two images
        psnr_score = psnr(truth_img, synthetic_img, data_range=synthetic_img.max() - synthetic_img.min())

        total_psnr += ssim_score


    # Calculate the average SSIM over all images
    average_psnr = total_psnr / len(truth_images)

    return average_psnr

In [None]:
# Between Generated images and real Pneumonia images
psnr_score = evaluate_psnr(truth_folder_path, synthetic_folder_path)
print("Average PSNR:", psnr_score)

In [None]:
# Between Generated images and Non Pneumonia images
psnr_score = evaluate_psnr(negative_folder_path, synthetic_folder_path)
print("Average PSNR:", psnr_score)

In [None]:
# Between Generated images and Non Pneumonia images
psnr_score = evaluate_psnr(negative_folder_path, truth_folder_path)
print("Average PSNR:", psnr_score)

# Visualizing the diffusion

In [None]:
from IPython.display import display, Image
from PIL import Image as PILImage

def resize_gif(input_path, output_path, scale=0.5):
    gif_image = PILImage.open(input_path)

    # Determine the new size
    width, height = gif_image.size
    new_width, new_height = int(width * scale), int(height * scale)

    # Create a list to store frames
    frames = []

    # Iterate through each frame of the GIF
    try:
        while True:
            frames.append(gif_image.copy())
            gif_image.seek(len(frames))  # Move to the next frame
    except EOFError:
        pass

    # Resize each frame and save to a new list
    resized_frames = [frame.resize((new_width, new_height), PILImage.ANTIALIAS) for frame in frames]

    # Save the resized frames as a new GIF
    resized_frames[0].save(output_path, save_all=True, append_images=resized_frames[1:], loop=0)

# Example usage: Resize the 'xray.gif' to 50% and display it
input_gif_path = 'xray.gif'
output_gif_path = 'resized_xray.gif'
resize_gif(input_gif_path, output_gif_path, scale=0.5)
display(Image(output_gif_path))


In [None]:
# # Download the gif
# from google.colab import files

# files.download('./xray.gif')