In [1]:
import torch
import torchvision
import matplotlib.pyplot as plt
import os
from PIL import Image
import json
import torch
import pathlib
from pathlib import Path
import PixelEncoder
from typing import Tuple, Dict, List
from torch.utils.data import DataLoader
from torchvision import transforms
import numpy as np
import math
from math import sqrt
from tqdm.auto import tqdm
import PixelEncoder
from PixelEncoder import PixelEncodingSet, Segments
import torch.nn.functional as F

testing2
SIZE = 64


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# GLOBAL VARIABLES

data_path = Path(
    f"/Users/mary/Documents/School/Sketch Simplification/Sketch-Simplification/dataset"
)

flower_data_path = Path(
    f"/Users/mary/Documents/School/Sketch Simplification/Sketch-Simplification/disposable/data"
)

device = "mps"

T = 300
IMG_SIZE = 64
BATCH_SIZE = 125


def linear_beta_schedule(timesteps, start=0.0001, end=0.02):
    return torch.linspace(start, end, timesteps)


# Define beta schedule
T = 300
betas = linear_beta_schedule(timesteps=T)

# Pre-calculate different terms for closed form
alphas = 1.0 - betas
alphas_cumprod = torch.cumprod(alphas, axis=0)
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas)
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - alphas_cumprod)
posterior_variance = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)

In [3]:
class SketchData(torch.utils.data.Dataset):
    """
    Custom PyTorch dataset for image and JSON pairs.

    Args:
        root_dir (str): Path to the root directory containing image folders.
        transform (callable, optional): A function/transform to apply to the image.
    """

    def __init__(self, targ_dir, transform=None):
        self.paths = list(targ_dir.glob("*/."))

        self.transform = transform

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, index: int):
        "Returns one sample of data, data and label (X, y)."

        data_path = self.paths[index]

        img = Image.open(data_path / Path("base.png")).convert("L")
        assert img.size == (IMG_SIZE, IMG_SIZE)
        if self.transform:
            # return data, label (X, y)
            return self.transform(img), index
        else:
            return img, index

    def getOtherData(self, index: int):
        data_path = self.paths[index]
        pixe_set = PixelEncodingSet.load(data_path / Path("pixe_set.json"))
        segs = Segments.load(data_path / Path("segs.json"), fromBlender=False)
        return pixe_set, segs

    def getSegs(self,index:int):
        data_path = self.paths[index]
        segs = Segments.load(data_path / Path("segs.json"), fromBlender=False)
        return segs

data = SketchData(targ_dir=data_path)
data.getOtherData(0)

(<PixelEncoder.PixelEncodingSet at 0x17aedbf10>,
 <PixelEncoder.Segments at 0x17f6c35b0>)

In [4]:
# # FLOWERS DATASET

# data = torchvision.datasets.data = torchvision.datasets.Flowers102(
#     root=flower_data_path,
#     download=True,
# )

In [5]:
from torch import nn
import math


class Block(nn.Module):
    def __init__(self, in_ch, out_ch, time_emb_dim, up=False):
        super().__init__()
        self.time_mlp = nn.Linear(time_emb_dim, out_ch)
        if up:
            self.conv1 = nn.Conv2d(2 * in_ch, out_ch, 3, padding=1)
            self.transform = nn.ConvTranspose2d(out_ch, out_ch, 4, 2, 1)
        else:
            self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding=1)
            self.transform = nn.Conv2d(out_ch, out_ch, 4, 2, 1)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.bnorm1 = nn.BatchNorm2d(out_ch)
        self.bnorm2 = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU()

    def forward(
        self,
        x,
        t,
    ):
        # First Conv
        h = self.bnorm1(self.relu(self.conv1(x)))
        # Time embedding
        time_emb = self.relu(self.time_mlp(t))
        # Extend last 2 dimensions
        time_emb = time_emb[(...,) + (None,) * 2]
        # Add time channel
        h = h + time_emb
        # Second Conv
        h = self.bnorm2(self.relu(self.conv2(h)))
        # Down or Upsample
        return self.transform(h)


class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, time):
        device = time.device
        half_dim = self.dim // 2
        embeddings = math.log(10000) / (half_dim - 1)
        embeddings = torch.exp(torch.arange(
            half_dim, device=device) * -embeddings)
        embeddings = time[:, None] * embeddings[None, :]
        embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
        # TODO: Double check the ordering here
        return embeddings


class SimpleUnet(nn.Module):
    """
    A simplified variant of the Unet architecture.
    """

    def __init__(self):
        super().__init__()
        image_channels = 1
        down_channels = (64, 128, 256, 512, 1024)
        up_channels = (1024, 512, 256, 128, 64)
        out_dim = 2
        time_emb_dim = 32

        # Time embedding
        self.time_mlp = nn.Sequential(
            SinusoidalPositionEmbeddings(time_emb_dim),
            nn.Linear(time_emb_dim, time_emb_dim),
            nn.ReLU(),
        )

        # Initial projection
        self.conv0 = nn.Conv2d(image_channels, down_channels[0], 3, padding=1)

        # Downsample
        self.downs = nn.ModuleList(
            [
                Block(down_channels[i], down_channels[i + 1], time_emb_dim)
                for i in range(len(down_channels) - 1)
            ]
        )
        # Upsample
        self.ups = nn.ModuleList(
            [
                Block(up_channels[i], up_channels[i + 1],
                      time_emb_dim, up=True)
                for i in range(len(up_channels) - 1)
            ]
        )

        # Edit: Corrected a bug found by Jakub C (see YouTube comment)
        self.output = nn.Conv2d(up_channels[-1], out_dim, 1)

    def forward(self, x, timestep):
        # Embedd time
        t = self.time_mlp(timestep)
        # Initial conv
        x = self.conv0(x)
        # Unet
        residual_inputs = []
        for down in self.downs:
            x = down(x, t)
            residual_inputs.append(x)
        for up in self.ups:
            residual_x = residual_inputs.pop()
            # Add residual x as additional channels
            x = torch.cat((x, residual_x), dim=1)
            x = up(x, t)
        return self.output(x)


