In [7]:
import numpy as np 
from Utils.tokenizer import nearly_orthogonal_vectors
from Dataset.Utils import prepare_data_range

In [8]:
# Build K vectors of dimension N and check how close they are to orthonormal.
N, K = 1024, 1024

V = nearly_orthogonal_vectors(N, K, tol=0.01, max_tries=100)

# G is the Gram matrix of the rows of V. The identity is subtracted before
# taking the maximum, so the reported value also covers how far each
# diagonal entry (squared row norm) is from 1.
G = V @ V.T
print("max off-diagonal |dot|:", np.abs(G - np.eye(K)).max())

max off-diagonal |dot|: 7.130667584176464e-13


In [11]:
# Draw d distinct row indices of V at random (no replacement) and gather
# those rows into the projection matrix P.
d = 256
indices = np.arange(K)
sampled_idx = np.random.choice(indices, size=d, replace=False)
P = V[sampled_idx, :]
P.shape

(256, 1024)

In [13]:
# Apply the sampled rows to a random test vector: one coefficient per row of P.
v = np.random.random(size=(N,))  # uniform samples in [0, 1)
proj_v = P.dot(v)
proj_v.shape

(256,)

In [2]:
# Precomputed training embeddings stored as a .npy array.
# NOTE(review): the path is absolute and machine-specific; the directory name
# suggests ESM2 embeddings — confirm the provenance.
EMBEDS_PATH = '/mnt/d/ML/Kaggle/CAFA6-new/Dataset/esm2_embeds_cafa5/train_embeddings.npy'
embeds = np.load(EMBEDS_PATH)
embeds.shape

(142246, 1280)

In [3]:
import torch
import torch.nn as nn

class EmbedTokenizer(nn.Module):
    """Split one D-dimensional embedding into N tokens of dimension d.

    Token ``n`` is the product of a fixed ``(d, D)`` matrix with the input
    embedding. Each of the ``N`` matrices consists of ``d`` distinct rows
    sampled from the ``K = D`` vectors returned by
    ``nearly_orthogonal_vectors``. The matrices are stacked into the
    non-trainable buffer ``P_buffer`` of shape ``(N, d, D)``.

    Parameters
    ----------
    D : int
        Dimension of the embedding space.
    d : int
        Dimension of the token space. Rows are sampled without replacement,
        so ``d`` must not exceed ``D``.
    N : int
        Number of tokens.
    rng : np.random.Generator or None
        Random generator used both for building the vectors and for sampling
        rows. If None, a fresh ``np.random.default_rng()`` is used.
    """

    def __init__(self, D, d, N, rng=None):
        super().__init__()

        if rng is None:
            rng = np.random.default_rng()

        K = D
        V = nearly_orthogonal_vectors(D, K, tol=0.01, max_tries=100, rng=rng)

        # Sample from `rng` rather than the global NumPy state so that passing
        # a seeded generator reproduces exactly the same projections.
        projections = [
            torch.tensor(V[rng.choice(K, size=d, replace=False)], dtype=torch.float32)
            for _ in range(N)
        ]

        # Stack all projection matrices into a single tensor: (N, d, D).
        # Registered as a buffer so it follows .to()/.cuda() and is saved in
        # the state dict without being trained.
        self.register_buffer('P_buffer', torch.stack(projections))

    @property
    def P(self):
        """Stacked projections ``(N, d, D)``; read-only alias of ``P_buffer``."""
        return self.P_buffer

    def forward(self, x):
        """Project embeddings onto every token subspace.

        Parameters
        ----------
        x : Tensor, shape (batch_size, D) or (D,)
            Input embeddings. Extra leading dimensions are carried through
            unchanged.

        Returns
        -------
        tokens : Tensor, shape (batch_size, N, d) or (N, d)
            Token representations; ``tokens[..., n, :] == P[n] @ x[..., :]``.
        """
        # Match the input's dtype/device without touching the stored buffer.
        P = self.P_buffer.to(dtype=x.dtype, device=x.device)  # (N, d, D)

        # Contract the embedding axis of x with the last axis of each P[n].
        # A plain `x @ P.transpose(-1, -2)` would broadcast x across the token
        # axis and yield (N, batch_size, d); naming the output axes explicitly
        # guarantees the documented (batch_size, N, d) layout and lets a 1-D
        # input come out as (N, d) with no squeeze step.
        return torch.einsum('...i,ndi->...nd', x, P)


In [None]:
from torch.utils.data import Dataset, DataLoader
import numpy as np


class EmbeddingsDataset(Dataset):
    """Map-style dataset over precomputed embeddings with optional oversampling.

    Parameters
    ----------
    data : mapping
        Must provide the keys ``'entries'``, ``'embeds'`` and ``'labels'``,
        each indexable by an integer sample position.
        NOTE(review): the three collections are assumed to be aligned and of
        equal length — confirm against the code that builds ``data``.
    oversample_indices : sequence of int or None
        Sample positions to expose, in order; a position may appear more than
        once to oversample it. If None, every sample is exposed exactly once.
    """

    def __init__(self, data, oversample_indices=None):
        self.data = data
        if oversample_indices is None:
            # Default to the identity mapping. The length comes from
            # data['entries'] because no `sequences` attribute is ever set on
            # this class.
            oversample_indices = list(range(len(data['entries'])))
        self.oversample_indices = oversample_indices

    def __len__(self):
        """Number of exposed samples, counting oversampled repeats."""
        return len(self.oversample_indices)

    def __getitem__(self, idx):
        """Return the entry ID, embedding and label for dataset position ``idx``."""
        # Translate the dataset position into a position in the underlying data.
        sample_idx = self.oversample_indices[idx]

        return {
            'entryID': self.data['entries'][sample_idx],
            'embeds' : self.data['embeds'][sample_idx],
            'label' : self.data['labels'][sample_idx]
        }

torch.Size([1280])

In [None]:
# NOTE(review): prepare_data_range is imported from Dataset.Utils; what it
# returns or mutates is not visible in this notebook — confirm before relying
# on it. As the last expression of the cell, its return value is displayed.
prepare_data_range(embeds)