## Notebook Contains

A RAG agent built with `CrewAI`, with short-term memory

## Importing Libraries

In [1]:
# Importing necessary libraries
import os
import sys
import io

from crewai import LLM, Agent, Task, Crew, Process
from crewai.tools import BaseTool

from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [2]:
# --- Configuration --- #
# Model identifier handed to the LLM wrapper.
MODEL_NAME = "ollama/openhermes:latest"

# Location of the HR policy text file that gets ingested.
DATA_FILE = os.path.join("Data", "HR_Policy.txt")

# Directory in which the Chroma vector store is persisted.
VECTORSTORE_DIR = "hr_policy_vectorstore"

# Name of the HuggingFace embedding model.
EMBEDDING_MODEL = "all-MiniLM-L6-v2"

# Text splitter settings: chunk size and the overlap between neighbouring chunks.
CHUNK_SIZE, CHUNK_OVERLAP = 1000, 150

In [3]:
# --- Configuring the LLM --- #
# One shared LLM handle, built from the configured model name; the agents
# defined later in the notebook all receive this same instance.
llm = LLM(model=MODEL_NAME, temperature=0.7)

## Vector Database Preparation

In [4]:
# Loading and Chunking the Data
def load_and_chunk_documents():
    """Load the HR policy text file and split it into overlapping chunks.

    Reads ``DATA_FILE`` as UTF-8 and splits the text with a
    ``RecursiveCharacterTextSplitter`` configured by ``CHUNK_SIZE`` and
    ``CHUNK_OVERLAP``.

    Returns:
        The list of chunk documents produced by the splitter, or ``None``
        when the data file is missing or contains no text.
    """
    print("\t --- Step 1: Loading and Chunking Data ---")

    # isfile (rather than exists) also rejects a directory sitting at that
    # path, which would otherwise make open() raise.
    if not os.path.isfile(DATA_FILE):
        print(f"Data file {DATA_FILE} does not exist.")
        return None

    # Read the text file
    with open(DATA_FILE, 'r', encoding='utf-8') as file:
        full_text = file.read()

    # Nothing to split: report it instead of announcing a successful load.
    if not full_text.strip():
        print(f"Data file {DATA_FILE} is empty.")
        return None

    # Split the text into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    chunks = text_splitter.create_documents([full_text])

    print(f"Data loaded and split into {len(chunks)} chunks.")
    return chunks
    

In [5]:
# Creating and embedding the Vector Store
def create_and_store_embeddings(chunks):
    """Embed the chunks and persist them in a Chroma vector store.

    Args:
        chunks: Documents to embed (the value returned by
            ``load_and_chunk_documents``).

    Returns:
        The Chroma vector store built from ``chunks``, or ``None`` when
        ``chunks`` is empty or ``None``.
    """
    print("\t --- Step 2: Creating Vector Store ---")

    if not chunks:
        print("No chunks to create vector store.")
        return None

    # Initialize the embedding model from HuggingFace
    print(f"Loading embedding model: '{EMBEDDING_MODEL}'...")
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

    # Create the Chroma vector store: every chunk is embedded and the result
    # is persisted to disk in VECTORSTORE_DIR.
    # NOTE(review): if VECTORSTORE_DIR already holds a collection from an
    # earlier run, the chunks are presumably added on top of it, so re-running
    # this cell may store duplicates - confirm, and clear the directory first
    # if that matters.
    print(f"Creating vector store in '{VECTORSTORE_DIR}'...")
    vector_store = Chroma.from_documents(
        chunks,
        embeddings,
        persist_directory=VECTORSTORE_DIR
    )
    print("Vector store created and persisted successfully.")

    # Hand the store back so callers can query it without reopening it.
    return vector_store

In [6]:
# Build the vector store from the HR policy file. The file at DATA_FILE must
# already exist on disk; it is not downloaded by this notebook.
doc_chunks = load_and_chunk_documents()
if doc_chunks:
    create_and_store_embeddings(doc_chunks)
    print("\n\t--- Data Ingestion Complete ---")
    print(f"Vector store is ready in the '{VECTORSTORE_DIR}' directory.")
else:
    # Do not claim success when nothing was loaded or embedded.
    print("\n\t--- Data Ingestion Failed ---")
    print(f"No chunks were produced from '{DATA_FILE}'; the vector store was not updated.")

	 --- Step 1: Loading and Chunking Data ---
Data loaded and split into 25 chunks.
	 --- Step 2: Creating Vector Store ---
Loading embedding model: 'all-MiniLM-L6-v2'...
Creating vector store in 'hr_policy_vectorstore'...
Vector store created and persisted successfully.

	--- Data Ingestion Complete ---
