In [None]:
%pip install -U litellm langchain_ollama langchain_community langchain_anthropic langchain-tavily langchain_experimental matplotlib langgraph

In [None]:
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
import json

In [None]:
llm_gen_qwen3_coder = ChatOpenAI(
    model="qwen3-coder:30b",  # Указанная модель
    openai_api_base="https://cloud.m1r0.ru/v1",
    openai_api_key="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",
    temperature=0
)
print(f"llm_gen_qwen3_coder инициализирован для модели: {llm_gen_qwen3_coder.model_name}")

In [None]:
with open("./Датасет/flights.json") as file:
    data = json.load(file)

print(data.keys(), data['queries'][0].keys())

DDL_querys = "\n".join([i['statement'] for i in data['ddl']])
SQL_querys = [i['query'] for i in data['queries']]

defolt_retry_nums = 3
MAX_CONCURRENT_CALLS = 2

In [None]:
def query_analize_prompt() -> str:
    """
    Упрощённый промпт для query_analize_agent.
    Агент анализирует до 5 SQL-запросов и возвращает текстовое описание закономерностей,
    часто используемых таблиц, соединений, фильтров и потенциальных точек оптимизации под DDL.
    """
    return """Ты — SQL-аналитик для системы оптимизации Data Lakehouse (Trino + Iceberg + S3).
Вход: до 5 SQL-запросов.  
Твоя цель — описать словами структуру и частые паттерны этих запросов, указав:
- какие таблицы чаще всего участвуют;
- какие поля часто используются в фильтрах (WHERE/ON);
- по каким полям часто происходит соединение (JOIN);
- какие операции чаще всего выполняются (GROUP BY, ORDER BY, DISTINCT, WINDOW);
- какие конструкции создают нагрузку или могут быть оптимизированы через DDL;
- какие улучшения можно предложить на уровне DDL (партиционирование, денормализация, сортировка и т. п.).

### Важно:
0. Обращай внимание на частоту выполнения запроса и его время, чтобы в первую очередь оптимизировать тяжёлые запросы
1. **Запрещено** предлагать или использовать materialized views.
2. **Запрещено** менять DDL или писать SQL-код миграций. Только описывать, что требует оптимизации.  
3. **Запрещено** придумывать статистику, размеры таблиц или время выполнения. Если данных нет — укажи "неизвестно".  
4. **Не упоминай** индексы в классическом RDBMS-смысле (B-Tree и т. д.).  
5. **Не упоминай** безопасность, авторизацию, шифрование и внешние системы.  
6. Используй формулировки, применимые к Trino + Iceberg + S3.  
   (Например: «можно рассмотреть партиционирование по дате» вместо «создать индекс».)  
7. Не возвращай JSON, таблицы или списки ключей. Пиши связный текстовый отчёт с пунктами и примерами.

### Структура ответа:
- Краткое резюме: какие таблицы и поля чаще всего встречаются.  
- Анализ соединений: какие таблицы чаще соединяются между собой, по каким ключам.  
- Анализ фильтров: какие колонки часто участвуют в WHERE/ON (например, event_date, user_id).  
- Анализ операций: какие операции создают нагрузку (JOIN, GROUP BY, DISTINCT, ORDER BY, WINDOW).

Вывод должен быть понятным, логически структурированным и готовым для последующей обработки другой моделью, которая будет строить DDL и миграции на основе твоего анализа."""


