In [None]:
# 1. Mount Google Drive (the notebook reads and writes files under <mount point>/MyDrive)
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)


Mounted at /content/drive


In [None]:
# 2. Install required libraries
!pip install albumentations==1.3.0





In [None]:
# 3. Import necessary libraries
import os
import torch
import torch.nn as nn
import numpy as np
import cv2
from tqdm import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2


In [None]:
# 4. Define model architecture
class DownConv(nn.Module):
    """Stride-2 3x3 convolution (reflect padding) followed by InstanceNorm and ReLU."""

    def __init__(self, in_filters, out_filters):
        super().__init__()
        conv = nn.Conv2d(
            in_filters,
            out_filters,
            kernel_size=3,
            stride=2,
            padding=1,
            padding_mode='reflect',
        )
        norm = nn.InstanceNorm2d(out_filters)
        activation = nn.ReLU(inplace=True)
        # The position of each layer inside the Sequential is part of the
        # state_dict keys ("block.0.weight", ...), so the order must not change.
        self.block = nn.Sequential(conv, norm, activation)

    def forward(self, x):
        """Run ``x`` through conv -> norm -> ReLU and return the result."""
        return self.block(x)

class UpConv(nn.Module):
    """Stride-2 3x3 transposed convolution followed by InstanceNorm and ReLU."""

    def __init__(self, in_filters, out_filters):
        super().__init__()
        deconv = nn.ConvTranspose2d(
            in_filters,
            out_filters,
            kernel_size=3,
            stride=2,
            padding=1,
            output_padding=1,
        )
        norm = nn.InstanceNorm2d(out_filters)
        activation = nn.ReLU(inplace=True)
        # Layer order fixes the state_dict keys ("block.0.weight", ...).
        self.block = nn.Sequential(deconv, norm, activation)

    def forward(self, x):
        """Run ``x`` through transposed conv -> norm -> ReLU and return the result."""
        return self.block(x)

class ResBlock(nn.Module):
    """Residual block: two reflect-padded 3x3 conv + InstanceNorm stages with a ReLU in between."""

    @staticmethod
    def _conv_stage(channels):
        """Return [reflection pad, 3x3 conv, instance norm] keeping the channel count."""
        return [
            nn.ReflectionPad2d(1),
            nn.Conv2d(channels, channels, 3),
            nn.InstanceNorm2d(channels),
        ]

    def __init__(self, channels):
        super().__init__()
        # Resulting indices: 0 pad, 1 conv, 2 norm, 3 ReLU, 4 pad, 5 conv, 6 norm.
        # They are part of the state_dict keys, so the order must stay as is.
        layers = [
            *self._conv_stage(channels),
            nn.ReLU(inplace=True),
            *self._conv_stage(channels),
        ]
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        """Return ``x`` plus the output of the convolutional branch (skip connection)."""
        residual = self.block(x)
        return x + residual

class Generator(nn.Module):
    """ResNet-style image-to-image generator.

    Layout: 7x7 stem conv -> two DownConv stages -> ``num_res`` ResBlocks ->
    two UpConv stages -> 7x7 output conv -> tanh (outputs lie in [-1, 1]).

    NOTE: a later cell of this notebook defines another class that is also
    called ``Generator``; running that cell rebinds the name.

    Args:
        img_channels: Number of channels of both the input and the output image.
        num_res: Number of residual blocks in the bottleneck.
    """

    def __init__(self, img_channels, num_res=9):
        super().__init__()
        base = 64  # channels after the stem; doubled by each DownConv stage

        stem_layers = [
            nn.Conv2d(img_channels, out_channels=base, kernel_size=7, padding=3, padding_mode='reflect'),
            nn.InstanceNorm2d(base),
            nn.ReLU(inplace=True),
        ]
        self.conv_1 = nn.Sequential(*stem_layers)

        self.down = nn.Sequential(
            DownConv(base, base * 2),
            DownConv(base * 2, base * 4),
        )

        residual_blocks = [ResBlock(base * 4) for _ in range(num_res)]
        self.bottleneck = nn.Sequential(*residual_blocks)

        self.up = nn.Sequential(
            UpConv(base * 4, base * 2),
            UpConv(base * 2, base),
        )

        self.conv_2 = nn.Conv2d(base, img_channels, kernel_size=7, stride=1, padding=3, padding_mode='reflect')

    def forward(self, x):
        """Translate the image batch ``x`` and return the tanh-activated result."""
        features = self.conv_1(x)
        for stage in (self.down, self.bottleneck, self.up):
            features = stage(features)
        return torch.tanh(self.conv_2(features))


