<a href="https://colab.research.google.com/github/nupur412/hiwitask/blob/main/Task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import EMNIST
from torchvision import datasets, transforms
from tqdm import tqdm
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
import numpy as np
from PIL import Image

In [2]:
# Compute device shared by every model and batch in this notebook:
# the CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

The code below defines a ResNet-9 encoder without the final classification layer.

In [3]:
def conv_block(in_channels, out_channels, pool=False, pool_no=2):
    """Build a Conv(3x3, padding 1) -> BatchNorm -> ReLU stage.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the convolution.
        pool: When true, a max-pooling layer is appended after the ReLU.
        pool_no: Kernel size handed to ``nn.MaxPool2d`` (only used if ``pool``).

    Returns:
        An ``nn.Sequential`` holding three layers, or four when pooling is on.
    """
    convolution = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
    batch_norm = nn.BatchNorm2d(out_channels)
    stages = [convolution, batch_norm, nn.ReLU()]

    if pool:
        stages += [nn.MaxPool2d(pool_no)]

    return nn.Sequential(*stages)

class ResNet9Features(nn.Module):
    """ResNet-9 style feature extractor (no classification layer).

    The network stacks four conv stages and two residual stages, then applies
    global average pooling and flattening, so the output is a
    ``(batch, 256)`` feature matrix whatever the spatial size left after the
    three 2x max-pools.

    Args:
        in_channels: Number of channels of the input images (1 for greyscale).
        num_classes: Unused. Kept only so existing calls that pass it keep
            working; this module produces features, not class scores.
    """

    def __init__(self, in_channels=1, num_classes=47):
        super().__init__()

        # Honour `in_channels` instead of hard-coding a single input channel.
        self.conv1 = conv_block(in_channels, 64)
        self.conv2 = conv_block(64, 128, pool=True, pool_no=2)
        self.res1 = nn.Sequential(conv_block(128, 128), conv_block(128, 128))

        self.conv3 = conv_block(128, 256, pool=True)
        self.conv4 = conv_block(256, 256, pool=True, pool_no=2)
        self.res2 = nn.Sequential(conv_block(256, 256), conv_block(256, 256))

        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.FlatFeats = nn.Flatten()

    def forward(self, xb):
        """Encode a batch of images into 256-dimensional feature vectors."""
        out = self.conv1(xb)
        out = self.conv2(out)
        out = self.res1(out) + out  # residual (skip) connection
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.res2(out) + out  # residual (skip) connection
        out = self.global_avg_pool(out)
        out = self.FlatFeats(out)
        return out

In [5]:
# ToTensor followed by Normalize(mean 0.5, std 0.5) on the single channel.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Load the EMNIST "balanced" split (downloaded to ./data on first use)
emnist_kwargs = dict(root='./data', split='balanced', transform=transform, download=True)
emnist_train_dataset = datasets.EMNIST(train=True, **emnist_kwargs)
emnist_test_dataset = datasets.EMNIST(train=False, **emnist_kwargs)

# List the distinct class labels present in the training set
classes = np.unique(emnist_train_dataset.targets.numpy())
print(classes)

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46]


In [6]:
# 80/20 train/validation partition of the EMNIST training set
train_size = int(0.8 * len(emnist_train_dataset))
val_size = len(emnist_train_dataset) - train_size

# Seed the split so the same images land in train/val after a kernel restart;
# otherwise a checkpoint trained on one partition would later be validated on
# images it has already seen.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    emnist_train_dataset, [train_size, val_size], generator=split_generator
)

# Create the DataLoaders. Only the training loader needs shuffling; evaluation
# metrics do not depend on batch order.
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False)
test_loader = DataLoader(emnist_test_dataset, batch_size=256, shuffle=False)

The code below defines a SimCLR class that uses the ResNet-9 feature extractor as its base encoder.