In [None]:
def query_summarize_prompt() -> str:
    """
    Возвращает промпт (на русском) для query_summarize_agent.
    Агент получает агрегированные результаты от query_analize_agent (строка/батч)
    и должен выдать компактный, читаемый человеком и машиной, сводный отчёт
    — пригодный для подачи в DDL_agent и migrations_agent.
    """

    return """Ты — агрегирующий аналитик SQL для пайплайна оптимизации Data Lakehouse (стек: Trino + Iceberg + S3).

ВХОД:
- Строка / текст `analize` — объединённые результаты работы query_analize_agent для многих батчей (каждый блок содержит анализ до 5 SQL-запросов).
- Формат входа может быть JSON-подобным или текстовым отчётом (см. предыдущий агент). Разбирай оба варианта; если не можешь распарсить фрагмент — помечь как "UNPARSED fragment".

ЦЕЛЬ:
- Объединить и агрегировать мелкие анализы в единый сводный отчёт, выявив:
  1. "Hot" таблицы — таблицы с наибольшей частотой упоминаний в тяжёлых операциях;
  2. Частые пары JOIN (и их join_keys);
  3. Часто используемые фильтры (колонки в WHERE/ON);
  4. Частые тяжёлые операции (GROUP BY, ORDER BY, DISTINCT, WINDOW, SORT);
  5. Повторяющиеся паттерны, которые можно исправить DDL-изменениями;
  6. Очередность (приоритет) изменений для downstream: DDL_agent -> migrations_agent -> query_optimizer.
- Сформировать короткий и однозначный набор рекомендаций уровня DDL (НЕ писать сам DDL/миграции, а только что и где изменить и почему).

СТРОГИЕ ЗАПРЕЩЕНИЯ (выполняй обязательно):
- НЕЛЬЗЯ предлагать или использовать materialized views.
- НЕЛЬЗЯ писать, изменять или генерировать DDL или миграции — это делают downstream агенты.
- НЕЛЬЗЯ придумывать статистику, объёмы данных, кардинальности или время выполнения; если этих метрик нет — указывай `UNKNOWN`.
- НЕЛЬЗЯ предлагать классические RDBMS-индексы (B-Tree и т.п.). Если предлагаешь что-то похожее, опиши его как "файловая/партиц./сортировка/кластеризация в Iceberg" и пометь `depends_on_iceberg_features`.
- НЕЛЬЗЯ обсуждать авторизацию/аутентификацию/безопасность/внешние сервисы.
- Ответ должен быть ТОЛЬКО текстом отчёта (без генерации JSON/DDL/SQL). Пиши структурированный человекочитаемый отчёт (см. формат ниже).

ПРАВИЛА АГРЕГАЦИИ:
- Подсчитывай частоту встречаемости сущностей (таблиц, колонок, join-пар) по данным `analize`. Если входы не содержат явного счётчика — используй относительную частоту (high/medium/low) основанную на количестве вхождений в анализах; если нельзя определить — ставь `UNKNOWN`.
- Если множество анализов указывает на фильтрацию по колонке `event_date`/`ds` — предлагай PARTITIONING (указывай день/месяц как опции). Добавляй замечание о риске при высокой кардинальности.
- Если одна и та же пара таблиц соединяется очень часто — размышляй о DENORMALIZATION (опиши, какие поля включить), но НЕ генерируй CREATE TABLE.
- Для ORDER BY + LIMIT — указывай возможность SORT_ORDER / предварительной кластеризации в Iceberg и помечай `depends_on_iceberg_features`.
- В каждой рекомендации указывай: цель (что менять), конкретные колонки/таблицы, краткая причина (основанную на evidence fragments из `analize`), ориентировочный impact (HIGH/MEDIUM/LOW/UNKNOWN) и какие метрики/проверки нужны (data_volume, cardinality, query_frequency, avg_runtime).
"""


