Reference: [Step-by-step visual introduction to diffusion models](https://erdem.pl/2023/11/step-by-step-visual-introduction-to-diffusion-models)

In [None]:
%pip -q install diffusers accelerate

In [None]:
!pip install torch_snippets


In [None]:
from torch_snippets import *
from diffusers import DDPMScheduler, UNet2DModel
from torch.utils.data import Subset, DataLoader
import torchvision

import torch  # explicit import: do not rely on the star import above to expose torch

# Prefer the GPU, but fall back to the CPU so the notebook still runs on
# machines without CUDA. Kept as a plain string so every `.to(device)` call
# in later cells keeps working unchanged.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f'Using device: {device}')

Using device: cuda


In [None]:
# Preprocessing applied to every image: resize to 32 pixels, then convert to a tensor.
preprocessing_steps = [
    torchvision.transforms.Resize(32),
    torchvision.transforms.ToTensor(),
]
transform = torchvision.transforms.Compose(preprocessing_steps)

# MNIST training split, stored under ./mnist/ and downloaded if it is missing.
dataset = torchvision.datasets.MNIST(
    root="mnist/",
    train=True,
    download=True,
    transform=transform,
)
# Uncomment to restrict the dataset to its first 8 samples:
# dataset = Subset(dataset, [0,1,2,3,4,5,6,7])
len(dataset)

60000

In [None]:
# Pull a single small batch (8 images) to sanity-check shapes and labels.
train_dataloader = DataLoader(dataset, batch_size=8, shuffle=True)
batch_iterator = iter(train_dataloader)
x, y = next(batch_iterator)
print('Input shape:', x.shape)
print('Labels:', y)

# Tile the batch into one image grid and display its first channel.
image_grid = torchvision.utils.make_grid(x)
show(image_grid[0], cmap='Greys')

In [None]:
# Dataloader (you can mess with batch size)
batch_size = 128
train_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Create the network
net = UNet2DModel(
    # The target image resolution. Set to 32 to agree with the Resize(32)
    # transform applied to the dataset (it was 28, which did not match the
    # tensors actually fed to the model).
    sample_size=32,
    in_channels=1,  # the number of input channels, 3 for RGB images
    out_channels=1,  # the number of output channels
    layers_per_block=1,  # how many ResNet layers to use per UNet block
    block_out_channels=(32, 64, 128, 256),  # Roughly matching our basic unet example
    down_block_types=(
        "DownBlock2D",  # a regular ResNet downsampling block
        "AttnDownBlock2D",  # a ResNet downsampling block with spatial self-attention
        "AttnDownBlock2D",
        "AttnDownBlock2D",
    ),
    up_block_types=(
        "AttnUpBlock2D",
        "AttnUpBlock2D",
        "AttnUpBlock2D",  # a ResNet upsampling block with spatial self-attention
        "UpBlock2D",   # a regular ResNet upsampling block
    ),
)
# Move the model's parameters to the selected device.
_ = net.to(device)

In [None]:
# Grab the first batch (and its index, 0) from the training loader.
# next() raises StopIteration on an empty loader instead of silently leaving
# stale `x`/`y` values from an earlier cell in place.
bx, (x, y) = next(enumerate(train_dataloader))

In [None]:
# Run only the UNet's input convolution on the batch to inspect the
# resulting feature-map shape.
batch_on_device = x.to(device)
pred = net.conv_in(batch_on_device)
pred.shape

torch.Size([128, 32, 32, 32])

In [None]:
# Push the conv_in feature map through the time-embedding module to look at
# the output shape.
# NOTE(review): the input here is the conv feature map, not a timestep
# projection; presumably it only runs because the last dimension (32)
# happens to match what the module expects. Confirm this is intended.
conv_features = pred.to(device)
pred2 = net.time_embedding(conv_features)
pred2.shape

torch.Size([128, 32, 32, 128])

In [None]:
# Make sure the network is on `device`; the model-creation cell already
# performs the same move, so this is a repeat of that step.
_ = net.to(device)

In [None]:
# Holds whatever the hooked module returned during the last forward pass.
time_embedding_output = None


def get_time_embedding_output(module, input, output):
    """Forward hook that stores the hooked module's output in a global.

    Despite its name, this hook captures the output of whichever module it
    is registered on; below that is `net.down_blocks[1]`.

    Args:
        module: The module the hook is attached to (unused).
        input: The positional inputs given to the module (unused).
        output: The value returned by the module's forward pass.
    """
    global time_embedding_output
    time_embedding_output = output


# Alternative hook point, kept for reference:
# hook = net.down_blocks[1].attentions[0].group_norm.register_forward_hook(get_time_embedding_output)

# Attach the hook to the second down block.
hook = net.down_blocks[1].register_forward_hook(get_time_embedding_output)

try:
    # Run one batch through the model at timestep 78. Gradients are not
    # needed for inspection, so skip building the autograd graph.
    with torch.no_grad():
        _ = net(x.to(device), 78)
finally:
    # Always detach the hook, even if the forward pass raised, so it does not
    # stay attached to the model for later cells.
    hook.remove()

# Shape of the captured element at index [1][1] of the block's output.
print(time_embedding_output[1][1].shape)

torch.Size([128, 64, 8, 8])


In [None]:
noise_scheduler = DDPMScheduler(num_train_timesteps=1000)

def corrupt(xb, timesteps=None):
  """Add scheduler noise to a batch.

  Args:
    xb: Batch of clean samples (a tensor).
    timesteps: Optional tensor of timesteps to noise at. When omitted, one
      timestep per sample is drawn uniformly from the scheduler's full
      training range.

  Returns:
    A `(noisy_xb, timesteps)` tuple.
  """
  if timesteps is None:
    # torch.randint's upper bound is exclusive, so use the scheduler's step
    # count (not 999) to make the last timestep reachable. Create the
    # tensor on xb's device so this also works when xb is not on the
    # global `device`.
    num_steps = noise_scheduler.config.num_train_timesteps
    timesteps = torch.randint(0, num_steps, (len(xb),), device=xb.device)
  noise = torch.randn_like(xb)
  noisy_xb = noise_scheduler.add_noise(xb, noise, timesteps)
  return noisy_xb, timesteps

In [None]:
# Ensure the network is on `device` before the training cells; earlier cells
# already perform this same move.
_ = net.to(device)

In [None]:
import matplotlib.pyplot as plt

class Report:
    """Minimal training logger: stores losses, prints progress, plots the curve.

    Attributes:
        n_epochs: Value passed at construction (stored; not used by the methods).
        losses: Every loss passed to `record`, in call order.
    """

    def __init__(self, n_epochs):
        self.n_epochs = n_epochs
        self.losses = []
        # Index into `losses` where the epoch currently being recorded begins.
        self._epoch_start = 0

    def record(self, epoch, loss, end="\r"):
        """Store one loss value and print a one-line progress message.

        Args:
            epoch: Fractional epoch position, used only in the printed message.
            loss: Loss value to store.
            end: Line terminator for the message; the default carriage
                return makes successive messages overwrite each other.
        """
        self.losses.append(loss)
        print(f"Epoch {epoch:.2f}: Loss {loss:.4f}", end=end)

    def report_avgs(self, epoch):
        """Print the mean of the losses recorded since the previous call.

        Averaging only the losses of the epoch that just finished (rather
        than every loss ever recorded) keeps the per-epoch message accurate
        when training runs for more than one epoch.

        Args:
            epoch: Epoch label used in the printed message.
        """
        epoch_losses = self.losses[self._epoch_start:]
        self._epoch_start = len(self.losses)
        if not epoch_losses:
            # Nothing recorded for this epoch: avoid dividing by zero.
            print(f"\nEpoch {epoch} completed. No losses recorded.")
            return
        avg_loss = sum(epoch_losses) / len(epoch_losses)
        print(f"\nEpoch {epoch} completed. Avg Loss: {avg_loss:.4f}")

    def plot(self, log=False):
        """Plot the loss curve.

        Args:
            log: When True, draw the y-axis on a logarithmic scale.
        """
        plt.figure(figsize=(8, 5))
        plt.plot(self.losses, label="Loss", color="blue")

        if log:
            plt.yscale("log")  # Use log scale if specified

        plt.xlabel("Batch Iterations")
        plt.ylabel("Loss")
        plt.title("Training Loss Curve")
        plt.legend()
        plt.show()



In [None]:
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import CosineAnnealingLR

# The training loop

n_epochs = 1
report = Report(n_epochs)
loss_fn = nn.MSELoss()
opt = torch.optim.Adam(net.parameters(), lr=1e-3)

# Number of batches per epoch.
n = len(train_dataloader)

# scheduler.step() is called once per batch, so the cosine schedule has to
# span every batch of every epoch; with T_max set to a single epoch the
# learning rate would start climbing again once n_epochs > 1.
# The deprecated `verbose` argument is omitted.
scheduler = CosineAnnealingLR(opt, T_max=n_epochs * n)

for epoch in range(n_epochs):
    for bx, (x, y) in enumerate(train_dataloader):
        x = x.to(device)  # Data on the GPU
        noisy_x, timesteps = corrupt(x)  # Create our noisy x
        pred = net(noisy_x, timesteps).sample
        loss = loss_fn(pred, x)  # How close is the output to the true 'clean' x?
        opt.zero_grad()
        loss.backward()
        opt.step()
        scheduler.step()
        # Fractional epoch position, e.g. 0.50 halfway through the first epoch.
        report.record(epoch + ((bx + 1) / n), loss=loss.item(), end='\r')
    report.report_avgs(epoch + 1)




Epoch 1.00: Loss 0.0361
Epoch 1 completed. Avg Loss: 0.0396


In [None]:
# Draw the loss curve twice: first with a log-scaled y-axis, then linear.
for use_log_scale in (True, False):
    report.plot(log=use_log_scale)

In [None]:
# Iterative sampling: start from pure noise, then repeatedly ask the network
# for its prediction and re-noise that prediction at a smaller timestep.
# The model stays on whatever device it is already on. Moving it to the CPU
# here would leave it there, so re-running the training cell afterwards
# would fail with a device mismatch.
sample_device = net.device
noise = torch.randn(5, 1, 32, 32, device=sample_device)
progress = [noise[:, 0].cpu()]

# Gradients are not needed for sampling, so skip building the autograd graph.
with torch.no_grad():
  # 100 timesteps, log-spaced from 999 down to 10**0.1.
  for ts in np.logspace(np.log10(999), 0.1, 100):
    ts = torch.Tensor([ts]).long().to(sample_device)
    noise = net(noise, ts).sample
    noise, _ = corrupt(noise, ts)
    # Keep a CPU copy of the first channel of each intermediate sample.
    progress.append(noise[:, 0].cpu())

print(len(progress))
_n = 10
# Show every `_n`-th snapshot: one row per sample, one column per snapshot.
subplots(torch.stack(progress[::_n]).permute(1, 0, 2, 3).reshape(-1, 32, 32), nc=11, sz=(10,4))

In [None]:
from diffusers import DiffusionPipeline

In [None]:
# Load the Stable Diffusion v1.4 checkpoint in half precision and move it to
# the GPU in a single expression.
pipeline2 = DiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4", torch_dtype=torch.float16
).to("cuda")

