In [None]:
import os
import asyncio
import psycopg2
from semantic_kernel.functions import kernel_function
from typing import Annotated
from dotenv import load_dotenv

load_dotenv()

class QueryPostgresPlugin:
    """
    Plugin to query a PostgreSQL database using a SQL query.
    The SQL query is provided as an input parameter.

    NOTE(review): the statement is executed as supplied (after
    `__clean_sql_query__`), so whatever role the connection string uses
    determines what a caller can do. Prefer a read-only database role.
    """

    def __init__(self, connection_string: str) -> None:
        """Store the psycopg2 connection string; no connection is opened here."""
        self._connection_string = connection_string

    @staticmethod
    def __clean_sql_query__(sql_query: str) -> str:
        """
        Remove every ';' and replace newlines with spaces.

        This is a light normalisation only, presumably meant to stop several
        statements being stacked in one call. It is not a SQL-injection
        defence, and it also strips semicolons that appear inside string
        literals.
        """
        return sql_query.replace(";", "").replace("\n", " ")

    @kernel_function(
        name="query_postgres",
        description="Query a PostgreSQL database using a SQL query and return the results as a string.")
    async def query_postgres(
        self,
        sql_query: Annotated[str, "SQL query to be executed"]) -> Annotated[str, "The results of the SQL query as a formatted string"]:
        """
        Executes the SQL query against PostgreSQL using psycopg2 and returns the results.

        Args:
            sql_query: The SQL query to be executed.
        Returns:
            `str()` of a list of row dictionaries keyed by column name,
            "No results found." when the query yields no rows, or
            "Error executing query: ..." when anything raises. Errors are
            returned as text rather than raised.
        """
        def run_query() -> str:
            conn = None
            try:
                conn = psycopg2.connect(self._connection_string)
                # The cursor context manager closes the cursor on exit,
                # including when execute()/fetchall() raises.
                with conn.cursor() as cur:
                    cur.execute(self.__clean_sql_query__(sql_query))
                    # Retrieve column names before fetching rows.
                    col_names = [desc[0] for desc in cur.description] if cur.description else []
                    rows = cur.fetchall()
                if not rows:
                    return "No results found."
                # Convert rows to a list of dictionaries if column names are available.
                results = [dict(zip(col_names, row)) for row in rows] if col_names else rows
                return str(results)
            except Exception as e:
                return f"Error executing query: {e}"
            finally:
                # Always release the connection; previously it leaked whenever
                # the query failed. No commit is ever issued on it.
                if conn is not None:
                    conn.close()

        # psycopg2 is blocking, so run it in a worker thread to keep the event loop free.
        return await asyncio.to_thread(run_query)

# Define an asynchronous function to run the query plugin.
async def query_plugin():
    """
    Smoke-test QueryPostgresPlugin with a fixed five-row SELECT on public."actor".

    Reads the connection string from POSTGRES_CONNECTION_STRING; if it is
    missing or empty, prints a hint and does nothing else. Returns None.
    """
    dsn = os.getenv("POSTGRES_CONNECTION_STRING")
    if dsn:
        demo_sql = 'SELECT * FROM public."actor" LIMIT 5'
        outcome = await QueryPostgresPlugin(dsn).query_postgres(demo_sql)
        # Header and payload on separate lines, each newline-terminated.
        print("Query Plugin Result:", outcome, sep="\n")
    else:
        print("Please set POSTGRES_CONNECTION_STRING in your environment.")

# Run the demo now. Top-level `await` works in a notebook cell because
# IPython runs cells with async support; it is a syntax error in a plain script.
await query_plugin()



Query Plugin Result:
[{'actor_id': 1, 'first_name': 'Penelope', 'last_name': 'Guiness', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'actor_id': 2, 'first_name': 'Nick', 'last_name': 'Wahlberg', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'actor_id': 3, 'first_name': 'Ed', 'last_name': 'Chase', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'actor_id': 4, 'first_name': 'Jennifer', 'last_name': 'Davis', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'actor_id': 5, 'first_name': 'Johnny', 'last_name': 'Lollobrigida', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}]


