In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
!pip install kaggle -q



In [4]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import glob
import zipfile
import io
import subprocess
from pathlib import Path

In [5]:
class MonetPhotoDataset(Dataset):
    """Unpaired dataset that yields one Monet image and one photo per index.

    The two folders may contain different numbers of images. The dataset length
    is the size of the larger folder; the smaller folder wraps around through
    modulo indexing.

    Args:
        monet_dir: Directory searched (non-recursively) for ``*.jpg`` Monet images.
        photo_dir: Directory searched (non-recursively) for ``*.jpg`` photos.
        transform: Optional callable applied to each loaded RGB PIL image.

    Raises:
        FileNotFoundError: If either directory contains no ``*.jpg`` files.
    """

    def __init__(self, monet_dir, photo_dir, transform=None):
        self.monet_images = sorted(glob.glob(os.path.join(monet_dir, "*.jpg")))
        self.photo_images = sorted(glob.glob(os.path.join(photo_dir, "*.jpg")))
        # Fail fast with a clear message: an empty list would otherwise surface
        # later as a ZeroDivisionError from the modulo in __getitem__.
        if not self.monet_images:
            raise FileNotFoundError(f"No .jpg images found in Monet directory: {monet_dir}")
        if not self.photo_images:
            raise FileNotFoundError(f"No .jpg images found in photo directory: {photo_dir}")
        self.transform = transform

    def __len__(self):
        """Return the number of images in the larger of the two folders."""
        return max(len(self.monet_images), len(self.photo_images))

    @staticmethod
    def _load_rgb(path):
        """Open ``path`` and return an RGB copy, closing the file handle promptly."""
        with Image.open(path) as img:
            return img.convert('RGB')

    def __getitem__(self, idx):
        """Return ``{'monet': ..., 'photo': ...}`` for ``idx``, wrapping the shorter list."""
        monet_img = self._load_rgb(self.monet_images[idx % len(self.monet_images)])
        photo_img = self._load_rgb(self.photo_images[idx % len(self.photo_images)])

        if self.transform:
            monet_img = self.transform(monet_img)
            photo_img = self.transform(photo_img)

        return {'monet': monet_img, 'photo': photo_img}

In [6]:
class Discriminator(nn.Module):
    """Fully convolutional critic that maps an RGB image to a one-channel score map.

    Four 4x4 stride-2 convolutions (3 -> 64 -> 128 -> 256 -> 512 channels), each
    followed by LeakyReLU(0.2), reduce the spatial size; every stage except the
    first also applies BatchNorm2d. A final 3x3 stride-1 convolution produces
    the single-channel output.
    """

    def __init__(self):
        super().__init__()

        def down_stage(c_in, c_out, norm=True):
            # The conv bias is dropped whenever a norm layer follows it.
            stage = [nn.Conv2d(c_in, c_out, kernel_size=4, stride=2, padding=1, bias=not norm)]
            if norm:
                stage.append(nn.BatchNorm2d(c_out))
            stage.append(nn.LeakyReLU(0.2, inplace=True))
            return stage

        widths = (3, 64, 128, 256, 512)
        layers = []
        for stage_idx, (c_in, c_out) in enumerate(zip(widths, widths[1:])):
            # Only the very first stage skips normalisation.
            layers.extend(down_stage(c_in, c_out, norm=stage_idx > 0))
        layers.append(nn.Conv2d(512, 1, kernel_size=3, stride=1, padding=1))

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the score map produced by the convolutional stack for ``x``."""
        scores = self.model(x)
        return scores

In [7]:
def conv_block(in_channels, out_channels, kernel_size=3, stride=1, padding=1, use_norm=True, use_relu=True):
    """Build a Conv2d stage, optionally followed by InstanceNorm2d and ReLU.

    Args:
        in_channels: Number of input channels for the convolution.
        out_channels: Number of output channels for the convolution.
        kernel_size, stride, padding: Forwarded to ``nn.Conv2d``.
        use_norm: Append ``nn.InstanceNorm2d`` and drop the conv bias when True.
        use_relu: Append an in-place ``nn.ReLU`` when True.

    Returns:
        An ``nn.Sequential`` holding the convolution and the requested extras.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=not use_norm)
    norm = [nn.InstanceNorm2d(out_channels)] if use_norm else []
    activation = [nn.ReLU(inplace=True)] if use_relu else []
    return nn.Sequential(conv, *norm, *activation)

