In [219]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.datasets import CIFAR10
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset, TensorDataset
import random
import numpy as np
from typing import Tuple
from tqdm import tqdm
from scipy.spatial import distance_matrix
import pandas as pd

In [220]:
root = "C:/Users/xiaoy/OneDrive/Desktop/P7/p7 project/DVP7/"
# load data
X_train = torch.tensor( np.load(root + "Features/train_features_vgg16_cifar10.npy" ) )
y_train = np.load(root + "Features/train_labels_vgg16_cifar10.npy" )

X_test = torch.tensor( np.load(root + "Features/test_features_vgg16_cifar10.npy" ) )
y_test = np.load(root + "Features/test_labels_vgg16_cifar10.npy")

y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

In [221]:
def meanAveragePrecision(test_hashes, training_hashes, test_labels, training_labels):
    aps = []
    num_queries = len(test_hashes)
    for i in tqdm(range(num_queries)):
        label = test_labels[i]
        distances = (training_hashes != test_hashes[i]).sum(axis=1)  # Hamming distance
        tp = (training_labels == label).long()  # True positive indicator
        hash_df = pd.DataFrame({"distances": distances, "tp": tp.cpu().numpy()})
        hash_df = hash_df.sort_values(by="distances").reset_index(drop=True)
        hash_df["tp_cumsum"] = hash_df["tp"].cumsum()
        hash_df["precision"] = hash_df["tp_cumsum"] / (np.arange(len(hash_df)) + 1)
        ap = hash_df["precision"].where(hash_df["tp"] == 1).mean() if hash_df["tp"].sum() > 0 else 0
        aps.append(ap)

    return np.mean(aps)


In [227]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SUBIC_encoder(nn.Module): 
    def __init__(self, bits=48, num_classes=10, num_blocks=8, block_size=8):
        super(SUBIC_encoder, self).__init__()
       
        assert bits % num_blocks == 0, "Bits must be divisible by num_blocks"

        self.bits = bits 
        self.num_blocks = num_blocks
        self.block_size = block_size

        # Define the encoder structure
        self.encoder = nn.Sequential(
            nn.Linear(4096, 256), 
            nn.ReLU(),
            nn.Linear(256, bits)
        )  # Outputs binary feature vectors
        
        self.fc3 = nn.Linear(bits, num_classes)  # Logits for num_classes
    
    def block_softmax(self, x):
        
        batch_size = x.shape[0]
        block_size = x.shape[1] // self.num_blocks
        
        # Ensure that x has the expected shape
        assert x.shape[1] == self.bits, f"Expected shape [batch_size, {self.bits}], got {x.shape}"
        
        # Reshape and apply softmax
        x = x.view(batch_size, self.num_blocks, block_size)
        x = F.softmax(x, dim=-1) 
        return x.view(batch_size, -1)
    
    def block_one_hot(self, x):
        batch_size = x.shape[0]

        x = x.view(batch_size, self.num_blocks, self.block_size)
        max_indices = x.argmax(dim=-1, keepdim=True)
        
        # Create one-hot encoding
        one_hot = torch.zeros_like(x).scatter_(-1, max_indices, 1)

        return one_hot.view(batch_size, self.bits)
    
    def forward(self, x, use_one_hot=False):
        # Ensure x is a flat tensor before passing to encoder
        batch_size = x.shape[0]
        x = x.view(batch_size, -1)  # Flatten if necessary

        z = self.encoder(x)

        if use_one_hot:
            binary_codes = self.block_one_hot(z)
        else:
            binary_codes = self.block_softmax(z)

        logits = self.fc3(binary_codes)

        return logits, binary_codes


In [243]:
model = SUBIC_encoder(bits = 48, num_classes = 10, num_blocks = 8, block_size = 6)
optimizer = optim.Adam(model.parameters(), lr=0.005)
gamma, mu = 0.5, 0.1  # Adjust based on experimentation
epochs = 10
logits, binary_codes = model(X_train, use_one_hot = False)

In [229]:
logits.shape