In [7]:
class SimCLR(nn.Module):
    """SimCLR wrapper: a base encoder followed by an MLP projection head.

    Args:
        base_encoder: Module mapping a batch of inputs to ``(batch, feature_dim)``
            feature vectors.
        projection_dim: Width of the projected embeddings used by the
            contrastive loss.
        feature_dim: Width of the encoder output (256 by default).
    """

    def __init__(self, base_encoder, projection_dim=128, feature_dim=256):
        super().__init__()

        # Base Encoder
        self.base_encoder = base_encoder

        # Projection Head
        self.projection_head = nn.Sequential(
            nn.Linear(feature_dim, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, projection_dim)
        )

    def forward(self, x1, x2):
        """Encode and project two views of the same batch.

        Returns:
            ``(h1, h2, z1, z2)``: encoder features of both views followed by
            their projections.
        """
        # Encoding both augmented views
        h1 = self.base_encoder(x1)
        h2 = self.base_encoder(x2)

        # Projection Head
        z1 = self.projection_head(h1)
        z2 = self.projection_head(h2)

        return h1, h2, z1, z2

    @staticmethod
    def loss_function(z1, z2, temperature=0.5):
        """Contrastive (InfoNCE) loss between two batches of projections.

        Row ``i`` of ``z1`` and row ``i`` of ``z2`` form the positive pair;
        every other row of ``z2`` acts as a negative for ``z1[i]``.

        Declared as a staticmethod so it can be called both as
        ``SimCLR.loss_function(z1, z2)`` and on an instance.

        Args:
            z1: ``(batch, dim)`` projections of the first view.
            z2: ``(batch, dim)`` projections of the second view.
            temperature: Divisor applied to the cosine similarities.

        Returns:
            Scalar loss tensor.
        """
        # Normalizing the embeddings so the dot product is the cosine similarity
        z1 = F.normalize(z1, dim=-1, p=2)
        z2 = F.normalize(z2, dim=-1, p=2)

        # Pairwise cosine similarities between all rows of z1 and z2
        logits = torch.matmul(z1, z2.t()) / temperature

        # The matching row index is the "class" of each sample
        targets = torch.arange(logits.size(0), device=logits.device)

        return F.cross_entropy(logits, targets)

In [8]:
# Build the ResNet-9 encoder and the SimCLR wrapper around it, placing both on
# `device` (the GPU when CUDA is available, otherwise the CPU).
base_encoder = ResNet9Features().to(device)
simclr_model = SimCLR(base_encoder).to(device)

In [9]:
class InfoNCELoss(nn.Module):
    """InfoNCE loss over two aligned batches of embeddings.

    Row ``i`` of ``z1`` is matched with row ``i`` of ``z2``; all other rows of
    ``z2`` serve as negatives for it.

    Args:
        temperature: Divisor applied to the cosine-similarity logits.
    """

    def __init__(self, temperature=1.0):
        super().__init__()
        self.temperature = temperature

    def forward(self, z1, z2):
        """Return the mean cross-entropy of picking the matching row of ``z2``."""
        # Unit-length rows turn the dot product into a cosine similarity.
        anchors = F.normalize(z1, dim=1)
        candidates = F.normalize(z2, dim=1)

        # (batch, batch) matrix of temperature-scaled similarities.
        logits = (anchors @ candidates.t()) / self.temperature

        # The positive for row i sits on the diagonal, i.e. at column i.
        batch_size = logits.size(0)
        targets = torch.arange(batch_size).to(z1.device)

        return F.cross_entropy(logits, targets)


# Instantiate the InfoNCE criterion with temperature 0.5. The module defines no
# parameters or buffers of its own, so .to(device) has nothing to move here.
criterion = InfoNCELoss(temperature=0.5).to(device)

The code below trains the SimCLR model.

In [None]:
epochs = 50

# Set up the optimizer and learning rate scheduler
# NOTE(review): lr=0.15 is unusually large for Adam — confirm it is intended.
optimizer = torch.optim.Adam(simclr_model.parameters(), lr=0.15)
# scheduler.step() is called once per epoch below, so the cosine schedule must
# span `epochs` steps (not the number of batches) to anneal down to eta_min.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=0, last_epoch=-1)

# Temperature of the contrastive loss. Kept for reference only: the loss
# actually uses the temperature `criterion` was constructed with.
temperature = 0.5

# Data augmentation pipeline, built once instead of once per batch
augmentations = transforms.Compose([
    transforms.RandomResizedCrop(28),
    transforms.RandomHorizontalFlip(),
    transforms.RandomGrayscale(p=0.2),
])

encoder_representations = []