Vector store is ready in the 'hr_policy_vectorstore' directory.


## Creating RAG Agent

In [7]:
# --- 1. RAG Agent with Crew.ai --- #
# Build the embedding model, reopen the persisted Chroma store and derive the
# retriever a single time, so the retrieval tool can reuse them on every call.
print("Loading embeddings and vector store for RAG tool...")
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
vectorstore = Chroma(embedding_function=embeddings, persist_directory=VECTORSTORE_DIR)

# Retrieve top 3 results
retiver = vectorstore.as_retriever(search_kwargs=dict(k=3))

Loading embeddings and vector store for RAG tool...


In [8]:
# Defining retrieval tool
# Tool wrapper around the module-level retriever (`retiver`) built earlier in the notebook.
class RetrieveHRPolicy(BaseTool):
    name: str = "RetrieveHRPolicy"
    description: str = "Retrives relevant HR policy information based on the query."

    def _run(self, query: str) -> str:
        """Look up `query` with the retriever and return the matching passages as one string.

        The page contents of the hits are joined with newlines and prefixed with
        "Retrieved context:". A fixed "nothing found" sentence is returned when
        the retriever yields no hits.
        """
        matches = retiver.invoke(query)

        if matches:
            passages = "\n".join(match.page_content for match in matches)
            return f"Retrieved context: \n{passages}"

        return "No relevant information found in the vector database."

In [9]:
# retrival_tool = RetrieveHRPolicy()

# retrival_tool._run("What is the process for approval to request time off?")  # Test the tool

## Creating Supervisor Agent with memory
The `SQLiteMemory` class below acts as long-term memory: entries are stored in a SQLite database, so they persist between runs.

In [10]:
import sqlite3
import datetime

