#DEEP LEARNING
##LAB 8

In [None]:
from google.colab import drive
import pandas as pd
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from PIL import Image
import pandas as pd
import numpy as np
import os
import glob
import itertools
from torchvision.datasets import ImageFolder
from dataset import HorseZebraDataset
from discriminator_model import Discriminator
from generator_model import Generator
from utils import save_checkpoint, load_checkpoint
from tqdm import tqdm
from torchvision.utils import save_image
import config


In [None]:
class ImageDataset(torch.utils.data.Dataset):
    def __init__(self, root_A, root_B, transforms_=None):
        self.transform = transforms.Compose(transforms_)
        self.files_A = sorted(glob.glob(os.path.join(root_A, "*.*")))
        self.files_B = sorted(glob.glob(os.path.join(root_B, "*.*")))

    def __getitem__(self, index):
        item_A = self.transform(Image.open(self.files_A[index % len(self.files_A)]))
        item_B = self.transform(Image.open(self.files_B[index % len(self.files_B)]))
        return {"A": item_A, "B": item_B}

    def __len__(self):
        return max(len(self.files_A), len(self.files_B))

batch_size = 1
data_transforms = [
    transforms.Resize(int(286 * 1.12), Image.BICUBIC),
    transforms.RandomCrop((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]

dataset = ImageDataset(root_A='/content/drive/MyDrive/Deep Learning/archive/trainA', root_B='/content/drive/MyDrive/Deep Learning/archive/trainB', transforms_=data_transforms)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4)

metadata_df = pd.read_csv('/content/drive/MyDrive/Deep Learning/mnist_train.csv')




In [None]:

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1)
        self.conv5 = nn.Conv2d(512, 512, kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        return x

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1)
        self.conv5 = nn.Conv2d(512, 1, kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        x = F.leaky_relu(self.conv1(x), negative_slope=0.2)
        x = F.leaky_relu(self.conv2(x), negative_slope=0.2)
        x = F.leaky_relu(self.conv3(x), negative_slope=0.2)
        x = F.leaky_relu(self.conv4(x), negative_slope=0.2)
        x = torch.sigmoid(self.conv5(x))
        return x

G_AB = Generator()
G_BA = Generator()
D_A = Discriminator()
D_B = Discriminator()

print(G_AB)
print(D_A)


Generator(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv3): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv4): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv5): Conv2d(512, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
)
Discriminator(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv3): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv4): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv5): Conv2d(512, 1, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SpatialTransformer(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(SpatialTransformer, self).__init__()
        self.localization = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3, padding=1),
            nn.ReLU(True),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.fc_loc = nn.Sequential(
            nn.Linear(128 * 7 * 7, 256),
            nn.ReLU(True),
            nn.Linear(256, 2 * 3)
        )
        self.fc_loc[2].weight.data.zero_()
        self.fc_loc[2].bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def forward(self, x):
        xs = self.localization(x)
        xs = xs.view(-1, 128 * 7 * 7)
        theta = self.fc_loc(xs).view(-1, 2, 3)
        grid = F.affine_grid(theta, x.size())
        x = F.grid_sample(x, grid)
        return x

class GeneratorSTN(nn.Module):
    def __init__(self):
        super(GeneratorSTN, self).__init__()
        self.stn = SpatialTransformer(in_channels=3, out_channels=3)

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.norm1 = nn.InstanceNorm2d(64)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.norm2 = nn.InstanceNorm2d(128)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.norm3 = nn.InstanceNorm2d(256)
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.norm4 = nn.InstanceNorm2d(512)
        self.conv5 = nn.Conv2d(512, 3, kernel_size=3, padding=1)

    def forward(self, x):
        x = self.stn(x)

        x = F.relu(self.norm1(self.conv1(x)))
        x = F.relu(self.norm2(self.conv2(x)))
        x = F.relu(self.norm3(self.conv3(x)))
        x = F.relu(self.norm4(self.conv4(x)))
        x = torch.tanh(self.conv5(x))
        return x


In [None]:
class GeneratorSemiSupervised(nn.Module):
    def __init__(self):
        super(GeneratorSemiSupervised, self).__init__()

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.norm1 = nn.InstanceNorm2d(64)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.norm2 = nn.InstanceNorm2d(128)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1)
        self.norm3 = nn.InstanceNorm2d(256)
        self.residual_blocks = self.make_res_blocks(256, 6)
        self.deconv1 = nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.norm4 = nn.InstanceNorm2d(128)
        self.deconv2 = nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1)
        self.norm5 = nn.InstanceNorm2d(64)
        self.conv4 = nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1)
        self.tanh = nn.Tanh()

    def make_res_blocks(self, channels, num_blocks):
        res_blocks = []
        for _ in range(num_blocks):
            res_blocks.append(ResidualBlock(channels))
        return nn.Sequential(*res_blocks)

    def forward(self, x):
        x = F.relu(self.norm1(self.conv1(x)))
        x = F.relu(self.norm2(self.conv2(x)))
        x = F.relu(self.norm3(self.conv3(x)))
        x = self.residual_blocks(x)
        x = F.relu(self.norm4(self.deconv1(x)))
        x = F.relu(self.norm5(self.deconv2(x)))
        x = self.tanh(self.conv4(x))
        return x

class DiscriminatorSemiSupervised(nn.Module):
    def __init__(self):
        super(DiscriminatorSemiSupervised, self).__init__()

        self.conv1 = nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1)
        self.norm1 = nn.InstanceNorm2d(128)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1)
        self.norm2 = nn.InstanceNorm2d(256)
        self.conv4 = nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1)
        self.norm3 = nn.InstanceNorm2d(512)
        self.conv5 = nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)

    def forward(self, x):
        x = F.leaky_relu(self.conv1(x), 0.2)
        x = F.leaky_relu(self.norm1(self.conv2(x)), 0.2)
        x = F.leaky_relu(self.norm2(self.conv3(x)), 0.2)
        x = F.leaky_relu(self.norm3(self.conv4(x)), 0.2)
        x = torch.sigmoid(self.conv5(x))
        return x

