# 1: Integration with Chat Engines
When building AI chatbots, we often need to integrate with various chat platforms. Each platform has its unique way of handling messages, but they all need to work seamlessly with our core AI system. Let's look at three common integration scenarios:

# Chat Platforms Overview
# Webim
A real-time web chat platform commonly used for customer support. It uses websocket connections for instant communication and supports features like typing indicators and visitor tracking. Particularly useful for website-based support systems.

In [None]:
class WebimAdapter(ChatEngineAdapter):
    async def receive_message(self, raw_data: Dict[str, Any]) -> Message:
        return Message(
            text=raw_data.get("text", ""),
            user_id=raw_data.get("visitor_id"),
            timestamp=raw_data.get("timestamp"),
            metadata={
                "session_id": raw_data.get("session_id"),
                "visitor_name": raw_data.get("visitor_name")
            }
        )

# **ChatFuel**
A bot-building platform primarily used with messaging services like Facebook Messenger. It uses a block-based system for responses and handles user interactions through a REST API.

In [None]:
class ChatFuelAdapter(ChatEngineAdapter):
    async def send_message(self, response: Response, user_id: str) -> None:
        url = f"{self.base_url}/{user_id}/send"
        payload = {
            "chatfuel_token": self.token,
            "blocks": [{"text": response.text}]
        }
        if response.action == "location":
            payload["blocks"].append({
                "type": "location",
                "coordinates": response.metadata.get("coordinates", {})
            })

# **WebSocket Custom Chat**
A custom implementation for real-time chat applications. Provides direct, bidirectional communication between client and server, ideal for applications requiring low latency and real-time updates.

In [None]:
class WebSocketAdapter(ChatEngineAdapter):
    async def handle_events(self, event_data: Dict[str, Any]) -> None:
        event_type = event_data.get("type")
        if event_type == "connect":
            await self.ws_manager.handle_connect(event_data)
        elif event_type == "disconnect":
            await self.ws_manager.handle_disconnect(event_data)

# Common Integration Pattern
We use the Adapter pattern to standardize these different platforms:

In [None]:
class ChatEngineAdapter(ABC):
    @abstractmethod
    async def receive_message(self, raw_data: Dict[str, Any]) -> Message:
        """Convert platform-specific message to standard format"""
        pass

    @abstractmethod
    async def send_message(self, response: Response, user_id: str) -> None:
        """Send response back to the platform"""
        pass

# **Chapter 2: Message Aggregation in Chat Systems**
In chat interactions, users often break their thoughts into multiple messages. Instead of sending "What's the fastest route from New York to Los Angeles?", they might type:

```
"What's"
"the fastest"
"route from NY"
"to LA?"
```


This natural behavior creates a challenge for AI systems. Each message in isolation lacks context, and making separate API calls for each fragment is inefficient and may produce incorrect responses. Let's explore how to handle this elegantly.
The Aggregation Solution
Here's how we handle message aggregation in our navigation system:

In [None]:
class MessageAggregator:
    def __init__(self, window_size: int = 3):
        self.messages = defaultdict(list)
        self.last_processed = {}
        self.window_size = window_size  # seconds

    async def add_message(self, user_id: str, message: str) -> bool:
        current_time = datetime.now()
        self.messages[user_id].append({
            "text": message,
            "timestamp": current_time
        })

        should_process = await self._should_process_messages(user_id, current_time)
        return should_process

    async def get_aggregated_message(self, user_id: str) -> str:
        messages = self.messages[user_id]
        self.messages[user_id] = []
        self.last_processed[user_id] = datetime.now()

        return " ".join(msg["text"] for msg in messages)

Integration with Navigation Chat

In [None]:
class NavigationChat:
    def __init__(self):
        self.aggregator = MessageAggregator(window_size=3)
        self.nav_agent = NavigationAgent(...)

    async def process_message(self, user_id: str, message: str):
        should_process = await self.aggregator.add_message(user_id, message)

        if should_process:
            complete_query = await self.aggregator.get_aggregated_message(user_id)
            return await self.nav_agent.process_query(complete_query)

        return None

