In [3]:
# | default_exp service

In [4]:
# | export

from typing import Protocol, Callable, Awaitable, Union, Optional, Any
import inspect
import asyncio
from pylogue.session import Message, ChatSession

## Business Logic Layer

This module provides the service layer for processing chat messages and handling business logic.

In [5]:
# | export


class Responder(Protocol):
    """Protocol for chat responders (LLMs, APIs, etc.).

    Describes the call signature of an object that turns a message, plus an
    optional context value, into a reply string.
    """

    async def __call__(self, message: str, context: Optional[Any] = None) -> str:
        """Process a message and return a response.

        Args:
            message: The text to respond to.
            context: Optional extra data accompanying the message; defaults
                to ``None``.

        Returns:
            The response text.
        """
        ...

In [6]:
# | export


class ErrorHandler(Protocol):
    """Protocol for handling errors during message processing.

    Describes the call signature of an object that converts an exception into
    text suitable for returning to the user.
    """

    def __call__(self, error: Exception, message: str) -> str:
        """Handle an error and return a user-friendly message.

        Args:
            error: The exception that was raised.
            message: The message that was being processed when it was raised.

        Returns:
            The text to present in place of a normal response.
        """
        ...

In [7]:
# | export


class DefaultErrorHandler:
    """Turn an exception into the text returned to the user.

    With ``include_details`` enabled the reply names the exception type and
    its message; otherwise a fixed generic apology is returned.
    """

    def __init__(self, include_details: bool = True):
        self.include_details = include_details

    def __call__(self, error: Exception, message: str) -> str:
        """Build the user-facing reply for ``error``.

        Args:
            error: The exception raised while processing.
            message: The message being processed; this handler does not use it.

        Returns:
            A detailed or a generic error string, depending on
            ``include_details``.
        """
        if not self.include_details:
            return "Sorry, I encountered an error processing your message."

        error_kind = type(error).__name__
        return f"Error processing message: {error_kind}: {error!s}"

In [8]:
# | export


class ChatService:
    """Service for processing chat messages with responders.

    Wraps a responder callable (sync or async), optionally derives a context
    value from the session, and converts any ``Exception`` raised along the
    way into a reply string via the configured error handler.
    """

    def __init__(
        self,
        responder: Union[Responder, Callable],
        error_handler: Optional[ErrorHandler] = None,
        context_provider: Optional[Callable[[ChatSession], Any]] = None,
    ):
        """
        Initialize ChatService.

        Args:
            responder: Callable invoked as ``responder(message, context)``. May
                be a plain function, a coroutine function, or an object whose
                ``__call__`` returns an awaitable.
            error_handler: Callable invoked as ``error_handler(error, message)``
                when processing fails. Defaults to ``DefaultErrorHandler()``.
            context_provider: Optional function called with the session to
                build the context passed to the responder.
        """
        self.responder = responder
        # Explicit None check so a handler object that happens to be falsy is
        # not silently replaced by the default.
        self.error_handler = (
            error_handler if error_handler is not None else DefaultErrorHandler()
        )
        self.context_provider = context_provider

    async def process_message(
        self, user_message: str, session: Optional[ChatSession] = None
    ) -> str:
        """
        Process a user message and return the assistant's response.

        Exceptions raised by the context provider or the responder are not
        propagated; they are passed to the error handler and its return value
        is used as the response.

        Args:
            user_message: The user's input message
            session: Optional chat session for context

        Returns:
            The assistant's response string
        """
        try:
            # Compare against None rather than relying on truthiness: a session
            # object may be falsy (e.g. if it defines __len__ and holds no
            # messages yet) and must still reach the context provider.
            context = None
            if self.context_provider is not None and session is not None:
                context = self.context_provider(session)

            # Call first and await only if needed. This covers plain functions,
            # coroutine functions and objects with an async __call__ alike.
            result = self.responder(user_message, context)
            if inspect.isawaitable(result):
                result = await result

            return str(result)

        except Exception as e:
            return self.error_handler(e, user_message)

    async def process_session_message(
        self, session: ChatSession, user_message: str, add_to_session: bool = True
    ) -> Message:
        """
        Process a message within a session context.

        The assistant's response is always added to the session;
        ``add_to_session`` only controls whether the user message is added.

        Args:
            session: The chat session
            user_message: The user's input
            add_to_session: Whether to add user message to session

        Returns:
            The assistant's response as a Message object
        """
        # The user message is recorded before processing, so a context provider
        # that reads the session sees it.
        if add_to_session:
            session.add_message("User", user_message)

        response = await self.process_message(user_message, session)

        # Record the assistant reply and hand back what the session returns.
        return session.add_message("Assistant", response)

