# COGS 181 Neural Networks & Deep Learning Final Project

Import the necessary packages

- Pandas and NumPy for data management
- Matplotlib for plotting and displaying images
- Torch and its submodules for building the network
- Torchvision for doing image processing operations

In [2]:
import math
import os
import shutil

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
from torchvision.io import read_image

Define the device on which we should train the network and store each tensor.

In [3]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print('Device:', device)

def display_image(tsr_img):
    """Show an image tensor with matplotlib.

    Args:
        tsr_img: 3-D tensor whose first axis is moved last before plotting
            (presumably channels-first, i.e. (C, H, W) -- confirm with callers).
            It may live on any device and may require grad.
    """
    # matplotlib can only convert CPU tensors that are detached from autograd,
    # so drop the graph and copy to host memory before reordering the axes.
    plt.imshow(tsr_img.detach().cpu().permute(1, 2, 0))

Device: cuda:0


Set each path variable for retrieving the images and their labels.

The setup for this training is the following:

1. If the directory of classified images already exists, clear it for the new session.
2. If it does not exist, create it.
3. Copy every image that has already been classified from the directory containing all images (classified and unclassified) into the directory of classified images.

In [4]:
labels_path = './model_resources/images.csv'
img_dir = './model_resources/classified_images'
all_images_dir = './classification_interface/images'

# Start every session from an empty img_dir.
# `Remove-Item` is a PowerShell cmdlet; os.system() runs the platform's default
# shell (cmd.exe / sh), where that command does not exist, so stale copies were
# silently left behind. shutil does the same job portably and raises on failure.
if os.path.exists(img_dir):
    shutil.rmtree(img_dir)
# makedirs (rather than mkdir) also creates a missing parent folder.
os.makedirs(img_dir)

# The CSV has no header row; column 0 holds the image file names.
labels = pd.read_csv(labels_path, header=None)

# Copy every listed (i.e. classified) image from all_images_dir into img_dir.
for img_filename in labels[0]:
    source = os.path.join(all_images_dir, img_filename)
    destination = os.path.join(img_dir, img_filename)
    shutil.copy(source, destination)

In [5]:
class CustomImageDataset(Dataset):
    """Dataset of image files listed in a header-less annotations CSV.

    Column 0 of the CSV is the image file name (relative to ``img_dir``) and
    column 1 is the label returned alongside the image.
    """

    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        """Read the annotations table and remember the directory and transforms.

        Args:
            annotations_file: path (or file-like object) passed to ``pd.read_csv``.
            img_dir: directory that contains the image files.
            transform: optional callable applied to each loaded image.
            target_transform: optional callable applied to each label.
        """
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform
        self.img_labels = pd.read_csv(annotations_file, header=None)

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return self.img_labels.shape[0]

    def __getitem__(self, idx):
        """Load the image of row ``idx`` and return ``(image, label)``."""
        filename = self.img_labels.iloc[idx, 0]
        label = self.img_labels.iloc[idx, 1]
        image = read_image(os.path.join(self.img_dir, filename))

        # Transforms are optional; anything falsy means "leave the value untouched".
        image = self.transform(image) if self.transform else image
        label = self.target_transform(label) if self.target_transform else label
        return image, label

In [6]:
# ---- Data ----
dataroot = "classification_interface/images"  # folder holding the source images
workers = 2            # number of workers for each dataloader
batch_size = 128       # size of a training batch
image_size = 200       # input size of all images
nc = 3                 # number of image channels
training_ratio = 0.85  # ratio of training to testing data

# ---- Model ----
nz = 100   # size of Z (the latent vector)
ngf = 64   # size of the feature maps in the generator
ndf = 64   # size of the feature maps in the discriminator

# ---- Optimisation ----
num_epochs = 5  # number of epochs for training
lr = 0.0002     # learning rate
beta1 = 0.5     # Beta1 hyperparameter for the Adam optimizers
ngpu = 1        # number of GPUs

In [7]:
# Side length (in pixels) of the square images fed to the GAN.
# The networks in this notebook fix this value: the generator upsamples a 1x1
# latent to 64x64 (1 -> 4 -> 8 -> 16 -> 32 -> 64) and the discriminator reduces
# exactly 64x64 to a single score, so any other size makes the discriminator
# output more than one value per image and breaks the BCELoss shape check.
dcgan_image_size = 64