In [None]:
class GetSchemaPlugin:
    """
    Plugin to retrieve the schema of tables from a PostgreSQL database.
    It returns table schema, table name, column name, and data type for each column.
    """

    def __init__(self, connection_string: str) -> None:
        """Store the psycopg2 connection string; no connection is opened here."""
        self._connection_string = connection_string

    @kernel_function(
        name="get_schema",
        description="Retrieves the schema of tables from the PostgreSQL database. "
                    "Returns table names, column names, and data types as a formatted string.")
    async def get_schema(
        self,
        _: Annotated[str, "Unused parameter for compatibility"] = "",
    ) -> Annotated[str, "The schema details as a formatted string"]:
        """
        Connects to PostgreSQL using psycopg2, retrieves schema details from the information_schema.columns view,
        and returns the results as a formatted string.

        Args:
            _: Ignored. It now defaults to "" so the function can be called
               without inventing an argument; existing callers that pass a
               value are unaffected.
        Returns:
            One "schema.table - column (type)" line per column, joined with
            newlines; "No schema information found." when the view returns no
            rows; or "Error retrieving schema: ..." when anything raises.
        """
        def run_schema_query() -> str:
            conn = None
            try:
                # Connect to the PostgreSQL database.
                conn = psycopg2.connect(self._connection_string)

                # Query to retrieve schema details excluding internal schemas.
                query = """
                    SELECT table_schema, table_name, column_name, data_type
                    FROM information_schema.columns
                    WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
                    ORDER BY table_schema, table_name, ordinal_position;
                """
                # The cursor context manager closes the cursor even if the query fails.
                with conn.cursor() as cur:
                    cur.execute(query)
                    rows = cur.fetchall()

                if not rows:
                    return "No schema information found."

                # Format the output: one line per column.
                return "\n".join(
                    f"{table_schema}.{table_name} - {column_name} ({data_type})"
                    for table_schema, table_name, column_name, data_type in rows
                )
            except Exception as e:
                return f"Error retrieving schema: {e}"
            finally:
                # Always release the connection; previously it leaked whenever
                # execute()/fetchall() raised.
                if conn is not None:
                    conn.close()

        # psycopg2 is blocking, so run it in a worker thread to keep the event loop free.
        return await asyncio.to_thread(run_schema_query)

# Define an asynchronous function to run the schema plugin.
async def schema_plugin():
    """
    Smoke-test GetSchemaPlugin by printing the schema listing it returns.

    Reads the connection string from POSTGRES_CONNECTION_STRING; if it is
    missing or empty, prints a hint and returns without touching the database.
    Returns None.
    """
    dsn = os.getenv("POSTGRES_CONNECTION_STRING")
    if not dsn:
        print("Please set POSTGRES_CONNECTION_STRING in your environment.")
        return None

    # get_schema ignores its argument; an empty string is passed as a placeholder.
    listing = await GetSchemaPlugin(dsn).get_schema("")
    for line in ("Schema Plugin Result:", listing):
        print(line)
    return None

# Run the demo now. Top-level `await` works in a notebook cell because
# IPython runs cells with async support; it is a syntax error in a plain script.
await schema_plugin()

Schema Plugin Result:
public.actor - actor_id (integer)
public.actor - first_name (character varying)
public.actor - last_name (character varying)
public.actor - last_update (timestamp without time zone)
public.actor_info - actor_id (integer)
public.actor_info - first_name (character varying)
public.actor_info - last_name (character varying)
public.actor_info - film_info (text)
public.address - address_id (integer)
public.address - address (character varying)
public.address - address2 (character varying)
public.address - district (character varying)
public.address - city_id (smallint)
public.address - postal_code (character varying)
public.address - phone (character varying)
public.address - last_update (timestamp without time zone)
public.category - category_id (integer)
public.category - name (character varying)
public.category - last_update (timestamp without time zone)
public.city - city_id (integer)
public.city - city (character varying)
public.city - country_id (smallint)
public.

In [3]:
import os
import sys
from dotenv import load_dotenv
import asyncio
import logging
import requests
import uuid
import json
from datetime import datetime
from typing import List, Dict, Any, Optional
from azure.identity import DefaultAzureCredential
from azure.core.credentials import AccessToken
from semantic_kernel import Kernel
from semantic_kernel.utils.logging import setup_logging
from semantic_kernel.functions import kernel_function
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.chat_completion_client_base import ChatCompletionClientBase
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.functions.kernel_function_from_prompt import KernelFunctionFromPrompt
from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import (
    AzureChatPromptExecutionSettings,
)
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    AzureChatPromptExecutionSettings,
)
# def setup_logging():
#     logging.basicConfig(level=logging.DEBUG)

load_dotenv()

