Why?


Reduce the domain gap.

Artistic Style to Photographic Images

Goal -
 Reduce the domain shift enough to classify the target domain images by just fine-tuning a model.

CycleGAN

*   It consists of two generators (G and F) and two discriminators (D_X and
    D_Y).
*   The goal is to learn mappings between two domains, X → Y and Y → X, without
    needing paired examples



Imports

In [17]:
import os
import kagglehub

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

import torchvision
from torchvision import transforms
from torchvision.datasets import ImageFolder

import matplotlib.pyplot as plt
from PIL import Image

In [18]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device


device(type='cuda')

**Data Processing**

In [19]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("karntiwari/home-office-dataset")

print("Path to dataset files:", path)

Path to dataset files: /root/.cache/kagglehub/datasets/karntiwari/home-office-dataset/versions/1


In [20]:
def walk_through_dir(dir_path):
  """
  Walks through dir_path returning its contents.
  Args:
    dir_path (str or pathlib.Path): target directory

  Returns:
    A print out of:
      number of subdirectories in dir_path
      number of images (files) in each subdirectory
      name of each subdirectory
  """
  for dirpath, dirnames, filenames in os.walk(dir_path):
    print(f"There are {len(dirnames)} directories and {len(filenames)} images in '{dirpath}'.")


# walk_through_dir(path)

In [21]:
path

'/root/.cache/kagglehub/datasets/karntiwari/home-office-dataset/versions/1'

In [22]:
#target domain
art_path = os.path.join(path, "OfficeHomeDataset_10072016/Art")

#source domain
real_path = os.path.join(path, "OfficeHomeDataset_10072016/Real World")

In [23]:
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize images to 256x256
    transforms.ToTensor(),         # Convert to PyTorch tensor
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalize each RGB channel to [-1, 1]
])
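
As a quick check on the normalization above, here is a minimal sketch (not part of the original notebook) of the arithmetic that Normalize applies with mean 0.5 and std 0.5, plus a hypothetical `denormalize` helper that inverts it so generator outputs can be displayed. Both function names are assumptions made for illustration.

```python
import torch

def normalize(x, mean=0.5, std=0.5):
    # Same per-channel arithmetic as transforms.Normalize: (x - mean) / std
    return (x - mean) / std

def denormalize(x, mean=0.5, std=0.5):
    # Hypothetical helper: undo the normalization and clamp to the displayable range [0, 1]
    return (x * std + mean).clamp(0.0, 1.0)

pixels = torch.tensor([0.0, 0.25, 0.5, 1.0])  # values in [0, 1], as produced by ToTensor
scaled = normalize(pixels)                    # values in [-1, 1]
restored = denormalize(scaled)                # back in [0, 1]

print(scaled)
print(restored)
```

Because the generator ends in Tanh, its outputs live in the same [-1, 1] range, so the same inverse mapping applies before plotting them.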

In [24]:
# Load the Art and Real World domain datasets (the quickdraw_* names refer to the Art domain)
quickdraw_dataset = ImageFolder(root=art_path, transform=transform)
real_dataset = ImageFolder(root=real_path, transform=transform)

quickdraw_loader = torch.utils.data.DataLoader(quickdraw_dataset, batch_size=1, shuffle=True)
real_loader = torch.utils.data.DataLoader(real_dataset, batch_size=1, shuffle=True)

In [25]:
len(quickdraw_dataset), len(real_dataset)

(2427, 4357)

**Models**

Need new definitions.



*   The generator translates images from one domain to another.
*   The discriminator classifies whether an image is real or generated.



Generator

In [26]:


class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.block = nn.Sequential(
            nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
            nn.InstanceNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1),
            nn.InstanceNorm2d(channels),
        )

    def forward(self, x):
        return x + self.block(x)  # Residual connection


class Generator(nn.Module):
    def __init__(self, in_channels, out_channels, num_residual_blocks=9):
        super(Generator, self).__init__()
        # Initial convolution block
        self.initial = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=7, stride=1, padding=3),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True)
        )
        # Downsampling layers
        self.down = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.InstanceNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),
            nn.InstanceNorm2d(256),
            nn.ReLU(inplace=True)
        )
        # Residual blocks
        self.residuals = nn.Sequential(
            *[ResidualBlock(256) for _ in range(num_residual_blocks)]
        )
        # Upsampling layers
        self.up = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.InstanceNorm2d(128),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True)
        )
        # Output layer
        self.final = nn.Sequential(
            nn.Conv2d(64, out_channels, kernel_size=7, stride=1, padding=3),
            nn.Tanh()
            # [-1, 1]
        )

    def forward(self, x):
        x = self.initial(x)
        x = self.down(x)
        x = self.residuals(x)
        x = self.up(x)
        return self.final(x)
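
The generator is meant to return an image of the same spatial size as its input. A small sketch of the size arithmetic for the layers above, assuming a 256x256 input as set by the Resize transform; the helper names `conv_out` and `deconv_out` are hypothetical.

```python
def conv_out(size, kernel, stride, padding):
    # Output length of nn.Conv2d along one spatial dimension (dilation 1)
    return (size + 2 * padding - kernel) // stride + 1

def deconv_out(size, kernel, stride, padding, output_padding):
    # Output length of nn.ConvTranspose2d along one spatial dimension (dilation 1)
    return (size - 1) * stride - 2 * padding + kernel + output_padding

size = 256
trace = [size]
size = conv_out(size, 7, 1, 3); trace.append(size)       # initial 7x7 conv
size = conv_out(size, 3, 2, 1); trace.append(size)       # first downsampling conv
size = conv_out(size, 3, 2, 1); trace.append(size)       # second downsampling conv
# The residual blocks use 3x3 convs with stride 1 and padding 1, so the size is unchanged
size = deconv_out(size, 3, 2, 1, 1); trace.append(size)  # first upsampling layer
size = deconv_out(size, 3, 2, 1, 1); trace.append(size)  # second upsampling layer
size = conv_out(size, 7, 1, 3); trace.append(size)       # final 7x7 conv

print(trace)  # [256, 256, 128, 64, 128, 256, 256]
```

The `output_padding=1` argument is what lets each transposed convolution exactly double the size; without it the output would be one pixel short.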

In [27]:
class Discriminator(nn.Module):
    def __init__(self, in_channels):
        super(Discriminator, self).__init__()
        # Define PatchGAN layers
        self.model = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1),
            nn.InstanceNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),

            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)  # Output single scalar per patch
        )

    def forward(self, x):
        return self.model(x)
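
The discriminator returns a grid of scores instead of a single number. A minimal sketch of how large that grid is for a 256x256 input, and how much of the input each score sees, derived only from the kernel, stride, and padding values above; the function names are hypothetical.

```python
# (kernel, stride, padding) of each conv layer in the Discriminator above
layers = [(4, 2, 1), (4, 2, 1), (4, 2, 1), (4, 1, 1), (4, 1, 1)]

def output_size(size, layers):
    # Apply the Conv2d size formula layer by layer
    for kernel, stride, padding in layers:
        size = (size + 2 * padding - kernel) // stride + 1
    return size

def receptive_field(layers):
    # Walk backwards from one output score to the input pixels it depends on
    field = 1
    for kernel, stride, _ in reversed(layers):
        field = (field - 1) * stride + kernel
    return field

patch_map = output_size(256, layers)
field = receptive_field(layers)
print(patch_map, field)  # 30 70
```

Each of the 30x30 scores therefore judges one 70x70 region of the input, which is why the targets in the training loop are built with `torch.ones_like` and `torch.zeros_like` on the discriminator output.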

Losses

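*   Adversarial Loss: mean squared error (least-squares GAN) between the
    discriminator's patch scores and the real (1) or fake (0) target.
*   Cycle-Consistency Loss: L1 distance between an input image and its
    reconstruction after passing through both generators.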



In [28]:
adversarial_loss = nn.MSELoss()
cycle_loss = nn.L1Loss()
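
To make the two loss functions concrete, here is a small sketch on hand-made tensors. The tensor shapes and values are illustrative assumptions, not data from the notebook.

```python
import torch
import torch.nn as nn

adversarial_loss = nn.MSELoss()  # least-squares GAN objective
cycle_loss = nn.L1Loss()         # mean absolute reconstruction error

# Stand-in for a discriminator output: one score per patch, all equal to 0.5
patch_scores = torch.full((1, 1, 30, 30), 0.5)
real_target = torch.ones_like(patch_scores)
fake_target = torch.zeros_like(patch_scores)

loss_as_real = adversarial_loss(patch_scores, real_target)  # (0.5 - 1)^2 = 0.25
loss_as_fake = adversarial_loss(patch_scores, fake_target)  # (0.5 - 0)^2 = 0.25

# Stand-in for an image and a reconstruction that is off by 0.1 everywhere
image = torch.zeros(1, 3, 8, 8)
reconstruction = torch.full_like(image, 0.1)
loss_cyc = cycle_loss(reconstruction, image)                # mean |0.1 - 0| = 0.1

print(loss_as_real.item(), loss_as_fake.item(), loss_cyc.item())
```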

**CycleGAN Training Loop**

The training loop for CycleGAN consists of alternating updates between two generator networks and two discriminator networks.


In [34]:
G = Generator(in_channels=3, out_channels=3).to(device)
F = Generator(in_channels=3, out_channels=3).to(device)
D_A = Discriminator(in_channels=3).to(device)
D_B = Discriminator(in_channels=3).to(device)

In [35]:
optimizer_G = torch.optim.Adam(G.parameters(), lr=0.000025, betas=(0.5, 0.999))
optimizer_F = torch.optim.Adam(F.parameters(), lr=0.000025, betas=(0.5, 0.999))
optimizer_D_A = torch.optim.Adam(D_A.parameters(), lr=0.0005, betas=(0.5, 0.999))
optimizer_D_B = torch.optim.Adam(D_B.parameters(), lr=0.001, betas=(0.5, 0.999))


from torch.optim.lr_scheduler import StepLR
scheduler_G = StepLR(optimizer_G, step_size=5, gamma=0.5)  # Halve the learning rate every 5 scheduler steps (one step per epoch)
scheduler_F = StepLR(optimizer_F, step_size=5, gamma=0.5)
scheduler_D_A = StepLR(optimizer_D_A, step_size=5, gamma=0.5)
scheduler_D_B = StepLR(optimizer_D_B, step_size=5, gamma=0.5)
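
With `step_size=5` and `gamma=0.5`, each learning rate is halved once during a 10-epoch run. A minimal sketch of that schedule in closed form, assuming the scheduler is stepped once per epoch; `step_lr` is a hypothetical helper, not a PyTorch function.

```python
def step_lr(base_lr, epoch, step_size=5, gamma=0.5):
    # Closed form of a step schedule: multiply by gamma once every step_size epochs
    return base_lr * gamma ** (epoch // step_size)

# Generator learning rate over the 10 training epochs (epoch index starts at 0)
schedule = [step_lr(0.000025, epoch) for epoch in range(10)]

for epoch, lr in enumerate(schedule):
    print(f"epoch {epoch + 1}: lr = {lr:.3e}")
```

Epochs 1 to 5 run at the base rate and epochs 6 to 10 at half of it.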


In [36]:
epochs = 10
lambda_cycle = 5
#define iterators
gen_iter = iter(quickdraw_loader)
real_iter = iter(real_loader)



from torch.amp import GradScaler, autocast  # Import for mixed precision
from itertools import zip_longest  # Use for handling different DataLoader lengths

# Initialize GradScaler for mixed precision training
scaler = GradScaler()

# Training loop
for epoch in range(epochs):
    # Use zip_longest to handle mismatched DataLoader lengths
    for quickdraw_batch, real_batch in zip_longest(quickdraw_loader, real_loader, fillvalue=None):
        # Process quickdraw_batch
        if quickdraw_batch is not None:
            quickdraw_images, _ = quickdraw_batch  # Ignore labels
            quickdraw_images = quickdraw_images.to(device)

        # Process real_batch
        if real_batch is not None:
            real_images, _ = real_batch  # Ignore labels
            real_images = real_images.to(device)

        # Skip iteration if one of the batches is None
        if quickdraw_batch is None or real_batch is None:
            continue

        z_quickdraw = quickdraw_images
        z_real = real_images

        ### Train Generators ###
        with autocast(device_type='cuda'):  # Enable mixed precision
            fake_quickdraw = G(z_quickdraw)  # Output of G for an Art image
            fake_real = F(z_real)  # Output of F for a Real World image

            # Cycle consistency
            with torch.no_grad():  # Saves memory, but the cycle term then carries no gradient
                cycle_quickdraw = F(fake_quickdraw)  # Round trip: G, then F
                cycle_real = G(fake_real)  # Round trip: F, then G

            # Loss calculation
            loss_GAN_g = adversarial_loss(D_B(fake_quickdraw), torch.ones_like(D_B(fake_quickdraw)).to(device))
            loss_GAN_f = adversarial_loss(D_A(fake_real), torch.ones_like(D_A(fake_real)).to(device))
            loss_cycle = cycle_loss(cycle_quickdraw, z_quickdraw) + cycle_loss(cycle_real, z_real)

            loss_G_total = loss_GAN_g + loss_GAN_f + lambda_cycle * loss_cycle

        # Backpropagation and optimization with scaled gradients
        optimizer_G.zero_grad()
        scaler.scale(loss_G_total).backward(retain_graph=True)
        scaler.step(optimizer_G)
        scaler.update()

        # Train generator F
        with autocast(device_type='cuda'):  # Enable mixed precision
            loss_F_total = loss_GAN_f + lambda_cycle * loss_cycle
        optimizer_F.zero_grad()
        scaler.scale(loss_F_total).backward()
        scaler.step(optimizer_F)
        scaler.update()

        ### Train Discriminators ###
        # D_A
        with autocast(device_type='cuda'):
            loss_real_A = adversarial_loss(D_A(z_real), torch.ones_like(D_A(z_real)).to(device))
            loss_fake_A = adversarial_loss(D_A(fake_real.detach()), torch.zeros_like(D_A(fake_real.detach())).to(device))
            loss_D_A = (loss_real_A + loss_fake_A) / 2

        optimizer_D_A.zero_grad()
        scaler.scale(loss_D_A).backward()
        scaler.step(optimizer_D_A)
        scaler.update()

        # D_B
        with autocast(device_type='cuda'):
            loss_real_B = adversarial_loss(D_B(z_quickdraw), torch.ones_like(D_B(z_quickdraw)).to(device))
            loss_fake_B = adversarial_loss(D_B(fake_quickdraw.detach()), torch.zeros_like(D_B(fake_quickdraw.detach())).to(device))
            loss_D_B = (loss_real_B + loss_fake_B) / 2

        optimizer_D_B.zero_grad()
        scaler.scale(loss_D_B).backward()
        scaler.step(optimizer_D_B)
        scaler.update()

    # Log progress at the end of each epoch
    print(f"Epoch {epoch+1}/{epochs} - Loss_G: {loss_G_total.item():.4f}, "
          f"Loss_F: {loss_F_total.item():.4f}, Loss_D_A: {loss_D_A.item():.4f}, "
          f"Loss_D_B: {loss_D_B.item():.4f}")
    # Update learning rate schedulers
    scheduler_G.step()
    scheduler_F.step()
    scheduler_D_A.step()
    scheduler_D_B.step()


    # Clear unused memory
    torch.cuda.empty_cache()



Epoch 1/10 - Loss_G: 6.5575, Loss_F: 6.3069, Loss_D_A: 0.0754, Loss_D_B: 0.2460
Epoch 2/10 - Loss_G: 7.1973, Loss_F: 6.9931, Loss_D_A: 0.2243, Loss_D_B: 0.3103
Epoch 3/10 - Loss_G: 8.0973, Loss_F: 7.7318, Loss_D_A: 0.1714, Loss_D_B: 0.2557
Epoch 4/10 - Loss_G: 8.9121, Loss_F: 8.6646, Loss_D_A: 0.2167, Loss_D_B: 0.2153
Epoch 5/10 - Loss_G: 8.0969, Loss_F: 7.7281, Loss_D_A: 0.0644, Loss_D_B: 0.2283
Epoch 6/10 - Loss_G: 10.6232, Loss_F: 10.4350, Loss_D_A: 0.0238, Loss_D_B: 0.2547
Epoch 7/10 - Loss_G: 11.3655, Loss_F: 10.8175, Loss_D_A: 0.0340, Loss_D_B: 0.3024
Epoch 8/10 - Loss_G: 9.3688, Loss_F: 8.8938, Loss_D_A: 0.0866, Loss_D_B: 0.2154
Epoch 9/10 - Loss_G: 9.2278, Loss_F: 8.6637, Loss_D_A: 0.0244, Loss_D_B: 0.3427
Epoch 10/10 - Loss_G: 7.6658, Loss_F: 7.4510, Loss_D_A: 0.0197, Loss_D_B: 0.2439
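
For comparison, the generator update in the standard CycleGAN formulation scores each translated image with the discriminator of the domain it is translated into, and lets gradients flow through the cycle reconstructions. Below is a minimal sketch of one such step under those assumptions. It uses tiny stand-in networks so it runs on CPU; `tiny_generator`, `tiny_discriminator`, and all variable names are hypothetical and are not the notebook's models.

```python
import itertools
import torch
import torch.nn as nn

torch.manual_seed(0)

def tiny_generator():
    # Stand-in for Generator: keeps the spatial size, outputs in [-1, 1]
    return nn.Sequential(nn.Conv2d(3, 3, kernel_size=3, padding=1), nn.Tanh())

def tiny_discriminator():
    # Stand-in for the PatchGAN Discriminator: one score per patch
    return nn.Conv2d(3, 1, kernel_size=4, stride=2, padding=1)

G_ab, G_ba = tiny_generator(), tiny_generator()  # A -> B and B -> A
D_a, D_b = tiny_discriminator(), tiny_discriminator()

adversarial_loss = nn.MSELoss()
cycle_loss = nn.L1Loss()
lambda_cycle = 5

# One optimizer over both generators, so a single backward pass updates both
optimizer_gen = torch.optim.Adam(
    itertools.chain(G_ab.parameters(), G_ba.parameters()),
    lr=0.000025, betas=(0.5, 0.999),
)

real_a = torch.rand(1, 3, 16, 16) * 2 - 1  # random stand-in for a domain-A image
real_b = torch.rand(1, 3, 16, 16) * 2 - 1  # random stand-in for a domain-B image

fake_b = G_ab(real_a)   # A -> B
fake_a = G_ba(real_b)   # B -> A
cycle_a = G_ba(fake_b)  # A -> B -> A, gradients kept
cycle_b = G_ab(fake_a)  # B -> A -> B, gradients kept

# Each fake is judged by the discriminator of the domain it is meant to imitate
pred_fake_b = D_b(fake_b)
pred_fake_a = D_a(fake_a)
loss_gan = (adversarial_loss(pred_fake_b, torch.ones_like(pred_fake_b))
            + adversarial_loss(pred_fake_a, torch.ones_like(pred_fake_a)))
loss_cyc = cycle_loss(cycle_a, real_a) + cycle_loss(cycle_b, real_b)
loss_gen = loss_gan + lambda_cycle * loss_cyc

optimizer_gen.zero_grad()
loss_gen.backward()   # also fills discriminator grads; zero them before a discriminator step
optimizer_gen.step()

print(f"loss_gan={loss_gan.item():.4f}, loss_cyc={loss_cyc.item():.4f}")
```

Because `loss_cyc` is computed outside `torch.no_grad()`, it contributes gradients to both generators, which is what ties the two mappings together.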


In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

# # Save the models to your Google Drive
# torch.save(G.state_dict(), '/content/drive/My Drive/cycDAN/G.pth')
# torch.save(F.state_dict(), '/content/drive/My Drive/cycDAN/F.pth')
# torch.save(D_A.state_dict(), '/content/drive/My Drive/cycDAN/D_A.pth')
# torch.save(D_B.state_dict(), '/content/drive/My Drive/cycDAN/D_B.pth')



Visualization

**Translate Source Domain Images**

  Use the trained generator G to convert all source domain images into the style of the target domain, keeping each image's class label. The resulting adapted dataset then serves as the training set for downstream tasks such as classification.

In [37]:
from torch.utils.data import Dataset, DataLoader

translated_images = []  # List to hold translated images
labels = []  # List to hold corresponding labels (if needed)

G.eval()  # Set the generator to evaluation mode
for batch in quickdraw_loader:  # Loop through source DataLoader
    images, batch_labels = batch  # Get images and labels from the batch
    images = images.to(device)  # Move images to the CUDA device

    with torch.no_grad():  # No gradients needed for evaluation
        translated_batch = G(images)  # Generate translated images

    translated_images.append(translated_batch.cpu())  # Move translated images to CPU and store
    labels.append(batch_labels)  # Keep labels if you need them later

# Combine all translated images and labels
translated_images = torch.cat(translated_images, dim=0)  # Combine into a single tensor
labels = torch.cat(labels, dim=0)  # Combine labels, if applicable


In [38]:
class TranslatedDataset(Dataset):
    def __init__(self, images, labels=None):
        self.images = images
        self.labels = labels

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        if self.labels is not None:
            return self.images[idx], self.labels[idx]
        else:
            return self.images[idx]

# Initialize the custom dataset
translated_dataset = TranslatedDataset(translated_images, labels)
translated_source_loader = DataLoader(translated_dataset, batch_size=32, shuffle=True)
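
When labels are always present, PyTorch's built-in `TensorDataset` yields the same (image, label) pairs as the custom class above. A minimal sketch with placeholder tensors; the shapes and values are assumptions for illustration only.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Placeholder tensors standing in for translated images and their labels
images = torch.zeros(8, 3, 4, 4)
labels = torch.arange(8)

dataset = TensorDataset(images, labels)  # indexes both tensors along dim 0
loader = DataLoader(dataset, batch_size=4, shuffle=False)

batch_images, batch_labels = next(iter(loader))
print(batch_images.shape, batch_labels.tolist())
```

The custom class remains the better fit if unlabeled images need to be served as well, since `TensorDataset` always returns a tuple.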

**Fine-Tune for Classification**

Once the source domain images have been adapted to match the target domain's distribution, fine-tune a classification model on the translated images.

In [43]:
# Example: Training a classifier

num_classes = len(quickdraw_dataset.classes)  # One class per folder found by ImageFolder

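# ImageNet-pretrained ResNet-18 (the weights argument replaces the deprecated pretrained=True)
classifier = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.DEFAULT)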
classifier.fc = nn.Linear(classifier.fc.in_features, num_classes)  # Replace final layer for your task
classifier.to(device)

optimizer = torch.optim.Adam(classifier.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

num_epochs = 10

# Train on translated source images
for epoch in range(num_epochs):
    for images, labels in translated_source_loader:  # Adapted dataset loader
        images, labels = images.to(device), labels.to(device)
        outputs = classifier(images)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")


Epoch [1/10], Loss: 3.6808
Epoch [2/10], Loss: 2.6575
Epoch [3/10], Loss: 1.6770
Epoch [4/10], Loss: 1.1055
Epoch [5/10], Loss: 0.5429
Epoch [6/10], Loss: 0.1904
Epoch [7/10], Loss: 0.1130
Epoch [8/10], Loss: 0.0803
Epoch [9/10], Loss: 0.0517
Epoch [10/10], Loss: 0.0544
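
Training loss alone does not show how well the classifier generalizes. A minimal sketch of an accuracy check on a held-out loader follows; `evaluate_accuracy` is a hypothetical helper, and the toy model and random data only exist to make the example runnable.

```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

def evaluate_accuracy(model, loader, device):
    # Hypothetical helper: fraction of samples whose top-scoring class matches the label
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            predictions = model(images).argmax(dim=1)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)
    return correct / total

# Toy stand-ins for a classifier and a held-out loader
torch.manual_seed(0)
device = torch.device("cpu")
toy_model = nn.Sequential(nn.Flatten(), nn.Linear(3 * 4 * 4, 5)).to(device)
toy_data = TensorDataset(torch.randn(10, 3, 4, 4), torch.randint(0, 5, (10,)))
toy_loader = DataLoader(toy_data, batch_size=4)

accuracy = evaluate_accuracy(toy_model, toy_loader, device)
print(f"accuracy: {accuracy:.2f}")
```

In this workflow the same function could be called with the fine-tuned classifier and a loader over target domain images that were not used for training.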


**Conclusion**

The training loss falls from 3.68 in epoch 1 to about 0.05 in epoch 10, so the classifier fits the translated images. Training loss alone does not measure generalization: accuracy on held-out target domain images is needed to confirm that the translation reduces the domain gap enough for real-world computer vision applications.