In [21]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [22]:
import zipfile
from PIL import Image

import itertools

import torch
import torch.nn as nn
import torch.nn.init as init
import torchvision.transforms as transforms

from torch.utils.data import Dataset, random_split, DataLoader

In [23]:
# Install the gdown package with pip (shell command run from the notebook).
!pip install gdown
import gdown
# Share link of the checkpoint file. Nothing in this cell reads `url`; the command
# below passes the same Drive file id directly.
url = 'https://drive.google.com/file/d/1Pngq97neXTthfB6TO3_c_58gFGrm8jAK/view?usp=sharing'
# Download the file with that id and write it to ResNet.ckpt in the current directory.
# NOTE(review): the saved cell output shows a different file id than the one used here,
# so the recorded output appears to come from an earlier run - re-run to confirm.
!gdown --id 1Pngq97neXTthfB6TO3_c_58gFGrm8jAK -O ResNet.ckpt

Downloading...
From (original): https://drive.google.com/uc?id=18TImeANVDmgI7jc5_QkKkksc-yQABtmE
From (redirected): https://drive.google.com/uc?id=18TImeANVDmgI7jc5_QkKkksc-yQABtmE&confirm=t&uuid=ba237bd5-e6aa-4dc1-8ca7-2845390b7f7a
To: /kaggle/working/ResNet.ckpt
100%|█████████████████████████████████████████| 255M/255M [00:01<00:00, 212MB/s]


## Model

In [24]:
def Upsample(in_ch, out_ch, use_dropout=True, dropout_ratio=0.5):
    """Build one upsampling stage that doubles the spatial size of its input.

    Layer order: ConvTranspose2d (3x3, stride 2) -> InstanceNorm2d ->
    optional Dropout -> GELU.

    Args:
        in_ch: number of channels of the incoming feature map.
        out_ch: number of channels produced by the transposed convolution.
        use_dropout: when true, a Dropout layer is placed before the activation.
        dropout_ratio: drop probability passed to ``nn.Dropout``.

    Returns:
        An ``nn.Sequential`` with the layers in the order described above.
    """
    stages = [
        nn.ConvTranspose2d(in_ch, out_ch, 3, stride=2, padding=1, output_padding=1),
        nn.InstanceNorm2d(out_ch),
    ]
    if use_dropout:
        stages.append(nn.Dropout(dropout_ratio))
    stages.append(nn.GELU())
    return nn.Sequential(*stages)

In [25]:
def Convlayer(in_ch, out_ch, kernel_size=3, stride=2, use_leaky=True, use_inst_norm=True, use_pad=True):
    """Build a convolution -> normalisation -> activation stack.

    Args:
        in_ch: input channels of the convolution.
        out_ch: output channels of the convolution (also the size of the norm layer).
        kernel_size: convolution kernel size.
        stride: convolution stride.
        use_leaky: true selects LeakyReLU(0.2, in-place), false selects GELU.
        use_inst_norm: true selects InstanceNorm2d, false selects BatchNorm2d.
        use_pad: true uses a zero padding of 1, false uses no padding.

    Returns:
        An ``nn.Sequential`` of (conv, norm, activation).
    """
    padding = 1 if use_pad else 0
    conv = nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding, bias=True)
    actv = nn.LeakyReLU(negative_slope=0.2, inplace=True) if use_leaky else nn.GELU()
    norm = nn.InstanceNorm2d(out_ch) if use_inst_norm else nn.BatchNorm2d(out_ch)
    return nn.Sequential(conv, norm, actv)

