# 1 Importing modules

In [1]:
# Importing relevant modules

import os
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from tqdm import tqdm
from torch import optim
import logging
from torch.utils.tensorboard import SummaryWriter
import torchvision
from PIL import Image
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader


In [2]:
# Download data
# Careful - if you have downloaded the data once, you must not run this again


!mkdir Landscapes
%cd Landscapes
!mkdir Landscapes
%cd Landscapes
!wget -O Landscapes.zip -P /content/Landscapes/  https://www.dropbox.com/sh/7g4e186l8f43s57/AADHqpUCbTC10AviqG9MqFz2a?dl=0
!unzip Landscapes.zip > /dev/null
!rm Landscapes.zip
%cd /content/

/Users/nilsmart96/Code/fs/ainf/fs-ainf-class/Assignment Solutions/Block 2/Landscapes
/Users/nilsmart96/Code/fs/ainf/fs-ainf-class/Assignment Solutions/Block 2/Landscapes/Landscapes
zsh:1: no matches found: https://www.dropbox.com/sh/7g4e186l8f43s57/AADHqpUCbTC10AviqG9MqFz2a?dl=0
unzip:  cannot find or open Landscapes.zip, Landscapes.zip.zip or Landscapes.zip.ZIP.
rm: Landscapes.zip: No such file or directory
[Errno 2] No such file or directory: '/content/'
/Users/nilsmart96/Code/fs/ainf/fs-ainf-class/Assignment Solutions/Block 2/Landscapes/Landscapes


In [None]:
#from google.colab import drive
#drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [3]:
logging.basicConfig(format="%(asctime)s - %(levelname)s: %(message)s", level=logging.INFO, datefmt="%I:%M:%S")

# 2 Forward and Backward diffusion

We first need to build the inputs for our model, which are more and more noisy images. Instead of doing this sequentially, we can use the closed form to calculate the image for any of the timesteps individually. 

**Key Takeaways**:
- The noise-levels/variances can be pre-computed
- There are different types of variance schedules
- We can sample each timestep image independently (Sums of Gaussians is also Gaussian)
- No model is needed in this forward step

In [4]:
# Key components of diffusion model - noising schedule, function for noising images, function for sampling images

class Diffusion:
    def __init__(self, noise_steps=1000, beta_start=1e-4, beta_end=0.02, img_size=256, device="cuda"):
        self.noise_steps = noise_steps  # 1000, time steps as proposed in first papes (newer versions need less time steps)
        self.beta_start = beta_start  # lower end for beta
        self.beta_end = beta_end    # Upper end for beta
        self.img_size = img_size   # Image size 64*64
        self.device = device        # GPU?

        # Linear beta schedule for simplicity (not cosine Schedule)
        self.beta = self.prepare_noise_schedule().to(device)
        # Task: Define Alpha see lecture notes for definition of alpha
        self.alpha = (1 - self.beta).cumprod(dim=0)
        # Task: Define Alpha hat - see lecture notes for definition of alpha_hat
        self.alpha_hat = self.alpha.cumprod(dim=0)

    def prepare_noise_schedule(self):
        '''Creates linspace from start beta to end beta with the number of noise_steps) 
        '''
        return torch.linspace(self.beta_start, self.beta_end, self.noise_steps)

    def noise_images(self, x, t):
        '''Creats noisy versions of images for given timestep
           First option is to iteratively add noise over and over until we are at required timestep
           However closed form to directly get to timestep used here
        '''
        sqrt_alpha_hat = torch.sqrt(self.alpha_hat[t])[:, None, None, None]      # take the squareroot of alpha_hat and get into right shape (see lecture notes)
        # TASK: required noise factor Required noise factor
        sqrt_one_minus_alpha_hat = torch.sqrt(1 - self.alpha_hat[t])[:, None, None, None]
        #TASK :generate random numbers from a normal distribution with mean 0 and std 1 with same dimensionality as x
        Ɛ = torch.randn_like(x)
        return sqrt_alpha_hat * x + sqrt_one_minus_alpha_hat * Ɛ, Ɛ                 # first factor is scaling of the mean, second factor is adding the pure noise scaled by the standard deviation

    def sample_timesteps(self, n):
        return torch.randint(low=1, high=self.noise_steps, size=(n,))      # sample n timesteps between 1 and the (max) number of noise_steps (which is 1000 in our example)

    def sample(self, model, n):
        logging.info(f"Sampling {n} new images....")
        #TASK: Set model to evaluation mode
        model.eval()
        #Task set no grad, as gradients accumulate (are saved) otherwise leading to out of memory
        with torch.no_grad():
            x = torch.randn((n, 3, self.img_size, self.img_size)).to(self.device)       # Create initial images sampling from normal distribution using torch.randn
            #Task: Loop going over 1000 time steps in reverse order, starting with highest and going until 1
            for i in reversed(range(1, self.noise_steps + 1)):
                t = (torch.ones(n) * i).long().to(self.device)                          # Creating time-steps by creating tensor of length n
                predicted_noise = model(x, t)                                           # Time steps fed into model together with current images
                alpha = self.alpha[t][:, None, None, None]
                alpha_hat = self.alpha_hat[t][:, None, None, None]
                beta = self.beta[t][:, None, None, None]
                if i > 1:
                    noise = torch.randn_like(x)                                         # Need noise for time steps greater than one (scaled with mean an variance below)
                else:
                    noise = torch.zeros_like(x)
                x = 1 / torch.sqrt(alpha) * (x - ((1 - alpha) / (torch.sqrt(1 - alpha_hat))) * predicted_noise) + torch.sqrt(beta) * noise    # remove a little bit of noise according to formula (Algorithm 2 - sampling ) shown in lecture handout
        model.train()                                                                 # set model back to training
        x = (x.clamp(-1, 1) + 1) / 2                                                  # clamp outputs into valid range -1 to 1 , plus 1 divided by to is to bring range of values back between 0 and 1
        x = (x * 255).type(torch.uint8)                                               # multiply by 255 to bring into valid pixel range
        return x

