In [100]:
import torch
from torch import nn, optim
from tqdm import tqdm
from torchvision import transforms, datasets
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torch.utils.data as data
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np

%matplotlib inline

In [101]:
LATENT_DIM = 64  # Latent space dimension
NUM_EMBEDDINGS = 128  # Number of vectors in codebook
COMMITMENT_COST = 0.25  # Beta in loss function
BATCH_SIZE = 128
EPOCHS = 10
LEARNING_RATE = 1e-3

In [102]:
# Define the transformation
transform = transforms.Compose([transforms.ToTensor()])

# Define DataLoaders
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST(root="./data", train=True, download=True, transform=transform),
    batch_size=64, shuffle=True
)

test_loader = torch.utils.data.DataLoader(
    datasets.MNIST(root="./data", train=False, download=True, transform=transform),
    batch_size=64, shuffle=True
)

In [103]:
def residual_stack(h, num_hiddens, num_residual_layers):
    for _ in range(num_residual_layers):
        h = nn.Sequential(
            nn.ReLU(),
            nn.Conv2d(num_hiddens, num_hiddens, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(num_hiddens, num_hiddens, kernel_size=1, stride=1, padding=0)
        )(h)
    return h

In [104]:
# Transforming from x -> z_e
class Encoder(nn.Module):
    def __init__(self, latent_dim):
        super(Encoder, self).__init__()
        self.latent_dim = latent_dim
        self.conv1 = nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, latent_dim, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.conv3(x)  # No activation, raw latents
        residual_stack(x, self.latent_dim, 2)
        return x

In [105]:
# z_e -> z_q Codebook Dimension

class VectorQuantizer(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, commitment_cost):
        super(VectorQuantizer, self).__init__()
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.commitment_cost = commitment_cost
        self.embedding = nn.Embedding(num_embeddings, embedding_dim)
        self.embedding.weight.data.uniform_(-1/num_embeddings, 1/num_embeddings)
    
    def forward(self, x):
        B, C, H, W = x.shape  # Get input shape
        x_flattened = x.permute(0, 2, 3, 1).contiguous().view(-1, C)
        distances = torch.cdist(x_flattened, self.embedding.weight)
        encoding_indices = torch.argmin(distances, dim=1)
        quantized = self.embedding(encoding_indices).view(B, H, W, C).permute(0, 3, 1, 2).contiguous()
        
        loss = F.mse_loss(quantized.detach(), x) + self.commitment_cost * F.mse_loss(x.detach(), quantized)
        quantized = x + (quantized - x).detach()
        
        return quantized, loss, encoding_indices

In [106]:
# z_q -> x_hat Decoding

class Decoder(nn.Module):
    def __init__(self, latent_dim):
        super(Decoder, self).__init__()
        self.latent_dim = latent_dim
        self.conv1 = nn.ConvTranspose2d(latent_dim, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1)
        self.conv3 = nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1)

    def forward(self, x):
        x = self.conv1(x)
        residual_stack(x, self.latent_dim, 2)
        x = self.conv2(x)
        x = torch.sigmoid(self.conv3(x))  # Output in [0,1]
        return  x

In [107]:
class VQVAE(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, commitment_cost):
        super(VQVAE, self).__init__()
        self.encoder = Encoder(embedding_dim)
        self.quantizer = VectorQuantizer(num_embeddings, embedding_dim, commitment_cost)
        self.decoder = Decoder(embedding_dim)
    
    def forward(self, x):
        z_e = self.encoder(x)
        z_q, loss, encoding_indices = self.quantizer(z_e)
        x_hat = self.decoder(z_q)
        return x_hat, loss, encoding_indices
    
    def encode(self, x):
        return self.quantizer(self.encoder(x))[2]
    
    def decode(self, x):

        return self.decoder(x)