In [26]:
class Resblock(nn.Module):
    """Residual block: two reflection-padded 3x3 convolutions added to the input.

    Branch layout (indices inside ``self.res``):
        0 ReflectionPad2d(1)
        1 Convlayer (3x3, stride 1, no zero padding, instance norm, GELU)
        2 Dropout, or Identity when ``use_dropout`` is false
        3 ReflectionPad2d(1)
        4 Conv2d (3x3, stride 1)
        5 InstanceNorm2d

    Args:
        in_features: channel count; the block keeps it unchanged.
        use_dropout: whether to apply dropout after the first convolution.
        dropout_ratio: drop probability passed to ``nn.Dropout``.
    """

    def __init__(self, in_features, use_dropout=True, dropout_ratio=0.5):
        super().__init__()
        # The flag used to be ignored (dropout was always inserted). An Identity
        # placeholder is used when dropout is disabled so the index of the second
        # convolution - and therefore its state-dict key - does not move.
        regulariser = nn.Dropout(dropout_ratio) if use_dropout else nn.Identity()
        layers = [
            nn.ReflectionPad2d(1),
            Convlayer(in_features, in_features, 3, 1, False, use_pad=False),
            regulariser,
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_features, in_features, 3, 1, padding=0, bias=True),
            nn.InstanceNorm2d(in_features),
        ]
        self.res = nn.Sequential(*layers)

    def forward(self, x):
        """Return the input plus the output of the residual branch."""
        return x + self.res(x)

In [27]:
class Generator(nn.Module):
    """Convolutional encoder / residual / decoder generator ending in Tanh.

    Layout: reflection pad + 7x7 conv to 64 channels, two stride-2 conv layers
    (128 then 256 channels), ``num_res_blocks`` residual blocks at 256 channels,
    two upsampling stages back to 64 channels, reflection pad + 7x7 conv to
    ``out_ch`` channels, and a final Tanh.

    Args:
        in_ch: channels of the input image.
        out_ch: channels of the generated image.
        num_res_blocks: how many ``Resblock(256)`` modules sit in the middle.
    """

    def __init__(self, in_ch, out_ch, num_res_blocks=6):
        super().__init__()
        # Modules are created in the same order they are applied.
        encoder = [
            nn.ReflectionPad2d(3),
            Convlayer(in_ch, 64, 7, 1, False, True, False),
            Convlayer(64, 128, 3, 2, False),
            Convlayer(128, 256, 3, 2, False),
        ]
        bottleneck = [Resblock(256) for _ in range(num_res_blocks)]
        decoder = [
            Upsample(256, 128),
            Upsample(128, 64),
            nn.ReflectionPad2d(3),
            nn.Conv2d(64, out_ch, kernel_size=7, padding=0),
            nn.Tanh(),
        ]
        self.gen = nn.Sequential(*encoder, *bottleneck, *decoder)

    def forward(self, x):
        """Run ``x`` through the full generator stack."""
        return self.gen(x)

In [28]:
def init_weights(net, init_type='normal', gain=0.02):
    """Re-initialise Conv*/Linear weights and BatchNorm2d affine parameters of ``net``.

    Modules are selected by class name, as visited by ``net.apply``:
    names containing 'Conv' or 'Linear' get their weight initialised according
    to ``init_type`` and their bias (if any) set to zero; names containing
    'BatchNorm2d' get weight ~ N(1, gain) and bias 0 when they are affine.

    Args:
        net: the module to initialise in place.
        init_type: 'normal' (weight ~ N(0, gain)), 'xavier', 'kaiming' or
            'orthogonal'. Previously this argument was accepted but ignored.
        gain: standard deviation for 'normal' and for BatchNorm weights; scaling
            factor for 'xavier' and 'orthogonal'.

    Raises:
        NotImplementedError: if ``init_type`` is not one of the supported names.
    """
    weight_inits = {
        'normal': lambda w: init.normal_(w, 0.0, gain),
        'xavier': lambda w: init.xavier_normal_(w, gain=gain),
        'kaiming': lambda w: init.kaiming_normal_(w, a=0, mode='fan_in'),
        'orthogonal': lambda w: init.orthogonal_(w, gain=gain),
    }
    if init_type not in weight_inits:
        raise NotImplementedError('initialization method [%s] is not implemented' % init_type)
    fill_weight = weight_inits[init_type]

    def init_func(m):
        classname = m.__class__.__name__
        weight = getattr(m, 'weight', None)
        bias = getattr(m, 'bias', None)
        if weight is not None and ('Conv' in classname or 'Linear' in classname):
            fill_weight(weight.data)
            if bias is not None:
                init.constant_(bias.data, 0.0)
        elif 'BatchNorm2d' in classname and weight is not None:
            # weight/bias are None for affine=False; such layers have nothing to initialise.
            init.normal_(weight.data, 1.0, gain)
            init.constant_(bias.data, 0.0)

    net.apply(init_func)

