In [1]:
# from google.colab import drive
# drive.mount('/content/drive')
# !unzip /content/drive/MyDrive/colab/images.zip
# !pip install wandb

In [2]:
from __future__ import print_function

import argparse
import os
import random
import warnings

import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sn
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import torchaudio
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as utils
import tqdm
from IPython import display
from IPython.display import HTML, clear_output
from sklearn.linear_model import Ridge
from sklearn.metrics import accuracy_score, confusion_matrix, mean_squared_error
from sklearn.preprocessing import StandardScaler
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision.datasets import ImageFolder
from torchvision.models import resnet18
from tqdm.notebook import tqdm

import wandb

In [3]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    The same value is handed, in turn, to PyTorch's default generator,
    PyTorch's CUDA generator, Python's ``random`` module and NumPy's
    global generator.

    Args:
        seed: Integer seed passed unchanged to each library.
    """
    # cuDNN deterministic mode is left disabled here:
    # torch.backends.cudnn.deterministic = True
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed,
        random.seed,
        np.random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)


def count_parameters(model):
    """Return the number of trainable scalar values in ``model``.

    Only parameters whose ``requires_grad`` flag is set contribute; each
    contributes its element count (``numel()``).

    Args:
        model: Object exposing ``parameters()`` (e.g. an ``nn.Module``).

    Returns:
        int: Sum of element counts over all trainable parameters
        (0 when there are none).
    """
    trainable_total = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable_total += param.numel()
    return trainable_total

In [4]:
# Warning suppression is currently switched off:
# warnings.filterwarnings("ignore")

# Seed Python, NumPy and PyTorch RNGs (see set_seed) before any model or
# data loader is created.
set_seed(3407)
# Authenticate with Weights & Biases; DCGAN.train() logs metrics through wandb.
wandb.login()

# model.zero_grad()
# loss.backward()
# optimizer.step()

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mseara[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

In [5]:
# Run on the GPU when PyTorch can see one, otherwise on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Image folder: use the local checkout when it exists, else the Colab path.
_LOCAL_DATAROOT = "/home/seara/Desktop/Github/vk-try/images"
DATAROOT = _LOCAL_DATAROOT if os.path.isdir(_LOCAL_DATAROOT) else "/content/images"

WORKERS = 4  # passed to the DataLoader as num_workers
BATCH_SIZE = 512  # images per training batch
NUM_EPOCHS = 2000  # full passes over the dataset
LR = 0.0002  # Adam learning rate for both networks
BETA1 = 0.5  # first Adam beta coefficient for both optimizers
IMAGE_SIZE = (64, 64)  # size every image is resized to before training

In [6]:
class Generator(nn.Module):
    """Transposed-convolution generator.

    ``main`` stacks four ConvTranspose2d -> BatchNorm2d -> ReLU stages that
    take the channel count 100 -> 1024 -> 512 -> 256 -> 128. ``output`` is a
    final transposed convolution down to 3 channels followed by ``Tanh``,
    so produced values lie in [-1, 1].

    With the ``(N, 100, 1, 1)`` noise used by the training code, the spatial
    size grows 1 -> 4 -> 8 -> 16 -> 32 -> 64, giving ``(N, 3, 64, 64)``.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, stride, padding) of each hidden stage;
        # every stage uses a 4x4 kernel and no convolution bias.
        stage_specs = (
            (100, 1024, 1, 0),
            (1024, 512, 2, 1),
            (512, 256, 2, 1),
            (256, 128, 2, 1),
        )
        hidden_layers = []
        for in_ch, out_ch, stride, padding in stage_specs:
            hidden_layers.append(
                nn.ConvTranspose2d(
                    in_ch,
                    out_ch,
                    kernel_size=4,
                    stride=stride,
                    padding=padding,
                    bias=False,
                )
            )
            hidden_layers.append(nn.BatchNorm2d(out_ch))
            hidden_layers.append(nn.ReLU(True))
        self.main = nn.Sequential(*hidden_layers)

        # Last layer keeps its bias and is not followed by batch norm.
        to_rgb = nn.ConvTranspose2d(
            128, 3, kernel_size=4, stride=2, padding=1, bias=True
        )
        self.output = nn.Sequential(to_rgb, nn.Tanh())

    def forward(self, x):
        """Run ``x`` through the hidden stages and then the output head."""
        return self.output(self.main(x))