def upconv_block(in_channels, out_channels, kernel_size=3, stride=1, padding=1, output_padding=0, use_norm=True, use_relu=True):
    """Build a ConvTranspose2d stage, optionally followed by InstanceNorm2d and ReLU.

    Args:
        in_channels: Number of input channels for the transposed convolution.
        out_channels: Number of output channels for the transposed convolution.
        kernel_size, stride, padding, output_padding: Forwarded to
            ``nn.ConvTranspose2d``.
        use_norm: Append ``nn.InstanceNorm2d`` and drop the conv bias when True.
        use_relu: Append an in-place ``nn.ReLU`` when True.

    Returns:
        An ``nn.Sequential`` holding the transposed convolution and the extras.
    """
    upconv = nn.ConvTranspose2d(
        in_channels,
        out_channels,
        kernel_size,
        stride,
        padding,
        output_padding=output_padding,
        bias=not use_norm,
    )
    norm = [nn.InstanceNorm2d(out_channels)] if use_norm else []
    activation = [nn.ReLU(inplace=True)] if use_relu else []
    return nn.Sequential(upconv, *norm, *activation)

In [8]:
class ResidualBlock(nn.Module):
    """Two 3x3 ``conv_block`` stages whose output is added back onto the input.

    The first stage applies norm and ReLU; the second applies norm only. Both
    keep the channel count unchanged and use stride 1 with padding 1.

    Args:
        channels: Channel count shared by the block's input and output.
    """

    def __init__(self, channels):
        super().__init__()
        shared = dict(kernel_size=3, stride=1, padding=1, use_norm=True)
        first = conv_block(channels, channels, use_relu=True, **shared)
        second = conv_block(channels, channels, use_relu=False, **shared)
        self.block = nn.Sequential(first, second)

    def forward(self, x):
        """Return ``x`` plus the output of the two-stage convolutional branch."""
        residual = self.block(x)
        return x + residual

In [9]:
class Generator(nn.Module):
    """Encoder / residual / decoder image-to-image network with a Tanh output.

    Layout: reflection-padded 7x7 stem (3 -> 64), two stride-2 downsampling
    stages (64 -> 128 -> 256), ``num_residual_blocks`` residual blocks at 256
    channels, two stride-2 transposed-conv upsampling stages (256 -> 128 -> 64)
    and a reflection-padded 7x7 projection back to 3 channels followed by Tanh.

    Args:
        num_residual_blocks: How many ``ResidualBlock(256)`` modules to stack.
    """

    def __init__(self, num_residual_blocks=9):
        super().__init__()
        norm_relu = dict(use_norm=True, use_relu=True)

        # Stem: explicit reflection padding, so the 7x7 conv itself uses padding=0.
        self.initial = nn.Sequential(
            nn.ReflectionPad2d(3),
            conv_block(3, 64, kernel_size=7, stride=1, padding=0, **norm_relu),
        )

        # Encoder: two stride-2 stages.
        self.down1 = conv_block(64, 128, kernel_size=3, stride=2, padding=1, **norm_relu)
        self.down2 = conv_block(128, 256, kernel_size=3, stride=2, padding=1, **norm_relu)

        # Bottleneck.
        self.res_blocks = nn.Sequential(
            *(ResidualBlock(256) for _ in range(num_residual_blocks))
        )

        # Decoder: two stride-2 transposed-conv stages.
        self.up1 = upconv_block(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1, **norm_relu)
        self.up2 = upconv_block(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1, **norm_relu)

        # Output head: Tanh bounds the result to [-1, 1].
        self.final = nn.Sequential(
            nn.ReflectionPad2d(3),
            nn.Conv2d(64, 3, kernel_size=7, stride=1, padding=0),
            nn.Tanh(),
        )

    def forward(self, x):
        """Run ``x`` through every stage in order and return the final output."""
        stages = (
            self.initial,
            self.down1,
            self.down2,
            self.res_blocks,
            self.up1,
            self.up2,
            self.final,
        )
        for stage in stages:
            x = stage(x)
        return x

