In [0]:
!pip install pydantic-ai
!pip install "pydantic-ai-slim[openai]"

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%restart_python

In [0]:
class AbstractDataConnector:
    def __init__(self):
        pass

    def get_email_activity(self):
        pass

class DatabaseDataConnector(AbstractDataConnector):
    def __init__(self):
        pass

    def get_email_activity(self):
        return {
            "subject": "Drive way paving",
            "content": "We have a driveway that needs to be paved. Can I get someone out"
        }

class MockDataConnector(AbstractDataConnector):
    def __init__(self, mock_data: dict):
        self.mock_data = mock_data

    def get_email_activity(self):
        return self.mock_data["get_email_activity"]

db_connector = DatabaseDataConnector()


In [0]:
class FeedbackData:
    def __init__(self):
        self.feedback = []

    def get_feedback(self):
        return self.feedback
    
    def add_feedback(self, fb):
        self.feedback.append(fb)

In [0]:
import nest_asyncio
nest_asyncio.apply()


from pydantic_ai import Agent
from pydantic_ai import Agent, ModelRetry, RunContext

import os

from dataclasses import dataclass

os.environ["OPENAI_API_KEY"] = dbutils.secrets.get("openai", "api-key")

SYSTEM_PROMPT = """
You are an event driven assistant that recommends actions to parterns.
You work for TBS (TELS Building Services).
Customers email you with service information.

Please call tools to gather context and then call one of the action tools to generate an aciton.
Please always call the get_feedback tool to make sure you are taking in specific buisness rules into account.
"""

ACTIONS = {
    "ACTION_SEND_EMAIL",
    "ACTION_CLOSE_TICKET",
    "ACTION_NOTHING"
}

class CustomerServiceAgent:
    pass

@dataclass
class AgentContext:
    agent: CustomerServiceAgent
    feedback_source: FeedbackData

class CustomerServiceAgent:
    def __init__(self, connector):
        self.actions_called = []
        self.data_connector = connector

        pydanitc_agent = Agent(
            'openai:gpt-4o',
            system_prompt=SYSTEM_PROMPT
        )

        ########## EXTERNAL DATA SOURCE TOOLS ##########

        @pydanitc_agent.tool
        async def get_email_activity(
            ctx: RunContext[AgentContext]
        ) -> dict[str, float]:
            """Gets the activity of an email
            Args:
                ctx: The context.
            """
            print(f"  TOOL CALL: get_email_activity")
            return connector.get_email_activity()
        
        @pydanitc_agent.tool
        async def get_feedback(
            ctx: RunContext[AgentContext]
        ):
            """Gets useful rules and prior feedback data to help make a decision
            Args:
                ctx: The context.
            """
            # As this grows, we can use RAG
            feedback = ctx.deps.feedback_source.get_feedback()
            if len(feedback) == 0:
                return "No feedback provided as of yet"
            feedback_str = "\n".join(feedback)

            return feedback_str


        ########## ACTION TOOLS ##########

        @pydanitc_agent.tool
        async def action_send_email(
            ctx: RunContext[AgentContext], email_purpose: str
        ) -> dict[str, float]:
            """Recommends an email action
            Args:
                ctx: The context.
                email_purpose: The purpose of what the email should be about. This should be useful to a sales partner
            """
            print(f"  TOOL CALL: action_send_email: {email_purpose}")
            ctx.deps.agent.actions_called.append({
                "action": "ACTION_SEND_EMAIL",
                "metadata": {
                    "purpose": email_purpose
                }
            })
            return {'status': 'success'}

        @pydanitc_agent.tool
        async def action_close_ticket(
            ctx: RunContext[AgentContext], reason: str
        ) -> dict[str, float]:
            """Recommends agent closes ticket
            Args:
                ctx: The context.
                reason: The reason for closure
            """
            print(f"  TOOL CALL: action_close_ticket: {reason}")
            ctx.deps.agent.actions_called.append({
                "action": "ACTION_CLOSE_TICKET",
                "metadata": {
                    "reason": reason
                }
            })
            return {'status': 'success'}

        @pydanitc_agent.tool
        async def action_no_action(
            ctx: RunContext[AgentContext]
        ) -> dict[str, float]:
            """Recommends no action
            Args:
                ctx: The context.
            """
            ctx.deps.agent.actions_called.append({
                "action": "ACTION_NOTHING",
                "metadata": {}
            })
            return {'status': 'success'}
        
        self.agent = pydanitc_agent
            
    def invoke_agent(self, message, feedback_source):
        self.actions_called = []
        self.agent.run_sync(
            message,
            deps=AgentContext(agent=self, feedback_source=feedback_source)
        )
        actions_called = self.actions_called
        data_state = {
            "get_email_activity": self.data_connector.get_email_activity()
        }
        return {
            "actions": actions_called,
            "data_state": data_state
        }