In [7]:
class Discriminator(nn.Module):
    """Strided-convolution discriminator.

    ``main`` applies four stride-2, 4x4 convolutions (3 -> 64 -> 128 -> 256
    -> 512 channels), each followed by ``LeakyReLU(0.2)``; all but the first
    also pass through ``BatchNorm2d``. ``output`` is a 4x4 convolution to a
    single channel followed by ``Sigmoid``, so every score lies in [0, 1].

    For ``(N, 3, 64, 64)`` inputs the spatial size shrinks
    64 -> 32 -> 16 -> 8 -> 4 -> 1, giving ``(N, 1, 1, 1)``.
    """

    def __init__(self):
        super().__init__()

        def halving_conv(in_ch, out_ch):
            # 4x4 / stride 2 / padding 1 halves height and width; no bias.
            return nn.Conv2d(
                in_ch, out_ch, kernel_size=4, stride=2, padding=1, bias=False
            )

        # The first stage has no batch norm.
        feature_layers = [halving_conv(3, 64), nn.LeakyReLU(0.2, inplace=True)]
        for in_ch, out_ch in ((64, 128), (128, 256), (256, 512)):
            feature_layers.extend(
                [
                    halving_conv(in_ch, out_ch),
                    nn.BatchNorm2d(out_ch),
                    nn.LeakyReLU(0.2, inplace=True),
                ]
            )
        self.main = nn.Sequential(*feature_layers)

        score_conv = nn.Conv2d(
            512, 1, kernel_size=4, stride=1, padding=0, bias=False
        )
        self.output = nn.Sequential(score_conv, nn.Sigmoid())

    def forward(self, x):
        """Extract features with ``main`` and turn them into a score."""
        return self.output(self.main(x))

In [8]:
def weights_init(m):
    """Re-initialise one module in place; meant for ``module.apply``.

    The decision is made on the module's class name:

    * name contains ``"Conv"``: weights drawn from N(0, 0.02); any bias is
      left untouched.
    * name contains ``"BatchNorm"``: weights drawn from N(1, 0.02) and the
      bias set to 0.
    * anything else: left unchanged.

    Args:
        m: The module to initialise.
    """
    layer_name = type(m).__name__

    if "Conv" in layer_name:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        return

    if "BatchNorm" in layer_name:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, 0)