class AuxiliaryClassifier(nn.Module):
    def __init__(self):
        super(AuxiliaryClassifier, self).__init__()

        self.conv1 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)
        self.norm1 = nn.InstanceNorm2d(512)
        self.conv2 = nn.Conv2d(512, 1024, kernel_size=3, stride=2, padding=1)
        self.norm2 = nn.InstanceNorm2d(1024)
        self.conv3 = nn.Conv2d(1024, num_classes, kernel_size=1, stride=1)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

    def forward(self, x):
        x = F.relu(self.norm1(self.conv1(x)))
        x = F.relu(self.norm2(self.conv2(x)))
        x = self.conv3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        return x


In [None]:
def adversarial_training(gen_Z, gen_H, disc_Z, disc_H, opt_gen, opt_disc, loader, mse_loss, g_scaler, d_scaler):
    loop = tqdm(loader, leave=True)
    for idx, (zebra, horse) in enumerate(loop):
        zebra = zebra.to(config.DEVICE)
        horse = horse.to(config.DEVICE)

        with torch.cuda.amp.autocast():
            fake_zebra = gen_Z(horse)
            disc_Z_real = disc_Z(zebra)
            disc_Z_fake = disc_Z(fake_zebra.detach())
            loss_D_Z_real = mse_loss(disc_Z_real, torch.ones_like(disc_Z_real))
            loss_D_Z_fake = mse_loss(disc_Z_fake, torch.zeros_like(disc_Z_fake))
            loss_D_Z = (loss_D_Z_real + loss_D_Z_fake) / 2

        opt_disc.zero_grad()
        d_scaler.scale(loss_D_Z).backward()
        d_scaler.step(opt_disc)
        d_scaler.update()

        with torch.cuda.amp.autocast():
            fake_horse = gen_H(zebra)
            disc_H_real = disc_H(horse)
            disc_H_fake = disc_H(fake_horse.detach())
            loss_D_H_real = mse_loss(disc_H_real, torch.ones_like(disc_H_real))
            loss_D_H_fake = mse_loss(disc_H_fake, torch.zeros_like(disc_H_fake))
            loss_D_H = (loss_D_H_real + loss_D_H_fake) / 2

        opt_disc.zero_grad()
        d_scaler.scale(loss_D_H).backward()
        d_scaler.step(opt_disc)
        d_scaler.update()

        with torch.cuda.amp.autocast():
            fake_zebra = gen_Z(horse)
            fake_horse = gen_H(zebra)
            disc_Z_fake = disc_Z(fake_zebra)
            disc_H_fake = disc_H(fake_horse)
            loss_G_Z = mse_loss(disc_Z_fake, torch.ones_like(disc_Z_fake))
            loss_G_H = mse_loss(disc_H_fake, torch.ones_like(disc_H_fake))
            cycle_zebra = gen_Z(fake_horse)
            cycle_horse = gen_H(fake_zebra)
            cycle_loss = config.LAMBDA_CYCLE * (mse_loss(cycle_zebra, zebra) + mse_loss(cycle_horse, horse))
            total_loss = loss_G_Z + loss_G_H + cycle_loss

        opt_gen.zero_grad()
        g_scaler.scale(total_loss).backward()
        g_scaler.step(opt_gen)
        g_scaler.update()

        loop.set_postfix(loss_D_Z=loss_D_Z.item(), loss_D_H=loss_D_H.item(), loss_G=total_loss.item())

def main():
    gen_Z = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)
    gen_H = Generator(img_channels=3, num_residuals=9).to(config.DEVICE)
    disc_Z = Discriminator(in_channels=3).to(config.DEVICE)
    disc_H = Discriminator(in_channels=3).to(config.DEVICE)
    opt_gen = optim.Adam(list(gen_Z.parameters()) + list(gen_H.parameters()), lr=config.LEARNING_RATE, betas=(0.5, 0.999))
    opt_disc = optim.Adam(list(disc_Z.parameters()) + list(disc_H.parameters()), lr=config.LEARNING_RATE, betas=(0.5, 0.999))
    mse_loss = nn.MSELoss()

    dataset = HorseZebraDataset(root_horse=config.TRAIN_DIR + "/horses", root_zebra=config.TRAIN_DIR + "/zebras", transform=config.transforms)
    loader = DataLoader(dataset, batch_size=config.BATCH_SIZE, shuffle=True, num_workers=config.NUM_WORKERS, pin_memory=True)

    g_scaler = torch.cuda.amp.GradScaler()
    d_scaler = torch.cuda.amp.GradScaler()

    for epoch in range(config.NUM_EPOCHS):
        adversarial_training(gen_Z, gen_H, disc_Z, disc_H, opt_gen, opt_disc, loader, mse_loss, g_scaler, d_scaler)

        if config.SAVE_MODEL:
            save_checkpoint(gen_Z, opt_gen, filename=config.CHECKPOINT_GEN_Z)
            save_checkpoint(gen_H, opt_gen, filename=config.CHECKPOINT_GEN_H)
            save_checkpoint(disc_Z, opt_disc, filename=config.CHECKPOINT_DISC_Z)
            save_checkpoint(disc_H, opt_disc, filename=config.CHECKPOINT_DISC_H)

if __name__ == "__main__":
    main()


ModuleNotFoundError: No module named 'dataset'