In [None]:
# 5. Load model weights
state_dict_path = '/content/drive/MyDrive/gen_monet_dict_1.pth'
model = Generator(3)
# torch.load unpickles the file; weights_only=True restricts that to tensors and
# plain containers so a tampered checkpoint cannot execute arbitrary code.
state_dict = torch.load(state_dict_path, map_location=torch.device('cpu'), weights_only=True)
model.load_state_dict(state_dict)
# Switch to inference mode; the trailing ';' keeps the (long) module repr out of the output.
model.eval();


In [None]:
# 6. Define transforms
# Preprocessing applied to every photo before it is fed to the generator:
#   1. resize to 256x256 for model compatibility,
#   2. normalise with mean 0.5 / std 0.5 per channel (pixel values are taken to be in [0, 255]),
#   3. convert the HWC array to a CHW torch tensor.
_preprocessing_steps = [
    A.Resize(height=256, width=256),
    A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], max_pixel_value=255.0),
    ToTensorV2(),
]
transform = A.Compose(_preprocessing_steps)


In [None]:
# 7. Define paths
input_dir = '/content/drive/MyDrive/test_photos'
output_dir = '/content/drive/MyDrive/monet_output'

os.makedirs(output_dir, exist_ok=True)

# 8. Convert and save all photos
image_extensions = ('.jpg', '.jpeg', '.png')
# Filter up front (so the progress bar counts only images) and sort so the
# processing order does not depend on the arbitrary order of os.listdir.
photo_filenames = sorted(
    name for name in os.listdir(input_dir) if name.lower().endswith(image_extensions)
)

for filename in tqdm(photo_filenames):
    path = os.path.join(input_dir, filename)
    image = cv2.imread(path)
    if image is None:
        # cv2.imread signals an unreadable/corrupt file by returning None
        # instead of raising; skip it rather than crash in cvtColor.
        tqdm.write(f"Skipping unreadable image: {path}")
        continue
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # HWC uint8 image -> normalised (1, C, H, W) tensor
    transformed = transform(image=image)['image'].unsqueeze(0)

    with torch.no_grad():
        output = model(transformed)

    # (1, C, H, W) tanh output in [-1, 1] -> (H, W, C) uint8 image in [0, 255]
    output = output.squeeze(0).permute(1, 2, 0).numpy()
    output = np.clip((output + 1) * 127.5, 0, 255).astype(np.uint8)

    out_path = os.path.join(output_dir, filename)
    # cv2.imwrite reports failure through its return value, not an exception.
    if not cv2.imwrite(out_path, cv2.cvtColor(output, cv2.COLOR_RGB2BGR)):
        tqdm.write(f"Failed to write image: {out_path}")


In [None]:
!pip install torch-fidelity

In [None]:
from torch_fidelity import calculate_metrics
import os



# Optional: Make sure both directories exist
assert os.path.exists(input_dir), f"{input_dir} does not exist!"
assert os.path.exists(output_dir), f"{output_dir} does not exist!"

# Calculate FID
# NOTE(review): this measures the distance between the source photos and their
# stylised versions; confirm whether a folder of real Monet paintings is the
# intended reference set instead.
metrics = calculate_metrics(
    input1=input_dir,
    input2=output_dir,
    cuda=torch.cuda.is_available(),  # use the GPU only when the runtime actually has one
    isc=False,  # Inception Score not needed
    fid=True,   # Compute only FID
)

# Output the FID Score
print(metrics['frechet_inception_distance'])


