In [None]:
import os
import pickle as pkl

from PIL import Image
# Directory holding the aligned CelebA images.
IMAGES_PATH = '../input/img_align_celeba/img_align_celeba'
# Full path of every entry in IMAGES_PATH. The directory is named once (via
# IMAGES_PATH) so the listing and the joined paths cannot drift apart, and the
# listing is sorted because os.listdir returns entries in arbitrary order.
image_files = [os.path.join(IMAGES_PATH, file_name)
               for file_name in sorted(os.listdir(IMAGES_PATH))]


In [None]:
import numpy as np
from PIL import Image
# Draw one position in image_files at random and open that file; leaving the
# image as the last expression makes the notebook render it.
idx = np.random.randint(0, len(image_files))
image = Image.open(image_files[idx])
image

# Loader

In [None]:
import torch
import torchvision.transforms as transforms

def load_image(img_path):
    """Load the image stored at ``img_path`` and return it converted to RGB.

    :param img_path: path of the image file to read
    :return: a new RGB ``PIL.Image`` that no longer depends on the open file
    """
    # Image.open only reads the header and keeps the file open. convert()
    # decodes the pixels into a separate image, so the file can be closed
    # deterministically here instead of waiting for garbage collection.
    with Image.open(img_path) as source:
        return source.convert('RGB')
class ImageDataSet(torch.utils.data.Dataset):
    """Unlabelled image dataset backed by the entries of a single directory.

    Item ``i`` is ``image_loader`` applied to the ``i``-th entry of ``root``
    (in sorted name order), passed through ``transform`` when one is given.
    No label is returned.
    """

    def __init__(self, root, image_loader=load_image, transform=None):
        """
        :param root: directory whose entries are treated as image files
        :param image_loader: callable mapping a file path to a loaded image
        :param transform: optional callable applied to every loaded image
        """
        self.root = root
        # os.listdir returns entries in arbitrary order; sorting makes index i
        # refer to the same file on every run and on every machine.
        self.image_files = sorted(os.listdir(self.root))
        self.loader = image_loader
        self.transform = transform

    def __len__(self):
        """Return the number of entries found in ``root``."""
        return len(self.image_files)

    def __getitem__(self, index):
        """Load the ``index``-th file and apply the optional transform."""
        image = self.loader(os.path.join(self.root, self.image_files[index]))
        if self.transform is not None:
            image = self.transform(image)
        return image

In [None]:
from torch.utils.data import DataLoader

def get_data_loader(image_dir=IMAGES_PATH, image_size=64, batch_size=16, num_workers=0):
    """Build a shuffling DataLoader over the images found in ``image_dir``.

    :param image_dir: directory handed to ``ImageDataSet``
    :param image_size: every image is resized to ``image_size`` x ``image_size``
    :param batch_size: number of images per batch
    :param num_workers: worker count forwarded to ``DataLoader``
    :return: a ``DataLoader`` yielding batches of image tensors (no labels)
    """
    # Resize to a square of the requested size, then convert to a tensor.
    preprocessing = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
    ])

    return DataLoader(
        dataset=ImageDataSet(image_dir, transform=preprocessing),
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
    )

In [None]:
# Build the training loader with get_data_loader's defaults
# (IMAGES_PATH, 64x64 images, batches of 16, no worker processes).
data_loader = get_data_loader()

In [None]:
import torchvision
import matplotlib.pyplot as plt
# Pull one batch from the loader for a visual sanity check.
dataiter = iter(data_loader)
# Use the built-in next(): DataLoader iterators in current PyTorch releases
# implement __next__ only and no longer expose a .next() method.
images = next(dataiter)

# show images
fig = plt.figure(figsize=(12, 8))
npimg = torchvision.utils.make_grid(images).numpy()
# make_grid returns channels first (C, H, W); imshow expects (H, W, C).
plt.imshow(np.transpose(npimg, (1, 2, 0)))

In [None]:

def scale(x, feature_range=(-1, 1)):
    """Linearly map ``x`` onto ``feature_range``.

    The mapping sends 0 to the lower bound and 1 to the upper bound, so it
    assumes ``x`` already lies in [0, 1]. The input is not modified in place.

    :param x: number or tensor to rescale
    :param feature_range: ``(low, high)`` bounds of the target interval
    :return: ``x * (high - low) + low``
    """
    low, high = feature_range
    return x * (high - low) + low

In [None]:
# Take the first image of the `images` batch, rescale it with scale()'s
# default range (-1, 1) and print the extremes to confirm the new value range.
img = images[0]
scaled_img = scale(img)

print('Min: ', scaled_img.min())
print('Max: ', scaled_img.max())

In [None]:
import torch.nn as nn
import torch.nn.functional as F

def convolution(in_channels, out_channels, kernel_size, stride=2, padding=1, batch_norm=True):
    """Create a bias-free 2-D convolution, optionally followed by batch norm.

    :param in_channels: number of input channels
    :param out_channels: number of output channels
    :param kernel_size: size of the convolution kernel
    :param stride: convolution stride (the default of 2 halves H and W for
        ``kernel_size=4, padding=1``)
    :param padding: zero padding added on each side
    :param batch_norm: append ``nn.BatchNorm2d(out_channels)`` when True
    :return: ``nn.Sequential`` holding the convolution and, if requested,
        the batch-norm layer
    """
    modules = [
        nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                  kernel_size=kernel_size, stride=stride, padding=padding,
                  bias=False),
    ]
    if batch_norm:
        modules.append(nn.BatchNorm2d(out_channels))
    return nn.Sequential(*modules)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class Discriminator(nn.Module):
    """Convolutional classifier producing one raw logit per input image.

    Four ``convolution`` blocks, each followed by ReLU, feed a single linear
    layer. The linear layer expects ``conv_dim*8`` channels on an 8x8 map,
    which is what the three stride-2 blocks produce for 64x64 inputs. Inputs
    of another size are not rejected: ``view(-1, ...)`` regroups the features
    and the number of output rows then differs from the input batch size.
    """

    def __init__(self, conv_dim = 64):
        """
        :param conv_dim: number of channels produced by the first convolution;
            later blocks use multiples of it
        """
        super().__init__()
        self.conv_dim = conv_dim
        # Shapes below are for a 64x64 input and the default conv_dim.
        self.conv1 = convolution(3, conv_dim, 4, batch_norm=False)  # 32x32, depth 64
        self.conv2 = convolution(conv_dim, conv_dim*2, 4)  # 16x16, depth 128
        self.conv3 = convolution(conv_dim*2, conv_dim*4, 4)  # 8x8, depth 256
        # Stride 1: widens the channels without shrinking the map further.
        self.conv4 = convolution(conv_dim*4, conv_dim*8, 3, padding=1, stride =1)  # 8x8, depth 512

        # Classification layer
        self.fc = nn.Linear(conv_dim*8*8*8 , 1)

    def forward(self, x):
        """Return the raw (pre-sigmoid) logits for the image batch ``x``."""
        out = x
        # ReLU follows every convolutional block.
        for block in (self.conv1, self.conv2, self.conv3, self.conv4):
            out = F.relu(block(out))
        # Flatten to the width the linear layer was built with
        # (conv_dim*8*8*8), then classify.
        out = out.view(-1, self.fc.in_features)
        return self.fc(out)


In [None]:
class Generator(nn.Module):
    # NOTE(review): unfinished template. A second ``Generator`` class is
    # defined in a later cell of this notebook and replaces this one as soon
    # as that cell runs.
    
    def __init__(self, z_size, conv_dim):
        """
        Initialize the Generator Module (placeholder: no layers are created
        and neither argument is used yet).
        :param z_size: The length of the input latent vector, z
        :param conv_dim: The depth of the inputs to the *last* transpose convolutional layer
        """
        super(Generator, self).__init__()

        # complete init function
        

    def forward(self, x):
        """
        Forward propagation of the neural network (placeholder).
        :param x: The input to the neural network     
        :return: currently ``x`` itself, unchanged; the template's stated
                 goal is a 32x32x3 Tensor image
        """
        # define feedforward behavior
        
        return x

