Podejście autonomiczne

In [122]:
import ollama
import httpx
import os
import json
from typing import Optional, Any
from pydantic import BaseModel
from pprint import pprint
from openai import OpenAI
from typing import Callable


from dotenv import load_dotenv
load_dotenv()

True

In [3]:
db_url = f"{os.environ.get("AG3NTS_CENTRALA_URL")}/apidb"
api_key = os.environ.get("AG3NTS_API_KEY")

In [149]:
class Prompt(BaseModel):
    role: str
    content: Any

class UserPrompt(Prompt):
    role: str = "user"

class SystemPrompt(Prompt):
    role: str = "system"

class ApiDbQuery(BaseModel):
    task: Optional[str] = "database"
    apikey: Optional[str] = api_key
    query: str

class ApiDbResponse(BaseModel):
    reply: Any
    error: str

class AssistantResponse(BaseModel):
    query: str
    thinking: str
    next_step: str
    found_tables: list
    found_structures: dict

class ConversationStep(BaseModel):
    step_number: int
    api_query: ApiDbQuery
    api_response: ApiDbResponse
    agent_thoughts: AssistantResponse

In [154]:
def ollama_query(
        system_prompt: SystemPrompt,
        user_prompt: Optional[UserPrompt] = None, 
        additional_prompts: Optional[list[ConversationStep]] = None,
        temperature: float = 0.8) -> AssistantResponse:
    """
    Hit the backend with query.

    Args:
        user_prompt (str): Main user prompt should contain api response from previous step.
        system_prompt (str): System prompt to be used - constant.
        additional_prompts (list): Collection of additional prompts to be used.
    """
    # serialization is automatically taken care of
    # pydantic ensures that the data is in the correct format
    if user_prompt is None:
        user_prompt = UserPrompt(content="")
    user_prompt_serialized = user_prompt.model_dump()

    system_prompt_serialized = system_prompt.model_dump()

    if additional_prompts is not None:
        additional_prompts_serialized = [prompt.model_dump() for prompt in additional_prompts]
    else:
        additional_prompts_serialized = None

    # build the "messages" list as required by the API
    messages = [system_prompt_serialized, user_prompt_serialized, *(additional_prompts_serialized or [])]

    # build the options
    if temperature:
        options = {"temperature": temperature}
    
    response = ollama.chat(
        model="qwen2.5-coder:32b",
        messages=messages,
        options=options or {}
    )
    content = response["message"]["content"]
    content = json.loads(content)
    print(content)

    return AssistantResponse(
        query=content["query"],
        _thinking=content["_thinking"],
        _next_step=content["_next_step"],
        _found_tables=content["_found_tables"],
        _found_structures=content["_found_structures"]
    )

In [165]:
def openai_query(
        system_prompt: SystemPrompt,
        user_prompt: Optional[UserPrompt] = None, 
        additional_prompts: Optional[list[ConversationStep]] = None,
        temperature: float = 0.8) -> AssistantResponse:
    
    messages = [
        {"role": "system", "content": system_prompt.content},
    ]

    if user_prompt is not None:
        messages.append({"role": "user", "content": user_prompt.content})

    if additional_prompts is not None:
        for prompt in additional_prompts:
            messages.append({"role": "user", "content": prompt.model_dump_json()})

    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=temperature
    )
    content = response.choices[0].message.content
    content = json.loads(content)
    return AssistantResponse(**content)

In [74]:
def query_db(query: str):
    """Send prompt to ApiDb"""
    data = ApiDbQuery(query=query)
    response = httpx.post(db_url, data=data.model_dump_json())
    return ApiDbResponse(**response.json())

