In [398]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import os
import glob
from PIL import Image

np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x1159ae8f0>

In [399]:
class DataLoader():
    
    def __init__(self):
        base_dir = os.getcwd()
        self.train_X = self.img_to_npy(os.path.join(base_dir, 'emojis/Apple/Appl/*.png'))
        self.train_Y = self.img_to_npy(os.path.join(base_dir, 'emojis/Windows/Wind./*.png'))
        self.test_X = self.img_to_npy(os.path.join(base_dir, 'emojis/Test_Apple/Appl/*.png'))
        self.test_Y = self.img_to_npy(os.path.join(base_dir, 'emojis/Test_Windows/Wind/*.png'))
    
    def getTrainX(self, minibatch_size):    
        return self.getData(minibatch_size, self.train_X)
    
    def getTrainY(self, minibatch_size):
        return self.getData(minibatch_size, self.train_Y)
    
    def getTestX(self, minibatch_size):
        return self.getData(minibatch_size, self.test_X)
    
    def getTestY(self, minibatch_size):
        return self.getData(minibatch_size, self.test_Y)

    def getData(self, minibatch_size, npy_data):
        minibatch_npy_data = np.random.permutation(npy_data)[:minibatch_size]
        minibatch_tensor_data = torch.from_numpy(minibatch_npy_data)
        
        return minibatch_tensor_data
    
    def img_to_npy(self, path):
        img_array = np.ones((1, 72, 72, 3))

        img_list = glob.glob(path)

        for img_path in img_list:
            new_img = Image.open(img_path).convert('RGBA')
            new_img_array = np.array(new_img)[np.newaxis, :, :, :3]
            img_array = np.concatenate([img_array, new_img_array], axis=0)
        img_array = img_array[1:]
        
        return img_array

In [400]:
class DataLoaderNpy():
    
    def __init__(self):
        self.train_X = np.load("emojis_npy/train_X.npy")
        self.train_Y = np.load("emojis_npy/train_Y.npy")
        self.test_X = np.load("emojis_npy/test_X.npy")
        self.test_Y = np.load("emojis_npy/test_Y.npy")
        
    def getTrainX(self, minibatch_size):    
        return self.getData(minibatch_size, self.train_X)
    
    def getTrainY(self, minibatch_size):
        return self.getData(minibatch_size, self.train_Y)
    
    def getTestX(self, minibatch_size):
        return self.getData(minibatch_size, self.test_X)
    
    def getTestY(self, minibatch_size):
        return self.getData(minibatch_size, self.test_Y)

    def getData(self, minibatch_size, npy_data):
        npy_data = npy_data.astype(np.float32)
        minibatch_npy_data = np.random.permutation(npy_data)[:minibatch_size]
        minibatch_npy_data = minibatch_npy_data.transpose(0, 3, 1, 2)
        minibatch_tensor_data = torch.from_numpy(minibatch_npy_data)
        
        return minibatch_tensor_data

In [401]:
class ResidualBlock(nn.Module):
    
    def __init__(self, conv_dim):
        super(ResidualBlock, self).__init__()
        self.conv_layer = nn.Conv2d(conv_dim, conv_dim, kernel_size=3, stride=1, padding=1)
        
    def forward(self, x):
        out = x + self.conv_layer(x)
        return x