In [None]:
def ddl_optim_prompt() -> str:
    """
    Промпт для агента ddl_optimize_agent.
    Цель — на основе анализа (query_summarize) и оригинальных DDL (ddl_orig)
    выдать только новые или модифицированные DDL-запросы, оптимизирующие работу под Trino + Iceberg + S3.
    Без пояснений, текста и комментариев.
    """
    return """Ты — DDL-оптимизатор для Trino + Iceberg + S3.

Вход:
- query_summarize: анализ частых фильтров, сортировок, JOIN
- ddl_orig: исходные DDL таблиц

Задача: Создать оптимизированные DDL для Trino + Iceberg.

СИНТАКСИС Trino + Iceberg:
- Таблицы: <catalog>.<schema>.<table>
- Формат хранения файлов: используйте WITH (format = 'PARQUET')
- Партиционирование (Trino/Iceberg): в CREATE TABLE через WITH (partitioning = ARRAY['day(ts_col)'] или 'month(ts_col)'). Не комбинировать year/month/day на одном столбце.
- Свойства: только допустимые Trino/Iceberg ключи в WITH (...). Не использовать 'write.target-file-size-bytes'. Без висячих запятых.

РАЗРЕШЕНО:
- PARTITIONING (через WITH) с допустимыми функциями
- - CREATE TABLE AS SELECT WITH (format = 'PARQUET')
- Создание оптимизированных копий

ЗАПРЕЩЕНО:
- Materialized Views, индексы
- DROP, DELETE, RENAME
- Несовместимый синтаксис

ПРИМЕРЫ:
ALTER TABLE analytics.sales.orders PARTITIONING (через WITH) year(order_date);
CREATE TABLE analytics.sales.orders_new AS SELECT * FROM orders WITH (format = 'PARQUET');
ALTER TABLE analytics.sales.orders -- removed unsafe SET PROPERTIES example

ПРАВИЛА ВЫВОДА:
- Каждый оператор — отдельной строкой; не объединяй множество операторов в одну строку.
- Полные имена таблиц (<catalog>.<schema>.<table>) согласованы с JDBC url.
- Никаких комментариев, префиксов типа "sql ". Только чистые SQL-операторы.

ПРАВИЛА ВЫВОДА:
- Каждый оператор — отдельной строкой; не объединяй множество операторов в одну строку.
- Полные имена таблиц (<catalog>.<schema>.<table>) согласованы с JDBC url.
- Никаких комментариев, префиксов типа "sql ". Только чистые SQL-операторы.

ПРАВИЛА ВЫВОДА:
- Каждый оператор — отдельной строкой; не объединяй множество операторов в одну строку.
- Полные имена таблиц (<catalog>.<schema>.<table>) согласованы с JDBC url.
- Никаких комментариев, префиксов типа "sql ". Только чистые SQL-операторы.

ВЫВОД: Только SQL DDL команды, готовые к выполнению в Trino."""

In [None]:
def migrations_creator_prompt() -> str:
    """
    Промпт для агента migrations_creator_agent.
    Агент получает:
      - query_summarize — сводный анализ SQL-запросов,
      - new_ddl — новые DDL-запросы, сгенерированные ddl_optim_agent.
    Его задача — сгенерировать SQL-миграции для применения новых DDL в безопасном формате,
    совместимом с Trino + Iceberg + S3.
    """
    return """Ты — генератор миграций для Trino + Iceberg + S3.

ВХОД:
- query_summarize: анализ использования таблиц
- new_ddl: оптимизированные DDL-запросы

ЦЕЛЬ:
Сгенерировать безопасные SQL-миграции для применения DDL в Trino + Iceberg.

КРИТИЧЕСКИ ВАЖНЫЕ ПРАВИЛА СИНТАКСИСА:
1. ВСЕ таблицы: <catalog>.<schema>.<table>
2. ТОЛЬКО совместимый с Trino + Iceberg синтаксис
3. НЕТ materialized views, индексов, деструктивных операций
4. Формат файлов: используем 'PARQUET' через WITH (format = 'PARQUET')

РАЗРЕШЕННЫЕ ОПЕРАЦИИ МИГРАЦИИ:
- Не дублировать DDL из new_ddl: миграции не должны повторять CREATE TABLE/CTAS, уже присутствующие в new_ddl.
- CREATE TABLE ... WITH (format = 'PARQUET', partitioning = ARRAY[...])
- ALTER TABLE ... PARTITIONING (через WITH) ... (только допустимые функции: year/month/day/hour/bucket/truncate)
- INSERT INTO new_table SELECT FROM old_table
- CREATE TABLE ... AS SELECT ... WITH (format = 'PARQUET')

ПОРЯДОК МИГРАЦИЙ:
1. Создание новых таблиц с оптимизациями
2. Перенос данных (если нужно)
3. Изменение свойств существующих таблиц
4. Валидация (простая проверка COUNT)

ПРИМЕРЫ ВАЛИДНОГО СИНТАКСИСА:
CREATE TABLE analytics.sales.orders_new
WITH (format = 'PARQUET', partitioning = ARRAY['day(order_date)'])
AS SELECT * FROM analytics.sales.orders;

ALTER TABLE analytics.sales.orders
PARTITIONING (через WITH) year(order_date);

ALTER TABLE analytics.sales.orders
-- removed unsafe SET PROPERTIES example

ВЫВОД:
ТОЛЬКО SQL-команды миграций, готовые к выполнению в Trino.
Без комментариев, пояснений, текста.
Каждая команда на новой строке."""

