In [1]:
from dotenv import load_dotenv

# Populate os.environ from the local azure.env file. Later cells read
# AZURE_OPENAI_KEY, AZURE_OPENAI_ENDPOINT and POSTGRES_CONNECTION_STRING from the
# environment (presumably defined in this file).
load_dotenv(dotenv_path="azure.env")

True

In [2]:
from langchain_experimental.llms.ollama_functions import OllamaFunctions

# Local Llama 3.1 served through Ollama. format="json" requests Ollama's JSON output
# mode (presumably what OllamaFunctions relies on to parse tool calls).
llm = OllamaFunctions(
    model="llama3.1",
    format="json",
)

In [5]:
# Tool schema for the search_database function. The filter objects feed
# PostgresSearcher.build_filter_clause. "enum" restricts the operators the model may
# emit, and "required" keeps the model from sending half-specified filters.
tools = [
    {
        "name": "search_database",
        "description": "Search PostgreSQL database for relevant products based on user query",
        "parameters": {
            "type": "object",
            "properties": {
                "search_query": {
                    "type": "string",
                    "description": "Query string to use for full text search, e.g. 'red shoes'",
                },
                "price_filter": {
                    "type": "object",
                    "description": "Filter search results based on price of the product",
                    "properties": {
                        "comparison_operator": {
                            "type": "string",
                            "description": "Operator to compare the column value, either '>', '<', '>=', '<=', '='",  # noqa
                            "enum": [">", "<", ">=", "<=", "="],
                        },
                        "value": {
                            "type": "number",
                            "description": "Value to compare against, e.g. 30",
                        },
                    },
                    "required": ["comparison_operator", "value"],
                },
                "brand_filter": {
                    "type": "object",
                    "description": "Filter search results based on brand of the product",
                    "properties": {
                        "comparison_operator": {
                            "type": "string",
                            "description": "Operator to compare the column value, either '=' or '!='",
                            "enum": ["=", "!="],
                        },
                        "value": {
                            "type": "string",
                            "description": "Value to compare against, e.g. AirStrider",
                        },
                    },
                    "required": ["comparison_operator", "value"],
                },
            },
            "required": ["search_query"],
        },
    }
]

In [None]:
# Bind the search tool under a separate name so `llm` keeps referring to the unbound
# OllamaFunctions model. Reassigning `llm` here would make the later
# `llm.bind_tools(tools)` cell re-bind an already tool-bound runnable.
# function_call presumably forces the model to call search_database.
ollama_llm_with_tools = llm.bind_tools(tools, function_call={"name": "search_database"})
ollama_llm_with_tools.invoke("Best shoe for hiking?")

In [None]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain_openai import AzureOpenAIEmbeddings
import os

# Embedding client for the text-embedding-ada-002 deployment. The Item.embedding
# column is declared as Vector(1536) for ada-002.
# Both settings use os.environ[...], as the chat-model cell does. A missing key
# therefore fails here with a KeyError instead of silently passing api_key=None.
aoai_embeddings = AzureOpenAIEmbeddings(
    api_key=os.environ["AZURE_OPENAI_KEY"],
    azure_deployment="text-embedding-ada-002",
    openai_api_version="2024-03-01-preview",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
)

In [8]:
# NOTE(review): execution count [8] shows this cell ran after the AzureChatOpenAI cell
# ([7]), so the output below comes from gpt-4o. Under Restart & Run All it runs before
# that cell instead: `llm` here is still the Ollama model (tool-bound already if the
# previous bind cell reassigned it), and this line re-binds it. Bind tools once, on the
# unbound model, and keep cells in execution order.
llm = llm.bind_tools(tools)
llm.invoke("Best shoe for hiking?")

AIMessage(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_QsjsguTvwjzG84N155TMrwAv', 'function': {'arguments': '{"search_query":"hiking shoes"}', 'name': 'search_database'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-05-13', 'system_fingerprint': 'fp_abc28019ad'}, id='run-a5162ab3-887a-4188-8a78-8985152e9c74-0', tool_calls=[{'name': 'search_database', 'args': {'search_query': 'hiking shoes'}, 'id': 'call_QsjsguTvwjzG84N155TMrwAv', 'type': 'tool_call'}])

