In [5]:
import os
import yaml
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.utils as vutils
from torchvision.utils import save_image
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from PIL import Image

DATASET_PATH = '/kaggle/input/skin-lesion/images'




In [16]:
import shutil
import os

# Path to the folder you want to zip
folder_to_zip = '/kaggle/working/ganoutput'

# Path where the zip file will be saved
output_zip = '/kaggle/working/ganoutput.zip'

# Create a zip file of the folder (including subfolders and files)
shutil.make_archive(output_zip.replace('.zip', ''), 'zip', folder_to_zip)

print(f"Zipped {folder_to_zip} into {output_zip}")


Zipped /kaggle/working/ganoutput into /kaggle/working/ganoutput.zip


In [6]:
# Generator for 256x256 images
class Generator_256(nn.Module):
    def __init__(self, ngpu, nz, ngf, nc):
        super(Generator_256, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            nn.ConvTranspose2d(nz, ngf * 32, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 32),
            nn.ReLU(True),
            nn.ConvTranspose2d(ngf * 32, ngf * 16, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 16),
            nn.ReLU(True),
            nn.ConvTranspose2d(ngf * 16, ngf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),
            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),
            nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),
            nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),
            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh()
        )

    def forward(self, input):
        return self.main(input)

# Discriminator with Spectral Normalization for 256x256 images
class Discriminator_SN_256(nn.Module):
    def __init__(self, ngpu, nc, ndf):
        super(Discriminator_SN_256, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.utils.spectral_norm(nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False)),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.utils.spectral_norm(nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False)),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            nn.utils.spectral_norm(nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False)),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.utils.spectral_norm(nn.Conv2d(ndf * 8, ndf * 16, 4, 2, 1, bias=False)),
            nn.BatchNorm2d(ndf * 16),
            nn.LeakyReLU(0.2, inplace=True),
            nn.utils.spectral_norm(nn.Conv2d(ndf * 16, 1, 4, 1, 0, bias=False)),
            nn.AdaptiveAvgPool2d(1)
        )

    def forward(self, input):
        return self.main(input).view(-1)

def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

def to_gpu(ngpu):
    device = torch.device("cuda:0" if (torch.cuda.is_available() and ngpu > 0) else "cpu")
    return device

def loss_plot(G_losses, D_losses, out):
    plt.figure(figsize=(10, 5))
    plt.title("Generator and Discriminator Loss During Training")
    plt.plot(G_losses, label="G")
    plt.plot(D_losses, label="D")
    plt.xlabel("iterations")
    plt.ylabel("Loss")
    plt.legend()
    plt.savefig(out + 'loss_plot.png')
    plt.close()

def save_generated_images(img_list, out_dir):
    os.makedirs(out_dir, exist_ok=True)
    for i, batch in enumerate(img_list):
        for j, img in enumerate(batch):
            save_image(img, f"{out_dir}/generated_image_{i}_{j}.png", normalize=True)
    print(f"Saved generated images to {out_dir}")

def visual_evaluation(dataloader, img_list, device, out):
    real_batch = next(iter(dataloader))
    plt.figure(figsize=(15, 15))
    plt.subplot(1, 2, 1)
    plt.axis("off")
    plt.title("Real Images")
    plt.imshow(np.transpose(vutils.make_grid(real_batch[0].to(device)[:64], padding=5, normalize=True).cpu(), (1, 2, 0)))

    plt.subplot(1, 2, 2)
    plt.axis("off")
    plt.title("Fake Images")
    plt.imshow(np.transpose(img_list[-1], (1, 2, 0)))
    plt.savefig(f"{out}real_fake_comparison.png")
    plt.close()

    print(f"Saved real vs fake image comparison to {out}real_fake_comparison.png")

class SingleClassDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.images = [f for f in os.listdir(root_dir) if f.endswith('.jpg') or f.endswith('.png')]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.images[idx])
        image = Image.open(img_name).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, 0  # 0 is a dummy label

def create_single_class_dataloader(image_size, batch_size, class_name, num_workers=2, root_dir=DATASET_PATH):
    transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.CenterCrop(image_size),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    class_path = os.path.join(root_dir, class_name)
    dataset = SingleClassDataset(root_dir=class_path, transform=transform)

    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers
    )

    return dataloader



In [7]:
def initialize_networks(params):
    if params['arch'] == 'SNGAN' and params['image_size'] == 256:
        netG = Generator_256(params['n_gpu'], params['latent_vector'], params['gen_feature_maps'], params['number_channels'])
        netD = Discriminator_SN_256(params['n_gpu'], params['number_channels'], params['dis_feature_maps'])
    else:
        raise ValueError(f"Unsupported architecture: {params['arch']} or image size: {params['image_size']}")

    return netG, netD