In [None]:
def query_optimize_prompt() -> str:
    """
    Промпт для агента query_optimizer.
    Агент получает новые DDL-запросы (new_ddl) и один исходный SQL-запрос (query),
    и должен переписать данный запрос так, чтобы он был оптимизирован под новую структуру данных
    (DDL), сохраняя исходную бизнес-логику и корректность результатов.
    Работает в контексте Trino + Iceberg + S3.
    """
    return """Ты — SQL-оптимизатор для Trino + Iceberg + S3.

ВХОД:
- new_ddl: новые DDL таблиц (партиции, денормализации, свойства)
- query: исходный SQL запрос для оптимизации

ЦЕЛЬ:
Переписать SQL запрос для использования новой структуры таблиц из DDL.
Сохранить идентичную бизнес-логику и результат.

ПРАВИЛА ОПТИМИЗАЦИИ:
1. Используй новые таблицы и колонки из DDL
2. Для денормализованных таблиц - убирай JOIN, используй прямые обращения
3. Для партиционированных таблиц - фильтруй по полям партиций
4. Сохраняй все агрегации, фильтры и логику оригинала
5. Используй только синтаксис Trino + Iceberg

ЗАПРЕЩЕНО:
- Менять бизнес-логику (WHERE, JOIN, GROUP BY кроме адаптации к DDL)
- Materialized Views, индексы, временные таблицы
- Комментарии, пояснения, не-SQL текст

ПРИМЕР:
Исходный: SELECT u.id, SUM(o.amount) FROM orders o JOIN users u ON o.user_id = u.id
DDL: CREATE TABLE orders_denorm AS SELECT o.*, u.region FROM orders o JOIN users u ON o.user_id = u.id
Оптимизированный: SELECT user_id, SUM(amount) FROM orders_denorm

АНТИ-ОШИБКИ (соблюдать обязательно):
- Каждый источник в FROM/JOIN обязан иметь алиас; все колонки должны быть квалифицированы этим алиасом.
- Нельзя ссылаться на алиас другой таблицы (пример: pc.client_id, если в FROM pc= l_excursion_payment, где нет client_id). Выбирай корректные поля исходной таблицы или корректируй JOIN.
- Любая подзапрос/CTE обязан иметь алиас (.. ) AS sub; не оставлять безымянные скобки.
- GROUP BY: перечисляй только реально присутствующие в SELECT выражения/поля; не добавлять лишние токены (например, 'b1').
- Скобки и запятые: не оставлять висячих запятых и лишних закрывающих скобок.
- ORDER BY random() использовать только если это требуется задачей; избегать недетерминизма.

ВЫВОД: Только оптимизированный SQL запрос для Trino, без комментариев."""