for epoch in range(epochs):
    simclr_model.train()
    total_loss = 0.0

    # Keep only the current epoch's representations: accumulating across all
    # epochs grows without bound and mixes features from outdated weights.
    encoder_representations = []

    for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{epochs}'):
        # Extract the batch of images
        x = batch[0].to(device)

        # Augment image by image: a random transform applied to a whole batch
        # tensor draws one set of parameters and reuses it for every image.
        x1 = torch.stack([augmentations(image) for image in x])

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass (original view vs. augmented view)
        h1, h2, proj1, proj2 = simclr_model(x, x1)

        # Save encoder representations for further use
        encoder_representations.append(h1.detach().cpu().numpy())

        # Compute SimCLR loss
        loss = criterion(proj1, proj2)

        # Backward pass
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Print average loss for the epoch
    average_loss = total_loss / len(train_loader)
    print(f'Epoch {epoch + 1}/{epochs}, Average Loss: {average_loss:.4f}')

    # Adjust learning rate with scheduler
    scheduler.step()

# Persist the trained weights; the fine-tuning section reloads this checkpoint
torch.save(simclr_model.state_dict(), 'simclr_model.pth')

# Save encoder representations (a list of per-batch NumPy arrays) to a file
torch.save({'encoder_representations': encoder_representations}, 'encoder_representations.pth')


Finetuning for a downstream task

In [None]:
import os

from torch.optim import lr_scheduler
from torch.autograd import Variable

# Reload the pretrained SimCLR weights when a checkpoint is available.
# map_location lets a checkpoint written on a GPU be restored on a CPU-only host.
SIMCLR_CHECKPOINT = 'simclr_model.pth'
if os.path.exists(SIMCLR_CHECKPOINT):
    simclr_model.load_state_dict(torch.load(SIMCLR_CHECKPOINT, map_location=device))
else:
    # No checkpoint on disk: keep whatever weights simclr_model currently holds
    # in this kernel, and say so rather than failing or staying silent.
    print(f"Checkpoint '{SIMCLR_CHECKPOINT}' not found; using the in-memory SimCLR weights.")