def training_loop(num_epochs, dataloader, netG, netD, device, criterion, nz, optimizerG, optimizerD, fixed_noise, out, use_label_smoothing, smooth_label):
    img_list = []
    img_list_only = []
    G_losses = []
    D_losses = []
    iters = 0

    real_label = smooth_label if use_label_smoothing else 1.
    fake_label = 0.
    os.makedirs(os.path.dirname(out), exist_ok=True)
    print("Starting Training Loop...")
    for epoch in range(num_epochs):
        for i, data in enumerate(dataloader, 0):
            # ... (training loop code remains unchanged)
            ############################
            # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
            ###########################
            netD.zero_grad()
            real_cpu = data[0].to(device)
            b_size = real_cpu.size(0)
            label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
            output = netD(real_cpu)
            errD_real = criterion(output, label)
            errD_real.backward()
            D_x = output.mean().item()

            noise = torch.randn(b_size, nz, 1, 1, device=device)
            fake = netG(noise)
            label.fill_(fake_label)
            output = netD(fake.detach())
            errD_fake = criterion(output, label)
            errD_fake.backward()
            D_G_z1 = output.mean().item()
            errD = errD_real + errD_fake
            optimizerD.step()

            ############################
            # (2) Update G network: maximize log(D(G(z)))
            ###########################
            netG.zero_grad()
            label.fill_(real_label)  # fake labels are real for generator cost
            output = netD(fake)
            errG = criterion(output, label)
            errG.backward()
            D_G_z2 = output.mean().item()
            optimizerG.step()

            # Output training stats
            if i % 50 == 0:
                print(f'[{epoch}/{num_epochs}][{i}/{len(dataloader)}]\tLoss_D: {errD.item():.4f}\tLoss_G: {errG.item():.4f}\tD(x): {D_x:.4f}\tD(G(z)): {D_G_z1:.4f} / {D_G_z2:.4f}')

            # Save Losses for plotting later
            G_losses.append(errG.item())
            D_losses.append(errD.item())

            # Check how the generator is doing by saving G's output on fixed_noise
            if (iters % 500 == 0) or ((epoch == num_epochs-1) and (i == len(dataloader)-1)):
                with torch.no_grad():
                    fake = netG(fixed_noise).detach().cpu()
                img_list.append(vutils.make_grid(fake, padding=2, normalize=True))
                img_list_only.append(fake)

                for j in range(len(fake)):
                    save_image(fake[j], f"{out}{epoch}_{j}_img.png")

            iters += 1

    return G_losses, D_losses, img_list, img_list_only

def main(params):
    torch.manual_seed(params['seed'])
    np.random.seed(params['seed'])

    device = to_gpu(ngpu=params['n_gpu'])
    classes = ['MEL', 'BCC', 'SCC', 'NV']

    for class_name in classes:
        print(f"Training {params['arch']} for class: {class_name}")

        dataloader = create_single_class_dataloader(
            image_size=params['image_size'],
            batch_size=params['batch_size'],
            class_name=class_name,
            num_workers=params['loader_workers'],
            root_dir=params['path']
        )

        netG, netD = initialize_networks(params)
        netG = netG.to(device)
        netD = netD.to(device)

        if (device.type == 'cuda') and (params['n_gpu'] > 1):
            netG = nn.DataParallel(netG, list(range(params['n_gpu'])))
            netD = nn.DataParallel(netD, list(range(params['n_gpu'])))

        netG.apply(weights_init)
        netD.apply(weights_init)

        criterion = nn.BCEWithLogitsLoss()
        fixed_noise = torch.randn(params['batch_size'], params['latent_vector'], 1, 1, device=device)

        optimizerD = optim.Adam(netD.parameters(), lr=params['discriminator_lr'], betas=(params['beta_adam'], 0.999))
        optimizerG = optim.Adam(netG.parameters(), lr=params['generator_lr'], betas=(params['beta_adam'], 0.999))

        out_dir = os.path.join(params['out'], f"{params['run']}_{class_name}_")
        os.makedirs(out_dir, exist_ok=True)
        G_losses, D_losses, img_list, img_list_only = training_loop(
            num_epochs=params['num_epochs'],
            dataloader=dataloader,
            netG=netG,
            netD=netD,
            device=device,
            criterion=criterion,
            nz=params['latent_vector'],
            optimizerG=optimizerG,
            optimizerD=optimizerD,
            fixed_noise=fixed_noise,
            out=f"{params['out']}{params['run']}_{class_name}_",
            use_label_smoothing=params['use_label_smoothing'],
            smooth_label=params['smooth_label']
        )

        # Save the trained models
        torch.save(netG.state_dict(), f"{out_dir}generator.pth")
        torch.save(netD.state_dict(), f"{out_dir}discriminator.pth")

        loss_plot(G_losses=G_losses, D_losses=D_losses, out=out_dir)
        visual_evaluation(dataloader=dataloader, img_list=img_list, device=device, out=out_dir)
        save_generated_images(img_list_only, f"{out_dir}generated_images/")

        print(f"Finished training for class: {class_name}")

