In [12]:
import dspy
from dsp.modules import GoogleVertexAI
import os
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
import pinecone
from dsp.utils import dotdict
from pinecone.grpc import PineconeGRPC as Pinecone
from pinecone import ServerlessSpec
from typing import List, Union, Optional
import time
import requests
from google.oauth2 import service_account
import streamlit as st

In [2]:
load_dotenv()

HF_API_TOKEN = os.environ.get("HF_API_TOKEN")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT")

PINECONE_INDEX_NAME = "basic-embeddings"
PINECONE_HOST = "https://basic-embeddings-m8sj7l5.svc.aped-4627-b74a.pinecone.io"
VERTEX_MODEL_ID = "gemini-1.5-flash-001"
VERTEX_PROJECT_ID = "nvcc-dspy-rag"
VERTEX_REGION = "us-central1"
VERTEX_CREDENTIALS = "./vertex_credentials.json"

In [3]:
# Loads and caches the model
def load_embedding_model(api_token: str = HF_API_TOKEN):
    return SentenceTransformer("BAAI/bge-base-en-v1.5", use_auth_token=api_token)


def load_gemini_model(model_name: str = VERTEX_MODEL_ID,
                      project: str = VERTEX_PROJECT_ID,
                      location: str = VERTEX_REGION,
                      credentials: str = VERTEX_CREDENTIALS):
    credentials_obj = service_account.Credentials.from_service_account_file(credentials)
    gemini_flash = GoogleVertexAI(
        model_name=model_name,
        project=project,
        location=location,
        credentials=credentials_obj
    )
    return gemini_flash

In [15]:
class PineconeRM(dspy.Retrieve):
    def __init__(
        self,
        pinecone_index_name: str = PINECONE_INDEX_NAME,
        pinecone_api_key: str = PINECONE_API_KEY,
        pinecone_env: Optional[str] = PINECONE_ENVIRONMENT,
        k: int = 5
    ):
        """ """
        super().__init__(k=k)
        self._embedding_model = load_embedding_model()
        self._pinecone_index = self._connect_pinecone_index()
        
        
    def _init_pinecone(
        self,
        index_name: str = PINECONE_INDEX_NAME,
        api_key: Optional[str] = PINECONE_API_KEY,
        environment: Optional[str] = PINECONE_ENVIRONMENT,
        host: str = PINECONE_HOST,
        cloud: str = "aws",
        region: str = "us-east-1",
        dimension: Optional[int] = 768,
        distance_metric: Optional[str] = "cosine",
    ) -> pinecone.Index:
        """
        Initialize pinecone and return the loaded index.

        Args:
            index_name (str): The name of the index to load. If the index is not does not exist, it will be created.
            api_key (str, optional): The Pinecone API key, defaults to env var PINECONE_API_KEY if not provided.
            environment (str, optional): The environment (ie. `us-west1-gcp` or `gcp-starter`). Defaults to PINECONE_ENVIRONMENT.

        Returns:
            pinecone.Index: The loaded index.
        """
        
        pc = Pinecone(api_key=api_key)
        
        pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=distance_metric,
                spec=ServerlessSpec(
                  cloud=cloud,
                  region=region
                ),
                deletion_protection="disabled"
            )
        index = pc.Index(index_name, host=host)

        return index
    
    def _connect_pinecone_index(
        index_name: str = PINECONE_INDEX_NAME,
        api_key: str = PINECONE_API_KEY,
        host: str = PINECONE_HOST,
        environment: Optional[str] = PINECONE_ENVIRONMENT
    ) -> pinecone.Index:
        """
        Creates a connection to a existing pinecone index and returns the index
        """
        pc = Pinecone(api_key=api_key)
        index = pc.Index(index_name, host=host)
    
        return index
    
    def _get_embedding(
        self, 
        query: str,
    ) -> List[float]:
        """
        Return query vector after creating embedding using HuggingFace BAAI/bge-base-en-v1.5

        Args:
            queries (list): Query string to embed.

        Returns:
            List[float]: Embedding corresponding to each query.
        """
        embedding = list(self._embedding_model.encode(str(query), convert_to_numpy=True))
        
        return embedding


    def forward(self, query: str, k: Optional[int]) -> dspy.Prediction:
        """
        Search with pinecone for top self.k passages most similar to the input query

        Args:
            query (str): The query string we are receiving similar results for

        Returns:
            dspy.Prediction: An object containing the retrieved passages.
        """
        embeddings = self._get_embedding(query)

        results_dict = self._pinecone_index.query(
            vector = embeddings,
            top_k = k,
            include_metadata = True
        )
        
        sorted_results = sorted(
            results_dict["matches"],
            key=lambda x: x.get("scores", 0.0),
            reverse=True,
        )
        passages = [result["metadata"]["text"] for result in sorted_results]
        passages = [dotdict({"long_text": passage}) for passage in passages]
        return dspy.Prediction(passages=passages)

In [16]:
gemini_flash = load_gemini_model()
pinecone_retriever = PineconeRM
dspy.settings.configure(lm=gemini_flash, rm=pinecone_retriever)


class GenerateAnswerWithContext(dspy.Signature):
    """Answer the question based on the context and query provided, and on the scale of 10 tell how confident you are about the answer."""

    context = dspy.InputField(desc="may contain relevant facts to consider")
    question = dspy.InputField()
    answer = dspy.OutputField(desc="answer from context")


class RAG(dspy.Module):
    def __init__(self, num_passages=5):
        super().__init__()
        self.retrieve = dspy.Retrieve(k = num_passages)
        self.generate_answer = dspy.ChainOfThought(GenerateAnswerWithContext)
    
    def forward(self, query):
        context = self.retrieve(query).passages
        prediction = self.generate_answer(context=context, question=query)
        return dspy.Prediction(context=context, answer=prediction.answer)

In [17]:
rag = RAG()
txt = "What are the core values at NOVA"
ans = rag(query=txt).answer
print(ans)



AttributeError: 'PineconeRM' object has no attribute 'long_text'