In [21]:
import os
from datetime import datetime

import psycopg2
from psycopg2 import Error

def get_postgresql_connection():
    """Open a new connection to the flights PostgreSQL database.

    Connection settings are read from the standard libpq environment
    variables so that no credentials live in the notebook:

    * ``PGHOST``      (default ``localhost``)
    * ``PGDATABASE``  (default ``flightsdb``)
    * ``PGUSER``      (default ``postgres``)
    * ``PGPASSWORD``  (no default; must be supplied by the environment)

    Returns:
        An open psycopg2 connection, or ``None`` when the connection attempt
        fails (the error is printed). Callers must check for ``None`` and are
        responsible for closing the connection.
    """
    db_host = os.environ.get("PGHOST", "localhost")
    db_name = os.environ.get("PGDATABASE", "flightsdb")
    db_user = os.environ.get("PGUSER", "postgres")
    # Deliberately no hard-coded fallback: a password must never be committed
    # with the notebook.
    db_password = os.environ.get("PGPASSWORD")

    try:
        # Connect to the PostgreSQL server
        conn = psycopg2.connect(
            host=db_host,
            database=db_name,
            user=db_user,
            password=db_password,
        )
    except Exception as error:
        # psycopg2.Error derives from Exception, so this single clause keeps
        # the original "report and return None" behaviour for every failure.
        print("Error while connecting to PostgreSQL:", error)
        return None

    print("PostgreSQL connected successfully")
    return conn


In [39]:
from langchain_core.tools import tool

In [42]:
@tool
def cancel_booking(id):
    """This method will take id as argument and the use of this method is to cancel the booking by updating the DB"""
    # The docstring above is what the @tool decorator exposes to the model as
    # the tool description, so implementation notes are kept in comments.
    #
    # id: identifier of the row in `flights` to mark as cancelled.
    # Returns a short status message so the calling model can tell whether a
    # row was actually updated.
    # Raises RuntimeError when no database connection could be opened.
    conn = get_postgresql_connection()
    if conn is None:
        raise RuntimeError("Could not connect to PostgreSQL; the booking was not cancelled.")

    try:
        # `with conn` commits on success and rolls back on error; the cursor
        # context manager closes the cursor.
        with conn, conn.cursor() as cursor:
            # Bound parameter instead of string interpolation: the id comes
            # from model/user input and must not be spliced into the SQL.
            # str() keeps the value quoted as a literal, as the original did.
            cursor.execute(
                "update flights set status='cancelled' where id=%s",
                (str(id),),
            )
            updated_rows = cursor.rowcount
    finally:
        # Always release the connection, even when the statement fails.
        conn.close()

    if updated_rows == 0:
        return f"No booking found with id {id}; nothing was cancelled."
    return f"Booking {id} has been cancelled."

In [43]:
@tool
def get_booking_details(id):
    """This method will take id as argument and retrieves the row from the DB, it fetches all the information from the table for the ID argument passed"""
    # The docstring above is what the @tool decorator exposes to the model as
    # the tool description, so implementation notes are kept in comments.
    #
    # id: identifier of the row(s) in `flights` to fetch.
    # Returns the list of matching rows from cursor.fetchall() (empty list
    # when nothing matches).
    # Raises RuntimeError when no database connection could be opened.
    conn = get_postgresql_connection()
    if conn is None:
        raise RuntimeError("Could not connect to PostgreSQL; booking details are unavailable.")

    try:
        # `with conn` ends the transaction cleanly; the cursor context
        # manager closes the cursor.
        with conn, conn.cursor() as cursor:
            # Bound parameter instead of string interpolation: the id comes
            # from model/user input and must not be spliced into the SQL.
            # str() keeps the value quoted as a literal, as the original did.
            cursor.execute("select * from flights where id=%s", (str(id),))
            return cursor.fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()

In [44]:
@tool
def change_departure_location(id,departure_location):
    """This method is used to change the departure location to the desired location passed as argument by updating the table in the database"""
    # The docstring above is what the @tool decorator exposes to the model as
    # the tool description, so implementation notes are kept in comments.
    #
    # id: identifier of the row in `flights` to update.
    # departure_location: new value for the `departure_location` column.
    # Returns a short status message so the calling model can tell whether a
    # row was actually updated.
    # Raises RuntimeError when no database connection could be opened.
    conn = get_postgresql_connection()
    if conn is None:
        raise RuntimeError("Could not connect to PostgreSQL; the departure location was not changed.")

    try:
        # `with conn` commits on success and rolls back on error; the cursor
        # context manager closes the cursor.
        with conn, conn.cursor() as cursor:
            # Bound parameters instead of string interpolation: both values
            # come from model/user input and must not be spliced into the SQL.
            # str() keeps each value quoted as a literal, as the original did.
            cursor.execute(
                "update flights set departure_location=%s where id=%s",
                (str(departure_location), str(id)),
            )
            updated_rows = cursor.rowcount
    finally:
        # Always release the connection, even when the statement fails.
        conn.close()

    if updated_rows == 0:
        return f"No booking found with id {id}; nothing was changed."
    return f"Departure location for booking {id} changed to {departure_location}."