torch.Size([45000, 10])

In [244]:
def compute_total_loss(logits, target, binary_codes, num_blocks, block_size, gamma=0.05, mu=0.05):
    """
    Computes the total loss, which includes:
    - Cross-entropy classification loss
    - Mean entropy loss (encouraging one-hot encoding within each block)
    - Batch entropy loss (encouraging uniform distribution across blocks)
    
    Parameters:
    - logits: The output logits from the classification layer.
    - target: The true labels.
    - binary_codes: The binary codes generated by the encoder.
    - num_blocks: The number of blocks in the binary codes.
    - block_size: The size of each block in the binary codes.
    - gamma: Weight for the mean entropy loss.
    - mu: Weight for the batch entropy loss.
    
    """
    classification_loss = F.cross_entropy(logits, target)

    batch_size = binary_codes.shape[0]
    binary_codes = binary_codes.view(batch_size, num_blocks, block_size) #used in structure encoding

    #Mean Entropy Loss (encourages each block to resemble a one-hot vector) using softmax binary codes
    mean_entropy_loss = -torch.sum(binary_codes * torch.log2(binary_codes + 1e-10), dim=-1).mean()

    #Batch Entropy Loss (encourages uniform distribution across blocks)
    average_support = binary_codes.mean(dim=0)  
    batch_entropy_loss = torch.sum(-average_support * torch.log2(average_support + 1e-10)).mean()

    #Combine losses with weights gamma and mu
    entropy_loss = gamma * mean_entropy_loss - mu * batch_entropy_loss
    total_loss = classification_loss + entropy_loss
    
    return total_loss

#logits, binary_codes = model(X_train, use_one_hot=False)
#loss = compute_total_loss(logits, y_train_tensor, binary_codes, num_blocks=8, block_size=4, gamma=0.5, mu=0.05)

In [245]:
epochs = 10

train_dataset = TensorDataset(X_train, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataset = TensorDataset(X_test, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for epoch in range(epochs):
    model.train()
    total_loss = 0.0

    for images, labels in train_loader:
        
        images, labels = images.to(device), labels.to(device)
        logits, binary_codes = model(images, use_one_hot=False)

        # Compute loss and update model
        loss = compute_total_loss(logits, labels, binary_codes, num_blocks=8, block_size=6, gamma=0.05, mu=0.05)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss / len(train_loader):.4f}")


Epoch [1/10], Loss: 0.9445
Epoch [2/10], Loss: 0.6850
Epoch [3/10], Loss: 0.5616
Epoch [4/10], Loss: 0.5315
Epoch [5/10], Loss: 0.4361
Epoch [6/10], Loss: 0.4024
Epoch [7/10], Loss: 0.3988
Epoch [8/10], Loss: 0.3311
Epoch [9/10], Loss: 0.3413
Epoch [10/10], Loss: 0.3200


In [261]:
model.eval()
all_query_codes, all_query_labels = [], []
all_db_codes, all_db_labels = [], []

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)

        _, binary_codes = model(images, use_one_hot=True)

        # Ensure binary_codes is a tensor
        if not isinstance(binary_codes, torch.Tensor):
            raise TypeError("Expected binary_codes to be a tensor.")
        
        all_db_codes.append(binary_codes)
        all_db_labels.append(labels)

        if len(all_query_codes) == 0:  
            all_query_codes.append(binary_codes.clone())  
            all_query_labels.append(labels.clone())

# Concatenate all tensors
all_query_codes = torch.cat(all_query_codes, dim=0)
all_query_labels = torch.cat(all_query_labels, dim=0)
all_db_codes = torch.cat(all_db_codes, dim=0)
all_db_labels = torch.cat(all_db_labels, dim=0)

# Calculate MAP Score
map_score = meanAveragePrecision(
    all_query_codes,
    all_db_codes,
    all_query_labels,
    all_db_labels
    )

print(f"MAP Score: {map_score:.5f}")

100%|██████████| 64/64 [00:00<00:00, 240.21it/s]

MAP Score: 0.52585