In [7]:
# Azure-hosted gpt-4o chat model with streaming enabled. The search tool schema is
# bound to it, so responses can carry search_database tool calls.
llm = AzureChatOpenAI(
    api_key=os.environ["AZURE_OPENAI_KEY"],
    api_version="2024-05-01-preview",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment="gpt-4o",
    model="gpt-4o",
    streaming=True,
)
llm = llm.bind_tools(tools)

In [9]:
# OpenAI-format few-shot conversation that shows the model how to turn questions into
# search_database tool calls. Each assistant tool call is followed by a tool message
# carrying a placeholder result.
few_shot_messages = [
    # Example 1: plain full-text query.
    {"role": "user", "content": "good options for climbing gear that can be used outside?"},
    {
        "role": "assistant",
        "tool_calls": [
            {
                "id": "call_abc123",
                "type": "function",
                "function": {
                    "arguments": '{"search_query":"climbing gear outside"}',
                    "name": "search_database",
                },
            }
        ],
    },
    {
        "role": "tool",
        "tool_call_id": "call_abc123",
        "content": "Search results for climbing gear that can be used outside: ...",
    },
    # Example 2: query plus a price filter.
    {"role": "user", "content": "are there any shoes less than $50?"},
    {
        "role": "assistant",
        "tool_calls": [
            {
                "id": "call_abc456",
                "type": "function",
                "function": {
                    "arguments": '{"search_query":"shoes","price_filter":{"comparison_operator":"<","value":50}}',
                    "name": "search_database",
                },
            }
        ],
    },
    {
        "role": "tool",
        "tool_call_id": "call_abc456",
        "content": "Search results for shoes cheaper than 50: ...",
    },
]

In [10]:
from __future__ import annotations

from dataclasses import asdict

from pgvector.sqlalchemy import Vector
from sqlalchemy import Index
from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, mapped_column


# ORM models
class Base(DeclarativeBase, MappedAsDataclass):
    """Declarative base for this notebook's models; mapped subclasses are also dataclasses."""

class Item(Base):
    """A product row of the ``items`` table, including its 1536-dim ada-002 embedding."""

    __tablename__ = "items"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    type: Mapped[str] = mapped_column()
    brand: Mapped[str] = mapped_column()
    name: Mapped[str] = mapped_column()
    description: Mapped[str] = mapped_column()
    price: Mapped[float] = mapped_column()
    embedding: Mapped[Vector] = mapped_column(Vector(1536))  # ada-002

    def to_dict(self, include_embedding: bool = False):
        """Return the row as a plain dict.

        Args:
            include_embedding: when true, keep the embedding converted to a list;
                otherwise drop the ``embedding`` key.
        """
        model_dict = asdict(self)
        embedding = model_dict.pop("embedding")
        if include_embedding:
            # Rows loaded through pgvector presumably carry a numpy array, but an Item
            # built in Python may hold a plain list, which has no .tolist().
            model_dict["embedding"] = (
                embedding.tolist() if hasattr(embedding, "tolist") else list(embedding)
            )
        return model_dict

    def to_str_for_rag(self):
        """One-line product description used as retrieval context for the LLM."""
        return f"Name:{self.name} Description:{self.description} Price:{self.price} Brand:{self.brand} Type:{self.type}"

    def to_str_for_embedding(self):
        """Text (name, description, type) intended for computing the product embedding."""
        return f"Name: {self.name} Description: {self.description} Type: {self.type}"

In [11]:
from langchain_core.messages.ai import AIMessage
from pgvector.utils import to_db
from sqlalchemy import Float, Integer, column, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from openai import AzureOpenAI

import json
def extract_search_arguments(response_message: "AIMessage"):
    """Pull the search query and structured filters out of an LLM response.

    If the message carries tool calls, the arguments of every ``search_database`` call
    are read: filters accumulate, and the query of a later call replaces an earlier one.
    Without tool calls, the stripped message content is used as the query.

    Args:
        response_message: LLM response exposing ``tool_calls`` (LangChain tool-call dicts)
            and ``content``.

    Returns:
        ``(search_query, filters)``. ``search_query`` may be ``None``. ``filters`` is a
        list of ``{"column", "comparison_operator", "value"}`` dicts for the ``price`` and
        ``brand`` columns.
    """
    search_query = None
    filters = []
    if response_message.tool_calls:
        for tool in response_message.tool_calls:
            # "type" is optional on LangChain tool-call dicts; a missing one means a tool call.
            if tool.get("type", "tool_call") != "tool_call":
                continue
            if tool.get("name") != "search_database":
                continue
            args = tool.get("args") or {}
            # Even though it's required in the schema, search_query is not always specified.
            search_query = args.get("search_query")
            for arg_name, column_name in (("price_filter", "price"), ("brand_filter", "brand")):
                spec = args.get(arg_name)
                # A filter without both an operator and a value cannot be applied; skip it.
                if not spec or "comparison_operator" not in spec or "value" not in spec:
                    continue
                filters.append(
                    {
                        "column": column_name,
                        "comparison_operator": spec["comparison_operator"],
                        "value": spec["value"],
                    }
                )
    elif query_text := response_message.content:
        search_query = query_text.strip()
    return search_query, filters