All the required imports

In [35]:
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import Runnable, RunnableConfig

In [36]:
def update_dialog_box(left: list[str], right: Optional[str]):
    """Combine the current dialog stack with an update.

    Args:
        left: The existing stack of dialog-state names.
        right: ``None`` to keep the stack as is, the literal ``'pop'`` to
            drop the top entry, or any other string to push it.

    Returns:
        ``left`` itself when ``right`` is ``None``; otherwise a new list with
        the last entry removed (``'pop'``) or with ``right`` appended.
    """
    if right is None:
        # No update requested: hand back the very same list object.
        return left
    return left[:-1] if right == 'pop' else left + [right]

class State(TypedDict):
    """Dictionary shape shared by the assistants in this notebook."""

    # Conversation history; `add_messages` is attached as the merge function
    # for this key.
    messages: Annotated[list[AnyMessage], add_messages]
    # Free-form string interpolated into the primary prompt as `{user_info}`.
    user_info: str
    # Stack of active dialog names; `update_dialog_box` pushes/pops entries.
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "cancel_booking",
                "get_booking_details",
                "change_departure_location",
            ]
        ],
        update_dialog_box,
    ]
    

In [37]:
class Assistant:
    """Callable wrapper that re-prompts a runnable until it yields real output."""

    def __init__(self, runnable: Runnable):
        # The wrapped runnable is invoked with the state on every attempt.
        self.runnable = runnable

    @staticmethod
    def _lacks_output(result) -> bool:
        """Return True when `result` has no tool calls and no usable text."""
        if result.tool_calls:
            return False
        content = result.content
        if not content:
            return True
        # List-style content counts as empty when its first part has no text.
        return isinstance(content, list) and not content[0].get("text")

    def __call__(self, state: State, config: RunnableConfig):
        """Invoke the runnable, nudging it with a user message while it stays empty.

        Returns a dict with the final result under the ``"messages"`` key.
        ``config`` is accepted but not used by this method.
        """
        while True:
            result = self.runnable.invoke(state)
            if not self._lacks_output(result):
                return {"messages": result}
            # Append a nudge to a copy of the state and try again.
            nudged = state["messages"] + [("user", "Respond with a real output.")]
            state = {**state, "messages": nudged}

In [38]:
class CompleteOrEscalate(BaseModel):
    """A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
    who can re-route the dialog based on the user's needs."""
    # This model is bound next to each specialist's own tool via
    # `llm.bind_tools(... + [CompleteOrEscalate])`.
    # Field order matters only for readability here: `cancel` defaults to True,
    # `reason` has no default and is therefore required.
    cancel:bool=True
    reason:str
    class Config:
        # Sample payloads attached to the model's schema through the
        # pydantic (v1-style) `schema_extra` hook.
        schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
            "example 3": {
                "cancel": False,
                "reason": "I need to search the user's emails or calendar for more information.",
            },
        }

In [46]:
# Shared chat model that every prompt in this notebook binds its tools to.
# It is constructed with no arguments, so the model name and credentials come
# from ChatOpenAI's own defaults (presumably the OPENAI_API_KEY environment
# variable; confirm for your setup).
llm = ChatOpenAI()

In [48]:
# Prompt for the specialist that cancels bookings.
# NOTE(review): the template needs an `id` input, but the `State` TypedDict in
# this notebook has no `id` key; confirm the caller supplies it.
cancel_booking_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for cancelling the flight booking, by changing the status to cancelled. "
            " The primary assistant delegates work to you whenever the user needs help in cancelling the flight booking. "
            "Confirm the updated flight details with the customer. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " Remember that a booking isn't cancelled until after the relevant tool has successfully been used."
            "\n\nCurrent Id of the user:\n<Flights>\n{id}\n</Flights>",
        ),
        # Without this the model only ever sees the system text and none of the
        # conversation. The placeholder is optional, so inputs without a
        # `messages` key still format.
        ("placeholder", "{messages}"),
    ]
)

# The specialist may call its own tool or hand control back via CompleteOrEscalate.
cancel_booking_tools = [cancel_booking]
cancel_flight_runnable = cancel_booking_prompt | llm.bind_tools(
    cancel_booking_tools + [CompleteOrEscalate]
)