In [29]:
class Discriminator(nn.Module):
    """Convolutional discriminator that outputs a single-channel score map.

    Layout: a 4x4 stride-2 conv to 64 channels with LeakyReLU, then
    ``num_layers - 1`` ``Convlayer`` stages that each double the channel count
    (stride 2, except the last one which uses stride 1), then a 4x4 conv that
    reduces to one channel.

    Args:
        in_ch: channels of the input image.
        num_layers: number of feature stages, counting the first conv.
    """

    def __init__(self, in_ch, num_layers=4):
        super().__init__()
        model = [
            nn.Conv2d(in_ch, 64, 4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
        ]
        channels = 64
        for i in range(1, num_layers):
            stride = 1 if i == num_layers - 1 else 2
            model.append(Convlayer(channels, channels * 2, 4, stride))
            channels *= 2
        # Use the tracked channel count instead of a literal 512, which was only
        # correct for num_layers == 4 and made every other depth fail at runtime.
        model.append(nn.Conv2d(channels, 1, kernel_size=4, stride=1, padding=1))
        self.disc = nn.Sequential(*model)

    def forward(self, x):
        """Return the score map for ``x``."""
        return self.disc(x)

In [30]:
def load_checkpoint(ckpt_path, map_location=None):
    """Read a checkpoint file with ``torch.load`` and report success on stdout.

    Args:
        ckpt_path: path of the checkpoint file.
        map_location: forwarded to ``torch.load`` to choose the target device.

    Returns:
        Whatever object was stored in the checkpoint file.
    """
    # NOTE(security): weights_only=False allows arbitrary pickled objects to be
    # executed while loading - only use this with checkpoint files you trust.
    state = torch.load(ckpt_path, weights_only=False, map_location=map_location)
    message = ' [*] Loading checkpoint from %s succeed!' % ckpt_path
    print(message)
    return state

In [31]:
class CycleGAN(object):
    """Bundle of two generators, two discriminators, their losses and optimisers.

    ``gen_ptm`` is the generator this notebook applies to photos to obtain
    Monet-style images; ``gen_mtp`` is presumably the opposite direction
    (TODO confirm against the training notebook).

    Args:
        in_ch: input channels for the generators and discriminators.
        out_ch: output channels for the generators.
        device: device the four networks are moved to.
        start_lr: learning rate for both Adam optimisers.
        lmbda: stored as ``self.lmbda``; not used inside this class.
        idt_coef: stored as ``self.idt_coef``; not used inside this class.
        decay_epoch: stored when positive, otherwise ``int(self.epochs / 2)`` is used.
    """

    _NETWORKS = ('gen_mtp', 'gen_ptm', 'desc_m', 'desc_p')

    def __init__(self, in_ch, out_ch, device, start_lr=2e-4, lmbda=10, idt_coef=0.5, decay_epoch=0):
        self.epochs = 1
        if decay_epoch > 0:
            self.decay_epoch = decay_epoch
        else:
            self.decay_epoch = int(self.epochs/2)
        self.lmbda = lmbda
        self.idt_coef = idt_coef
        self.device = device

        # Networks are built in a fixed order, then initialised and moved to the device.
        self.gen_mtp = Generator(in_ch, out_ch)
        self.gen_ptm = Generator(in_ch, out_ch)
        self.desc_m = Discriminator(in_ch)
        self.desc_p = Discriminator(in_ch)
        self.init_models()

        self.mse_loss = nn.MSELoss()
        self.l1_loss = nn.L1Loss()

        # One optimiser drives both generators, another drives both discriminators.
        generator_params = itertools.chain(self.gen_mtp.parameters(), self.gen_ptm.parameters())
        discriminator_params = itertools.chain(self.desc_m.parameters(), self.desc_p.parameters())
        self.adam_gen = torch.optim.Adam(generator_params, lr=start_lr, betas=(0.5, 0.999))
        self.adam_desc = torch.optim.Adam(discriminator_params, lr=start_lr, betas=(0.5, 0.999))

    def init_models(self):
        """Apply ``init_weights`` to all four networks, then move them to ``self.device``."""
        for attr in self._NETWORKS:
            init_weights(getattr(self, attr))
        for attr in self._NETWORKS:
            setattr(self, attr, getattr(self, attr).to(self.device))

    def load_weights(self, ckpt_path):
        """Restore network and optimiser state from the checkpoint at ``ckpt_path``.

        The checkpoint must contain the keys 'gen_mtp', 'gen_ptm', 'desc_m',
        'desc_p', 'optimizer_gen' and 'optimizer_desc'.
        """
        ckpt = load_checkpoint(ckpt_path, self.device)
        # The checkpoint's 'epoch' entry is intentionally not restored here.
        for attr in self._NETWORKS:
            getattr(self, attr).load_state_dict(ckpt[attr])
        self.adam_gen.load_state_dict(ckpt['optimizer_gen'])
        self.adam_desc.load_state_dict(ckpt['optimizer_desc'])

## Load model

In [32]:
# Prefer the GPU, but fall back to the CPU so the notebook still runs when CUDA is unavailable.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def load_my_model(path):
    """Create a CycleGAN for 3-channel input and output on ``device`` and restore it from ``path``.

    Args:
        path: checkpoint file passed to ``CycleGAN.load_weights``.

    Returns:
        The CycleGAN instance with the checkpoint state loaded.
    """
    restored = CycleGAN(3, 3, device)  # architecture first ...
    restored.load_weights(path)        # ... then the stored weights
    return restored

In [33]:
import glob

# Directory that holds the downloaded checkpoint(s); created if it does not exist yet.
checkpoint_dir = '/kaggle/working'
os.makedirs(checkpoint_dir, exist_ok=True)

# glob returns matches in arbitrary order; sorting makes checkpoints[0] the same
# file on every run when more than one '*.ckpt' is present.
checkpoints = sorted(glob.glob(os.path.join(checkpoint_dir, '*.ckpt')))

In [34]:
# Fail with an explicit message instead of a bare IndexError when no checkpoint was found.
if not checkpoints:
    raise FileNotFoundError(f"No '*.ckpt' file found in {checkpoint_dir}")
model = load_my_model(checkpoints[0])

 [*] Loading checkpoint from /kaggle/working/ResNet.ckpt succeed!


## DataLoader

In [35]:
class ImageDataset(Dataset):
    """Dataset yielding every file of one directory as a resized image tensor.

    Args:
        photo_dir: directory whose entries are treated as image files.
        size: target size passed to ``transforms.Resize``.
        normalize: when true, ``Normalize`` with mean 0.5 and std 0.5 per channel
            is applied after ``ToTensor``.

    Attributes:
        photo_idx: dict mapping the integer sample index to a file name.
    """

    def __init__(self, photo_dir, size=(256, 256), normalize=True):
        super().__init__()
        self.photo_dir = photo_dir
        steps = [transforms.Resize(size), transforms.ToTensor()]
        if normalize:
            steps.append(transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)))
        self.transform = transforms.Compose(steps)
        # os.listdir returns entries in arbitrary order; sort them so that an
        # index always refers to the same file across runs and machines.
        self.photo_idx = dict(enumerate(sorted(os.listdir(self.photo_dir))))

    def __getitem__(self, idx):
        """Load, convert to RGB and transform the image stored under ``idx``."""
        photo_path = os.path.join(self.photo_dir, self.photo_idx[idx])
        # The context manager closes the file handle once the pixels are read;
        # convert('RGB') guarantees three channels for grayscale or RGBA files,
        # which the 3-channel Normalize and the generator both require.
        with Image.open(photo_path) as photo_img:
            return self.transform(photo_img.convert('RGB'))

    def __len__(self):
        """Number of files found in ``photo_dir``."""
        return len(self.photo_idx)