In [None]:
class ResidualBlock(nn.Module):
    """Defines a residual block.
       This adds an input x to a convolutional layer (applied to x) with the same size input and output.
       These blocks allow a model to learn an effective transformation from one domain to another.
    """
    def __init__(self, conv_dim):
        """
        :param conv_dim: number of channels of both the input and the output
        """
        super(ResidualBlock, self).__init__()

        # The helper defined in this notebook is named `convolution`; the
        # previous call to `conv` referenced a name that is not defined here.
        # kernel 3 / stride 1 / padding 1 keeps the spatial size, which the
        # skip connection in forward() requires.
        self.conv_layer1 = convolution(in_channels=conv_dim, out_channels=conv_dim,
                                       kernel_size=3, stride=1, padding=1, batch_norm=True)

        self.conv_layer2 = convolution(in_channels=conv_dim, out_channels=conv_dim,
                                       kernel_size=3, stride=1, padding=1, batch_norm=True)

    def forward(self, x):
        """Return ``x + conv_layer2(relu(conv_layer1(x)))``."""
        out_1 = F.relu(self.conv_layer1(x))
        out_2 = x + self.conv_layer2(out_1)
        return out_2

In [None]:
def deconvolution(in_channels, out_channels, kernel_size, stride=2, padding=1, batch_norm=True):
    """Create a bias-free transpose convolution, optionally followed by batch norm.

    :param in_channels: number of input channels
    :param out_channels: number of output channels
    :param kernel_size: size of the transpose-convolution kernel
    :param stride: stride (the default of 2 doubles H and W for
        ``kernel_size=4, padding=1``)
    :param padding: padding argument of ``nn.ConvTranspose2d``
    :param batch_norm: append ``nn.BatchNorm2d(out_channels)`` when True
    :return: ``nn.Sequential`` holding the layer(s)
    """
    upsample = nn.ConvTranspose2d(in_channels, out_channels, kernel_size,
                                  stride=stride, padding=padding, bias=False)
    if not batch_norm:
        return nn.Sequential(upsample)
    return nn.Sequential(upsample, nn.BatchNorm2d(out_channels))

In [None]:
class Generator(nn.Module):
    """Maps latent vectors to images: one linear layer, then three transpose
    convolutions (ReLU after the first two, tanh after the last).

    NOTE(review): ``fc`` emits ``conv_dim*4*8*8`` features per latent vector,
    but ``forward`` reshapes them to ``(-1, conv_dim*4, 4, 4)``. Each latent
    vector is therefore split into four 4x4 feature stacks, so N inputs yield
    4*N images of 32x32. Reshaping to 8x8 instead would yield N images of
    64x64. Callers may depend on the current output shape, so it is kept
    as is here; confirm the intended resolution before changing it.
    """

    def __init__(self, z_size, conv_dim=64):
        """
        :param z_size: length of the latent input vector
        :param conv_dim: number of channels fed to the last transpose
            convolution; earlier layers use multiples of it
        """
        super().__init__()

        self.conv_dim = conv_dim

        self.fc = nn.Linear(z_size, conv_dim*4*8*8)
        self.deconv1 = deconvolution(conv_dim*4, conv_dim*2, 4)
        self.deconv2 = deconvolution(conv_dim*2, conv_dim, 4)
        # Final layer: 3 output channels, no batch norm.
        self.deconv3 = deconvolution(conv_dim, 3, 4, batch_norm=False)

    def forward(self, x):
        """Generate images with values in [-1, 1] (tanh) from latents ``x``."""
        features = self.fc(x)
        features = features.view(-1, self.conv_dim*4, 4, 4)
        # Each default deconvolution block doubles height and width.
        for upsample in (self.deconv1, self.deconv2):
            features = F.relu(upsample(features))
        return F.tanh(self.deconv3(features))