In [None]:
# Seeded CUDA random generator handed to the pipelines below, so that
# sampling starts from a known random state.
initial_seed = 42
generator = torch.Generator(device="cuda").manual_seed(initial_seed)

In [None]:
# Re-seed in this cell so it produces the same image no matter how many
# pipeline calls ran before it; a generator shared across cells advances
# with every call, which makes results depend on execution history.
generator = torch.Generator(device="cuda").manual_seed(42)
# Set the prompt
prompt = "a car in superman color"
# Set the negative prompt, or leave it `None` if you don't want to use it
negative_prompt = None
# Execute the pipeline (no callback is registered).
# NOTE(review): `pipeline2` is the CompVis/stable-diffusion-v1-4 checkpoint;
# confirm that 1024x1024 is an appropriate output size for it.
image = pipeline2(
    prompt,
    negative_prompt=negative_prompt,
    generator=generator,
    height=1024,
    width=1024,
)
image.images[0]

In [None]:
# Imports
import torch
from diffusers import DiffusionPipeline


# Load the Stable Diffusion XL base checkpoint in float16 and place it on the
# GPU in one step.
pipeline = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16
).to("cuda")
# If you run into out-of-memory errors, uncomment the following line:
# pipeline.enable_sequential_cpu_offload()

In [None]:
# Re-seed in this cell so it produces the same image no matter how many
# pipeline calls ran before it; a generator shared across cells advances
# with every call, which makes results depend on execution history.
generator = torch.Generator(device="cuda").manual_seed(42)
# Set the prompt
prompt = "baby in superman dress"
# Set the negative prompt, or leave it `None` if you don't want to use it
negative_prompt = None
# Execute the pipeline (no callback is registered).
image = pipeline(
    prompt,
    negative_prompt=negative_prompt,
    generator=generator,
    height=1024,
    width=1024,
)

