In [2]:
!nvidia-smi -L

GPU 0: Tesla T4 (UUID: GPU-c7932e9d-60b8-c107-e5aa-78f001a919ed)


In [3]:
import json
import pandas as pd
import os
import re
import string
import asyncio
from tqdm.auto import tqdm
tqdm.pandas()

In [4]:
import config

In [5]:
# config.REDIS_HOST

In [6]:
DATA_PATH='/home/jovyan/arxiv/arxiv-metadata-oai-snapshot.json'
YEAR_CUTOFF = 2012
YEAR_PATTERN = r"(19|20[0-9]{2})"
ML_CATEGORY = "cs.LG"

In [7]:
!ls -lah {DATA_PATH}

-rw-rw-r-- 1 jovyan jovyan 3.4G Nov  6 00:15 /home/jovyan/arxiv/arxiv-metadata-oai-snapshot.json


In [8]:
def process(paper: dict):
    paper = json.loads(paper)
    if paper['journal-ref']:
        years = [int(year) for year in re.findall(YEAR_PATTERN, paper['journal-ref'])]
        years = [year for year in years if (year <= 2022 and year >= 1991)]
        year = min(years) if years else None
    else:
        year = None
    return {
        'id': paper['id'],
        'title': paper['title'],
        'year': year,
        'authors': paper['authors'],
        'categories': ','.join(paper['categories'].split(' ')),
        'abstract': paper['abstract']
    }

def papers():
    with open(DATA_PATH, 'r') as f:
        for paper in f:
            paper = process(paper)
            if paper['year']:
                if paper['year'] >= YEAR_CUTOFF and ML_CATEGORY in paper['categories']:
                    yield paper

In [9]:
df = pd.DataFrame(papers())
len(df)

11419

In [10]:
# Avg length of the abstracts
df.abstract.apply(lambda a: len(a.split())).mean()

169.84534547683685

In [11]:
from indexer import RedisIndexer, ColBERTFormator, preprocess

from model import ColBERTModel

In [12]:
# sample only fraction of data
df_sampled = df.sample(frac=0.1)
len(df_sampled)

1142

In [13]:
model = ColBERTModel()
formator = ColBERTFormator(doc_prefix="doc", vec_prefix="doc_vec")

using device: cuda:0


Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertModel: ['vocab_transform.weight', 'vocab_layer_norm.weight', 'vocab_transform.bias', 'vocab_layer_norm.bias', 'vocab_projector.weight', 'vocab_projector.bias']
- This IS expected if you are initializing DistilBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [14]:
# indexer = RedisIndexer(index_name="doc", url=config.REDIS_URL)
# await indexer.init_redis()

# embs = []
# for i, text in enumerate(tqdm(df_sampled.apply(preprocess, axis=1).tolist())):
#     emb = model.compute_document_representation(text)
#     # emb.tolist()
#     vecs = formator.create_vec_formatting(i, emb)
#     # print(docs)
#     await indexer.write(vecs)
#     docs = formator.create_doc_formatting(i, emb, text)
#     await indexer.write(docs)
# await indexer.print_dbsize()

In [15]:
import numpy as np

In [16]:
doc_prefix="paper"
vec_prefix="doc_vec"

def get_doc_key(key):
    return f"{doc_prefix}:{key}"
def get_vec_key(key):
    return f"{vec_prefix}:{key}"

In [17]:
df_vec = pd.DataFrame()
df_doc = pd.DataFrame()

embs = []
for doc_id, text in enumerate(tqdm(df_sampled.apply(preprocess, axis=1).tolist())):
    emb = model.compute_document_representation(text)
    
    vecs= emb
    data =text
    # add to doc df    
    vecs = vecs.astype(np.float32)
    vecs_bytes = vecs.tobytes()
    key = get_doc_key(f'{doc_id}')
    doc_columns = ['id', 'doc_id', 'doc', 'vector_matrix']
    docs = [[key, str(doc_id), data, vecs_bytes]]
    # doc = {
    #     'id': key,
    #     'doc_id': str(doc_id),
    #     'doc': data,
    #     'vector_matrix': vecs_bytes
    # }
    # df_doc = df_doc.append(doc, ignore_index = True)
    df_doc = pd.concat([pd.DataFrame(docs, columns=doc_columns), df_doc], ignore_index = True)
    
        
    # add to vec df        
    docvec_columns = ['id', 'doc_id', 'vec_id', 'vector']
    doc_vecs = []
    for vec_id, vec in enumerate(vecs):
        key = get_vec_key(f'{doc_id}-{vec_id}')
        vec_bytes = vec.tobytes()
        doc_vec = [key,str(doc_id),str(vec_id),vec_bytes]
        doc_vecs.append(doc_vec)
    df_vec = pd.concat([pd.DataFrame(doc_vecs, columns=docvec_columns), df_vec], ignore_index = True)

  0%|          | 0/1142 [00:00<?, ?it/s]