# Chapter 3: Function Calling in AI Chat Systems
In our navigation system, function calling creates a bridge between natural language input and structured API operations. We use a YAML-based approach to define and execute specific actions like geocoding, routing, and place discovery.
# Core Function Calling Structure
Let's look at how our system handles function calls:

In [None]:
class NavigationResult(BaseModel):
    action: str = Field(..., description="The action: 'geocode', 'route', 'discover', or 'autosuggest'")
    geocoding_result: Optional[GeocodingResult] = None
    routing_result: Optional[RoutingResult] = None
    discover_result: Optional[DiscoverResult] = None
    raw_response: str = Field(..., description="Raw LLM response")

class NavigationAgent:
    def __init__(self, llm, geocode_tool, route_tool, discover_tool, autosuggest_tool):
        self.llm = llm
        self.geocode_tool = geocode_tool
        self.route_tool = route_tool
        self.discover_tool = discover_tool
        self.autosuggest_tool = autosuggest_tool

    def run(self, query: str) -> NavigationResult:
        # Get structured YAML response from LLM
        raw_output = self.llm.invoke(self._create_prompt(query))

        # Parse action and parameters
        action, params = self._parse_raw_output(raw_output)

        # Execute appropriate tool
        return self._execute_tool(action, params, raw_output)

# YAML Response Format
The LLM generates structured responses like this:


In [None]:
action: route
params:
    origin: "Times Square, NY"
    destination: "Central Park, NY"
explanation: User is requesting directions between two locations

# Function Execution
Here's how we handle different function calls:

In [None]:
def _execute_tool(self, action: str, params: Dict[str, Any], raw_output: str) -> NavigationResult:
    try:
        if action == "geocode":
            tool_result = self.geocode_tool.run(params["address"])
            lat, lon = self._extract_coordinates(tool_result)
            return NavigationResult(
                action="geocode",
                geocoding_result=GeocodingResult(
                    address=params["address"],
                    latitude=lat,
                    longitude=lon
                ),
                raw_response=raw_output
            )

        elif action == "route":
            # First geocode both locations
            origin_coords = self.geocode_tool.run(params["origin"])
            dest_coords = self.geocode_tool.run(params["destination"])

            # Calculate route
            route = self.route_tool.run({
                "origin": origin_coords,
                "destination": dest_coords
            })

            return NavigationResult(
                action="route",
                routing_result=RoutingResult(
                    origin=params["origin"],
                    destination=params["destination"],
                    distance_km=route["distance"],
                    duration_hours=route["duration"]
                ),
                raw_response=raw_output
            )

# Error Handling
We implement robust error handling for function calls:

In [None]:
def _safe_execute_tool(self, func: Callable, params: Dict[str, Any]) -> Any:
    try:
        return func(params)
    except Exception as e:
        error_msg = f"Error executing {func.__name__}: {str(e)}"
        logger.error(error_msg)
        return NavigationResult(
            action="error",
            raw_response=error_msg
        )

# Real-World Example
Here's how it processes a real navigation query:

In [None]:
# User query: "How do I get from Times Square to Central Park?"

# 1. LLM generates YAML:
"""
action: route
params:
    origin: Times Square, New York, NY
    destination: Central Park, New York, NY
explanation: Calculate route between two NYC landmarks
"""

# 2. System executes:
# - Geocodes both locations
# - Calculates route
# - Returns structured response with distance and duration

# **Chapter 4: Complex Dialogue Flows with LangChain and LlamaIndex**
Handling complex navigation queries often requires multi-step dialogues and state management. Let's explore how to build sophisticated conversation flows using LangChain's tools.
# Building Conversation Flows
Here's how we structure complex dialogues:

In [None]:
from typing import List, Dict, Any
from langchain.graphs import StateGraph
from pydantic import BaseModel
from enum import Enum

