In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import glob
import torch
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader
import torchvision.transforms as transforms
from torchvision.utils import make_grid
import random
import os

# Seed every random number generator the notebook relies on, not just Python's:
# torch drives weight init / DataLoader shuffling / random crops, and NumPy may be
# used by libraries. torch.manual_seed is called first because it returns a
# Generator object that would otherwise be echoed as the cell's output.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)


# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory


# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Print one "processor" line per logical CPU listed in /proc/cpuinfo
!cat /proc/cpuinfo | grep processor

processor	: 0
processor	: 1


In [None]:
# Environment-related variable
# Number of DataLoader worker processes: use the logical CPU count reported by the
# OS instead of a hand-copied value; fall back to 2 when it cannot be determined.
n_cpu = os.cpu_count() or 2

# Preparing Data

In [None]:
# Base directory that every dataset path in this notebook is joined onto
root = "../input/"
root

'../input/'

In [None]:
# Image geometry: crop size used by the transforms and input shape of the networks
img_height = 256
img_width = 256
channels = 3

In [None]:
# Training hyperparameters (consumed by the optimizers and LR schedulers below)
epoch = 0 # epoch to start training from
n_epochs = 200 # number of epochs of training
batch_size = 1 # size of the batches
lr = 0.0002 # adam : learning rate
b1 = 0.5 # adam : decay of first order momentum of gradient
b2 = 0.999 # adam : decay of second order momentum of gradient
decay_epoch = 100 # suggested default : 100 (suggested 'n_epochs' is 200)
                 # epoch from which to start lr decay

In [None]:
class ResidualBlock(nn.Module):
    """Residual unit: two (reflection pad -> 3x3 conv -> instance norm) stages with a
    ReLU between them, added back onto the input.

    Args:
        in_features: channel count of the input; the block keeps it unchanged.
    """

    def __init__(self, in_features):
        super().__init__()

        stages = []
        for is_final_stage in (False, True):
            stages.append(nn.ReflectionPad2d(1))  # pad by 1 so the 3x3 conv keeps the spatial size
            stages.append(nn.Conv2d(in_features, in_features, 3))
            stages.append(nn.InstanceNorm2d(in_features))
            if not is_final_stage:
                # Only the first stage is followed by an activation
                stages.append(nn.ReLU(inplace=True))

        self.block = nn.Sequential(*stages)

    def forward(self, x):
        """Return the input plus the block's transformation of it."""
        transformed = self.block(x)
        return x + transformed


class GeneratorResNet(nn.Module):
    """ResNet-style image-to-image generator.

    Layout: 7x7 stem convolution -> two stride-2 downsampling convolutions ->
    `num_residual_block` residual blocks -> two (upsample x2 + 3x3 conv) stages ->
    7x7 output convolution with Tanh. The upsampling stages have no normalization
    layer.

    Args:
        input_shape: sequence whose first entry is the number of image channels;
            only that entry is read.
        num_residual_block: number of ResidualBlock modules in the bottleneck.
    """

    def __init__(self, input_shape, num_residual_block):
        super(GeneratorResNet, self).__init__()

        channels = input_shape[0]

        # A 7x7 convolution removes 3 pixels on each side, so the reflection padding
        # must be 3 regardless of the channel count. (The padding used to be
        # `channels`, which was only correct for 3-channel images.)
        pad_7x7 = 3

        # Initial Convolution Block
        out_features = 64
        model = [
            nn.ReflectionPad2d(pad_7x7),
            nn.Conv2d(channels, out_features, 7),
            nn.InstanceNorm2d(out_features),
            nn.ReLU(inplace=True)
        ]
        in_features = out_features

        # Downsampling
        for _ in range(2):
            out_features *= 2
            model += [
                nn.Conv2d(in_features, out_features, 3, stride=2, padding=1),
                nn.InstanceNorm2d(out_features),
                nn.ReLU(inplace=True)
            ]
            in_features = out_features

        # Residual blocks
        for _ in range(num_residual_block):
            model += [ResidualBlock(out_features)]

        # Upsampling
        for _ in range(2):
            out_features //= 2
            model += [
                nn.Upsample(scale_factor=2), # --> width*2, height*2
                nn.Conv2d(in_features, out_features, 3, stride=1, padding=1),
                nn.ReLU(inplace=True)
            ]
            in_features = out_features

        # Output Layer
        model += [nn.ReflectionPad2d(pad_7x7),
                  nn.Conv2d(out_features, channels, 7),
                  nn.Tanh()
                 ]

        # Unpacking
        self.model = nn.Sequential(*model)

    def forward(self, x):
        """Run the image batch `x` through the generator stack."""
        return self.model(x)

