<center><font color='#0C2577'>Generating Art: Sampling & Distribution</font></center>

<center><sup>Team Artemis</sup></center>

<center><img src="https://i.imgur.com/BC2sAdw.png" alt="Alt text that describes the graphic" title="Universiteit Leiden"/></center>

<center><font color='#0C2577'><b>Leiden University</b></font></center>

<center><sub><br>Based on </br><a href="https://www.ritchievink.com/blog/2018/07/16/generative-adversarial-networks-in-pytorch-the-distribution-of-art/">Generative Adversarial Networks in Pytorch: The distribution of Art</a></sub><center>



## PyTorch GAN

### Import Modules

In [None]:
# Data Processing
import numpy as np
import pandas as pd

# PyTorch Compute
import torch
import torch.nn as nn
import torch.utils.data
import torch.nn.functional as F
from torchvision import datasets, transforms

# Image Processing
from PIL import Image

# Image Rendering
import matplotlib.pyplot as plt

# Utility
import os
from google.colab import files
from IPython.display import clear_output

# Clean Repository
!rm -r sample_data

Initialize the CUDA Device

In [None]:
# PyTorch Initialization
device = ('cuda' if torch.cuda.is_available() else 'cpu')
print("Torch Version\t", torch.__version__)
print("Using Device\t", torch.cuda.get_device_name(0))

Torch Version	 1.7.0+cu101
Using Device	 Tesla T4


### Gather Data

Import Kaggle API Key

In [None]:
# Generate API Key
!pip install -q kaggle
api = '{\"username\":\"KAGGLE_USERNAME\",\"key\":\"KAGGLE_KEY\"}'

# Write API Key
with open('/kaggle.json', 'w') as f:
  f.write(api)

# Configure API Key
!mkdir ~/.kaggle
!cp /kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Get Data from Kaggle

In [None]:
# Download Data Range from Painter by Numbers Repository [4-5GB per Chunk]
for i in range(1,5):

  src = "train_" + str(i) + ".zip" 
  dst = "/content/train_" + str(i) +".zip"

  !kaggle competitions download -f $src -p /content/ -c painter-by-numbers
  !unzip $dst -d /content/
  !rm $src

  clear_output()

### Process Data

Detect Oversized Images

<sub>Threshold: 50,000,000 px, Limit: 89,478,485 px</sub>

In [None]:
# Increase Buffer for Reading Headers
Image.MAX_IMAGE_PIXELS = 1000000000

rml = []

# Search Data Range
for i in range(1,5):

  src = "/content/train_" + str(i)

  # Folder Tree Walk
  for subdir, dirs, files in os.walk(src):
      for file in files:

          # Generate Filepath
          filepath = subdir + os.sep + file

          # Math Image Files
          if filepath.endswith(".jpg"):

              # Add Oversized Images to Removal List
              img = Image.open(filepath)
              if img.size[0] * img.size[1] > 50000000:
                print(filepath, ":", img.size)
                rml.append(filepath)

/content/train_1/102257.jpg : (15530, 6911)
/content/train_1/100545.jpg : (7682, 9252)
/content/train_2/29855.jpg : (25528, 3000)
/content/train_3/32649.jpg : (16902, 7638)
/content/train_3/38814.jpg : (12004, 7867)
/content/train_3/30403.jpg : (6025, 8785)
/content/train_3/33557.jpg : (24890, 30000)
/content/train_3/3321.jpg : (11480, 11480)
/content/train_4/46794.jpg : (10931, 8764)
/content/train_4/4436.jpg : (16905, 7636)


Sanitize Training Data

In [None]:
# Remove Oversized Images
for d in rml:
  os.remove(d)

### Generator Model

In [None]:
# Torch Generator Model
class Generator(nn.Module):

    def __init__(self, input_size=200, alpha=0.2):

        # Torch Callback
        super(Generator, self).__init__()

        # Parameters
        kernel_size = 4
        padding = 1
        stride = 2
        
        # Input: (r,g,b,a) * 4 * 64 ~ 4 * 4 * 1024
        self.input = nn.Linear(input_size, 4 * 4 * 1024)
        self.net = nn.Sequential(
            nn.BatchNorm2d(1024),
            nn.LeakyReLU(alpha),
            nn.ConvTranspose2d(1024, 512, kernel_size, stride, padding),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(alpha),
            nn.ConvTranspose2d(512, 512, kernel_size, stride, padding),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(alpha),
            nn.ConvTranspose2d(512, 512, kernel_size, stride, padding),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(alpha),
            nn.ConvTranspose2d(512, 256, kernel_size, stride, padding),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(alpha),
            nn.ConvTranspose2d(256, 128, kernel_size, stride, padding),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(alpha),
            nn.ConvTranspose2d(128, 3, kernel_size, stride, padding),
            nn.Tanh()
        )
  
    # Feed Forward Step
    def forward(self, z):
        x = self.input(z)
        return self.net(x.view(-1, 1024, 4, 4))

