In [47]:
import dspy
import os
from dotenv import load_dotenv
from pydantic import BaseModel
from typing import List
from datetime import datetime
from pymongo import MongoClient



# Load environment variables (python-dotenv) before any of them are read below.
load_dotenv()

# API key passed to the Gemini language model; None when the variable is unset.
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')

In [48]:
# MongoDB connection: the URI comes from the environment, while the database
# ('renai') and collection ('events') names are fixed here.
client = MongoClient(os.getenv('MONGODB_URI'))
db = client.get_database('renai')
collection = db.get_collection('events')
# Alternative kept for reference: take both names from the environment instead.
# db = client[os.getenv('MONGODB_DB_NAME')]
# collection = db[os.getenv('MONGODB_COLLECTION_NAME')]

In [49]:
# Connectivity check: issue a 'ping' admin command and report the outcome.
# Failures are printed rather than raised so the notebook keeps running.
try:
    client.admin.command('ping')
except Exception as ping_error:
    print(ping_error)
else:
    print("Pinged your deployment. You successfully connected to MongoDB!")

Pinged your deployment. You successfully connected to MongoDB!


In [50]:
# Create the Gemini-backed language model and make it DSPy's configured LM.
lm = dspy.LM(model='gemini/gemini-2.0-flash-exp', api_key=GEMINI_API_KEY)
dspy.configure(lm=lm)

In [63]:

# search = dspy.retrievers.Embeddings(embedder=embedder, corpus=corpus, k=topk_docs_to_retrieve)

In [52]:
from difflib import SequenceMatcher


In [66]:
class Entity(BaseModel):
    # One entity extracted from a question; used as the element type of the
    # `entities` output field of ExtractEntities.
    # NOTE(review): documented with comments rather than a docstring because a
    # pydantic model docstring presumably ends up in the generated schema, which
    # could change what the LM is shown -- confirm before converting.
    entity: str  # entity name; RAG.forward fuzzy-matches it against stored entity names
    type: str  # free-form category label for the entity


class ExtractEntities(dspy.Signature):
    """Extract structured entities from text."""

    # The docstring above is the task instruction DSPy gives the LM for this
    # signature, so it is kept short and must read as a clean instruction.

    # Input: the user's natural-language question.
    question: str = dspy.InputField()

    # Output: the entities found in the question, parsed into Entity models.
    entities: List[Entity] = dspy.OutputField(desc="a list of entities and their metadata")

class RAG(dspy.Module):
    """Answer a question from stored records that mention the question's entities.

    Pipeline: extract entities from the question, keep the MongoDB documents
    that contain a fuzzily matching entity name, render those documents as
    text passages, rank the passages against the question with an embedding
    retriever, and generate a response from the top passages.

    Args:
        similarity_threshold: Minimum ``SequenceMatcher`` ratio (0..1) at which
            a stored entity name counts as a match for an extracted one.
        top_k: Maximum number of passages passed to the response step.
    """

    def __init__(self, similarity_threshold=0.7, top_k=5):
        super().__init__()
        self.similarity_threshold = similarity_threshold
        self.top_k = top_k
        self.cot1 = dspy.ChainOfThought(ExtractEntities)
        self.respond = dspy.ChainOfThought('context, question -> response')

    def forward(self, question):
        """Run the pipeline for one question.

        Args:
            question: The natural-language question to answer.

        Returns:
            The prediction produced by the ``context, question -> response``
            step. When no stored record matches, the response is generated
            from an empty context instead of failing in the retriever.
        """
        extracted = self.cot1(question=question).entities or []
        query_names = [entity.entity for entity in extracted]

        # Scan the module-level `collection`; each document is kept at most once.
        matching_records = [
            doc for doc in collection.find()
            if self._has_matching_entity(query_names, doc.get('entities') or [])
        ]

        corpus = [self._format_record(record) for record in matching_records]
        if not corpus:
            # Nothing to embed or rank: skip retrieval rather than hand the
            # retriever an empty corpus.
            return self.respond(context=[], question=question)

        embedder = dspy.Embedder('gemini/text-embedding-004')
        search = dspy.retrievers.Embeddings(embedder=embedder, corpus=corpus, k=self.top_k)
        context = search(question).passages
        return self.respond(context=context, question=question)

    def _has_matching_entity(self, query_names, doc_entities):
        """Return True if any stored entity name is close enough to any query name."""
        for doc_entity in doc_entities:
            stored_name = doc_entity.get('entity') if isinstance(doc_entity, dict) else None
            if not isinstance(stored_name, str):
                # Malformed or incomplete entity entry: ignore it instead of raising.
                continue
            for name in query_names:
                ratio = SequenceMatcher(None, name, stored_name).ratio()
                if ratio >= self.similarity_threshold:
                    return True
        return False

    @staticmethod
    def _format_record(record):
        """Render one stored document as a plain-text passage for retrieval."""
        header = (
            f"Action: {record.get('action', 'N/A')}\n"
            f"Type: {record.get('type', 'N/A')}\n"
            f"Date: {record.get('date', 'N/A')}\n"
            f"Location: {record.get('location', 'N/A')}\n"
            f"Entities:\n"
        )
        # Missing per-entity fields fall back to 'N/A', like the record-level fields.
        entity_lines = [
            f"  - {e.get('entity', 'N/A')} ({e.get('type', 'N/A')}, {e.get('role', 'N/A')})"
            for e in record.get('entities') or []
        ]
        return header + "\n".join(entity_lines)


In [67]:
# Build the pipeline and ask a sample question. The call is the cell's last
# expression, so the returned prediction is displayed as the cell output.
rag = RAG()
rag(question="When did the IPhone 14 get released?")

Prediction(
    reasoning='The context states that Apple Inc. announced the iPhone 14 on 2025-01-18. The question asks when the iPhone 14 was released. Therefore, the answer is 2025-01-18.',
    response='The iPhone 14 was announced on 2025-01-18.'
)