# 3 Defining the UNet neural network



For a great introduction to UNets, have a look at this post: https://amaarora.github.io/2020/09/13/unet.html.

Key Takeaways:

- We use a simple form of a UNet for to predict the noise in the image.
- The input is a noisy image, the ouput the noise in the image
- Because the parameters are shared accross time, we need to tell the network in which timestep we are
- The Timestep is encoded by the transformer Sinusoidal Embedding
We output one single value (mean), because the variance is fixed

In [5]:
# The UNet achitecture consists of an encoder and a decoder with a bottleneck in between
# The encoder and the decoder each have Double Convolutional layers and downsampling blocks ...
# ...(in the case of the encoder) as well as upsampling blocks (in the case of the dencoder)
# We are going to implement the double convoluational layers, upsampling and downsampling blocks first and then the UNet architecture

import torch
import torch.nn as nn
import torch.nn.functional as F

# Convolution block consisting of 

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels, mid_channels=None, residual=False):
        super().__init__()
        self.residual = residual
        if not mid_channels:
            mid_channels = out_channels
        #Task: Set-up 2d covolution followed by group norm and Gelu activation,followed by another 2d convoluation and a group norm 
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, stride=1),
            nn.GroupNorm(32, mid_channels),
            nn.GELU(),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, stride=1),
            nn.GroupNorm(32, out_channels)
        )

    def forward(self, x):
        if self.residual:
            return F.gelu(x + self.double_conv(x))
        else:
            return self.double_conv(x)

# Downsample block
class Down(nn.Module):
    def __init__(self, in_channels, out_channels, emb_dim=256):
        super().__init__()
        #Task: Set-up 2d covolution followed by group norm and Gelu activation,followed by another 2d convoluation and a group norm 
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=1),
            nn.GroupNorm(32, out_channels),
            nn.GELU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=1),
            nn.GroupNorm(32, out_channels)
        )

        self.emb_layer = nn.Sequential(            # linear projection to bring time dimension to right dimension
            nn.SiLU(),
            nn.Linear(
                emb_dim,
                out_channels
            ),
        )

    def forward(self, x, t):
        x = self.maxpool_conv(x)
        emb = self.emb_layer(t)[:, :, None, None].repeat(1, 1, x.shape[-2], x.shape[-1]) 
        return x + emb