In [10]:
def save_images(images, output_dir, prefix, start_idx):
    """Append a batch of image tensors as JPEG files to ``<output_dir>/images.zip``.

    Each tensor is rescaled with ``x * 0.5 + 0.5`` and multiplied by 255, so
    values are expected in [-1, 1]. It is then reordered from (C, H, W) to
    (H, W, C) and JPEG-encoded in memory before being written to the archive.

    Args:
        images: Iterable of image tensors laid out as (channels, height, width).
        output_dir: Directory holding ``images.zip``; created if missing.
        prefix: File-name prefix for every archive entry.
        start_idx: Number used for the first entry; later entries count up from it.

    Note:
        The archive is opened in append mode, so entries from earlier calls
        (and earlier runs) are kept.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(output_dir, exist_ok=True)
    zip_path = os.path.join(output_dir, 'images.zip')
    with zipfile.ZipFile(zip_path, 'a', zipfile.ZIP_DEFLATED) as zf:
        for i, img in enumerate(images):
            # Detach first so the rescaling arithmetic is not recorded by autograd.
            pixels = (img.detach() * 0.5 + 0.5) * 255
            # Round to the nearest level and clamp so the uint8 cast can neither
            # truncate downwards nor wrap on slightly out-of-range values.
            pixels = pixels.round().clamp(0, 255)
            array = pixels.permute(1, 2, 0).cpu().numpy().astype('uint8')
            img_buffer = io.BytesIO()
            Image.fromarray(array).save(img_buffer, format='JPEG')
            zf.writestr(f'{prefix}_{start_idx + i}.jpg', img_buffer.getvalue())

In [11]:
def _set_requires_grad(networks, requires_grad):
    """Switch gradient tracking on or off for every parameter of ``networks``."""
    for network in networks:
        for param in network.parameters():
            param.requires_grad_(requires_grad)


def _discriminator_step(discriminator, optimizer, real_images, fake_images, criterion):
    """Run one optimizer step for ``discriminator`` and return its loss tensor."""
    optimizer.zero_grad()
    pred_real = discriminator(real_images)
    loss_real = criterion(pred_real, torch.ones_like(pred_real))
    # detach() keeps this update from back-propagating into the generator.
    pred_fake = discriminator(fake_images.detach())
    loss_fake = criterion(pred_fake, torch.zeros_like(pred_fake))
    loss = (loss_real + loss_fake) * 0.5
    loss.backward()
    optimizer.step()
    return loss


def train_cyclegan(monet_dir, photo_dir, output_dir, epochs=100, batch_size=1, lr=0.0002, beta1=0.5):
    """Train two generators and two discriminators on unpaired Monet/photo images.

    Images are resized to 256x256 and normalised to [-1, 1]. The generator
    objective is the sum of an MSE adversarial loss, an L1 cycle-consistency
    loss (weight 10) and an L1 identity loss (weight 5). Every 100 batches the
    losses are printed and the current fake Monet batch is appended to
    ``images.zip`` via ``save_images``. After the last epoch both generator
    state dicts are written to ``output_dir``.

    Args:
        monet_dir: Directory with Monet ``*.jpg`` images.
        photo_dir: Directory with photo ``*.jpg`` images.
        output_dir: Destination for sample images and generator weights;
            created if missing.
        epochs: Number of passes over the dataset.
        batch_size: Batch size given to the ``DataLoader``.
        lr: Learning rate shared by all three Adam optimizers.
        beta1: First Adam beta; the second is fixed at 0.999.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Both save_images and the final torch.save calls write into this folder.
    os.makedirs(output_dir, exist_ok=True)

    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    dataset = MonetPhotoDataset(monet_dir, photo_dir, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)

    gen_photo_to_monet = Generator().to(device)
    gen_monet_to_photo = Generator().to(device)
    disc_monet = Discriminator().to(device)
    disc_photo = Discriminator().to(device)
    discriminators = (disc_monet, disc_photo)

    optimizer_G = torch.optim.Adam(
        list(gen_photo_to_monet.parameters()) + list(gen_monet_to_photo.parameters()),
        lr=lr, betas=(beta1, 0.999)
    )
    optimizer_D_monet = torch.optim.Adam(disc_monet.parameters(), lr=lr, betas=(beta1, 0.999))
    optimizer_D_photo = torch.optim.Adam(disc_photo.parameters(), lr=lr, betas=(beta1, 0.999))

    criterion_GAN = nn.MSELoss()
    criterion_cycle = nn.L1Loss()
    criterion_identity = nn.L1Loss()

    lambda_cycle = 10.0
    lambda_identity = 5.0
    for epoch in range(epochs):
        for i, batch in enumerate(dataloader):
            real_monet = batch['monet'].to(device)
            real_photo = batch['photo'].to(device)

            # ---- Generator update ----
            # Freeze the discriminators so the generator backward pass does not
            # compute gradients for weights this step never updates.
            _set_requires_grad(discriminators, False)
            optimizer_G.zero_grad()

            # Identity: feeding a generator an image already in its target
            # domain should leave the image unchanged.
            identity_monet = gen_photo_to_monet(real_monet)
            identity_photo = gen_monet_to_photo(real_photo)
            loss_identity = (criterion_identity(identity_monet, real_monet) + criterion_identity(identity_photo, real_photo)) * lambda_identity

            # Adversarial: targets are built from the prediction itself, so they
            # match whatever map size, device and dtype the discriminator emits.
            fake_monet = gen_photo_to_monet(real_photo)
            fake_photo = gen_monet_to_photo(real_monet)
            pred_fake_monet = disc_monet(fake_monet)
            pred_fake_photo = disc_photo(fake_photo)
            loss_GAN_monet = criterion_GAN(pred_fake_monet, torch.ones_like(pred_fake_monet))
            loss_GAN_photo = criterion_GAN(pred_fake_photo, torch.ones_like(pred_fake_photo))

            # Cycle consistency: translating there and back should recover the input.
            cycled_photo = gen_monet_to_photo(fake_monet)
            cycled_monet = gen_photo_to_monet(fake_photo)
            loss_cycle = (criterion_cycle(cycled_photo, real_photo) + criterion_cycle(cycled_monet, real_monet)) * lambda_cycle

            loss_G = loss_GAN_monet + loss_GAN_photo + loss_cycle + loss_identity
            loss_G.backward()
            optimizer_G.step()

            # ---- Discriminator updates ----
            _set_requires_grad(discriminators, True)
            loss_D_monet = _discriminator_step(disc_monet, optimizer_D_monet, real_monet, fake_monet, criterion_GAN)
            loss_D_photo = _discriminator_step(disc_photo, optimizer_D_photo, real_photo, fake_photo, criterion_GAN)

            if i % 100 == 0:
                print(f"Epoch [{epoch+1}/{epochs}] Batch [{i}/{len(dataloader)}] "
                      f"Loss_G: {loss_G.item():.4f} Loss_D_monet: {loss_D_monet.item():.4f} Loss_D_photo: {loss_D_photo.item():.4f}")

                save_images(fake_monet, output_dir, f'epoch_{epoch+1}_batch_{i}', i)

    torch.save(gen_photo_to_monet.state_dict(), os.path.join(output_dir, 'gen_photo_to_monet.pth'))
    torch.save(gen_monet_to_photo.state_dict(), os.path.join(output_dir, 'gen_monet_to_photo.pth'))



