In [None]:
import torch
import os
from itertools import product
from random import shuffle
from tqdm import tqdm

from KNN_Embeddings import *

In [None]:
# Feature normalization
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset, Dataset
import random

class TripletDataset(Dataset):
    """Dataset that yields (anchor, positive, negative) triplets of embeddings.

    For the sample at ``index`` (the anchor) a positive is drawn at random from
    the other samples sharing its label, and a negative is drawn at random
    from the samples of a randomly chosen different label.

    Args:
        embeddings: Indexable collection of samples; ``embeddings[i]`` is the
            i-th sample and ``len(embeddings)`` the dataset size.
        labels: Tensor of class ids aligned with ``embeddings``. Expected to be
            1-D, since each element is read with ``labels[i].item()``.

    Attributes:
        labels_set: Set of the distinct labels (plain Python scalars).
        label_to_indices: Maps each label to the list of sample indices
            carrying that label.
    """

    def __init__(self, embeddings, labels):
        self.embeddings = embeddings
        self.labels = labels

        # Group sample indices by label in a single pass over the data
        # (instead of one full scan per class).
        self.label_to_indices = {}
        for sample_index, label in enumerate(labels.tolist()):
            self.label_to_indices.setdefault(label, []).append(sample_index)

        self.labels_set = set(self.label_to_indices)
        # Indexable view of the classes, used for negative-label sampling.
        self._label_list = sorted(self.labels_set)

    def __getitem__(self, index):
        """Return ``(anchor, positive, negative, anchor_label, negative_label)``.

        If the anchor is the only sample of its class, the anchor itself is
        returned as the positive (there is no other candidate).

        Raises:
            ValueError: If the dataset contains fewer than two distinct labels,
                so no negative sample exists.
        """
        if len(self._label_list) < 2:
            raise ValueError(
                "TripletDataset needs at least two distinct labels to draw a negative sample"
            )

        anchor = self.embeddings[index]
        anchor_label = self.labels[index].item()

        same_class_indices = self.label_to_indices[anchor_label]
        positive_index = index
        if len(same_class_indices) > 1:
            # Rejection-sample until we get a sample other than the anchor.
            while positive_index == index:
                positive_index = random.choice(same_class_indices)
        # else: singleton class -> keep the anchor as its own positive rather
        # than looping forever looking for a different sample.

        # Rejection-sample a label different from the anchor's.
        negative_label = anchor_label
        while negative_label == anchor_label:
            negative_label = random.choice(self._label_list)
        negative_index = random.choice(self.label_to_indices[negative_label])

        positive = self.embeddings[positive_index]
        negative = self.embeddings[negative_index]
        return anchor, positive, negative, anchor_label, negative_label

    def __len__(self):
        """Number of samples (one triplet is generated per sample)."""
        return len(self.embeddings)


# Standardize the features. The scaler statistics are learned from the
# training split only and then applied unchanged to the test split.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_normalized = scaler.transform(X_train)
X_test_normalized = scaler.transform(X_test)

# Wrap features (float) and labels (long) as tensors
X_train_tensor = torch.tensor(X_train_normalized, dtype=torch.float)
X_test_tensor = torch.tensor(X_test_normalized, dtype=torch.float)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Triplet-producing datasets for each split
train_dataset = TripletDataset(X_train_tensor, y_train_tensor)
test_dataset = TripletDataset(X_test_tensor, y_test_tensor)

# Both loaders share batch size and worker count; only training is shuffled
loader_kwargs = dict(batch_size=64, num_workers=4)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)



In [None]:
import torch
import torch.nn as nn

class Generator(nn.Module):
    """Two-layer MLP generator: Linear -> ReLU -> Linear -> Tanh.

    Args:
        input_size: Width of the input vector fed to ``forward``.
        hidden_dim: Width of the hidden layer.
        output_size: Width of the generated vector; the final Tanh bounds
            every output component to [-1, 1].
    """

    def __init__(self, input_size, hidden_dim, output_size):
        super().__init__()
        layers = [
            nn.Linear(input_size, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_size),
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, combined_input):
        """Map ``combined_input`` (last dim ``input_size``) to a generated vector."""
        generated = self.net(combined_input)
        return generated



