In [None]:
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec
from langchain_openai import OpenAIEmbeddings
from langchain_community.retrievers import (
    PineconeHybridSearchRetriever,
)
from pinecone_text.sparse import BM25Encoder
import time
import os
from dotenv import load_dotenv

# Load variables from a local .env file (if any) before reading the keys.
load_dotenv()

# Either key is None when its environment variable is not set.
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
openai_api_key = os.environ.get("OPENAI_API_KEY")

# Dense embedding model shared by every cell below.
embeddings = OpenAIEmbeddings(
    openai_api_key=openai_api_key,
    model="text-embedding-3-small",
)

class manage_index:
    """Small convenience wrapper around a Pinecone client for index lifecycle tasks.

    Every method reports its outcome with ``print`` and returns ``None`` when
    the requested index is missing, so cells stay runnable in a notebook.
    """

    def __init__(self, api_key):
        """Create the underlying Pinecone client.

        Args:
            api_key: Pinecone API key handed to ``Pinecone``.
        """
        self.pc = Pinecone(api_key=api_key)

    def _index_names(self):
        """Return the names of all indexes reported by ``self.pc.list_indexes()``."""
        return [index_info["name"] for index_info in self.pc.list_indexes()]

    def create_index(
        self,
        index_name,
        dimensions,
        metric="dotproduct",
        cloud="aws",
        region="us-east-1",
        timeout=None,
        poll_interval=1,
    ):
        """Create a serverless index if it does not exist and wait until it is ready.

        Args:
            index_name: Name of the index to create.
            dimensions: Vector dimension of the index.
            metric: Similarity metric. Defaults to ``"dotproduct"``
                (NOTE(review): Pinecone hybrid search is documented to need
                this metric - confirm before changing it).
            cloud: Cloud provider passed to ``ServerlessSpec``.
            region: Region passed to ``ServerlessSpec``.
            timeout: Maximum number of seconds to wait for readiness.
                ``None`` (the default) waits indefinitely.
            poll_interval: Seconds to sleep between readiness checks.

        Raises:
            TimeoutError: If ``timeout`` is given and the index is still not
                ready once it has elapsed.
        """
        if index_name in self._index_names():
            print(f"Index {index_name} already exists")
            return

        print(f"Creating index {index_name}")
        self.pc.create_index(
            name=index_name,
            dimension=dimensions,
            metric=metric,
            spec=ServerlessSpec(cloud=cloud, region=region),
        )

        # Poll until the index is ready; without a deadline a stuck index
        # would block the notebook forever.
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self.pc.describe_index(index_name).status["ready"]:
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Index {index_name} was not ready after {timeout} seconds"
                )
            time.sleep(poll_interval)

        print(f"Index {index_name} created successfully")

    def retrieve_index(self, index_name, embeddings, namespace=None):
        """Wrap an existing index in a ``PineconeVectorStore``.

        Args:
            index_name: Name of the index to wrap.
            embeddings: Embedding object passed to the vector store.
            namespace: Optional namespace; only forwarded when not ``None``.

        Returns:
            The vector store, or ``None`` when the index does not exist or
            building the store raised (the error is printed, not re-raised).
        """
        # One listing per call: the handle is opened directly instead of going
        # through get_index(), which would list the indexes a second time.
        if index_name not in self._index_names():
            print(f"Index {index_name} does not exist")
            return None

        try:
            store_kwargs = {
                "index": self.pc.Index(index_name),
                "embedding": embeddings,
            }
            if namespace is not None:
                store_kwargs["namespace"] = namespace
            vector_store = PineconeVectorStore(**store_kwargs)
        except Exception as e:
            # Deliberately best-effort: report the problem and return None.
            print(f"Error retrieving index {index_name}: {e}")
            return None

        if namespace is not None:
            print(f"From {index_name} index with namespace {namespace} retrieved successfully")
        else:
            print(f"Index {index_name} retrieved successfully")
        return vector_store

    def list_indexes(self):
        """Return a list with the name of every existing index."""
        return self._index_names()

    def delete_index(self, index_name):
        """Delete ``index_name`` if it exists; otherwise just report that it is missing."""
        if index_name in self._index_names():
            self.pc.delete_index(index_name)
            print(f"Index {index_name} deleted successfully")
        else:
            print(f"Index {index_name} does not exist")

    def get_index(self, index_name):
        """Return the client handle for ``index_name``, or ``None`` if it does not exist."""
        if index_name in self._index_names():
            return self.pc.Index(index_name)
        print(f"Index {index_name} does not exist")
        return None


index_name = "faizan"

handler = manage_index(pinecone_api_key)
print(handler.list_indexes())

# Size the index from a real embedding so its dimension matches the model output.
embedding_dim = len(embeddings.embed_query("HI"))
handler.create_index(index_name, embedding_dim)

# Optional operations, left disabled:
# index = handler.retrieve_index("faizan", embeddings)
# handler.delete_index("faizan")

# NOTE: `get_index` holds the index handle (not a function); the retriever cell reads it.
get_index = handler.get_index(index_name)

In [None]:
import nltk

# Download the "punkt_tab" tokenizer data only when it is not already available
# locally, so re-running the notebook does not need a network round trip.
# NOTE(review): presumably required by the BM25 encoder's tokenizer - confirm.
try:
    nltk.data.find("tokenizers/punkt_tab")
except LookupError:
    nltk.download('punkt_tab')

In [47]:
# Sparse encoder built from the library's default BM25 configuration.
bm25_encoder = BM25Encoder().default()

# Hybrid retriever combining the dense embeddings with the sparse encoder.
# `get_index` is the index handle assigned in the setup cell.
retriever = PineconeHybridSearchRetriever(
    index=get_index,
    sparse_encoder=bm25_encoder,
    embeddings=embeddings,
)

In [None]:
# Small sample corpus to index through the hybrid retriever.
sample_texts = ["Hi", "Hello", "World", "Hello World"]
retriever.add_texts(sample_texts)

In [None]:
# Run one query through the hybrid retriever and show what comes back.
query = "HI"
result = retriever.invoke(query)
print(result)