In [None]:
class Discriminator(nn.Module):
    """Convolutional patch discriminator.

    Four stride-2 4x4 convolutions (instance-normalized except the first, each
    followed by LeakyReLU(0.2)) and a final zero-padded 4x4 convolution that
    outputs a single-channel score map.

    Args:
        input_shape: (channels, height, width) of the input images.

    Attributes:
        output_shape: (1, height // 16, width // 16), the score-map shape for
            one image.
    """

    def __init__(self, input_shape):
        super().__init__()

        in_channels, img_h, img_w = input_shape

        # Four stride-2 convolutions halve each spatial side four times (2**4 == 16)
        self.output_shape = (1, img_h // 2 ** 4, img_w // 2 ** 4)

        # (input filters, output filters, use instance norm) for each downsampling stage
        stage_specs = (
            (in_channels, 64, False),
            (64, 128, True),
            (128, 256, True),
            (256, 512, True),
        )

        layers = []
        for n_in, n_out, use_norm in stage_specs:
            layers.append(nn.Conv2d(n_in, n_out, 4, stride=2, padding=1))
            if use_norm:
                layers.append(nn.InstanceNorm2d(n_out))
            layers.append(nn.LeakyReLU(0.2, inplace=True))

        # Asymmetric zero padding (left and top) before the final scoring convolution
        layers.append(nn.ZeroPad2d((1, 0, 1, 0)))
        layers.append(nn.Conv2d(512, 1, 4, padding=1))

        self.model = nn.Sequential(*layers)

    def forward(self, img):
        """Return the score map for the image batch `img`."""
        return self.model(img)

In [None]:
# Loss functions: mean-squared error for the GAN term, L1 for the cycle and identity
# terms (roles taken from the variable names; the training loop that applies them is
# not part of this section).
criterion_GAN = torch.nn.MSELoss()
criterion_cycle = torch.nn.L1Loss()
criterion_identity = torch.nn.L1Loss()

In [None]:
input_shape = (channels, img_height, img_width) # (3,256,256)
n_residual_blocks = 9 # suggested default, number of residual blocks in generator

# Two generators, one per translation direction (G_AB is applied to A images, G_BA to
# B images), and one discriminator per domain.
G_AB = GeneratorResNet(input_shape, n_residual_blocks)
G_BA = GeneratorResNet(input_shape, n_residual_blocks)
D_A = Discriminator(input_shape)
D_B = Discriminator(input_shape)

In [None]:
cuda = torch.cuda.is_available()

if cuda:
    # Move all four networks to the GPU and rebind the names to the returned modules
    G_AB, G_BA, D_A, D_B = (net.cuda() for net in (G_AB, G_BA, D_A, D_B))

    # The loss modules are moved as well; their names keep pointing at the same objects
    for _criterion in (criterion_GAN, criterion_cycle, criterion_identity):
        _criterion.cuda()

In [None]:
def weights_init_normal(m):
    """Initialise one module in place; intended for use with ``nn.Module.apply``.

    * Modules whose class name contains ``Conv``: weights ~ N(0, 0.02), bias (if any) = 0.
    * Modules whose class name contains ``BatchNorm2d``: weights ~ N(1, 0.02), bias = 0
      (skipped for parameters that do not exist, e.g. ``affine=False``).
    * Every other module is left untouched.

    Args:
        m: the module to initialise.
    """
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02) # reset Conv2d's weight(tensor) with Gaussian Distribution
        if getattr(m, 'bias', None) is not None:
            torch.nn.init.constant_(m.bias.data, 0.0) # reset Conv2d's bias(tensor) with Constant(0)
    # This branch must be a sibling of the Conv check: it used to be attached to the
    # bias test inside the Conv branch, where it could never match a BatchNorm module.
    elif classname.find('BatchNorm2d') != -1:
        if getattr(m, 'weight', None) is not None:
            torch.nn.init.normal_(m.weight.data, 1.0, 0.02) # reset BatchNorm2d's weight(tensor) with Gaussian Distribution
        if getattr(m, 'bias', None) is not None:
            torch.nn.init.constant_(m.bias.data, 0.0) # reset BatchNorm2d's bias(tensor) with Constant(0)

