https://github.com/ozanciga/gans-with-pytorch/tree/master/wgan-gp

In [1]:
#torch cuda
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
from google.colab import drive

drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
import os
import numpy as np
from PIL import Image
import cv2
import numpy as np

In [8]:
from torch import nn

# Residual network.
# WGAN-GP paper defines a residual block with up & downsampling.
# See the official implementation (given in the paper).
# I use architectures described in the official implementation,
# since I find it hard to deduce the blocks given here from the text alone.
class MeanPoolConv(nn.Module):
    def __init__(self, n_input, n_output, k_size):
        super(MeanPoolConv, self).__init__()
        conv1 = nn.Conv2d(n_input, n_output, k_size, stride=1, padding=(k_size-1)//2, bias=True)
        self.model = nn.Sequential(conv1)
    def forward(self, x):
        out = (x[:,:,::2,::2] + x[:,:,1::2,::2] + x[:,:,::2,1::2] + x[:,:,1::2,1::2]) / 4.0
        out = self.model(out)
        return out

class ConvMeanPool(nn.Module):
    def __init__(self, n_input, n_output, k_size):
        super(ConvMeanPool, self).__init__()
        conv1 = nn.Conv2d(n_input, n_output, k_size, stride=1, padding=(k_size-1)//2, bias=True)
        self.model = nn.Sequential(conv1)
    def forward(self, x):
        out = self.model(x)
        out = (out[:,:,::2,::2] + out[:,:,1::2,::2] + out[:,:,::2,1::2] + out[:,:,1::2,1::2]) / 4.0
        return out

class UpsampleConv(nn.Module):
    def __init__(self, n_input, n_output, k_size):
        super(UpsampleConv, self).__init__()

        self.model = nn.Sequential(
            nn.PixelShuffle(2),
            nn.Conv2d(n_input, n_output, k_size, stride=1, padding=(k_size-1)//2, bias=True)
        )
    def forward(self, x):
        x = x.repeat((1, 4, 1, 1)) # Weird concat of WGAN-GPs upsampling process.
        out = self.model(x)
        return out

class ResidualBlock(nn.Module):
    def __init__(self, n_input, n_output, k_size, resample='up', bn=True, spatial_dim=None):
        super(ResidualBlock, self).__init__()

        self.resample = resample

        if resample == 'up':
            self.conv1 = UpsampleConv(n_input, n_output, k_size)
            self.conv2 = nn.Conv2d(n_output, n_output, k_size, padding=(k_size-1)//2)
            self.conv_shortcut = UpsampleConv(n_input, n_output, k_size)
            self.out_dim = n_output
        elif resample == 'down':
            self.conv1 = nn.Conv2d(n_input, n_input, k_size, padding=(k_size-1)//2)
            self.conv2 = ConvMeanPool(n_input, n_output, k_size)
            self.conv_shortcut = ConvMeanPool(n_input, n_output, k_size)
            self.out_dim = n_output
            self.ln_dims = [n_input, spatial_dim, spatial_dim] # Define the dimensions for layer normalization.
        else:
            self.conv1 = nn.Conv2d(n_input, n_input, k_size, padding=(k_size-1)//2)
            self.conv2 = nn.Conv2d(n_input, n_input, k_size, padding=(k_size-1)//2)
            self.conv_shortcut = None # Identity
            self.out_dim = n_input
            self.ln_dims = [n_input, spatial_dim, spatial_dim]

        self.model = nn.Sequential(
            nn.BatchNorm2d(n_input) if bn else nn.LayerNorm(self.ln_dims),
            nn.ReLU(inplace=True),
            self.conv1,
            nn.BatchNorm2d(self.out_dim) if bn else nn.LayerNorm(self.ln_dims),
            nn.ReLU(inplace=True),
            self.conv2,
        )

    def forward(self, x):
        if self.conv_shortcut is None:
            return x + self.model(x)
        else:
            return self.conv_shortcut(x) + self.model(x)

class DiscBlock1(nn.Module):
    def __init__(self, n_output):
        super(DiscBlock1, self).__init__()

        self.conv1 = nn.Conv2d(3, n_output, 3, padding=(3-1)//2)
        self.conv2 = ConvMeanPool(n_output, n_output, 1)
        self.conv_shortcut = MeanPoolConv(3, n_output, 1)

        self.model = nn.Sequential(
            self.conv1,
            nn.ReLU(inplace=True),
            self.conv2
        )

    def forward(self, x):
        return self.conv_shortcut(x) + self.model(x)

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()

        self.model = nn.Sequential(                     # 128 x 1 x 1
            nn.ConvTranspose2d(128, 128, 4, 1, 0),      # 128 x 4 x 4
            #nn.ConvTranspose2d(128, 128, 4, 2, 1),     #Para 64x64 pixeles de entrada
            ResidualBlock(128, 128, 3, resample='up'),  # 128 x 8 x 8
            ResidualBlock(128, 128, 3, resample='up'),  # 128 x 16 x 16
            ResidualBlock(128, 128, 3, resample='up'),  # 128 x 32 x 32
            ResidualBlock(128, 128, 3, resample='up'),  # 128 x 64 x 64
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 3, 3, padding=(3-1)//2),     # 3 x 64 x 64
            nn.Tanh()
        )

    def forward(self, z):
        img = self.model(z)
        return img


class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        n_output = 128
        '''
        This is a parameter but since we experiment with a single size
        of 3 x 32 x 32 images, it is hardcoded here.
        '''

        self.DiscBlock1 = DiscBlock1(n_output)                      # 128 x 32 x 32

        self.model = nn.Sequential(
            ResidualBlock(n_output, n_output, 3, resample='down', bn=False, spatial_dim=32),  # 128 x 16 x 16
            ResidualBlock(n_output, n_output, 3, resample='down', bn=False, spatial_dim=16),  # 128 x 8 x 8
            ResidualBlock(n_output, n_output, 3, resample=None, bn=False, spatial_dim=8),    # 128 x 8 x 8
            ResidualBlock(n_output, n_output, 3, resample=None, bn=False, spatial_dim=8),    # 128 x 8 x 8
            nn.ReLU(inplace=True),
        )
        self.l1 = nn.Sequential(nn.Linear(128, 1))                  # 128 x 1

    def forward(self, x):
        # x = x.view(-1, 3, 32, 32)
        y = self.DiscBlock1(x)
        y = self.model(y)
        y = y.view(x.size(0), 128, -1)
        y = y.mean(dim=2)
        out = self.l1(y).unsqueeze_(1).unsqueeze_(2) # or *.view(x.size(0), 128, 1, 1, 1)
        return out




In [5]:
import torch.utils.data as data
import torchvision.transforms as transforms

class CustomDataset(data.Dataset):
    def __init__(self, X_folder, y_folder, transform=None):
        self.X_folder = X_folder
        self.y_folder = y_folder
        self.transform = transform

        # Obtener la lista de nombres de archivo en las carpetas
        self.X_filenames = [filename for filename in os.listdir(X_folder) if filename.endswith('.jpg')]
        self.y_filenames = [filename for filename in os.listdir(y_folder) if filename.endswith('.jpg')]

        #self.resizer = transforms.Resize((64, 64))  # Redimensionar las imágenes a 64x64 píxeles
        #self.to_grayscale = transforms.Grayscale()  # Convertir las imágenes a escala de grises

    def __len__(self):
        return len(self.X_filenames)

    def __getitem__(self, index):
        X_filename = self.X_filenames[index]
        y_filename = self.y_filenames[index]

        if not X_filename.endswith(".jpg"):
            return self.__getitem__((index + 1) % len(self))

        # Cargar las imágenes y las etiquetas
        X = Image.open(os.path.join(self.X_folder, X_filename))
        y = Image.open(os.path.join(self.y_folder, y_filename))

        # Redimensionar las imágenes a 64x64 píxeles y convertirlas a escala de grises
        #X = self.resizer(X)
        #X = self.to_grayscale(X)
        #y = self.resizer(y)
        #y = self.to_grayscale(y)

        if self.transform:
            X = self.transform(X)
            y = self.transform(y)

        return X, y

    def get_images(self):
        images = []
        for X_filename in self.X_filenames:
            X = Image.open(os.path.join(self.X_folder, X_filename))#.convert("RGB")
            #X = self.resizer(X)
            if self.transform:
                X = self.transform(X)
            images.append(X)
        return images

    def get_labels(self):
        labels = []
        for y_filename in self.y_filenames:
            y = Image.open(os.path.join(self.y_folder, y_filename))#.convert("RGB")
            #y = self.resizer(y)
            if self.transform:
                y = self.transform(y)
            labels.append(y)
        return labels


In [9]:
import torch
from torch import nn, optim
from torch.autograd.variable import Variable

from torchvision import transforms, datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F

import torchvision.utils as vutils

import errno

class Args:
    def __init__(self):
        self.n_epochs = 100
        self.batch_size = 64
        self.alpha = 0.0001
        self.b1 = 0.5
        self.b2 = 0.9
        self.n_critic = 5
        self.lambda_1 = 10
        self.img_size = 64
        self.channels = 3
        #self.display_port = 8097
        #self.display_server = "http://localhost"
        self.sample_interval = 256
opt = Args()


img_dims = (opt.channels, opt.img_size, opt.img_size)
n_features = opt.channels * opt.img_size * opt.img_size

# TODO: Use some initialization in the future.
def init_weights(m):
    if type(m) == nn.ConvTranspose2d:
        torch.nn.init.kaiming_normal(m.weight, mode='fan_out', nonlinearity='relu')
    elif type(m) == nn.Conv2d:
        torch.nn.init.kaiming_uniform_(m.weight, mode='fan_in', nonlinearity='leaky_relu')

# Definir las rutas de las carpetas de entrenamiento y prueba
trainX_folder = '/content/gdrive/MyDrive/Dev/AI_MsC/TFM/CRACK500/traindata/traindata/'
trainy_folder = '/content/gdrive/MyDrive/Dev/AI_MsC/TFM/CRACK500/valdata/valdata/'

# Crear las transformaciones
transform = transforms.Compose([
    transforms.Resize((opt.img_size, opt.img_size)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Crear el dataset personalizado de entrenamiento
dataset_full = CustomDataset(trainX_folder, trainy_folder, transform=transform)
# Obtener xtrain e ytrain
dataset = dataset_full.get_images()
# Crear los dataloaders
batch_iterator = torch.utils.data.DataLoader(dataset, batch_size=opt.batch_size, shuffle=True, num_workers=2)

cuda = torch.cuda.is_available()
Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor
gan_loss = nn.BCELoss()

generator = Generator()
discriminator = Discriminator()

optimizer_D = optim.Adam(discriminator.parameters(), lr=opt.alpha, betas=(opt.b1, opt.b2))
optimizer_G = optim.Adam(generator.parameters(), lr=opt.alpha, betas=(opt.b1, opt.b2))

# Loss record.
g_losses = []
d_losses = []
epochs = []
loss_legend = ['Discriminator', 'Generator']

if cuda:
    generator = generator.cuda()
    discriminator = discriminator.cuda()

noise_fixed = Variable(Tensor(25, 128, 1, 1).normal_(0, 1), requires_grad=False) # To track the progress of the GAN.

for epoch in range(opt.n_epochs):
    print('Epoch {}'.format(epoch))
    for i, batch in enumerate(batch_iterator):
        # == Discriminator update == #
        for iter in range(opt.n_critic):
            # Sample real and fake images, using notation in paper.
            x = Variable(batch.type(Tensor))
            noise = Variable(Tensor(batch.size(0), 128, 1, 1).normal_(0, 1))
            x_tilde = Variable(generator(noise), requires_grad=True)

            epsilon = Variable(Tensor(batch.size(0), 1, 1, 1).uniform_(0, 1))

            x_hat = epsilon*x + (1 - epsilon)*x_tilde
            x_hat = torch.autograd.Variable(x_hat, requires_grad=True)

            # Put the interpolated data through critic.
            dw_x = discriminator(x_hat)
            # A great exercise on learning how the autograd.grad works!
            grad_x = torch.autograd.grad(outputs=dw_x, inputs=x_hat,
                                         grad_outputs=Variable(Tensor(batch.size(0), 1, 1, 1).fill_(1.0), requires_grad=False),
                                         create_graph=True, retain_graph=True, only_inputs=True)
            grad_x = grad_x[0].view(batch.size(0), -1)
            grad_x = grad_x.norm(p=2, dim=1) # My naming is inaccurate, this is the 2-norm of grad(D_w(x_hat))

            # Update discriminator (or critic, since we don't output probabilities anymore).
            optimizer_D.zero_grad()

            # WGAN-GP loss, defined properly as a loss unlike the WGAN paper.
            d_loss = torch.mean(discriminator(x_tilde)) - torch.mean(discriminator(x)) + opt.lambda_1*torch.mean((grad_x - 1)**2)
            # d_loss = torch.mean(d_loss) # there's a reason for why this shouldn't be done this way :)

            d_loss.backward()
            optimizer_D.step()

        # == Generator update == #
        noise = Variable(Tensor(batch.size(0), 128, 1, 1).normal_(0, 1))
        imgs_fake = generator(noise)

        optimizer_G.zero_grad()

        g_loss = -torch.mean(discriminator(imgs_fake))

        g_loss.backward()
        optimizer_G.step()

        print('[%d/%d][%d/%d] Loss_D: %.4f Loss_G: %.4f' % (epoch+1, 70, i+1, len(batch_iterator), d_loss.data, g_loss.data))
        if i % 100 == 0: # Every 100 steps:
            vutils.save_image(x, '/content/gdrive/MyDrive/Dev/AI_MsC/TFM/BestNotebooks/wGAN-v1_Images/real_samples.png', normalize = True) # We save the real images of the minibatch.
            fake = generator(noise) # We get our fake generated images.
            vutils.save_image(fake.detach(), f"/content/gdrive/MyDrive/Dev/AI_MsC/TFM/BestNotebooks/wGAN-v1_Images/fake_samples/fake_samples_epoch_{epoch:03d}.png", normalize=True) # We also save the fake generated images of the minibatch.


Epoch 0
[1/70][1/4] Loss_D: 4.4981 Loss_G: 2.2033
[1/70][2/4] Loss_D: -3.5825 Loss_G: 3.3254
[1/70][3/4] Loss_D: -9.8524 Loss_G: 5.0289
[1/70][4/4] Loss_D: -14.6230 Loss_G: 5.2666
Epoch 1
[2/70][1/4] Loss_D: -17.9885 Loss_G: 6.8849
[2/70][2/4] Loss_D: -23.9062 Loss_G: 6.2201
[2/70][3/4] Loss_D: -24.5498 Loss_G: 6.2076
[2/70][4/4] Loss_D: -23.5531 Loss_G: 6.2552
Epoch 2
[3/70][1/4] Loss_D: -21.3566 Loss_G: -4.0157
[3/70][2/4] Loss_D: -19.8737 Loss_G: 5.2533
[3/70][3/4] Loss_D: -23.0726 Loss_G: 6.9850
[3/70][4/4] Loss_D: -16.1973 Loss_G: -1.9666
Epoch 3
[4/70][1/4] Loss_D: -18.0114 Loss_G: -11.6285
[4/70][2/4] Loss_D: -17.7231 Loss_G: -10.7598
[4/70][3/4] Loss_D: -14.1109 Loss_G: 12.8975
[4/70][4/4] Loss_D: -16.9094 Loss_G: -5.1044
Epoch 4
[5/70][1/4] Loss_D: -14.3335 Loss_G: -11.5591
[5/70][2/4] Loss_D: -10.7606 Loss_G: -2.3974
[5/70][3/4] Loss_D: -5.4305 Loss_G: -11.2291
[5/70][4/4] Loss_D: -6.3470 Loss_G: -21.3477
Epoch 5
[6/70][1/4] Loss_D: -10.9703 Loss_G: -13.2586
[6/70][2/4] Loss_

In [10]:
# Guardar el modelo en un archivo .pth
gen_path = "/content/gdrive/MyDrive/Dev/AI_MsC/TFM/BestNotebooks/wGAN-v1_Images/models/generator.pth"
dis_path = "/content/gdrive/MyDrive/Dev/AI_MsC/TFM/BestNotebooks/wGAN-v1_Images/models/discriminator.pth"

torch.save(generator.state_dict(), gen_path)
torch.save(discriminator.state_dict(), dis_path)

# Función para generar una imagen aleatoria y guardarla en un archivo .jpg
def generar_imagen_aleatoria(generator_path, output_path):
    # Cargar los pesos del generador desde el archivo .pth
    generator = Generator()  # Reemplaza "Generator()" con la clase o función que define tu generador
    generator.load_state_dict(torch.load(generator_path))
    generator.eval()

    # Generar una imagen aleatoria
    with torch.no_grad():
        noise = torch.randn(1, 128, 1, 1)  # Ajusta el tamaño del ruido según tu generador
        imagen_generada = generator(noise)

    # Guardar la imagen generada en un archivo .jpg
    vutils.save_image(imagen_generada, output_path, normalize=True)


In [11]:
# Ejemplo de uso:
output_path = "/content/gdrive/MyDrive/Dev/AI_MsC/TFM/BestNotebooks/wGAN-v1_Images/models/output.jpg"
generar_imagen_aleatoria(gen_path, output_path)