In [9]:
class DCGAN:
    """Training wrapper pairing a ``Generator`` with a ``Discriminator``.

    Owns both networks, one Adam optimizer per network and a binary
    cross-entropy loss, and provides training, sampling and checkpointing.

    Args:
        num_epochs: Number of passes over the dataloader made by ``train``.
        lr: Learning rate used by both Adam optimizers.
        beta1: First Adam beta coefficient (the second is fixed at 0.999).
        device: Device the networks and all training tensors are placed on.
    """

    # Channel count of the noise fed to the generator. Must equal the
    # in_channels of the generator's first layer.
    LATENT_DIM = 100
    # Preview samples and write a checkpoint every this many epochs.
    CHECKPOINT_EVERY = 100
    # Number of generated images shown in each preview grid.
    PREVIEW_IMAGES = 32

    def __init__(
        self,
        num_epochs,
        lr,
        beta1,
        device,
    ):

        self.num_epochs = num_epochs
        self.lr = lr
        self.beta1 = beta1
        self.device = device

        # Target values for the BCE loss.
        self.real_label = 1.0
        self.fake_label = 0.0

        self.G = Generator()
        self.D = Discriminator()

        self.G.apply(weights_init)
        self.D.apply(weights_init)

        self.G.to(self.device)
        self.D.to(self.device)

        self.loss = nn.BCELoss()

        self.d_optimizer = optim.Adam(
            self.D.parameters(), lr=self.lr, betas=(self.beta1, 0.999)
        )
        self.g_optimizer = optim.Adam(
            self.G.parameters(), lr=self.lr, betas=(self.beta1, 0.999)
        )
        print(self.G)
        print(self.D)

    def train(self, dataloader):
        """Train both networks for ``self.num_epochs`` epochs.

        Each batch performs one discriminator update (real batch + detached
        fake batch) followed by one generator update. Per-epoch mean losses
        are printed and logged to wandb. Every ``CHECKPOINT_EVERY`` epochs
        (starting with epoch 0) a grid of samples is shown and a checkpoint
        is saved.

        NOTE(review): the state after the last epoch is only checkpointed
        when ``num_epochs - 1`` is a multiple of ``CHECKPOINT_EVERY``.

        Args:
            dataloader: Iterable yielding ``(images, _)`` batches; the second
                element is ignored.

        Raises:
            ValueError: If the dataloader yields no batches in an epoch.
        """
        wandb.init(project="vk_try", name="leaky relu")
        # wandb.watch(model)

        self.G.train()
        self.D.train()

        for epoch in range(self.num_epochs):
            d_loss_list = []
            d_loss_real_list = []
            d_loss_fake_list = []
            g_loss_list = []
            for real_images, _ in dataloader:
                # Discriminator step
                self.D.zero_grad()

                real_images = real_images.to(self.device)
                current_batch_size = real_images.size(0)

                real_targets = torch.full(
                    (current_batch_size,), self.real_label, device=self.device
                )
                fake_targets = torch.full(
                    (current_batch_size,), self.fake_label, device=self.device
                )

                d_real_output = self.D(real_images).flatten()
                d_loss_real = self.loss(d_real_output, real_targets)

                noise = torch.randn(
                    current_batch_size, self.LATENT_DIM, 1, 1, device=self.device
                )
                g_output = self.G(noise)
                # detach() keeps this update from backpropagating into G.
                d_fake_output = self.D(g_output.detach()).flatten()
                d_loss_fake = self.loss(d_fake_output, fake_targets)

                d_loss = d_loss_real + d_loss_fake
                d_loss.backward()

                self.d_optimizer.step()

                # Generator step: score the same fakes with the updated D and
                # push them towards the "real" target. Gradients that land in
                # D here are cleared by D.zero_grad() on the next iteration.
                self.G.zero_grad()
                d_fake_output = self.D(g_output).flatten()
                g_loss = self.loss(d_fake_output, real_targets)
                g_loss.backward()
                self.g_optimizer.step()

                # Logging
                d_loss_list.append(d_loss.item())
                d_loss_real_list.append(d_loss_real.item())
                d_loss_fake_list.append(d_loss_fake.item())
                g_loss_list.append(g_loss.item())

            if not d_loss_list:
                # Without this the epoch averages below would fail with an
                # uninformative ZeroDivisionError.
                raise ValueError(
                    "dataloader yielded no batches; cannot train or compute "
                    "epoch metrics"
                )

            if epoch % self.CHECKPOINT_EVERY == 0:
                self._show_samples(self.PREVIEW_IMAGES)
                # generate() switched both networks to eval mode.
                self.G.train()
                self.D.train()
                self.save(self._checkpoint_path(epoch))

            metrics = {
                "d_loss": sum(d_loss_list) / len(d_loss_list),
                "d_loss_real": sum(d_loss_real_list) / len(d_loss_real_list),
                "d_loss_fake": sum(d_loss_fake_list) / len(d_loss_fake_list),
                "g_loss": sum(g_loss_list) / len(g_loss_list),
            }
            print(f"Epoch: {epoch}: {metrics}")
            wandb.log(metrics)

    def _show_samples(self, images_count):
        """Generate ``images_count`` images and display them as one grid."""
        gen_images = self.generate(images_count)
        fig = plt.figure(figsize=(15, 15))
        plt.axis("off")
        plt.title("Fake Images")
        plt.imshow(
            utils.make_grid(
                gen_images[:images_count].cpu(), padding=2, normalize=True
            ).permute(1, 2, 0)
        )
        plt.show()
        # Release the figure so previews do not pile up over a long run.
        plt.close(fig)

    @staticmethod
    def _checkpoint_path(epoch):
        """Return the checkpoint file path for ``epoch``.

        Uses the local project directory when it exists, otherwise the
        Colab Google Drive location.
        """
        local_root = "/home/seara/Desktop/Github/vk-try"
        if os.path.isdir(local_root):
            return f"{local_root}/finalmodel{epoch}.pt"
        return f"/content/drive/MyDrive/colab/finalmodel{epoch}.pt"

    def generate(self, images_count):
        """Sample ``images_count`` images from the generator.

        Switches both networks to eval mode and leaves them there; callers
        that continue training must call ``.train()`` again.

        Args:
            images_count: Number of noise vectors / images to produce.

        Returns:
            The generator output for the sampled noise, computed under
            ``torch.no_grad()`` on ``self.device``.
        """
        self.D.eval()
        self.G.eval()
        with torch.no_grad():
            noise = torch.randn(
                images_count, self.LATENT_DIM, 1, 1, device=self.device
            )
            return self.G(noise)

    def save(self, path):
        """Write the state dicts of both networks and optimizers to ``path``."""
        save_dict = {
            "discriminator": self.D.state_dict(),
            "generator": self.G.state_dict(),
            "d_optimizer": self.d_optimizer.state_dict(),
            "g_optimizer": self.g_optimizer.state_dict(),
        }
        torch.save(save_dict, path)

    def load(self, path):
        """Restore whichever of the four saved state dicts ``path`` contains.

        Tensors are mapped onto ``self.device`` while loading, so a
        checkpoint written on a GPU can be restored on a CPU-only machine.
        Keys missing from the file are skipped.
        """
        load_dict = torch.load(path, map_location=self.device)

        if "generator" in load_dict:
            print("Loaded gen")
            self.G.load_state_dict(load_dict["generator"])
        if "discriminator" in load_dict:
            print("Loaded dis")
            self.D.load_state_dict(load_dict["discriminator"])
        if "d_optimizer" in load_dict:
            print("Loaded d_opt")
            self.d_optimizer.load_state_dict(load_dict["d_optimizer"])
        if "g_optimizer" in load_dict:
            print("Loaded g_opt")
            self.g_optimizer.load_state_dict(load_dict["g_optimizer"])