In [18]:
len(df_vec), len(df_doc), len(df)

(248895, 1142, 11419)

In [19]:
(df_vec.memory_usage(index=True, deep=True).sum()), (df_doc.memory_usage(index=True, deep=True).sum()),(df.memory_usage(index=True, deep=True).sum())

(822364569, 766265996, 18737794)

In [20]:
# df_vec[:2].to_dict(orient='records')

In [21]:
# indexer = RedisIndexer(index_name="doc", url=config.REDIS_URL)
# await indexer.init_redis()

# await indexer.write(df_vec.to_dict(orient='records'))
# print(await indexer.print_dbsize())
# await indexer.write(df_doc.to_dict(orient='records'))
# print(await indexer.print_dbsize())


In [22]:
# indexer = RedisIndexer(index_name="doc", url=config.REDIS_URL)
# await indexer.init_redis()

# # await indexer.write(df_vec.to_dict(orient='records'))
# # print(await indexer.print_dbsize())
# # await indexer.write(df_doc.to_dict(orient='records'))
# # print(await indexer.print_dbsize())
    
# data_ = df_vec.to_dict(orient='records')
# for data_elem in tqdm(data_):
#     await indexer._RedisIndexer__write_to_redis(data_elem['id'], data_elem)
# print(await indexer.print_dbsize())

# data_ = df_doc.to_dict(orient='records')
# for data_elem in tqdm(data_):
#     await indexer._RedisIndexer__write_to_redis(data_elem['id'], data_elem)
# print(await indexer.print_dbsize())

In [23]:
import redis.asyncio as redis
import asyncio

In [47]:
async def gather_with_concurrency(redis_conn, *data_elems):
    semaphore = asyncio.Semaphore(50)
    async def load_elems(data_elem):       
        # print(len(data_elem))
        async with semaphore:            
            # save vector data
            key = data_elem['id']            
            await redis_conn.hset(
                key,
                mapping=data_elem,
            )
    # gather with concurrency
    await asyncio.gather(*[load_elems(elem) for elem in tqdm(data_elems)])
    # load_elems(data_elems[0])

In [45]:
redis_conn = await redis.from_url(config.REDIS_URL)

In [48]:
# df_vec.to_dict(orient='records')[0]
doc_vecs =df_vec.to_dict(orient='records')
await gather_with_concurrency(redis_conn, *doc_vecs)

  0%|          | 0/248895 [00:00<?, ?it/s]

In [None]:
print(await indexer.print_dbsize())

In [71]:
# df_doc['id'] = df_doc.apply(lambda r: r.id.replace("doc","paper"), axis=1)

In [73]:
docs = df_doc[['id', 'doc_id', 'doc']].to_dict(orient='records')
await gather_with_concurrency(redis_conn, *docs)

  0%|          | 0/1142 [00:00<?, ?it/s]

In [58]:
from redis.commands.search.field import (
    TagField,
    VectorField,
    NumericField,
    TextField
)
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.field import VectorField

In [59]:
block_size = 10000
vector_field = VectorField(
    "vector",
    "FLAT", {
        "TYPE": "FLOAT32",
        "DIM": 768,
        "DISTANCE_METRIC": "IP",
        "INITIAL_CAP": block_size,
        "BLOCK_SIZE": block_size
    }
)
doc_id = TagField("doc_id")
vec_id = TagField("vec_id")
vec_fields = [
    vector_field,
    doc_id,
    vec_id
]

In [60]:
async def create_index(
    redis_conn,
    index_name,
    *fields,
    prefix: str,
    overwrite=True
):
    # Create Index
    if overwrite:
        await redis_conn.ft(index_name).dropindex(delete_documents=False)
    await redis_conn.ft(index_name).create_index(
        fields = fields,
        definition= IndexDefinition(prefix=[prefix], index_type=IndexType.HASH)
    )

In [61]:

await create_index(redis_conn, "doc_vec", *vec_fields,prefix="doc_vec", overwrite=False)


In [65]:
block_size = 1000
doc_id = TagField("doc_id")
doc = TagField("doc")
doc_fields = [
    vector_field,
    doc_id,
    vec_id,
    doc
]

In [74]:
await create_index(redis_conn, "paper", *doc_fields,prefix="paper", overwrite=False)


In [17]:
# import pickle

# # Export to file!
# with open('arxiv_colbert_embeddings_1000.pkl', 'wb') as f:
#     data = pickle.dumps(df)
#     f.write(data)