<a href="https://colab.research.google.com/github/manwanth/transductive-transfer-learning-model/blob/main/transductive_learning_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#MAKE UNIT VECTORS SPAN ENTIRE SPHERE
#Increase radius of sphere

!pip install POT
!pip install torch torchvision
!pip install pandas
!pip install xlrd

import numpy as np
import ot
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
from torchvision import datasets, transforms
import random

# Published Google Sheets exports (xlsx). Each sheet provides a 'label' column;
# every remaining column is used as an input feature.
# Source domain is bolded Sans
# Target domain is light italic Sans
SOURCE_SHEET_URL = "https://docs.google.com/spreadsheets/d/e/2PACX-1vSoIsRZM8lEOHa-EhdpRARQTXwWR2XkqzUxMYIawB1QvPkQqgEjjBrjbwYpPw_ranveQm9dQi4cVpyT/pub?output=xlsx"
TARGET_SHEET_URL = "https://docs.google.com/spreadsheets/d/e/2PACX-1vTcnrNGQHGuv3aYHfvdPB3RisDSHSfhjbPpSDYSF_rDUJO9B1w7q9y4pL4pet7dNnbUHrIcG3D-9Ycu/pub?output=xlsx"
# Commented-out URL kept from the original notebook (not loaded anywhere):
#https://docs.google.com/spreadsheets/d/e/2PACX-1vQvONJZ6WIqsiw0hOoT-WvI9yowJJYXtsm7D-AmkmSifsoChEsYpNO87iRZ2-5MrIAuHrxlINzOQsN1/pub?output=xlsx


def _split_features_and_labels(frame):
    """Return ``(features, labels)``: integer 'label' column and all other columns."""
    labels = frame['label'].astype(int)
    features = frame.drop('label', axis=1)
    return features, labels


testS = pd.read_excel(SOURCE_SHEET_URL)
inputS, outputS = _split_features_and_labels(testS)

testT = pd.read_excel(TARGET_SHEET_URL)
inputT, outputT = _split_features_and_labels(testT)

def nsphereMap(tensor, radius):
    """Map every row of ``tensor`` onto the sphere of the given ``radius``.

    Each row is L2-normalized along ``dim=1`` with ``F.normalize`` and the
    resulting unit rows are multiplied by ``radius``.

    Args:
        tensor: 2-D floating-point tensor, one sample per row.
        radius: scalar multiplier applied to the unit-length rows.

    Returns:
        A new tensor with the same shape as ``tensor``.
    """
    unit_rows = F.normalize(tensor, p=2.0, dim=1)
    scaled_rows = radius * unit_rows
    return scaled_rows

# Features: build the tensors explicitly as float64 so that integer-valued
# sheets do not reach F.normalize as integer tensors (it needs a floating-point
# input), project every row onto the unit sphere, then store as float32.
# nsphereMap already L2-normalizes each row, so no separate F.normalize call
# is needed before it.
xS = nsphereMap(torch.tensor(inputS.values, dtype=torch.float64), 1).float()
xT = nsphereMap(torch.tensor(inputT.values, dtype=torch.float64), 1).float()

# Labels: create int64 class indices directly instead of round-tripping the
# integers through float32 (which cannot represent every large integer exactly).
yS = torch.tensor(outputS.values, dtype=torch.long)
yT = torch.tensor(outputT.values, dtype=torch.long)

# Precaution for iteration: keep xS untouched and mutate a copy instead.
new_xS = xS.clone()

# Set up for OT: uniform weight 1/n on every source and every target sample.
xSweight = torch.ones(xS.size(0)) / xS.size(0)
xTweight = torch.ones(xT.size(0)) / xT.size(0)

#Cost
def geodesic_cost(Xs_u: torch.Tensor, Xt_u: torch.Tensor) -> np.ndarray:
    """Return the matrix of arc-cosine distances between rows of two tensors.

    Entry ``[i, j]`` is ``acos(<Xs_u[i], Xt_u[j]>)``, with the dot product
    clamped to ``[-1, 1]`` before ``acos``. The function does not normalize its
    inputs, so the value is the angle between the rows only when they are
    unit-length.

    Args:
        Xs_u: tensor of shape (n_source, d).
        Xt_u: tensor of shape (n_target, d).

    Returns:
        NumPy array of shape (n_source, n_target), copied to the CPU.
    """
    pairwise_dots = torch.matmul(Xs_u, Xt_u.T)
    # Rounding can push a dot product slightly outside acos's domain.
    bounded_dots = torch.clamp(pairwise_dots, min=-1.0, max=1.0)
    angles = torch.acos(bounded_dots)
    return angles.cpu().numpy()

# Pairwise cost between every source row and every target row (NumPy array).
C = geodesic_cost(xS, xT)

# OT: solve for the transport plan with POT's earth mover's distance solver.
# The cost is handed over as a torch tensor, like the two weight vectors.
cost_tensor = torch.from_numpy(C)
P = ot.emd(xSweight, xTweight, cost_tensor)


