**Introduction**

Adversarial Discriminative Domain Adaptation (ADDA) is a domain adaptation approach that uses adversarial training to align the feature distributions of the source and target domains in an unsupervised setting. Unlike methods like MMD or CORAL, which focus on direct distribution matching, ADDA leverages a domain discriminator to force target features into the source feature space.



1. Train a Source Feature Extractor & Classifier:

    First, a neural network learns features from the labeled source domain using standard supervised learning.

    The classifier is trained only on source domain samples at this stage.

2. Initialize a Separate Target Feature Extractor:

    A separate feature extractor is used for the unlabeled target domain, but it initially does not align with the source domain.

    The objective is to adapt this target feature extractor to match the source features.


3.   Adversarial Domain Discriminator Training:

    A domain discriminator is introduced to distinguish between source and target features.

    The target feature extractor learns to fool the discriminator, making target features look like source domain features.

    This adversarial training helps align the target feature space with the source feature space.


4.   Final Classification Using Source-Trained Classifier:

    Once adaptation is complete, the target feature extractor produces domain-aligned features, allowing the source-trained classifier to work effectively on the target domain without needing target labels.

**Imports**

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils import spectral_norm

import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

import matplotlib.pyplot as plt

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

**Data Processing**

In [None]:
transform_pipeline = transforms.Compose([
    transforms.Resize((128, 128)),  # Standardize image size
    transforms.Grayscale(num_output_channels=1),  # Convert to grayscale
    # transforms.RandomRotation(15),  # Add randomness
    # transforms.RandomHorizontalFlip(),  # Flip images
    # transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),  # Change lighting
    # transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=10),
    transforms.ToTensor(),  # Convert to tensor format
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize values
])

In [4]:
# Load SVHN
svhn_train = datasets.SVHN(root='./data', split='train', download=True, transform=transform_pipeline)
svhn_test = datasets.SVHN(root='./data', split='test', download=True, transform=transform_pipeline)

# Load MNIST
mnist_train = datasets.MNIST(root='./data', train=True, download=True, transform=transform_pipeline)
mnist_test = datasets.MNIST(root='./data', train=False, download=True, transform=transform_pipeline)

In [None]:
#Dataloader
source_dataloader_train = DataLoader(svhn_train, batch_size=64, shuffle=True, drop_last=True)
source_dataloader_test = DataLoader(svhn_test, batch_size=64, shuffle=True, drop_last=True)

target_dataloader_train = DataLoader(mnist_train, batch_size=64, shuffle=True, drop_last=True)

In [None]:
len(source_dataloader_train), len(source_dataloader_test), len(target_dataloader_train)

(1144, 406, 937)

**Models**

In [None]:
resnet18 = models.resnet18(pretrained=True)

# Modify first convolution layer if SVHN/MNIST are single-channel (grayscale)
resnet18.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)

# for params in resnet18.parameters():
#     params.requires_grad = False

# Remove classification head (use as feature extractor)
resnet18 = nn.Sequential(*list(resnet18.children())[:-1])


class SourceEncoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = resnet18

    def forward(self, x):
        return self.encoder(x)




In [None]:
# class SourceEncoder(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.encoder = nn.Sequential(
#             nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),  # Added pooling layer

#             nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
#             nn.BatchNorm2d(128),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),  # Another pooling layer

#             nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),  # Added depth
#             nn.BatchNorm2d(256),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),

#             nn.Flatten(),
#             nn.Linear(256 * 16 * 16, 512),  # Adjusted size for deeper convs
#             nn.ReLU(),
#         )

#     def forward(self, x):
#         return self.encoder(x)


# class TargetEncoder(nn.Module):
#   def __init__(self):
#     super().__init__()
#     self.encoder = SourceEncoder().encoder
#     for param in self.encoder.parameters():
#       param.requires_grad = True  # Allow adaptation training

#   def forward(self, x):
#     return self.encoder(x)


import torch.nn as nn

class Classifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512, 384),
            nn.ReLU(),
            nn.BatchNorm1d(384),
            nn.Dropout(0.3),

            nn.Linear(384, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.3),

            nn.Linear(256, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(0.2),

            nn.Linear(128, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.2),

            nn.Linear(64, 10)  # No Softmax since CrossEntropyLoss is used
        )

    def forward(self, x):
        return self.classifier(x)


# Define Discriminator
class Discriminator(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, 64),
            nn.LeakyReLU(0.2),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.fc(x)


**Training Loops**

Step 1: Train Source Encoder and Classifier


In [None]:
source_encoder = SourceEncoder().to(device)
source_classifier = Classifier().to(device)

optimizer = optim.Adam(list(source_encoder.parameters()) + list(source_classifier.parameters()), lr=0.0005)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)  #
criterion = nn.CrossEntropyLoss()