In [36]:
# Feed the generator inputs in [-1, 1]: it ends in Tanh, ImageDataset normalises by
# default, and show_image() un-normalises the photo batch with mean/std 0.5.
# NOTE(review): this assumes the checkpoint was trained on normalised images - confirm
# against the training notebook.
img_ds = ImageDataset('/kaggle/input/gan-getting-started/photo_jpg/', normalize=True)

In [37]:
# One image per batch, served in dataset order, with pinned host memory.
img_dl = DataLoader(dataset=img_ds, batch_size=1, pin_memory=True)

In [38]:
def unnorm(img, mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]):
    """Undo a per-channel ``Normalize(mean, std)``: returns ``img * std + mean``.

    Works for a single image (C, H, W) and for a batch (N, C, H, W); the
    statistics are broadcast over the channel axis (third from the end).

    Args:
        img: normalised image tensor.
        mean: per-channel means that were subtracted during normalisation.
        std: per-channel standard deviations that were divided by.

    Returns:
        A new tensor with the normalisation reverted. ``img`` itself is left
        untouched, so calling this twice on the same tensor is safe.
    """
    # Earlier versions added `std` instead of `mean` and iterated over the first
    # axis, which is the batch axis for 4-D input; both only worked by
    # coincidence for batch size 1 with mean == std.
    mean_t = torch.as_tensor(mean, dtype=img.dtype, device=img.device).view(-1, 1, 1)
    std_t = torch.as_tensor(std, dtype=img.dtype, device=img.device).view(-1, 1, 1)
    return img * std_t + mean_t