In [None]:
class Discriminator(nn.Module):
    """Discriminator with a shared feature trunk and two linear heads.

    The trunk (Linear -> LeakyReLU(0.2) -> Linear) maps the input to a
    ``hash_bit``-dimensional feature vector. No activation or binarization is
    applied to it here, so the features are real-valued. Two heads read these
    features: a real/fake head (1 output) and a class head
    (``num_classes`` outputs).

    Args:
        input_size: Width of the input vector.
        hidden_dim: Width of the trunk's hidden layer.
        num_classes: Number of outputs of the classification head.
        hash_bit: Width of the trunk output shared by both heads.
    """

    def __init__(self, input_size, hidden_dim, num_classes, hash_bit):
        super().__init__()
        trunk_layers = [
            nn.Linear(input_size, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, hash_bit),  # hash_bit-wide feature output
        ]
        self.feature_extractor = nn.Sequential(*trunk_layers)
        # Both heads consume the trunk output
        self.discriminator = nn.Linear(hash_bit, 1)
        self.classifier = nn.Linear(hash_bit, num_classes)

    def forward(self, x):
        """Return ``(validity, class_logits)`` for input ``x``.

        ``validity`` is the sigmoid of the real/fake head (values in (0, 1));
        ``class_logits`` is the raw output of the classification head.
        """
        hash_features = self.feature_extractor(x)
        validity_prob = torch.sigmoid(self.discriminator(hash_features))
        class_logits = self.classifier(hash_features)
        return validity_prob, class_logits

    def get_feature_embeddings(self, x):
        """Return the trunk output for ``x`` without applying either head."""
        hash_features = self.feature_extractor(x)
        return hash_features



In [None]:
class DSHGAN(nn.Module):
    """Container pairing a conditional ``Generator`` with a ``Discriminator``.

    Args:
        g_input_size: Width of the generator input (noise width plus label
            encoding width).
        g_hidden_dim: Hidden width of the generator.
        g_output_size: Width of the generated vectors.
        d_input_size: Width of the discriminator input.
        d_hidden_dim: Hidden width of the discriminator.
        num_classes: Number of classes; used by the discriminator's class head
            and for one-hot encoding labels in ``forward``.
        hash_bits: Width of the discriminator's feature output.
    """

    def __init__(self, g_input_size, g_hidden_dim, g_output_size, d_input_size, d_hidden_dim, num_classes, hash_bits):
        super().__init__()
        # Kept so forward() can one-hot encode integer class labels.
        self.num_classes = num_classes
        self.generator = Generator(g_input_size, g_hidden_dim, g_output_size)
        self.discriminator = Discriminator(d_input_size, d_hidden_dim, num_classes, hash_bits)

    def forward(self, noise, labels, images):
        """Generate samples and score both real and generated inputs.

        Args:
            noise: 2-D noise tensor, one row per sample.
            labels: Either a 1-D tensor of class indices (one-hot encoded here
                with ``num_classes`` columns) or a 2-D tensor that is already
                an encoded label matrix.
            images: Real inputs passed to the discriminator.

        Returns:
            ``(real_validity, real_label, fake_validity, fake_label,
            generated_images)``. The fake scores are computed on a detached
            copy of the generated samples, so they do not back-propagate into
            the generator.
        """
        if labels.dim() == 1:
            labels = nn.functional.one_hot(labels.long(), self.num_classes)
        # The generator takes a single tensor: noise and label encoding
        # concatenated along the feature dimension.
        label_features = labels.to(device=noise.device, dtype=noise.dtype)
        generator_input = torch.cat([noise, label_features], dim=1)

        generated_images = self.generator(generator_input)
        real_validity, real_label = self.discriminator(images)
        fake_validity, fake_label = self.discriminator(generated_images.detach())
        return real_validity, real_label, fake_validity, fake_label, generated_images

    def get_embeddings(self, x):
        """Return the discriminator's feature-extractor output for ``x``."""
        return self.discriminator.get_feature_embeddings(x)