class PostgresSearcher:
    def __init__(
        self,
        db_session: AsyncSession,
        aoai_embeddings: AzureOpenAIEmbeddings
    ):
        self.db_session = db_session
        self.aoai_embeddings = aoai_embeddings

    def build_filter_clause(self, filters) -> tuple[str, str]:
        if filters is None:
            return "", ""
        filter_clauses = []
        for filter in filters:
            if isinstance(filter["value"], str):
                filter["value"] = f"'{filter['value']}'"
            filter_clauses.append(f"{filter['column']} {filter['comparison_operator']} {filter['value']}")
        filter_clause = " AND ".join(filter_clauses)
        if len(filter_clause) > 0:
            return f"WHERE {filter_clause}", f"AND {filter_clause}"
        return "", ""

    def search(
        self,
        query_text: str | None,
        query_vector: list[float] | list,
        top: int = 5,
        filters: list[dict] | None = None,
    ):
        filter_clause_where, filter_clause_and = self.build_filter_clause(filters)

        vector_query = f"""
            SELECT id, RANK () OVER (ORDER BY embedding <=> :embedding) AS rank
                FROM items
                {filter_clause_where}
                ORDER BY embedding <=> :embedding
                LIMIT 20
            """

        fulltext_query = f"""
            SELECT id, RANK () OVER (ORDER BY ts_rank_cd(to_tsvector('english', description), query) DESC)
                FROM items, plainto_tsquery('english', :query) query
                WHERE to_tsvector('english', description) @@ query {filter_clause_and}
                ORDER BY ts_rank_cd(to_tsvector('english', description), query) DESC
                LIMIT 20
            """

        hybrid_query = f"""
        WITH vector_search AS (
            {vector_query}
        ),
        fulltext_search AS (
            {fulltext_query}
        )
        SELECT
            COALESCE(vector_search.id, fulltext_search.id) AS id,
            COALESCE(1.0 / (:k + vector_search.rank), 0.0) +
            COALESCE(1.0 / (:k + fulltext_search.rank), 0.0) AS score
        FROM vector_search
        FULL OUTER JOIN fulltext_search ON vector_search.id = fulltext_search.id
        ORDER BY score DESC
        LIMIT 20
        """

        if query_text is not None and len(query_vector) > 0:
            sql = text(hybrid_query).columns(column("id", Integer), column("score", Float))
        elif len(query_vector) > 0:
            sql = text(vector_query).columns(column("id", Integer), column("rank", Integer))
        elif query_text is not None:
            sql = text(fulltext_query).columns(column("id", Integer), column("rank", Integer))
        else:
            raise ValueError("Both query text and query vector are empty")

        results = (
            self.db_session.execute(
                sql,
                {"embedding": to_db(query_vector), "query": query_text, "k": 60},
            )
        ).fetchall()

        # Convert results to Item models
        items = []
        for id, _ in results[:top]:
            item = self.db_session.execute(select(Item).where(Item.id == id))
            items.append(item.scalar())
        return items

    def search_and_embed(
        self,
        query_text: str | None = None,
        top: int = 5,
        enable_vector_search: bool = True,
        enable_text_search: bool = True,
        filters: list[dict] | None = None,
    ):
        """
        Search items by query text. Optionally converts the query text to a vector if enable_vector_search is True.
        """
        vector: list[float] = []
        if enable_vector_search and query_text is not None:
            vector = self.aoai_embeddings.embed_query(query_text)
        if not enable_text_search:
            query_text = None
        return self.search(query_text, vector, top, filters)

In [12]:
from pgvector.utils import to_db
from sqlalchemy import Float, Integer, column, select, text
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# One engine for the notebook; the searcher keeps a single session created from it.
engine = create_engine(os.environ['POSTGRES_CONNECTION_STRING'])
database_session = sessionmaker(
    bind=engine,
    autoflush=False,
    expire_on_commit=False,
)
searcher = PostgresSearcher(db_session=database_session(), aoai_embeddings=aoai_embeddings)

