In [1]:
import torch
from datasets import load_dataset
from dataset import VangoghPhotoDataset
import sys
from utils import save_checkpoint, load_checkpoint
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import config
from tqdm import tqdm
from torchvision.utils import save_image
from discriminator_model import Discriminator
from generator_model import Generator
import warnings
warnings.filterwarnings('ignore')

  check_for_updates()


In [2]:
hf_data = load_dataset("huggan/vangogh2photo")

Repo card metadata block was not found. Setting CardData to empty.


In [3]:
hf_data

DatasetDict({
    train: Dataset({
        features: ['imageA', 'imageB'],
        num_rows: 6287
    })
    test: Dataset({
        features: ['imageA', 'imageB'],
        num_rows: 751
    })
})

In [4]:
def train_fn(
    disc_V, disc_P, gen_P, gen_V, loader, opt_disc, opt_gen, l1, mse, d_scaler, g_scaler
):
    P_reals = 0
    P_fakes = 0
    loop = tqdm(loader, leave=True)

    for idx, (vangogh, photo) in enumerate(loop):
        vangogh = vangogh.to(config.DEVICE)
        photo = photo.to(config.DEVICE)

        # Train Discriminators P and V
        with torch.cuda.amp.autocast():
            fake_photo = gen_P(vangogh)
            D_P_real = disc_P(photo)
            D_P_fake = disc_P(fake_photo.detach())
            P_reals += D_P_real.mean().item()
            P_fakes += D_P_fake.mean().item()
            D_P_real_loss = mse(D_P_real, torch.ones_like(D_P_real))
            D_P_fake_loss = mse(D_P_fake, torch.zeros_like(D_P_fake))
            D_P_loss = D_P_real_loss + D_P_fake_loss

            fake_vangogh = gen_V(photo)
            D_V_real = disc_V(vangogh)
            D_V_fake = disc_V(fake_vangogh.detach())
            D_V_real_loss = mse(D_V_real, torch.ones_like(D_V_real))
            D_V_fake_loss = mse(D_V_fake, torch.zeros_like(D_V_fake))
            D_V_loss = D_V_real_loss + D_V_fake_loss

            # SUMM
            D_loss = (D_P_loss + D_V_loss) / 2

        opt_disc.zero_grad()
        d_scaler.scale(D_loss).backward()
        d_scaler.step(opt_disc)
        d_scaler.update()

        # Train Generators P and V
        with torch.cuda.amp.autocast():
            # adversarial loss for both generators
            D_P_fake = disc_P(fake_photo)
            D_V_fake = disc_V(fake_vangogh)
            loss_G_P = mse(D_P_fake, torch.ones_like(D_P_fake))
            loss_G_V = mse(D_V_fake, torch.ones_like(D_V_fake))

            # cycle loss
            cycle_vangogh = gen_V(fake_photo)
            cycle_photo = gen_P(fake_vangogh)
            cycle_vangogh_loss = l1(vangogh, cycle_vangogh)
            cycle_photo_loss = l1(photo, cycle_photo)

            # identity loss
            identity_vangogh = gen_V(vangogh)
            identity_photo = gen_P(photo)
            identity_vangogh_loss = l1(vangogh, identity_vangogh)
            identity_photo_loss = l1(photo, identity_photo)

            # total generator loss
            G_loss = (
                loss_G_P
                + loss_G_V
                + cycle_vangogh_loss * config.LAMBDA_CYCLE
                + cycle_photo_loss * config.LAMBDA_CYCLE
                + identity_photo_loss * config.LAMBDA_IDENTITY
                + identity_vangogh_loss * config.LAMBDA_IDENTITY
            )

        opt_gen.zero_grad()
        g_scaler.scale(G_loss).backward()
        g_scaler.step(opt_gen)
        g_scaler.update()

        if idx % 200 == 0:
            save_image(fake_photo * 0.5 + 0.5, f"saved_images/photo_{idx}.png")
            save_image(fake_vangogh * 0.5 + 0.5, f"saved_images/van_gogh_{idx}.png")

        loop.set_postfix(P_real=P_reals / (idx + 1), P_fake=P_fakes / (idx + 1))


