# Chat with your Azure PostgreSQL database via Agent Framework (pro-code)

## Simple one agent chat

### let's first do a simple single agent test (no db connection)

In [4]:
from azure.identity import AzureCliCredential
import dotenv
dotenv.load_dotenv()

from agent_framework.azure import AzureOpenAIChatClient
from typing import Annotated
from pydantic import Field
from agent_framework import ai_function

@ai_function(name="translation_tool", description="translate from one language to another") # decorator, optional
def translate(
    text: Annotated[str, Field(description="The text to translate.")],
    source_language: Annotated[str, Field(description="The source language of the text.")],
    target_language: Annotated[str, Field(description="The target language for the translation.")],
) -> str:
    return f"Translated '{text}' from {source_language} to {target_language}."
translation_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions="You are a translation agent that translates text between languages.",
    tools=translate
)


Test it:

In [7]:
result = await translation_agent.run("Translate 'What is the weather like in Amsterdam?' from English to Dutch.")
print(result.text)

Wat voor weer is het in Amsterdam?


### Agent with tools to access Azure PostgreSQL database

Ensure to first run **az login** in terminal...

In [None]:
import psycopg2
from typing import Optional
from pandas import DataFrame
from get_conn import get_connection_uri
class PG_Plugin:

    def __init__(self, db_uri: str):
        self.db_uri = db_uri
        conn = psycopg2.connect(db_uri)
        conn.close()
        print("Connected to company's database successfully.")
    def _get_connection(self):
        """Create a new connection for each operation."""
        return psycopg2.connect(self.db_uri)
    async def get_product_info(self, product_name: Optional[str] = None, product_id: Optional[int] = None) -> list[dict]:
        """Gets all product information from the database given the name or ID of the product."""
        query = """SELECT 
                    product_id,                   
                    name,
                    inventory,
                    price,
                    refurbished,
                    category
                FROM products
                WHERE (LOWER(name) = LOWER(%(product_name)s) AND %(product_name)s IS NOT NULL)
                   OR (product_id = %(product_id)s AND %(product_id)s IS NOT NULL)
                   """
        conn = self._get_connection()
        cursor = conn.cursor()
        if not product_name and not product_id:
            print("No product name or ID provided.")
            return None
        elif product_id:
            cursor.execute(query, {"product_name": None, "product_id": product_id})
        else:
            cursor.execute(query, {"product_name": product_name, "product_id": None})

            
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        cursor.close()
        conn.close()
        try:
            products= DataFrame(rows, columns=columns)
            products.to_dict(orient="records")  # <-- JSON serializabl
            
            return products.to_dict(orient="records")  # <-- JSON serializabl
        except Exception as e:
            print(f"Error fetching product information: {e}")
            return None


#### Get connection string and initiate a class object

In [None]:
conn_uri = get_connection_uri()
plugin = PG_Plugin(conn_uri)


#### Define agent and pass the tools

In [None]:
chat_client=AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.as_agent(tools=[plugin.get_product_info])

#### Test the agent!

In [None]:
result = await agent.run("give info on product with id 2")
print(result.text)

## Multi-agent Nl2SQL

In [None]:
import psycopg2
from typing import TypedDict, Annotated, List, Optional
from pandas import DataFrame
import json
from get_conn import get_connection_uri