class SQLiteMemory:
    """Conversation memory persisted in a SQLite database.

    Every entry is one row of the ``memory`` table: an auto-incrementing id,
    the ``role``, the ``content`` and an ISO-8601 timestamp taken when the
    entry is saved.

    The object owns a database connection. Release it with :meth:`close`, or
    use the instance as a context manager::

        with SQLiteMemory("memory.db") as memory:
            memory.save("user", "hello")
    """

    def __init__(self, db_path="memory.db"):
        """Open (or create) the database at ``db_path`` and ensure the table exists."""
        self.conn = sqlite3.connect(db_path)
        try:
            self.cur = self.conn.cursor()
            self.cur.execute("""
                CREATE TABLE IF NOT EXISTS memory (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    role TEXT,
                    content TEXT,
                    timestamp TEXT
                )
            """)
            self.conn.commit()
        except Exception:
            # Do not leak the connection when the schema cannot be set up.
            self.conn.close()
            raise

    def save(self, role, content):
        """Append one ``(role, content)`` entry, stamped with the current local time."""
        ts = datetime.datetime.now().isoformat()
        self.cur.execute("INSERT INTO memory (role, content, timestamp) VALUES (?, ?, ?)",
                         (role, content, ts))
        self.conn.commit()

    def load_recent(self, limit=10):
        """Return up to ``limit`` ``(role, content)`` tuples, newest entry first."""
        self.cur.execute("SELECT role, content FROM memory ORDER BY id DESC LIMIT ?", (limit,))
        return self.cur.fetchall()

    def close(self):
        """Close the underlying connection; calling it more than once is harmless."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the ``with`` body.
        return False


## Creating Python Code Runner (Coding Agent)

In [11]:
class CodingAgent(BaseTool):
    name: str = "CodingAgent"
    description: str = """A coding agent that can write and execute Python code based on user queries.
    It returns the output of the code if successful, or an error message if an error occurs.
    The query must be a valid Python code block.
    """

    def _run(self, query: str) -> str:
        """Execute `query` as Python source and return what it printed.

        Args:
            query: Python source code to execute.

        Returns:
            The text the code wrote to stdout; a fixed success message when it
            printed nothing; or an error message when the code raised an
            ``Exception``.
        """
        # SECURITY: exec() runs arbitrary (LLM-generated) code with the full
        # privileges of this process. Only use this tool in a trusted or
        # sandboxed environment.

        # Run the snippet in one fresh dict used as both globals and locals.
        # With a bare exec(query) inside a method, globals and locals differ,
        # so functions defined by the snippet cannot see its other top-level
        # names (imports, sibling functions) and fail with NameError. A
        # dedicated namespace also keeps the snippet away from the notebook's
        # own variables.
        namespace = {"__name__": "__main__"}

        # Capture everything the snippet prints.
        captured = io.StringIO()
        original_stdout = sys.stdout
        sys.stdout = captured
        try:
            exec(query, namespace)
        except Exception as e:
            return f"An error occurred while executing the code: {e}"
        finally:
            # Always restore stdout, including for SystemExit/KeyboardInterrupt,
            # which are not caught above.
            sys.stdout = original_stdout

        output = captured.getvalue()

        # If there's no output, return a message indicating success.
        if not output.strip():
            return "Code executed successfully with no output."

        return output

In [12]:
coding_tool = CodingAgent()

# Smoke test: push a one-line snippet through the tool and show what it captured.
print(coding_tool._run(query="print('Hello')"))

Hello



In [13]:
class DirectAnswerTool(BaseTool):
    name: str = "DirectAnswerTool"
    description: str = "A tool for the supervisor to give a direct, final answer to a query."

    def _run(self, query: str) -> str:
        # No lookup or computation happens here: the query is echoed back
        # behind a fixed prefix that marks it as answered by the supervisor.
        return "Supervisor directly answered the query: {}".format(query)

In [None]:
# Instantiate the tools
coding_tool = CodingAgent()
helper_tool = DirectAnswerTool()
retrival_tool = RetrieveHRPolicy()

# --- Defining All Agents --- #
# Coding agent: writes Python and runs it through the CodingAgent tool.
coding_agent = Agent(
    role="coding_agent",
    goal="""Write python code based on user queries and answer executing code.""",
    backstory= """You are a helpful assistant. You can write and execute Python code based on user queries and return the output,
      executing the code.""",
    verbose=False,
    tools=[coding_tool],
    llm=llm
)

# # Defining Helpful Agent
# helper_agent = Agent(
#     role= "helper_agent",
#     goal= "Answer user query based on your knowladge",
#     backstory= "You are a helpful assistant. You try to answer user Question.",
#     verbose= False,
#     tools= [helper_tool],
#     llm= llm
# )

# RAG agent: answers HR policy questions from the vector store.
# The "nothing found" sentence quoted in the goal must match, word for word,
# the string RetrieveHRPolicy._run returns when the retriever has no hits;
# otherwise the agent never recognises the empty-result case.
rag_agent = Agent(
    role="RAG Agent",
    goal="""Answering user queries from a vector database. You must use the retrieval tool for every query.
        If the tool's response is 'No relevant information found in the vector database.', you must respond with "I don't know the answer."
        Otherwise, use the retrieved documents to formulate your answer. Do not use any other tools or methods.""",
    backstory= """You are a helpful assistant. You can answer user queries from a vector database created from HR policies.""",
    verbose=False,
    tools=[retrival_tool],
    llm=llm
)

# Supervisor agent: only classifies the query. The goal names the same two
# agents the backstory lists (the helper agent above is disabled).
supervisor_agent = Agent(
    role="Supervisor Agent",
    goal="""Analyze user queries and decide whether to route the request to the Coding Agent or the RAG Agent. Be very strict in your decision-making.""",
    backstory="""You are a top-tier assistant that acts as a router for a team of specialized agents.
        Your primary function is to classify user requests. You MUST ALWAYS route the query to one of the following agents, by name:
        - Coding Agent: For tasks that explicitly require writing, running, or debugging code (e.g., "write a Python script", "calculate X using code", "debug this function").
        - RAG Agent: For HR policies question (e.g., "What are employee benifits?", "What are the leave policies?").
        You CAN only select one of these agents.""",
    verbose=False,
    llm=llm
)

# --- Defining Tasks --- #
# "{query}" in each description is a placeholder that is filled from the
# `inputs` dict passed to Crew.kickoff(), so plain (non-f) strings are used.

# Coding task: write code for the query and report code plus output.
coding_task = Task(
    description="""Write clean python code based on the user query.
        User Query: {query}""",
    expected_output="return the code like ```CODE:\n``` and the output of the code like ```OUTPUT:```.",
    agent=coding_agent
)

# # Defining Helper task
# helper_task = Task(
#     description= f''' You Answer the user query. Your Answer should be to the point and short.
#         User Query: {{query}} ''',
#     expected_output="Return Answer in Natural Language",
#     agent=helper_agent
# )

# RAG task: answer the query from the retrieved HR policy passages.
rag_task = Task(
    description="""Following is a user Question. Use the retrieval tool to find the relevant HR policies and formulate an answer.
        User Query: {query}""",
    expected_output="Return short and to the point answer.",
    agent=rag_agent
)

# Supervisor task: returns only the name of the task that should handle the
# query; the actual dispatch to the matching crew is done by the caller.
# NOTE(review): the former `possible_tasks=[...]` argument was dropped because
# it does not appear among CrewAI's documented Task attributes - confirm
# against the installed CrewAI version.
supervisor_task = Task(
    description="""Analyze the user's query and decide on the best course of action.
        - If the query asks for code, delegate to coding_task.
        - If the query is a question about HR policies, delegate to rag_task.
        User Query: {query}""",
    expected_output="The Answer can only be 'coding_task' or 'rag_task'. Nothing else.",
    agent=supervisor_agent
)

# --- Defining Crew --- #
# Three independent single-agent crews. Each one wraps exactly one agent and
# one task; none of them chains into another.

# Crew that writes and runs code.
coding_crew = Crew(
    tasks=[coding_task],
    agents=[coding_agent],
    process=Process.sequential,
    verbose=False,
)

# helper_crew = Crew(
#     agents= [helper_agent],
#     tasks= [helper_task],
#     verbose= False,
#     process= Process.sequential
# )

# Crew that answers from the HR policy vector store.
rag_crew = Crew(
    tasks=[rag_task],
    agents=[rag_agent],
    verbose=False,
)

# Crew that classifies the query as 'coding_task' or 'rag_task'.
supervisor_crew = Crew(
    tasks=[supervisor_task],
    agents=[supervisor_agent],
    process=Process.sequential,
    verbose=False,
)

In [15]:
# Checking if Crew is ready
# print(coading_crew.kickoff(inputs={"query": "calculate the sum of all natural numbers greater than 5 and less than 10."}) )
# print(helper_crew.kickoff(inputs={'query': "What is the capital of India?"}))
# print(helper_crew.kickoff(inputs={'query': "What employee benefits are available?"}))

In [None]:
# Defining Control Flow.
def _resolve_route(raw_decision):
    """Return the single route name mentioned in the supervisor's answer, else None."""
    text = str(raw_decision or "").lower()
    mentioned = [route for route in ("coding_task", "rag_task") if route in text]
    # An answer naming both routes (or neither) is treated as "no decision".
    return mentioned[0] if len(mentioned) == 1 else None