# Training on source domain
for epoch in range(25):
  total_loss = 0.0
  total_samples = 0
  for images, labels in source_dataloader_train:
    images, labels = images.to(device), labels.to(device)
    optimizer.zero_grad()
    features = source_encoder(images)
    predictions = source_classifier(features)
    loss = criterion(predictions, labels)
    loss.backward()

    # Gradient clipping for stability
    torch.nn.utils.clip_grad_norm_(source_encoder.parameters(), max_norm=1.0)
    torch.nn.utils.clip_grad_norm_(source_classifier.parameters(), max_norm=1.0)

    optimizer.step()
    total_loss += loss.item() * labels.size(0)
    total_samples += labels.size(0)
  average_loss = total_loss / total_samples
  scheduler.step()

  print(f"[Epoch {epoch+1}] Source Classifier Loss: {average_loss:.4f}")

[Epoch 1] Source Classifier Loss: 0.6534
[Epoch 2] Source Classifier Loss: 0.2983
[Epoch 3] Source Classifier Loss: 0.2408
[Epoch 4] Source Classifier Loss: 0.2033
[Epoch 5] Source Classifier Loss: 0.1701
[Epoch 6] Source Classifier Loss: 0.1087
[Epoch 7] Source Classifier Loss: 0.0811
[Epoch 8] Source Classifier Loss: 0.0638
[Epoch 9] Source Classifier Loss: 0.0491
[Epoch 10] Source Classifier Loss: 0.0389
[Epoch 11] Source Classifier Loss: 0.0175
[Epoch 12] Source Classifier Loss: 0.0115
[Epoch 13] Source Classifier Loss: 0.0092
[Epoch 14] Source Classifier Loss: 0.0074
[Epoch 15] Source Classifier Loss: 0.0060
[Epoch 16] Source Classifier Loss: 0.0031
[Epoch 17] Source Classifier Loss: 0.0022
[Epoch 18] Source Classifier Loss: 0.0015
[Epoch 19] Source Classifier Loss: 0.0018
[Epoch 20] Source Classifier Loss: 0.0018
[Epoch 21] Source Classifier Loss: 0.0012
[Epoch 22] Source Classifier Loss: 0.0006
[Epoch 23] Source Classifier Loss: 0.0007
[Epoch 24] Source Classifier Loss: 0.0005
[

Test the Source Classifer

In [None]:
source_encoder.eval()
source_classifier.eval()


correct = 0
total = 0
test_loss = 0
with torch.no_grad():
  for images, labels in source_dataloader_test:
    images, labels = images.to(device), labels.to(device)

    # Extract features
    features = source_encoder(images)

    # Get predictions
    predictions = source_classifier(features)

    # Compute loss
    loss = criterion(predictions, labels)
    test_loss += loss.item() * labels.size(0)


    # Compute accuracy
    _, predicted = torch.max(predictions.data, 1)
    correct += (predicted == labels).sum().item()
    total  += labels.size(0)

# Calculate final loss and accuracy
average_test_loss = test_loss / total
accuracy = correct / total * 100

print(f"Source Test Loss: {average_test_loss:.4f}")
print(f"Source Test Accuracy: {accuracy:.2f}%")

Source Test Loss: 0.2408
Source Test Accuracy: 96.70%


Step 2: Train Discriminator

In [26]:
class TargetEncoder(nn.Module):
  def __init__(self, source_encoder):
    super().__init__()
    self.encoder = source_encoder.encoder
    for param in self.encoder.parameters():
      param.requires_grad = True  # Allow adaptation training

  def forward(self, x):
    x = self.encoder(x)
    return x


In [28]:
target_encoder =  TargetEncoder(source_encoder).to(device)

# Load trained source weights into target encoder
target_encoder.encoder.load_state_dict(source_encoder.encoder.state_dict())

discriminator = Discriminator().to(device)

optimizer_disc = optim.Adam(discriminator.parameters(), lr=0.00005, weight_decay=1e-5)
scheduler_disc = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer_disc, factor=0.5, patience=3)


criterion_loss = nn.BCELoss()