# Training Routine
def train_gen(gen, batch_size):

    # Generate Latent Space
    z = torch.tensor(np.random.normal(0, 1, (batch_size, 200)), dtype=torch.float32)
    
    # Move Params to GPU Memory
    if next(gen.parameters()).is_cuda:
        z = z.cuda()
    
    # Clear Gradient
    gen.zero_grad()

    # Feed Forward Random Latent Space to Generator
    generated = gen(z)

    # Feed Foward Generator Latent Space to Discriminator
    y_fake = dis(generated)

    # Calculate Gradient
    ones = torch.ones_like(y_fake)
    if next(gen.parameters()).is_cuda:
        ones = ones.cuda()
    loss = F.binary_cross_entropy_with_logits(y_fake, ones)

    # Backpropagate
    loss.backward()
    optimizer_gen.step()
    
    return loss, generated

### Discriminator Model

In [None]:
# Torch Discriminator Model
class Discriminator(nn.Module):

    def __init__(self, alpha=0.2):

        # Torch Callback
        super(Discriminator, self).__init__()

        # Parameters
        kernel_size = 4
        padding = 1
        stride = 2
        
        # Accepts Latent Space of Generator (Last Layer)
        self.net = nn.Sequential(
            nn.Conv2d(3, 128, kernel_size, stride, padding),
            nn.LeakyReLU(alpha),
            nn.Conv2d(128, 256, kernel_size, stride, padding),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(alpha),
            nn.Conv2d(256, 512, kernel_size, stride, padding),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(alpha),
            nn.Conv2d(512, 512, kernel_size, stride, padding),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(alpha),
            nn.Conv2d(512, 512, kernel_size, stride, padding),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(alpha),
            nn.Conv2d(512, 1024, kernel_size, stride, padding),
            nn.BatchNorm2d(1024),
            nn.LeakyReLU(alpha),
        )
        self.output = nn.Linear(4 * 4 * 1024, 1)
        # Output Gradient in Generator Dim
        
    # Feed Forward Step
    def forward(self, x):
        x = self.net(x)
        x = torch.reshape(x, (-1, 4 * 4 * 1024))
        x = self.output(x)
        
        if self.training:
            return x
        
        return F.sigmoid(x)

# Training Routine
def train_dis(dis, gen, x):

    # Generate Latent Space
    z = torch.tensor(np.random.normal(0, 1, (batch_size, 200)), dtype=torch.float32)

    # Move Params to GPU Memory
    if next(gen.parameters()).is_cuda:
        x = x.cuda()
        z = z.cuda()

    # Clear Gradient
    dis.zero_grad()

    # Baseline Discriminant
    y_real_pred = dis(x)
    
    # Batch Normalization
    idx = np.random.uniform(0, 1, y_real_pred.shape)
    idx = np.argwhere(idx < 0.03)

    ones = np.ones(y_real_pred.shape) + np.random.uniform(-0.1, 0.1)
    ones[idx] = 0

    zeros = np.zeros(y_real_pred.shape) + np.random.uniform(0, 0.2)
    zeros[idx] = 1

    ones = torch.from_numpy(ones).float()
    zeros = torch.from_numpy(zeros).float()

    # Move Normalized Params to Memory
    if next(gen.parameters()).is_cuda:
        ones = ones.cuda()
        zeros = zeros.cuda()

    # Calculate Gradient
    loss_real = F.binary_cross_entropy_with_logits(y_real_pred, ones)

    # Generator Switch
    generated = gen(z)
    y_fake_pred = dis(generated)

    # Calculate Second Gradient
    loss_fake = F.binary_cross_entropy_with_logits(y_fake_pred, zeros)

    # Combine Gradients
    loss = loss_fake + loss_real

    # Backpropagate
    loss.backward()
    optimizer_dis.step()
    
    return loss

### Data Loader

In [None]:
# Image Folder Data Loader using Pillow
class ImageFolderEX(datasets.ImageFolder):

    # Pillow GetItem Overloading
    def __getitem__(self, index):
        # Default Image Iterator
        def get_img(index):
            path, label = self.imgs[index]
            try:
                img = self.loader(os.path.join(self.root, path))
            except:
                img = get_img(index + 1)
            return img
        img = get_img(index)

        # Normalization Step
        out = self.transform(img) * 2 - 1
        return out