In [None]:
def critic_prompt() -> str:
    """
    Prompt for LLM‑critic: given original/optimized SQL and other data, return a fixed SQL.
    """
    return """Ты — LLM‑критик SQL для Trino + Iceberg.

ВХОД (plain‑text блоки, без JSON):

old_ddl:
<CREATE TABLE ...>  (по одному на строку; опционально)

new_ddl:
<CREATE TABLE ...>  (по одному на строку; опционально)

migrations:
<CTAS/INSERT INTO ... SELECT ...>  (перенос из старой структуры в новую; опционально)

original:
<исходный SQL>

optimized:
<оптимизированный SQL>

ЦЕЛЬ:
- Самостоятельно найти и исправить ошибки в поле optimized, если они конечно присутствуют. Проведи собственную многошаговую проверку консистентности, используя при наличии DDL (ddl_old/ddl_new) как источник истины по колонкам и таблицам.
- Исходный запрос (original) практически всегда валиден. Разрешено переиспользовать его конструкции (алиасы, выражения, подзапросы, группировки) во втором запросе, но обязательно с учётом уже выполненной оптимизации (например, денормализация, замена источников, добавление партиционных фильтров).
- Проверь строго по чек‑листу:
  1) Алиасы: каждый источник в FROM/JOIN имеет алиас; все колонки квалифицированы правильным алиасом.
  2) Существование колонок: alias.column существует согласно DDL соответствующей таблицы; при денормализации используй новые таблицы из ddl_new. Если DDL отсутствуют — делай правки только по очевидным несоответствиям (алиасы/скобки/Group By), не придумывай колонки.
  3) JOIN‑ключи: ссылки только на колонки, реально присутствующие в соответствующих таблицах; не используй client_id у таблицы, где его нет.
  4) Агрегации: GROUP BY соответствует SELECT (все неагрегированные выражения перечислены), нет мусорных токенов (например, 'b1').
  5) Синтаксис: нет лишних/незакрытых скобок, висячих запятых.
  6) Детерминизм: избегай ORDER BY random() без явной необходимости.
- Если оптимизация меняла структуру: корректно перепиши обращения к колонкам и JOIN с учётом ddl_new.
- Если ошибок НЕТ — верни строку "OK" (без дополнительных слов).
- Если ошибки ЕСТЬ — верни ТОЛЬКО один исправленный SQL‑запрос, без комментариев и текста.

ОБЯЗАТЕЛЬНО:
- Каждый источник в FROM/JOIN должен иметь алиас.
- Все колонки должны быть квалифицированы правильным алиасом.
- Не ссылаться на поля, которых нет в источнике (пример: pc.client_id, если pc= l_excursion_payment, где нет client_id).
- Каждый подзапрос/CTE обязан иметь алиас.
- GROUP BY: перечисляй только выражения/поля из SELECT; не добавляй посторонние токены.
- Скобки и запятые: без висячих запятых и лишних скобок.
- Использовать синтаксис Trino; таблицы — в формате <catalog>.<schema>.<table>.
- Избегать ORDER BY random(), если это не требуется явно.

ВЫВОД:
- Если ошибок нет: ровно "OK".
- Если есть ошибки: только исправленный SQL.
"""


In [None]:
def judge_prompt() -> str:
    """
    Prompt for LLM‑judge: decide if optimized preserves original semantics from structure only.
    """
    return """Ты — LLM‑судья эквивалентности SQL (Trino).

ВХОД (plain‑text блоки, без JSON):

old_ddl:
<CREATE TABLE ...>  (по одному на строку; опционально)

new_ddl:
<CREATE TABLE ...>  (по одному на строку; опционально)

migrations:
<CTAS/INSERT INTO ... SELECT ...>  (перенос из старой в новую; опционально)

original:
<исходный SQL>

optimized:
<оптимизированный SQL>

ЗАДАЧА:
- По структуре SQL (без выполнения) оцени, сохраняет ли optimized бизнес‑логику original (те же агрегаты, фильтры, соединения, проекции) с допустимыми адаптациями под оптимизацию (денормализация, партиционные фильтры и т. п.).
- Учитывай DDL/миграции: если optimized читает данные из новых таблиц (по CTAS/INSERT), интерпретируй соответствия полей между старой и новой структурой.

ВЫВОД:
- Если ошибок нет: ровно "OK".
- Если есть проблема (семантика/структура/синтаксис): верни полный исправленный SQL (один запрос, без комментариев/объяснений).
"""


