1.  We first add the Database Schema, so the LLM knows how to generate the queries.

We do this by storing the tables' CREATE statements in the DB_SCHEMA variable.

In [12]:
DB_SCHEMA = """
CREATE TABLE WorkflowReporting.WorkflowInstances (
    WorkflowID INT IDENTITY(1,1) PRIMARY KEY,  -- Unique identifier for each workflow instance
    ProcessInstanceID UNIQUEIDENTIFIER NOT NULL,  -- Unique identifier for the process instance
    Status NVARCHAR(50) NOT NULL CHECK (Status IN ('Started', 'In Progress', 'Completed', 'Failed', 'Cancelled')),  -- Status of the workflow
    LinkToProcess NVARCHAR(500) NULL,  -- URL or reference link to the process
    Departments NVARCHAR(255) NULL,  -- Departments involved
    StartDate DATETIME2 NOT NULL DEFAULT GETUTCDATE(),  -- Start timestamp
    EndDate DATETIME2 NULL,  -- End timestamp
    BusinessData NVARCHAR(MAX) NULL,  -- Flexible JSON column for additional business data
    CreatedAt DATETIME2 NOT NULL DEFAULT GETUTCDATE(),  -- Record creation timestamp
    UpdatedAt DATETIME2 NULL DEFAULT GETUTCDATE(),  -- Last update timestamp
    CONSTRAINT CK_Status CHECK (Status IN ('Started', 'In Progress', 'Completed', 'Failed', 'Cancelled'))
);

CREATE TABLE WorkflowReporting.WorkflowUserTasks (
    TaskID INT IDENTITY(1,1) PRIMARY KEY,  -- Unique identifier for each task
    WorkflowID INT NOT NULL,  -- Links to WorkflowInstances table
    TaskName NVARCHAR(255) NOT NULL,  -- Name of the task
    AssignedTo NVARCHAR(255) NULL,  -- User assigned to the task
    Status NVARCHAR(50) NOT NULL CHECK (Status IN ('Pending', 'In Progress', 'Completed', 'Failed', 'Cancelled')),  -- Task status
    Priority NVARCHAR(20) NOT NULL DEFAULT 'Normal' CHECK (Priority IN ('Low', 'Normal', 'High', 'Critical')), -- Priority of task
    DueDate DATETIME2 NULL,  -- Task due date
    CompletedDate DATETIME2 NULL,  -- Completion timestamp
    BusinessData NVARCHAR(MAX) NULL,  -- JSON column for additional task-specific business data
    CreatedAt DATETIME2 NOT NULL DEFAULT GETUTCDATE(),  -- Record creation timestamp
    UpdatedAt DATETIME2 NULL DEFAULT GETUTCDATE(),  -- Last update timestamp
    CONSTRAINT FK_Workflow_UserTask FOREIGN KEY (WorkflowID) REFERENCES WorkflowReporting.WorkflowInstances(WorkflowID) ON DELETE CASCADE
);
"""

We also add some sample queries to help the LLM out.

In [13]:
SQL_EXAMPLES = [
    {
        'request': 'show me records where foobar is false',
        'response': "SELECT * FROM records WHERE JSON_VALUE(attributes, '$.foobar') = 'false'",
    },
    {
        'request': 'show me records where attributes include the key \"foobar\"',
        'response': "SELECT * FROM records WHERE JSON_QUERY(attributes, '$.foobar') IS NOT NULL",
    },
    {
        'request': 'show me records from yesterday',
        'response': "SELECT * FROM records WHERE CAST(start_timestamp AS DATE) = CAST(DATEADD(DAY, -1, GETDATE()) AS DATE)",
    },
    {
        'request': 'show me error records with the tag \"foobar\"',
        'response': "SELECT * FROM records WHERE level = 'error' AND 'foobar' IN (SELECT value FROM STRING_SPLIT(tags, ','))",
    },
]
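
The samples above query a generic `records` table rather than the WorkflowReporting tables. Examples written against the real schema may guide the model more directly. The list below is a minimal sketch under that assumption: the name `WORKFLOW_SQL_EXAMPLES` and the request wording are hypothetical, while the table and column names come from DB_SCHEMA. The last entry uses TOP, the T-SQL way to cap a result set, since LIMIT is not valid in MSSQL.

```python
# Hypothetical examples tied to the WorkflowReporting schema (not from the original notebook).
WORKFLOW_SQL_EXAMPLES = [
    {
        'request': 'show me workflows that failed',
        'response': "SELECT * FROM WorkflowReporting.WorkflowInstances WHERE Status = 'Failed'",
    },
    {
        'request': 'show me workflows started yesterday',
        'response': "SELECT * FROM WorkflowReporting.WorkflowInstances WHERE CAST(StartDate AS DATE) = CAST(DATEADD(DAY, -1, GETUTCDATE()) AS DATE)",
    },
    {
        'request': 'show me overdue tasks',
        'response': "SELECT * FROM WorkflowReporting.WorkflowUserTasks WHERE DueDate < GETUTCDATE() AND CompletedDate IS NULL",
    },
    {
        'request': 'show me the 10 newest high priority tasks',
        'response': "SELECT TOP 10 * FROM WorkflowReporting.WorkflowUserTasks WHERE Priority = 'High' ORDER BY CreatedAt DESC",
    },
]
```

If these prove useful, they could be passed to the system prompt in place of, or alongside, SQL_EXAMPLES.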

We then use pydantic to define models (classes) that describe the possible responses.

This means we also have to import those modules.

In [3]:
from typing import Annotated, Any, Union
from annotated_types import MinLen
from pydantic import BaseModel, Field

class Success(BaseModel):
    """Response when SQL could be successfully generated."""

    sql_query: Annotated[str, MinLen(1)]
    explanation: str = Field(
        '', description='Explanation of the SQL query, as markdown'
    )


class InvalidRequest(BaseModel):
    """Response when the user input didn't include enough information to generate SQL."""

    error_message: str


We can then define our Agent

In [4]:
from typing import Union
from typing_extensions import TypeAlias
from pydantic_ai import Agent

Response: TypeAlias = Union[Success, InvalidRequest]
# Agent is generic over (dependencies, result type); this agent has no dependencies
agent: Agent[None, Response] = Agent(
    'openai:gpt-4o',
    # Type ignore while we wait for PEP-0747, nonetheless unions will work fine everywhere else
    result_type=Response,  # type: ignore
)

We then define our system prompt. This is extra text that is sent to the LLM along with the user's question.

For this example, we send in the DB schema we created before, so the LLM knows how to build the query.

We also need to import a few helper functions.

In [14]:
from datetime import date
from pydantic_ai.format_as_xml import format_as_xml

@agent.system_prompt
async def system_prompt() -> str:
    return f"""\
Given the following MSSQL table of records, your job is to
write a SQL query that suits the user's request.  Please make sure not to
include Limit in the query.

Database schema:

{DB_SCHEMA}

today's date = {date.today()}

{format_as_xml(SQL_EXAMPLES)}
"""

Now we build the helper functions that connect to the SQL database and run a query.

In [15]:
import asyncio
import aioodbc
from contextlib import asynccontextmanager

# Database connection details
server = 'sql.denallix.com'  # e.g., 'localhost' or '192.168.1.1'
database = 'K2Applications'
username = 'WorkflowReporting'
password = ''

dsn = None  # Optional: Use DSN if configured

def get_connection_string():
    return (
        f'DRIVER={{SQL Server}};'
        f'SERVER={server};'
        f'DATABASE={database};'
        f'UID={username};'
        f'PWD={password}'
    )


@asynccontextmanager
async def connect_to_db():
    conn = await aioodbc.connect(dsn=get_connection_string())
    try:
        yield conn
    finally:
        await conn.close()

async def execute_query(query, params=()):
    async with connect_to_db() as conn:
        try:
            async with conn.cursor() as cursor:
                await cursor.execute(query, params)
                results = await cursor.fetchall()
                for row in results:
                    print(row)
                return results
        except Exception as e:
            print("Error executing query:", e)
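
The cell above keeps the password as a literal in the notebook. One common alternative is to read it from an environment variable, so the secret never lands in the saved file. The sketch below assumes a variable named `WORKFLOW_DB_PASSWORD`; that name and both helper functions are hypothetical, and the connection string layout simply mirrors `get_connection_string()`.

```python
import os


def password_from_env(var_name="WORKFLOW_DB_PASSWORD"):
    """Read the database password from the environment (variable name is an assumption)."""
    password = os.environ.get(var_name)
    if not password:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return password


def build_connection_string(server, database, username, password, driver="SQL Server"):
    """Build an ODBC connection string in the same layout as get_connection_string()."""
    return (
        f"DRIVER={{{driver}}};"  # triple braces render as DRIVER={SQL Server};
        f"SERVER={server};"
        f"DATABASE={database};"
        f"UID={username};"
        f"PWD={password}"
    )
```

With this in place, `get_connection_string()` could return `build_connection_string(server, database, username, password_from_env())` instead of using the module-level `password`.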

Next, we ask a question and send it to the LLM along with the system prompt, then look at the SQL it generates.

In [19]:
# Step 1: Generate SQL from user input

prompt = "Can you give a summary of workflows submitted by department?"
print("Step 1: Generating SQL query...")
result = await agent.run(prompt)

if isinstance(result.data, InvalidRequest):
    # No SQL was produced, so do not try to read sql_query from this response
    sql_query = None
    print("Invalid request:", result.data.error_message)
else:
    sql_query = result.data.sql_query
    print("Generated SQL Query:")
    print(sql_query)

Step 1: Generating SQL query...
Generated SQL Query:
SELECT Departments, COUNT(*) AS NumberOfWorkflows, 
       SUM(CASE WHEN Status = 'Completed' THEN 1 ELSE 0 END) AS CompletedWorkflows,
       SUM(CASE WHEN Status = 'Failed' THEN 1 ELSE 0 END) AS FailedWorkflows,
       SUM(CASE WHEN Status = 'In Progress' THEN 1 ELSE 0 END) AS InProgressWorkflows,
       SUM(CASE WHEN Status = 'Cancelled' THEN 1 ELSE 0 END) AS CancelledWorkflows,
       SUM(CASE WHEN Status = 'Started' THEN 1 ELSE 0 END) AS StartedWorkflows
FROM WorkflowReporting.WorkflowInstances
GROUP BY Departments;
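
Because the query text comes from the LLM, it may be worth checking it before it reaches the database. The function below is a deliberately conservative sketch, not a SQL parser: it accepts a single statement that starts with SELECT or WITH and contains no data-modifying keyword. The name `is_read_only_select` and the keyword list are assumptions. It can reject harmless queries (for example, one with a semicolon or the word "delete" inside a string literal), and it is not a substitute for connecting with a login that only has read permissions.

```python
import re

# Keywords that change data, schema, or permissions (list is an assumption, not exhaustive).
FORBIDDEN = re.compile(
    r"\b(INSERT|UPDATE|DELETE|MERGE|DROP|ALTER|CREATE|TRUNCATE|EXEC|EXECUTE|GRANT|REVOKE|INTO)\b",
    re.IGNORECASE,
)


def is_read_only_select(sql: str) -> bool:
    """Return True only for a single SELECT/WITH statement with no forbidden keyword."""
    statement = sql.strip().rstrip(";").strip()
    if not statement:
        return False
    if ";" in statement:
        # A semicolon in the middle suggests more than one statement
        return False
    if not re.match(r"(SELECT|WITH)\b", statement, re.IGNORECASE):
        return False
    return FORBIDDEN.search(statement) is None
```

A caller could skip execution and report the problem whenever `is_read_only_select(sql_query)` returns False.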


Now we execute that SQL query.

In [20]:
# Step 2: Execute the SQL query
print("Step 2: Executing the SQL query")
results = []  # stays empty if the query fails, so later cells can still run
async with connect_to_db() as conn:
    try:
        async with conn.cursor() as cursor:
            await cursor.execute(sql_query)
            results = await cursor.fetchall()
            print("Query Results:")
            for row in results:
                print(row)
    except Exception as e:
        print("Error executing query:", e)

Step 2: Executing the SQL query
Query Results:
('Finance', 6, 1, 1, 0, 3, 1)
('HR', 11, 5, 0, 3, 1, 2)
('IT', 7, 1, 1, 1, 1, 3)
('Marketing', 18, 1, 1, 4, 10, 2)
('Operations', 8, 1, 0, 0, 5, 2)
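
The rows above are bare tuples, so anything reading them has to guess what each number means. A labelled table may make the results easier to interpret, both for a person and for the LLM in the next step. The helper below is a minimal sketch: `rows_to_markdown` is a hypothetical name, and the column names are copied from the aliases in the generated query.

```python
def rows_to_markdown(columns, rows):
    """Render query rows as a markdown table with a header row of column names."""
    header = "| " + " | ".join(columns) + " |"
    divider = "| " + " | ".join("---" for _ in columns) + " |"
    body = ["| " + " | ".join(str(value) for value in row) + " |" for row in rows]
    return "\n".join([header, divider, *body])


# Column names taken from the generated SELECT list. With a DB-API style cursor they
# can usually be read as [col[0] for col in cursor.description] (an assumption here).
columns = [
    "Departments", "NumberOfWorkflows", "CompletedWorkflows", "FailedWorkflows",
    "InProgressWorkflows", "CancelledWorkflows", "StartedWorkflows",
]
rows = [
    ("Finance", 6, 1, 1, 0, 3, 1),
    ("HR", 11, 5, 0, 3, 1, 2),
    ("IT", 7, 1, 1, 1, 1, 3),
    ("Marketing", 18, 1, 1, 4, 10, 2),
    ("Operations", 8, 1, 0, 0, 5, 2),
]
table = rows_to_markdown(columns, rows)
print(table)
```

The resulting `table` string could be placed in a prompt instead of the raw list of tuples.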


Finally, we send the question and the query results back to the LLM, so it can give a well-formatted answer.


In [21]:
# Step 3: Pass results back to LLM for summarization
print("Step 3: Requesting summary from the LLM...")
summary_prompt = f"""
The following question was asked:
{prompt}
 

Here are the results of that query:
{results}
Please answer the question in a user-friendly and informative way. Highlight any patterns, notable values, or interesting insights. 

"""

summary_agent = Agent("openai:gpt-4o", result_type=str)
summary = await summary_agent.run(summary_prompt)


print("Summary of Results:")
print(f"{summary.data}")

Step 3: Requesting summary from the LLM...
Summary of Results:
Certainly! Let's break down the summary of workflows submitted by each department:

1. **Finance**:
   - **Total Workflows**: 6
   - **Pattern**: The Finance department has a balanced distribution across the workflow stages, with most workflows in the initial stage and one workflow stuck in the later stages. This suggests a relatively smooth process.

2. **HR**:
   - **Total Workflows**: 11
   - **Notable Insights**: HR has the highest number of submitted workflows. Most of their workflows are in the early stages, indicating a potential bottleneck or a need for resource reallocation to progress them further.

3. **IT**:
   - **Total Workflows**: 7
   - **Interesting Point**: IT has a steady advancement of workflows, with a slightly higher number in the final stages, possibly reflecting efficient processing or complexities being resolved effectively.

4. **Marketing**:
   - **Total Workflows**: 18
   - **Key Observations**: 