In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
from PIL import Image
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
import torchvision
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np

# Set Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device: ", device)

directory = '/kaggle/input/warli-art/Warli Art Object Image Dataset/Warli Art Dataset/Warli Art Dataset'

In [None]:
data_dir = directory

for category in os.listdir(data_dir):
    if os.path.isdir(os.path.join(data_dir, category)):
        category_dir = os.path.join(data_dir, category)
        image_files = [f for f in os.listdir(category_dir) if f.endswith(('.jpg'))]
        num_images = len(image_files)

        print(f"Category: {category}")
        print(f"Number of images: {num_images}")

        if num_images > 0:
            sample_image_path = os.path.join(category_dir, image_files[0])
            sample_image = Image.open(sample_image_path)
            image_size = sample_image.size
            # num_channels = len(sample_image.mode)
            num_channels = len(sample_image.getbands())

            print(f"Image size: {image_size}")
            print(f"Number of channels: {num_channels}")

        print()

In [None]:
# Transforms
transform = transforms.Compose([
    torchvision.transforms.CenterCrop((600, 600)),
    transforms.Resize((128,128)),
    transforms.ToTensor(),
    transforms.Normalize([0.5,0.5,0.5], [0.5,0.5,0.5])
])

# Path to data
data_pth = directory

# Dataloader
dataset = ImageFolder(data_pth, transform=transform)
# total_samples = len(dataset)
# train_samples = int(0.5 * total_samples)  # 50% of the total samples
# val_samples = total_samples - train_samples

# Split the dataset into training and validation sets
# train_dataset, val_dataset = random_split(dataset, [train_samples, val_samples])

batch_size = 64
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import matplotlib.pyplot as plt

images,_ = next(iter(dataloader))

plt.figure(figsize=(8, 8))
plt.axis("off")
plt.title("Images")
plt.imshow(np.transpose(torchvision.utils.make_grid(images[:64], 
                                         padding=2, normalize=True),
                        (1, 2, 0)))

In [None]:
# Residual Block

class ResBlock(nn.Module):
    def __init__(self, in_feat, out_feat):
        super(ResBlock, self).__init__()
        self.block = nn.Sequential(
        nn.Conv2d(in_feat, out_feat, 3, stride=1, padding=1, bias=False),
        nn.BatchNorm2d(out_feat),
        nn.ReLU(inplace=True), 
        nn.Conv2d(out_feat, out_feat, 3, stride=1, padding=1, bias=False),
        nn.BatchNorm2d(out_feat),
        nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        return x + self.block(x)

In [None]:
epsilon=0.00005

class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            # Input is the latent vector Z.
            nn.ConvTranspose2d(100, 1024, 4, 1, 0, bias=False),
            nn.BatchNorm2d(1024, eps=epsilon),
            nn.ReLU(True),
            nn.Dropout(0.4),

#             # Additional Conv layer
#             nn.ConvTranspose2d(1024, 1024, 3, 1, 1, bias=False),
#             nn.BatchNorm2d(1024, eps=epsilon),
#             nn.ReLU(True),
#             nn.Dropout(0.4),
            
            ResBlock(1024,1024),

            # State size. (1024x4x4)
            nn.ConvTranspose2d(1024, 512, 4, 2, 1, bias=False),
            nn.BatchNorm2d(512, eps=epsilon),
            nn.ReLU(True),
            nn.Dropout(0.4),

#             # Additional Conv layer
#             nn.ConvTranspose2d(512, 512, 3, 1, 1, bias=False),
#             nn.BatchNorm2d(512, eps=epsilon),
#             nn.ReLU(True),
#             nn.Dropout(0.4),
            
            ResBlock(512,512),

            # State size. (512x8x8)
            nn.ConvTranspose2d(512, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256, eps=epsilon),
            nn.ReLU(True),
            nn.Dropout(0.4),

            # State size. (256x16x16)
            nn.ConvTranspose2d(256, 128, 4, 2, 1, bias=False),
            nn.BatchNorm2d(128, eps=epsilon),
            nn.ReLU(True),
            nn.Dropout(0.4),
            
            ResBlock(128,128),

            # State size. (128x32x32)
            nn.ConvTranspose2d(128, 64, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64, eps=epsilon),
            nn.ReLU(True),
            nn.Dropout(0.4),

            # State size. (64x64x64)
            nn.ConvTranspose2d(64, 3, 4, 2, 1, bias=False),
            nn.Tanh()  
            # Final state size. (3x128x128)
        )

    def forward(self, x):
        x = x.view(-1, 100, 1, 1)  # Reshape input noise vector into a batch of inputs for ConvTranspose2d
        output = self.model(x)
        return output