In [None]:
# Model hyperparameters
noise_dim = 100      # width of the noise vector fed to the generator
num_classes = 10     # number of label classes
embedding_dim = 768  # width of the real / generated embeddings
hash_bits = 36       # width of the discriminator's feature output

# Pick the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build DSHGAN: the generator consumes noise concatenated with a
# num_classes-wide label encoding and emits embedding_dim-wide vectors,
# which is also the discriminator's input width.
dshgan_kwargs = dict(
    g_input_size=noise_dim + num_classes,
    g_hidden_dim=256,
    g_output_size=embedding_dim,
    d_input_size=embedding_dim,
    d_hidden_dim=256,
    num_classes=num_classes,
    hash_bits=hash_bits,
)
model = DSHGAN(**dshgan_kwargs).to(device)

import torch.optim as optim
import torch.nn.functional as F

# Losses: binary cross-entropy for the real/fake head (which outputs
# probabilities) and cross-entropy for the class head (which outputs logits)
adversarial_loss = nn.BCELoss()
classification_loss = nn.CrossEntropyLoss()

# One Adam optimizer per sub-network, sharing the same settings
adam_kwargs = dict(lr=0.0002, betas=(0.5, 0.999))
optimizer_G = optim.Adam(model.generator.parameters(), **adam_kwargs)
optimizer_D = optim.Adam(model.discriminator.parameters(), **adam_kwargs)




In [None]:
import torch
from torch.autograd import Variable
from torchvision.utils import save_image
import torch.nn.functional as F

# Training hyperparameters
num_epochs = 10
sample_interval = 400  # NOTE(review): defined but not used anywhere in this loop

# Make sure the model is in training mode (an earlier evaluation run may have
# switched it to eval mode).
model.train()

for epoch in range(num_epochs):
    # NOTE(review): the loader yields full triplets, but only the anchor and its
    # label are used by the losses below; positive/negative are ignored.
    for batch_idx, (anchor, positive, negative, anchor_label, negative_label) in enumerate(train_loader):
        batch_size = anchor.size(0)

        # Adversarial ground truths, created directly on the model's device so
        # the loss inputs never mix CPU and GPU tensors.
        valid = torch.ones(batch_size, 1, device=device)
        fake = torch.zeros(batch_size, 1, device=device)

        # Configure input: move the real batch and its labels to the device
        real_imgs = anchor.to(device=device, dtype=torch.float)
        labels = anchor_label.to(device=device, dtype=torch.long)

        # Sample standard-normal noise and one-hot encode the labels
        z = torch.randn(batch_size, noise_dim, device=device)
        label_embeddings = F.one_hot(labels, num_classes).float()

        # Concatenate noise and labels to form the generator's single input
        combined_input = torch.cat([z, label_embeddings], dim=1)
        gen_imgs = model.generator(combined_input)

        # ---------------------
        #  Train discriminator
        # ---------------------
        optimizer_D.zero_grad()

        real_imgs_flat = real_imgs.view(real_imgs.size(0), -1)
        gen_imgs_flat = gen_imgs.view(gen_imgs.size(0), -1)

        # Loss for real samples: real/fake score plus class prediction
        real_pred, real_aux = model.discriminator(real_imgs_flat)
        d_real_loss = (adversarial_loss(real_pred, valid) +
                       classification_loss(real_aux, labels)) / 2

        # Loss for generated samples; detach so this step does not
        # back-propagate into the generator
        fake_pred, fake_aux = model.discriminator(gen_imgs_flat.detach())
        d_fake_loss = (adversarial_loss(fake_pred, fake) +
                       classification_loss(fake_aux, labels)) / 2

        # Total discriminator loss
        d_loss = (d_real_loss + d_fake_loss) / 2

        d_loss.backward()
        optimizer_D.step()

        # -----------------
        #  Train generator
        # -----------------
        optimizer_G.zero_grad()

        # The generator is rewarded when its samples are scored as real and
        # classified as the label they were conditioned on
        fake_pred, fake_aux = model.discriminator(gen_imgs_flat)
        g_loss = (adversarial_loss(fake_pred, valid) +
                  classification_loss(fake_aux, labels)) / 2

        g_loss.backward()
        optimizer_G.step()

        print(f"[Epoch {epoch}/{num_epochs}] [Batch {batch_idx}/{len(train_loader)}] [D loss: {d_loss.item()}] [G loss: {g_loss.item()}]")