# Use Pillow to Downsample Images to 256x256 and Normalize 
trans = transforms.Compose([
    transforms.Resize((256, 256), interpolation=2), 
    transforms.ToTensor(),
])

# Initialize Dataloader
data = torch.utils.data.DataLoader(ImageFolderEX('/content/', trans), 
	batch_size=64, shuffle=True, drop_last=True, num_workers=0)
x = next(iter(data))
data.dataset

Dataset ImageFolderEX
    Number of datapoints: 36629
    Root location: /content/
    StandardTransform
Transform: Compose(
               Resize(size=(256, 256), interpolation=PIL.Image.BILINEAR)
               ToTensor()
           )

### Training

Current Recommendation: Train 5 epochs at a time and save/load model to prevent parameter loss due to timeout.

In [None]:
# Training Hyper-Parameters
epochs = 20
batch_size = 64

# Adam Hyper-Parameters
lr = 0.0002
beta_1 = 0.5
beta_2 = 0.999

# Clear GPU Memory Cache
torch.cuda.empty_cache()

# Allocate Generator and Discriminator to GPU Memory
gen = Generator().cuda()
dis = Discriminator().cuda()

# Use Adam Gradient Optimizer
optimizer_gen = torch.optim.Adam(gen.parameters(), lr, betas=(beta_1, beta_2))
optimizer_dis = torch.optim.Adam(dis.parameters(), lr, betas=(beta_1, beta_2))

# Re-initialize Dataloader
data = torch.utils.data.DataLoader(ImageFolderEX('/content/', trans), 
				   batch_size=batch_size, shuffle=True, 
				   drop_last=True, num_workers=2)

# Sample Size
n = len(data)

# Train GAN with Checkpoints [May Take up to 1-2 Hours per Epoch on Full Data!]
for epoch in range(0, epochs):

    # Counters
    c = 0
    n = len(data) 

    # Iterate over Batches
    for x in iter(data): 
        c += 1

        # Alternate Gradient Optimization
        loss_dis = train_dis(dis, gen, x)
        loss_gen, generated = train_gen(gen, batch_size)
        
        # Update Torch Step Parameter
        global_step = epoch * n + c

        # Report Discriminator and Generator Loss Gradient Every X Batches
        if c % 10 == 0:
            print(f'loss: {loss_dis.item()}, \t {loss_gen.item()} \t epoch: {epoch}, \t {c}/{n}')

    # Store Parameter Checkpoint Every Epoch
    torch.save(dis.state_dict(), "/content/d_checkpoint.pt")
    torch.save(gen.state_dict(), "/content/g_checkpoint.pt")

loss: 1.037842035293579, 	 4.262066841125488 	 epoch: 0, 	 10/572
loss: 1.0591826438903809, 	 6.958964824676514 	 epoch: 0, 	 20/572
loss: 0.769879937171936, 	 4.82633638381958 	 epoch: 0, 	 30/572
loss: 0.3848050832748413, 	 1.6541800498962402 	 epoch: 0, 	 40/572
loss: 0.8646762371063232, 	 1.1944224834442139 	 epoch: 0, 	 50/572
loss: 0.8183815479278564, 	 1.3031032085418701 	 epoch: 0, 	 60/572
loss: 0.6067913174629211, 	 6.182055473327637 	 epoch: 0, 	 70/572
loss: 0.8642334938049316, 	 5.26708984375 	 epoch: 0, 	 80/572
loss: 0.2852151691913605, 	 1.895041584968567 	 epoch: 0, 	 90/572
loss: 0.7615597248077393, 	 1.3522231578826904 	 epoch: 0, 	 100/572
loss: 1.446528673171997, 	 4.071013927459717 	 epoch: 0, 	 110/572
loss: 0.41369402408599854, 	 2.9724349975585938 	 epoch: 0, 	 120/572
loss: 0.37093788385391235, 	 5.429798126220703 	 epoch: 0, 	 130/572
loss: 0.5002121925354004, 	 2.9967503547668457 	 epoch: 0, 	 140/572
loss: 0.5637274980545044, 	 1.4450247287750244 	 epoch: 0

Save Final Model

In [None]:
# Store Parametric Weights of the Final Models
torch.save(dis.state_dict(), "/content/d_params.pt")
torch.save(gen.state_dict(), "/content/g_params.pt")

Generate Sample

In [None]:
# Generate from Gaussian (mu=0,sigma=1) Random Latent Space
z = torch.tensor(np.random.normal(0, 1, (batch_size, 200)), dtype=torch.float32)

# Load Sample into GPU Memory
z = z.cuda()

# Feed through Generator
img = gen(z)

# Transform to PIL Format
img = transforms.ToPILImage(mode=None)(img[0])

# Render Image
plt.imshow(img)