default_feedback_source = FeedbackData()

cs_agent = CustomerServiceAgent(db_connector)
res = cs_agent.invoke_agent("New email came in for customer. Please gather context and recommend an action.", default_feedback_source)
print(res["actions"])

  TOOL CALL: get_email_activity
[{'action': 'ACTION_NOTHING', 'metadata': {}}]


In [0]:
res

{'actions': [{'action': 'ACTION_SEND_EMAIL',
   'metadata': {'purpose': 'The customer requires driveway paving services. Recommend sending an email to schedule a site visit to assess the driveway and provide a quote.'}}],
 'data_state': {'get_email_activity': {'subject': 'Drive way paving',
   'content': 'We have a driveway that needs to be paved. Can I get someone out'}}}

In [0]:
# Dreaming cycle
NEW_FEEDBACK_DATA = [
    {
        "data": res["data_state"],
        "predicted_action": "ACTION_SEND_EMAIL",
        "expected_action": "ACTION_CLOSE_TICKET",
        "reasoning": "Paving of driveways is not something that we do"
    },
    {
        "data": {
            "get_email_activity": {
                "subject": "My pool is broken",
                "content": "The filter on my pool is broken. Can I get a service representative to come here stat."
            }
        },
        "predicted_action": "ACTION_SEND_EMAIL",
        "expected_action": "ACTION_CLOSE_TICKET",
        "reasoning": "Pool repairs is not something that we do"
    }
]

In [0]:
feedback_that_should_be_accounted_for = []
for new_feedback_data in NEW_FEEDBACK_DATA:
    print("Generating new rule to accomodate feedback.")
    feedback_agent = Agent(
        'openai:gpt-4o',
        system_prompt="You are given some feedback. Your job is to provide a short consice and specific buisness context rule for the representantice to be reference to ensure the incorrect action doesn't happen again. Please make sure it is specific, short, and uses the sample as an example."
    )

    input_pormpt = f"""
    data_available: {new_feedback_data["data"]}
    predicted_action: {new_feedback_data["predicted_action"]}
    expected_action: {new_feedback_data["expected_action"]}
    reasoning: {new_feedback_data["reasoning"]}
    """

    proposed_rule = feedback_agent.run_sync(input_pormpt).output

    print(f"Proposed rule is: \"{proposed_rule}\"")

    proposed_feedback_source = FeedbackData()
    for fb in default_feedback_source.get_feedback():
        proposed_feedback_source.add_feedback(fb)
    proposed_feedback_source.add_feedback(proposed_rule)

    feedback_that_should_be_accounted_for.append(new_feedback_data)

    # Verify the agent learned
    print(f"Verifying new rule...")
    passed_cases = 0
    failed_cases = 0
    for x in feedback_that_should_be_accounted_for:
        test_agent = CustomerServiceAgent(MockDataConnector(x["data"]))
        new_res = test_agent.invoke_agent("New email came in for customer. Please gather context and recommend an action.", proposed_feedback_source)
        if new_res["actions"][0]["action"] == x["expected_action"]:
            passed_cases += 1
        else:
            failed_cases += 1

    print(f"Passed cases: {passed_cases} / {passed_cases + failed_cases}")

    default_feedback_source.add_feedback(proposed_rule)

Generating new rule to accomodate feedback.
Proposed rule is: "Business Context Rule: If an email inquiry relates to a service not offered, such as driveway paving, the representative should close the ticket without sending a response."
Verifying new rule...
  TOOL CALL: get_email_activity
  TOOL CALL: action_close_ticket: Driveway paving is not a service we offer. Closing ticket as per standard procedure for requests outside our service scope.
Passed cases: 1 / 1
Generating new rule to accomodate feedback.
Proposed rule is: "Business Context Rule: If a customer's request falls outside the company's service scope, such as pool repair, close the ticket instead of initiating any further action like sending an email."
Verifying new rule...
  TOOL CALL: get_email_activity
  TOOL CALL: action_close_ticket: The inquiry involves a request for driveway paving, which is a service not offered by our company.
  TOOL CALL: get_email_activity
  TOOL CALL: action_close_ticket: The request involves p