In [None]:
import asyncio

async def get_answer_async(agent, querys_batch, retry_nums=3):
    for _ in range(retry_nums):
        try:
            prompt = HumanMessage(
                content='\n\n'.join(querys_batch)
            )
            response = await agent.ainvoke({'messages': prompt})
            return response["messages"][-1].content
        except Exception as e:
            print(f"Failed to execute. Error: {repr(e)}")
            print("Retrying...")
    print("OMG_MODEL_CRUSHD!!!")
    return "OMG_MODEL_CRUSHD!!!"

semaphore = asyncio.Semaphore(MAX_CONCURRENT_CALLS)

async def bounded_get_answer_async(agent, batch):
    async with semaphore:
        return await get_answer_async(agent, batch)

query_analize_agent = create_react_agent(
    llm_gen_qwen3_coder,
    tools=[],
    prompt=query_analize_prompt(),
)

querys_batch_prompt = [f"sql-запрос: {i['query']}, количество выполнения:{i['runquantity']}, колличество затраченного времени при едином выполнении запроса{i['executiontime']}" for i in data['queries']]

batches = [
    querys_batch_prompt[5 * i : 5 * (i + 1)]
    for i in range(len(querys_batch_prompt) // 5 + int(len(querys_batch_prompt) % 5 != 0))
]

tasks = [bounded_get_answer_async(query_analize_agent, batch) for batch in batches]

results = await asyncio.gather(*tasks)

analize = "\n\n".join(results)
print(analize)

In [None]:
# summaraize analize from querys

def get_answer(agent, querys_analize):
    for _ in range(defolt_retry_nums):
        try:
            prompt = HumanMessage(
                content= f"Анализ полученных sql запросов{querys_analize}"
            )
            return agent.invoke({'messages': prompt})["messages"][-1].content
        except BaseException as e:
            print(f"Failed to execute. Error: {repr(e)}")
            print("Do retry")
    print("OMG_MODEL_CRUSHD!!!")
    
    return "OMG_MODEL_CRUSHD!!!"

query_summarize_agent = create_react_agent(
    llm_gen_qwen3_coder,
    tools=[],
    prompt=query_summarize_prompt(),
)

query_summarize = get_answer(query_summarize_agent, analize)
print(query_summarize)

In [None]:
# DDL optimize by analize and orig_ddl

def get_answer(agent, query_summarize, ddl_orig):
    for _ in range(defolt_retry_nums):
        try:
            prompt = HumanMessage(
                content= f"Анализ полученных sql запросов:\n{query_summarize} \n\n это оригинальные DDL запросы: \n{ddl_orig}"
            )
            return agent.invoke({'messages': prompt})["messages"][-1].content
        except BaseException as e:
            print(f"Failed to execute. Error: {repr(e)}")
            print("Do retry")
    print("OMG_MODEL_CRUSHD!!!")
    return "OMG_MODEL_CRUSHD!!!"

ddl_optimize_agent = create_react_agent(
    llm_gen_qwen3_coder,
    tools=[],
    prompt=ddl_optim_prompt(),
)

New_DDL = get_answer(ddl_optimize_agent, query_summarize, DDL_querys)
print(New_DDL)

In [None]:
# create migrations on analize and new_ddl

def get_answer(agent, query_summarize, new_ddl):
    for _ in range(defolt_retry_nums):
        try:
            prompt = HumanMessage(
                content= f"Анализ полученных sql запросов:\n{query_summarize} \n\n это Новые DDL запросы: \n{new_ddl}"
            )
            return agent.invoke({'messages': prompt})["messages"][-1].content
        except BaseException as e:
            print(f"Failed to execute. Error: {repr(e)}")
            print("Do retry")
    print("OMG_MODEL_CRUSHD!!!")
    return "OMG_MODEL_CRUSHD!!!"

migrations_creator_agent = create_react_agent(
    llm_gen_qwen3_coder,
    tools=[],
    prompt=migrations_creator_prompt(),
)

migrations = get_answer(migrations_creator_agent, query_summarize, New_DDL)
print(migrations)

In [None]:
codect_info={
        'old_ddl':DDL_querys, 'new_ddl':New_DDL, 'migrations': migrations
    }

judge_agent = create_react_agent(
    llm_gen_qwen3_coder,
    tools=[],
    prompt=judge_prompt(),
)
critic_agent = create_react_agent(
    llm_gen_qwen3_coder,
    tools=[],
    prompt=critic_prompt(),
)

global codect_info, judge_agent, critic_agent


async def cheak_query(old_querys, new_querys):
    global codect_info, judge_agent, critic_agent
    prompt = HumanMessage(
        content=(
            f"старые DDL запросы: \n{codect_info['old_ddl']}\n\n"
            f"Новые DDL запросы: \n{codect_info['new_ddl']}\n\n"
            f"Вот миграции:\n{codect_info['migrations']}"
            f"оригинальный sql запрос:\n{old_querys}"
            f"оптимизированный запрос:\n{new_querys}"
        )
    )

    judge_response = await judge_agent.ainvoke({'messages': prompt})
    cheak_logik = judge_response["messages"][-1].content

    if not 'OK' in cheak_logik:
        new_querys = cheak_logik

    prompt = HumanMessage(
        content=(
            f"старые DDL запросы: \n{codect_info['old_ddl']}\n\n"
            f"Новые DDL запросы: \n{codect_info['new_ddl']}\n\n"
            f"Вот миграции:\n{codect_info['migrations']}"
            f"оригинальный sql запрос:\n{old_querys}"
            f"оптимизированный запрос:\n{new_querys}"
        )
    )

    critic_response = await critic_agent.ainvoke({'messages': prompt})
    cheak_sintax = critic_response["messages"][-1].content

    if not 'OK' in cheak_sintax:
        new_querys = cheak_sintax

    return new_querys

In [None]:
import asyncio

semaphore = asyncio.Semaphore(MAX_CONCURRENT_CALLS)

async def get_answer_async(agent, new_ddl, query, retry_nums=3):
    for _ in range(retry_nums):
        try:
            prompt = HumanMessage(
                content=(
                    f"Новые DDL запросы: \n{new_ddl}\n\n"
                    f"Вот SQL запрос, который надо оптимизировать:\n{query}"
                )
            )
            response = await agent.ainvoke({'messages': prompt})
            return await cheak_query(query, response["messages"][-1].content)
        except Exception as e:
            print(f"Failed to execute for query: {query}. Error: {repr(e)}")
            print("Retrying...")
            await asyncio.sleep(1)
    print(f"OMG_MODEL_CRUSHD!!! for query: {query}")
    return "OMG_MODEL_CRUSHD!!!"

async def get_answer_with_semaphore(agent, new_ddl, query, semaphore):
    async with semaphore:
        return await get_answer_async(agent, new_ddl, query)

query_optimize_agent = create_react_agent(
    llm_gen_qwen3_coder,
    tools=[],
    prompt=query_optimize_prompt(),
)

tasks = [
    get_answer_with_semaphore(query_optimize_agent, New_DDL, query, semaphore)
    for query in SQL_querys
]
results = await asyncio.gather(*tasks)
optim_querys = {
    data["queries"][i]["queryid"]: results[i]
    for i in range(len(results))
}
optim_querys

In [None]:
json_answer = {
    'ddl':data['ddl'] + [
        {'statement': i} for i in New_DDL.replace('\n', ' ').split(';')[:-1]
    ],
    'migrations':[
        {'statement': i} for i in migrations.replace('\n', ' ').split(';')[:-1]
    ],
    'queries':[
        {'queryid': i['queryid'],
         'query': optim_querys[i['queryid']].replace('\n', ' ')} for i in data['queries']
    ]
}

In [None]:
with open('flights_GigaTest_1_2.json', 'w') as fp:
    json.dump(json_answer, fp, indent=4)