# As downsample block but with upsample operation instead of maxpool
# Also takes skip connection from encoder
# After upsampling X concatenate with skip connection and feed through convoluational block
class Up(nn.Module):
    def __init__(self, in_channels, out_channels, emb_dim=256):
        super().__init__()

        self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
        self.conv = nn.Sequential(
            DoubleConv(in_channels, in_channels, residual=True),
            DoubleConv(in_channels, out_channels, in_channels // 2),
        )

        self.emb_layer = nn.Sequential(
            nn.SiLU(),
            nn.Linear(
                emb_dim,
                out_channels
            ),
        )

    def forward(self, x, skip_x, t):
        x = self.up(x)
        x = torch.cat([skip_x, x], dim=1)
        x = self.conv(x)
        emb = self.emb_layer(t)[:, :, None, None].repeat(1, 1, x.shape[-2], x.shape[-1])
        return x + emb


# Attention block as used in transformers
class SelfAttention(nn.Module):
    def __init__(self, channels, size):
        super(SelfAttention, self).__init__()
        self.channels = channels
        self.size = size
        # Task: implement multi head attentiona layer with (channels, 4, batch_first=True)
        self.mha = nn.MultiheadAttention(embed_dim=channels, num_heads=4, batch_first=True)
        self.ln = nn.LayerNorm([channels])
        self.ff_self = nn.Sequential(
            nn.LayerNorm([channels]),
            nn.Linear(channels, channels),
            nn.GELU(),
            nn.Linear(channels, channels),
        )

    def forward(self, x):
        x = x.view(-1, self.channels, self.size * self.size).swapaxes(1, 2)
        x_ln = self.ln(x)
        attention_value, _ = self.mha(x_ln, x_ln, x_ln)
        attention_value = attention_value + x
        attention_value = self.ff_self(attention_value) + attention_value
        return attention_value.swapaxes(2, 1).view(-1, self.channels, self.size, self.size)



class UNet(nn.Module):
    def __init__(self, c_in=3, c_out=3, time_dim=256, device="cuda"):   #input and output channels both 3 since we have RGB
        super().__init__()
        self.device = device
        self.time_dim = time_dim          # dimension of timestep embedding (see below)
        # UNet has encoder, bottleneck and decoder
      
        self.inc = DoubleConv(c_in, 64)   # Double conv wrapper for two convoluational layers

        # three downsample blocks each followed by self attention block
        # Args downsample block are  (input channels, output channels)
        # Args for self attention are (channel dimension, current input resolution) 


        #Task: implement 3 downsampling block follows by self attention. Each downsample block reduces size by 2 64(start)-32-16-8, at the same time the number of channels is doubled with each layer and increases to 64-128-256 (it is double with each block)
        self.down1 = Down(64, 128)
        self.sa1 = SelfAttention(128, 32)
        self.down2 = Down(128, 256)
        self.sa2 = SelfAttention(256, 16)
        self.down3 = Down(256, 512)
        self.sa3 = SelfAttention(512, 8)
        
        # Bottleneck with convolutional layers 
        self.bot1 = DoubleConv(256, 512)
        self.bot2 = DoubleConv(512, 512)
        self.bot3 = DoubleConv(512, 256)

        # decoder which is revers of encoder
        # 3 upsampling blocks followed by attention blocks
        #Task: implement 3 upsampling blocks each followed by attention blocsk bringing the size back to 8(start)-16-31-64 - the number of channes decreases 512-256-128 it is halvd with each layer
        self.up1 = Up(512, 256, 256)
        self.sa4 = SelfAttention(256, 16)
        self.up2 = Up(256, 128, 128)
        self.sa5 = SelfAttention(128, 32)
        self.up3 = Up(128, 64, 64)
        self.sa6 = SelfAttention(64, 64)

        # 2d convolutional layer  projecting back to output dimensions with args  (final size, c_out, kernel_size=1)
        self.outc = nn.Conv2d(64, c_out, kernel_size=1)

    def pos_encoding(self, t, channels):  
        inv_freq = 1.0 / (
            10000
            ** (torch.arange(0, channels, 2, device=self.device).float() / channels)
        )
        pos_enc_a = torch.sin(t.repeat(1, channels // 2) * inv_freq)
        pos_enc_b = torch.cos(t.repeat(1, channels // 2) * inv_freq)
        pos_enc = torch.cat([pos_enc_a, pos_enc_b], dim=-1)
        return pos_enc

    # Forward pass takes as input noised images and timesteps (tensor with integer timestep values in it)
    def forward(self, x, t):
        t = t.unsqueeze(-1).type(torch.float)
        t = self.pos_encoding(t, self.time_dim)  # using positional encoding (see NLP Lecture on transformers)
        

        # Same as init with upsampling blocks taking skip connections from encoder as well
        x1 = self.inc(x)
        x2 = self.down1(x1, t)
        x2 = self.sa1(x2)
        x3 = self.down2(x2, t)
        x3 = self.sa2(x3)
        x4 = self.down3(x3, t)
        x4 = self.sa3(x4)

        x4 = self.bot1(x4)
        x4 = self.bot2(x4)
        x4 = self.bot3(x4)

        x = self.up1(x4, x3, t)
        x = self.sa4(x)
        x = self.up2(x, x2, t)
        x = self.sa5(x)
        x = self.up3(x, x1, t)
        x = self.sa6(x)
        output = self.outc(x)
        return output



# 4 Helper functions

In [6]:
def plot_images(images):
    plt.figure(figsize=(32, 32))
    plt.imshow(torch.cat([
        torch.cat([i for i in images.cpu()], dim=-1),
    ], dim=-2).permute(1, 2, 0).cpu())
    plt.show()


def save_images(images, path, **kwargs):
    grid = torchvision.utils.make_grid(images, **kwargs)
    ndarr = grid.permute(1, 2, 0).to('cpu').numpy()
    im = Image.fromarray(ndarr)
    im.save(path)


def get_data(args):
    transforms = torchvision.transforms.Compose([
        torchvision.transforms.Resize(80),  # args.image_size + 1/4 *args.image_size
        torchvision.transforms.RandomResizedCrop(args.image_size, scale=(0.8, 1.0)),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    dataset = torchvision.datasets.ImageFolder(args.dataset_path, transform=transforms)
    dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True)
    return dataloader

def setup_logging(run_name):
    os.makedirs("models", exist_ok=True)
    os.makedirs("results", exist_ok=True)
    os.makedirs(os.path.join("models", run_name), exist_ok=True)
    os.makedirs(os.path.join("results", run_name), exist_ok=True)

#5 Training

**Key Takeaways:**
- After some maths we end up with a very simple loss function
- There are other possible choices like L2 loss ect.


In [7]:
def train(args):
    setup_logging(args.run_name)
    device = args.device
    dataloader = get_data(args)
    model = UNet().to(device)
    optimizer = optim.AdamW(model.parameters(), lr=args.lr)
    mse = nn.MSELoss()
    diffusion = Diffusion(img_size=args.image_size, device=device)
    logger = SummaryWriter(os.path.join("runs", args.run_name))
    l = len(dataloader)

    for epoch in range(args.epochs):    #See algorithm 1 in the lecture handount
        logging.info(f"Starting epoch {epoch}:")
        pbar = tqdm(dataloader)
        for i, (images, _) in enumerate(pbar):
            images = images.to(device)
            t = diffusion.sample_timesteps(images.shape[0]).to(device)
            x_t, noise = diffusion.noise_images(images, t)
            predicted_noise = model(x_t, t)
            loss = mse(noise, predicted_noise)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            pbar.set_postfix(MSE=loss.item())
            logger.add_scalar("MSE", loss.item(), global_step=epoch * l + i)

        sampled_images = diffusion.sample(model, n=images.shape[0])
        save_images(sampled_images, os.path.join("results", args.run_name, f"{epoch}.jpg"))
        torch.save(model.state_dict(), os.path.join("models", args.run_name, f"ckpt.pt"))

In [10]:
class Args:
    def __init__(self):
        self.run_name = "DDPM_Uncondtional"
        self.epochs = 10
        self.batch_size = 6
        self.image_size = 64
        self.dataset_path = r"/content/Landscapes"
        self.device = "cuda"
        self.lr = 3e-4

def launch():
    args = Args()
    train(args)



In [11]:
# Note- to get good convergence and reasonable images you would have to train for at least 500 epochs 
# You can try this out if you have a run-time that does not disconnect
launch()

FileNotFoundError: [Errno 2] No such file or directory: '/Landscapes/Landscapes'