In [108]:
class Trainer:
    def __init__(self, model, optimizer, train_loader, test_loader):
        self.model = model
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.train_losses = []
        self.test_losses = []

    def train(self, epochs):
        for epoch in range(epochs):
            self.model.train()
            train_loss = 0
            for batch, _ in tqdm(self.train_loader):
                self.optimizer.zero_grad()
                def add_noise(x):
                    return np.clip(x + torch.randn_like(x) * 0.1, 0, 1)
                batch = add_noise(batch)
                x_hat, loss, _ = self.model(batch)
                loss_1 = F.binary_cross_entropy(x_hat, batch)
                loss = loss.mean()
                loss += loss_1
                loss.backward()
                self.optimizer.step()
                train_loss += loss.item()
            self.train_losses.append(train_loss)
            print(f"Epoch {epoch}: Train Loss: {train_loss}")
            self.test()
    
    def visualize_reconstructions(self, n=5
    ):
        self.model.eval()
        for batch, _ in self.test_loader:
            x_hat, _, idx = self.model(batch)
            for i in range(n):
                plt.subplot(2, n, i+1)
                plt.imshow(batch[i].squeeze().detach().numpy(), cmap="gray")
                plt.subplot(2, n, i+1+n)
                plt.imshow(x_hat[i].squeeze().detach().numpy(), cmap="gray")
            plt.show()
            break

    def test(self):
        self.model.eval()
        test_loss = 0
        for batch, _ in self.test_loader:
            x_hat, loss, _ = self.model(batch)
            loss = loss.mean()
            test_loss += loss.item()
        self.test_losses.append(test_loss)
        print(f"Test Loss: {test_loss}")

model = VQVAE(NUM_EMBEDDINGS, LATENT_DIM, COMMITMENT_COST)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
trainer = Trainer(model, optimizer, train_loader, test_loader)
trainer.train(10)
trainer.visualize_reconstructions()
trainer.test()

  0%|          | 0/938 [00:00<?, ?it/s]

  2%|▏         | 22/938 [00:02<01:49,  8.38it/s]


KeyboardInterrupt: 

# Recommendation