In [None]:
# Display the first image returned by the most recent pipeline call.
image.images[0]

In [None]:
# Re-seed in this cell so it produces the same image no matter how many
# pipeline calls ran before it; a generator shared across cells advances
# with every call, which makes results depend on execution history.
generator = torch.Generator(device="cuda").manual_seed(42)
# Set the prompt
prompt = "baby in superman dress, photorealistic, cinematic, ultra detailed"
# Set the negative prompt, or leave it `None` if you don't want to use it
negative_prompt = None
# Execute the pipeline (no callback is registered).
image = pipeline(
    prompt,
    negative_prompt=negative_prompt,
    generator=generator,
    height=1024,
    width=1024,
)
image.images[0]

In [None]:
# Re-seed in this cell so it produces the same image no matter how many
# pipeline calls ran before it; a generator shared across cells advances
# with every call, which makes results depend on execution history.
generator = torch.Generator(device="cuda").manual_seed(42)
# Set the prompt
prompt = "a car in superman color, photorealistic, cinematic, ultra detailed"
# Set the negative prompt, or leave it `None` if you don't want to use it
negative_prompt = None
# Execute the pipeline (no callback is registered).
image = pipeline(
    prompt,
    negative_prompt=negative_prompt,
    generator=generator,
    height=1024,
    width=1024,
)
image.images[0]

