In [0]:
%pip install mlflow pydantic databricks-agents -U --quiet
%pip install unitycatalog-crewai unitycatalog-crewai[databricks] -U --quiet
%pip install crewai>=0.67.1 crewai_tools langchain_community --quiet
%restart_python

In [0]:
%load_ext autoreload
%autoreload 2

In [0]:
import mlflow
from mlflow.tracking import MlflowClient
from bs4.diagnose import profile
from dotenv import load_dotenv
from databricks.connect import DatabricksSession
import os

os.environ["DATABRICKS_API_KEY"] = dbutils.secrets.get("databricks_token_qyu", "qyu_rag_sp_token")
os.environ["DATABRICKS_API_BASE"] = (
    f"{dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()}/serving-endpoints"
)

mlflow.set_tracking_uri("databricks")
mlflow.set_registry_uri("databricks-uc")


experiment = "/Workspace/Users/q.yu@databricks.com/ML_experiments/insurance_operation_crewai_agent_flow"
mlflow.set_experiment(experiment)

mlflow.crewai.autolog()

In [0]:
from unitycatalog.ai.core.base import set_uc_function_client
from unitycatalog.ai.core.databricks import DatabricksFunctionClient

client = DatabricksFunctionClient()
set_uc_function_client(client)
CATALOG = 'dhuang'
SCHEMA = 'insurance_agent'

spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {SCHEMA}")

# Create a Crew

## Set tools

In [0]:
from unitycatalog.ai.crewai.toolkit import UCFunctionToolkit

uc_tools = [f"{CATALOG}.{SCHEMA}.{func.name}" for func in client.list_functions(catalog=CATALOG,
                                                                                schema=SCHEMA)
            if 'search' in func.name]
print(uc_tools)
toolkit = UCFunctionToolkit(function_names=uc_tools)
tools = toolkit.tools
search_policy_doc_tool, search_claim_details_tool = tools[:2]

In [0]:
from crewai import Agent, Task, Crew
from crewai import LLM
from crewai.tools import BaseTool

llm = LLM(model="databricks/databricks-meta-llama-3-3-70b-instruct")
#llm = LLM(model="databricks/qyu_gtp_4o")
llm = LLM(model="databricks/databricks-claude-3-7-sonnet")

# Create a flow

## Add a human input tool

In [0]:
class HumanInputTool(BaseTool):
    name: str = "Human interact"
    description: str = (
        "As user questions and gather user response"
    )

    def _run(self, argument: str) -> str:
        print(argument)
        res = input(f"{argument} \n")
        return res


human_input = HumanInputTool()

## Create Crews

In [0]:
class TriageCrew:
    def triage_agent(self) -> Agent:
        return Agent(
            role="triage agent",
            goal="analysis customer query and delegate to appropriate specialist, if you cannot "
                    "identify the goal of query, ask user for clarification",
            backstory="The triage agent is responsible for analyzing the customer query and "
                        "delegate the task to the appropriate agent. If the goal query cannot be "
                        "identified, ask the user for clarification",
            verbose=True,
            tools=[human_input],
            llm=llm
        )

    def triage_task(self) -> Task:
        return Task(
            description="Analyze the customer query: {query} route to appropriate "
                        "specialist, if the customer is asking general insurance policy question return 'policy'"
                        "if the customer is asking claim related question or their own policy questions, return 'claim'",
            expected_output="return the word claim, policy",
            agent=self.triage_agent(),
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.triage_agent()],
            tasks=[self.triage_task()],
            memory=False,
            verbose=True
        )

In [0]:
class ClaimCrew:
    def claim_agent(self) -> Agent:
        return Agent(
            role="claim agent",
            goal="Provide accurate and helpful information about customer policy and claims",
            backstory=""" You are an insurance claims specialist. 
                If you are speaking to a customer, you probably were transferred to you from the triage agent.
                If the customer does not have a policy number, explain that you need it to access their claim information.

                use the search_policy_details_by_policy_number tool to retrieve customer profile and their policy information.
                use the search_claim_by_policy_number tool to retrieve claim status and details.
                Your job is to:
                1. Process customer inquiries related to claims
                2. Look up customer profiles or retrieve claim information
                3. Provide status updates on existing claims
                4. Guide customers through the claims process
                """,
            verbose=True,
            allow_delegation=False,
            tools=[search_claim_details_tool, search_customer_policy_details_tool, human_input],
            llm=llm
        )

    def claim_task(self) -> Task:
        return Task(
            description=(
                """Process this claim-related customer request based on: 
                query: {query}

                If you have a policy number,
                - use the search_policy_details_by_policy_number tool to retrieve customer profile 
                    and policy information.
                - use the search_claim_by_policy_number tool to retrieve existing claim information.
                
                Provide a helpful, detailed response about their claim. If you don't have their
                policy_number, explain that you need it to access their claim information."""
            ),
            expected_output="A helpful response to the customer's claim query",
            agent=self.claim_agent(),
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.claim_agent()],
            tasks=[self.claim_task()],
            memory=False,
            verbose=True
        )