print(os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"))
kernel = Kernel()
kernel.add_service(AzureChatCompletion(service_id=os.getenv('GLOBAL_LLM_SERVICE')))

settings = kernel.get_prompt_execution_settings_from_service_id(os.getenv("GLOBAL_LLM_SERVICE"))
settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

conn_str = os.getenv("POSTGRES_CONNECTION_STRING")
if not conn_str:
    raise ValueError("POSTGRES_CONNECTION_STRING environment variable not set.")

query_plugin = QueryPostgresPlugin(conn_str)
schema_plugin = GetSchemaPlugin(conn_str)

agent = ChatCompletionAgent(
    kernel=kernel,
    name="SQLAssistantAgent",
    instructions="""
You are a helpful assistant that retrieves data from a PostgreSQL database.
When a user asks for information, get schema using GetSchemaPlugin. Look for relevant tables and columns that will answer user question and generate a SQL query.
Generate SQL query you are going to execute and then use the QueryPostgresPlugin's query_postgres function.
The tables reside in the public schema of the database so use following format: public."table_name",  for example: 'SELECT * FROM public."actor" LIMIT 5'.
Return the result of the SQL query.
""",
    plugins=[schema_plugin, query_plugin],
)

user_input = "Show me top 5 rows from the actor table."

chat = ChatHistory()
chat.add_user_message(user_input)
print(f"# User: {user_input}")
response = await agent.get_response(chat)
print(f"# {response.name}: {response}")
chat.add_message(response)

gpt-4o-mini
# User: Show me top 5 rows from the actor table.
# SQLAssistantAgent: Here are the top 5 rows from the actor table:

| actor_id | first_name | last_name       | last_update           |
|----------|------------|------------------|-----------------------|
| 1        | Penelope   | Guiness          | 2013-05-26 14:47:57   |
| 2        | Nick       | Wahlberg         | 2013-05-26 14:47:57   |
| 3        | Ed         | Chase            | 2013-05-26 14:47:57   |
| 4        | Jennifer   | Davis            | 2013-05-26 14:47:57   |
| 5        | Johnny     | Lollobrigida     | 2013-05-26 14:47:57   |


In [4]:
# Ask a question that needs a join/aggregate; `agent` comes from the setup cell above.
user_input = "Which actors have appeared in the most films, and how many films did they star in?"

# A new ChatHistory is created here, so this turn carries no context from earlier questions.
chat = ChatHistory()
chat.add_user_message(user_input)
print(f"# User: {user_input}")
response = await agent.get_response(chat)
print(f"# {response.name}: {response}")
# Append the reply to this cell's history object.
chat.add_message(response)

# User: Which actors have appeared in the most films, and how many films did they star in?
# SQLAssistantAgent: Here are the actors who have appeared in the most films, along with the number of films they starred in:

1. Gina Degeneres - 42 films
2. Walter Torn - 41 films
3. Mary Keitel - 40 films
4. Matthew Carrey - 39 films
5. Sandra Kilmer - 37 films
6. Scarlett Damon - 36 films
7. Angela Witherspoon - 35 films
8. Vivien Basinger - 35 films
9. Val Bolger - 35 films
10. Henry Berry - 35 films


In [5]:
# Ask about rentals per store and rental duration; `agent` comes from the setup cell above.
user_input = "Which store has the highest number of rentals, and what is the average rental duration?"

# A new ChatHistory is created here, so this turn carries no context from earlier questions.
chat = ChatHistory()
chat.add_user_message(user_input)
print(f"# User: {user_input}")
response = await agent.get_response(chat)
print(f"# {response.name}: {response}")
# Append the reply to this cell's history object.
chat.add_message(response)

# User: Which store has the highest number of rentals, and what is the average rental duration?
# SQLAssistantAgent: The store with the highest number of rentals is Store ID 2, with a total of 8,121 rentals. The average rental duration is approximately 4.99 days.


In [6]:
# Ask for total revenue and a per-category breakdown; `agent` comes from the setup cell above.
user_input = "What is the total payment revenue generated, and how does it break down by film category?"

# A new ChatHistory is created here, so this turn carries no context from earlier questions.
chat = ChatHistory()
chat.add_user_message(user_input)
print(f"# User: {user_input}")
response = await agent.get_response(chat)
print(f"# {response.name}: {response}")
# Append the reply to this cell's history object.
chat.add_message(response)

# User: What is the total payment revenue generated, and how does it break down by film category?
# SQLAssistantAgent: The total payment revenue generated is **$61,312.04**.

Here’s the breakdown of the total sales by film category:

| Category       | Total Sales |
|----------------|-------------|
| Family         | $3,830.15   |
| Games          | $3,922.18   |
| Animation      | $4,245.31   |
| Classics       | $3,353.38   |
| Documentary    | $3,749.65   |
| Sports         | $4,892.19   |
| New            | $3,966.38   |
| Children       | $3,309.39   |
| Music          | $3,071.52   |
| Travel         | $3,227.36   |
| Foreign        | $3,934.47   |
| Drama          | $4,118.46   |
| Horror         | $3,401.27   |
| Action         | $3,951.84   |
| Comedy         | $4,002.48   |
| Sci-Fi         | $4,336.01   |

If you have any other questions or need further details, feel free to ask!