In [None]:
# conv_dim is passed to both networks; z_size is the length of the latent
# vector handed to the generator.
conv_dim = 64
z_size = 100

# define discriminator and generator
D = Discriminator(conv_dim)
G = Generator(z_size=z_size, conv_dim=conv_dim)

# Print both architectures, separated by an empty line.
print(D)
print()
print(G)

In [None]:
# Global flag read by later cells to decide whether tensors go to CUDA.
train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
    print('Training on CPU.')
else:
    # move models to GPU
    G.cuda()
    D.cuda()
    print('GPU available for training. Models moved to GPU')

In [None]:
def real_loss(D_out, smooth=False):
    """Binary cross-entropy (with logits) of ``D_out`` against "real" targets.

    :param D_out: raw discriminator logits, one per sample, e.g. shape
        ``(batch, 1)`` or ``(batch,)``
    :param smooth: when True the target is 0.9 instead of 1.0 (label smoothing)
    :return: scalar loss tensor
    :raises RuntimeError: if ``D_out`` does not hold exactly one value per sample
    """
    batch_size = D_out.size(0)
    # reshape rather than squeeze(): squeeze() turns a (1, 1) output into a
    # 0-d tensor, which no longer matches a length-1 label vector and makes
    # the loss raise for a batch of one.
    logits = D_out.reshape(batch_size)
    # Derive the targets from the logits so they share device and dtype,
    # instead of building them on the CPU and moving them by a global flag.
    labels = torch.full_like(logits, 0.9 if smooth else 1.0)
    # binary cross entropy with logits loss
    criterion = nn.BCEWithLogitsLoss()
    return criterion(logits, labels)

def fake_loss(D_out):
    """Binary cross-entropy (with logits) of ``D_out`` against "fake" targets (0).

    :param D_out: raw discriminator logits, one per sample, e.g. shape
        ``(batch, 1)`` or ``(batch,)``
    :return: scalar loss tensor
    :raises RuntimeError: if ``D_out`` does not hold exactly one value per sample
    """
    batch_size = D_out.size(0)
    # reshape rather than squeeze(): squeeze() turns a (1, 1) output into a
    # 0-d tensor, which no longer matches a length-1 label vector and makes
    # the loss raise for a batch of one.
    logits = D_out.reshape(batch_size)
    # zeros_like places the targets on the logits' device with their dtype,
    # so no global GPU flag is needed.
    labels = torch.zeros_like(logits)  # fake labels = 0
    criterion = nn.BCEWithLogitsLoss()
    return criterion(logits, labels)

In [None]:
import torch.optim as optim

# Adam hyper-parameters, shared by both optimizers
lr = 0.0002
beta1=0.5
beta2=0.999 # default value

# One Adam optimizer per network, so D and G can be stepped independently.
d_optimizer = optim.Adam(D.parameters(), lr=lr, betas=[beta1, beta2])
g_optimizer = optim.Adam(G.parameters(), lr=lr, betas=[beta1, beta2])

In [None]:
num_epochs = 3

# keep track of loss and generated, "fake" samples
samples = []
losses = []

# Losses are recorded and printed every `print_every` batches.
print_every = 3000

# Fixed latent vectors, reused after every epoch so that the saved samples
# are directly comparable from one epoch to the next.
sample_size=16
fixed_z = np.random.uniform(-1, 1, size=(sample_size, z_size))
fixed_z = torch.from_numpy(fixed_z).float()
# Move the fixed latents to the GPU once, instead of after every epoch.
if train_on_gpu:
    fixed_z = fixed_z.cuda()