Creating feature extractor "inception-v3-compat" with features ['2048']
Extracting statistics from input 1
Looking for samples non-recursivelty in "/content/drive/MyDrive/test_photos" with extensions png,jpg,jpeg
Found 400 samples, some are lossy-compressed - this may affect metrics
Processing samples
Extracting statistics from input 2
Looking for samples non-recursivelty in "/content/drive/MyDrive/monet_output" with extensions png,jpg,jpeg
Found 400 samples, some are lossy-compressed - this may affect metrics
Processing samples


135.4182421594357


Frechet Inception Distance: 135.4182421594357


In [None]:
!pip install torchvision pillow

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch==2.6.0->torchvision)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch==2.6.0->torchvision)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch==2.6.0->torchvision)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.nn import BCEWithLogitsLoss
from torch.utils.data import Dataset
from PIL import Image
from typing import Tuple
import random
import os

# --- Data locations (Google Drive) ---
monet = "/content/drive/MyDrive/monet_jpg"
photo = "/content/drive/MyDrive/photo_jpg"

# --- Compute device: GPU when available, CPU otherwise ---
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# --- Training hyperparameters ---
epochs = 10                # number of training epochs
max_lr = 2e-4              # peak learning rate handed to the LR schedulers
initial_lr = max_lr / 10   # NOTE(review): not referenced by the visible training code
weight_decay = 0           # NOTE(review): not passed on by the visible training code
class ImageFolderCustom(Dataset):
    """Dataset over the files of a single directory that all share one class label.

    Args:
        root: Directory whose entries are treated as image files.
        class_idx: Label returned together with every sample.
        max_size: Maximum number of (randomly chosen) files to keep. A negative
            value (the default ``-1``) or ``None`` keeps every file.
        transform: Optional callable applied to each loaded PIL image.
    """

    def __init__(self, root: str, class_idx: int, max_size: int = -1, transform=None) -> None:
        # os.listdir returns entries in arbitrary order; sorting first makes the
        # shuffle below reproducible when the caller seeds `random`.
        self.paths = [os.path.join(root, img) for img in sorted(os.listdir(root))]
        random.shuffle(self.paths)
        # Truncate only for a real limit: slicing with the -1 sentinel
        # (`paths[:-1]`) would silently drop the last image.
        if max_size is not None and max_size >= 0:
            self.paths = self.paths[:max_size]
        self.transform = transform
        self.class_idx = class_idx

    def load_image(self, index: int) -> "Image.Image":
        "Opens the image at ``index`` and returns it as an RGB PIL image."
        image_path = self.paths[index]
        # The context manager closes the file handle; convert() loads the pixel
        # data and turns grayscale/RGBA/palette files into 3-channel images.
        with Image.open(image_path) as image:
            return image.convert("RGB")

    def __len__(self) -> int:
        "Returns the total number of samples."
        return len(self.paths)

    def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
        "Returns one sample of data, data and label (X, y)."
        img = self.load_image(index)

        if self.transform:
            return self.transform(img), self.class_idx
        else:
            return img, self.class_idx

# Augmentation (random horizontal flip) followed by conversion to a tensor.
# NOTE(review): ToTensor yields values in [0, 1] while the generators below end in
# Tanh (range [-1, 1]); confirm this range mismatch is intended.
data_transform = transforms.Compose(
    [transforms.RandomHorizontalFlip(p=0.5), transforms.ToTensor()]
)

# Monet paintings are labelled 1 and capped at 32 images; photos are labelled 0.
monet_dataset = ImageFolderCustom(monet, 1, max_size=32, transform=data_transform)
photo_dataset = ImageFolderCustom(photo, 0, transform=data_transform)

# Both loaders share the same settings.
_loader_options = dict(batch_size=1, num_workers=1, pin_memory=True, shuffle=True)
monet_dl = DataLoader(dataset=monet_dataset, **_loader_options)
photo_dl = DataLoader(dataset=photo_dataset, **_loader_options)