#Make source pinpoint on only one target
def row_argmax_projection(P: np.ndarray) -> np.ndarray:
    """Reduce a plan matrix to a 0/1 matrix with at most one 1 per row.

    For every row, the column holding the row maximum (first occurrence on
    ties) is set to 1 when that maximum is non-zero; everything else is 0.
    A row whose maximum is exactly 0 therefore stays all-zero.

    Args:
        P: 2-D array-like of shape (n_rows, n_cols).

    Returns:
        Array with the same shape and dtype as ``P``.
    """
    plan = np.asarray(P)
    best_cols = plan.argmax(axis=1)
    row_ids = np.arange(plan.shape[0])

    P_proj = np.zeros_like(P)
    # The boolean mask is cast to P's dtype on assignment: True -> 1, False -> 0.
    P_proj[row_ids, best_cols] = plan[row_ids, best_cols] != 0
    return P_proj

# Multiplying the 0/1 assignment matrix by xT selects, for every source row,
# the single target row it was matched to by the transport plan.
assignment = torch.tensor(row_argmax_projection(P))
new_xT = torch.matmul(assignment, xT)

# Transfer everything to the GPU when one is available, else stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
xS = xS.to(device)
yS = yS.to(device)
xT = xT.to(device)
yT = yT.to(device)
new_xS = new_xS.to(device)
new_xT = new_xT.to(device)


# Angle between rows of two tensors
def angle_between_rows(new_xS, Target):
    """Return the angle, in radians, between matching rows of two tensors.

    The cosine of each angle is the row-wise dot product divided by the product
    of the two row norms; it is clamped to ``[-1, 1]`` before ``acos``.

    Args:
        new_xS: tensor of shape (N, d).
        Target: tensor of shape (N, d).

    Returns:
        Tensor of shape (N,).
    """
    dots = (new_xS * Target).sum(dim=1)
    norm_product = new_xS.norm(dim=1) * Target.norm(dim=1)
    cosines = (dots / norm_product).clamp(-1.0, 1.0)
    return torch.acos(cosines)

# Project new_xS onto Target (row-wise)
def project_rows(new_xS, Target):
    """Project each row of ``new_xS`` onto the matching row of ``Target``.

    Row ``i`` of the result is ``(<x_i, t_i> / <t_i, t_i>) * t_i``, i.e. a
    scalar multiple of ``Target[i]``.

    Args:
        new_xS: tensor of shape (N, d).
        Target: tensor of shape (N, d).

    Returns:
        Tensor of shape (N, d).
    """
    numerators = (new_xS * Target).sum(dim=1, keepdim=True)
    squared_norms = (Target * Target).sum(dim=1, keepdim=True)
    coefficients = numerators / squared_norms
    return coefficients * Target



# Main transformation:
# new_xS * cos(aθ) + (proj(Target from new_xS)/‖proj(Target from new_xS)‖) * sin(aθ)
def gradual_map(new_xS, Target, a):
    """Overwrite ``new_xS`` in place with a cos/sin blend toward ``Target``.

    With ``theta`` the row-wise angle between ``new_xS`` and ``Target`` and
    ``d`` the L2-normalized row-wise projection of ``new_xS`` onto ``Target``,
    every row becomes ``new_xS * cos(a * theta) + d * sin(a * theta)``.

    Args:
        new_xS: tensor of shape (N, d); modified in place.
        Target: tensor of shape (N, d).
        a: scalar multiplier applied to the angle.

    Returns:
        None. The result is written into ``new_xS``.

    NOTE(review): ``project_rows`` returns a scalar multiple of ``Target``, so
    ``d`` is parallel to ``Target`` rather than orthogonal to ``new_xS``; the
    blended rows are therefore not unit-length in general. Confirm whether the
    component of ``Target`` orthogonal to ``new_xS`` was intended instead.
    """
    row_angles = angle_between_rows(new_xS, Target)  # (N,)
    direction = F.normalize(project_rows(new_xS, Target), dim=1)

    # Column vector of scaled angles so it broadcasts across the feature axis.
    scaled_angles = (a * row_angles).unsqueeze(1)
    blended = new_xS * torch.cos(scaled_angles) + direction * torch.sin(scaled_angles)

    # Slice assignment writes into the caller's tensor instead of rebinding the name.
    new_xS[:] = blended


