<font size = "5"> Artificial Intelligence for the Media

<font size="4">
Mini-Project
By Marissa Beaty

For this Mini-Project, I will be training a Pix2Pix model on a self-made dataset composed of outputs and inputs. The outputs for this project are a collection of 1,290 images collected from seven different Pinterest searches using the PinDown Google extension. The inputs for this project are those same images run through a Canny edge detection function to leave only outlines of the images defining characteristics. The below code will train a Pix2Pix model on this dataset. This model will be in Pytorch, and is borrowed from the research team, Phillip Isola, Jun-Yan Zhu, Tinghui Zhou, and Alexei A. Efros who published this model in 2017. The model is available here: https://github.com/phillipi/pix2pix. Their model was adapted into a notebook by Aniket Maurya, available here: https://librecv.github.io/blog/gans/pytorch/2021/02/13/Pix2Pix-explained-with-code.html. The research and notebook above is what the following code is based off and adapted from. 

In [1]:
#importing packages
import os
from glob import glob 
from pathlib import Path

import matplotlib.pyplot as plt
import pytorch_lightning as pl
from pytorch_lightning import LightningModule
import torch
import cv2
import numpy as np
from PIL import Image
from torch import nn
from torch.utils.data import DataLoader, Dataset #iterates over the dataset
from torchvision import transforms
from torchvision.transforms.functional import center_crop
from torchvision.utils import make_grid
from tqdm.auto import tqdm #makes our loops show a smart progress meter 


In [2]:
#This code was originally in the notebook created by Aniket Maurya. This did not properly load the images, so I coded my own class directly below this. 

#path = "Training_small"  

#class ArchitectDataset(Dataset): 
    #def __init__(self, path):
      #self.filenames = glob(str(Path(path) / "*"))
      #self.target_size = target_size 

    #def __len__(self):
        #print(len(self.filenames))
        #return len(self.filenames)
    
    #def __getitem__(self, idx):
        #filename = self.filenames[idx]
        #image = Image.open(filename)
        #image = transforms.functional.to_tensor(image)
        #image_width = image.shape[2] 

        #real = image[:, :, : image_width // 2] 
        #condition = image[:, :, image_width // 2 :] 

        #target_size = self.target_size
        #if target_size:
         #   condition = nn.functional.interpolate(condition, size=target_size)
         #   real = nn.functional.interpolate(real, size=target_size)

        #return real, condition

In [3]:
#creating a class that will separate the paired images into  real and condition images (outputs and inputs)
#this was written in replace of the above code that did not work

class Buildings_Dataset(Dataset):
    def __init__(self, path):
        self.image_files = glob('Training_small/*.png')

    def __len__(self):
        return len(self.image_files)
    
    def read_image(self, idx):
        file_image = self.image_files[idx]
        image = Image.open(file_image)
        image = np.array(image)
        width = image.shape[1]
        width_half = width // 2

        #here there are various additions, such as swapaxes and unsqueeze used as tests to try and flip the axis produced by the class
        #the images produced here are in the wrong order for the rest of the code to function.
        #unfortunately, neither of these flipped the order as expected, thus the code could not move forward. 
        condition = image[:, :width_half, :]
        condition = torch.swapaxes(condition, 0, 2)
        # condition = torch.swapaxes(condition, 0, 1)
        condition = condition.unsqueeze(0)
        real = image[:, width_half:, :]
        real = torch.swapaxes(real, 0, 2)
        real = real.unsqueeze(0)
        # real = torch.swapaxes(real, 0, 1)
        print(real.shape)
        print(condition.shape)

        condition = condition.astype(np.float32)
        real = real.astype(np.float32)

        return real, condition
    


In [32]:
#checking that the read_image function correctly reads in the images 

def read_image(path):
    #file_image = self.image_files[idx]
    image = Image.open(path)
    image = np.array(image)
    width = image.shape[1]
    width_half = width // 2

    #here there are various additions, such as swapaxes and unsqueeze used as tests to try and flip the axis produced by the class
    #the images produced here are in the wrong order for the rest of the code to function.
    #unfortunately, neither of these flipped the order as expected, thus the code could not move forward. 
    condition = image[:, :width_half, :]
    #condition = torch.swapaxes(condition, 0, 2)
    #condition = torch.swapaxes(condition, 0, 1)
    real = image[:, width_half:, :]
    #real = torch.swapaxes(real, 0, 2)
    ## real = torch.swapaxes(real, 0, 1)
    print(real.shape)
    print(condition.shape)

    condition = condition.astype(np.float32)
    real = real.astype(np.float32)

    return real, condition