def downsample(in_channels, out_channels, kernel_size, apply_instancenorm=True):
    """Build a stride-2 encoder stage: Conv2d (no bias) -> optional InstanceNorm2d -> LeakyReLU(0.2).

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.
        kernel_size: Kernel size of the convolution.
        apply_instancenorm: Whether to insert the InstanceNorm2d layer.

    Returns:
        An ``nn.Sequential`` holding the layers in the order listed above.
    """
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size, stride=2, padding=1, bias=False),
    ]
    if apply_instancenorm:
        layers.append(nn.InstanceNorm2d(out_channels))
    layers.append(nn.LeakyReLU(0.2, inplace=True))
    return nn.Sequential(*layers)

def upsample(in_channels, out_channels, kernel_size, apply_dropout=False):
    """Build a stride-2 decoder stage: ConvTranspose2d -> InstanceNorm2d -> optional Dropout(0.5) -> ReLU.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the transposed convolution.
        kernel_size: Kernel size of the transposed convolution.
        apply_dropout: Whether to insert the Dropout layer before the ReLU.

    Returns:
        An ``nn.Sequential`` holding the layers in the order listed above.
    """
    layers = [
        nn.ConvTranspose2d(in_channels, out_channels, kernel_size, stride=2, padding=1),
        nn.InstanceNorm2d(out_channels),
    ]
    if apply_dropout:
        layers.append(nn.Dropout(0.5))
    layers.append(nn.ReLU())
    return nn.Sequential(*layers)