In [0]:
class PolicyCrew:
    def policy_agent(self) -> Agent:
        return Agent(
            role="policy agent",
            goal="asnwer general policy related queries",
            backstory="""You are a general policy FAQ agent.

                    Your job is to:
                    1. Identify the last question asked by the customer. clarify if needed.
                    2. Use the search_policy_doc_tool to answer the question. Do not rely on your own knowledge.""",
            verbose=True,
            allow_delegation=False,
            tools=[search_policy_doc_tool, human_input],
            llm=llm
        )

    def policy_task(self) -> Task:
        return Task(
            description="Answer the general policy related query: {query}",
            expected_output="answer the general policy queries by using search_policy_doc_tool for document context",
            agent=self.policy_agent()
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.policy_agent()],
            tasks=[self.policy_task()],
            memory=False,
            verbose=True
        )

## Create a flow with unstructured state

In [0]:
from crewai.flow.flow import Flow, listen, router, start

class InsuranceOperatorFlow(Flow):

    @start()
    def analyze_query(self):
        if self.state['query']:
            response = TriageCrew().crew().kickoff(inputs={'query': self.state['query']})
            self.state['intent'] = response.raw

    @router(analyze_query)
    def triage_task(self):
        if self.state['intent'] == 'policy' or self.state['intent'] == 'Policy':
            return "policy"
        elif self.state['intent'] == 'claim' or self.state['intent'] == 'Claim':
            return "claim"
        else:
            return "Cannot identify customer intent"

    @listen("claim")
    def claim_question(self):
        ClaimCrew().crew().kickoff(inputs={'query': self.state['query']})

    @listen("policy")
    def policy_question(self):
        PolicyCrew().crew().kickoff(inputs={'query': self.state['query']})


## Plot the flow

In [0]:
flow = InsuranceOperatorFlow()
flow.plot()

In [0]:
html_diagram = open('./crewai_flow.html', 'r').read()
styled_html_content = f'<div style="height:800px;">{html_diagram}</div>'
displayHTML(styled_html_content)

In [0]:
import nest_asyncio
nest_asyncio.apply()

flow.kickoff({'query': "What is the maximum the policy will pay for damages arising out of bodily injury to two or more persons in any one motor vehicle accident?"})

In [0]:
import nest_asyncio
nest_asyncio.apply()

with mlflow.start_span(name="insurance_agent", span_type="AGENT") as span:
    print("[AGENT] Hello! How may I assist you?")
    while True:
        user_input = input("[USER]: ")
        if user_input.lower() == "exit":  # Break loop if user types 'exit'
            print("[AGENT]: Bye!")
            break
        if not user_input:
            continue
        try:
            result = flow.kickoff({"query": user_input})
            print("\n[AGENT]:", result.raw)
        except Exception as e:
            print(f"\nError occurred: {str(e)}")

## Create a flow with structured state and converstaion history

### Update crew to consider conversation history

In [0]:
class TriageCrew:
    def triage_agent(self) -> Agent:
        return Agent(
            role="triage agent",
            goal="analysis customer query and delegate to appropriate specialist, if you cannot "
                    "identify the goal of query, ask user for clarification",
            backstory="The triage agent is responsible for analyzing the customer query and "
                        "delegate the task to the appropriate agent. If the goal query cannot be "
                        "identified, ask the user for clarification",
            verbose=False,
            tools=[human_input],
            llm=llm
        )

    def triage_task(self) -> Task:
        return Task(
            description="Analyze the customer query: {query} and converstaion history: {conversation_history}"
                        "route to appropriate specialist:" 
                        "if the customer is asking general insurance policy question return 'policy'"
                        "if the customer is asking claim related question or their own policy questions, return 'claim'",
            expected_output="return the word claim, policy",
            agent=self.triage_agent(),
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.triage_agent()],
            tasks=[self.triage_task()],
            memory=False,
            verbose=False
        )