In [33]:
#running my dataset through the class above to take a look at the tensors produced
#notice how the tensor is structued as (512, 512, 3) for the rest of the code to work, it needs to be (3, 512, 512)
#when running torch.swapaxes(), we get the following error "argument 'input' (position 1) must be Tensor, not numpy.ndarray"
#attempting to convert it to a tensor, causes other sections of the code to break
path = "Training_small/0002.png"  
image_dataset = read_image(path)

(512, 512, 3)
(512, 512, 3)


In [32]:
#decrease the layer size (as part of the U-Net shaped network Pix2Pix needs)

class DownSampleConv(nn.Module):
    def __init__(self, in_channels, out_channels, kernel=4, strides=2, padding=1, activation=True, batchnorm=True):
        super().__init__()
        self.activation = activation
        self.batchnorm = batchnorm

        self.conv = nn.Conv2d(in_channels, out_channels, kernel, strides, padding)

        if batchnorm:
            self.bn = nn.BatchNorm2d(out_channels)

        if activation:
            self.act = nn.LeakyReLU(0.2)

    def forward(self, x):
        x = self.conv(x)
        if self.batchnorm:
            x = self.bn(x)
        if self.activation:
            x = self.act(x)
        return x

In [33]:
#expanding the network (as part of the U-Net shaped network Pix2Pix needs)
class UpSampleConv(nn.Module):
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel=4,
        strides=2,
        padding=1,
        activation=True,
        batchnorm=True,
        dropout=False 
    ):

        super().__init__()
        self.activation = activation
        self.batchnorm = batchnorm
        self.dropout = dropout

        self.deconv = nn.ConvTranspose2d(in_channels, out_channels, kernel, strides, padding)

        if batchnorm:
            self.bn = nn.BatchNorm2d(out_channels)

        if activation:
            self.act = nn.ReLU(True)

        if dropout:
            self.drop = nn.Dropout2d(0.5)

    def forward(self, x):
        x = self.deconv(x)
        if self.batchnorm:
            x = self.bn(x)

        if self.dropout:
            x = self.drop(x)
        return x

In [34]:
#creating Generator

class Generator(nn.Module):

    def __init__(self, in_channels, out_channels):
        super().__init__()

        #encoder/downsample convs
        self.encoders=[
            DownSampleConv(in_channels, 64, batchnorm=False),
            DownSampleConv(64, 128),
            DownSampleConv(128, 256),
            DownSampleConv(256, 512),
            DownSampleConv(512, 512),
            DownSampleConv(512, 512),
            DownSampleConv(512, 512), 
            DownSampleConv(512, 512, batchnorm=False),
        ]

        #decoder/upsample convs
        self.decoders=[
            UpSampleConv(512, 512, dropout=True),
            UpSampleConv(1024, 512, dropout=True),
            UpSampleConv(1024, 512, dropout=True),
            UpSampleConv(1024, 512),
            UpSampleConv(1024, 256),
            UpSampleConv(512, 128),
            UpSampleConv(256, 64),
        ]

        self.decover_channels = [512, 512, 512, 512, 256, 128, 64]
        self.final_conv = nn.ConvTranspose2d(64, out_channels, kernel_size=4, stride=2, padding=1)
        self.tanh = nn.Tanh()

        self.encoders = nn.ModuleList(self.encoders)
        self.decoders = nn.ModuleList(self.decoders)

    def forward(self, x):
        skips_cons = []
        for encoder in self.encoders:
            x = encoder(x)

            skips_cons.append(x)

        skips_cons = list(reversed(skips_cons[:-1]))
        decoders = self.decoders[:-1]

        for decoder, skip in zip(decoders, skips_cons):
            x = decoder(x)
            x = torch.cat((x, skip), axis=1)

        x = self.decoders[-1](x)
        x = self.final_conv(x)
        return self.tanh(x)



In [35]:
#Creating a discriminator using the same downsampleconv we definind in the generator

class PatchGAN(nn.Module):
    
    def __init__(self, input_channels):
        super().__init__()
        self.d1 = DownSampleConv(input_channels, 64, batchnorm=False)
        self.d2 = DownSampleConv(64, 128)
        self.d3 = DownSampleConv(128, 256)
        self.d4 = DownSampleConv(256, 512)
        self.final = nn.Conv2d(512, 1, kernel_size=1)

    def forward(self, x, y):
        x = torch.cat((x, y)) #, axis=1)
        x0 = self.d1(x)
        x1 = self.d2(x0)
        x2 = self.d3(x1)
        x3 = self.d4(x2)
        xn = self.final(x3)

        return xn

In [36]:
#defining the loss function

adversarial_loss = nn.BCEWithLogitsLoss()

reconstruction_loss = nn.L1Loss()

In [37]:
#initializing weights

def _weights_init(m):
    if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
        torch.nn.init.normal_(m.weight, 0.0, 0.02)
    if isinstance(m, nn.BatchNorm2d):
        torch.nn.init.normal_(m.weight, 0.0, 0.02)
        torch.nn.init.constant_(m.bias, 0)