In [177]:
system_prompt = """
Jesteś autonomicznym agentem specjalizującym się w analizie baz danych SQL. 

ZADANIE:
Znajdź ID aktywnych centrów danych (datacenter), którymi zarządzają nieaktywni managerowie.

DOSTĘPNE NARZĘDZIA:
1. API REST przyjmujące zapytania SQL w formacie:
   {"query": "zapytanie_sql"}
   i zwracające wynik w formacie:
   {"reply": "wynik_zapytania", "error": "ewentualny_błąd"}
2. Specjalne komendy:
   - 'show tables'
   - 'show create table NAZWA_TABELI'

WYMAGANY FORMAT ODPOWIEDZI (WAŻNE!!! - brak jednego z pól spowoduje błąd, brak zawartości pola query spowoduje błąd):
{
    "query": "zapytanie_sql",
    "thinking": "rozumowanie",
    "next_step": "proponowany_następny_krok",
    "found_tables": ["tabele_znalezione_w_bazie"],
    "found_structures": {
        "nazwa_tabeli": "znaleziona_struktura"
    }
}

WAŻNE: upewnij się że pole query jest zdefiniowane w każdej odpowiedzi - ZAWSZE MUSI MIEĆ ZAWARTOŚĆ!
WAŻNE: nie kombinuj z formatem JSONa - ma być zawsze tak jak w przykładzie powyżej! Nie dodawaj żadnych dodatkowych pól! Ani ozdobników!

STRATEGIA DZIAŁANIA:
1. Zbadaj strukturę bazy:
   - Pobierz listę tabel
   - Przeanalizuj strukturę każdej tabeli
   - Zidentyfikuj klucze obce i powiązania

2. Buduj zapytanie iteracyjnie:
   - Zacznij od prostych zapytań testowych
   - Stopniowo dodawaj złożoność
   - Weryfikuj wyniki pośrednie

3. Dokumentuj założenia:
   - Zapisuj znalezione powiązania między tabelami
   - Notuj napotkane ograniczenia
   - Uzasadniaj wybory w '_thinking'

4. Proponuj kolejne kroki:
    - Podawaj konkretne zapytania SQL w "query"
    - Podawaj sugestie co do dalszych kroków w "_next_step"

W każdym kroku będziesz mieć dostęp do wyników poprzednich kroków - historii twoich odpowiedzi i działań oraz historii zapytań REST API.
Historia będzie listą obiektów o takiej postaci:
{"api_query": "query", "api_response": "response", "agent_thoughts": {}}

OGRANICZENIA:
- Nie zakładaj niczego o strukturze bazy - wszystko musi być odkryte dynamicznie
- Każde zapytanie musi być poprzedzone analizą w '_thinking'
- Zawsze proponuj następny krok w '_next_step' i konkretną kwerendę SQL w polu "query"
- WAŻNE: zwracaj sam JSON z odpowiedzią, bez dodatkowych komentarzy które mogą zakłócić działanie aplikacji

W wiadomościach użytkownika w kolejnych krokach otrzymasz wyniki zapytań REST API które zlecisz.

Jeśli uznasz że zadanie jest rozwiązane, zwróć wartość "next_step" równą "DONE".
"""

system_prompt = SystemPrompt(content=system_prompt)

In [131]:
# run a simple test with local llama

# first prompt

test_result = ollama_query(
    system_prompt=system_prompt
)

print(test_result)
print(type(test_result))

{'query': 'show tables', '_thinking': 'Rozpoczynam proces analizy struktury bazy danych poprzez pobranie listy dostępnych tabel.', '_next_step': 'Przeanalizuję otrzymaną listę tabel i sprawdzę strukturę każdej z nich, aby znaleźć powiązania między nimi.', '_found_tables': [], '_found_structures': {}}
query='show tables'
<class '__main__.AssistantResponse'>


In [151]:
# run test with openai

# first prompt

test_result = openai_query(
    system_prompt=system_prompt
)

print(test_result)

query='show tables' thinking='Zaczynam od sprawdzenia, jakie tabele znajdują się w bazie danych. To pozwoli mi zrozumieć, jakie informacje są dostępne i jakie relacje mogą istnieć między tabelami.' next_step='Po uzyskaniu listy tabel, przeanalizuję ich struktury, aby zidentyfikować klucze obce i relacje.' found_tables=[] found_structures={}


In [128]:
def call_agent(api_response: Optional[ApiDbResponse] = None, history: list[ConversationStep] = None, handler: Callable = None):
    """
    Call agent to make a decision.

    Args:
        api_response: previous step response from REST API
        history: list of previous interactions
    """
    if handler is None:
        handler = ollama_query
    print("using handler ", handler)

    if history is None:
        history = []

    if api_response is None:
        user_prompt = None
    else:
        user_prompt = UserPrompt(content=api_response.model_dump_json())
    
    return handler(system_prompt=system_prompt, user_prompt=user_prompt, additional_prompts=history) # TODO: temperature