# Adding a classification layer outside the SimCLR model
class Classifier(nn.Module):
    """Single linear layer mapping feature vectors to class scores.

    Args:
        in_features: Width of the incoming feature vectors.
        num_classes: Number of output scores (one per class).
    """

    def __init__(self, in_features, num_classes):
        super().__init__()
        self.fc = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return the raw (unnormalised) class scores for ``x``."""
        logits = self.fc(x)
        return logits

# Freeze the encoder. SimCLR stores it as `base_encoder`; there is no
# `encoder` attribute on the model.
for param in simclr_model.base_encoder.parameters():
    param.requires_grad = False


classifier_learning_rate = 0.001
# NOTE(review): in_features=128 equals the projection head's output width
# (SimCLR's default projection_dim); the encoder itself emits 256-d features.
# Feed the classifier features of the matching width.
classifier = Classifier(in_features=128, num_classes=47)
classifier.to(device)

In [None]:
# Supervised objective for the downstream classifier.
# Note: this rebinds `criterion`, which the SimCLR training cell used for the
# InfoNCE loss, so that cell cannot be re-run after this one without
# re-creating the InfoNCE criterion first.
criterion = nn.CrossEntropyLoss()
# Only the classifier's parameters are handed to the optimizer.
# NOTE(review): lr is 0.01 here although `classifier_learning_rate = 0.001` is
# defined in the previous cell — confirm which value is intended.
optimizer = optim.Adam(classifier.parameters(), lr=0.01)

# Multiplies the learning rate by 0.1 after every 7 calls to its step().
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
num_epochs = 25

# The SimCLR network is used purely as a fixed feature extractor: eval mode
# keeps its BatchNorm statistics unchanged while the classifier is trained.
simclr_model.eval()


def _extract_features(images):
    """Map a batch of images to the 128-d SimCLR projections.

    The classifier is declared with in_features=128, which is the projection
    head's output width, so images go through the encoder and then the
    projection head. No gradients are tracked for the SimCLR model.
    NOTE(review): to classify on the 256-d encoder output instead, drop the
    projection head here and build the classifier with in_features=256.
    """
    with torch.no_grad():
        return simclr_model.projection_head(simclr_model.base_encoder(images))


def _evaluate(loader):
    """Return (mean per-batch loss, accuracy in percent) of the classifier on `loader`."""
    classifier.eval()
    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = classifier(_extract_features(inputs))
            loss_sum += criterion(outputs, labels).item()

            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    return loss_sum / len(loader), 100 * correct / total


for epoch in range(num_epochs):
    classifier.train()
    total_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        # The linear classifier consumes SimCLR features, not raw pixels
        outputs = classifier(_extract_features(inputs))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    average_loss = total_loss / len(train_loader)

    # Validation
    average_val_loss, accuracy = _evaluate(val_loader)

    # Advance the StepLR schedule once per epoch; without this call the
    # learning rate never decays.
    exp_lr_scheduler.step()

    print(f'Epoch [{epoch + 1}/{num_epochs}], '
          f'Training Loss: {average_loss:.4f}, '
          f'Validation Loss: {average_val_loss:.4f}, '
          f'Validation Accuracy: {accuracy:.2f}%')

# Testing
average_test_loss, test_accuracy = _evaluate(test_loader)

print(f'Testing Loss: {average_test_loss:.4f}, Testing Accuracy: {test_accuracy:.2f}%')

In [None]:
# Load representation vectors from the saved file.
# The training cell stores {'encoder_representations': [per-batch NumPy arrays]},
# so unwrap the dict and stack the batches into a single (N, feature_dim) tensor.
# weights_only=False is needed to unpickle the NumPy arrays; that is acceptable
# only because this file is written by this notebook itself — never load an
# untrusted file this way.
saved_representations = torch.load('encoder_representations.pth', weights_only=False)
if isinstance(saved_representations, dict):
    saved_representations = saved_representations['encoder_representations']
if isinstance(saved_representations, (list, tuple)):
    saved_representations = np.concatenate(
        [np.asarray(batch) for batch in saved_representations], axis=0
    )
representation_vectors = torch.as_tensor(saved_representations)

In [None]:
# Make sure the representations live in host memory before converting.
# NOTE(review): .cpu() requires `representation_vectors` to be a torch.Tensor;
# the training cell saves a dict holding a list of NumPy batches, so confirm
# the loading cell unwraps it into a tensor first.
representation_vectors_cpu = representation_vectors.cpu()

# Convert the PyTorch tensor to a NumPy array
representation_vectors_np = representation_vectors_cpu.numpy()

The code below defines an LSH (Locality-Sensitive Hashing) class.

In [None]:
class LSH:
    """Sign-random-projection locality-sensitive hashing.

    Each vector is projected onto ``num_bits`` random hyperplanes; the signs of
    the projections form a binary code that is read as the bucket index.
    ``num_bits`` is the smallest bit count able to address ``num_buckets``
    buckets (12 bits for 2**12 buckets).

    Args:
        num_buckets: Number of hash buckets (bucket ids are in
            ``[0, num_buckets)``).
        projection_dim: Dimensionality of the vectors to be hashed.
        seed: Seed of the private random generator; ``None`` draws fresh
            hyperplanes on every construction.
    """

    def __init__(self, num_buckets, projection_dim, seed=None):
        if num_buckets < 1:
            raise ValueError("num_buckets must be a positive integer")

        self.num_buckets = num_buckets
        self.projection_dim = projection_dim
        self.num_bits = max(1, (int(num_buckets) - 1).bit_length())

        # A private generator avoids reseeding NumPy's global random state.
        rng = np.random.RandomState(seed)
        # One hyperplane (row) per output bit, each of length projection_dim.
        self.projections = rng.randn(self.num_bits, projection_dim)
        # Place values of the bits, most significant bit first.
        self._bit_weights = 2 ** np.arange(self.num_bits - 1, -1, -1, dtype=np.int64)

    def hash_vector(self, vector):
        """Return the bucket index (a Python int) of a single vector.

        Raises:
            ValueError: If the vector does not have ``projection_dim`` entries.
        """
        vector = np.asarray(vector, dtype=np.float64).reshape(-1)
        if vector.shape[0] != self.projection_dim:
            raise ValueError(
                f"expected a vector of length {self.projection_dim}, got {vector.shape[0]}"
            )

        # Project the vector using random projections and keep only the signs
        bits = np.dot(self.projections, vector) > 0

        # Read the sign bits as a binary number
        code = int(np.dot(bits.astype(np.int64), self._bit_weights))

        # Fold into range when num_buckets is not a power of two
        return code % self.num_buckets

    def hash_dataset(self, dataset):
        """Hash every vector in ``dataset`` and return the list of bucket ids."""
        return [self.hash_vector(vector) for vector in dataset]

In [None]:
num_buckets = 2**12
# Take the dimensionality from the vectors that will be hashed rather than
# hard-coding it: the saved encoder representations are 256-d (the width of the
# ResNet-9 feature output), not 128-d.
projection_dim = representation_vectors_np.shape[1]

# Initialize LSH
lsh = LSH(num_buckets, projection_dim)

In [None]:
# Hash the representation vectors: hash_dataset returns one hash code per row
# of representation_vectors_np, in the same order.
hashed_representations = lsh.hash_dataset(representation_vectors_np)

In [None]:
# Print a preview of the resulting hash codes. Printing one line per vector
# would flood the notebook output, so only the first few are shown, followed
# by a one-line summary of the whole set.
PREVIEW_COUNT = 10

print("Hash Codes:")
for i, hash_code in enumerate(hashed_representations[:PREVIEW_COUNT]):
    print(f"Vector {i + 1}: {hash_code}")

print(f"{len(hashed_representations)} vectors hashed into "
      f"{len(set(hashed_representations))} distinct buckets")

In [None]:
from torchvision import datasets, transforms

# Same preprocessing as for EMNIST: ToTensor, then Normalize(mean 0.5, std 0.5)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Arguments shared by every dataset below (files are cached under ./data)
query_dataset_kwargs = dict(root='./data', transform=transform, download=True)

# MNIST train / test splits
mnist_train_dataset = datasets.MNIST(train=True, **query_dataset_kwargs)
mnist_test_dataset = datasets.MNIST(train=False, **query_dataset_kwargs)

# Fashion-MNIST train / test splits
fmnist_train_dataset = datasets.FashionMNIST(train=True, **query_dataset_kwargs)
fmnist_test_dataset = datasets.FashionMNIST(train=False, **query_dataset_kwargs)

In [None]:
# Mini-batch size shared by the four loaders below
batch_size = 64

# Training loaders reshuffle every epoch; test loaders keep the dataset order.
mnist_train_loader = DataLoader(mnist_train_dataset, batch_size=batch_size, shuffle=True)
mnist_test_loader = DataLoader(mnist_test_dataset, batch_size=batch_size, shuffle=False)

fmnist_train_loader = DataLoader(fmnist_train_dataset, batch_size=batch_size, shuffle=True)
fmnist_test_loader = DataLoader(fmnist_test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Generate query data
def take_images(dataset, count):
    """Return the image tensors of the first `count` samples of `dataset`.

    Raises:
        ValueError: If `dataset` holds fewer than `count` samples.
    """
    if count > len(dataset):
        raise ValueError(f"requested {count} images but the dataset only has {len(dataset)}")
    return [dataset[i][0] for i in range(count)]


mnist_queries = take_images(mnist_test_dataset, 10000)
fashion_mnist_queries = take_images(fmnist_test_dataset, 10000)

# The MNIST and Fashion-MNIST test splits hold 10,000 images each, so indexing
# them up to 20,000 raises IndexError. The larger query sets therefore take the
# whole test split followed by the first images of the training split.
mnist_query_pool = torch.utils.data.ConcatDataset([mnist_test_dataset, mnist_train_dataset])
fmnist_query_pool = torch.utils.data.ConcatDataset([fmnist_test_dataset, fmnist_train_dataset])

mnist_queries2 = take_images(mnist_query_pool, 20000)
fashion_mnist_queries2 = take_images(fmnist_query_pool, 20000)

In [None]:
# Obtain representations
def get_representation(model, query_data, batch_size=256):
    """Encode `query_data` with the model's encoder, without tracking gradients.

    Args:
        model: Module exposing its feature extractor as `base_encoder` (as the
            SimCLR class does) or, failing that, as `encoder`.
        query_data: Tensor whose first dimension indexes the samples.
        batch_size: Number of samples encoded per forward pass, so that large
            query sets do not have to fit on the device in one go.

    Returns:
        Tensor with the encoder outputs of all samples, in input order, on the
        model's device.
    """
    encoder = getattr(model, "base_encoder", None)
    if encoder is None:
        encoder = model.encoder

    device = next(model.parameters()).device  # Get the device of the model parameters

    # Inference must use the learned BatchNorm statistics; restore the previous
    # train/eval mode afterwards so callers are not affected.
    was_training = encoder.training
    encoder.eval()
    try:
        with torch.no_grad():
            # Move one chunk at a time to the same device as the model
            chunks = [encoder(chunk.to(device)) for chunk in query_data.split(batch_size)]
    finally:
        encoder.train(was_training)

    return torch.cat(chunks, dim=0)


# Stack each list of query images into one batch tensor and encode it with the
# SimCLR model, giving one representation row per query image.
mnist_representations = get_representation(simclr_model, torch.stack(mnist_queries))
fashion_mnist_representations = get_representation(simclr_model, torch.stack(fashion_mnist_queries))

In [None]:
def _as_float_matrix(representations):
    """Return `representations` as a 2-D float64 NumPy array (one vector per row)."""
    if hasattr(representations, "detach"):
        # torch tensors: drop autograd tracking and copy to host memory first
        representations = representations.detach().cpu().numpy()
    return np.atleast_2d(np.asarray(representations, dtype=np.float64))


def _bucket_ids(matrix, num_buckets, seed):
    """Assign every row of `matrix` to a bucket via sign random projections."""
    if num_buckets < 1:
        raise ValueError("num_buckets must be a positive integer")
    if matrix.size == 0:
        return np.empty(0, dtype=np.int64)

    # Smallest number of sign bits able to address num_buckets buckets
    num_bits = max(1, (int(num_buckets) - 1).bit_length())
    # Fixed seed => the same hyperplanes for every call with the same dimension
    hyperplanes = np.random.RandomState(seed).randn(num_bits, matrix.shape[1])

    bits = (matrix @ hyperplanes.T) > 0
    bit_weights = 2 ** np.arange(num_bits - 1, -1, -1, dtype=np.int64)
    return (bits.astype(np.int64) @ bit_weights) % num_buckets


def lsh_estimation(representations, num_buckets=2**12, seed=0):
    """Estimate the fraction of LSH buckets occupied by `representations`.

    Args:
        representations: ``(n, dim)`` array or tensor, one vector per row.
        num_buckets: Total number of hash buckets.
        seed: Seed of the random hyperplanes (fixed so results are repeatable
            and comparable across datasets).

    Returns:
        Number of distinct occupied buckets divided by `num_buckets`, a value
        in ``[0, 1]``.
    """
    # Step 1: Hash the representations
    hash_values = _bucket_ids(_as_float_matrix(representations), num_buckets, seed)

    # Step 2: Count unique buckets
    unique_buckets = np.unique(hash_values)

    # Step 3: Calculate fraction
    fraction_occupied = len(unique_buckets) / num_buckets

    return fraction_occupied

def hash_function(vector, num_buckets=2**12, seed=0):
    """Return the LSH bucket index (in ``[0, num_buckets)``) of one vector.

    Python's built-in hash() of the raw floats is not used: it gives nearly
    every distinct vector its own value and is not bounded by `num_buckets`,
    so it cannot measure bucket occupancy.
    """
    return int(_bucket_ids(_as_float_matrix(vector), num_buckets, seed)[0])

In [None]:
# Occupancy with 10,000 queries per dataset
fraction_10000 = lsh_estimation(mnist_representations[:10000])
fashion_mnist_fraction = lsh_estimation(fashion_mnist_representations[:10000])

# Occupancy with 20,000 queries per dataset. The 10,000-row representation
# tensors cannot be sliced to 20,000 rows (that would just repeat the numbers
# above), so the larger query sets are encoded here first.
mnist_representations2 = get_representation(simclr_model, torch.stack(mnist_queries2))
fashion_mnist_representations2 = get_representation(simclr_model, torch.stack(fashion_mnist_queries2))

fraction_20000 = lsh_estimation(mnist_representations2[:20000])
fashion_mnist_fraction2 = lsh_estimation(fashion_mnist_representations2[:20000])

print(f"Fraction of embedding space occupied by MNIST: {fraction_10000}")
print(f"Fraction of embedding space occupied by Fashion MNIST: {fashion_mnist_fraction}")
print(f"Fraction of embedding space occupied by MNIST (20000 queries): {fraction_20000}")
print(f"Fraction of embedding space occupied by Fashion MNIST (20000 queries): {fashion_mnist_fraction2}")