class DialogueState(BaseModel):
    current_node: str
    context: Dict[str, Any] = {}
    collected_data: Dict[str, Any] = {}
    required_fields: List[str] = []

class NavigationFlow:
    def __init__(self):
        self.workflow = StateGraph()
        self.state = DialogueState(current_node="start")
        self._setup_navigation_flow()

    def _setup_navigation_flow(self):
        # Define nodes
        self.workflow.add_node("start", self._handle_initial_query)
        self.workflow.add_node("get_origin", self._handle_origin)
        self.workflow.add_node("get_destination", self._handle_destination)
        self.workflow.add_node("confirm_route", self._handle_confirmation)
        self.workflow.add_node("provide_alternatives", self._handle_alternatives)

        # Define edges and conditions
        self.workflow.add_edge("start", "get_origin", self._needs_origin)
        self.workflow.add_edge("get_origin", "get_destination", self._has_origin)
        self.workflow.add_edge("get_destination", "confirm_route", self._has_destination)
        self.workflow.add_edge("confirm_route", "provide_alternatives", self._needs_alternatives)

# State Management and Context
Managing conversation context:

In [None]:
class NavigationContext:
    def __init__(self):
        self.memory = ConversationBufferMemory()
        self.current_route = None
        self.alternatives = []

    async def update_context(self, user_input: str, node_result: Any):
        # Update memory with new information
        self.memory.chat_memory.add_user_message(user_input)

        if isinstance(node_result, RouteResult):
            self.current_route = node_result
            self.memory.chat_memory.add_ai_message(
                f"Found route: {node_result.distance_km}km, {node_result.duration_hours}hrs"
            )

    async def get_context_for_prompt(self) -> str:
        return self.memory.chat_memory.messages

# Node Implementation
Example of node handlers:

In [None]:
async def _handle_initial_query(self, state: DialogueState, user_input: str) -> str:
    # Use LLM to understand query intent
    query_analysis = self.llm.analyze_query(user_input)

    # Extract locations if present
    if "origin" in query_analysis:
        state.collected_data["origin"] = query_analysis["origin"]
    if "destination" in query_analysis:
        state.collected_data["destination"] = query_analysis["destination"]

    # Update required fields
    state.required_fields = [
        field for field in ["origin", "destination"]
        if field not in state.collected_data
    ]

    return self._generate_next_prompt(state)

async def _handle_confirmation(self, state: DialogueState, user_input: str) -> str:
    if self._is_confirmation_positive(user_input):
        # Execute navigation request
        route = await self.navigation_agent.get_route(
            state.collected_data["origin"],
            state.collected_data["destination"]
        )
        return self._format_route_response(route)
    else:
        state.current_node = "provide_alternatives"
        return "Would you like to see alternative routes?"

# Integration with Navigation Tools
Connecting dialogue flow with navigation features:**

In [None]:
class NavigationDialogueSystem:
    def __init__(self, navigation_agent):
        self.nav_agent = navigation_agent
        self.flow = NavigationFlow()
        self.context = NavigationContext()

    async def process_message(self, user_input: str) -> str:
        # Process through dialogue flow
        state = await self.flow.process(user_input)

        if state.current_node == "confirm_route":
            # Get route using navigation agent
            route = await self.nav_agent.get_route(
                state.collected_data["origin"],
                state.collected_data["destination"]
            )

            # Update context with route information
            await self.context.update_context(user_input, route)

            return self._format_route_response(route)

Usage Example
# Here's how a complex navigation dialogue might flow:

In [None]:
# Example conversation flow:
user: "I need to get somewhere in Manhattan"
system: "Where in Manhattan would you like to go?"
user: "Central Park"
system: "And where are you starting from?"
user: "Times Square"
system: "I found a route from Times Square to Central Park:
        Distance: 2.5km
        Duration: 0.5 hours
        Would you like to see this route?"
user: "Are there any faster alternatives?"
system: "I found 2 alternative routes:
        1. Via 7th Avenue (0.4 hours)
        2. Via Broadway (0.45 hours)
        Which would you prefer?"