In [None]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(

            # Input size. (3x128x128)
            nn.Conv2d(3, 64, 4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4),

            # State size. (64x64x64)
            nn.Conv2d(64, 128, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4),
            
#             ResBlock(128,128),

            # State size. (128x32x32)
            nn.Conv2d(128, 256, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4),
            
#             ResBlock(256,256),

            # State size. (256x16x16)
            nn.Conv2d(256, 512, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4),
            
#             ResBlock(512,512),

            # State size. (512x8x8)
            nn.Conv2d(512, 1024, 4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4),
            
            # State size. (1024x4x4)
            nn.Conv2d(1024, 1, 4, stride=1, padding=0, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4),
            nn.Sigmoid()  # Output a single scalar per image indicating real or fake
        )

    def forward(self, x):
        output = self.model(x)
        return output.view(-1, 1)  # Flatten to [batch_size, 1]

In [None]:
import time
def train(n_epochs, dataloader, optimizer_dis, optimizer_gen, dis, gen):
  print(f"Starting Training for {n_epochs} Epochs")
  for epoch in range(n_epochs):
        start = time.time()
        # Initialize losses
        d_loss = 0.0
        g_loss = 0.0
        num_batch = 0

        for i, (real_image, _) in enumerate(dataloader):
          real_image = real_image.to(device)
          batch_size = real_image.size(0)
          # Add noise to the inputs
          noise_real = torch.randn_like(real_image) * 0.1
          noise_fake = torch.randn_like(real_image) * 0.1

          # Train Discriminator
          optimizer_dis.zero_grad()
          real_image_noisy = real_image + noise_real
          # Real Data
          real_pred = dis(real_image_noisy)
#           real_label = torch.ones_like(real_pred, device=device, dtype=torch.float32)
          real_label = torch.full_like(real_pred, 0.9, device=device, dtype=torch.float32)  # Smoothed label
          real_loss =  F.binary_cross_entropy(real_pred, real_label, reduction='mean')

          # Fake Data
          noise = torch.randn(batch_size, 100, 1, 1, device=device)
          gen_out = gen(noise)
          fake_image_noisy = gen_out + noise_fake
          fake_pred = dis(fake_image_noisy)
#           fake_label = torch.zeros_like(fake_pred, device=device, dtype=torch.float32)
          fake_label = torch.full_like(fake_pred, 0.1, device=device, dtype=torch.float32)  # Smoothed label
          fake_loss =  F.binary_cross_entropy(fake_pred, fake_label, reduction='mean')

          dis_loss = (real_loss+fake_loss)/2
          dis_loss.backward()
          optimizer_dis.step()

          # Train Generator
          optimizer_gen.zero_grad()

          noise = torch.randn(batch_size, 100, 1, 1, device=device)
          gen_out = gen(noise)
          dis_out = dis(gen_out)
          # label = real_label = torch.ones_like(dis_out, device=device, dtype=torch.float32)
          gen_loss =  F.binary_cross_entropy(dis_out, real_label, reduction='mean')

          gen_loss.backward()
          optimizer_gen.step()

          # Accumulate losses
          d_loss += dis_loss.item()
          g_loss += gen_loss.item()
          num_batch += 1


        # Print losses
        end = time.time()
#         if i % 10 == 0:
        avg_d_loss = d_loss/num_batch
        avg_g_loss = g_loss/num_batch
        print(f"Epoch [{epoch + 1}] Loss D: {avg_d_loss:.4f} Loss G: {avg_g_loss:.4f} Time: {end-start:.2f} sec")

In [None]:
gen = Generator().to(device)
dis = Discriminator().to(device)

gen = nn.DataParallel(gen)
dis = nn.DataParallel(dis)


In [None]:
n_epochs = 80# Number of epochs for training
# optimizer_dis = optim.Adam(dis.parameters(), lr=0.00004, betas=(0.5, 0.999))
optimizer_dis = optim.RMSprop(dis.parameters(), lr=0.00002)
# optimizer_gen = optim.Adam(gen.parameters(), lr=0.0004, betas=(0.5, 0.999))
optimizer_gen = optim.Adagrad(gen.parameters(), lr=0.02)
# optimizer_gen = optim.RMSprop(gen.parameters(), lr=0.02)

In [None]:
train(n_epochs, dataloader, optimizer_dis, optimizer_gen, dis, gen)

In [None]:
from torchvision.utils import make_grid

# Set the model to evaluation mode
gen.eval()

num_images = 16  # Number of images to generate
latent_dim = 100  # Dimension of the latent vector
noise = torch.randn(num_images, latent_dim, 1, 1)  # Generate random noise

# Generate images from the noise
# Ensure that noise is on the same device as the model
noise = noise.to(next(gen.parameters()).device)  # Move noise to the device of the model
fake_images = gen(noise)

# Convert images to a suitable format for displaying
fake_images = (fake_images + 1) / 2  # Adjust from [-1, 1] to [0, 1]
grid = make_grid(fake_images, nrow=4)  # Create a grid of images

# Plot the images
plt.figure(figsize=(10, 10))
plt.imshow(grid.permute(1, 2, 0).cpu().detach().numpy())  # Convert to numpy and plot
plt.axis('off')
plt.show()