In [402]:
class Generator(nn.Module):
    
    def __init__(self, conv_dim=32):
        super(Generator, self).__init__()
        
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, conv_dim, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(conv_dim),
            nn.ReLU(inplace=True)
        )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(conv_dim, conv_dim*2, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(conv_dim*2),
            nn.ReLU(inplace=True)
        )
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(conv_dim*2, conv_dim*4, kernel_size=4, stride=2, padding=0),
            nn.BatchNorm2d(conv_dim*4),
            nn.ReLU(inplace=True)
        )
        
        self.layer4 = nn.Sequential(
            ResidualBlock(conv_dim*4),
            nn.BatchNorm2d(conv_dim*4),
            nn.ReLU(inplace=True)
        )
        
        self.layer5 = nn.Sequential(
            nn.ConvTranspose2d(conv_dim*4, conv_dim*2, kernel_size=4, stride=2, padding=0),
            nn.BatchNorm2d(conv_dim*2),
            nn.ReLU(inplace=True)
        )
        
        self.layer6 = nn.Sequential(
            nn.ConvTranspose2d(conv_dim*2, conv_dim, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(conv_dim),
            nn.ReLU(inplace=True)
        )
        
        self.last = nn.Sequential(
            nn.ConvTranspose2d(conv_dim, 3, kernel_size=4, stride=2, padding=1),
            nn.Tanh()
        )
    
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = self.layer6(out)
        out = self.last(out)
        
        return out

In [403]:
class Discriminator(nn.Module):
    
    def __init__(self, conv_dim=32):
        super(Discriminator, self).__init__()
        
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, conv_dim, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(conv_dim),
            nn.ReLU(inplace=True)
        )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(conv_dim, conv_dim*2, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(conv_dim*2),
            nn.ReLU(inplace=True)
        )
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(conv_dim*2, conv_dim*4, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(conv_dim*4),
            nn.ReLU(inplace=True)
        )
        
        self.layer4 = nn.Sequential(
            nn.Conv2d(conv_dim*4, conv_dim*8, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(conv_dim*8),
            nn.ReLU(inplace=True)
        )
        
        self.last = nn.Sequential(
            nn.Conv2d(conv_dim*8, 1, kernel_size=4, stride=2, padding=0),
            nn.Sigmoid()
        )
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.last(out)
        out = out.squeeze()
        
        return out

In [404]:
G = Generator()
F = Generator()
Dx = Discriminator()
Dy = Discriminator()

data_loader = DataLoaderNpy()
x_sample = data_loader.getTrainX(minibatch_size)
y_sample = data_loader.getTrainY(minibatch_size)

mse_loss = nn.MSELoss(reduction="mean")

In [405]:
def training_loop():
    
    minibatch_size = 32
    conv_dim = 32
    
    g_lr, d_lr = 0.0001, 0.0004
    beta1, beta2 = 0.0, 0.9
    
    G = Generator()
    F = Generator()
    Dx = Discriminator()
    Dy = Discriminator()
    
    data_loader = DataLoaderNpy()
    print("Initialized")

    g_params = list(G.parameters()) + list(F.parameters())
    d_params = list(Dx.parameters()) + list(Dy.parameters())
    
    mse_loss = nn.MSELoss(reduction="mean")
    
    for i in range(100):
        print(i)
        #sampling
        x_sample = data_loader.getTrainX(minibatch_size)
        y_sample = data_loader.getTrainY(minibatch_size)

        #train D with real images for 
        d_optimizer.zero_grad()

        d_real_loss = mse_loss(Dx(x_sample), torch.full((minibatch_size, ), 1)) + mse_loss(Dy(y_sample), torch.full((minibatch_size, ), 1))
        d_real_loss.backward()
        d_optimizer.step()

        #train D with fake images 
        d_optimizer.zero_grad()

        d_fake_loss = mse_loss(Dy(G(x_sample)), torch.full((minibatch_size, ), 0)) + mse_loss(Dx(F(y_sample)), torch.full((minibatch_size, ), 0))
        d_fake_loss.backward()
        d_optimizer.step()

        #train G with X--Y-->X cycle
        g_optimizer.zero_grad()

        g_loss = mse_loss(Dx(F(y_sample)), torch.full((minibatch_size, ), 1)) + mse_loss(y_sample, G(F(y_sample)))
        g_loss.backward()
        g_optimizer.step()

        #train G with Y--X-->Y cycle
        d_optimizer.zero_grad()

        f_loss = mse_loss(Dy(G(x_sample)), torch.full((minibatch_size, ), 1)) + mse_loss(x_sample, F(G(x_sample)))
        f_loss.backward()
        g_optimizer.step()

In [406]:
training_loop()

Initialized
0
1
2
3
4


KeyboardInterrupt: 