In [1]:
from dataclasses import dataclass

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
    type_subscription,
)
from autogen_core.models import ChatCompletionClient, SystemMessage, UserMessage
from autogen_ext.models.azure import AzureAIChatCompletionClient
from azure.core.credentials import AzureKeyCredential

import pandas as pd

### Message Protocol
Defining the message that will be passed between agents. Will need to change/add new message definitions for our purposes. (SQL code, instructions, etc)

In [2]:
@dataclass
class Message:
    content: str

@dataclass
class GenerateInstructionMessage:
    content: str
    feedback: str

@dataclass
class SQLQueryMessage:
    question: str
    query: str

@dataclass
class ReviewMessage:
    question: str
    query: str
    result: pd.DataFrame

### Agents
Define the type of agents we want

Whats currently done:
- [x] Renaming of agents and topics

##### Orchestrator Agent (#1)
Given an input message from a client asking for data, this agent will ingest that information, and reform the asks into proper questions, with the correct details (client name, billing division, time period, etc) and send that message into a queue to be processed downstream.

To do:
- [ ] Implement logic to separate asks and send each as a message
- [ ] Rewrite system message

In [4]:
@type_subscription(topic_type=query_extractor_topic_type)
class QueryExtractorAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A query extractor agent.")

        # TO-DO: NEED TO REPLACE THIS SYSTEM MESSAGE
        self._system_message = SystemMessage(
            content=(
                "You are an insurance analyst Extract key information from the client's email: what specific data they need, any metrics or thresholds mentioned, time periods, and specific populations they're asking about."
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_user_request(self, message: Message, ctx: MessageContext) -> None:
        prompt = f"Request: {message.content}"
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\n{self.id.type}:\n{response}")

        await self.publish_message(Message(response), topic_id=TopicId(report_finder_topic_type, source=self.id.key))

##### Report Finder Agent (#2)
Taking a query with all relevant information, we look for an existing report that is able to answer the question. If it thinks it found a report, a human should have a say in verifying it's the correct report.

If it cannot find a report that can answer the question message, we send the message further to the SQL generator agent to generate an SQL query that can answer it.

To Do:
- [ ] Implement logic to search through existing reports
- [ ] Implement logic to send a report to the report generator agent
- [ ] Implement logic to send a report to the SQL generator agent

In [5]:
@type_subscription(topic_type=report_finder_topic_type)
class ReportFinderAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A report finder agent.")
        self._system_message = SystemMessage(
            content=(
                "You are a Report Finder Agent for an insurance company's BA reporting"
                " system. Your task is to carefully analyze a client question about their"
                " insurance plans and determine if an existing report in our database "
                "answers their query. Search the knowledge base thoroughly using these "
                "parameters. If you find a matching report, start your message with *FOUND*"
                "and explain why the report matches the query and provide the report ID "
                "and SQL query. If no existing report matches the client's needs, start "
                "your message with *NOT FOUND* and provide detailed information about "
                "what specific data would be needed to answer their question. Always "
                "maintain a formal, professional tone appropriate for insurance industry "
                "communication."
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_question(self, message: Message, ctx: MessageContext) -> None:
        prompt = f"Below is the question to answer:\n\n{message.content}"

        # To Do: Implement a RAG agent to search existing reports

        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        # response = llm_result.content
        response = ""
        assert isinstance(response, str)
        print(f"{'-'*80}\n{self.id.type}:\n{response}")

        # To Do: send message to either report generator agent
        if response.startswith("*FOUND*"):
            await self.publish_message(Message(response), topic_id=TopicId(query_execution_topic_type, source=self.id.key))
        else:
            await self.publish_message(
                GenerateInstructionMessage(message.content, feedback=""),
                topic_id=TopicId(sql_generator_topic_type, source=self.id.key))

In [6]:
@type_subscription(topic_type=sql_generator_topic_type)
class SQLGeneratorAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A SQL Query Generator agent.")
        self._system_message = SystemMessage(
            content=(
                "You are a SQL Query Generator for an insurance company's database system. Analyze "
                "the client's question carefully to identify exactly what data they need. Create "
                "a SQL query for an Azure SQL Database that answers the provided question "
                "by filtering and extracting the relevant data. Always include "
                "appropriate WHERE clauses to limit results based on the specific parameters in "
                "the client's question (date ranges, dollar amounts, plan types, etc.). Your "
                "queries should be optimized for performance and follow best practices for SQL. "
                "Include clear comments in your SQL code explaining the purpose of each section. "
                "Ensure your query returns data in a format that directly answers the client's "
                "question. The database contains the following tables containing information on "
                "claims, benefits, plan members, and plans:"
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_question(self, message: GenerateInstructionMessage, ctx: MessageContext) -> None:
        prompt = f"Question to convert to SQL Query:\n{message.content}."
        if message.feedback:
            prompt += f"Using this feedback:\n{message.feedback}"
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\n{self.id.type}:\n{response}")
        question_and_query = SQLQueryMessage(question=message.content, query=response)
        await self.publish_message(question_and_query, topic_id=TopicId(query_execution_topic_type, source=self.id.key))

In [7]:
@type_subscription(topic_type=query_execution_topic_type)
class QueryExecutionAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A SQL Query Execution agent.")
        self._system_message = SystemMessage(
            content=(
                "You are a SQL Query Execution agent that takes an Azure SQL "
                "query, verifies that it is correct, and outputs the final "
                "refined query (if needed). If not, please ONLY output the SQL query."
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_sql_query(self, message: SQLQueryMessage, ctx: MessageContext) -> None:
        prompt = f"Input SQL Query:\n{message.query}."
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\n{self.id.type}:\n{response}")
        # TO-DO: Write code to run the query against the database
        response_for_review = ReviewMessage(
            question=message.question,
            query=message.query,
            result=response
        )
        await self.publish_message(response_for_review, topic_id=TopicId(ready_for_review_topic_type, source=self.id.key))

In [8]:
@type_subscription(topic_type=ready_for_review_topic_type)
class ReviewerAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient) -> None:
        super().__init__("A Result Reviewer agent.")
        self._system_message = SystemMessage(
            content=(
                "You are a reviewer agent tasked with comparing a question, "
                "the corresponding SQL query, and the head of the result " 
                "table to ensure that the original question is answered correctly."
                "You are a Reviewer Agent responsible for quality control in an "
                "insurance reporting system. Your critical role is to verify that "
                "the the corresponding SQL query, and the head of the result " 
                "table truly answers the client's original question. Then carefully "
                "examine the resulting tables provided to you. Verify that: "
                "1) All requested information is present and complete; "
                "2) The data directly addresses the specific parameters mentioned by "
                "the client (time periods, dollar amounts, specific populations); "
                "3) The format is clear and professional; "
                "4) No extraneous or confusing information is included. If the report "
                "fully satisfies the client's query, approve it by ONLY responding "
                "with **APPROVED**. If any information is missing or inappropriate, "
                "clearly articulate what needs to be corrected under a **FEEDBACK** heading. "
                "Use your insurance industry expertise to ensure the information would "
                "be valuable and understandable to the client."
            )
        )
        self._model_client = model_client

    @message_handler
    async def handle_result(self, message: ReviewMessage, ctx: MessageContext) -> None:
        prompt = (f"Review the following question:\n{message.question}."
                  f"SQL Query:\n{message.query}"
                  f"Result:\n {message.result}"
                #   f"Result:\n {message.result.head(10)}"
                )
        llm_result = await self._model_client.create(
            messages=[self._system_message, UserMessage(content=prompt, source=self.id.key)],
            cancellation_token=ctx.cancellation_token,
        )
        response = llm_result.content
        assert isinstance(response, str)
        print(f"{'-'*80}\n{self.id.type}:\n{response}")
        # TO-DO: Write code to run the query against the database
        if response.startswith("**APPROVED**"):
            await self.publish_message(message, topic_id=TopicId(result_topic_type, source=self.id.key))
        else:
            await self.publish_message(
                GenerateInstructionMessage(message.question, feedback=response),
                topic_id=TopicId(result_topic_type, source=self.id.key))

In [9]:
@type_subscription(topic_type=result_topic_type)
class EmailDraftAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("A user agent that outputs the final answer to the user.")

    @message_handler
    async def handle_final_copy(self, message: ReviewMessage, ctx: MessageContext) -> None:
        print(f"\n{'-'*80}\n{self.id.type} Creating an email draft...")
        print(f"Question: {message.question}\n Query: {message.query}\n Result: {message.result}")
        # To Do: Write code to handle putting the resulting table into a XLSX, 
        # and sending it to an email client as a draft.

## Workflow

### Topics
Messages meant for a specific topic, along with the assigned agent type

In [2]:
query_extractor_topic_type = "QueryExtractorAgent"
report_finder_topic_type = "ReportFinderAgent"
sql_generator_topic_type = "SQLGeneratorAgent"
query_execution_topic_type = "QueryExecutionAgent"
ready_for_review_topic_type = "ReviewerAgent"
result_topic_type = "EmailDraftAgent"

In [None]:

import os
from dotenv import load_dotenv
os.environ['ODBCINI'] = '/opt/homebrew/etc/odbc.ini'
os.environ['ODBCINSTINI'] = '/opt/homebrew/etc/odbcinst.ini'
# # Set DYLD_LIBRARY_PATH for macOS
# os.environ['DYLD_LIBRARY_PATH'] = '/opt/homebrew/lib:' + os.environ.get('DYLD_LIBRARY_PATH', '')
print(os.environ.get('ODBCINI'))
print(os.environ.get('ODBCINSTINI'))

load_dotenv(override=True)

from importlib import reload
from message_classes import Message, GenerateInstructionMessage, SQLQueryMessage, ReviewMessage
from agents import QueryExtractorAgent, ReportFinderAgent, SQLGeneratorAgent, QueryExecutionAgent, ReviewerAgent, EmailDraftAgent


FOUNDRY_KEY = os.environ.get('FOUNDRY_KEY')
model_client = AzureAIChatCompletionClient(
    model="gpt-4o-mini",
    endpoint="https://msfthackathong9909733395.services.ai.azure.com/models",
    # Created an AI Foundry Hub, then a project within it, then a model. 
    # Can find and access the inference endpoint and Key within the Project -> My assets -> Models + endpoints
    credential=AzureKeyCredential(FOUNDRY_KEY),
    model_info={
        "json_output": False,
        "function_calling": False,
        "vision": False,
        "family": "unknown",
    },
)

# result = await model_client.create([UserMessage(content="What is the capital of France?", source="user")])
# print(result)

runtime = SingleThreadedAgentRuntime()

await QueryExtractorAgent.register(
    runtime, type=query_extractor_topic_type, factory=lambda: QueryExtractorAgent(model_client=model_client)
)

await ReportFinderAgent.register(runtime, type=report_finder_topic_type, factory=lambda: ReportFinderAgent(model_client=model_client))

await SQLGeneratorAgent.register(
    runtime, type=sql_generator_topic_type, factory=lambda:SQLGeneratorAgent(model_client=model_client)
)

await QueryExecutionAgent.register(
    runtime, type=query_execution_topic_type, factory=lambda:QueryExecutionAgent(model_client=model_client)
)

await ReviewerAgent.register(
    runtime, type=ready_for_review_topic_type, factory=lambda:ReviewerAgent(model_client=model_client)
)

await EmailDraftAgent.register(runtime, type=result_topic_type, factory=lambda: EmailDraftAgent())

  validate_model_info(config["model_info"])


AgentType(type='EmailDraftAgent')

In [4]:
runtime.start()

await runtime.publish_message(
    Message(content=(
        "From: IBM (client_code = 1)"
        "How many of the drug claims paid are Brand vs Generic?"
    )),
    topic_id=TopicId(query_extractor_topic_type, source="default"),
)

await runtime.stop_when_idle()

--------------------------------------------------------------------------------
QueryExtractorAgent:
Key Information Extracted:

- Specific Data Needed: Number of drug claims paid that are categorized as Brand vs Generic.
- Metrics or Thresholds: None mentioned.
- Time Periods: None specified.
- Specific Populations: None specified, but implies focus on drug claims.
--------------------------------------------------------------------------------
ReportFinderAgent:

Entered prompt for Query generation: Question to convert to SQL Query:
Key Information Extracted:

- Specific Data Needed: Number of drug claims paid that are categorized as Brand vs Generic.
- Metrics or Thresholds: None mentioned.
- Time Periods: None specified.
- Specific Populations: None specified, but implies focus on drug claims..
--------------------------------------------------------------------------------
SQLGeneratorAgent:
```sql
-- This query retrieves the count of drug claims paid categorized by drug type (Br

In [None]:
import pyodbc, struct
from azure import identity
import urllib
import urllib.parse
from sqlalchemy import create_engine

from typing import Union
from fastapi import FastAPI
from pydantic import BaseModel

def get_conn(connection_string):
        conn = pyodbc.connect(connection_string)
        # conn = pyodbc.connect("DSN=AzureSQL")
        return conn

/opt/homebrew/etc/odbc.ini
/opt/homebrew/etc/odbcinst.ini


In [None]:
connection_string = os.environ['AZURE_SQL_CONNECTIONSTRING']
print(connection_string)
get_conn(connection_string)

Driver={ODBC Driver 18 for SQL Server};Server=insurance-greenshield.database.windows.net;Database=dev;Authentication=ActiveDirectoryPassword;UID=odl_user_1661078@cloudlabssandbox.onmicrosoft.com;PWD=ctfp28QXN*Vn;


<pyodbc.Connection at 0x1182e2590>

In [12]:
rows = []
with get_conn(connection_string) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT TOP 15 * FROM GS.BENEFITS")
    columns = [column[0] for column in cursor.description]
    print(columns)
    for i, row in enumerate(cursor.fetchall()):
        if i == 0:
            print(f'each row is of type: {type(row)}\n')
        print(row)
        rows.append(row)
    df = pd.DataFrame((tuple(t) for t in rows), columns=columns)
display(df)

Error: ('01000', "[01000] [unixODBC][Driver Manager]Can't open lib 'ODBC Driver 18 for SQL Server' : file not found (0) (SQLDriverConnect)")