class Generator(nn.Module):
    """U-Net generator: eight stride-2 encoder stages, seven decoder stages with
    skip connections, and a final transposed convolution followed by Tanh.

    With 256x256 inputs the encoder reduces the feature map to 1x1 and the
    decoder restores a 3-channel 256x256 image.

    NOTE: this class reuses the name ``Generator`` of the ResNet-style generator
    defined in an earlier cell; running this cell rebinds the name.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, apply_instancenorm) for each encoder stage.
        encoder_spec = [
            (3, 64, False),     # 256 -> 128
            (64, 128, True),    # 128 -> 64
            (128, 256, True),   # 64 -> 32
            (256, 512, True),   # 32 -> 16
            (512, 512, True),   # 16 -> 8
            (512, 512, True),   # 8 -> 4
            (512, 512, True),   # 4 -> 2
            (512, 512, False),  # 2 -> 1 (bottleneck)
        ]
        self.down_stack = nn.ModuleList([
            downsample(c_in, c_out, 4, apply_instancenorm=use_norm)
            for c_in, c_out, use_norm in encoder_spec
        ])

        # (in_channels, out_channels, apply_dropout) for each decoder stage.
        # From the second stage on, the input is the previous stage's output
        # concatenated with the matching encoder output, hence the doubled widths.
        decoder_spec = [
            (512, 512, True),     # 1 -> 2
            (1024, 512, True),    # 2 -> 4
            (1024, 512, True),    # 4 -> 8
            (1024, 512, False),   # 8 -> 16
            (1024, 256, False),   # 16 -> 32
            (512, 128, False),    # 32 -> 64
            (256, 64, False),     # 64 -> 128
        ]
        self.up_stack = nn.ModuleList([
            upsample(c_in, c_out, 4, apply_dropout=use_dropout)
            for c_in, c_out, use_dropout in decoder_spec
        ])

        # 128 input channels = 64 from the last decoder stage + 64 from its skip connection.
        self.last = nn.Sequential(
            nn.ConvTranspose2d(128, 3, kernel_size=4, stride=2, padding=1, bias=False),
            nn.Tanh(),
        )

    def forward(self, x):
        """Translate the image batch ``x`` and return the Tanh-activated result."""
        encoder_outputs = []
        for down in self.down_stack:
            x = down(x)
            encoder_outputs.append(x)

        # The bottleneck output is ``x`` itself; the remaining encoder outputs
        # serve as skip connections, deepest first.
        for up, skip in zip(self.up_stack, encoder_outputs[-2::-1]):
            x = torch.cat([up(x), skip], dim=1)

        return self.last(x)
# Two independent generators, one per translation direction
# (photo -> Monet and Monet -> photo).
monet_generator, photo_generator = Generator(), Generator()
class Discriminator(nn.Module):
    """Patch-wise discriminator that returns raw (pre-sigmoid) logits.

    Three stride-2 ``downsample`` stages are followed by two zero-padded 4x4
    convolutions; a 256x256 input yields a (N, 1, 30, 30) map of patch scores.
    """

    def __init__(self):
        super().__init__()

        self.sequential = nn.Sequential(
            downsample(3, 64, 4, apply_instancenorm=False),
            downsample(64, 128, 4),
            downsample(128, 256, 4),
            nn.ZeroPad2d(1),
            nn.Conv2d(256, 512, 4, bias=False),
            nn.InstanceNorm2d(512),
            nn.LeakyReLU(),
            nn.ZeroPad2d(1),
            nn.Conv2d(512, 1, 4),
            # Deliberately no nn.Sigmoid() here: the losses in this notebook use
            # BCEWithLogitsLoss, which applies the sigmoid itself. Squashing the
            # scores first would apply it twice and confine the "logits" to (0, 1).
        )

    def forward(self, x):
        """Return the patch logits for the image batch ``x``."""
        x = self.sequential(x)
        return x

# One discriminator per image domain (Monet paintings and photos).
monet_discriminator, photo_discriminator = Discriminator(), Discriminator()

class CycleGAN(nn.Module):
    """Bundles the two generators, the two discriminators and the loss functions
    of a CycleGAN and implements one optimisation step over all four networks.

    Args:
        monet_generator: Network translating photos into Monet-style images.
        photo_generator: Network translating Monet-style images into photos.
        monet_discriminator: Network scoring Monet-domain images.
        photo_discriminator: Network scoring photo-domain images.
        generator_loss_fn: ``f(disc_output_on_fake) -> loss``.
        discriminator_loss_fn: ``f(disc_output_on_real, disc_output_on_fake) -> loss``.
        calc_cycle_loss_fn: ``f(real, cycled, lambda_cycle) -> loss``.
        identity_loss_fn: ``f(real, same, lambda_cycle) -> loss``.
        lambda_cycle: Weight handed to the cycle and identity loss functions.
    """

    def __init__(
        self,
        monet_generator,
        photo_generator,
        monet_discriminator,
        photo_discriminator,
        generator_loss_fn,
        discriminator_loss_fn,
        calc_cycle_loss_fn,
        identity_loss_fn,
        lambda_cycle=10,
    ):
        super(CycleGAN, self).__init__()
        self.monet_generator = monet_generator
        self.photo_generator = photo_generator
        self.monet_discriminator = monet_discriminator
        self.photo_discriminator = photo_discriminator
        self.lambda_cycle = lambda_cycle

        self.generator_loss_fn = generator_loss_fn
        self.discriminator_loss_fn = discriminator_loss_fn
        self.calc_cycle_loss_fn = calc_cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn

    def train_step(self, X_monet, X_photo,
                   monet_generator_optimizer,
                   photo_generator_optimizer,
                   monet_discriminator_optimizer,
                   photo_discriminator_optimizer,
                   device):
        """Run one generator update followed by one discriminator update.

        Args:
            X_monet: Batch of real Monet images.
            X_photo: Batch of real photos.
            monet_generator_optimizer: Optimizer of ``self.monet_generator``.
            photo_generator_optimizer: Optimizer of ``self.photo_generator``.
            monet_discriminator_optimizer: Optimizer of ``self.monet_discriminator``.
            photo_discriminator_optimizer: Optimizer of ``self.photo_discriminator``.
            device: Device the two batches are moved to.

        Returns:
            Dict with the keys ``monet_gen_loss``, ``photo_gen_loss``,
            ``monet_disc_loss`` and ``photo_disc_loss``. The values are detached
            scalar tensors, so storing them does not keep the autograd graph
            (and with it every intermediate activation) alive.
        """
        grad_clip = 0.1

        X_monet = X_monet.to(device)
        X_photo = X_photo.to(device)

        # ------------------------- generator update -------------------------
        fake_monet = self.monet_generator(X_photo)
        cycled_photo = self.photo_generator(fake_monet)

        fake_photo = self.photo_generator(X_monet)
        cycled_monet = self.monet_generator(fake_photo)

        same_monet = self.monet_generator(X_monet)
        same_photo = self.photo_generator(X_photo)

        monet_gen_loss = self.generator_loss_fn(self.monet_discriminator(fake_monet))
        photo_gen_loss = self.generator_loss_fn(self.photo_discriminator(fake_photo))

        total_cycle_loss = (
            self.calc_cycle_loss_fn(X_monet, cycled_monet, self.lambda_cycle)
            + self.calc_cycle_loss_fn(X_photo, cycled_photo, self.lambda_cycle)
        )
        monet_identity_loss = self.identity_loss_fn(X_monet, same_monet, self.lambda_cycle)
        photo_identity_loss = self.identity_loss_fn(X_photo, same_photo, self.lambda_cycle)

        # Per-generator totals, kept for reporting.
        total_monet_gen_loss = monet_gen_loss + total_cycle_loss + monet_identity_loss
        total_photo_gen_loss = photo_gen_loss + total_cycle_loss + photo_identity_loss

        # Back-propagate every term exactly once. Calling backward() on both
        # per-generator totals would push the shared cycle loss through both
        # generators twice.
        generator_objective = (
            monet_gen_loss + photo_gen_loss + total_cycle_loss
            + monet_identity_loss + photo_identity_loss
        )

        monet_generator_optimizer.zero_grad()
        photo_generator_optimizer.zero_grad()
        generator_objective.backward()

        # Clip the networks owned by this module, not same-named globals.
        nn.utils.clip_grad_value_(self.monet_generator.parameters(), grad_clip)
        nn.utils.clip_grad_value_(self.photo_generator.parameters(), grad_clip)

        monet_generator_optimizer.step()
        photo_generator_optimizer.step()

        # ----------------------- discriminator update -----------------------
        # The generator backward pass also wrote gradients into the
        # discriminators; discard them before computing the real ones.
        monet_discriminator_optimizer.zero_grad()
        photo_discriminator_optimizer.zero_grad()

        # detach(): the discriminator loss must not send gradients back into the
        # generators (that would train them to produce easily detected fakes).
        monet_disc_loss = self.discriminator_loss_fn(
            self.monet_discriminator(X_monet),
            self.monet_discriminator(fake_monet.detach()),
        )
        photo_disc_loss = self.discriminator_loss_fn(
            self.photo_discriminator(X_photo),
            self.photo_discriminator(fake_photo.detach()),
        )
        # The two losses touch disjoint parameters, so one backward pass suffices.
        (monet_disc_loss + photo_disc_loss).backward()

        nn.utils.clip_grad_value_(self.monet_discriminator.parameters(), grad_clip)
        nn.utils.clip_grad_value_(self.photo_discriminator.parameters(), grad_clip)

        monet_discriminator_optimizer.step()
        photo_discriminator_optimizer.step()

        return {
            "monet_gen_loss": total_monet_gen_loss.detach(),
            "photo_gen_loss": total_photo_gen_loss.detach(),
            "monet_disc_loss": monet_disc_loss.detach(),
            "photo_disc_loss": photo_disc_loss.detach()
        }

# Shared criterion; it applies the sigmoid itself, so it expects raw logits.
BCEWLL = BCEWithLogitsLoss()


def generator_loss(generated):
    """Adversarial generator loss: BCE between the discriminator logits for
    generated images and an all-ones ("real") target.

    The target is built with ``ones_like`` so it always matches the shape,
    dtype and device of ``generated`` instead of assuming a 30x30 patch map
    on the global ``device``.
    """
    return BCEWLL(generated, torch.ones_like(generated))


def discriminator_loss(real, generated):
    """Discriminator loss: mean of the BCE for real logits against ones and
    for generated logits against zeros.

    Args:
        real: Discriminator logits for real images.
        generated: Discriminator logits for generated images.
    """
    real_loss = BCEWLL(real, torch.ones_like(real))
    generated_loss = BCEWLL(generated, torch.zeros_like(generated))
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss * 0.5
def calc_cycle_loss(real_image, cycled_image, LAMBDA):
    """Return ``LAMBDA`` times the mean absolute difference between the two images."""
    mean_abs_error = (real_image - cycled_image).abs().mean()
    return LAMBDA * mean_abs_error
def identity_loss(real_image, same_image, LAMBDA):
    """Return ``LAMBDA * 0.5`` times the mean absolute difference between the two images."""
    mean_abs_error = (real_image - same_image).abs().mean()
    return LAMBDA * 0.5 * mean_abs_error
from tqdm.notebook import tqdm

def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group
    (``None`` when the optimizer has no parameter groups)."""
    first_group = next(iter(optimizer.param_groups), None)
    if first_group is None:
        return None
    return first_group['lr']