if __name__ == "__main__":
    with open('/kaggle/input/gan-params-yml/gan_params1.yaml', 'r') as f:
        params = yaml.safe_load(f)
    main(params)

Training SNGAN for class: MEL
Starting Training Loop...
[0/500][0/16]	Loss_D: 1.5137	Loss_G: 27.1572	D(x): 0.6360	D(G(z)): 0.5716 / -30.1746
[1/500][0/16]	Loss_D: 2.7584	Loss_G: 7.4703	D(x): -2.5189	D(G(z)): -7.6334 / -8.3000
[2/500][0/16]	Loss_D: 0.9226	Loss_G: 3.8726	D(x): 0.6873	D(G(z)): -2.2866 / -4.2876
[3/500][0/16]	Loss_D: 0.8540	Loss_G: 2.2137	D(x): 0.6222	D(G(z)): -2.0238 / -2.3591
[4/500][0/16]	Loss_D: 1.4868	Loss_G: 1.6995	D(x): -0.7764	D(G(z)): -1.4287 / -1.7015
[5/500][0/16]	Loss_D: 1.6860	Loss_G: 4.1486	D(x): 1.2617	D(G(z)): -5.4653 / -4.5977
[6/500][0/16]	Loss_D: 0.5118	Loss_G: 3.0215	D(x): 1.0506	D(G(z)): -2.9320 / -3.3163
[7/500][0/16]	Loss_D: 2.1646	Loss_G: 4.9452	D(x): -1.1344	D(G(z)): -2.3742 / -5.4845
[8/500][0/16]	Loss_D: 0.7058	Loss_G: 2.1407	D(x): 1.2733	D(G(z)): -1.5302 / -2.2339
[9/500][0/16]	Loss_D: 0.8101	Loss_G: 3.1405	D(x): 0.7293	D(G(z)): -1.9306 / -3.4088
[10/500][0/16]	Loss_D: 1.3986	Loss_G: 1.5741	D(x): 0.0329	D(G(z)): -1.1967 / -1.4068
[11/500][0/16]	

In [8]:
# def training_loop(num_epochs, dataloader, netG, netD, device, criterion, nz, optimizerG, optimizerD, fixed_noise, out, use_label_smoothing, smooth_label):
#     img_list = []
#     img_list_only = []
#     G_losses = []
#     D_losses = []
#     iters = 0

#     real_label = smooth_label if use_label_smoothing else 1.
#     fake_label = 0.
#     os.makedirs(os.path.dirname(out), exist_ok=True)
#     print("Starting Training Loop...")
#     for epoch in range(num_epochs):
#         for i, data in enumerate(dataloader, 0):
#             ############################
#             # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
#             ###########################
#             netD.zero_grad()
#             real_cpu = data[0].to(device)
#             b_size = real_cpu.size(0)
#             label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
#             output = netD(real_cpu)
#             errD_real = criterion(output, label)
#             errD_real.backward()
#             D_x = output.mean().item()

#             noise = torch.randn(b_size, nz, 1, 1, device=device)
#             fake = netG(noise)
#             label.fill_(fake_label)
#             output = netD(fake.detach())
#             errD_fake = criterion(output, label)
#             errD_fake.backward()
#             D_G_z1 = output.mean().item()
#             errD = errD_real + errD_fake
#             optimizerD.step()

#             ############################
#             # (2) Update G network: maximize log(D(G(z)))
#             ###########################
#             netG.zero_grad()
#             label.fill_(real_label)  # fake labels are real for generator cost
#             output = netD(fake)
#             errG = criterion(output, label)
#             errG.backward()
#             D_G_z2 = output.mean().item()
#             optimizerG.step()

#             # Output training stats
#             if i % 50 == 0:
#                 print(f'[{epoch}/{num_epochs}][{i}/{len(dataloader)}]\tLoss_D: {errD.item():.4f}\tLoss_G: {errG.item():.4f}\tD(x): {D_x:.4f}\tD(G(z)): {D_G_z1:.4f} / {D_G_z2:.4f}')

#             # Save Losses for plotting later
#             G_losses.append(errG.item())
#             D_losses.append(errD.item())

#             # Check how the generator is doing by saving G's output on fixed_noise
#             if (iters % 500 == 0) or ((epoch == num_epochs-1) and (i == len(dataloader)-1)):
#                 with torch.no_grad():
#                     fake = netG(fixed_noise).detach().cpu()
#                 img_list.append(vutils.make_grid(fake, padding=2, normalize=True))
#                 img_list_only.append(fake)