In [13]:
from typing import List

from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_community.adapters.openai import convert_openai_messages

# System prompt ToyRetriever uses to turn a user question into search arguments.
# It has its own name because later cells reassign the generic `system_prompt` to
# answer-generation prompts, which the retriever must not pick up at call time.
query_rewrite_system_prompt = """Below is a history of the conversation so far, and a new question asked by the user that needs to be answered by searching database rows.
You have access to an Azure PostgreSQL database with an items table that has columns for name, description, brand, price, and type.
Generate a search query based on the conversation and the new question.
If the question is not in English, translate the question to English before generating the search query.
If you cannot generate a search query, return the original user question.
DO NOT return anything besides the query."""
system_prompt = query_rewrite_system_prompt  # backward-compatible alias


class ToyRetriever(BaseRetriever):
    """Retriever that rewrites the query with the tool-bound LLM, then searches Postgres.

    At call time it uses the module-level `llm`, `few_shot_messages`,
    `extract_search_arguments` and `searcher`, and wraps each Item as a Document.
    """

    k: int  # number of top results to return

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Turn ``query`` into search arguments via the LLM and return up to ``k`` Documents."""
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", query_rewrite_system_prompt),
                MessagesPlaceholder(variable_name="few_shot_examples"),
                MessagesPlaceholder(variable_name="chat_history"),
                ("human", "{input}"),
            ]
        )
        context_retriever_chain = prompt | llm
        response_message = context_retriever_chain.invoke(
            {
                "input": query,
                "few_shot_examples": convert_openai_messages(few_shot_messages),
                "chat_history": [],
            }
        )
        query_text, filters = extract_search_arguments(response_message)
        results = searcher.search_and_embed(
            query_text,
            top=self.k,
            enable_vector_search=True,
            enable_text_search=True,
            filters=filters,
        )
        # Slice defensively so no more than k Documents are ever returned.
        return [
            Document(page_content=item.to_str_for_rag(), metadata=item.to_dict())
            for item in results[: self.k]
        ]

# Build the retriever (top 3 products) and run one sample query.
retriever = ToyRetriever(k=3)
retriever.invoke("that")

**************************
tool_call["args"] {'search_query': 'climbing gear outside'}
**************************
**************************
tool_call["args"] {'search_query': 'shoes', 'price_filter': {'comparison_operator': '<', 'value': 50}}
**************************


[Document(metadata={'id': 19, 'type': 'Footwear', 'brand': 'Green Equipment', 'name': 'EcoTrail Running Shoes', 'description': 'Experience the great outdoors while reducing your carbon footprint with the Green Equipment EcoTrail Running Shoes. Made from recycled materials, these shoes offer a lightweight, breathable, and flexible design in an earthy green color. With their durable Vibram outsole and cushioned midsole, they provide optimal comfort and grip on any trail.', 'price': 119.99}, page_content='Name:EcoTrail Running Shoes Description:Experience the great outdoors while reducing your carbon footprint with the Green Equipment EcoTrail Running Shoes. Made from recycled materials, these shoes offer a lightweight, breathable, and flexible design in an earthy green color. With their durable Vibram outsole and cushioned midsole, they provide optimal comfort and grip on any trail. Price:119.99 Brand:Green Equipment Type:Footwear'),
 Document(metadata={'id': 11, 'type': 'Climbing', 'bra

In [17]:
import bs4
from langchain import hub

from langchain.schema import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, PromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate
from operator import itemgetter


# System prompt for the answer-generation step. It has its own name so it does not
# overwrite the module-level `system_prompt` defined for query rewriting earlier.
answer_system_prompt = """Assistant helps customers with questions about products.
Respond as if you are a salesperson helping a customer in a store. Do NOT respond with tables.
Answer ONLY with the product details listed in the products.
If there isn't enough information below, say you don't know.
Do not generate answers that don't use the sources below.
Each product has an ID in brackets followed by colon and the product details.
Always include the product ID for each product you use in the response.
Use square brackets to reference the source, for example [52].
Don't combine citations, list each product separately, for example [27][51]."""


