In [7]:
# Configure the OpenAI client environment before any client object is created.
import os

os.environ.update(
    {
        # Base URL the OpenAI-style client will talk to (a localhost endpoint here).
        "OPENAI_BASE_URL": "http://localhost:1234/v1",
        # Placeholder key; presumably the local server does not validate it — confirm.
        "OPENAI_API_KEY": "test",
    }
)

In [None]:
# Uncomment to install the packages this notebook imports.
# !pip3 install requests
# !pip3 install deprecation
# !pip3 install langchain
# !pip3 install langchain_community
# !pip3 install langchain_openai
# !pip3 install sqlite_vss

In [8]:
import requests
from langchain.embeddings.base import Embeddings
from typing import List


# Define a class that uses the HTTP API to get embeddings
class HTTPEmbeddingModel(Embeddings):
    """LangChain ``Embeddings`` implementation backed by an HTTP embeddings endpoint.

    Every text is sent in its own POST request with the JSON body
    ``{"model": <model_name>, "input": <text>}``. The vector is read from
    ``response["data"][0]["embedding"]``.
    """

    def __init__(self, api_url: str, model_name: str, timeout: float = 60.0):
        """
        Initialize with the base URL of the HTTP server and model name.

        :param api_url: The API endpoint that returns the embeddings.
        :param model_name: The model to use when making the request.
        :param timeout: Seconds to wait for the server before giving up.
            Without a timeout ``requests`` can block forever on a stalled
            server; pass ``None`` to restore that wait-forever behaviour.
        """
        self.api_url = api_url
        self.model_name = model_name
        self.timeout = timeout

    def get_embedding(self, text: str) -> List[float]:
        """
        Get the embedding for a single piece of text by making an HTTP request.

        :param text: The text to get embeddings for.
        :return: A non-empty list of floats representing the embedding.
        :raises ValueError: If the server answers with a non-200 status, or the
            response carries no usable embedding.
        :raises requests.exceptions.RequestException: On connection errors or
            when the timeout elapses.
        """
        payload = {
            "model": self.model_name,
            "input": text
        }

        response = requests.post(
            self.api_url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=self.timeout,
        )

        if response.status_code != 200:
            raise ValueError(f"Error getting embedding: {response.text}")

        # `or []` also covers a literal `"data": null` in the response.
        embedding_data = response.json().get("data") or []
        if not embedding_data:
            raise ValueError("No embeddings found in the response.")

        # Only one input was sent, so only the first entry is relevant.
        embedding = embedding_data[0].get("embedding") or []
        if not embedding:
            # An empty vector is never valid; fail here rather than letting it
            # reach the vector store.
            raise ValueError("No embeddings found in the response.")
        return embedding

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Embed a list of documents (texts), one HTTP request per document.

        :param texts: List of documents to embed.
        :return: A list of lists, where each inner list is an embedding, in
            the same order as ``texts``.
        """
        return [self.get_embedding(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        """
        Embed a single query (text).

        :param text: The query text to embed.
        :return: A list of floats representing the embedding.
        """
        return self.get_embedding(text)


In [9]:
def read_txt_files(folder_path):
    """Recursively read every ``.txt`` file under ``folder_path``.

    :param folder_path: Root directory to walk.
    :return: A tuple ``(all_texts, folder_map)``. ``all_texts`` is the list of
        file contents in traversal order. ``folder_map`` maps each visited
        directory's base name to the list of ``.txt`` contents found directly
        in it (an empty list when it has none).

    Directories in different branches that share a base name are merged under
    one key instead of the later one overwriting the earlier one. Directories
    and files are visited in sorted order so the result does not depend on the
    filesystem's listing order. Non-``.txt`` files are reported on stdout and
    skipped.
    """
    all_texts = []
    folder_map = {}
    # Walk through all subdirectories and files in the folder
    for root, dirs, files in os.walk(folder_path):
        # Sorting in place controls the order os.walk descends into subfolders.
        dirs.sort()
        folder_name = os.path.basename(root)
        # setdefault keeps contents already collected for a same-named folder.
        folder_contents = folder_map.setdefault(folder_name, [])
        for file in sorted(files):
            # Only .txt files are ingested.
            if not file.endswith('.txt'):
                print(f"Skipping non-txt file: {file}")
                continue
            file_path = os.path.join(root, file)
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            all_texts.append(content)
            folder_contents.append(content)

    return all_texts, folder_map

# Root folder of the chatbot documents. Set the CHATBOT_DOCUMENTS_DIR
# environment variable to run this on another machine; the fallback is the
# path the notebook was originally written against.
documents_dir = os.environ.get(
    "CHATBOT_DOCUMENTS_DIR",
    "/Users/rugvedsomwanshi/CMU/capstone/archive/chatbot_documents",
)
# os.walk silently yields nothing for a missing directory, so fail loudly
# instead of continuing with an empty corpus.
if not os.path.isdir(documents_dir):
    raise FileNotFoundError(f"Document folder not found: {documents_dir}")

texts, folder_map = read_txt_files(documents_dir)
# Number of documents read, then number of folders visited.
print(len(texts))
print(len(folder_map))

15
4


In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import SQLiteVSS
from typing import List

# Instantiate the HTTP embedding model
api_url = "http://127.0.0.1:1234/v1/embeddings"
model_name = "nomic-embed-text-v1.5"
embd = HTTPEmbeddingModel(api_url=api_url, model_name=model_name)
# api_url = "https://api.openai.com/v1/embeddings"  # Use OpenAI's API
# model_name = "text-embedding-ada-002"  # Use OpenAI's embedding model
# embd = HTTPEmbeddingModel(api_url=api_url, model_name=model_name)

# The vector store needs exactly one metadata dict per text. Build the texts
# and their folder labels together from folder_map so the two lists cannot
# drift apart (the previous version passed one dict per *folder*, which did
# not line up with the per-file `texts` list).
labelled_texts = [
    (content, folder_name)
    for folder_name, contents in folder_map.items()
    for content in contents
]
if len(labelled_texts) != len(texts):
    # folder_map and texts come from the same directory walk; a different
    # count means some documents would be indexed without (or with wrong) labels.
    raise ValueError(
        f"folder_map holds {len(labelled_texts)} documents but texts holds {len(texts)}"
    )

# Add the documents to the vectorstore using the custom HTTP embedding model.
# NOTE(review): db_file is persistent; presumably re-running this cell appends
# the same documents again — confirm and delete ./test.db between runs if so.
db = SQLiteVSS.from_texts(
    texts=[content for content, _ in labelled_texts],
    embedding=embd,   # Use your custom embedding model here
    table="state_union",
    db_file="./test.db",
    # The "file_name" key actually stores the containing folder's name.
    metadatas=[{"file_name": folder_name} for _, folder_name in labelled_texts],
)

In [11]:
# Smoke-test the vector store: request the 5 closest entries for a dummy query,
# then show the raw (document, score) results and how many came back.
search = db.similarity_search_with_score("Test", k=5)
print(search)
print(len(search))

: 

In [None]:
### Router

from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
import json



def AskQuestionBeforeSS(llm, conversation_history, human_message, current_agent="internet_search",):
    """Ask the LLM which agent should handle ``human_message`` (pre-similarity-search pass).

    Appends, in order, two system messages (both currently empty), the human
    message and the model's reply to ``conversation_history``, mutating the
    caller's list in place.

    :param llm: Chat model exposing ``invoke(messages)``; the reply's
        ``content`` must be a JSON object with an ``"agent"`` key.
    :param conversation_history: Mutable list of messages, extended in place.
    :param human_message: The user's message to route.
    :param current_agent: Name of the agent currently handling the conversation.
    :return: ``True`` if the model picked a different agent than
        ``current_agent``, ``False`` otherwise.
    :raises json.JSONDecodeError: If the reply is not valid JSON. In that case
        the reply is not appended to the history.
    :raises KeyError: If the JSON has no ``"agent"`` key.

    The caller's ``current_agent`` variable is NOT updated by this function
    (rebinding the parameter would only affect a local). To learn which agent
    was chosen, parse ``conversation_history[-1].content``.
    """
    system_message = SystemMessage(content="")
    conversation_history.append(system_message)
    agent_information_message = SystemMessage(content="")
    conversation_history.append(agent_information_message)
    conversation_history.append(human_message)
    ai_response = llm.invoke(conversation_history)
    # Parse before appending so an unparseable reply never enters the history.
    ai_json_response = json.loads(ai_response.content)
    conversation_history.append(ai_response)
    return ai_json_response['agent'] != current_agent
    
    
# Agent treated as active before the first question is routed.
current_agent = "internet_search"
# LLM with function call
llm = ChatOpenAI(
    model="mlx-community/llama-3.2-3b-instruct",
    temperature=0,
)
# Shared message log; every routing call in the cells below appends to it.
conversation_history = []
human_message = HumanMessage(
    content="What is the thing the employee would get if he or she joins the company?"
)
AskQuestionBeforeSS(
    llm=llm,
    conversation_history=conversation_history,
    human_message=human_message,
    current_agent=current_agent,
)
# Show the full log, then just the model's latest reply.
print(conversation_history)
print(conversation_history[-1].content)

In [None]:
# Route a second question through the same conversation log and show the reply.
human_message = HumanMessage(content="What is the weather today?")
AskQuestionBeforeSS(
    llm=llm,
    conversation_history=conversation_history,
    human_message=human_message,
    current_agent=current_agent,
)
print(conversation_history[-1].content)

In [None]:
# Route another question through the same conversation log and show the reply.
human_message = HumanMessage(content="Where can i get some tacos?")
AskQuestionBeforeSS(
    llm=llm,
    conversation_history=conversation_history,
    human_message=human_message,
    current_agent=current_agent,
)
print(conversation_history[-1].content)

In [None]:
# Route a sales-related question through the same conversation log and show the reply.
human_message = HumanMessage(content="What is the amount of sales in the last quarter?")
AskQuestionBeforeSS(
    llm=llm,
    conversation_history=conversation_history,
    human_message=human_message,
    current_agent=current_agent,
)
print(conversation_history[-1].content)

In [None]:
def similarity_search(human_message, db, top_k=1):
    """Return the ``top_k`` scored matches for the message's text.

    :param human_message: Object whose ``content`` attribute is the query text.
    :param db: Store exposing ``similarity_search_with_score(query, k=...)``.
    :param top_k: Number of results to request.
    :return: Whatever ``db.similarity_search_with_score`` returns.
    """
    query_text = human_message.content
    return db.similarity_search_with_score(query_text, k=top_k)

def AskQuestionAfterSS(llm, conversation_history, ss_agents, human_message, current_agent="internet_search",):
    """Ask the LLM which agent should answer, given similarity-search hints.

    Appends, in order, a routing system prompt (naming ``ss_agents`` and
    ``current_agent``), an empty system message, the human message and the
    model's reply to ``conversation_history``, mutating the caller's list in place.

    :param llm: Chat model exposing ``invoke(messages)``; the reply's
        ``content`` must be a JSON object with an ``"agent"`` key.
    :param conversation_history: Mutable list of messages, extended in place.
    :param ss_agents: Agents suggested by the similarity search; interpolated
        into the prompt with its default string form.
    :param human_message: The user's message to route.
    :param current_agent: Name of the agent currently handling the conversation.
    :return: ``True`` if the model picked a different agent than
        ``current_agent``, ``False`` otherwise.
    :raises json.JSONDecodeError: If the reply is not valid JSON. In that case
        the reply is not appended to the history.
    :raises KeyError: If the JSON has no ``"agent"`` key.

    The caller's ``current_agent`` variable is NOT updated by this function
    (rebinding the parameter would only affect a local). To learn which agent
    was chosen, parse ``conversation_history[-1].content``.
    """
    system_message = SystemMessage(content=f'''Determine if the question needs a redirection to another agent or the current agent is capable of answering it. 
                                   If the current agent is capable of answering it, then proceed with the current agent. We have done a similarity search on the documents and
                                   seems like the top agents who have the information are {ss_agents}. 
                                   Usually, internet_search is not the answer and try to use more of the specialized agents which we have
                                   The current agent is {current_agent}. The agents which can answer the question are {ss_agents}
                                   The specialized available agents are: internet_search, customer_database_search and organizational_information.
                                   ONLY CHOOSE FROM THESE AGENTS. DO NOT CHOOSE FROM ANY OTHER AGENT
                                   ''')
    conversation_history.append(system_message)
    agent_information_message = SystemMessage(content="")
    conversation_history.append(agent_information_message)
    conversation_history.append(human_message)
    ai_response = llm.invoke(conversation_history)
    # Parse before appending so an unparseable reply never enters the history.
    ai_json_response = json.loads(ai_response.content)
    conversation_history.append(ai_response)
    return ai_json_response['agent'] != current_agent

# Two-stage routing demo: ask the router first, and only when it wants to
# switch agents consult the vector store and ask again with those hints.
current_agent = "internet_search"

human_message = HumanMessage(content="Does customer satisfication affect employee benefits?")
switched = AskQuestionBeforeSS(
    llm=llm,
    conversation_history=conversation_history,
    human_message=human_message,
    current_agent=current_agent,
)

if switched:
    top_documents = similarity_search(human_message, db, top_k=3)
    # Folder names (used as agent names) whose documents appear in the hits.
    agents_from_search = set()
    for key, values in folder_map.items():
        for value in values:
            for document in top_documents:
                # Each hit is indexed as (document, score); match on full text.
                if value != document[0].page_content:
                    continue
                agents_from_search.add(key)
                print(agents_from_search)
    AskQuestionAfterSS(
        llm=llm,
        conversation_history=conversation_history,
        ss_agents=agents_from_search,
        human_message=human_message,
        current_agent=current_agent,
    )
    print(conversation_history[-1].content)

In [None]:
from metadata_search import MetadataSearchEngine

# Initialize the search engine with the vector store, the folder-to-texts map
# and the chat model built in earlier cells.
# NOTE(review): MetadataSearchEngine comes from the local metadata_search
# module, which is not shown here — confirm its constructor signature.
search_engine = MetadataSearchEngine(db, folder_map, llm)

# Process a query; route_query is unpacked as an (agent, results) pair.
query = "What are the employee benefits?"
agent, results = search_engine.route_query(query)

# Access results with metadata: each result is read for .agent, .metadata
# (indexed by 'title'), .score and .content; only the first 100 characters of
# the content are printed.
for result in results:
    print(f"Agent: {result.agent}")
    print(f"Title: {result.metadata['title']}")
    print(f"Score: {result.score}")
    print(f"Content: {result.content[:100]}...")