class NeuralNetwork(nn.Module):
    """Fully connected classifier: Linear+ReLU hidden layers and a Linear output.

    With the default arguments the network is the original 256 -> 32 -> 16 -> 10
    stack, and the layers keep their positions inside ``linear_relu_stack``
    (Linear layers at indices 0, 2 and 4), so parameter names such as
    ``linear_relu_stack.0.weight`` are unchanged.

    Args:
        in_features: number of input features per sample.
        hidden_sizes: width of each hidden layer, in order.
        num_classes: number of output logits.
    """

    def __init__(self, in_features: int = 256, hidden_sizes: tuple = (32, 16),
                 num_classes: int = 10):
        super().__init__()
        layers = []
        width_in = in_features
        for width_out in hidden_sizes:
            layers.append(nn.Linear(width_in, width_out))
            layers.append(nn.ReLU())
            width_in = width_out
        # Final layer emits raw logits; no activation is applied here.
        layers.append(nn.Linear(width_in, num_classes))
        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits produced by the layer stack for the batch ``x``."""
        logits = self.linear_relu_stack(x)
        return logits

# Step 1: seed every random number generator used in this script (PyTorch CPU
# and CUDA, NumPy, Python's `random`). This runs before the model is built, so
# the initial weights are drawn after seeding.
seed = 99
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

# Step 2 (optional): request deterministic cuDNN kernels and turn off the
# cuDNN benchmark mode.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Build the network with its default layer sizes and place it on `device`.
model = NeuralNetwork().to(device)

# Cross-entropy on the raw logits, optimized with AdamW over all model parameters.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-6)

#Iterate through new source domains as they wrap around n-sphere

# --- Parameter tracking ------------------------------------------------------
# Name of the tensor from which one entry is recorded after every session
# (the weight matrix of the first Linear layer).
param_name_to_track = 'linear_relu_stack.0.weight'

# Position of the recorded entry inside that tensor; edit to follow another one.
row = 0
col = 0

# One recorded value is appended per training session.
param_snapshots = list()

# --- Session schedule --------------------------------------------------------
max_sessions = 1000       # number of training sessions; adjust as needed
epochs_per_session = 50   # passes over the data within one session
batch_size = 20           # samples per optimizer step
p = 0                     # session counter; also scales the gradual_map angle

while p < max_sessions:
    # Step 1: move new_xS toward new_xT. gradual_map overwrites new_xS in place,
    # so the change compounds from one session to the next; the angle
    # multiplier passed in is 0.05 * p.
    gradual_map(new_xS, new_xT, 0.05 * p)

    # Step 2: pair the current (moved) source features with the source labels.
    # shuffle=False keeps the batch order identical in every epoch.
    train_dataset = TensorDataset(new_xS, yS)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

    # Step 3: train the model for a fixed number of epochs.
    model.train()
    running_loss = 0.0
    epoch_loss = 0.0  # mean per-sample loss of the most recent epoch
    for epoch in range(epochs_per_session):
        # Accumulate on the device and read the value back once per epoch,
        # so every batch contributes without a host sync per batch.
        loss_sum = torch.zeros((), device=device)
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = loss_fn(outputs, batch_y)
            loss.backward()
            optimizer.step()
            # loss is a batch mean; weight it by the batch size so the epoch
            # average stays correct when the last batch is smaller.
            loss_sum += loss.detach() * batch_x.size(0)
        running_loss = loss_sum.item()
        epoch_loss = running_loss / len(train_loader.dataset)
    # print(f"Session {p+1}, Training Loss: {epoch_loss:.4f}")

    # Step 4: log only the chosen parameter element. Looking it up by name keeps
    # the recorded value consistent with param_name_to_track (used as the plot label).
    param_value = model.state_dict()[param_name_to_track][row, col].item()
    param_snapshots.append(param_value)

    # Step 5: evaluate accuracy on the original source data and on the target data.
    model.eval()
    with torch.no_grad():
        preds_source = model(xS).argmax(dim=1)
        accuracy_source = (preds_source == yS).float().mean().item()

        preds_target = model(xT).argmax(dim=1)
        accuracy_target = (preds_target == yT).float().mean().item()
    # print(f"Accuracy source: {accuracy_source:.4f}, target: {accuracy_target:.4f}")

    # Step 6: advance the session counter (and with it the angle multiplier).
    p += 1

# Plot the recorded parameter value against the session index.
fig, ax = plt.subplots()
ax.plot(param_snapshots)
ax.set_xlabel("Training session")
ax.set_ylabel(f"{param_name_to_track}[{row},{col}] value")
ax.set_title("Tracked parameter evolution over sessions")
ax.grid(True)
plt.show()

# Evaluate final trained model on the target domain.
# NOTE: this snippet is disabled. It sits inside a bare triple-quoted string,
# which Python evaluates as an expression and discards, so none of it runs
# (in a notebook, a trailing bare string may be echoed as the cell output).
# If re-enabled, it draws len(xT) indices with random.randint (repeats are
# possible), predicts each drawn sample one at a time, and prints the fraction
# of draws that were classified correctly.
"""
model.eval()  # set to evaluation mode

correct = 0
rounds = len(xT)
for _ in range(rounds):
    idx = random.randint(0, len(xT) - 1)
    single_x = xT[idx].unsqueeze(0).float()
    single_y = yT[idx]
    with torch.no_grad():
        logits = model(single_x)
        pred = logits.argmax(dim=1).item()
    if pred == single_y:
        correct += 1

print(f"Final accuracy on target domain: {correct/rounds:.4f}")
"""