#                 for j in range(len(fake)):
#                     save_image(fake[j], f"{out}{epoch}_{j}_img.png")

#             iters += 1

#     return G_losses, D_losses, img_list, img_list_only

# # Main function
# def main(params):
#     torch.manual_seed(params['seed'])
#     np.random.seed(params['seed'])

#     device = to_gpu(ngpu=params['n_gpu'])
#     classes = ['MEL', 'BCC', 'SCC', 'NV']

#     for class_name in classes:
#         print(f"Training {params['arch']} for class: {class_name}")

#         dataloader = create_single_class_dataloader(
#             image_size=params['image_size'],
#             batch_size=params['batch_size'],
#             class_name=class_name,
#             num_workers=params['loader_workers'],
#             root_dir=params['path']
#         )

#         netG, netD = initialize_networks(params)
#         netG = netG.to(device)
#         netD = netD.to(device)

#         if (device.type == 'cuda') and (params['n_gpu'] > 1):
#             netG = nn.DataParallel(netG, list(range(params['n_gpu'])))
#             netD = nn.DataParallel(netD, list(range(params['n_gpu'])))

#         netG.apply(weights_init)
#         netD.apply(weights_init)

#         criterion = nn.BCEWithLogitsLoss()
#         fixed_noise = torch.randn(params['batch_size'], params['latent_vector'], 1, 1, device=device)

#         optimizerD = optim.Adam(netD.parameters(), lr=params['discriminator_lr'], betas=(params['beta_adam'], 0.999))
#         optimizerG = optim.Adam(netG.parameters(), lr=params['generator_lr'], betas=(params['beta_adam'], 0.999))


#         out_dir = os.path.join(params['out'], f"{params['run']}_{class_name}_")
#         os.makedirs(out_dir, exist_ok=True)
#         G_losses, D_losses, img_list, img_list_only = training_loop(
#             num_epochs=params['num_epochs'],
#             dataloader=dataloader,
#             netG=netG,
#             netD=netD,
#             device=device,
#             criterion=criterion,
#             nz=params['latent_vector'],
#             optimizerG=optimizerG,
#             optimizerD=optimizerD,
#             fixed_noise=fixed_noise,
#             out=f"{params['out']}{params['run']}_{class_name}_",
#             use_label_smoothing=params['use_label_smoothing'],
#             smooth_label=params['smooth_label']
#         )

#         # Save the trained models
#         torch.save(netG.state_dict(), f"{params['out']}{params['run']}_{class_name}_generator.pth")
#         torch.save(netD.state_dict(), f"{params['out']}{params['run']}_{class_name}_discriminator.pth")

#         loss_plot(G_losses=G_losses, D_losses=D_losses, out=f"{params['out']}{params['run']}_{class_name}_")
#         image_grid(dataloader=dataloader, img_list=img_list, device=device, out=f"{params['out']}{params['run']}_{class_name}_")
#         compute_metrics(real=next(iter(dataloader)), fakes=img_list_only, size=params['image_size'], out=f"{params['out']}{params['run']}_{class_name}_")

#         print(f"Finished training for class: {class_name}")

# # Main execution
# if __name__ == "__main__":
#     with open('/kaggle/input/gan-params-yml/gan_params1.yaml', 'r') as f:
#         params = yaml.safe_load(f)
#     main(params)

In [14]:
import os
import glob

# Path to the output folder
output_folder = '/kaggle/working/'

# Delete the zip file
zip_file = os.path.join(output_folder, 'output.zip')
if os.path.exists(zip_file):
    os.remove(zip_file)
    print(f"Deleted {zip_file}")
else:
    print(f"{zip_file} does not exist.")

# Get all .png files in the folder
png_files = glob.glob(os.path.join(output_folder, '*.png'))

# Loop through and delete each .png file
for file in png_files:
    os.remove(file)
    print(f"Deleted {file}")

print(f"Deleted {len(png_files)} .png files from {output_folder}")


FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/working/output.zip'

In [9]:
import os
import zipfile
from IPython.display import FileLink

def zip_and_download_output():
    # Kaggle's standard output directory
    output_dir = '/kaggle/working'
    zip_filename = 'output.zip'
    
    # Create a ZipFile object
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Walk through the directory
        for root, dirs, files in os.walk(output_dir):
            for file in files:
                # Create the full file path
                file_path = os.path.join(root, file)
                # Add file to the zip file
                # Use arcname to flatten the directory structure in the zip
                zipf.write(file_path, arcname=os.path.basename(file_path))
    
    print(f"All files in {output_dir} have been zipped to {zip_filename}")
    
    # Create a download link
    return FileLink(zip_filename)

# Call the function and display the download link
download_link = zip_and_download_output()
display(download_link)

OSError: [Errno 28] No space left on device