model = SimpleUnet()
print("Num params: ", sum(p.numel() for p in model.parameters()))

Num params:  62437666


In [6]:
def img_to_tensor(data):
    data_transforms = transforms.Compose(
        [
            transforms.Grayscale(num_output_channels=1),
            transforms.ToTensor(),
        ]
    )
    return SketchData(data_path, data_transforms)


def tensor_to_img(image):
    reverse_transforms = transforms.Compose([transforms.ToPILImage()])
    return reverse_transforms(image)


def show_tensor_image(image):
    plt.imshow(tensor_to_img(image))


# data = img_to_tensor(data)


# plt.figure(figsize=(15, 15))
# plt.axis("off")
# num_images = 10
# stepsize = int(T / num_images)


# for idx in range(0, T, stepsize):
#     t = torch.Tensor([idx]).type(torch.int64)
#     plt.subplot(1, num_images + 1, int(idx / stepsize) + 1)
#     img = next(iter(data))[0]
#     # print(img)
#     show_tensor_image(img)
# # print(img.shape)

In [29]:
# ---- Model demo before training ----


def applyOffsets(img, displacements):
    displacements = displacements.permute(1, 2, 0)
    displacements *= IMG_SIZE

    def insideImg(x, y):
        return x >= 0 and x <= IMG_SIZE and y >= 0 and y <= IMG_SIZE

    def mix(v0, v1):
        return round(((v0 + v1) / 2))

    img_0 = tensor_to_img(img)
    img_1 = Image.new("L", (IMG_SIZE, IMG_SIZE), 255)
    pixels_0, pixels_1 = img_0.load(), img_1.load()

    for x in range(IMG_SIZE):
        for y in range(IMG_SIZE):
            x_delta, y_delta = displacements[y, x]

            x_new, y_new = x + x_delta, y + y_delta
            if insideImg(x_new, y_new):
                newColor = mix(pixels_0[x, y], pixels_1[x_new, y_new])
                # print(newColor)
                pixels_1[x_new, y_new] = newColor
    return img_1


def getRealOffsets(img_index, timesteps):
    segs = data.getSegs(img_index)
    offsets = PixelEncoder.generateSteppedOffsets(
        segs, IMG_SIZE, torch.max(timesteps))

    return torch.tensor([offsets[t] for t in timesteps])


model.to(device)

data = img_to_tensor(data)
dataloader = DataLoader(data, batch_size=BATCH_SIZE,
                        shuffle=True, drop_last=False)

timesteps = torch.randint(0, T, (BATCH_SIZE,), device=device).long()


img, img_index = next(iter(dataloader))[0]

getRealOffsets(img_index, timesteps)

out_displacements = model(img.unsqueeze(0).to(device), timesteps)

img_out = applyOffsets(img, out_displacements[0])
plt.imshow(img_out)
# print(out_img.shape)
# show_tensor_image(out_img[3])

ValueError: not enough values to unpack (expected 2, got 1)

In [None]:
def get_loss(model, x_0, t):
    x_noisy, noise = forward_diffusion_sample(x_0, t, device)
    noise_pred = model(x_noisy, t)
    return F.l1_loss(noise, noise_pred)

In [None]:
epochs = 3
# tqdm is for showing the progress bar
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n------")

    # training

    train_loss = 0
    # add a loop to loop through all the batches
    for batch, (img, index) in enumerate(dataloader):
        model.train()

        preds = model(X)
        loss = loss_fn(preds, y)
        train_loss += (
            loss  # for getting average loss per batch in all batches in this epoch
        )
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 400 == 0:
            # len(train_dataloader.dataset) is the total number of samples in our dataset
            print(
                f"looked at {batch * len(X)}/{len(train_dataloader.dataset)} samples")

        train_loss /= len(train_dataloader)  # get average loss for the batch

    # calculate test data
    test_loss, test_acc = 0, 0
    model_0.eval()
    with torch.inference_mode():
        for X_test, y_test in test_dataloader:
            test_pred = model_0(X_test)
            test_loss += loss_fn(test_pred, y_test)

            # test_pred.argmax gives the highest
            test_loss += loss_fn(test_pred, y_test)
            test_acc += accuracy_fn(y_true=y_test,
                                    y_pred=test_pred.argmax(dim=1))

        test_loss /= len(test_dataloader)
        test_acc /= len(test_dataloader)

    # print stats for epoch
    print(
        f"Train loss: {train_loss:.4f} | Test loss: {test_loss:.4f}, Test acc: {test_acc}"
    )
    end_time = timer()
    total_train_time_model_0 = print_train_time(
        start=start_time, end=end_time, device=getModelDevice(model_0)
    )

  0%|          | 0/3 [00:00<?, ?it/s]

Epoch: 0
------





NameError: name 'X' is not defined