In [1]:
import torch
import torch.utils.data as data
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets

import matplotlib.pyplot as plt
import PIL.Image as Image
import numpy as np

from tqdm import tqdm
from torchsummary import summary
from torch.utils.tensorboard import SummaryWriter

In [2]:
ngpu = 3

device = torch.device("cuda:0" if torch.cuda.is_available() and ngpu > 0 else "cpu")

batch_size = 10000

num_epochs = 20000

num_workers = 4

train_shuffle = True

latent_space_vector = 100

lr = 0.0002

betas = (0.5, 0.999)

In [3]:
train_data = datasets.MNIST(
    root="/data/DataSet/",
    download=True,
    train=True,
    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(
            mean=(0.5),
            std=(0.5)
        )
    ])
)

train_loader = data.DataLoader(
    dataset=train_data,
    batch_size=batch_size,
    shuffle=train_shuffle,
    num_workers=num_workers
)

In [4]:
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find("Conv") != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find("BatchNorm") != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [5]:
noise = torch.randn(
    size=(batch_size, latent_space_vector, 1, 1),
    device=device
)

print(noise.shape)

torch.Size([10000, 100, 1, 1])


In [6]:
class Generator(nn.Module):
    def __init__(self, in_channels: int = 100, out_channels: int = 1):
        super().__init__()
        
        # 100x1x1 -> 1024x2x2
        self.layer1 = nn.Sequential(
            nn.ConvTranspose2d(
                in_channels=in_channels,    
                out_channels=1024,
                kernel_size=2,
                stride=1,
                padding=0,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=1024
            ),
            nn.ReLU(
                inplace=True
            )
        )
        # 1024x2x2 -> 512x4x4
        self.layer2 = nn.Sequential(
            nn.ConvTranspose2d(
                in_channels=1024,
                out_channels=512,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=512
            ),
            nn.ReLU(
                inplace=True
            )
        )
        # 512x4x4 -> 256x8x8
        self.layer3 = nn.Sequential(
            nn.ConvTranspose2d(
                in_channels=512,
                out_channels=256,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=256
            ),
            nn.ReLU(
                inplace=True
            )
        )
        # 256x8x8 -> 128x16x16
        self.layer4 = nn.Sequential(
            nn.ConvTranspose2d(
                in_channels=256,
                out_channels=128,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=128
            ),
            nn.ReLU(
                inplace=True
            )
        )
        # 128x16x16 -> 1x28x28
        self.layer5 = nn.Sequential(
            nn.ConvTranspose2d(
                in_channels=128,
                out_channels=out_channels,
                kernel_size=2,
                stride=2,
                padding=2,
                bias=False
            ),
            nn.Tanh()
        )
    
    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        return x

In [7]:
class Discriminator(nn.Module):
    def __init__(self, in_channels: int = 1, out_channels: int = 4):
        super().__init__()
        
        # 1x28x28 -> 64x16x16
        self.layer1 = nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=64,
                kernel_size=2,
                stride=2,
                padding=2,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=64
            ),
            nn.LeakyReLU(
                negative_slope=0.2,
                inplace=True
            )
        ) 
        # 64x16x16 -> 128x8x8
        self.layer2 = nn.Sequential(
            nn.Conv2d(
                in_channels=64,
                out_channels=128,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=128
            ),
            nn.LeakyReLU(
                negative_slope=0.2,
                inplace=True
            )
        ) 
        # 128x8x8 -> 256x4x4
        self.layer3 = nn.Sequential(
            nn.Conv2d(
                in_channels=128,
                out_channels=256,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=256
            ),
            nn.LeakyReLU(
                negative_slope=0.2,
                inplace=True
            )
        ) 
        # 256x4x4 -> 512x2x2
        self.layer4 = nn.Sequential(
            nn.Conv2d(
                in_channels=256,
                out_channels=512,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False
            ),
            nn.BatchNorm2d(
                num_features=512
            ),
            nn.LeakyReLU(
                negative_slope=0.2,
                inplace=True
            )
        )
        # 512x2x2 -> 1x1x1
        self.layer5 = nn.Sequential(
            nn.Conv2d(
                in_channels=512,
                out_channels=1,
                kernel_size=4,
                stride=2,
                padding=1,
                bias=False
            ),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)

        return x

In [8]:
netG = Generator().to(device)
netD = Discriminator().to(device)

if (device.type == "cuda") and (ngpu > 1):
    netG = nn.DataParallel(netG, list(range(ngpu)))
    netD = nn.DataParallel(netD, list(range(ngpu)))

netG.apply(weights_init)
netD.apply(weights_init)

criterion = nn.BCELoss()
optimizerG = optim.Adam(
    params=netG.parameters(),
    lr=lr,
    betas=betas
)
optimizerD = optim.Adam(
    params=netD.parameters(),
    lr=lr,
    betas=betas
)

fixed_noise = torch.randn(64, latent_space_vector, 1, 1, device=device)

real_label = 1
fake_label = 0

In [9]:
# writer = SummaryWriter("Tensorboard/")

for epoch in tqdm(range(1, num_epochs + 1)):
    for idx, (imgs, _) in enumerate(train_loader, 1):
        print(idx)

        '''
        Training Discriminator
        '''

        netD.zero_grad()

        real_x = imgs.to(device)
        real_y = torch.ones((batch_size, ), dtype=torch.float, device=device)
        real_y_hat = netD(real_x).view(-1)

        D_R_loss = criterion(real_y_hat, real_y)
        D_R_loss.backward()

        # real_y_hat_mean = real_y_hat.mean().item()

        noise = torch.randn((batch_size, 100, 1, 1), dtype=torch.float, device=device)
        fake_x = netG(noise)
        fake_y = torch.zeros((batch_size, ), dtype=torch.float, device=device)
        fake_y_hat = netD(fake_x).view(-1)

        D_F_loss = criterion(fake_y_hat, fake_y)
        D_F_loss.backward()

        D_loss_sum = D_R_loss + D_F_loss

        # fake_y_hat_mean = fake_y_hat.mean().item()


        optimizerD.step()

        '''
        Training Generator
        '''

        netG.zero_grad()
        
        fake_x = netG(noise)
        fake_y_hat = netD(fake_x).view(-1)

        G_loss = criterion(fake_y_hat, real_y)

        G_loss.backward()

        optimizerG.step()
        break
    break

  0%|          | 0/20000 [00:00<?, ?it/s]

1


  0%|          | 0/20000 [00:12<?, ?it/s]