In [158]:
class Encoder(nn.Module):
    def __init__(self, num_items, embedding_dim=16, dropout_prob=0.5):
        super(Encoder, self).__init__()
        self.dense_layer_1 = nn.Conv1d(num_items, 256, kernel_size=3, stride=1, padding=1)
        self.dropout = nn.Dropout(dropout_prob)
        
        self.dense_layer_2 = nn.Conv1d(256, 64, kernel_size=3, stride=1, padding=1)  
        self.batch_norm = nn.LayerNorm(64)  
        
        self.dense_layer_3 = nn.Conv1d(64, embedding_dim, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        x = x.permute(1, 0)
        x = F.leaky_relu(self.dense_layer_1(x).permute(1, 0))
        x = self.dropout(x)
        x = F.leaky_relu(self.batch_norm(self.dense_layer_2(x).permute(0, 2, 1)))
        x = self.dense_layer_3(x)
        return x


class Decoder(nn.Module):
    def __init__(self, num_items, embedding_dim=16, dropout_prob=0.5):
        super(Decoder, self).__init__()
        self.dense_layer_1 = nn.Conv1d(embedding_dim, 64, kernel_size=3, stride=1, padding=1)
        self.batch_norm1 = nn.LayerNorm(64)
        
        self.dense_layer_2 = nn.Conv1d(64, 256, kernel_size=3, stride=1, padding=1)
        self.batch_norm2 = nn.LayerNorm(256)
        
        self.dropout = nn.Dropout(dropout_prob)
        
        self.dense_layer_3 = nn.Conv1d(256, num_items, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        x = self.dense_layer_1(x)
        x = F.leaky_relu(x)
        x = self.batch_norm1(x)
        
        x = self.dense_layer_2(x)
        x = F.leaky_relu(x)
        
        x = self.dropout(x)
        
        x = self.dense_layer_3(x)  
        x = torch.sigmoid(x)
        return x
    
class VectorQuantizer(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, commitment_cost=0.25):
        super(VectorQuantizer, self).__init__()
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.commitment_cost = commitment_cost

        self.embedding = nn.Embedding(num_embeddings, embedding_dim)
        self.embedding.weight.data.uniform_(-1.0 / num_embeddings, 1.0 / num_embeddings)

    def forward(self, z_e):
        z_e_reshaped = z_e.view(-1, self.embedding_dim)
        distances = torch.cdist(z_e_reshaped, self.embedding.weight)

        encoding_indices = torch.argmin(distances, dim=1)
        z_q = self.embedding(encoding_indices).view(z_e.shape)

        # Improved VQ loss to encourage embedding diversity
        commitment_loss = F.mse_loss(z_q.detach(), z_e)
        codebook_loss = F.mse_loss(z_e.detach(), z_q)  # Codebook update loss
        loss = commitment_loss + self.commitment_cost * codebook_loss

        return z_q, encoding_indices, loss
    
class VQVAE(nn.Module):
    def __init__(self, input_dim, latent_dim, num_embeddings):
        super(VQVAE, self).__init__()
        self.encoder = Encoder(input_dim, latent_dim)
        self.quantizer = VectorQuantizer(num_embeddings, latent_dim)
        self.decoder = Decoder(input_dim, latent_dim)

    def forward(self, x):
        print(x.shape)
        z_e = self.encoder(x)
        z_q, encodings, vq_loss = self.quantizer(z_e)
        x_reconstructed = self.decoder(z_q)
        return x_reconstructed, vq_loss, encodings


In [159]:
class VQVAE_Rec_Sys:
    def __init__(self, dataset, num_items, val_split=0.2, embedding_dim=16, num_embeddings=4, batch_size=32):
        # Initialize the model components
        self.dataset = dataset
        self.train_dataset, self.val_dataset = train_test_split(dataset, test_size=val_split, random_state=42)
        self.num_items = num_items
        self.embedding_dim = embedding_dim
        self.num_embeddings = num_embeddings
        self.model = VQVAE(input_dim=num_items, latent_dim=embedding_dim, num_embeddings=num_embeddings)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)
        self.batch_size = batch_size
        self.train_losses = []
        self.val_losses = []
    
    def train(self, epochs=10):
        for epoch in range(epochs):
            self.model.train()
            total_loss = 0    
            progress_bar = tqdm(range(0, len(self.train_dataset), self.batch_size), desc=f"Epoch {epoch+1}/{epochs}")

            for i in progress_bar:
                # Get the batch from the dataset
                batch_data = self.train_dataset[i:i+self.batch_size].detach().to(self.device)
                
                # Zero the gradients
                self.optimizer.zero_grad()

                # Forward pass through the model
                x_reconstructed, vq_loss, _ = self.model(batch_data)

                # Compute reconstruction loss
                recon_loss = F.binary_cross_entropy(x_reconstructed, batch_data)

                # Total loss = reconstruction loss + VQ loss
                loss = recon_loss + vq_loss
                total_loss += loss.item()

                # Backward pass and optimizer step
                loss.backward()
                self.optimizer.step()
            avg_train_loss = total_loss / len(self.train_dataset)
            self.train_losses.append(avg_train_loss)

            # Validation Step
            val_loss = self.validate()
            self.val_losses.append(val_loss)
            # Print the loss for this epoch
            print(f"Epoch {epoch+1}/{epochs} - Train Loss: {avg_train_loss:.8f} | Val Loss: {val_loss:.8f}")

    def validate(self):
        self.model.eval()
        val_loss = 0

        with torch.no_grad():
            for i in range(0, len(self.val_dataset), self.batch_size):
                # Get the batch from the dataset
                batch = self.val_dataset[i:i+self.batch_size].detach().to(self.device)
                recon_batch, vq_loss, _ = self.model(batch)
                loss = F.binary_cross_entropy(recon_batch, batch) + vq_loss
                val_loss += loss.item()

        return val_loss / len(self.val_dataset)
    
    def recommend_items(self, user_index, top_k=10):
        self.model.eval()
        
        # Generate the input vector for the user (e.g., user-item interaction vector)
        user_data = self.dataset[user_index].detach().unsqueeze(0).to(self.device)

        # Forward pass to get reconstructed preferences
        with torch.no_grad():
            x_reconstructed, _, encodings = self.model(user_data)

        # After reconstruction, we use the encoder's output (latent representation)
        # Here we just use the reconstructed values to recommend items
        # For simplicity, we take the reconstructed output and find the top-k items

        reconstructed_preferences = x_reconstructed.squeeze().cpu().numpy()

        # Get the top K most recommended items
        recommended_items = reconstructed_preferences.argsort()[-top_k:][::-1]
        
        return recommended_items
    
    def plot_loss(self):
        plt.plot(self.train_losses[1:], label="Train Loss")
        plt.plot(self.val_losses[1:], label="Validation Loss")
        plt.xlabel("Epochs")
        plt.ylabel("Loss")
        plt.legend()
        plt.show()

In [160]:
df = pd.read_csv('data/lfm_interactions.csv', sep="\t", index_col=0)
inter_matr = pd.pivot_table(df, values='count', index='user_id', columns='item_id')
inter_matr = inter_matr.fillna(0).to_numpy()
inter_matr = (inter_matr > 0).astype(int)

dataset = torch.FloatTensor(inter_matr)
rec_sys = VQVAE_Rec_Sys(dataset, num_items=dataset.shape[1], embedding_dim=16, num_embeddings=128, batch_size=32)
rec_sys.train(epochs=40)

rec_sys.plot_loss()


Epoch 1/40:   0%|          | 0/70 [00:00<?, ?it/s]

torch.Size([32, 4175])





RuntimeError: Given groups=1, weight of size [64, 256, 3], expected input[1, 32, 256] to have 256 channels, but got 32 channels instead

In [None]:
ids = pd.read_csv('evaluation-data/test_users.txt')
user_recommendations = []
print(rec_sys.dataset.shape[0])


for id in ids.user_id:
    recommendation = rec_sys.recommend_items(id, top_k=10)
    user_recommendations.append([id, ','.join(recommendation.astype(str))])

df = pd.DataFrame(user_recommendations, columns=['user_id', 'recommendations'])

df.to_csv('res/rec_vq-vae.tsv', index=False, sep='\t', header=False)

2795


# Code Compression using VQ-VAE

In [None]:
class Encoder(nn.Module):
    def __init__(self, num_items, embedding_dim=16, dropout_prob=0.5):
        super(Encoder, self).__init__()
        self.mlp = nn.Sequential(
            nn.Linear(num_items, 256),
            nn.GELU(),
            nn.LayerNorm(256),
            nn.Dropout(dropout_prob),
            nn.Linear(256, 128),
            nn.GELU(),
            nn.Linear(128, embedding_dim)
        )
    def forward(self, x):
        return self.mlp(x)

class Decoder(nn.Module):
    def __init__(self, num_items, embedding_dim=16, dropout_prob=0.5):
        super(Decoder, self).__init__()
        self.mlp = nn.Sequential(
            nn.Linear(embedding_dim, 128),
            nn.GELU(),
            nn.LayerNorm(128),
            nn.Dropout(dropout_prob),
            nn.Linear(128, 256),
            nn.GELU(),
            nn.Linear(256, num_items)
        )
    def forward(self, x):
        return torch.sigmoid(self.mlp(x))
    
class Quanitzation(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, commitment_cost=0.25):
        super(Quanitzation, self).__init__()
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.commitment_cost = commitment_cost
        self.embedding = nn.Embedding(num_embeddings, embedding_dim)
        self.embedding.weight.data.uniform_(-1.0 / num_embeddings, 1.0 / num_embeddings)
        
    def forward(self, z_e):
        z_e_reshaped = z_e.view(-1, self.embedding_dim)
        distances = torch.cdist(z_e_reshaped, self.embedding.weight)
        encoding_indices = torch.argmin(distances, dim=1)
        z_q = self.embedding(encoding_indices).view(z_e.shape)
        
        commitment_loss = F.mse_loss(z_q.detach(), z_e)
        codebook_loss = F.mse_loss(z_e.detach(), z_q)
        loss = commitment_loss + self.commitment_cost * codebook_loss
        
        return z_q, encoding_indices, loss

