In [None]:
!pip install "grpcio<=1.58.0,>=1.49.1" pinecone-client sentence-transformers tqdm nltk numpy

In [None]:
# Mount the user's Google Drive at /content/drive so files stored there can be
# read through the regular filesystem. Requires the google.colab package.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pinecone
import uuid
from sentence_transformers import SentenceTransformer
from nltk.tokenize import word_tokenize
import nltk
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import sys
import os

In [None]:
# word_tokenize needs the Punkt sentence-tokenizer data. NLTK >= 3.8.2 loads it from
# the "punkt_tab" resource while older releases use "punkt"; nltk is not version-pinned
# in the install cell, so fetch both to work with whichever release was installed.
for resource in ('punkt', 'punkt_tab'):
    nltk.download(resource)

In [None]:
import zipfile
import os

def unzip_file(zip_path, extract_to="./unzipped_data"):
    """Extract every member of the zip archive at ``zip_path`` into ``extract_to``.

    The destination directory is created first when it does not exist yet, and a
    confirmation line is printed once extraction has finished. Returns ``None``.
    """
    destination_missing = not os.path.exists(extract_to)
    if destination_missing:
        os.makedirs(extract_to)

    archive = zipfile.ZipFile(zip_path, 'r')
    with archive:
        # The archive is closed automatically when the block exits.
        archive.extractall(extract_to)
        print(f"Extracted all files to {extract_to}")

# Google Drive is mounted at the absolute path /content/drive, so address the archive
# absolutely as well; a "./drive/..." path only resolves when the cwd is /content.
unzip_file("/content/drive/MyDrive/data.zip")

In [None]:
# Load the all-MiniLM-L6-v2 sentence-embedding model onto the GPU (device "cuda").
# NOTE(review): the Embedder class below builds its own SentenceTransformer from its
# `model` argument and does not read this variable — confirm whether this load is needed.
model = SentenceTransformer("all-MiniLM-L6-v2", device="cuda")

