Code: Part 1

a) The network F(x, t, θ) computes the denoising score which is the conditional mean in the diffusion process by using the U-Net architecture. The ScoreNet class is a time-dependent score-based model that uses U-Net architecture. At self.embed the time t is encoded using Gaussian Fourier features which are then passed through a dense layer. The Gaussian Fourier Projection class turns the time t into a high dimensional embedding to capture time. The output is then transofrmed by a dense layer to produce a time embedding. The first convolutional block processes the input image x. The input image is passed through conv1 and the time embedding is projected to match the feature map size using a dense layer (dense1). These are added rogether, normalized using gnorm1, and activated using a switch activation function. The network then downsamples the feature maps through a series of convolutional layers (conv2, conv3, conv4). Each downsampling block consists of a convolutional layer with a stride of 2, a dense layer that projects the time embedding to match the current feature map size, group normalization, and the swish activation function. Once we reach the bottleneck, the network unsamples the feature maps through transposed convolutional layers. Each unsampling block consists of a transposed convolutional layer. The swish activation function used helps maintain a flow of gradiants during backpropogation. Time t has an affect on every stage of this network. The encoded time embedding is added to the feature maps after each convolutional layer. Adding the encoded time embedding to the feature maps makes sure that the model is aware of the current timestep, allowing it to make time-dependent adjustments to the output.

In [19]:
#@title Defining a time-dependent score-based model (double click to expand or collapse)
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset, random_split
from torchvision import transforms
from sklearn.model_selection import train_test_split