for epoch in range(10):
    total_loss = 0.0

    for (source_images, _), (target_images, _) in zip(source_dataloader_train, target_dataloader_train):
        source_images, target_images = source_images.to(device), target_images.to(device)
        optimizer_disc.zero_grad()

        # Extract features from both domains
        source_features = source_encoder(source_images)
        target_features = target_encoder(target_images)

        # Ensure batch sizes match before mixing
        batch_size = min(source_features.size(0), target_features.size(0))
        source_features, target_features = source_features[:batch_size], target_features[:batch_size]

        # Introduce adaptive feature noise scaling (Fix: Avoid inplace modification)
        # noise_scale = max(0.02, 0.2 * (1 - epoch / 10))
        # target_features = target_features + noise_scale * torch.randn_like(target_features)

        # Feature Mixing (Smooth adaptation)
        lambda_value = torch.rand(1).item()
        target_features = lambda_value * target_features + (1 - lambda_value) * source_features

        # Apply Label Smoothing
        labels_source = torch.full((source_features.size(0), 1), 0.95, device=device)
        labels_target = torch.full((target_features.size(0), 1), 0.05, device=device)
        labels = torch.cat([labels_source, labels_target], dim=0)

        source_features = source_features.view(source_features.size(0), -1)  # Now [64, 512]
        target_features = target_features.view(target_features.size(0), -1)  # Should also be [64, 512]

        # Predict domain labels
        predictions = discriminator(torch.cat([source_features, target_features], dim=0))

        # Compute loss
        loss_total = criterion_loss(predictions, labels)
        loss_total.backward()
        optimizer_disc.step()

        total_loss += loss_total.item()

    scheduler_disc.step(total_loss)
    average_loss = total_loss / len(source_dataloader_train)
    print(f"[Epoch {epoch+1}] Discriminator Loss: {average_loss:.4f}")


[Epoch 1] Discriminator Loss: 0.4558
[Epoch 2] Discriminator Loss: 0.4161
[Epoch 3] Discriminator Loss: 0.4057
[Epoch 4] Discriminator Loss: 0.3842
[Epoch 5] Discriminator Loss: 0.3838
[Epoch 6] Discriminator Loss: 0.3743
[Epoch 7] Discriminator Loss: 0.3713
[Epoch 8] Discriminator Loss: 0.3784
[Epoch 9] Discriminator Loss: 0.3783
[Epoch 10] Discriminator Loss: 0.3641


Step 3: Adversarial Training for Target Encoder

In [30]:
# Define optimizer and scheduler
optimizer_target = optim.Adam(target_encoder.parameters(), lr=0.0001)
scheduler_target = optim.lr_scheduler.StepLR(optimizer_target, step_size=5, gamma=0.5)  # Decays LR every 5 epochs

for epoch in range(10):
    total_loss = 0.0
    for target_images, _ in target_dataloader_train:
        target_images = target_images.to(device)
        optimizer_target.zero_grad()

        # Extract target features
        target_features = target_encoder(target_images)


        noise_scale = max(0.01, 0.2 * (1 - epoch / 10))  # Decreases noise over time
        target_features = target_features + noise_scale * torch.randn_like(target_features)

        target_features = target_features.view(target_features.size(0), -1)  # Ensures [batch_size, feature_dim]

        target_pred = discriminator(target_features)

        # Use soft labels instead of hard-coded 1s
        soft_labels = torch.full_like(target_pred, 0.95)  # Softer target label

        loss_target_adv = criterion_loss(target_pred, soft_labels)

        loss_target_adv.backward()
        optimizer_target.step()

        total_loss += loss_target_adv.item()

    # Learning rate adjustment
    scheduler_target.step()

    average_loss = total_loss / len(target_dataloader_train)
    print(f"[Epoch {epoch+1}] Target Encoder Adaptation Loss: {average_loss:.4f}")


[Epoch 1] Target Encoder Adaptation Loss: 0.2357
[Epoch 2] Target Encoder Adaptation Loss: 0.2175
[Epoch 3] Target Encoder Adaptation Loss: 0.2120
[Epoch 4] Target Encoder Adaptation Loss: 0.2084
[Epoch 5] Target Encoder Adaptation Loss: 0.2053
[Epoch 6] Target Encoder Adaptation Loss: 0.2030
[Epoch 7] Target Encoder Adaptation Loss: 0.2013
[Epoch 8] Target Encoder Adaptation Loss: 0.2000
[Epoch 9] Target Encoder Adaptation Loss: 0.1991
[Epoch 10] Target Encoder Adaptation Loss: 0.1987


Step 4: Use Classifier for Target Data

In [None]:
# for target_images, _ in target_dataloader_train:
#     with torch.no_grad():
#         target_images = target_images.to(device)
#         adapted_features = target_encoder(target_images)
#         predictions = source_classifier(adapted_features)

In [None]:
# # Get a batch of target images
# target_images, _ = next(iter(target_dataloader_train))  # No labels in target domain

# # Get predictions from the classifier
# with torch.no_grad():
#     target_images = target_images.to(device)
#     adapted_features = target_encoder(target_images)
#     predictions = source_classifier(adapted_features)

# # Convert predictions to class labels
# predicted_labels = predictions.argmax(dim=1)

# # Display images with labels
# fig, axes = plt.subplots(4, 4, figsize=(10, 10))  # Adjust grid size as needed
# for i, ax in enumerate(axes.flat):
#     ax.imshow(target_images[i].permute(1, 2, 0).cpu().numpy())  # Convert to proper format
#     ax.set_title(f"Pred: {predicted_labels[i].item()}")
#     ax.axis("off")  # Hide axes
# plt.show()