In [49]:
# Prompt for the specialist that looks up booking details.
# NOTE(review): the template needs an `id` input, but the `State` TypedDict in
# this notebook has no `id` key; confirm the caller supplies it.
get_booking_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for fetching the flight booking information. "
            "The primary assistant delegates work to you whenever the user wants to fetch the flight information. "
            "Confirm the updated flight details with the customer. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " Remember that you haven't provided the information to user untill the relevant tool has successfully been used."
            "\n\nCurrent Id of the user:\n<Flights>\n{id}\n</Flights>",
        ),
        # Without this the model only ever sees the system text and none of the
        # conversation. The placeholder is optional, so inputs without a
        # `messages` key still format.
        ("placeholder", "{messages}"),
    ]
)

# The specialist may call its own tool or hand control back via CompleteOrEscalate.
get_booking_tools = [get_booking_details]
get_booking_runnable = get_booking_prompt | llm.bind_tools(
    get_booking_tools + [CompleteOrEscalate]
)

In [50]:
# Prompt for the specialist that changes a booking's departure location.
# NOTE(review): the template needs `id` and `departure_location` inputs, but the
# `State` TypedDict in this notebook has neither key; confirm the caller
# supplies them.
change_departure_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for changing the departure location of the flight as requested by user. "
            "The primary assistant delegates work to you whenever the user wants to change the departure location for the specified ID. "
            "Confirm the departure location details to be updated with the customer. "
            "When updating the departure location, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the customer changes their mind, escalate the task back to the main assistant."
            " Remember that you haven't provided the information to user untill the relevant tool has successfully been used."
            "\n\nCurrent Id of the user:\n<Flights>\n{id}\n</Flights> \n<Flights>\n{departure_location}\n</Flights> ",
        ),
        # Without this the model only ever sees the system text and none of the
        # conversation. The placeholder is optional, so inputs without a
        # `messages` key still format.
        ("placeholder", "{messages}"),
    ]
)

# The specialist may call its own tool or hand control back via CompleteOrEscalate.
change_departure_tools = [change_departure_location]
change_departure_runnable = change_departure_prompt | llm.bind_tools(
    change_departure_tools + [CompleteOrEscalate]
)

In [51]:
class ToCancelBooking(BaseModel):
    """Transfers work to a specialized assistant to handle flight cancellations."""

    # Single required integer field; the description text is part of the
    # model's field metadata and is kept verbatim.
    id: int = Field(description="the id of the candidate to whom we would like to cancel the booking")

class ToGetBookingDetails(BaseModel):
    """Transfers work to a specialized assistant which can fetch the booking details of the specific ID."""

    # Single required integer field; the description text is part of the
    # model's field metadata and is kept verbatim.
    id: int = Field(description="the id of the candidate to whom we would like to fetch the booking details")

class ToChangeDepartureLocation(BaseModel):
    """Transfers work to a specialized assistant to handle the changing of Departure Location."""

    # Booking whose departure location should be changed.
    id: int = Field(
        description="the id of the candidate to whom we would like to change the departue location"
    )
    # Declared as its own field so the new location is part of the model's
    # schema; as a keyword argument to the `id` Field it was only extra
    # metadata and could never be supplied as a value.
    departure_location: str = Field(
        description="the new departure location which we would like to change tos"
    )

        

In [None]:

# Prompt for the front-line assistant. It never edits bookings itself; it
# delegates by calling one of the To* models bound below.
primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Airlines. "
            "If a customer requests to cancel the booking, fetch the booking information , change departure location "
            "delegate the task to the appropriate specialized assistant by invoking the corresponding tool. You are not able to make these types of changes yourself."
            " Only the specialized assistants are given permission to do this for the user."
            "The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls. "
            "Provide detailed information to the customer, and always double-check the database before concluding that information is unavailable. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user flight information:\n<Flights>\n{user_info}\n</Flights>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
    # Pass the callable (not its result) so the timestamp is evaluated each
    # time the prompt is formatted instead of being frozen at definition time.
).partial(time=datetime.now)

# Only tools that actually exist in this notebook are listed. The previous
# list named helpers (search_flights, lookup_policy) that are not defined here.
primary_assistant_tools = [
    TavilySearchResults(max_results=1),
]

# Bind the notebook's own delegation models so the primary assistant can hand
# work to the cancel / lookup / change-departure specialists.
assistant_runnable = primary_assistant_prompt | llm.bind_tools(
    primary_assistant_tools
    + [
        ToCancelBooking,
        ToGetBookingDetails,
        ToChangeDepartureLocation,
    ]
)