class GaussianFourierProjection(nn.Module):
  """Gaussian random features for encoding time steps."""
  def __init__(self, embed_dim, scale=30.):
    super().__init__()
    # Randomly sample weights during initialization. These weights are fixed
    # during optimization and are not trainable.
    self.W = nn.Parameter(torch.randn(embed_dim // 2) * scale, requires_grad=False)
  def forward(self, x):
    x_proj = x * self.W * 2 * np.pi
    return torch.cat([torch.sin(x_proj), torch.cos(x_proj)], dim=-1)


class Dense(nn.Module):
  """A fully connected layer that reshapes outputs to feature maps."""
  def __init__(self, input_dim, output_dim):
    super().__init__()
    self.dense = nn.Linear(input_dim, output_dim)
  def forward(self, x):
    return self.dense(x)[..., None, None]


class ScoreNet(nn.Module):
  """A time-dependent score-based model built upon U-Net architecture."""

  def __init__(self, channels=[32, 64, 128, 256], embed_dim=256, group_num=4):

    super().__init__()

    self.embed = nn.Sequential(GaussianFourierProjection(embed_dim=embed_dim),
         nn.Linear(embed_dim, embed_dim))

    self.conv1 = nn.Conv2d(1, channels[0], 3, stride=1, bias=False)
    self.dense1 = Dense(embed_dim, channels[0])
    self.gnorm1 = nn.GroupNorm(group_num, num_channels=channels[0])
    self.conv2 = nn.Conv2d(channels[0], channels[1], 3, stride=2, bias=False)
    self.dense2 = Dense(embed_dim, channels[1])
    self.gnorm2 = nn.GroupNorm(group_num, num_channels=channels[1])
    self.conv3 = nn.Conv2d(channels[1], channels[2], 3, stride=2, bias=False)
    self.dense3 = Dense(embed_dim, channels[2])
    self.gnorm3 = nn.GroupNorm(group_num, num_channels=channels[2])
    self.conv4 = nn.Conv2d(channels[2], channels[3], 3, stride=2, bias=False)
    self.dense4 = Dense(embed_dim, channels[3])
    self.gnorm4 = nn.GroupNorm(group_num, num_channels=channels[3])

    self.tconv4 = nn.ConvTranspose2d(channels[3], channels[2], 3, stride=2, bias=False)
    self.dense5 = Dense(embed_dim, channels[2])
    self.tgnorm4 = nn.GroupNorm(group_num, num_channels=channels[2])
    self.tconv3 = nn.ConvTranspose2d(channels[2] + channels[2], channels[1], 3, stride=2, bias=False, output_padding=1)
    self.dense6 = Dense(embed_dim, channels[1])
    self.tgnorm3 = nn.GroupNorm(group_num, num_channels=channels[1])
    self.tconv2 = nn.ConvTranspose2d(channels[1] + channels[1], channels[0], 3, stride=2, bias=False, output_padding=1)
    self.dense7 = Dense(embed_dim, channels[0])
    self.tgnorm2 = nn.GroupNorm(group_num, num_channels=channels[0])
    self.tconv1 = nn.ConvTranspose2d(channels[0] + channels[0], 1, 3, stride=1)

    # The swish activation function
    self.act = lambda x: x * torch.sigmoid(x)
    #added rho0 and rho1
    self.rho_0 = nn.Parameter(torch.tensor(1.0))
    self.rho_1 = nn.Parameter(torch.tensor(1.0))

  def forward(self, x, t):
    # Obtain the Gaussian random feature embedding for t
    embed = self.act(self.embed(t))

    h1 = self.conv1(x) # ...
    h1 += self.dense1(embed) #...
    h1 = self.gnorm1(h1) # ...
    h1 = self.act(h1) # ...
    h2 = self.conv2(h1) # ...
    h2 += self.dense2(embed)
    h2 = self.gnorm2(h2)
    h2 = self.act(h2)
    h3 = self.conv3(h2)
    h3 += self.dense3(embed)
    h3 = self.gnorm3(h3)
    h3 = self.act(h3)
    h4 = self.conv4(h3)
    h4 += self.dense4(embed)
    h4 = self.gnorm4(h4)
    h4 = self.act(h4)

    h = self.tconv4(h4) # ...
    h += self.dense5(embed) # ...
    h = self.tgnorm4(h)
    h = self.act(h)
    h = self.tconv3(torch.cat([h, h3], dim=1)) # ...
    h += self.dense6(embed)
    h = self.tgnorm3(h)
    h = self.act(h)
    h = self.tconv2(torch.cat([h, h2], dim=1))
    h += self.dense7(embed)
    h = self.tgnorm2(h)
    h = self.act(h)
    h = self.tconv1(torch.cat([h, h1], dim=1))

    F_xt_theta = h

    mu_xt_theta = self.rho_0 * (x - self.rho_1 * F_xt_theta)

    return mu_xt_theta

      

In [20]:
class Diffusion(nn.Module):
    def __init__(self, model, n_steps, device, min_beta, max_beta):
        super().__init__()
        self.model = model
        self.n_steps = n_steps
        self.device = device
        
        #alpha = 1 - beta and alpha bar is product sum of alphas
        self.beta = torch.linspace(min_beta, max_beta, n_steps).to(device)
        self.alpha = 1 - self.beta
        self.alpha_bar = torch.cumprod(self.alpha, dim = 0).to(device)
        #store beta, alpha, \bar alpha    
    
    def forward_process(self, x0, t):
        
        # finding xt given x0, sqrt of alpha bar times x0 + the sqrt of 1 - alpha bar epsilon
        noise = torch.randn_like(x0).to(self.device)
        alpha_bar_t = self.alpha_bar[t].reshape(-1, 1, 1, 1)
        x_t = torch.sqrt(alpha_bar_t) * x0 + torch.sqrt(1 - alpha_bar_t)* noise
        return x_t
    #sample x_{t-1}, x_t, given x_0
    
    def predict_next(self, xt, t):
        #use mu xt theta formula from number 8 in theory
        # t_tensor = t / self.n_steps
        # t_tensor = t_tensor.reshape(-1,1).to(self.device)

        t_tensor = (t / self.n_steps).view(xt.shape[0], 1).to(self.device)
        e_theta_xt = self.model(xt, t_tensor)
        alpha_t = self.alpha[t].reshape(-1, 1, 1, 1)
        alpha_bar_t = self.alpha_bar[t].reshape(-1, 1, 1, 1)
        
        mu_xt_theta = (1 / torch.sqrt(alpha_t)) * (xt - ((1 - alpha_t) / torch.sqrt(1 - alpha_bar_t)) * e_theta_xt)
        return mu_xt_theta
    
    #compute mu(xt, t)
    
    
    def sample_xt_xt_minus_1(self, x0, num_steps):
        
        #to sample xt minus 1 we take mu_xt_t_theta and add sqare root of 1 - alpha t times epsilon
        
        x_t = self.sample_xt(x0, t)
        
        mu_xt_theta = self.predict_next(x_t, t)
        
        noise = torch.randn_like(x_t).to(self.device)
        
        beta_t = self.beta[t].reshape(-1, 1, 1, 1)
        
        #use beta to scale in the reverse
        x_t_minus_1 = mu_xt_theta + torch.sqrt(beta_t) * noise
    
        return x_t, x_t_minus_1
    
    def new_sample_xt_xt_minus_1(self, x0, t):
        
        x_t = self.forward_process(x0, t)
        
        
        mu_xt_theta = self.predict_next(x_t, t)
        
        # use conditional mean of xt and x0
        alpha_t = self.alpha[t]
        alpha_bar_t = self.alpha_bar[t]
        alpha_bar_t_minus_1 = self.alpha_bar[t-1] if t > 0 else torch.tensor(1.0, device = self.device)
        mu_t_xt_x0 = ((1 - alpha_t) * torch.sqrt(alpha_bar_t_minus_1) * x0 + (1 - alpha_bar_t_minus_1) * torch.sqrt(alpha_t) * x_t) / (1 - alpha_bar_t)
        #get noise based on xt
        noise = torch.rand_like(x_t).to(self.device)
        # variance
        rho_t = torch.sqrt((1 - alpha_t) * (1 - alpha_bar_t_minus_1) / (1 - alpha_bar_t)).reshape(-1, 1, 1, 1)
        #calculate xt-1 using conditional and variance and noise based on xt
        x_t_minus_1 = mu_t_xt_x0 + rho_t * noise
    
        return x_t, x_t_minus_1
    
    
    
    def compute_loss(self, x0):
        # get batch size from x0
        batch_size = x0.shape[0]
        # get random time step t so we can train on different time steps
        t = torch.randint(0, self.n_steps, (batch_size,), device = self.device).long()
        
        x_t_list = []
        x_t_minus_1_list = []
        # for each batch we sample xt and xt-1 given x0 at the sampled time step
        for i in range(batch_size):
          x_t, x_t_minus_1 = self.new_sample_xt_xt_minus_1(x0[i:i+1], t[i])
          x_t_list.append(x_t)
          x_t_minus_1_list.append(x_t_minus_1)
        
        x_t = torch.cat(x_t_list, dim = 0)
        x_t_minus_1 = torch.cat(x_t_minus_1_list, dim = 0)
        # preduct the mean
        mu_xt_theta = self.predict_next(x_t, t)
        # use prediced mean to calculate loss and compare predicted mean of reverse with the actual sampled values
        loss = ((x_t_minus_1 - mu_xt_theta) ** 2 / (2 * (1 - self.alpha[t].reshape(-1, 1, 1, 1)))).mean()
        return loss
    
    def reverse(self, x_t, num_steps):
        #iterate over time steps in reverse order, start from T-1 to 1
        
        x = x_t
        for t in reversed(range(1, num_steps)):
          batch_size = x_t.shape[0]
          # use predicted mean, gives estimated denoised version of the current sample
          t_tensor = torch.randint(0, self.n_steps, (batch_size,), device = self.device).long()
          mu_xt_theta = self.predict_next(x, t = t_tensor)
          # if t > 1 add gaussian noise to the sample, if t =1 then zero noise
          noise = torch.randn_like(x).to(self.device) if t > 1 else torch.zeros_like(x).to(self.device)
          # use beta to denoise sample
          beta_t = self.beta[t].reshape(-1, 1, 1, 1)
          #gets sample xt-1 using the predicted mean and scaling by Bt
          x = mu_xt_theta + torch.sqrt(beta_t) * noise
        return x



e) To reduce the variance of the estimate, we can use the fact that q(x_{t-1} | x_t, x_0) is Gaussian. We can then use the conditional distribution to sample better. We will use rho_t to help reduce the variance of our loss estimates by using a more accurate mean for x_{t-1}. We first compute the conditional mean and then get the sample noise, we scale the noise by rho_t to reduce the variance of the estimate by calculating the uncertainty at each time step.

In [21]:
import numpy as np
def get_mnist():
    data = np.float64(np.load("MNIST_data.npy"))
    labels = np.float32(np.load("MNIST_labels.npy"))
    
    data = data / 255
    data.shape
    labels.shape
    
    print(data.shape)
    data=np.float32(data)/255.
    train_data=data[0:50000].reshape((-1,1,28,28))
    train_labels=np.int32(labels[0:50000])
    val_data=data[50000:55000].reshape((-1,1,28,28))
    val_labels=np.int32(labels[55000:60000])
    test_data=data[55000:65000].reshape((-1,1,28,28))
    test_labels=np.int32(labels[55000:65000])
    
    data_train = torch.tensor(train_data)
    labels_train = torch.tensor(train_labels, dtype=torch.long)
    data_val = torch.tensor(val_data)
    labels_val = torch.tensor(val_labels, dtype=torch.long)
    data_test = torch.tensor(test_data)
    labels_test = torch.tensor(test_labels, dtype=torch.long)

    train_dataset = TensorDataset(data_train, labels_train)
    val_dataset = TensorDataset(data_val, labels_val)
    test_dataset = TensorDataset(data_test, labels_test)
    batch_size = 100
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader, test_dataset



In [4]:
def train_diffusion(model, train_loader, val_loader, num_epochs, device):
    optimizer = torch.optim.Adam(model.parameters(), lr = 0.01)
    rho_optimizer = torch.optim.Adam([model.model.rho_0, model.model.rho_1], lr = 0.2)
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for batch in train_loader:
            x0, _ = batch
            x0 = x0.to(device)
            optimizer.zero_grad()
            rho_optimizer.zero_grad()
            loss = model.compute_loss(x0)
            loss.backward()
            optimizer.step()
            rho_optimizer.step()
            train_loss += loss.item()
        train_loss /=len(train_loader)

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                x0, _ = batch
                x0 = x0.to(device)
                loss = model.compute_loss(x0)
                val_loss += loss.item()
        val_loss /= len(val_loader)

        print(f"Epoch {epoch + 1}/ {num_epochs}, Train Loss: {train_loss: .4f}, Val Loss: {val_loss:.4f}")
        

train_loader, val_loader, test_loader, test_dataset = get_mnist()

device = 'cuda' if torch.cuda.is_available() else 'cpu'
score_model = ScoreNet().to(device)
diffusion_model = Diffusion(score_model, n_steps = 200, device = device, min_beta = 0.0001, max_beta = 0.1, tens = False).to(device)



(70000, 784)


In [5]:
train_diffusion(diffusion_model, train_loader, val_loader, num_epochs = 10, device = device)

Epoch 1/ 10, Train Loss:  0.0667, Val Loss: 0.0513
Epoch 2/ 10, Train Loss:  0.0468, Val Loss: 0.0419
Epoch 3/ 10, Train Loss:  0.0415, Val Loss: 0.0410
Epoch 4/ 10, Train Loss:  0.0412, Val Loss: 0.0415
Epoch 5/ 10, Train Loss:  0.0411, Val Loss: 0.0408
Epoch 6/ 10, Train Loss:  0.0411, Val Loss: 0.0439
Epoch 7/ 10, Train Loss:  0.0464, Val Loss: 0.0456
Epoch 8/ 10, Train Loss:  0.0441, Val Loss: 0.0428
Epoch 9/ 10, Train Loss:  0.0433, Val Loss: 0.0436
Epoch 10/ 10, Train Loss:  0.0487, Val Loss: 0.0490


(70000, 784)
Epoch 1/ 60, Train Loss:  0.0651, Val Loss: 0.0507
Epoch 2/ 60, Train Loss:  0.0464, Val Loss: 0.0443
Epoch 3/ 60, Train Loss:  0.0416, Val Loss: 0.0410
Epoch 4/ 60, Train Loss:  0.0412, Val Loss: 0.0409
Epoch 5/ 60, Train Loss:  0.0411, Val Loss: 0.0409
Epoch 6/ 60, Train Loss:  0.0411, Val Loss: 0.0410
Epoch 7/ 60, Train Loss:  0.0443, Val Loss: 0.0431
Epoch 8/ 60, Train Loss:  0.0459, Val Loss: 0.0439
Epoch 9/ 60, Train Loss:  0.0429, Val Loss: 0.0437
Epoch 10/ 60, Train Loss:  0.0434, Val Loss: 0.0409
Epoch 11/ 60, Train Loss:  0.0421, Val Loss: 0.0411
Epoch 12/ 60, Train Loss:  0.0410, Val Loss: 0.0409
Epoch 13/ 60, Train Loss:  0.0409, Val Loss: 0.0411
Epoch 14/ 60, Train Loss:  0.0411, Val Loss: 0.0407
Epoch 15/ 60, Train Loss:  0.0409, Val Loss: 0.0407
Epoch 16/ 60, Train Loss:  0.0408, Val Loss: 0.0409
Epoch 17/ 60, Train Loss:  0.0408, Val Loss: 0.0406
Epoch 18/ 60, Train Loss:  0.0408, Val Loss: 0.0407
Epoch 19/ 60, Train Loss:  0.0408, Val Loss: 0.0406
Epoch 20/ 60, Train Loss:  0.0407, Val Loss: 0.0405
Had to stop after 20 because was taking long to run and want to continue testing, kernel died so switched epochs to 10 to be able to rerun all parts

In [22]:
def sample_images(model, num_samples, num_steps, device):
    x_t = torch.randn(num_samples, 1, 28, 28).to(device)
    samples = model.reverse(x_t = x_t, num_steps = num_steps)
    return samples

def show_samples(samples, n = 20):
    samples = samples.cpu().detach().numpy()
    fig, axes = plt.subplots(1, n, figsize = (n, 1))
    for i in range(n):
        axes[1].imshow(samples[i, 0], cmap = 'gray')
        axes[i].axis('off')
    plt.show()

In [8]:
import spicy as sp
# from spicy.linalg import sqrtm

def calculate_fid(real_activations, generated_activations):

    activations_dtype = real_activations.dtype
    if activations_dtype != np.float64:
        real_activations = real_activations.astype(np.float64)
        generated_activations = generated_activations.astype(np.float64)
    
    m = np.mean(real_activations, 0)
    m_w = np.mean(generated_activations, 0)
    num_examples_real = float(real_activations.shape[0])
    num_examples_generated = float(generated_activations.shape[0])
    
    real_centered = real_activations - m
    sigma = real_centered.T.dot(real_centered) / (num_examples_real - 1)
    
    gen_centered = generated_activations - m_w
    sigma_w = gen_centered.T.dot(gen_centered) / (num_examples_generated - 1)
    
    sqrt_sigma = sp.linalg.sqrtm(sigma)
    sqrts = sqrt_sigma.dot(sigma_v.dot(sigma))
    
    sqrt_trace_component = np.trace(sp.linalg.sqrtm(sigma))
    
    trace = np.trace(sigma + sigma_w) - 2.0 * sqrt_trace_component
    
    mean = np.sum(np.square(m - m_w))
    fid = trace + mean
    if activations_dtype != np.float64:
        fid = fid.astype(activations_dtype)
    
    return fid

In [None]:
num_samples = 1000

score_model = ScoreNet().to(device)
diffusion_model = Diffusion(score_model, n_steps = 200, device = device, min_beta = 0.0001, max_beta = 0.1).to(device)
samples = sample_images(diffusion_model, num_samples, num_steps=200, device=device)
show_samples(samples, n=20)

Cannot run code above because kernel dies when I do. I attempted to change the batch size or the number of samples or number of epochs but seems to die everytime.

In [None]:
import numpy as np
data = np.float64(np.load("MNIST_data.npy"))
data = data / 255
data.shape

samples = sample_images(diffusion_model, num_samples, num_steps=200, device=device)

train_data=data[0:50000].reshape((-1,1,28,28))

data_train = torch.tensor(train_data)

real_features = data_train[:num_samples]
generated_features = samples

fid_score = calculate_fid(real_features, generated_features)
print(f"FID Score: {fid_score}")

Code: Part 2

2a

In [3]:
class Diffusion(nn.Module):
    def __init__(self, model, n_steps, device, min_beta, max_beta):
        super().__init__()
        self.model = model
        self.n_steps = n_steps
        self.device = device
        
        #alpha = 1 - beta and alpha bar is product sum of alphas
        self.beta = torch.linspace(min_beta, max_beta, n_steps).to(device)
        self.alpha = 1 - self.beta
        self.alpha_bar = torch.cumprod(self.alpha, dim = 0).to(device)
        #store beta, alpha, \bar alpha    
    
    def forward_process(self, x0, t):
        
        # finding xt given x0, sqrt of alpha bar times x0 + the sqrt of 1 - alpha bar epsilon

        #compute Xj and epsilon j
        noise = torch.randn_like(x0).to(self.device)
        alpha_bar_t = self.alpha_bar[t].reshape(-1, 1, 1, 1)
        x_t = torch.sqrt(alpha_bar_t) * x0 + torch.sqrt(1 - alpha_bar_t)* noise
        return x_t, noise
    #sample x_{t-1}, x_t, given x_0
    
    def predict_next(self, xt, t):
        #use mu xt theta formula from number 8 in theory
        t_tensor = (t / self.n_steps).float().view(xt.shape[0], 1).to(self.device)
        e_theta_xt = self.model(xt, t_tensor)
        alpha_t = self.alpha[t].reshape(-1, 1, 1, 1)
        alpha_bar_t = self.alpha_bar[t].reshape(-1, 1, 1, 1)
        
        mu_xt_theta = (1 / torch.sqrt(alpha_t)) * (xt - ((1 - alpha_t) / torch.sqrt(1 - alpha_bar_t)) * e_theta_xt)
        return mu_xt_theta
    
    #compute mu(xt, t)
    
    def compute_loss(self, x0):
        # get batch size from x0
        batch_size = x0.shape[0]
        # get random time step t so we can train on different time steps
        t = torch.randint(0, self.n_steps, (batch_size,), device = self.device).long()
        
        x_t_list = []
        noise_list = []
        # for each batch we sample xt and xt-1 given x0 at the sampled time step
        for i in range(batch_size):
          x_t, noise = self.forward_process(x0[i:i+1], t[i])
          x_t_list.append(x_t)
          noise_list.append(noise)
        
        x_t = torch.cat(x_t_list, dim = 0)
        noise = torch.cat(noise_list, dim = 0)
        # use model to predict ej based off of Xtj
        e_theta_xt = self.model(x_t, (t / self.n_steps).float().view(-1,1).to(self.device))
        #loss contribution then avg
        loss = ((noise - e_theta_xt) ** 2).mean()
        return loss
    
    def reverse(self, x_T, num_steps):
        #iterate over time steps in reverse order, start from T-1 to 1
        
        x = x_T
        for t in reversed(range(1, num_steps)):
          # use predicted mean, gives estimated denoised version of the current sample
          mu_xt_theta = self.predict_next(x, t)
          # if t > 1 add gaussian noise to the sample, if t =1 then zero noise
          noise = torch.randn_like(x).to(self.device) if t > 1 else torch.zeros_like(x).to(self.device)
          # use beta to denoise sample
          beta_t = self.beta[t].reshape(-1, 1, 1, 1)
          #gets sample xt-1 using the predicted mean and scaling by Bt
          x = mu_xt_theta + torch.sqrt(beta_t) * noise
        return x



2b

In [7]:
import numpy as np
def get_mnist():
    data = np.float64(np.load("MNIST_data.npy"))
    labels = np.float32(np.load("MNIST_labels.npy"))
    
    data = data / 255
    data.shape
    labels.shape
    
    print(data.shape)
    data=np.float32(data)/255.
    train_data=data[0:50000].reshape((-1,1,28,28))
    train_labels=np.int32(labels[0:50000])
    val_data=data[50000:55000].reshape((-1,1,28,28))
    val_labels=np.int32(labels[55000:60000])
    test_data=data[55000:65000].reshape((-1,1,28,28))
    test_labels=np.int32(labels[55000:65000])
    
    data_train = torch.tensor(train_data)
    labels_train = torch.tensor(train_labels, dtype=torch.long)
    data_val = torch.tensor(val_data)
    labels_val = torch.tensor(val_labels, dtype=torch.long)
    data_test = torch.tensor(test_data)
    labels_test = torch.tensor(test_labels, dtype=torch.long)

    train_dataset = TensorDataset(data_train, labels_train)
    val_dataset = TensorDataset(data_val, labels_val)
    test_dataset = TensorDataset(data_test, labels_test)
    batch_size = 100
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader, test_dataset


In [10]:
def train_diffusion(model, train_loader, val_loader, num_epochs, device):
    optimizer = torch.optim.Adam(model.parameters(), lr = 0.01)
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for batch in train_loader:
            x0, _ = batch
            x0 = x0.to(device)
            optimizer.zero_grad()
            loss = model.compute_loss(x0)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /=len(train_loader)

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                x0, _ = batch
                x0 = x0.to(device)
                loss = model.compute_loss(x0)
                val_loss += loss.item()
        val_loss /= len(val_loader)

        print(f"Epoch {epoch + 1}/ {num_epochs}, Train Loss: {train_loss: .4f}, Val Loss: {val_loss:.4f}")
        
            

In [11]:
train_loader, val_loader, test_loader, test_dataset = get_mnist()
device = 'cuda' if torch.cuda.is_available() else 'cpu'

score_model = ScoreNet().to(device)
diffusion_model = Diffusion(score_model, n_steps=200, device=device, min_beta=0.0001, max_beta=0.1).to(device)
train_diffusion(diffusion_model, train_loader, val_loader, num_epochs=60, device=device)

(70000, 784)
Epoch 1/ 60, Train Loss:  0.3337, Val Loss: 0.0459
Epoch 2/ 60, Train Loss:  0.0328, Val Loss: 0.0244
Epoch 3/ 60, Train Loss:  0.0207, Val Loss: 0.0186
Epoch 4/ 60, Train Loss:  0.0159, Val Loss: 0.0139
Epoch 5/ 60, Train Loss:  0.0123, Val Loss: 0.0098
Epoch 6/ 60, Train Loss:  0.0064, Val Loss: 0.0038
Epoch 7/ 60, Train Loss:  0.0039, Val Loss: 0.0029
Epoch 8/ 60, Train Loss:  0.0030, Val Loss: 0.0029
Epoch 9/ 60, Train Loss:  0.0029, Val Loss: 0.0022
Epoch 10/ 60, Train Loss:  0.0023, Val Loss: 0.0020


KeyboardInterrupt: 

2c

In [16]:
def sample_images(model, num_samples, num_steps, device):
    x_T = torch.randn(num_samples, 1, 28, 28).to(device)
    samples = model.reverse(x_T, num_steps)
    return samples

def show_samples(samples, n = 20):
    samples = samples.cpu().detach().numpy()
    fig, axes = plt.subplots(1, n, figsize = (n, 1))
    for i in range(n):
        axes[1].imshow(samples[i, 0], cmap = 'gray')
        axes[i].axis('off')
    plt.show()

In [None]:
num_samples = 1000
show_samples(samples, n=20)

In [None]:
import numpy as np
data = np.float64(np.load("MNIST_data.npy"))
data = data / 255
data.shape
num_samples = 100
samples = sample_images(diffusion_model, num_samples, num_steps=200, device=device)

data=np.float32(data)/255.
train_data=data[0:50000].reshape((-1,1,28,28))

data_train = torch.tensor(train_data)

real_features = data_train[:num_samples]
generated_features = samples

fid_score = calculate_fid(real_features, generated_features)
print(f"FID Score: {fid_score}")

Can not run above because kills kernel.

![Screen Shot 2024-05-23 at 11.12.41 AM.png](attachment:46c8c8a2-cb32-4b99-a403-c5b808fde734.png)

3

When looking at the models we have created in parts 1 and 2 the main difference is the additional rho parameters in part 1 that are learned during training. Learning these parametrs can help handle scaling the noise component, and adding complexity and optimization. The loss function in part 2 will only focus on the squared error between the predicted and actual noise components. Additional parametrs can make models harder to train but makes them more adaptable in the process of running. We do see a difference in the image quality results and loss in part 1 and 2. The rho parameters in part 1 can help generate higher quality sample images. However, when looking at the loss function's results it can be easier to interpret the actual noise components values since there is no parameter offset. Since part 1 produces higher quality images due to its additional parameters, the FID score is higher for part 1.