def fit_one_cycle(epochs, max_lr, cycle_gan_model, monet_dl, photo_dl, device,
                  m_gen_optimizer, p_gen_optimizer, m_disc_optimizer,p_disc_optimizer,
                  weight_decay =0,start_idx = 1):
    """Train ``cycle_gan_model`` for ``epochs`` epochs with one-cycle LR schedules.

    Args:
        epochs: Number of passes over the zipped loaders.
        max_lr: Peak learning rate of every OneCycleLR schedule.
        cycle_gan_model: Object exposing ``train()`` and ``train_step(...)``.
        monet_dl: Loader yielding ``(images, labels)`` batches of Monet images.
        photo_dl: Loader yielding ``(images, labels)`` batches of photos.
        device: Device forwarded to ``train_step``.
        m_gen_optimizer: Optimizer of the Monet generator.
        p_gen_optimizer: Optimizer of the photo generator.
        m_disc_optimizer: Optimizer of the Monet discriminator.
        p_disc_optimizer: Optimizer of the photo discriminator.
        weight_decay: Unused; kept so existing callers keep working.
        start_idx: Unused; kept so existing callers keep working.

    Returns:
        Dict with the per-step learning rates (``m_gen_lrs``, ``p_gen_lrs``,
        ``m_disc_lrs``, ``p_disc_lrs``) and the per-step losses as Python floats
        (``monet_gen_losses``, ``photo_gen_losses``, ``monet_disc_losses``,
        ``photo_disc_losses``).

    Raises:
        ValueError: If either loader yields no batches.
    """
    torch.cuda.empty_cache()

    # zip() below stops at the shorter loader, so that length is the real number
    # of optimisation steps per epoch; every schedule must be sized from it or
    # it would only run through a fraction of its cycle.
    steps_per_epoch = min(len(monet_dl), len(photo_dl))
    if steps_per_epoch == 0:
        raise ValueError("monet_dl and photo_dl must each yield at least one batch")

    optimizers = {
        "m_gen": m_gen_optimizer,
        "p_gen": p_gen_optimizer,
        "m_disc": m_disc_optimizer,
        "p_disc": p_disc_optimizer,
    }
    schedulers = {
        name: torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr, epochs=epochs, steps_per_epoch=steps_per_epoch)
        for name, optimizer in optimizers.items()
    }

    loss_names = ("monet_gen_loss", "photo_gen_loss", "monet_disc_loss", "photo_disc_loss")
    history = {name + "_lrs": [] for name in optimizers}
    history.update({loss_name + "es": [] for loss_name in loss_names})

    cycle_gan_model.train()

    for epoch in tqdm(range(epochs)):
        torch.cuda.empty_cache()
        epoch_totals = dict.fromkeys(loss_names, 0.0)
        num_batches = 0

        for real_images_monet, real_images_photo in zip(monet_dl, photo_dl):
            real_monets, _ = real_images_monet
            real_photos, _ = real_images_photo

            losses = cycle_gan_model.train_step(real_monets, real_photos, m_gen_optimizer, p_gen_optimizer,
                                                m_disc_optimizer, p_disc_optimizer, device)

            for loss_name in loss_names:
                # Convert to a plain float right away: accumulating the tensors
                # themselves would keep each step's autograd graph in memory.
                value = float(losses[loss_name])
                epoch_totals[loss_name] += value
                history[loss_name + "es"].append(value)

            for name, scheduler in schedulers.items():
                scheduler.step()
                history[name + "_lrs"].append(get_lr(optimizers[name]))

            num_batches += 1

        # Average over the number of batches actually processed (the last
        # enumerate index would be one too small).
        total_monet_gen_loss = epoch_totals["monet_gen_loss"] / num_batches
        total_photo_gen_loss = epoch_totals["photo_gen_loss"] / num_batches
        monet_disc_loss = epoch_totals["monet_disc_loss"] / num_batches
        photo_disc_loss = epoch_totals["photo_disc_loss"] / num_batches

        print("Epoch [{}/{}], total_monet_gen_loss: {:.4f}, total_photo_gen_loss: {:.4f}, monet_disc_loss: {:.4f}, photo_disc_loss: {:.4f}".format(
        epoch+1, epochs, total_monet_gen_loss, total_photo_gen_loss, monet_disc_loss, photo_disc_loss))
    return history