In [None]:
# Put the Kaggle API token in /root/.config/kaggle/, one of the locations that
# setup_kaggle_competition checks for kaggle.json.
# NOTE(review): `mv` removes ./kaggle.json from the working directory, so re-running
# this cell without supplying the file again will fail on the `mv` line.
!mkdir -p /root/.config/kaggle
!mv ./kaggle.json /root/.config/kaggle/kaggle.json
# Restrict the token file to owner read/write only.
!chmod 600 /root/.config/kaggle/kaggle.json

In [None]:
def setup_kaggle_competition(competition_name="gan-getting-started", download_dir="./data"):
    """
    Download and extract the Kaggle competition dataset using the Kaggle CLI.
    Returns paths to monet_jpg and photo_jpg directories.

    Args:
        competition_name: Competition slug passed to ``kaggle competitions download -c``.
        download_dir: Directory that receives the archive and its extracted
            contents; created if missing.

    Returns:
        Tuple ``(monet_dir, photo_dir)`` of string paths under ``download_dir``.

    Raises:
        RuntimeError: If the Kaggle CLI cannot be run, no credentials are found
            in the checked locations, or the download command fails.
        FileNotFoundError: If no zip archive is present after the download, or
            the expected ``monet_jpg`` / ``photo_jpg`` folders are missing after
            extraction.
    """
    # A missing executable surfaces as an OSError (FileNotFoundError) because the
    # process never starts; an installed CLI that exits non-zero surfaces as
    # CalledProcessError. Both are reported with the same RuntimeError.
    try:
        subprocess.run(["kaggle", "--version"], check=True, capture_output=True)
    except (OSError, subprocess.CalledProcessError) as e:
        raise RuntimeError("Kaggle CLI is not installed. Install it with 'pip install kaggle'.") from e

    kaggle_config_dir = os.path.expanduser("~/.kaggle")
    kaggle_root_config_dir = "/root/.config/kaggle"
    has_token_file = any(
        os.path.exists(os.path.join(config_dir, "kaggle.json"))
        for config_dir in (kaggle_config_dir, kaggle_root_config_dir)
    )
    has_env_credentials = bool(os.getenv("KAGGLE_USERNAME") and os.getenv("KAGGLE_KEY"))
    if not (has_token_file or has_env_credentials):
        raise RuntimeError(
            "Kaggle authentication failed. Ensure kaggle.json is in ~/.kaggle/ or /root/.config/kaggle/, "
            "or set KAGGLE_USERNAME and KAGGLE_KEY environment variables. "
            "See https://github.com/Kaggle/kaggle-api/ for setup instructions."
        )

    download_path = Path(download_dir)
    download_path.mkdir(parents=True, exist_ok=True)

    try:
        subprocess.run(
            ["kaggle", "competitions", "download", "-c", competition_name, "-p", str(download_path)],
            check=True, capture_output=True, text=True
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to download competition dataset: {e.stderr}") from e

    # Prefer the archive named after the competition (presumably what the CLI
    # writes - TODO confirm); otherwise fall back to the first archive in sorted
    # order so the choice is deterministic when several zips are present.
    dataset_zip = download_path / f"{competition_name}.zip"
    if not dataset_zip.is_file():
        zip_files = sorted(download_path.glob("*.zip"))
        if not zip_files:
            raise FileNotFoundError("No zip file found in the download directory.")
        dataset_zip = zip_files[0]

    with zipfile.ZipFile(dataset_zip, 'r') as zip_ref:
        zip_ref.extractall(download_path)

    monet_dir = download_path / "monet_jpg"
    photo_dir = download_path / "photo_jpg"

    if not monet_dir.exists():
        raise FileNotFoundError(f"Monet directory not found at {monet_dir}")
    if not photo_dir.exists():
        raise FileNotFoundError(f"Photo directory not found at {photo_dir}")

    return str(monet_dir), str(photo_dir)

def main():
    """Fetch the competition data, then run a short CycleGAN training session.

    A failure in either phase is reported with a printed message instead of
    being propagated; a dataset-setup failure also skips training.
    """
    output_dir = "./generated_images"
    os.makedirs(output_dir, exist_ok=True)

    # Phase 1: obtain the image folders.
    try:
        monet_dir, photo_dir = setup_kaggle_competition()
    except Exception as setup_error:
        print(f"Error setting up dataset: {setup_error}")
        return

    # Phase 2: train with a small, fixed configuration.
    training_options = {
        "monet_dir": monet_dir,
        "photo_dir": photo_dir,
        "output_dir": output_dir,
        "epochs": 3,
        "batch_size": 1,
        "lr": 0.0002,
        "beta1": 0.5,
    }
    try:
        train_cyclegan(**training_options)
        print(f"Training completed. Generated images saved to {output_dir}")
    except Exception as training_error:
        print(f"Error during training: {training_error}")


if __name__ == "__main__":
    main()