# Final prompt: system instructions, prior turns, then the question with retrieved context.
final_prompt = ChatPromptTemplate(
    messages=[
        SystemMessagePromptTemplate(prompt=PromptTemplate(input_variables=[], template=answer_system_prompt)),
        MessagesPlaceholder(variable_name="chat_history"),
        HumanMessagePromptTemplate(prompt=PromptTemplate(
            input_variables=['context', 'input'],
            template="Question: {input} \nContext: {context} \nAnswer:"
        )),
    ]
)

def format_docs(docs):
    """Join retrieved Documents into the context string for the answer prompt.

    Each document is prefixed with ``[<id>]:``, taken from ``metadata["id"]``, so the model
    can cite products by ID as the system prompt asks. Documents without an ID are
    included without a prefix. Documents are separated by a blank line.
    """
    blocks = []
    for doc in docs:
        product_id = doc.metadata.get("id")
        prefix = "" if product_id is None else f"[{product_id}]:"
        blocks.append(f"{prefix}{doc.page_content}")
    return "\n\n".join(blocks)
    
# RAG chain. Input is a dict with "input" (the question) and optionally "chat_history".
# Each branch extracts its own field, so the retriever and the prompt get the question
# text rather than the whole input dict.
rag_chain = (
    {
        "context": itemgetter("input") | retriever | format_docs,
        "input": itemgetter("input"),
        "chat_history": RunnableLambda(lambda x: x.get("chat_history", [])),
    }
    | final_prompt
    # `llm` has the search tool bound. tool_choice="none" makes it answer in text instead
    # of with a tool call, whose empty content StrOutputParser would return as "".
    | llm.bind(tool_choice="none")
    | StrOutputParser()
)


response = rag_chain.invoke({
    "input": "good options for climbing gear anything less than 90$?",
    "chat_history": [],
})
print(response)

**************************
tool_call["args"] {'search_query': 'climbing gear outside'}
**************************
**************************
tool_call["args"] {'search_query': 'shoes', 'price_filter': {'comparison_operator': '<', 'value': 50}}
**************************
Here are some good options for climbing gear under $90:

1. **Apex Climbing Harness by Legend**: This harness offers a secure fit with adjustable leg loops, a contoured waistbelt, and a secure buckle system. It's priced at $89.99 [ID: Apex Climbing Harness].

2. **Guardian Blue Chalk Bag by Legend**: This durable chalk bag features a spacious compartment, a drawstring closure, and a waist belt for easy access while climbing. It's priced at $21.99 [ID: Guardian Blue Chalk Bag].

3. **Summit Pro Harness by Gravitator**: This harness provides a customized fit with adjustable leg loops and waist belt. Safety features include a reinforced tie-in point and strong webbing loops. It's priced at $89.99 [ID: Summit Pro Harness].


In [None]:
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate
from langchain_core.runnables import RunnableBranch, RunnablePassthrough

qa_system_prompt = """Assistant helps customers with questions about products.
Respond as if you are a salesperson helping a customer in a store. Do NOT respond with tables.
Answer ONLY with the product details listed in the products.
If there isn't enough information below, say you don't know.
Do not generate answers that don't use the sources below.
Each product has an ID in brackets followed by colon and the product details.
Always include the product ID for each product you use in the response.
Use square brackets to reference the source, for example [52].
Don't combine citations, list each product separately, for example [27][51].\

{context}"""
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", qa_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)

### Contextualize question ###
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ]
)

# `llm` has the search tool bound. tool_choice="none" makes it reply with text, so the
# downstream parsers never see a tool-call message with empty content.
answer_llm = llm.bind(tool_choice="none")

history_aware_retriever = RunnableBranch(
    (
        # Both empty string and empty list evaluate to False
        lambda x: not x.get("chat_history", False),
        # If no chat history, then we just pass input to retriever
        (lambda x: x["input"]) | retriever,
    ),
    # If chat history, then we pass inputs to LLM chain, then to retriever
    contextualize_q_prompt | answer_llm | StrOutputParser() | retriever,
)

# create_stuff_documents_chain formats `context` itself, so it must receive the list of
# Documents, not a pre-joined string. The document prompt prefixes each document with
# its product ID so the model can cite it as qa_system_prompt requires.
question_answer_chain = create_stuff_documents_chain(
    answer_llm,
    qa_prompt,
    document_prompt=PromptTemplate.from_template("[{id}]:{page_content}"),
)
retrieval_chain = RunnablePassthrough.assign(context=history_aware_retriever).assign(
    answer=question_answer_chain
)