def main():
    """Interactive loop: read a query, let the supervisor route it, run the chosen crew.

    The loop ends when the user types 'exit' or 'quit', or when the prompt can
    no longer be read (end of input / interrupt). Blank queries are skipped
    without calling any crew.
    """
    print("\n\t--- Starting Crew for RAG Agent ---")

    while True:
        try:
            user_query = input("Enter your query (or type 'exit' to quit): ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or an interrupted prompt: leave the loop cleanly.
            break

        # Nothing to route; do not spend an LLM call on an empty query.
        if not user_query:
            continue

        print(f"\n🙎‍♂️: {user_query}")

        # Breaking the loop if exit or quit was input
        if user_query.lower() in ['exit', 'quit']:
            break

        decision = supervisor_crew.kickoff(inputs={'query': user_query})
        print(f"\t I need to check with {decision}")

        # The supervisor's raw answer is free text from the LLM; it may carry
        # quotes, whitespace, different casing or a trailing period, so it is
        # normalised instead of being compared for exact equality.
        route = _resolve_route(decision.raw)

        if route == 'coding_task':
            result = coding_crew.kickoff(inputs={'query': user_query})
            print(f"🤖: {result}")
        elif route == 'rag_task':
            result = rag_crew.kickoff(inputs={'query': user_query})
            print(f"🤖: {result}")
        else:
            print("🤖: I did not find any tool to answer.")

main()


	--- Starting Crew for RAG Agent ---

🙎‍♂️: Whar are the employee benifits?
	 I need to check with rag_task


🤖: Our company offers a range of employee benefits, including health insurance, retirement plans, paid time off, training and development opportunities, and flexible work arrangements. The details of these benefits, including eligibility and enrollment procedures, will be provided in the employee handbook or through HR communication channels. Employees can reach out to the HR department for specific questions or guidance on accessing and utilizing these benefits.

🙎‍♂️: write a python code to short a list of numbers/
	 I need to check with coding_task
🤖: CODE: def short_list(numbers):
        numbers.sort()
        return numbers
OUTPUT: []

🙎‍♂️: Write a python code to find 2nd smallest perfect number.
	 I need to check with coding_task
🤖: CODE:
```python
def find_second_smallest_perfect_number():
    perfect_numbers = []
    for i in range(1, 3000):
        if all(i % j == 0 for j in range(2, i)): # Check if a number is perfect
            perfect_numbers.append(i)
    first_smallest

🤖: Thought: It appears that the RetrieveHRPolicy tool is not suitable for this task as it is meant to retrieve HR policies, not astronomical data. 
Action: None (Since no relevant documents were found and I am unable to use any other tools)

🙎‍♂️: exit