In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import average_precision_score
from sklearn.preprocessing import label_binarize
import numpy as np


def evaluate_model(model, test_loader, device):
    """Compute embeddings for every anchor produced by ``test_loader``.

    The model is put in eval mode for the duration of the pass and its
    previous train/eval mode is restored afterwards (also on error), so
    calling this between training epochs does not leave the model in eval
    mode.

    Args:
        model: Module exposing ``get_embeddings(x)``.
        test_loader: Iterable of 5-tuples whose first element is the anchor
            batch and whose fourth element is the anchor-label batch; the
            other elements are ignored.
        device: Device the anchor batches are moved to before the forward pass.

    Returns:
        ``(embeddings, labels)``: the per-batch embeddings and labels, each
        concatenated along dim 0, on the CPU.
    """
    was_training = model.training
    model.eval()
    embeddings = []
    labels = []
    try:
        with torch.no_grad():
            for anchor, _, _, label_a, _ in test_loader:
                output = model.get_embeddings(anchor.to(device))
                embeddings.append(output.cpu())
                # Keep labels on the CPU alongside the embeddings
                labels.append(label_a.cpu())
    finally:
        # Restore whatever mode the caller had the model in
        model.train(was_training)
    return torch.cat(embeddings), torch.cat(labels)


# This cell relies on `model`, `device`, `train_loader` and `test_loader`
# defined in the cells above.

# Extract the discriminator's feature embeddings for both splits
train_codes, train_labels = evaluate_model(model, train_loader, device)
test_codes, test_labels = evaluate_model(model, test_loader, device)

# scikit-learn operates on NumPy arrays: convert explicitly instead of relying
# on implicit tensor -> array coercion. The tensors are on the CPU and carry
# no gradient (see evaluate_model).
train_codes_np = train_codes.numpy()
train_labels_np = train_labels.numpy()
test_codes_np = test_codes.numpy()
test_labels_np = test_labels.numpy()

# Classification with KNN
knn = KNeighborsClassifier(n_neighbors=7)
knn.fit(train_codes_np, train_labels_np)
predictions = knn.predict(test_codes_np)
y_pred_proba = knn.predict_proba(test_codes_np)

print(classification_report(test_labels_np, predictions))

# Binarize the test labels for a one-vs-rest computation. Using knn.classes_
# keeps the columns in the same order as the columns of y_pred_proba.
y_test_binarized = label_binarize(test_labels_np, classes=knn.classes_)
if len(knn.classes_) == 2:
    # For two classes label_binarize returns a single column (indicator of the
    # second class); expand it so there is one column per predict_proba column.
    y_test_binarized = np.hstack([1 - y_test_binarized, y_test_binarized])

# Calculate the average precision for each class
average_precisions = []
for i in range(y_test_binarized.shape[1]):  # iterate over classes
    average_precisions.append(average_precision_score(y_test_binarized[:, i], y_pred_proba[:, i]))

# Compute the mean of the average precisions
map_score = np.mean(average_precisions)
print(f'Mean Average Precision (MAP): {map_score}')