# train the network
for epoch in range(num_epochs):

    for batch_i, real_images in enumerate(data_loader):

        batch_size = real_images.size(0)

        # important rescaling step: the generator ends in tanh, so real
        # images are mapped onto the same (-1, 1) range before D sees them
        real_images = scale(real_images)
        if train_on_gpu:
            real_images = real_images.cuda()

        # ============================================
        #            TRAIN THE DISCRIMINATOR
        # ============================================

        d_optimizer.zero_grad()

        # 1. Train with real images
        D_real = D(real_images)
        d_real_loss = real_loss(D_real)

        # 2. Train with fake images
        z = np.random.uniform(-1, 1, size=(batch_size, z_size))
        z = torch.from_numpy(z).float()
        # move z to GPU, if available
        if train_on_gpu:
            z = z.cuda()
        # detach(): only D is updated in this step, so there is no need to
        # backpropagate through G. The gradients G would receive here are
        # discarded by g_optimizer.zero_grad() below anyway.
        fake_images = G(z).detach()

        # Compute the discriminator losses on fake images
        D_fake = D(fake_images)
        d_fake_loss = fake_loss(D_fake)

        # add up loss and perform backprop
        d_loss = d_real_loss + d_fake_loss
        d_loss.backward()
        d_optimizer.step()

        # =========================================
        #            TRAIN THE GENERATOR
        # =========================================
        g_optimizer.zero_grad()

        # 1. Train with fake images and flipped labels
        z = np.random.uniform(-1, 1, size=(batch_size, z_size))
        z = torch.from_numpy(z).float()
        if train_on_gpu:
            z = z.cuda()
        fake_images = G(z)

        # Compute the discriminator losses on fake images
        # using flipped labels!
        D_fake = D(fake_images)
        g_loss = real_loss(D_fake) # use real loss to flip labels

        # perform backprop
        g_loss.backward()
        g_optimizer.step()

        # Print some loss stats
        if batch_i % print_every == 0:
            # append discriminator loss and generator loss
            losses.append((d_loss.item(), g_loss.item()))
            # print discriminator and generator loss
            print('Epoch [{:5d}/{:5d}] | d_loss: {:6.4f} | g_loss: {:6.4f}'.format(
                    epoch+1, num_epochs, d_loss.item(), g_loss.item()))

    ## AFTER EACH EPOCH##
    # generate and save sample, fake images
    G.eval() # for generating samples
    # no_grad(): these samples are for inspection only, so no autograd graph
    # is built. cpu(): the stored tensors do not hold GPU memory and the
    # pickle below can be loaded on a machine without CUDA.
    with torch.no_grad():
        samples_z = G(fixed_z).cpu()
    samples.append(samples_z)
    G.train() # back to training mode


# Save training generator samples
with open('train_samples.pkl', 'wb') as f:
    pkl.dump(samples, f)

In [None]:
def view_samples(epoch, samples):
    """Show the images generated at one epoch in a 2x8 grid.

    :param epoch: index into ``samples``; negative values count from the end
    :param samples: sequence of image batches, each indexable into
        channels-first ``(C, H, W)`` tensors. The rescaling assumes values
        in [-1, 1]. At most the first 16 images of the batch are drawn.
    """
    fig, axes = plt.subplots(figsize=(16,4), nrows=2, ncols=8, sharey=True, sharex=True)
    for ax, img in zip(axes.flatten(), samples[epoch]):
        img = img.detach().cpu().numpy()
        # channels-first (C, H, W) -> channels-last (H, W, C) for imshow
        img = np.transpose(img, (1, 2, 0))
        # map [-1, 1] onto [0, 255] and truncate to 8-bit pixel values
        img = ((img + 1)*255 / (2)).astype(np.uint8)
        ax.xaxis.set_visible(False)
        ax.yaxis.set_visible(False)
        # The transposed array already has the image's own (H, W, C) shape,
        # so it is shown as is rather than forced to a hard-coded 32x32x3.
        ax.imshow(img)

In [None]:
# Reload the per-epoch generator samples from train_samples.pkl, the file the
# training cell writes. Unpickling can execute arbitrary code, so only load
# pickle files from a trusted source.
with open('train_samples.pkl', 'rb') as f:
    samples = pkl.load(f)

In [None]:
# Index -1 selects the last entry of `samples`. view_samples has no return
# statement, so `_` is bound to None.
_ = view_samples(-1, samples)