In [None]:
class Embedder:
    """Embed JSON-lines records with a SentenceTransformer model and upsert them into Pinecone.

    Every file found directly inside each of ``dir_paths`` is read line by line.
    Each non-blank line is parsed as a JSON object; its ``title``, ``selftext`` and
    ``body`` string fields form the text that is embedded, and the original JSON
    line is stored as the vector's metadata.

    Constructing an instance has side effects: it initialises the Pinecone client
    and DELETES and recreates the index named ``collection_name``.
    """

    # Serialised metadata larger than this many bytes is skipped
    # (Pinecone's documented per-vector metadata limit is 40 KB).
    MAX_METADATA_BYTES = 40960

    # JSON fields whose text is embedded, in concatenation order.
    TEXT_FIELDS = ('title', 'selftext', 'body')

    def __init__(self, dir_paths, pinecone_api_key, collection_name, env, batch_size=30000, model='all-MiniLM-L6-v2'):
        """Store the configuration and (re)create the Pinecone index.

        Args:
            dir_paths: Iterable of directories whose files are processed by ``run``.
            pinecone_api_key: API key passed to ``pinecone.init``.
            collection_name: Name of the Pinecone index to (re)create and fill.
            env: Pinecone environment passed to ``pinecone.init``.
            batch_size: Maximum number of records embedded per batch.
            model: SentenceTransformer model name used for embedding.
        """
        self.dir_paths = dir_paths
        self.batch_size = batch_size
        self.model_name = model
        self.collection_name = collection_name
        # Maximum number of records read from any single file.
        self.record_limit = 300000
        # The embedding model is loaded lazily on the first embed_text call.
        self._model = None
        self.init_pinecone(pinecone_api_key, env)

    def init_pinecone(self, api_key, env):
        """Initialise the Pinecone client and recreate ``self.collection_name`` from scratch.

        Any existing index with that name is deleted first, so previously stored
        vectors are lost. The new index is created with dimension 384.
        """
        pinecone.init(api_key=api_key, environment=env)

        # Delete the index if it already exists
        if self.collection_name in pinecone.list_indexes():
            print(f"deleting pinecone index: {self.collection_name}")
            pinecone.delete_index(self.collection_name)

        # Create the index
        print(f"creating index: {self.collection_name}")
        pinecone.create_index(self.collection_name, dimension=384)
        self.index = pinecone.Index(self.collection_name)
        print(f"successfully created pinecone index: {self.collection_name}")

    def tokenize_text(self, text):
        """Split ``text`` into tokens with NLTK's ``word_tokenize``."""
        return word_tokenize(text)

    def embed_text(self, text_batch):
        """Encode ``text_batch`` (a list of strings) and return the model's embeddings.

        The model is loaded once, on first use, and reused afterwards; reloading it
        for every batch would repeat an expensive initialisation for no benefit.
        """
        if getattr(self, '_model', None) is None:
            self._model = SentenceTransformer(self.model_name, device="cuda")
        return self._model.encode(text_batch, show_progress_bar=True)

    def insert_embeddings(self, embedding_data, json_data):
        """Upsert embeddings and their metadata concurrently in slices of 1000 records.

        Args:
            embedding_data: Sequence of embeddings, aligned with ``json_data``.
            json_data: Sequence of metadata entries (JSON strings or dicts).

        Raises:
            Any exception raised by a worker is re-raised here via ``future.result()``.
        """
        max_batch_size = 1000  # max vectors per pinecone upsert

        with ThreadPoolExecutor(max_workers=20) as executor:
            futures = []
            for i in range(0, len(embedding_data), max_batch_size):
                batch_embeddings = embedding_data[i:i + max_batch_size]
                batch_json = json_data[i:i + max_batch_size]

                future = executor.submit(self.process_and_insert_batch, batch_embeddings, batch_json)
                futures.append(future)

            # Wait for every slice and surface the first worker failure, if any.
            for future in futures:
                future.result()

    def gen_ids(self, num_ids):
        """Return ``num_ids`` freshly generated random UUID strings."""
        return [str(uuid.uuid4()) for _ in range(num_ids)]

    def embeddings_to_list(self, embeddings):
        """Convert each numpy-array embedding to a plain list; other values pass through."""
        return [embedding.tolist() if isinstance(embedding, np.ndarray) else embedding for embedding in embeddings]

    def process_and_insert_batch(self, embeddings, json_data, max_batch_size_bytes=2 * 1024 * 1024): # 2MB
        """Upsert one slice of vectors, split into requests of at most ``max_batch_size_bytes``.

        Records whose metadata cannot be serialised, or is too large, are skipped.
        An item that is larger than the limit on its own is still sent, alone in its
        own request, rather than triggering an upsert of an empty batch.

        Args:
            embeddings: Sequence of embeddings (numpy arrays or lists).
            json_data: Metadata entries aligned with ``embeddings``.
            max_batch_size_bytes: Upper bound on the JSON-encoded size of one request.
        """
        # generate unique id for each embedding
        ids = self.gen_ids(len(embeddings))

        # convert embeddings from np arrays to list
        embeddings_list = self.embeddings_to_list(embeddings)

        current_batch = []
        current_batch_size = 0
        inserted = 0

        for vector_id, embedding, metadata in zip(ids, embeddings_list, json_data):
            serialized_metadata = self.jsonify_metadata(metadata)
            if not serialized_metadata:
                continue

            item = (vector_id, embedding, json.loads(serialized_metadata))
            # Measure the encoded payload; sys.getsizeof would report the size of the
            # Python str object, which is not the number of bytes sent.
            item_size = len(json.dumps(item).encode('utf-8'))

            # Flush only when there is something to flush.
            if current_batch and current_batch_size + item_size > max_batch_size_bytes:
                self.index.upsert(vectors=current_batch)
                inserted += len(current_batch)
                current_batch = []
                current_batch_size = 0

            current_batch.append(item)
            current_batch_size += item_size

        if current_batch:
            self.index.upsert(vectors=current_batch)
            inserted += len(current_batch)

        if inserted:
            print(f"Successfully inserted {inserted} vectors and documents into Pinecone.")

    def jsonify_metadata(self, metadata):
        """Return ``metadata`` as a JSON string, or ``None`` when it cannot be used.

        Args:
            metadata: Either a JSON string (parsed and re-serialised) or an
                already-parsed object.

        Returns:
            The serialised JSON text, or ``None`` if the input is not valid JSON,
            is not JSON-serialisable, or exceeds ``MAX_METADATA_BYTES`` when encoded.
        """
        try:
            metadata_dict = json.loads(metadata) if isinstance(metadata, str) else metadata
            serialized_metadata = json.dumps(metadata_dict)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError; TypeError covers objects that
            # json.dumps cannot serialise. One bad record must not abort the batch.
            print(f"failed to encode json: {metadata}")
            return None

        # Compare the actual encoded byte length against the limit.
        if len(serialized_metadata.encode('utf-8')) > self.MAX_METADATA_BYTES:
            return None

        return serialized_metadata

    def batchify(self, texts):
        """Yield consecutive slices of ``texts`` holding at most ``self.batch_size`` items."""
        for i in range(0, len(texts), self.batch_size):
            yield texts[i:i + self.batch_size]

    def join_line(self, lines):
        """Build the text to embed for every JSON line.

        Each line is parsed once; its ``title``, ``selftext`` and ``body`` values are
        joined with a space (so the last word of one field is not fused with the
        first word of the next), tokenised and re-joined with single spaces.
        Missing, ``null`` or non-string fields are ignored.

        Args:
            lines: List of JSON-object strings.

        Returns:
            A list of strings, one per input line and in the same order.

        Raises:
            json.JSONDecodeError: If a line is not valid JSON.
        """
        res = []
        for line in lines:
            record = json.loads(line)
            parts = []
            for field in self.TEXT_FIELDS:
                value = record.get(field)
                if isinstance(value, str) and value:
                    parts.append(value)
            tokenized_text = ' '.join(self.tokenize_text(' '.join(parts)))
            res.append(tokenized_text)
        return res

    def _read_batch(self, file, limit):
        """Read up to ``limit`` stripped, non-blank lines from the open text file ``file``."""
        lines = []
        if limit <= 0:
            return lines
        for raw_line in file:
            stripped = raw_line.strip()
            if not stripped:
                continue  # blank lines are not JSON records
            lines.append(stripped)
            if len(lines) >= limit:
                break
        return lines

    def process_lines(self, file_path):
        """Embed and upsert the records of one JSON-lines file, batch by batch.

        At most ``self.record_limit`` records are read from the file, in batches of
        at most ``self.batch_size`` records, so the whole file is never held in
        memory at once. Blank lines are skipped.
        """
        remaining = self.record_limit
        with open(file_path, 'r', encoding='utf-8') as file:
            while remaining > 0:
                lines = self._read_batch(file, min(self.batch_size, remaining))
                if not lines:
                    break
                texts = self.join_line(lines)
                embeddings = self.embed_text(texts)
                self.insert_embeddings(embeddings, lines)
                remaining -= len(lines)
                print(f"inserted {len(lines)} vectors from {file_path} into pinecone")

    def run(self):
        """Process every regular file found directly inside each configured directory."""
        print("Starting Embedder...")
        for dir_path in self.dir_paths:
            print(f"Processing directory: {dir_path}")
            file_paths = [os.path.join(dir_path, f) for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f))]
            for file_path in file_paths:
                self.process_lines(file_path)


if __name__ == '__main__':
    # Credentials are read from the environment so that no secret has to be typed
    # into (and saved with) the notebook.
    pinecone_api_key = os.environ.get('PINECONE_API_KEY', '')
    env = os.environ.get('PINECONE_ENVIRONMENT', '')
    if not pinecone_api_key or not env:
        # Fail early with an actionable message instead of an opaque client error.
        raise ValueError(
            "Set the PINECONE_API_KEY and PINECONE_ENVIRONMENT environment variables before running the embedder."
        )
    collection_name = 'test-search'
    # NOTE: constructing the Embedder deletes and recreates the index named collection_name.
    embedder = Embedder(["./unzipped_data/sample_data/posts", "./unzipped_data/sample_data/comments"], pinecone_api_key, collection_name, env)
    embedder.run()