class PG_Plugin:

    def __init__(self, db_uri: str):
        self.db_uri = db_uri  # Store URI instead of connection
        # Test connection
        conn = psycopg2.connect(db_uri)
        conn.close()
        print("Connected to company's database successfully.")

    def _get_connection(self):
        """Create a new connection for each operation."""
        return psycopg2.connect(self.db_uri)

    async def execute_query(self, query: str) -> list:
        conn = self._get_connection()
        try:
            query_cursor = conn.cursor()
            query_cursor.execute(query)
            if query.strip().upper().startswith("SELECT"):
                result = query_cursor.fetchall()
            else:
                result = ["Only read-only SELECT queries are allowed."]
                conn.commit()     
        except psycopg2.Error as e:
            conn.rollback()
            result = [str(e)]
        finally:
            query_cursor.close()
            conn.close()
        return result

    async def get_schema_info(self) -> str:
        print("Getting schema")
        conn = self._get_connection()
        try:
            query = """
            SELECT
                cols.table_schema,
                cols.table_name,
                cols.column_name,
                cols.data_type,
                cols.is_nullable,
                cons.constraint_type,
                cons.constraint_name,
                fk.references_table AS referenced_table,
                fk.references_column AS referenced_column
            FROM information_schema.columns cols
            LEFT JOIN information_schema.key_column_usage kcu
                ON cols.table_schema = kcu.table_schema
                AND cols.table_name = kcu.table_name
                AND cols.column_name = kcu.column_name
            LEFT JOIN information_schema.table_constraints cons
                ON kcu.table_schema = cons.table_schema
                AND kcu.table_name = cons.table_name
                AND kcu.constraint_name = cons.constraint_name
            LEFT JOIN (
                SELECT
                    rc.constraint_name,
                    kcu.table_name AS references_table,
                    kcu.column_name AS references_column
                FROM information_schema.referential_constraints rc
                JOIN information_schema.key_column_usage kcu
                    ON rc.unique_constraint_name = kcu.constraint_name
            ) fk
                ON cons.constraint_name = fk.constraint_name
            WHERE cols.table_schema = 'public'
            ORDER BY cols.table_schema, cols.table_name, cols.ordinal_position;
            """
            schema_cur = conn.cursor()
            schema_cur.execute(query)
            columns = [desc[0] for desc in schema_cur.description]
            rows = schema_cur.fetchall()
            schema_cur.close()
            schema_info = [dict(zip(columns, row)) for row in rows]
            return json.dumps(schema_info, indent=2)
        finally:
            conn.close()

Get connection string ... (don't forget to do 'az login' in terminal)

In [None]:
conn_uri = get_connection_uri()
plugin = PG_Plugin(conn_uri)

Defining Agents...

In [None]:
chat_client=AzureOpenAIChatClient(credential=AzureCliCredential())
support_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
            "You are a support agent. Analyze the user's request and route to the appropriate specialist:\n"
            "- For getting databsase schema: call handoff_to_schema_agent\n"
            "- For querying database: call handoff_to_service_agent"
            "- If not able to help, let the user know of the reason."
        ),
    name = "support_agent"
)
schema_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions="You are responsible for providing database schema information.",
    tools=[plugin.get_schema_info],
    name = "schema_agent"
)
service_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions="You are a read-only agent for company's product database. Use the schema information provided by the schema agent to answer user questions about the database.",
    tools=[plugin.execute_query],
    name="service_agent"
)


### Hand-off orchestration 

In [None]:
from agent_framework import AgentRunUpdateEvent, HandoffBuilder

async def run_handoff_agent_framework(question) -> None:
    workflow = (
        HandoffBuilder(
            name="support_handoff",
            participants=[support_agent,schema_agent, service_agent],
        ).with_start_agent(support_agent)
        .add_handoff(support_agent, [schema_agent, service_agent])
        .with_termination_condition(lambda conv: sum(1 for msg in conv if msg.role.value == "user") > 3)
        .build()
    )

    print("[Agent Framework] Group chat conversation:")
    current_executor = None
    async for event in workflow.run_stream(question):
        if isinstance(event, AgentRunUpdateEvent):
            # Print executor name header when switching to a new agent
            if current_executor != event.executor_id:
                if current_executor is not None:
                    print()  # Newline after previous agent's message
                print(f"---------- {event.executor_id} ----------")
                current_executor = event.executor_id
            if event.data:
                print(event.data.text, end="", flush=True)


In [None]:
# Run with a question that requires expert selection

await run_handoff_agent_framework("What are the table names?")

In [None]:
# Run with a question that requires expert selection

await run_handoff_agent_framework("which customers left a review?")

### Sequential orchestration

In [None]:
from agent_framework import AgentRunUpdateEvent, SequentialBuilder
async def run_seq_agent_framework(question) -> None:
    seq_workflow = (
        SequentialBuilder().participants([schema_agent, service_agent])
        .build()
    )

    print("[Agent Framework] Group chat conversation:")
    current_executor = None
    async for event in seq_workflow.run_stream(question):
        if isinstance(event, AgentRunUpdateEvent):
            # Print executor name header when switching to a new agent
            if current_executor != event.executor_id:
                if current_executor is not None:
                    print()  # Newline after previous agent's message
                print(f"---------- {event.executor_id} ----------")
                current_executor = event.executor_id
            if event.data:
                print(event.data.text, end="", flush=True)





In [None]:
await run_seq_agent_framework("give all sales info")