In [2]:
import asyncio
from typing import Annotated

from semantic_kernel import Kernel
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import ChatHistory, FunctionCallContent, FunctionResultContent
from semantic_kernel.functions import KernelArguments, kernel_function

import json

In [3]:
import json
import os
from dotenv import load_dotenv
import psycopg2
from psycopg2.extras import RealDictCursor
load_dotenv()

True

In [15]:
os.getenv("DB_PORT")

'5433'

In [42]:
from decimal import Decimal

class Accounts:
    def __init__(self):
        self.create_connection()

    def create_connection(self):
        print("create_connection function called... ")
        connection = psycopg2.connect(
            dbname=os.getenv("DB_NAME"),
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            host=os.getenv("DB_HOST"),
            port=os.getenv("DB_PORT")
        )
        self.connection = connection
        self.cursor = self.connection.cursor(cursor_factory=RealDictCursor)

    @kernel_function(description="Fetches user information for a given account id.")
    def get_account_info(self, account_id: Annotated[str, "account id to look up"]) -> Annotated[str, "Returns the account information."]:
        """
        Fetches account information from the Account table in PostgreSQL database.

        :param account_id (int): ID of the account.
        :return: Account information as a JSON string.
        :rtype: str
        """
        print("get_account_info function called... ")
        try:
            if(self.connection != True):
                self.create_connection()
            
            connection = self.connection
            cursor = self.cursor
            cursor.execute("SELECT * FROM accounts WHERE account_id = %s", (account_id,))
            account_record = cursor.fetchone()
            if account_record:
                # Convert Decimal values to strings
                for key, value in account_record.items():
                    if isinstance(value, Decimal):
                        account_record[key] = str(value)
                return json.dumps({"account_info": account_record})
            else:
                return json.dumps({"error": "Account not found."})
        except Exception as e:
            return json.dumps({"error": str(e)})
        # finally:
        #     if connection:
        #         self.close_connection()
        
    
    @kernel_function(description="Closes the connection to the database.")
    def close_connection(self) -> Annotated[str, "Returns a message indicating the status of the connection closure."]:
        """
        Closes the connection to the PostgreSQL database.

        :return: Message indicating the status of the connection closure.
        :rtype: str
        """
        print("close_connection function called... ")
        try:
            if self.connection:
                self.cursor.close()
                self.connection.close()
            return "Connection closed successfully."
        except Exception as e:
            return str(e)

In [43]:
accounts_instance = Accounts()

create_connection function called... 


In [45]:
accounts_instance.get_account_info(1)

get_account_info function called... 
create_connection function called... 


'{"account_info": {"account_id": 1, "name": "HBL", "email": "abcd@gef.com", "phone": "9876543210", "balance": "99034.50"}}'

In [46]:
accounts_instance.close_connection()

close_connection function called... 


'Connection closed successfully.'

In [47]:
# Simulate a conversation with the agent
USER_INPUTS = [
    "Hello",
    "Could you please give me more information about the account id 2 ?",
    "Could you please give me more information about the account id 1 ?",
    "No more questions. Thank you!",
]


In [48]:
# 1. Create the instance of the Kernel to register the plugin and service
service_id = "agent" 
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
kernel = Kernel()

kernel.add_plugin(Accounts(), plugin_name="accounts_plugin")
kernel.add_service(
            AzureChatCompletion(service_id=service_id,
                                api_key=os.getenv("AZURE_OPENAI_API_KEY"),
                                deployment_name=os.getenv("AZURE_OPENAI_CHAT_COMPLETION_MODEL"),
                                endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
                                
            ))
                                    

create_connection function called... 


In [49]:
# 2. Configure the function choice behavior to auto invoke kernel functions
# so that the agent can automatically execute the menu plugin functions when needed
settings = kernel.get_prompt_execution_settings_from_service_id(service_id=service_id)
settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

In [50]:
# 3. Create the agent
agent = ChatCompletionAgent(
    kernel=kernel,
    name="Host",
    instructions="""Answer questions about the accounts in detail. Do not include the account balance in the response. 
                Provide information about one account at a time only.
                After the customer confirms there are no more questions, close the Database connection""",
    arguments=KernelArguments(settings=settings),
)


In [51]:
# 4. Create a chat history to hold the conversation
chat_history = ChatHistory()

In [52]:
for user_input in USER_INPUTS:
    # 5. Add the user input to the chat history
    chat_history.add_user_message(user_input)
    print(f"# User: {user_input}")
    # 6. Invoke the agent for a response
    async for content in agent.invoke(chat_history):
        print(f"# {content.name}: ", end="")
        if (
            not any(isinstance(item, (FunctionCallContent, FunctionResultContent)) for item in content.items)
            and content.content.strip()
        ):
            # We only want to print the content if it's not a function call or result
            print(f"{content.content}", end="", flush=True)
    print("")  # Print a newline to separate the messages

