In [1]:
import pandas as pd
import pyarrow.parquet as pq
import sqlite3

In [None]:
# One-off load of the parquet dataset into the SQLite "dictionary" table,
# streaming 100 rows at a time so the whole file is never held in memory.
# NOTE(review): this cell reads "../data.parquet" while the later cells read
# "./data.parquet", and it writes only word/definition (no "id" column) into the
# same table that Database.populate() fills with an "id" column -- confirm
# whether this cell is still needed before running it.
pf = pq.ParquetFile("../data.parquet")
conn = sqlite3.connect("dictionary.db")
try:
    for batch in pf.iter_batches(batch_size=100):
        df = batch.to_pandas()
        df[["word", "definition"]].to_sql("dictionary", conn, if_exists="append", index=False)
    # Defensive: make sure nothing is left uncommitted before the connection closes.
    conn.commit()
finally:
    # Release the database handle even if a batch fails part-way through.
    conn.close()

In [21]:
class WordNotFoundException(Exception):
    """Signals that a looked-up word has no entry in the dictionary.

    The formatted error text is passed to ``Exception`` and is also kept on
    the ``message`` attribute.
    """

    def __init__(self, word):
        text = f"The word '{word}' does not exist in the dictionary."
        super().__init__(text)
        self.message = text

class DatabaseFailureException(Exception):
    """Signals that a database operation failed.

    The supplied detail is prefixed with ``Database failure: `` and the
    resulting text is passed to ``Exception`` and kept on ``message``.
    """

    def __init__(self, error_message):
        text = f"Database failure: {error_message}"
        super().__init__(text)
        self.message = text

class Database:
    """SQLite-backed store for dictionary entries.

    All rows live in a single ``dictionary`` table with the columns ``id``,
    ``word`` and ``definition``. Failures while querying are re-raised as
    ``DatabaseFailureException``; a missing word is reported by ``get_id``
    as ``WordNotFoundException``.
    """

    def __init__(self, database_name):
        """Open a SQLite connection to ``database_name``."""
        self.db = sqlite3.connect(database_name)
        # sqlite3.Row lets rows be read by column name and converted to dicts.
        self.db.row_factory = sqlite3.Row

    def close(self):
        """Close the underlying SQLite connection."""
        self.db.close()

    @staticmethod
    def _as_sql_param(value):
        """Return ``value`` in a form sqlite3 can bind.

        Integer-like objects (anything exposing ``__index__``, e.g. numpy
        integer scalars) become plain ``int``; everything else is passed
        through unchanged.
        """
        return int(value) if hasattr(value, "__index__") else value

    def populate(self, dataset_name):
        """Append the rows of a parquet dataset to the ``dictionary`` table.

        The file is streamed in batches of 1000 rows. Only the ``word`` and
        ``definition`` columns are stored, together with an ``id`` assigned
        sequentially from 0 in file order.

        Note: rows are appended and ids always restart at 0, so calling this
        twice on the same database stores every entry twice.

        Args:
            dataset_name: path of the parquet file to read.
        """
        pf = pq.ParquetFile(dataset_name)
        current_id = 0
        for batch in pf.iter_batches(batch_size=1000):
            # Explicit copy so inserting the id column never touches the batch frame.
            df = batch.to_pandas()[["word", "definition"]].copy()
            df.insert(0, "id", range(current_id, current_id + len(df)))
            df.to_sql("dictionary", self.db, if_exists="append", index=False)
            current_id += len(df)

    def word_exists(self, word):
        """Return True if at least one row has exactly this ``word``.

        Raises:
            DatabaseFailureException: if the query fails.
        """
        try:
            query = "SELECT 1 FROM dictionary WHERE word = ? LIMIT 1"
            return self.db.execute(query, (word,)).fetchone() is not None
        except Exception as e:
            raise DatabaseFailureException(str(e)) from e

    def get_id(self, word):
        """Return the ``id`` of the first row whose ``word`` matches.

        Raises:
            WordNotFoundException: if no row matches ``word``.
            DatabaseFailureException: if the query fails.
        """
        # A single query answers both "does it exist" and "what is its id".
        try:
            query = "SELECT id FROM dictionary WHERE word = ?"
            row = self.db.execute(query, (word,)).fetchone()
        except Exception as e:
            raise DatabaseFailureException(str(e)) from e

        # Raised outside the try block so it is not re-wrapped as a database failure.
        if row is None:
            raise WordNotFoundException(word)
        return row["id"]

    def get_dictionary_records(self, ids):
        """Fetch the rows for the given ids.

        Args:
            ids: any iterable of ids (list, tuple, iterator, dict keys,
                numpy integer array, ...).

        Returns:
            A dict mapping each found id to
            ``{"id": ..., "word": ..., "definition": ...}``. Ids without a
            matching row are simply absent; an empty ``ids`` gives ``{}``.

        Raises:
            DatabaseFailureException: if ``ids`` cannot be iterated or the
                query fails.
        """
        try:
            # Materialise once: ``ids`` may be a one-shot iterator, and sqlite3
            # needs a real sequence of bindable values.
            params = [self._as_sql_param(i) for i in ids]
            if not params:
                return {}
            placeholder = ",".join("?" * len(params))
            query = f"SELECT id, word, definition FROM dictionary WHERE id IN ({placeholder})"
            rows = self.db.execute(query, params).fetchall()
            return {row["id"]: dict(row) for row in rows}
        except Exception as e:
            raise DatabaseFailureException(str(e)) from e