## Example Responders

In [9]:
# | export


async def echo_responder(message: str, context: Optional[Any] = None) -> str:
    """Echo the message back after a short artificial delay (for testing).

    Args:
        message: The text to echo.
        context: Accepted for signature compatibility; not used.

    Returns:
        The message prefixed with ``"[Echo] You said: "``.
    """
    simulated_latency = 0.2  # seconds
    await asyncio.sleep(simulated_latency)
    return "[Echo] You said: {}".format(message)

In [10]:
# | export


class ContextAwareResponder:
    """Sample responder whose reply mentions how much history it was given."""

    def __init__(self, max_history: int = 5):
        # Stored on the instance; __call__ below does not consult it.
        self.max_history = max_history

    async def __call__(self, message: str, context: Optional[Any] = None) -> str:
        """Reply to ``message``, reporting the history size when available.

        Args:
            message: The text to respond to.
            context: If this is a non-empty list, its length is reported as
                the number of messages exchanged; anything else is ignored.

        Returns:
            A history-aware reply, or a plain greeting when no usable
            context was supplied.
        """
        await asyncio.sleep(0.1)

        # Guard clause: no usable history means the plain greeting.
        if not (context and isinstance(context, list)):
            return "Hello! You said: {}".format(message)

        return "I see we've exchanged {} messages. You just said: {}".format(
            len(context), message
        )

## Test the Service Layer

In [11]:
# Test basic ChatService
from pylogue.session import ChatSession

service = ChatService(responder=echo_responder)
response = await service.process_message("Hello, world!")
print(f"Response: {response}")

Response: [Echo] You said: Hello, world!


In [12]:
# Test with session
session = ChatSession("test-session")
service = ChatService(responder=echo_responder)

assistant_msg = await service.process_session_message(session, "First message")
print(f"Session messages: {session.get_message_dicts()}")

Session messages: [{'role': 'User', 'content': 'First message', 'id': 'eb9c84e7-f898-4514-8bf9-aac2c4ece363'}, {'role': 'Assistant', 'content': '[Echo] You said: First message', 'id': '52451b28-a288-4cb5-8ec7-c139daf736f7'}]


In [13]:
# Test context-aware responder
def context_provider(session: ChatSession):
    """Use the session's messages as the responder context."""
    history = session.get_messages()
    return history


session = ChatSession("context-test")
responder = ContextAwareResponder()
service = ChatService(responder=responder, context_provider=context_provider)

await service.process_session_message(session, "First")
await service.process_session_message(session, "Second")
await service.process_session_message(session, "Third")

print(f"Final messages: {session.get_message_dicts()}")

Final messages: [{'role': 'User', 'content': 'First', 'id': 'a9b83b5a-9c29-46d3-a3c3-179ca1d77b7f'}, {'role': 'Assistant', 'content': "I see we've exchanged 1 messages. You just said: First", 'id': '555f171f-fbfa-4b2c-b6e3-0e754093c006'}, {'role': 'User', 'content': 'Second', 'id': 'b1bb1984-8154-4cfb-8372-67c73315467d'}, {'role': 'Assistant', 'content': "I see we've exchanged 3 messages. You just said: Second", 'id': '471f8465-d3b2-46f5-8fb1-e14cef42a030'}, {'role': 'User', 'content': 'Third', 'id': '9cd1d761-fa34-4eca-aadf-acffe180a238'}, {'role': 'Assistant', 'content': "I see we've exchanged 5 messages. You just said: Third", 'id': 'e47b7fba-92ab-4fbf-829f-9e86d8afb7a5'}]


In [14]:
# Test error handling
def failing_responder(message: str, context=None):
    """Responder stand-in that always raises ``ValueError``."""
    failure = ValueError("Something went wrong!")
    raise failure


service = ChatService(responder=failing_responder)
response = await service.process_message("This will fail")
print(f"Error response: {response}")

Error response: Error processing message: ValueError: Something went wrong!