### Statefully manage chat history ###
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the in-memory chat history for ``session_id``, creating it on first use."""
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]


conversational_rag_chain = RunnableWithMessageHistory(
    retrieval_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
    output_messages_key="answer",
)
response = conversational_rag_chain.invoke(
    {"input": "good options for climbing shoes anything less than 90$?"},
    config={
        "configurable": {"session_id": "abc123"}
    },  # constructs a key "abc123" in `store`.
)
response

In [None]:
from langchain.schema import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, PromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate
from operator import itemgetter


# Answer-generation system prompt. It has its own name so this cell does not overwrite
# the module-level `system_prompt` defined for query rewriting earlier in the notebook.
answer_system_prompt = """Assistant helps customers with questions about products.
Respond as if you are a salesperson helping a customer in a store. Do NOT respond with tables.
Answer ONLY with the product details listed in the products.
If there isn't enough information below, say you don't know.
Do not generate answers that don't use the sources below.
Each product has an ID in brackets followed by colon and the product details.
Always include the product ID for each product you use in the response.
Use square brackets to reference the source, for example [52].
Don't combine citations, list each product separately, for example [27][51]."""


# Create the final prompt template
final_prompt = ChatPromptTemplate(
    messages=[
        SystemMessagePromptTemplate(prompt=PromptTemplate(input_variables=[], template=answer_system_prompt)),
        MessagesPlaceholder(variable_name="chat_history"),
        HumanMessagePromptTemplate(prompt=PromptTemplate(
            input_variables=['context', 'input'],
            template="Question: {input} \nContext: {context} \nAnswer:"
        )),
    ]
)

def search_function(response_message, top: int = 3):
    """Run the product search described by an LLM response and build the context string.

    Args:
        response_message: LLM response whose tool calls (or content) carry the search
            arguments; parsed by ``extract_search_arguments``.
        top: maximum number of products to include (default 3).

    Returns:
        One ``[<id>]:<product details>`` entry per product, each followed by a blank line.
    """
    query_text, filters = extract_search_arguments(response_message)
    results = searcher.search_and_embed(
        query_text,
        top=top,
        enable_vector_search=True,
        enable_text_search=True,
        filters=filters,
    )
    return "\n".join(f"[{item.id}]:{item.to_str_for_rag()}\n\n" for item in results)

# Query-rewrite step as a module-level chain. The `context_retriever_chain` built inside
# ToyRetriever._get_relevant_documents is local to that method and not visible here.
query_rewrite_system_prompt = """Below is a history of the conversation so far, and a new question asked by the user that needs to be answered by searching database rows.
You have access to an Azure PostgreSQL database with an items table that has columns for name, description, brand, price, and type.
Generate a search query based on the conversation and the new question.
If the question is not in English, translate the question to English before generating the search query.
If you cannot generate a search query, return the original user question.
DO NOT return anything besides the query."""
context_retriever_chain = (
    ChatPromptTemplate.from_messages(
        [
            ("system", query_rewrite_system_prompt),
            MessagesPlaceholder(variable_name="few_shot_examples"),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
        ]
    )
    | llm
)

# Combine everything into a single chain: rewrite the question into search arguments,
# run the search to build the context, then answer. The input dict needs "input",
# "few_shot_examples" (LangChain messages) and "chat_history".
combined_chain = (
    {
        "input": itemgetter("input"),
        "chat_history": itemgetter("chat_history"),
        "context": context_retriever_chain | search_function,
    }
    | final_prompt
    # Answer in text: with the search tool bound, a tool-call reply would have empty content.
    | llm.bind(tool_choice="none")
)

# Usage
def run_combined_chain(input_question, few_shot_examples, chat_history):
    """Invoke ``combined_chain`` for one question and return the model's reply.

    ``few_shot_examples`` are OpenAI-format message dicts; they are converted to
    LangChain messages before invoking the chain.
    """
    payload = {
        "input": input_question,
        "few_shot_examples": convert_openai_messages(few_shot_examples),
        "chat_history": chat_history,
    }
    return combined_chain.invoke(payload)

# Example: a fresh conversation (empty history) with a price constraint in the question.
result = run_combined_chain(
    input_question="good options for climbing gear anything less than 90$?",
    few_shot_examples=few_shot_messages,
    chat_history=[],
)
print(result.content)

In [None]:
# NOTE(review): repeats the print at the end of the previous cell (re-displays the last answer).
print(result.content)