In [10]:
# Preprocessing applied to every image: resize to IMAGE_SIZE, random
# horizontal flip, convert to a tensor, then normalise each channel with
# mean 0.5 and std 0.5.
image_transform = transforms.Compose(
    [
        transforms.Resize(IMAGE_SIZE),
        # transforms.ColorJitter(saturation=1, hue=0.5),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

# Images are read from the sub-folders of DATAROOT.
dataset = ImageFolder(root=DATAROOT, transform=image_transform)

# Shuffled batches of BATCH_SIZE images, loaded by WORKERS worker processes.
dataloader = DataLoader(
    dataset=dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=WORKERS,
)

In [11]:
# image = next(iter(dataloader))[0][4]
# plt.figure(figsize=(15, 15))
# plt.subplot(1, 2, 1)
# plt.axis("off")
# plt.title("Training Images")
# plt.imshow(image.permute(1, 2, 0))

# jitter = transforms.ColorJitter(saturation=0.5, hue=0.5)
# plt.subplot(1, 2, 2)
# plt.axis("off")
# plt.title("Training Images")
# plt.imshow(jitter(image).permute(1, 2, 0))
# plt.show()

In [12]:
# Build the GAN from the configuration constants, then run the full
# training loop on the image dataloader.
model = DCGAN(
    num_epochs=NUM_EPOCHS,
    lr=LR,
    beta1=BETA1,
    device=DEVICE,
)
model.train(dataloader)

Generator(
  (main): Sequential(
    (0): ConvTranspose2d(100, 1024, kernel_size=(4, 4), stride=(1, 1), bias=False)
    (1): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): ConvTranspose2d(1024, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (4): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): ConvTranspose2d(512, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU(inplace=True)
    (9): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (10): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU(inplace=True)
  )
  (output): Sequential(
    (0): ConvTranspose2d(128, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
  

In [13]:
# images_count = 32

# plt.figure(figsize=(15, 15))
# plt.subplot(1, 2, 1)
# plt.axis("off")
# plt.title("Training Images")
# plt.imshow(
#     utils.make_grid(
#         next(iter(dataloader))[0][:images_count], padding=2, normalize=True
#     ).permute(1, 2, 0)
# )




# image = next(iter(dataloader))[0]
# plt.figure(figsize=(15, 15))
# plt.subplot(1, 2, 1)
# plt.axis("off")
# plt.title("Training Images")
# plt.imshow(image[0].permute(1, 2, 0))
# # Plot the fake images from the last epoch
# gen_images = model.generate(images_count)

# plt.subplot(1, 2, 2)
# plt.axis("off")
# plt.title("Fake Images")
# plt.imshow(
#     utils.make_grid(gen_images[:images_count].cpu(), padding=2, normalize=True).permute(
#         1, 2, 0
#     )
# )
# plt.show()

In [14]:
# path = "model800.pt"
# save = {
#     "discriminator": model.D.state_dict(),
#     "generator": model.G.state_dict(),
#     "d_optimizer": model.d_optimizer.state_dict(),
#     "g_optimizer": model.g_optimizer.state_dict(),
# }
# torch.save(save, path)

In [15]:
# load = torch.load(path)
# loaded_model = DCGAN(num_epochs=NUM_EPOCHS, lr=LR, beta1=BETA1, device=DEVICE)

In [16]:
# gen_images = loaded_model.generate(images_count)
# plt.figure(figsize=(15, 15))
# plt.axis("off")
# plt.title("Fake Images")
# plt.imshow(
#     utils.make_grid(gen_images[:images_count].cpu(), padding=2, normalize=True).permute(
#         1, 2, 0
#     )
# )
# plt.show()