In [None]:
# Re-seed in this cell so it produces the same image no matter how many
# pipeline calls ran before it; a generator shared across cells advances
# with every call, which makes results depend on execution history.
generator = torch.Generator(device="cuda").manual_seed(42)
# Set the prompt
prompt = "an old car in a city, photorealistic, cinematic, ultra detailed, detailed face"
# Set the negative prompt, or leave it `None` if you don't want to use it
negative_prompt = 'art'
# Execute the pipeline (no callback is registered).
image = pipeline(
    prompt,
    negative_prompt=negative_prompt,
    generator=generator,
    height=1024,
    width=1024,
)
image.images[0]

In [None]:
# Fresh generator (seed 12) so this cell does not depend on earlier sampling.
generator = torch.Generator(device="cuda").manual_seed(12)
# Text describing the desired image, and text describing what to steer away from.
prompt = "some logo, AI based calorie meter application, ultra detailed, beautiful, atractive, modern, fat"
negative_prompt = "text, number, gauge"
# Collect the sampling options, then run the pipeline at 1024x1024.
sampling_options = dict(
    negative_prompt=negative_prompt,
    generator=generator,
    height=1024,
    width=1024,
)
image = pipeline(prompt, **sampling_options)
image.images[0]

In [None]:
# Fresh generator (seed 20): same prompt as the previous logo cell, different seed.
generator = torch.Generator(device="cuda").manual_seed(20)
# Text describing the desired image, and text describing what to steer away from.
prompt = "some logo, AI based calorie meter application, ultra detailed, beautiful, atractive, modern, fat"
negative_prompt = "text, number, gauge"
# Collect the sampling options, then run the pipeline at 1024x1024.
sampling_options = dict(
    negative_prompt=negative_prompt,
    generator=generator,
    height=1024,
    width=1024,
)
image = pipeline(prompt, **sampling_options)
image.images[0]

In [None]:
# Fresh generator (seed 42) so this cell does not depend on earlier sampling.
generator = torch.Generator(device="cuda").manual_seed(42)
# Text describing the desired image; no negative prompt is used here.
prompt = "an old radio on the wooden table,ultra detailed, beautiful, photorealistic"
negative_prompt = None
# Collect the sampling options, then run the pipeline.
# NOTE(review): this cell requests 512x512 while the other cells using
# `pipeline` request 1024x1024 - confirm the smaller size is intentional.
sampling_options = dict(
    negative_prompt=negative_prompt,
    generator=generator,
    height=512,
    width=512,
)
image = pipeline(prompt, **sampling_options)
image.images[0]