def display_progress(cond, fake, real, figsize=(10,5)):
    cond = cond.detach().cpu().permute(1, 2, 0) 
    fake = fake.detach().cpu().permute(1, 2, 0)
    real = real.detach().cpu().permute(1, 2, 0)

    fig, ax = plt.subplots(1, 3, figsize=figsize)
    ax[0].imshow(cond)
    ax[2].imshow(fake)
    ax[1].imshow(real)

    plt.show()

In [38]:
#defining the Pix2Pix model

class Pix2Pix(pl.LightningModule):

    def __init__(self, in_channels, out_channels, learning_rate=0.0002, lambda_recon=200, display_step=25):
        
        super().__init__()
        self.save_hyperparameters()

        self.display_step = display_step
        self.gen = Generator(in_channels, out_channels)
        self.patch_gan = PatchGAN(in_channels + out_channels)

        #initializing weights
        self.gen = self.gen.apply(_weights_init)
        self.patch_gan = self.patch_gan.apply(_weights_init)

        self.adversarial_criterion = nn.BCEWithLogitsLoss()
        self.recon_criterion = nn.L1Loss()

    def _gen_step(self, real_images, conditioned_images):
        #calculate the adversarial loss
        fake_images = self.gen(conditioned_images)
        disc_logits = self.patch_gan(fake_images, conditioned_images)
        adversarial_loss = self.adversarial_criterion(disc_logits, torch.ones_like(disc_logits))

        #calculate the reconstruction loss
        recon_loss = self.recon_criterion(fake_images, real_images)
        lambda_recon = self.hparams.lambda_recon

    def _disc_step(self, real_images, conditioned_images):
        fake_images = self.gen(conditioned_images).detach 
        fake_logits = self.patch_gan(fake_images, conditioned_images)

        real_logits = self.patch_gan(real_images, conditioned_images)

        fake_loss = self.adversarial_criterion(fake_logits, torch.zeros_like(fake_logits))
        real_loss = self.adversarial_criterion(real_logits, torch.ones_like(real_logits))
        return(real_loss + fake_loss)
    
    def configure_optimizers(self):
        lr = self.hparams.learning_rate
        gen_opt = torch.optim.Adam(self.gen.parameters(), lr=lr)
        disc_opt = torch.optim.Adam(self.patch_gan.parameters(), lr=lr)
        return disc_opt, gen_opt
    
    def training_step(self, batch, batch_idx, optimizer_idx):
        real, condition = batch

        loss = None
        if optimizer_idx == 0:
            loss = self._disc_step(real, condition)
            self.log('PatchGAN Loss', loss)
        elif optimizer_idx == 1:
            loss = self._gen_step(real, condition)
            self.log('Generator Loss', loss)

        if self.current_epoch%self.display_step==0 and batch_idx==0 and optimizer_idx==1:
            fake = self.gen(condition).detach()
            display_progress(condition[0], fake[0], real[0])

        return loss

In [39]:
#train the model\
batch_size = 2
image_dataset = Buildings_Dataset('')
#added num workers here based on research of errors not defining num workers. 
dataloader = DataLoader(image_dataset, batch_size=batch_size, shuffle=True, num_workers=8) 

pix2pix = Pix2Pix(3, 3)
trainer = pl.Trainer(max_epochs=10, accelerator="cpu") 
trainer.fit(pix2pix, dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name                  | Type              | Params
------------------------------------------------------------
0 | gen                   | Generator         | 54.4 M
1 | patch_gan             | PatchGAN          | 2.8 M 
2 | adversarial_criterion | BCEWithLogitsLoss | 0     
3 | recon_criterion       | L1Loss            | 0     
------------------------------------------------------------
57.2 M    Trainable params
0         Non-trainable params
57.2 M    Total params
228.713   Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/marissabeaty/miniconda3/envs/pytorch/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/marissabeaty/miniconda3/envs/pytorch/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'Buildings_Dataset' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/marissabeaty/miniconda3/envs/pytorch/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Users/marissabeaty/miniconda3/envs/pytorch/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'Buildings_Dataset' on <module '__main__' (built-in)>
Traceback (most recent call last):

RuntimeError: DataLoader worker (pid(s) 74234) exited unexpectedly

When trying to train the model, I kept getting the same error: "DataLoader worker (pid(s) 74234) exited unexpectedly." After research into this, I concluded that this was an issue with the DataLoader not being able to read the image files. This goes back to the error we get above, in that the photos themselves are read in incorrectly for the model. Specifically, they are read in as width, height, RGB, but for the model to work they need to be formatted as RGB, width, height. Attempting to change them to the correct format caused other sections of the code to break. No other solutions were found to fix this error. 