# Assemble the CycleGAN from the four networks and the four loss functions,
# then move it to the training device.
cycle_gan_model = CycleGAN(
    monet_generator=monet_generator,
    photo_generator=photo_generator,
    monet_discriminator=monet_discriminator,
    photo_discriminator=photo_discriminator,
    generator_loss_fn=generator_loss,
    discriminator_loss_fn=discriminator_loss,
    calc_cycle_loss_fn=calc_cycle_loss,
    identity_loss_fn=identity_loss,
).to(device)


def _make_adam(network):
    """Adam optimizer over ``network``'s parameters with the settings shared by all four networks."""
    return torch.optim.Adam(network.parameters(), max_lr, betas=(0.5, 0.999))


m_gen_optimizer = _make_adam(monet_generator)
p_gen_optimizer = _make_adam(photo_generator)
m_disc_optimizer = _make_adam(monet_discriminator)
p_disc_optimizer = _make_adam(photo_discriminator)

history = fit_one_cycle(
    epochs, max_lr, cycle_gan_model,
    monet_dl, photo_dl, device,
    m_gen_optimizer, p_gen_optimizer,
    m_disc_optimizer, p_disc_optimizer,
)
torch.cuda.empty_cache()


  0%|          | 0/10 [00:00<?, ?it/s]