In [174]:
def perform_agent_step(api_response: Optional[ApiDbResponse] = None, history: list[ConversationStep] = None, handler: Callable = None):
    if handler is None:
        handler = ollama_query
    if history is None:
        history = []
        step = 0
    else:
        step = len(history)
    agent_response = call_agent(api_response=api_response, history=history, handler=handler)
    pprint(agent_response)
    if "DONE" in agent_response.next_step:
        return api_response, history, "done"
    action = agent_response.query
    api_response = query_db(action)
    history.append(ConversationStep(step_number=step, api_query=ApiDbQuery(query=action), api_response=api_response, agent_thoughts=agent_response))
    return api_response, history, "continue"

In [178]:
api, history, status = None, None, "continue"
for i in range(12):
    api, history, status = perform_agent_step(api, history, handler=openai_query)
    if status == "done":
        print("FINISHED, DONE")
        break

using handler  <function openai_query at 0x115650040>
AssistantResponse(query='show tables', thinking='Aby rozpocząć analizę bazy danych, muszę najpierw poznać listę dostępnych tabel. To pozwoli mi na dalsze kroki w kierunku zrozumienia struktury bazy danych.', next_step='Zidentyfikować strukturę każdej z tabel po uzyskaniu listy tabel.', found_tables=[], found_structures={})
using handler  <function openai_query at 0x115650040>
AssistantResponse(query='show create table connections', thinking="Muszę poznać strukturę tabeli 'connections', aby zrozumieć, jakie dane przechowuje i jakie są jej powiązania z innymi tabelami.", next_step="Sprawdzę strukturę tabeli 'connections'. Następnie analogicznie sprawdzę pozostałe tabele.", found_tables=['connections', 'correct_order', 'datacenters', 'users'], found_structures={})
using handler  <function openai_query at 0x115650040>
AssistantResponse(query='show create table datacenters', thinking="Aby kontynuować budowanie pełnego obrazu struktury ba

In [187]:
answer = [item["dc_id"] for item in api.reply]
answer

['4278', '9294']

## Check

In [188]:
from aidevs3.poligon import send

load_dotenv()

key = os.environ.get("AG3NTS_API_KEY")
url = f"{os.environ.get("AG3NTS_CENTRALA_URL")}/report"

res = send(url, answer=answer, apikey=key, task="database")
print(res)

{'code': 0, 'message': '{{FLG:KNOWLEDGE}}'}


In [185]:
res = query_db("select * from users where is_active = 0")
res.reply

[{'id': '3',
  'username': 'Azazel',
  'access_level': 'removed',
  'is_active': '0',
  'lastlog': '2026-11-05'},
 {'id': '28',
  'username': 'Rafał',
  'access_level': 'user',
  'is_active': '0',
  'lastlog': '2029-05-11'},
 {'id': '51',
  'username': 'Patrycja',
  'access_level': 'user',
  'is_active': '0',
  'lastlog': '2023-05-04'},
 {'id': '62',
  'username': 'Igor',
  'access_level': 'user',
  'is_active': '0',
  'lastlog': '2023-10-28'},
 {'id': '91',
  'username': 'Sylwia',
  'access_level': 'user',
  'is_active': '0',
  'lastlog': '2024-02-06'}]

In [186]:
res  = query_db("select * from datacenters where is_active = 1")
res.reply

[{'dc_id': '1226', 'location': 'Kraków', 'manager': '44', 'is_active': '1'},
 {'dc_id': '6491', 'location': 'Wrocław', 'manager': '31', 'is_active': '1'},
 {'dc_id': '1405', 'location': 'Łódź', 'manager': '13', 'is_active': '1'},
 {'dc_id': '4278', 'location': 'Gdańsk', 'manager': '28', 'is_active': '1'},
 {'dc_id': '9294', 'location': 'Grudziądz', 'manager': '28', 'is_active': '1'},
 {'dc_id': '5637', 'location': 'Pcim', 'manager': '20', 'is_active': '1'}]