class ClaimCrew:
    def claim_agent(self) -> Agent:
        return Agent(
            role="claim agent",
            goal="Provide accurate and helpful information about customer policy and claims",
            backstory=""" You are an insurance claims specialist. 
                If you are speaking to a customer, you probably were transferred to you from the triage agent.
                If the customer does not have a policy number, explain that you need it to access their claim information.

                use the search_policy_details_by_policy_number tool to retrieve customer profile and their policy information.
                use the search_claim_by_policy_number tool to retrieve claim status and details.
                Your job is to:
                1. Process customer inquiries related to claims
                2. Look up customer profiles or retrieve claim information
                3. Provide status updates on existing claims
                4. Guide customers through the claims process
                """,
            verbose=False,
            allow_delegation=False,
            tools=[search_claim_details_tool, search_customer_policy_details_tool, human_input],
            llm=llm
        )

    def claim_task(self) -> Task:
        return Task(
            description=(
                """Process this claim-related customer request based on: 
                query: {query} and conversation_history: {conversation_history}

                If the customer provided a policy number,
                - use the search_policy_details_by_policy_number tool to retrieve customer profile 
                    and policy information.
                - use the search_claim_by_policy_number tool to retrieve existing claim information.
                
                Provide a helpful, detailed response about their claim. 
                If the customer did not provide a policy_number, 
                explain that you need it to access their claim information."""
            ),
            expected_output="A helpful response to the customer's claim query",
            agent=self.claim_agent(),
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.claim_agent()],
            tasks=[self.claim_task()],
            memory=False,
            verbose=False
        )

class PolicyCrew:
    def policy_agent(self) -> Agent:
        return Agent(
            role="policy agent",
            goal="asnwer general policy related queries",
            backstory="""You are a general policy FAQ agent.

                    Your job is to:
                    1. Identify the last question asked by the customer. clarify if needed.
                    2. Use the search_policy_doc_tool to answer the question. Do not rely on your own knowledge.""",
            verbose=False,
            allow_delegation=False,
            tools=[search_policy_doc_tool, human_input],
            llm=llm
        )

    def policy_task(self) -> Task:
        return Task(
            description="Answer the general policy related query: {query} and conversation_history: {conversation_history}",
            expected_output="answer the general policy queries by using search_policy_doc_tool for document context",
            agent=self.policy_agent()
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.policy_agent()],
            tasks=[self.policy_task()],
            memory=False,
            verbose=False
        )

In [0]:
from crewai.flow.flow import Flow, listen, router, start
from typing import List, Optional
from pydantic import BaseModel, Field

class ChatMessage(BaseModel):
    role: str
    content: str


class InsuranceOperatorState(BaseModel):
    """Structured state for the insurance chatbot flow"""
    query: str = ""
    intent: str = ""
    conversation_history: List[ChatMessage] = Field(default_factory=list)


class InsuranceOperatorFlowStructured(Flow[InsuranceOperatorState]):

    @start()
    def analyze_query(self):
        if self.state.query:
            response = TriageCrew().crew().kickoff(inputs={'query': self.state.query, 
                                                           'conversation_history': self._format_conversation_history()})
            self.state.intent = response.raw

    @router(analyze_query)
    def triage_task(self):
        if self.state.intent == 'policy' or self.state.intent == 'Policy':
            return "policy"
        elif self.state.intent == 'claim' or self.state.intent == 'Claim':
            return "claim"
        else:
            return "Cannot identify customer intent"

    @listen("claim")
    def claim_question(self):
        ClaimCrew().crew().kickoff(inputs={'query': self.state.query,
                                           'conversation_history': self._format_conversation_history()})

    @listen("policy")
    def policy_question(self):
        PolicyCrew().crew().kickoff(inputs={'query': self.state.query,
                                            'conversation_history': self._format_conversation_history()})


    def _format_conversation_history(self):
        """Format the conversation history for agent context"""
        if not self.state.conversation_history:
            return "No previous conversation."

        formatted = ""
        for message in self.state.conversation_history:
            formatted += f"{message.role.capitalize()}: {message.content}\n\n"

        return formatted.strip()

In [0]:
import nest_asyncio
nest_asyncio.apply()

flow = InsuranceOperatorFlowStructured()

with mlflow.start_span(name="insurance_agent", span_type="AGENT") as span:
    print("[AGENT] Hello! How may I assist you?")
    while True:
        user_input = input("[USER]: ")
        if user_input.lower() == "exit":  # Break loop if user types 'exit'
            print("[AGENT]: Bye!")
            break
        if not user_input:
            continue
        try:
            result = flow.kickoff({"query": user_input, "conversation_history": []})
            print("\n[AGENT]:", result.raw)
        except Exception as e:
            print(f"\nError occurred: {str(e)}")