In [39]:
import matplotlib.pyplot as plt

def show_image(photo_img, monet_img):
    """Plot the first image of each batch side by side, titled 'Photo' and 'Monet'.

    Both arguments are passed through ``unnorm`` before plotting, and the first
    element is permuted from (C, H, W) to (H, W, C) for ``imshow``.
    """
    f = plt.figure(figsize=(5, 5))
    panels = (('Photo', photo_img), ('Monet', monet_img))
    for slot, (caption, batch) in enumerate(panels, start=1):
        f.add_subplot(1, 2, slot)
        plt.title(caption)
        batch = unnorm(batch)
        plt.imshow(batch[0].permute(1, 2, 0))

In [None]:
# Create a directory for images
os.makedirs('../images', exist_ok=True)

print(len(img_dl))

# Inference only: eval() switches the generator's Dropout layers off so the output
# is deterministic, and no_grad() avoids building autograd graphs for every image.
model.gen_ptm.eval()

# Generate 7,000+ images
with torch.no_grad():
    for i, photo_img in enumerate(img_dl):
        # Predict with the photo -> Monet generator
        pred_monet = model.gen_ptm(photo_img.to(device)).cpu()
        # show_image(photo_img, pred_monet)

        # The generator ends in Tanh, so its output lies in [-1, 1]. Map it to [0, 1]
        # before scaling to 8-bit; scaling the raw output clipped every negative
        # value to black.
        img_array = (pred_monet[0] * 0.5 + 0.5).permute(1, 2, 0).numpy()
        img_array = (img_array * 255).clip(0, 255).astype('uint8')

        # Save as JPG
        im = Image.fromarray(img_array)
        im.save(f"../images/{i+1}.jpg")
        if (i + 1) % 500 == 0:
            print(f'{i + 1} is generated')


In [None]:
import shutil
import os

# Folder to pack and base name of the archive (make_archive appends ".zip").
folder_to_zip = '../images'
output_filename = 'images'

# Pack the content of the folder into images.zip in the current directory.
shutil.make_archive(base_name=output_filename, format='zip', root_dir=folder_to_zip)

archive_mb = os.path.getsize('images.zip') / 1024**2
print(f"Zip complete! Archive size: {archive_mb:.2f} MB")