Epoch [1/10], total_monet_gen_loss: 16.4679, total_photo_gen_loss: 16.3772, monet_disc_loss: 0.7688, photo_disc_loss: 0.7573


OutOfMemoryError: CUDA out of memory. Tried to allocate 16.00 MiB. GPU 0 has a total capacity of 14.74 GiB of which 8.12 MiB is free. Process 11207 has 14.73 GiB memory in use. Of the allocated memory 14.60 GiB is allocated by PyTorch, and 11.53 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
from IPython.display import display

# Pick a random photo. randrange(len(...)) can never index past the end of the
# dataset, unlike a hard-coded upper bound.
test_photo = photo_dataset[random.randrange(len(photo_dataset))][0]
# Only the last expression of a cell is rendered automatically, so the input
# image has to be displayed explicitly.
display(transforms.ToPILImage()(test_photo))

preview_generator = cycle_gan_model.monet_generator
preview_generator.eval()  # inference: switches off the dropout layers
with torch.no_grad():     # no autograd graph is needed for a preview
    # unsqueeze(0) adds the batch dimension without assuming a 256x256 image.
    output = preview_generator(test_photo.unsqueeze(0).to(device))

# The training images are ToTensor outputs in [0, 1], while Tanh can emit values
# down to -1; clamp before converting so negative values cannot wrap around.
transforms.ToPILImage()(output.squeeze(0).clamp(0, 1).cpu())


SyntaxError: invalid syntax (<ipython-input-4-f7bad79af8c0>, line 322)