# Preprocessing applied to the uint8 tensors returned by read_image.
# NOTE(review): Normalize below assumes 3-channel images -- confirm that no
# grayscale / RGBA files are present in the dataset.
transform_seq = transforms.Compose([
    # Scale the shorter side to the target size, then crop the centre square.
    transforms.Resize(size=dcgan_image_size, antialias=True),
    transforms.CenterCrop(size=dcgan_image_size),
    # ToTensor() only accepts PIL images / ndarrays and raises TypeError on the
    # tensors produced by read_image; ConvertImageDtype performs the same
    # uint8 [0, 255] -> float [0, 1] scaling on tensors.
    transforms.ConvertImageDtype(torch.float),
    # Map [0, 1] to [-1, 1], the range of the generator's Tanh output.
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [8]:
# Wrap the classified images and their labels in the custom dataset
dataset = CustomImageDataset(labels_path, img_dir, transform_seq)

# Randomly partition the samples: training_ratio for training, the rest for testing
dataset_size = len(dataset)
training_size = math.floor(dataset_size * training_ratio)
test_size = dataset_size - training_size
training_data, test_data = torch.utils.data.random_split(
    dataset, [training_size, test_size]
)

# All three loaders share the same batching / shuffling / worker settings
loader_kwargs = dict(batch_size=batch_size, shuffle=True, num_workers=workers)

train_dataloader = DataLoader(training_data, **loader_kwargs)
test_dataloader = DataLoader(test_data, **loader_kwargs)

# Loader over the complete dataset (no train/test split)
all_dataloader = DataLoader(dataset, **loader_kwargs)

In [9]:
def weights_init(m):
    """Re-initialise a layer in place, selected by its class name.

    Intended for ``nn.Module.apply``:
    - classes whose name contains ``Conv``: weights ~ N(0.0, 0.02)
    - classes whose name contains ``BatchNorm``: weights ~ N(1.0, 0.02), bias = 0
    - every other module is left untouched.
    """
    layer_kind = type(m).__name__

    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return

    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [10]:
class Generator(nn.Module):
    """DCGAN generator built from transposed convolutions.

    Reads the notebook-level constants ``nz`` (latent channels), ``ngf``
    (base feature-map width) and ``nc`` (output channels). For a latent input
    with 1x1 spatial size the stack upsamples 1 -> 4 -> 8 -> 16 -> 32 -> 64
    and finishes with Tanh, so outputs lie in [-1, 1].
    """

    def __init__(self, ngpu):
        """Build the layer stack; ``ngpu`` is only stored on the instance."""
        super().__init__()
        self.ngpu = ngpu

        def up_block(in_ch, out_ch, stride, padding):
            # One 4x4 transposed convolution followed by BatchNorm and in-place ReLU.
            return [
                nn.ConvTranspose2d(in_ch, out_ch, 4, stride, padding, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(True),
            ]

        layers = []
        # Project the latent vector to a 4x4 map, then double the resolution
        # three times while halving the channel count.
        layers += up_block(nz, ngf * 8, 1, 0)
        layers += up_block(ngf * 8, ngf * 4, 2, 1)
        layers += up_block(ngf * 4, ngf * 2, 2, 1)
        layers += up_block(ngf * 2, ngf, 2, 1)
        # Final upsampling to ``nc`` channels, squashed by Tanh.
        layers.append(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False))
        layers.append(nn.Tanh())

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` through the sequential stack and return the result."""
        return self.main(input)

In [15]:
# Build the generator, move it to the training device and re-initialise its
# Conv / BatchNorm parameters with weights_init (apply() returns the module itself).
netG = Generator(ngpu).to(device).apply(weights_init)

# Show the layer layout
print(netG)

Generator(
  (main): Sequential(
    (0): ConvTranspose2d(100, 512, kernel_size=(4, 4), stride=(1, 1), bias=False)
    (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): ConvTranspose2d(512, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (4): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (7): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU(inplace=True)
    (9): ConvTranspose2d(128, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (10): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU(inplace=True)
    (12): ConvTranspose2d(64, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (13): Tanh()
  )
)


In [13]:
class Discriminator(nn.Module):
    """DCGAN discriminator built from strided convolutions.

    Reads the notebook-level constants ``nc`` (input channels) and ``ndf``
    (base feature-map width). Four stride-2 convolutions halve the resolution
    each time and a final 4x4 convolution without padding plus Sigmoid
    produces the score, so a 64x64 input yields one value per image.
    """

    def __init__(self, ngpu):
        """Build the layer stack; ``ngpu`` is only stored on the instance."""
        super().__init__()
        self.ngpu = ngpu

        def down_block(in_ch, out_ch, batch_norm=True):
            # 4x4 stride-2 convolution, optional BatchNorm, in-place LeakyReLU(0.2).
            block = [nn.Conv2d(in_ch, out_ch, 4, 2, 1, bias=False)]
            if batch_norm:
                block.append(nn.BatchNorm2d(out_ch))
            block.append(nn.LeakyReLU(0.2, inplace=True))
            return block

        layers = []
        # The first block has no BatchNorm.
        layers += down_block(nc, ndf, batch_norm=False)
        layers += down_block(ndf, ndf * 2)
        layers += down_block(ndf * 2, ndf * 4)
        layers += down_block(ndf * 4, ndf * 8)
        # Collapse the remaining 4x4 map to a single probability.
        layers.append(nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False))
        layers.append(nn.Sigmoid())

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` through the sequential stack and return the result."""
        return self.main(input)

In [16]:
# Build the discriminator, move it to the training device and re-initialise its
# Conv / BatchNorm parameters with weights_init (apply() returns the module itself).
netD = Discriminator(ngpu).to(device).apply(weights_init)

# Show the layer layout
print(netD)

Discriminator(
  (main): Sequential(
    (0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): LeakyReLU(negative_slope=0.2, inplace=True)
    (2): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (6): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (7): LeakyReLU(negative_slope=0.2, inplace=True)
    (8): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (9): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): LeakyReLU(negative_slope=0.2, inplace=True)
    (11): Conv2d(512, 1, kernel_size=(4, 4), stride=(1, 1), bias=False)
    (12): Sigmoid()
  )
)


In [17]:
# Binary cross-entropy on the discriminator's sigmoid output
criterion = nn.BCELoss()

# A fixed batch of 64 latent vectors, reused to generate sample images during training
fixed_noise = torch.randn(64, nz, 1, 1, device=device)

# Target values for real and generated samples
real_label, fake_label = 1., 0.

# One Adam optimizer per network, both with the same learning rate and betas
adam_betas = (beta1, 0.999)
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=adam_betas)
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=adam_betas)

In [None]:
# Bookkeeping for the run
img_list = []   # image grids generated from fixed_noise at checkpoints
G_losses = []   # generator loss of every iteration
D_losses = []   # discriminator loss of every iteration
iters = 0       # iteration counter across all epochs

# Number of batches per epoch; used in the progress line and to detect the last batch.
# (The loader iterated below is `all_dataloader`; no `dataloader` variable exists.)
num_batches = len(all_dataloader)

print('Starting training loop...')

for epoch in range(num_epochs):

    for i, data in enumerate(all_dataloader, 0):

        # ---- Discriminator update: real batch ----

        netD.zero_grad()

        # data is an (images, labels) pair; only the images are used here
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)

        # One target value per sample in the batch
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)

        # Flatten the discriminator output so it lines up with `label`
        output = netD(real_cpu).view(-1)
        errD_real = criterion(output, label)
        errD_real.backward()
        D_x = output.mean().item()

        # ---- Discriminator update: generated batch ----

        noise = torch.randn(b_size, nz, 1, 1, device=device)
        fake = netG(noise)
        label.fill_(fake_label)

        # detach(): this backward pass must not propagate into the generator
        output = netD(fake.detach()).view(-1)
        errD_fake = criterion(output, label)
        errD_fake.backward()
        D_G_z1 = output.mean().item()

        # Gradients of both passes have accumulated; report the summed loss and step
        errD = errD_real + errD_fake
        optimizerD.step()

        # ---- Generator update ----

        netG.zero_grad()

        # The generator is trained against the "real" target
        label.fill_(real_label)

        # Re-score the same fakes with the freshly updated discriminator
        output = netD(fake).view(-1)
        errG = criterion(output, label)
        errG.backward()
        D_G_z2 = output.mean().item()

        optimizerG.step()

        # ---- Stats ----

        if i % 50 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f' % (epoch, num_epochs, i, num_batches, errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Every 500 iterations, and after the very last batch, render fixed_noise
        is_last_batch = (epoch == num_epochs - 1) and (i == num_batches - 1)
        if (iters % 500 == 0) or is_last_batch:
            with torch.no_grad():
                fake = netG(fixed_noise).detach().cpu()
            img_list.append(vutils.make_grid(fake, padding=2, normalize=True))

        iters += 1

Starting training loop...


### References

[[1] Optimal ratio for data splitting](https://onlinelibrary.wiley.com/doi/full/10.1002/sam.11583)

[[2] DCGAN Tutorial - PyTorch](https://pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html)

In [10]:
# image_data = pd.read_csv('./archive/images.csv')

# # clean and format image data
# image_data.sort_values(by=['image'], inplace=True)
# image_data.reset_index(inplace=True)
# image_data.drop(columns=['index', 'sender_id'], inplace=True)

# image_data.columns = ['image', 'type', 'kids']

# types = image_data['type']

# encoded = np.zeros((len(types), len(types.unique())))

# for idx in range(len(types)):
#     encoded[idx][list(types.unique()).index(types[idx])] = 1
    
# image_data = pd.concat([image_data, pd.DataFrame(encoded)], axis=1)

# image_data.columns = np.concatenate((image_data.columns[:3], types.unique()))

# image_data.columns = [c.lower().replace('-', '_').replace(' ', '_') for c in image_data.columns]

# image_data.drop(columns=['type'], inplace=True)

# image_data['kids'] = image_data['kids'].apply(lambda x: int(x))