In [9]:
import pandas as pd
import random
import numpy as np
import os
import polars as pl
import gensim
import torch
import torch.nn as nn
import dask.dataframe as dd

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [2]:
# Load the pretrained word2vec model and expose its vectors as a frozen PyTorch
# embedding table whose row i is the vector for vocab[i].
W2V_MODEL_PATH = "word2vec-gensim-text8-custom-preprocess.model"
w2v = gensim.models.Word2Vec.load(W2V_MODEL_PATH)

vocab = w2v.wv.index_to_key
# gensim already maintains the word -> row mapping; copy it instead of rebuilding it.
word_to_idx = dict(w2v.wv.key_to_index)
# `wv.vectors` is the model's full vector matrix (see gensim KeyedVectors docs);
# one vectorized conversion replaces a per-word Python loop over the whole vocab.
embeddings_array = np.asarray(w2v.wv.vectors, dtype=np.float32)
embeddings = torch.tensor(embeddings_array, dtype=torch.float32)

# Invariants the lookup in `embed_tokens` relies on: one row per vocabulary word,
# and row order matching `index_to_key` (spot-checked on the first word).
assert embeddings.shape[0] == len(vocab) == len(word_to_idx)
assert len(vocab) == 0 or np.array_equal(embeddings_array[0], w2v.wv[vocab[0]])
print(embeddings.shape)

# freeze=True: the pretrained vectors are used as a fixed lookup table.
embedding_layer = nn.Embedding.from_pretrained(embeddings, freeze=True)

torch.Size([74792, 100])


In [3]:
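# NOTE: superseded draft of `embed_tokens`; the active definition is in a later cell.
# This draft returns a `(tensor, unknown_tokens)` tuple when some tokens are known,
# but a bare empty tensor otherwise, so callers would see two different shapes.
# def embed_tokens(tokens: list[str], unknown_tokens: set):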
#     valid_tokens = [token for token in tokens if token in word_to_idx]
#     unknown_tokens.update(set(tokens) - set(valid_tokens))
#     if valid_tokens:
#         return (
#             embedding_layer(
#                 torch.tensor(
#                     [word_to_idx[token] for token in valid_tokens], dtype=torch.long
#                 )
#             ),
#             unknown_tokens,
#         )
#     return torch.tensor([])

In [6]:
# NOTE(review): `dfa` is rebound by the Dask read in a later cell before it is
# used anywhere, so this eager pandas load only costs time and memory.
TOKENS_PARQUET = "list_of_tokens.parquet"
dfa = pd.read_parquet(TOKENS_PARQUET)

In [7]:

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Place the frozen embedding table on that device so lookups run there.
embedding_layer = embedding_layer.to(device)
device

device(type='cuda')

In [31]:
# !pip install dask

In [38]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import torch
from dask.diagnostics import ProgressBar

# NOTE(review): repeats the earlier device-setup cell; re-running it is harmless.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
embedding_layer = embedding_layer.to(device)

def embed_tokens(tokens, unknown_tokens, verbose: bool = False):
    """Look up frozen word2vec embeddings for one row's tokens.

    Tokens missing from ``word_to_idx`` are skipped and recorded in
    ``unknown_tokens``, which is mutated in place.

    Args:
        tokens: A list, pandas Series/Index or numpy array of tokens. ``None`` or
            a scalar NaN/NA means "no tokens"; any other type yields ``[]``.
        unknown_tokens: A set that receives every non-NaN token not found in
            ``word_to_idx``.
        verbose: When True, print one diagnostic line describing the outcome.
            Off by default because this function runs once per row of every
            token column, and per-row prints flood the notebook output.

    Returns:
        A list with one embedding row (a list of Python floats) per known token,
        in input order, or ``[]`` when nothing could be embedded.
    """

    def _log(message):
        if verbose:
            print(message)

    if tokens is None or (pd.api.types.is_scalar(tokens) and pd.isna(tokens)):
        _log("Tokens are None or NaN")
        return []

    if not isinstance(tokens, (list, pd.Series, pd.Index, np.ndarray)):
        _log("Tokens are not a valid type")
        return []

    # Drop missing entries (e.g. None inside an object array read from parquet).
    present = [token for token in tokens if pd.notna(token)]
    if not present:
        _log("Tokens are empty after filtering NaNs")
        return []

    valid_tokens = [token for token in present if token in word_to_idx]
    unknown_tokens.update(set(present) - set(valid_tokens))
    if not valid_tokens:
        _log("No valid tokens found")
        return []

    _log(f"Valid tokens found: {valid_tokens}")
    token_indices = torch.tensor(
        [word_to_idx[token] for token in valid_tokens], dtype=torch.long, device=device
    )
    # Pure inference: no autograd bookkeeping is needed for a frozen lookup.
    with torch.no_grad():
        vectors = embedding_layer(token_indices)
    return vectors.cpu().numpy().tolist()

In [None]:
dfa = dd.read_parquet('list_of_tokens.parquet', engine='pyarrow')
sample_size = 1000  # Set the desired sample size
# NOTE(review): Dask's `head` looks only at the first partition by default, so this
# may return fewer than `sample_size` rows (Dask warns when that happens).
small_sample_pandas_df = dfa.head(sample_size)  # pandas DataFrame with the first rows
dfa = dd.from_pandas(small_sample_pandas_df, npartitions=1)

# Tokens absent from the word2vec vocabulary, collected by `embed_tokens`
# across all three token columns.
unknown_tokens = set()


def _embed_token_column(partition, tokens_column):
    """Embed one token column of a pandas partition, row by row."""
    return partition[tokens_column].apply(embed_tokens, args=(unknown_tokens,))


# Each `<prefix>_tokens` column gets a matching `<prefix>_embedding` column. The
# column name is passed as an argument (not captured by a lambda), so every
# lazily-evaluated task reads the column it was built for.
for prefix in ("query", "relevant_document", "irrelevant_document"):
    embedding_column = f"{prefix}_embedding"
    dfa[embedding_column] = dfa.map_partitions(
        _embed_token_column,
        f"{prefix}_tokens",
        meta=(embedding_column, 'object'),
    )

# `unknown_tokens` is filled by mutating this process's set from inside the tasks.
# That only works when the tasks run in this process, so pin the threaded scheduler.
# Otherwise an active distributed Client would run them elsewhere and leave the set empty.
with ProgressBar():
    dfa = dfa.compute(scheduler="threads")

In [None]:
# After the compute above, `unknown_tokens` holds every token that `embed_tokens`
# could not find in the word2vec vocabulary.
n_unknown_tokens = len(unknown_tokens)
print(f"Number of unknown tokens: {n_unknown_tokens}")

In [34]:
# Re-wrap the computed pandas frame as a Dask frame so the row filter below
# runs partition by partition.
FILTER_NPARTITIONS = 10
dask_df = dd.from_pandas(dfa, npartitions=FILTER_NPARTITIONS)

def is_non_empty(x) -> bool:
    """Return True if an embedding cell holds at least one embedding row.

    Accepted cell shapes:
    * a list / array / tensor of rows: non-empty when it has any rows;
    * an ``(embeddings, unknown_tokens)`` tuple, as returned by an earlier
      ``embed_tokens`` draft: judged by its first element;
    * ``None`` / NaN (a missing cell): False, instead of ``len`` raising TypeError.
    """
    if pd.api.types.is_scalar(x) and pd.isna(x):
        return False
    if isinstance(x, tuple):
        # An empty tuple has no embeddings element; treat it as empty rather
        # than raising IndexError.
        return len(x) > 0 and len(x[0]) > 0
    return len(x) > 0

# A row survives only if every one of these embedding cells is non-empty.
EMBEDDING_COLUMNS = (
    "query_embedding",
    "relevant_document_embedding",
    "irrelevant_document_embedding",
)


def _rows_with_all_embeddings(partition):
    """Keep the rows of a pandas partition whose embedding cells are all non-empty."""
    keep = partition[EMBEDDING_COLUMNS[0]].apply(is_non_empty)
    for column in EMBEDDING_COLUMNS[1:]:
        keep = keep & partition[column].apply(is_non_empty)
    return partition.loc[keep]


# Filtering keeps the schema, so the source frame's meta describes the output.
dfa_filtered = dask_df.map_partitions(_rows_with_all_embeddings, meta=dask_df._meta)

# Materialize the filtered rows as a pandas DataFrame.
with ProgressBar():
    dfb = dfa_filtered.compute()

[########################################] | 100% Completed | 5.09 ss


In [37]:
# Preview the filtered embeddings for the first rows.
# NOTE(review): the saved output below shows `[]` in both document-embedding
# columns, which the non-empty filter should have removed. It looks stale
# (this cell's execution count predates the current `embed_tokens` cell), so
# re-run the notebook before trusting it.
preview_columns = [
    "query_id",
    "query_embedding",
    "relevant_document_embedding",
    "irrelevant_document_embedding",
]
dfb.loc[:, preview_columns].head(10)

Unnamed: 0,query_id,query_embedding,relevant_document_embedding,irrelevant_document_embedding
0,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
1,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
2,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
3,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
4,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
5,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
6,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
7,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
8,81114,"[[-0.5350781083106995, -0.31560149788856506, 0...",[],[]
9,30560,"[[0.15981212258338928, -0.5535925030708313, -0...",[],[]