In [26]:
# Build the SQLite "dictionary" table from the parquet dataset.
# NOTE: populate() appends rows and restarts ids at 0, so re-running this cell
# against an existing dictionary.db stores every entry again.
db = Database("dictionary.db")
db.populate("./data.parquet")

In [27]:
# Reopen the database and look up a few records by id; the result is a dict
# keyed by id.
db = Database("dictionary.db")
db.get_dictionary_records([4, 1000, 2024])

{4: {'id': 4,
  'word': 'abalone',
  'definition': 'A type of marine mollusk known for its shell.'},
 1000: {'id': 1000,
  'word': 'ancient',
  'definition': 'Belonging to the very distant past.'},
 2024: {'id': 2024,
  'word': 'barbel',
  'definition': 'A type of fish with elongated whiskers.'}}

In [29]:
import hnswlib
import numpy as np

class VectorSearch:
    """Nearest-neighbour search over word embeddings backed by an hnswlib index.

    The index uses hnswlib's ``cosine`` space. Items are labelled with
    sequential integer ids starting at 0, in the order the rows appear in
    the parquet dataset passed to ``populate``.
    """

    def __init__(self, index_name, embedding_size, word_count, M=16, ef_construction=200, ef=256):
        """Create an empty index sized for ``word_count`` embeddings.

        Args:
            index_name: file path used by ``populate`` (save) and ``load``.
            embedding_size: dimensionality of each embedding vector.
            word_count: maximum number of elements the index can hold.
            M: forwarded to ``init_index`` as ``M``.
            ef_construction: forwarded to ``init_index`` as ``ef_construction``.
            ef: forwarded to ``set_ef`` (query-time setting).
        """
        self.index_name = index_name
        self.embedding_size = embedding_size
        self.word_count = word_count
        self.M = M
        self.ef_construction = ef_construction
        self.ef = ef

        self.index = hnswlib.Index(space = 'cosine', dim = embedding_size)
        self.index.init_index(max_elements = word_count, ef_construction=ef_construction, M=M)
        self.index.set_ef(ef)

    def load(self):
        """Load the index previously saved at ``index_name``.

        NOTE(review): ``__init__`` has already initialised an empty index of
        ``word_count`` elements by the time this runs -- confirm the double
        initialisation is acceptable for large indexes.
        """
        self.index.load_index(self.index_name, max_elements = self.word_count)
        # Re-apply ef after loading so queries use the configured value.
        self.index.set_ef(self.ef)

    def populate(self, dataset_name):
        """Add every embedding of a parquet dataset to the index and save it.

        The file is streamed in batches of 1000 rows; the ``embedding``
        column of each batch is stacked into a 2-D array and added under
        sequential ids. The finished index is written to ``index_name``.

        Args:
            dataset_name: path of the parquet file to read.
        """
        pf = pq.ParquetFile(dataset_name)
        current_id = 0
        for batch in pf.iter_batches(batch_size=1000):
            df = batch.to_pandas()
            embeddings = np.stack(df["embedding"].to_numpy())
            self.index.add_items(embeddings, range(current_id, current_id + len(df)))
            current_id += len(df)
        self.index.save_index(self.index_name)

    def search(self, word_id, top_n):
        """Return the nearest neighbours of the item labelled ``word_id``.

        ``top_n + 1`` results are requested from the index, the extra slot
        leaving room for the query item itself.

        Args:
            word_id: label of an item already stored in the index.
            top_n: number of neighbours wanted in addition to the query item.

        Returns:
            A dict, in the order returned by the index, mapping each label
            to ``{"id": label, "similarity": similarity}``. Labels are plain
            ``int`` and similarities plain ``float`` so the result can be
            serialised or used as SQL parameters directly. Similarity is
            ``1.0 - distance``, and exactly ``1.0`` for ``word_id`` itself.
        """
        embedding = self.index.get_items([word_id])[0]
        labels, distances = self.index.knn_query([embedding], k=top_n + 1)

        index_records = {}
        for label, distance in zip(labels[0], distances[0]):
            # Convert numpy scalars to native Python types.
            label = int(label)
            similarity = 1.0 if label == word_id else 1.0 - float(distance)
            index_records[label] = {"id": label, "similarity": similarity}
        return index_records

In [12]:
# Build the index from the embeddings in the parquet dataset and save it to
# index.bin. 1536 is the embedding dimension and 28032 the index capacity.
vector_search = VectorSearch('index.bin', 1536, 28032)
vector_search.populate("./data.parquet")

In [30]:
# Recreate the search object with the same dimension/capacity and load the
# index saved in index.bin instead of rebuilding it.
vector_search = VectorSearch('index.bin', 1536, 28032)
vector_search.load()



In [31]:
# Look up the neighbours of word id 4.
# NOTE(review): search() already requests top_n + 1 results to leave room for
# the query word itself, so passing 8+1 returns 10 entries -- confirm whether
# 8 was intended here.
vector_search.search(4, 8+1)

{4: {'id': 4, 'similarity': 1.0},
 17708: {'id': 17708, 'similarity': 0.5715104341506958},
 8552: {'id': 8552, 'similarity': 0.5369500517845154},
 22041: {'id': 22041, 'similarity': 0.5336644649505615},
 4349: {'id': 4349, 'similarity': 0.5271939039230347},
 1120: {'id': 1120, 'similarity': 0.5264646410942078},
 14653: {'id': 14653, 'similarity': 0.5210265517234802},
 770: {'id': 770, 'similarity': 0.5058796405792236},
 22661: {'id': 22661, 'similarity': 0.49759751558303833},
 22777: {'id': 22777, 'similarity': 0.48571449518203735}}