# User: Hello
# Host: Hello! How can I assist you today? Do you have any questions about your accounts?
# User: Could you please give me more information about the account id 2 ?
get_account_info function called... 
create_connection function called... 
# Host: Here are the details for account ID 2:

- **Name**: Vikas
- **Email**: abcd@gef.com
- **Phone**: 9876543210

If you have any more questions about this account or if there's anything else I can help you with, just let me know!
# User: Could you please give me more information about the account id 1 ?
get_account_info function called... 
create_connection function called... 
# Host: Here is the information for account ID 1:

- **Name:** HBL
- **Email:** abcd@gef.com
- **Phone:** 9876543210

If you have any more questions or need information on another account, feel free to ask!
# User: No more questions. Thank you!
close_connection function called... 
# Host: You're welcome! If you have any more questions in the future, feel free 

In [2]:
from pydantic import BaseModel, Field, ValidationError
import json

# 1. Define Data Schemas
class OrderDetails(BaseModel):
    order_id: str = Field(pattern=r"^ORD-\d{6}$")
    customer_email: str = Field(..., description="Email of the customer.")
    total_amount: float = Field(ge=0.0)
    items: list[str] = Field(min_length=1)

class AgentAction(BaseModel):
    action_type: str = Field(description="Type of action to perform (e.g., 'process_order', 'send_email').")
    payload: dict = Field(description="Data payload for the action.")

# Simulate an LLM generating a response
def mock_llm_response(prompt: str) -> str:
    if "order 123456" in prompt:
        return '{"order_id": "ORD-123456", "customer_email": "test@example.com", "total_amount": 99.99, "items": ["Laptop", "Mouse"]}'
    elif "invalid order" in prompt:
        return '{"order_id": "INVALID-ID", "customer_email": "bad-email", "total_amount": -10.0}'
    else:
        return '{"action_type": "unknown", "payload": {}}'

# Agent's processing logic
def process_order_request(user_input: str) -> dict:
    # 2. Input Validation (Implicit here, assuming mock_llm_response handles initial parsing)

    # Agent's Reasoning/Tool Use - LLM attempts to generate structured output
    print(f"LLM processing: '{user_input}'")
    llm_raw_output = mock_llm_response(user_input)

    try:
        # 2. Integrate Validation - Parsing LLM output to Pydantic model
        order_details = OrderDetails.model_validate_json(llm_raw_output)
        print(f"Successfully parsed order: {order_details.model_dump_json(indent=2)}")

        # Agent decides on an action based on parsed data
        action = AgentAction(
            action_type="process_order",
            payload=order_details.model_dump() # Payload is the validated order details
        )
        return {"status": "success", "action": action.model_dump()}

    except ValidationError as e:
        print(f"LLM output validation failed: {e.json()}")
        # 3. Error Handling - Returning structured error response
        return {"status": "failed", "error": "Invalid LLM output, unable to parse order details."}
    except json.JSONDecodeError:
        print(f"LLM output is not valid JSON: {llm_raw_output}")
        return {"status": "failed", "error": "LLM output is not valid JSON."}

# --- Simulate agent interactions ---
print("\n--- Valid Order Scenario ---")
result_valid = process_order_request("I want to place order 123456 for a Laptop and Mouse.")
print(f"Agent Result: {json.dumps(result_valid, indent=2)}")

print("\n--- Invalid Order Scenario ---")
result_invalid = process_order_request("Please process an invalid order for me.")
print(f"Agent Result: {json.dumps(result_invalid, indent=2)}")


--- Valid Order Scenario ---
LLM processing: 'I want to place order 123456 for a Laptop and Mouse.'
Successfully parsed order: {
  "order_id": "ORD-123456",
  "customer_email": "test@example.com",
  "total_amount": 99.99,
  "items": [
    "Laptop",
    "Mouse"
  ]
}
Agent Result: {
  "status": "success",
  "action": {
    "action_type": "process_order",
    "payload": {
      "order_id": "ORD-123456",
      "customer_email": "test@example.com",
      "total_amount": 99.99,
      "items": [
        "Laptop",
        "Mouse"
      ]
    }
  }
}

--- Invalid Order Scenario ---
LLM processing: 'Please process an invalid order for me.'
LLM output validation failed: [{"type":"string_pattern_mismatch","loc":["order_id"],"msg":"String should match pattern '^ORD-\\d{6}$'","input":"INVALID-ID","ctx":{"pattern":"^ORD-\\d{6}$"},"url":"https://errors.pydantic.dev/2.10/v/string_pattern_mismatch"},{"type":"greater_than_equal","loc":["total_amount"],"msg":"Input should be greater than or equal to 0",