<a href="https://colab.research.google.com/github/yusufasam-git/Yusufcode/blob/main/domain_adaptation_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# A Clean, General Domain Adaptation Pipeline (DANN-style)

This notebook implements a **minimal, general domain adaptation pipeline** using a simple synthetic dataset.

Core ideas:
- A **source domain** with labels
- A **target domain** without labels (used only for domain alignment)
- A model with:
  - **Feature extractor**
  - **Label classifier**
  - **Domain discriminator**
- A **DANN-style loss**: classification on source + adversarial domain loss on source+target

You can later replace the synthetic data with real datasets and keep the same pipeline.

## 1. Install and import dependencies

In [None]:
!pip install torch torchvision matplotlib scikit-learn --quiet

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons
from sklearn.manifold import TSNE

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

## 2. Create synthetic source and target domains

- **Source domain**: standard two-moons dataset
- **Target domain**: a fresh two-moons sample (different random seed), rotated by 25 degrees and shifted by `(1.0, 0.5)` to create domain shift

Source has labels, target is treated as unlabeled (for adaptation).

In [None]:
def create_synthetic_domains(n_samples=2000, noise=0.1, shift=(1.0, 0.5)):
    # Source: two moons
    Xs, ys = make_moons(n_samples=n_samples, noise=noise, random_state=0)

    # Target: a separate two-moons sample (different seed), transformed below
    Xt, yt = make_moons(n_samples=n_samples, noise=noise, random_state=1)

    # Create domain shift: rotate by 25 degrees about the origin, then translate by `shift`
    theta = np.radians(25)
    R = np.array([[np.cos(theta), -np.sin(theta)],
                  [np.sin(theta),  np.cos(theta)]])
    Xt = Xt @ R.T
    Xt = Xt + np.array(shift)

    return Xs.astype(np.float32), ys.astype(np.int64), Xt.astype(np.float32), yt.astype(np.int64)

Xs, ys, Xt, yt = create_synthetic_domains()

print("Source shape:", Xs.shape, ys.shape)
print("Target shape:", Xt.shape, yt.shape)
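
To make the transform concrete, the sketch below applies the same rotation and shift to a single point. It assumes the default values used above (25 degrees, shift `(1.0, 0.5)`); the point `p` is chosen only for illustration and is not taken from the dataset.

```python
import numpy as np

# Same rotation matrix as in create_synthetic_domains (25 degrees, counter-clockwise)
theta = np.radians(25)
R = np.array([[np.cos(theta), -np.sin(theta)],
              [np.sin(theta),  np.cos(theta)]])
shift = np.array([1.0, 0.5])

# Illustrative point on the x-axis (an assumption, not a dataset sample)
p = np.array([[1.0, 0.0]])

# Row-vector convention: X @ R.T rotates each row of X by theta
p_target = p @ R.T + shift
print(p_target)  # approximately [[1.906 0.923]]
```

Because the data are stored as rows, the rotation is applied as `X @ R.T` rather than `R @ X`.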

### Visualize the two domains (before adaptation)

In [None]:
plt.figure(figsize=(6, 6))
plt.scatter(Xs[:, 0], Xs[:, 1], c=ys, cmap='coolwarm', alpha=0.6, label='Source')
plt.scatter(Xt[:, 0], Xt[:, 1], c=yt, cmap='coolwarm', alpha=0.2, marker='x', label='Target')
plt.title("Source vs Target domains (input space)")
plt.legend()
plt.show()

## 3. Create PyTorch datasets and dataloaders

- Source dataset: features + labels
- Target dataset: features only (labels not used for training in unsupervised DA)
- We'll still keep target labels separately for **evaluation** purposes.

In [None]:
class SourceDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X)
        self.y = torch.from_numpy(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

class TargetDataset(Dataset):
    def __init__(self, X):
        self.X = torch.from_numpy(X)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx]

batch_size = 64

source_dataset = SourceDataset(Xs, ys)
target_dataset = TargetDataset(Xt)

source_loader = DataLoader(source_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
target_loader = DataLoader(target_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

print("Source batches:", len(source_loader))
print("Target batches:", len(target_loader))
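
The batch counts printed above follow from integer division. A quick check, assuming the defaults used in this notebook (2000 samples per domain, batch size 64):

```python
n_samples = 2000   # default n_samples in create_synthetic_domains
batch_size = 64

# drop_last=True keeps only full batches
n_batches = n_samples // batch_size
n_dropped = n_samples % batch_size

print(n_batches, n_dropped)  # 31 16
```

Because the incomplete last batch is dropped, every source batch is paired with a target batch of the same size. With `shuffle=True`, the 16 samples left out differ from epoch to epoch.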

## 4. Define the model components

We build three modules:

- **FeatureExtractor**: maps 2D input to a higher-dimensional feature space
- **LabelClassifier**: predicts class labels from features
- **DomainDiscriminator**: predicts whether features come from source or target

We also implement a simple **Gradient Reversal Layer (GRL)** for adversarial training (DANN).

In [None]:
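class GradientReversalFunction(torch.autograd.Function):
    """Identity in the forward pass; negates and scales gradients in the backward pass."""

    @staticmethod
    def forward(ctx, x, lambda_):
        ctx.lambda_ = lambda_  # stored for use in backward
        return x.view_as(x)    # identity, returned as a view of x

    @staticmethod
    def backward(ctx, grad_output):
        # Reverse the gradient for x; lambda_ is a plain float and gets no gradient
        return -ctx.lambda_ * grad_output, None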

class GradientReversalLayer(nn.Module):
    def __init__(self, lambda_=1.0):
        super().__init__()
        self.lambda_ = lambda_

    def forward(self, x):
        return GradientReversalFunction.apply(x, self.lambda_)

class FeatureExtractor(nn.Module):
    def __init__(self, input_dim=2, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

    def forward(self, x):
        return self.net(x)

class LabelClassifier(nn.Module):
    def __init__(self, feature_dim=64, num_classes=2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(feature_dim, feature_dim),
            nn.ReLU(),
            nn.Linear(feature_dim, num_classes)
        )

    def forward(self, f):
        return self.net(f)

class DomainDiscriminator(nn.Module):
    def __init__(self, feature_dim=64, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(feature_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, f):
        return self.net(f)

feature_extractor = FeatureExtractor().to(device)
label_classifier = LabelClassifier().to(device)
domain_discriminator = DomainDiscriminator().to(device)

print(feature_extractor)
print(label_classifier)
print(domain_discriminator)
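
A quick sanity check can confirm what the gradient reversal layer does: values pass through unchanged, and gradients come back negated and scaled. The sketch below is self-contained, so it redefines the function under a hypothetical name (`_ReverseGrad`) that mirrors `GradientReversalFunction`; the input values and the factor 0.5 are arbitrary.

```python
import torch

class _ReverseGrad(torch.autograd.Function):
    """Hypothetical stand-in mirroring GradientReversalFunction above."""

    @staticmethod
    def forward(ctx, x, lambda_):
        ctx.lambda_ = lambda_
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        # Flip the sign and scale; no gradient for lambda_
        return -ctx.lambda_ * grad_output, None

x = torch.tensor([1.0, -2.0, 3.0], requires_grad=True)
y = _ReverseGrad.apply(x, 0.5)

# Forward pass: values are unchanged
forward_is_identity = torch.equal(y.detach(), x.detach())

# Backward pass: d(sum(y))/dx would be +1 without the layer
y.sum().backward()
print(forward_is_identity, x.grad)  # True tensor([-0.5000, -0.5000, -0.5000])
```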

## 5. Define training utilities

We will optimize:

- **Classification loss** on source: cross entropy
- **Domain loss** on source+target: binary cross entropy

Total loss:
\begin{equation}
L = L_{cls}(x_s, y_s) + \lambda \cdot L_{adv}(x_s, x_t)
\end{equation}

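Here \(\lambda\) (`lambda_domain` in the code) controls the strength of domain alignment. The gradient reversal layer has its own scaling factor `lambda_`, set to 1.0 below, so the feature extractor receives the domain gradient scaled by the product of the two.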
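
To make the adversarial part explicit, the updates can be written out for plain gradient descent with step size \(\eta\). This is a simplification: the notebook uses Adam, so the actual step sizes differ. The symbols \(\theta_f, \theta_c, \theta_d\) (parameters of the feature extractor, label classifier, and domain discriminator) and \(\lambda_{GRL}\) (the `lambda_` of the gradient reversal layer) are notation introduced here.

\begin{equation}
\begin{aligned}
\theta_c &\leftarrow \theta_c - \eta \, \frac{\partial L_{cls}}{\partial \theta_c} \\
\theta_d &\leftarrow \theta_d - \eta \, \lambda \, \frac{\partial L_{adv}}{\partial \theta_d} \\
\theta_f &\leftarrow \theta_f - \eta \left( \frac{\partial L_{cls}}{\partial \theta_f} - \lambda \, \lambda_{GRL} \, \frac{\partial L_{adv}}{\partial \theta_f} \right)
\end{aligned}
\end{equation}

The discriminator descends on \(L_{adv}\) while the feature extractor ascends on it, which is how a single backward pass yields adversarial training.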

In [None]:
cls_criterion = nn.CrossEntropyLoss()
dom_criterion = nn.BCEWithLogitsLoss()

params = (
    list(feature_extractor.parameters())
    + list(label_classifier.parameters())
    + list(domain_discriminator.parameters())
)
optimizer = optim.Adam(params, lr=1e-3)  # one optimizer updates all three networks

grl = GradientReversalLayer(lambda_=1.0)  # scale of the reversed gradient

lambda_domain = 0.5  # weight of domain loss in the total loss
num_epochs = 50
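
The notebook keeps the adversarial weight fixed. A common alternative, described in the original DANN paper, ramps it from 0 toward 1 over training so that the discriminator's noisy early gradients do not dominate. The sketch below is an optional variation, not what this notebook does; the function name `dann_lambda` is hypothetical, and `gamma=10` is the value reported in that paper.

```python
import math

def dann_lambda(progress: float, gamma: float = 10.0) -> float:
    """Adversarial weight for training progress in [0, 1]."""
    return 2.0 / (1.0 + math.exp(-gamma * progress)) - 1.0

num_epochs = 50
for epoch in (0, 5, 25, 49):
    p = epoch / num_epochs  # fraction of training completed
    print(f"epoch {epoch:2d}: lambda = {dann_lambda(p):.3f}")
```

One way to use it here would be to assign `grl.lambda_ = dann_lambda(epoch / num_epochs)` at the start of each epoch, which scales only the reversed gradient reaching the feature extractor and leaves the discriminator's own update unchanged.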

## 6. Training loop (DANN-style)

At each step:

1. Sample a batch from **source**: \(x_s, y_s\)
2. Sample a batch from **target**: \(x_t\)
3. Compute source features and class predictions, then target features
4. Compute domain predictions (source vs target) from the concatenated features, passed through the GRL
5. Backpropagate the combined loss and update all networks.

In [None]:
def train_epoch(epoch, source_loader, target_loader):
    feature_extractor.train()
    label_classifier.train()
    domain_discriminator.train()

    total_cls_loss = 0.0
    total_dom_loss = 0.0
    total_loss = 0.0

    target_iter = iter(target_loader)

    for i, (xs, ys_batch) in enumerate(source_loader):
        try:
            xt = next(target_iter)
        except StopIteration:
            target_iter = iter(target_loader)
            xt = next(target_iter)

        xs = xs.to(device)
        ys_batch = ys_batch.to(device)
        xt = xt.to(device)

        # --------------------
        # 1. Label prediction on source
        # --------------------
        fs = feature_extractor(xs)
        logits_cls = label_classifier(fs)
        loss_cls = cls_criterion(logits_cls, ys_batch)

        # --------------------
        # 2. Domain prediction (source + target)
        # --------------------
        ft = feature_extractor(xt)

        f_concat = torch.cat([fs, ft], dim=0)
        f_rev = grl(f_concat)  # gradient reversal

        dom_logits = domain_discriminator(f_rev).view(-1)

        dom_labels = torch.cat([
            torch.ones(fs.size(0), device=device),   # source = 1
            torch.zeros(ft.size(0), device=device)   # target = 0
        ], dim=0)

        loss_dom = dom_criterion(dom_logits, dom_labels)

        # --------------------
        # 3. Total loss
        # --------------------
        loss = loss_cls + lambda_domain * loss_dom

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_cls_loss += loss_cls.item()
        total_dom_loss += loss_dom.item()
        total_loss += loss.item()

    n_batches = len(source_loader)
    print(f"Epoch {epoch+1}: cls_loss={total_cls_loss/n_batches:.4f}, dom_loss={total_dom_loss/n_batches:.4f}, total={total_loss/n_batches:.4f}")


for epoch in range(num_epochs):
    train_epoch(epoch, source_loader, target_loader)
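
The `try`/`except StopIteration` block in `train_epoch` restarts the target loader if it runs out before the source loader does. The same behavior can be factored into a small generator. This is a sketch: `cycle_batches` is a hypothetical helper name, and a plain list stands in for a `DataLoader`.

```python
from itertools import islice

def cycle_batches(loader):
    """Yield batches forever, re-iterating the loader each time it is exhausted."""
    while True:
        for batch in loader:
            yield batch

# A list of three "batches" stands in for target_loader
fake_target_loader = ["t0", "t1", "t2"]
first_five = list(islice(cycle_batches(fake_target_loader), 5))
print(first_five)  # ['t0', 't1', 't2', 't0', 't1']
```

Unlike `itertools.cycle`, which replays a saved copy of the first pass, this generator re-enters the loader on every pass, so a `DataLoader` with `shuffle=True` reshuffles each time. In `train_epoch` it could be paired with the source loader via `zip(source_loader, cycle_batches(target_loader))`.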

## 7. Evaluation on target domain (classification accuracy)

Target labels were never used during training (unsupervised DA); we use them now only to measure how well adaptation worked.

The cell below reports the target accuracy of the DANN-style adapted model. A classifier trained only on source (no DA) is the natural baseline for comparison, but this simple notebook does not train one, so only the adapted model's final performance is shown.

In [None]:
def evaluate_on_target(Xt, yt):
    feature_extractor.eval()
    label_classifier.eval()

    with torch.no_grad():
        X_tensor = torch.from_numpy(Xt).to(device)
        y_true = torch.from_numpy(yt).to(device)

        features = feature_extractor(X_tensor)
        logits = label_classifier(features)
        preds = torch.argmax(logits, dim=1)

        acc = (preds == y_true).float().mean().item()

    return acc

target_acc = evaluate_on_target(Xt, yt)
print(f"Target domain accuracy after adaptation: {target_acc*100:.2f}%")
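
A source-only baseline gives the adapted accuracy something to be compared against. The sketch below trains a small classifier on labeled source data alone and scores it on a given array. The names `train_source_only` and `accuracy`, the layer sizes, and the full-batch training are assumptions chosen for brevity, and the two Gaussian blobs are toy data used only to show the call.

```python
import numpy as np
import torch
import torch.nn as nn

def train_source_only(X_src, y_src, epochs=200, lr=1e-2, seed=0):
    """Train a small two-class MLP on source data only (no domain loss)."""
    torch.manual_seed(seed)
    model = nn.Sequential(
        nn.Linear(X_src.shape[1], 64), nn.ReLU(),
        nn.Linear(64, 64), nn.ReLU(),
        nn.Linear(64, 2),
    )
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    X = torch.from_numpy(X_src)
    y = torch.from_numpy(y_src)
    model.train()
    for _ in range(epochs):  # full-batch updates for simplicity
        optimizer.zero_grad()
        loss = criterion(model(X), y)
        loss.backward()
        optimizer.step()
    return model

def accuracy(model, X, y):
    """Fraction of rows in X whose predicted class matches y."""
    model.eval()
    with torch.no_grad():
        preds = model(torch.from_numpy(X)).argmax(dim=1)
    return (preds == torch.from_numpy(y)).float().mean().item()

# Toy data: two well-separated Gaussian blobs (not the notebook's data)
rng = np.random.default_rng(0)
X_toy = np.concatenate([rng.normal(-2.0, 0.5, size=(100, 2)),
                        rng.normal(2.0, 0.5, size=(100, 2))]).astype(np.float32)
y_toy = np.concatenate([np.zeros(100), np.ones(100)]).astype(np.int64)

baseline = train_source_only(X_toy, y_toy)
print(f"Toy accuracy: {accuracy(baseline, X_toy, y_toy) * 100:.2f}%")
```

In this notebook the calls would be `baseline = train_source_only(Xs, ys)` followed by `accuracy(baseline, Xt, yt)`, and the result would be compared against `target_acc`.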

## 8. Visualize feature space with t-SNE

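We project the learned features into 2D using t-SNE to see how much the **source and target** distributions overlap after adaptation.

The plot below is colored by domain only, so it does not show class separation; coloring the same points by class label would show whether classes stay separated across domains.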

In [None]:
def extract_features(X, domain_label):
    feature_extractor.eval()
    with torch.no_grad():
        X_tensor = torch.from_numpy(X).to(device)
        f = feature_extractor(X_tensor)
    return f.cpu().numpy(), np.full(len(X), domain_label)

# Sample subset for visualization
idx_s = np.random.choice(len(Xs), size=500, replace=False)
idx_t = np.random.choice(len(Xt), size=500, replace=False)

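# Domain labels here are only for plotting (0 = source, 1 = target);
# note that the training loop uses the opposite coding (source = 1, target = 0)
Fs, dom_s = extract_features(Xs[idx_s], domain_label=0)
Ft, dom_t = extract_features(Xt[idx_t], domain_label=1)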

F_all = np.concatenate([Fs, Ft], axis=0)
dom_all = np.concatenate([dom_s, dom_t], axis=0)

tsne = TSNE(n_components=2, random_state=42)
F_2d = tsne.fit_transform(F_all)

plt.figure(figsize=(6, 6))
plt.scatter(F_2d[dom_all == 0, 0], F_2d[dom_all == 0, 1], alpha=0.6, label='Source features')
plt.scatter(F_2d[dom_all == 1, 0], F_2d[dom_all == 1, 1], alpha=0.6, label='Target features')
plt.title("t-SNE of learned features (after DA)")
plt.legend()
plt.show()
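
t-SNE plots are qualitative and depend on the random seed and perplexity. As a rough numeric complement, the distance between the mean source and mean target feature vectors can be tracked. The sketch below computes its square, which equals the biased maximum mean discrepancy estimate under a linear kernel. `mean_feature_gap` is a hypothetical name, and the arrays are toy stand-ins for `Fs` and `Ft`.

```python
import numpy as np

def mean_feature_gap(F_source, F_target):
    """Squared Euclidean distance between the domain-wise feature means."""
    diff = F_source.mean(axis=0) - F_target.mean(axis=0)
    return float(diff @ diff)

# Toy features: the "target" is the source shifted by 1 along the first axis
rng = np.random.default_rng(0)
F_src_toy = rng.normal(size=(500, 64))
F_tgt_toy = F_src_toy + np.eye(64)[0]

print(mean_feature_gap(F_src_toy, F_src_toy))  # 0.0
print(mean_feature_gap(F_src_toy, F_tgt_toy))  # close to 1.0
```

A value near zero is necessary but not sufficient for alignment, since two distributions can share a mean and still differ in shape.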

## 9. How to extend this pipeline

This notebook is intentionally **simple and generic**. To use it in real projects:

1. **Replace the synthetic data** with real datasets (e.g., Office-31, digits, VisDA, medical).
2. Move the model classes (FeatureExtractor, LabelClassifier, DomainDiscriminator) into a `models/` folder.
3. Move the training loop into a `trainer_dann.py` file.
4. Use a `configs/` folder with YAML files to define hyperparameters.
5. Log results (loss, accuracy) to TensorBoard or CSV.

The core components (source loader, target loader, feature extractor, classifier, domain discriminator, and losses) stay the same; mainly the data loading and the network architectures change with the dataset.
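
For item 5, a CSV log needs only the standard library. The sketch below is one possible shape; the helper name `log_metrics`, the column names, and the file name are all hypothetical, and the numbers are placeholders rather than results from this notebook.

```python
import csv
import os
import tempfile

FIELDS = ["epoch", "cls_loss", "dom_loss", "target_acc"]

def log_metrics(path, row):
    """Append one row of metrics, writing the header if the file is new."""
    is_new = not os.path.exists(path)
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        if is_new:
            writer.writeheader()
        writer.writerow(row)

# Placeholder values, written to a temporary directory
log_path = os.path.join(tempfile.mkdtemp(), "dann_log.csv")
log_metrics(log_path, {"epoch": 1, "cls_loss": 0.5, "dom_loss": 0.7, "target_acc": 0.6})
log_metrics(log_path, {"epoch": 2, "cls_loss": 0.4, "dom_loss": 0.7, "target_acc": 0.7})

with open(log_path, newline="") as f:
    rows = list(csv.DictReader(f))
print(len(rows), rows[0]["epoch"])  # 2 1
```

To feed a logger like this, `train_epoch` would need to return its averaged losses instead of only printing them.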