In [None]:
# Re-initialise every network with weights_init_normal
for _net in (G_AB, G_BA, D_A):
    _net.apply(weights_init_normal)
# Kept as the cell's final expression so the notebook still echoes D_B's structure
D_B.apply(weights_init_normal)

Discriminator(
  (model): Sequential(
    (0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): InstanceNorm2d(128, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (6): InstanceNorm2d(256, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (7): LeakyReLU(negative_slope=0.2, inplace=True)
    (8): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (9): InstanceNorm2d(512, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
    (10): LeakyReLU(negative_slope=0.2, inplace=True)
    (11): ZeroPad2d((1, 0, 1, 0))
    (12): Conv2d(512, 1, kernel_size=(4, 4), stride=(1, 1), padding=(1, 1))
  )
)

In [None]:
import itertools

# Adam settings shared by all three optimizers (lr, b1, b2 come from the hyperparameter cell)
_adam_settings = {"lr": lr, "betas": (b1, b2)}

# A single optimizer updates both generators
optimizer_G = torch.optim.Adam(
    itertools.chain(G_AB.parameters(), G_BA.parameters()), **_adam_settings
)

# Each discriminator has its own optimizer
optimizer_D_A = torch.optim.Adam(D_A.parameters(), **_adam_settings)
optimizer_D_B = torch.optim.Adam(D_B.parameters(), **_adam_settings)

In [None]:
class LambdaLR:
    """Learning-rate multiplier schedule: constant 1.0, then linear decay.

    Args:
        n_epochs: total number of training epochs.
        offset: epoch offset added to the epoch passed to `step`.
        decay_start_epoch: epoch at which the linear decay begins; must be
            smaller than `n_epochs`.
    """

    def __init__(self, n_epochs, offset, decay_start_epoch):
        decay_span = n_epochs - decay_start_epoch
        assert decay_span > 0, "Decay must start before the training session ends!"
        self.n_epochs = n_epochs
        self.offset = offset
        self.decay_start_epoch = decay_start_epoch

    def step(self, epoch):
        """Return the multiplier for `epoch`: 1.0 until the decay starts, then
        decreasing linearly and reaching 0.0 at `n_epochs`."""
        epochs_into_decay = max(0, epoch + self.offset - self.decay_start_epoch)
        decay_length = self.n_epochs - self.decay_start_epoch
        return 1.0 - epochs_into_decay / decay_length


In [None]:

def _linear_decay_scheduler(optimizer):
    """Attach a scheduler to `optimizer` driven by its own LambdaLR(n_epochs, epoch, decay_epoch) schedule."""
    schedule = LambdaLR(n_epochs, epoch, decay_epoch)
    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=schedule.step)


lr_scheduler_G = _linear_decay_scheduler(optimizer_G)
lr_scheduler_D_A = _linear_decay_scheduler(optimizer_D_A)
lr_scheduler_D_B = _linear_decay_scheduler(optimizer_D_B)


In [None]:
from PIL import Image

# Preprocessing / augmentation steps; ImageDataset wraps this list in transforms.Compose
transforms_ = [
    transforms.Resize(int(img_height*1.12), Image.BICUBIC), # resize to int(256 * 1.12) = 286 with bicubic resampling
    transforms.RandomCrop((img_height, img_width)), # random crop back down to the target size
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) # mean 0.5 and std 0.5 for each of the three channels
]

In [None]:
def to_rgb(image):
    """Return an RGB copy of `image`, made by pasting it onto a new RGB canvas of the same size."""
    canvas = Image.new("RGB", image.size)
    canvas.paste(image)
    return canvas

In [None]:
# Debug: confirm the dataset root before the glob patterns below are built from it
print(root)

../input/


In [None]:
# Glob patterns for the two image sources under `root`
paintings_dataset_path = os.path.join(root, "van-gogh-paintings", "VincentVanGogh", '**')
photo_dataset_path = os.path.join(root, "vangogh2photo", "vangogh2photo", "trainB", "*.*")

# Count the matches for each pattern; 0 means nothing was found at that location.
# NOTE(review): the '**' pattern is globbed without recursive=True while the '*.*'
# pattern is globbed with it - confirm which of the two was meant to be recursive.
n_painting_matches = len(glob.glob(paintings_dataset_path))
n_photo_matches = len(glob.glob(photo_dataset_path, recursive=True))
print(n_painting_matches)
print(n_photo_matches)

0
0


In [None]:
from torch.utils.data import Dataset

class ImageDataset(Dataset):
    """Two-domain image dataset (A: paintings, B: photos) with a train/val/test split.

    Domain A is read from ``vangogh2photo/vangogh2photo/trainA`` under `root`, or,
    when `stratified` is true, from the category sub-folders of
    ``van-gogh-paintings/VincentVanGogh``. Domain B is read from
    ``vangogh2photo/vangogh2photo/trainB`` and capped at ``MAX_FILES_B`` files.

    Every file listing is sorted and then shuffled with a private, seeded RNG, so
    instances created with the same `seed` see the same permutation and the
    'train', 'val' and 'test' splits never overlap.

    Args:
        root: directory that contains the dataset folders.
        transforms_: list of transforms applied to every image (wrapped in
            ``transforms.Compose``); ``None`` returns the PIL images unchanged.
        stratified: split every painting category separately (same percentages)
            and use the union of the per-category splits as domain A.
        unaligned: pick the B image at random instead of by index.
        mode: 'train', 'val' or 'test'; any other value keeps all files.
        val_percentage: fraction of each file list assigned to the validation split.
        test_percentage: fraction of each file list assigned to the test split.
        seed: seed of the private shuffling RNG.

    Attributes:
        files_A, files_B: file paths of the selected split.
        filesA_*_size, filesB_*_size: sizes of the train/val/test splits.
        strata: category directories (only set when `stratified` is true).
    """

    MAX_FILES_B = 400  # cap on the number of domain-B files that are used

    def __init__(self, root, transforms_=None, stratified = False, unaligned=False, mode='train',val_percentage=0.1,test_percentage=0.1, seed=42):
        import warnings

        # The default transforms_=None means "no transform" rather than a Compose(None)
        # that fails on first use.
        self.transform = transforms.Compose(transforms_) if transforms_ is not None else self._identity
        self.unaligned = unaligned
        self.mode = mode
        self.stratified = stratified
        self.seed = seed

        photo_pattern = os.path.join(root,"vangogh2photo","vangogh2photo","trainB","*.*")
        pool_B = self._shuffled(sorted(glob.glob(photo_pattern))[:self.MAX_FILES_B], seed)

        if self.stratified:
            # One pool per category directory, so each split keeps the category mix
            category_pattern = os.path.join(root,"van-gogh-paintings","VincentVanGogh","*")
            self.strata = sorted(path for path in glob.glob(category_pattern) if os.path.isdir(path))
            pools_A = [
                self._shuffled(sorted(glob.glob(os.path.join(category, "*.*"))), seed)
                for category in self.strata
            ]
        else:
            painting_pattern = os.path.join(root,"vangogh2photo","vangogh2photo","trainA","*.*")
            pools_A = [self._shuffled(sorted(glob.glob(painting_pattern)), seed)]

        sizes_A = [self._split_sizes(len(pool), val_percentage, test_percentage) for pool in pools_A]
        sizes_B = self._split_sizes(len(pool_B), val_percentage, test_percentage)

        self.filesA_train_size = sum(sizes[0] for sizes in sizes_A)
        self.filesA_val_size = sum(sizes[1] for sizes in sizes_A)
        self.filesA_test_size = sum(sizes[2] for sizes in sizes_A)
        self.filesB_train_size, self.filesB_val_size, self.filesB_test_size = sizes_B

        self.files_A = [
            path
            for pool, sizes in zip(pools_A, sizes_A)
            for path in self._take_split(pool, mode, sizes)
        ]
        self.files_B = self._take_split(pool_B, mode, sizes_B)

        if not self.files_A or not self.files_B:
            # Warn right away instead of failing later inside the DataLoader
            warnings.warn(
                "ImageDataset(mode=%r) found %d domain-A and %d domain-B images under %r; "
                "check that the datasets are attached." % (mode, len(self.files_A), len(self.files_B), root)
            )

    @staticmethod
    def _identity(image):
        """Transform used when no transforms are given: return the image unchanged."""
        return image

    @staticmethod
    def _shuffled(files, seed):
        """Return a copy of `files` shuffled with a private RNG seeded by `seed`."""
        shuffled = list(files)
        random.Random(seed).shuffle(shuffled)
        return shuffled

    @staticmethod
    def _split_sizes(n_files, val_percentage, test_percentage):
        """Return (train, val, test) sizes for a list of `n_files` entries."""
        n_val = int(n_files * val_percentage)
        n_test = int(n_files * test_percentage)
        return n_files - n_val - n_test, n_val, n_test

    @staticmethod
    def _take_split(files, mode, sizes):
        """Return the slice of `files` belonging to `mode` (all files for an unknown mode)."""
        n_train, n_val, _ = sizes
        if mode == 'train':
            return files[:n_train]
        if mode == 'val':
            return files[n_train:n_train + n_val]
        if mode == 'test':
            return files[n_train + n_val:]
        return list(files)

    def  __getitem__(self, index):
        if not self.files_A or not self.files_B:
            raise IndexError("ImageDataset has no images for at least one domain")

        path_A = self.files_A[index % len(self.files_A)]
        if self.unaligned:
            # Unpaired sampling: the B image is drawn independently of the index
            path_B = self.files_B[random.randint(0, len(self.files_B) - 1)]
        else:
            path_B = self.files_B[index % len(self.files_B)]

        image_A = Image.open(path_A)
        image_B = Image.open(path_B)
        if image_A.mode != 'RGB':
            image_A = to_rgb(image_A)
        if image_B.mode != 'RGB':
            image_B = to_rgb(image_B)

        item_A = self.transform(image_A)
        item_B = self.transform(image_B)
        return {'A':item_A, 'B':item_B}

    def __len__(self):
        # The shorter domain is cycled with a modulo in __getitem__
        return max(len(self.files_A), len(self.files_B))

In [None]:
# Sanity check: size of the default ('train') split; 0 means no image files were found under `root`
len(ImageDataset(root, transforms_=transforms_))

0

In [None]:
def _make_loader(mode, loader_batch_size):
    """Build a shuffling DataLoader over the `mode` split of ImageDataset."""
    return DataLoader(
        ImageDataset(root, transforms_=transforms_, mode=mode),
        batch_size=loader_batch_size,
        shuffle=True,
        num_workers=n_cpu,
    )


# Training uses the configured batch size; the test and validation loaders use
# fixed batches of 5 and 10 images.
dataloader = _make_loader('train', batch_size)
test_dataloader = _make_loader("test", 5)
val_dataloader = _make_loader('val', 10)

ValueError: num_samples should be a positive integer value, but got num_samples=0

In [None]:
import matplotlib.pyplot as plt
# Tensor type that batches are cast to: CUDA float tensors when a GPU is available, CPU tensors otherwise
Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor
def sample_images():
    """Show one validation batch next to its translations in both directions.

    Draws a batch from `val_dataloader`, runs `G_AB` on the A images and `G_BA` on
    the B images, and displays four image grids stacked top to bottom: real A,
    fake B, real B, fake A. The generators are put in eval mode for the forward
    passes and afterwards returned to the mode they were in before the call.
    """
    imgs = next(iter(val_dataloader))
    previous_modes = (G_AB.training, G_BA.training)
    G_AB.eval()
    G_BA.eval()
    try:
        # Inference only: no autograd graph is needed for visualisation
        with torch.no_grad():
            real_A = imgs['A'].type(Tensor) # A : van gogh
            real_B = imgs['B'].type(Tensor) # B : photo
            fake_B = G_AB(real_A)
            fake_A = G_BA(real_B)
    finally:
        G_AB.train(previous_modes[0])
        G_BA.train(previous_modes[1])
    # Arrange each batch into a grid with 5 images per row
    real_A = make_grid(real_A, nrow=5, normalize=True)
    fake_B = make_grid(fake_B, nrow=5, normalize=True)
    real_B = make_grid(real_B, nrow=5, normalize=True)
    fake_A = make_grid(fake_A, nrow=5, normalize=True)
    # Stack the four grids along dim 1
    image_grid = torch.cat((real_A, fake_B, real_B, fake_A), 1)
    plt.imshow(image_grid.cpu().permute(1,2,0))
    plt.title('Real A vs Fake B | Real B vs Fake A')
    plt.axis('off')
    plt.show()

In [None]:
# Pull one batch from the test loader and print the tensor shape of each domain
temp_imgs = next(iter(test_dataloader))
G_AB.eval() # test mode
G_BA.eval() # test mode
print(temp_imgs['A'].shape)
print(temp_imgs['B'].shape)

NameError: name 'test_dataloader' is not defined

In [None]:
# Cast both halves of the batch, then translate each one with the opposite generator;
# detach() keeps the results out of the autograd graph.
temp_real_A = temp_imgs['A'].type(Tensor) # A : van gogh
temp_real_B = temp_imgs['B'].type(Tensor) # B : photo
temp_fake_B = G_AB(temp_real_A).detach()
temp_fake_A = G_BA(temp_real_B).detach()
for _batch in (temp_real_A, temp_fake_B, temp_real_B, temp_fake_A):
    print(_batch.shape)

In [None]:
# Tile each batch into a grid with 5 images per row
temp_real_A = make_grid(temp_real_A, nrow=5, normalize=True)
temp_real_B = make_grid(temp_real_B, nrow=5, normalize=True)
temp_fake_A = make_grid(temp_fake_A, nrow=5, normalize=True)
temp_fake_B = make_grid(temp_fake_B, nrow=5, normalize=True)
# Stack the grids in the order the title announces (and that sample_images uses):
# each real batch is followed by its translation. The fakes used to be swapped.
temp_image_grid = torch.cat((temp_real_A, temp_fake_B, temp_real_B, temp_fake_A), 1)
plt.imshow(temp_image_grid.cpu().permute(1,2,0))
plt.title('Real A | Fake B | Real B | Fake A ')
plt.axis('off');

In [None]:
# Display the configured training batch size
batch_size

In [None]:
class ReplayBuffer():
    """Fixed-capacity pool of previously seen samples.

    Args:
        max_size: maximum number of samples kept in the pool (must be positive).
    """

    def __init__(self, max_size=50):
        assert (max_size > 0), 'Empty buffer or trying to create a black hole. Be careful.'
        self.max_size = max_size
        self.data = []

    def push_and_pop(self, data):
        """Feed a batch through the pool and return a batch of the same length.

        While the pool is not full, each sample is stored and returned as is.
        Once it is full, each sample is, with probability 0.5, swapped with a
        randomly chosen stored sample (the stored one is returned and the new one
        takes its slot); otherwise the new sample is returned unchanged.
        """
        batch_out = []
        for sample in data.data:
            sample = sample.unsqueeze(0)  # restore the batch dimension
            if len(self.data) < self.max_size:
                # Pool still filling up: keep the sample and pass it through
                self.data.append(sample)
                batch_out.append(sample)
                continue
            if random.uniform(0,1) > 0.5:
                slot = random.randint(0, self.max_size-1)
                batch_out.append(self.data[slot].clone())
                self.data[slot] = sample
            else:
                batch_out.append(sample)
        return torch.cat(batch_out)

In [None]:
def log_generated_images(epoch):
    """Log image grids from one validation batch to wandb under "generated_images".

    Args:
        epoch: epoch number, used only in the image captions.

    NOTE(review): `wandb` is not imported in the visible part of the notebook - it
    must be imported before this function is called.
    """
    imgs = next(iter(val_dataloader))
    G_AB.eval()
    G_BA.eval()
    real_A = imgs['A'].type(Tensor) # A : van gogh
    real_B = imgs['B'].type(Tensor) # B : photo
    # NOTE(review): fake_B is computed but not used in the lines below
    fake_B = G_AB(real_A).detach()
    fake_A = G_BA(real_B).detach()
    # Arrange each batch into a grid with 5 images per row
    real_A = make_grid(real_A, nrow=5, normalize=True)
    real_B = make_grid(real_B, nrow=5, normalize=True)
    fake_A = make_grid(fake_A, nrow=5, normalize=True)
    # Reorder the axes with permute(1, 2, 0), move to NumPy, and scale by 255 into uint8
    real_A = real_A.permute(1, 2, 0).cpu().numpy()
    real_A = (real_A * 255).astype(np.uint8)  # scale to [0, 255]
    fake_A = fake_A.permute(1, 2, 0).cpu().numpy()
    fake_A = (fake_A * 255).astype(np.uint8)
    real_B = real_B.permute(1, 2, 0).cpu().numpy()
    real_B = (real_B * 255).astype(np.uint8)
    # Log the three grids in one call: generated A, real B, real A
    wandb.log({"generated_images": [wandb.Image(fake_A, caption=f"van gogh paintings Epoch {epoch}"),wandb.Image(real_B, caption=f" real images  Epoch {epoch}"),wandb.Image(real_A, caption=f" real van gogh paintings  Epoch {epoch}")]})