In [5]:
def main():
    disc_H = Discriminator(in_channels=3).to(config.DEVICE)
    disc_Z = Discriminator(in_channels=3).to(config.DEVICE)
    gen_Z = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)
    gen_H = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)
    opt_disc = optim.Adam(
        list(disc_H.parameters()) + list(disc_Z.parameters()),
        lr=config.LEARNING_RATE,
        betas=(0.5, 0.999),
    )

    opt_gen = optim.Adam(
        list(gen_Z.parameters()) + list(gen_H.parameters()),
        lr=config.LEARNING_RATE,
        betas=(0.5, 0.999),
    )

    L1 = nn.L1Loss()
    mse = nn.MSELoss()

    if config.LOAD_MODEL:
        load_checkpoint(
            config.CHECKPOINT_GEN_H,
            gen_H,
            opt_gen,
            config.LEARNING_RATE,
        )
        load_checkpoint(
            config.CHECKPOINT_GEN_Z,
            gen_Z,
            opt_gen,
            config.LEARNING_RATE,
        )
        load_checkpoint(
            config.CHECKPOINT_CRITIC_H,
            disc_H,
            opt_disc,
            config.LEARNING_RATE,
        )
        load_checkpoint(
            config.CHECKPOINT_CRITIC_Z,
            disc_Z,
            opt_disc,
            config.LEARNING_RATE,
        )

    train_dataset = VangoghPhotoDataset(
    hf_dataset=hf_data["train"],
    transform=config.transforms,
    )
    val_dataset = VangoghPhotoDataset(
        hf_dataset=hf_data["test"],
        transform=config.transforms,
    )
    
    train_loader = DataLoader(
        train_dataset,
        batch_size=config.BATCH_SIZE,
        shuffle=True,
        num_workers=config.NUM_WORKERS,
        pin_memory=True,
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=1,
        shuffle=False,
        pin_memory=True,
    )
    g_scaler = torch.cuda.amp.GradScaler()
    d_scaler = torch.cuda.amp.GradScaler()

    for epoch in range(config.NUM_EPOCHS):
        train_fn(
            disc_H,
            disc_Z,
            gen_Z,
            gen_H,
            train_loader,
            opt_disc,
            opt_gen,
            L1,
            mse,
            d_scaler,
            g_scaler,
        )

        if config.SAVE_MODEL:
            save_checkpoint(gen_H, opt_gen, filename=config.CHECKPOINT_GEN_H)
            save_checkpoint(gen_Z, opt_gen, filename=config.CHECKPOINT_GEN_Z)
            save_checkpoint(disc_H, opt_disc, filename=config.CHECKPOINT_CRITIC_H)
            save_checkpoint(disc_Z, opt_disc, filename=config.CHECKPOINT_CRITIC_Z)

In [6]:
# if __name__ == "__main__":
#     main()

In [7]:
# !pip install torchmetrics
# !pip install pytorch-fid
# !pip install torch-fidelity

In [11]:
from torchmetrics.image.inception import InceptionScore
from tqdm import tqdm


def denorm_to_uint8(img_tensor: torch.Tensor) -> torch.Tensor:
    """
    Преобразует изображение из [-1, 1] (или [0, 1]) в [0, 255] и uint8:
    - ожидается, что входной тензор в формате BxCxHxW
    """
    if img_tensor.is_floating_point():
        img_tensor = img_tensor.float()
        img_tensor = (img_tensor * 0.5 + 0.5) * 255.0  # [-1,1] -> [0,1] -> [0,255]
    return img_tensor.clamp(0, 255).to(torch.uint8)

@torch.no_grad()
def evaluate_fn(gen_P, gen_V, val_loader):
    gen_P.eval()
    gen_V.eval()

    inception = InceptionScore().to(config.DEVICE)

    for vangogh, photo in tqdm(val_loader, desc="Evaluating"):
        vangogh = vangogh.to(config.DEVICE)
        photo = photo.to(config.DEVICE)

        fake_photo = gen_P(vangogh)

        # Денормализация и преобразование в uint8
        fake_photo_uint8 = denorm_to_uint8(fake_photo)
        photo_uint8 = denorm_to_uint8(photo)

        inception.update(fake_photo_uint8)

    inception_mean, inception_std = inception.compute()

    print(f"Inception Score: {inception_mean:.4f} ± {inception_std:.4f}")

    gen_P.train()
    gen_V.train()


In [12]:
torch.cuda.empty_cache()

In [13]:
gen_Z = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)
gen_H = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)

gen_H_path = '/home/emilmeister/SashaProjects/ganchik/genh.pth.tar'
gen_Z_path = '/home/emilmeister/SashaProjects/ganchik/genz.pth.tar'

print("=> Loading checkpoint gen_H")
checkpoint_gen_H = torch.load(gen_H_path, map_location=config.DEVICE)
gen_H.load_state_dict(checkpoint_gen_H["state_dict"])

print("=> Loading checkpoint gen_Z")
checkpoint_gen_Z = torch.load(gen_Z_path, map_location=config.DEVICE)
gen_Z.load_state_dict(checkpoint_gen_Z["state_dict"])


val_dataset = VangoghPhotoDataset(
        hf_dataset=hf_data["test"],
        transform=config.transforms,
    )

val_loader = DataLoader(
        val_dataset,
        batch_size=1,
        shuffle=False,
        pin_memory=True,
    )

evaluate_fn(gen_H, gen_Z, val_loader)

=> Loading checkpoint gen_H
=> Loading checkpoint gen_Z


Evaluating: 100%|██████████| 751/751 [00:06<00:00, 108.53it/s]

Inception Score: 5.7665 ± 0.6452





# IC расположился в районе 5-6 что говорит о довольно хорошем качестве получившейся модели.

# При визуальной оценке получившихся изображений, на мой взгляд, тоже получилось довольно неплохо. Да, есть местами картинки которые прям фигня, но подавляющее большинство можно подумать что реально рисовал Ван Гог))

# В целом работа получилась довольно интересная, я понял как работает CycleGAN, а также познакомился с метриками IC & FID. Раньше с изображениями не работал, как первый опыт считаю очень круто, нормально погрузился в работу сверток.

# P.S. Сначала пробовал обучать без identity loss - получалось все довольно плохо, потом решил все таки его добавить и качество поднялось до приемлемого уровня. Моделька училась 15 часов на 150 эпохах. Уже где то с первой эпохи модель начинает генерировать что то нормальное, но вот прям удовлетворяющего уровня удалось добиться при раздувании эпох до 150.