# Кдассификация ЭКГ с помощью vLLM и Tool Calling

Этот блокнот демонстрирует создание системы для анализа параметров ЭКГ. Система состоит из нескольких компонентов:
1.  **vLLM сервер**: Запускает модель `microsoft/Phi-4-mini-4k-instruct` с OpenAI-совместимым API.
2.  **MCP сервер**: Предоставляет инструменты (tools) для анализа ЭКГ, такие как классификатор и калькуляторы.
3.  **Оркестратор (ECGAgent)**: Клиент, который принимает пользовательский ввод, общается с LLM для определения намерений, вызывает необходимые инструменты на MCP сервере и запрашивает у LLM финальный структурированный JSON-ответ.
4.  **Структурированный вывод**: Использует возможности vLLM для принудительной генерации JSON по заданной Pydantic-схеме, что повышает надежность.

### Зависимости

In [1]:
!pip install -q "vllm>=0.5.0" "pydantic>=2.0" "openai>=1.0.0" fastmcp aiohttp nest_asyncio

# Free 8000 (vLLM), 8081 (MCP)
!fuser -k 8000/tcp 8081/tcp || true

### MCP Server с инструментами

MCP сервер с инструментами: скрининг-классификатор (заглушка), длина интервала, QT, QTc.

In [2]:
import asyncio
from typing import Optional, Tuple, Dict
from pydantic import Field, BaseModel
from mcp.server.fastmcp import FastMCP
from typing_extensions import Annotated

mcp = FastMCP("ecg_tools")

class ECGParameters(BaseModel):
    rr_interval: Optional[float] = Field(None, description="Интервал RR в миллисекундах (мс).")
    p_duration: Optional[float] = Field(None, description="Длительность P-волны в мс.")
    qrs_duration: Optional[float] = Field(None, description="Длительность QRS-комплекса в мс.")
    qtc_interval: Optional[float] = Field(None, description="Корректированный интервал QT (QTc) в мс.")
    p_axis: Optional[float] = Field(None, description="Электрическая ось P-волны в градусах.")
    qrs_axis: Optional[float] = Field(None, description="Электрическая ось QRS-комплекса в градусах.")
    t_axis: Optional[float] = Field(None, description="Электрическая ось T-волны в градусах.")
    gender: Optional[str] = Field(None, description="Пол пациента ('male' или 'female') для корректной оценки QTc.")

@mcp.tool()
async def screening_classifier(params: ECGParameters) -> Dict:
    """
    Проводит скрининг параметров ЭКГ на основе пороговых значений.
    Возвращает словарь:
    {
        "class": 0/1,
        "anomaly_prob": float 0-100
    }
    """
    anomalies = []
    total_params = 0
    if params.rr_interval is not None:
        total_params += 1
        if not (600 <= params.rr_interval <= 1000): anomalies.append("RR interval out of range")
    if params.p_duration is not None:
        total_params += 1
        if not (60 <= params.p_duration <= 120): anomalies.append("P-wave duration out of range")
    if params.qrs_duration is not None:
        total_params += 1
        if not (60 <= params.qrs_duration <= 110): anomalies.append("QRS duration out of range")
    if params.qtc_interval is not None:
        total_params += 1
        if params.gender == 'female' and params.qtc_interval >= 470: anomalies.append("QTc (female) is elevated")
        elif params.gender != 'female' and params.qtc_interval >= 450: anomalies.append("QTc (male) is elevated")
    if not anomalies:
        return {"class_code": 0, "anomaly_prob": 0.0}
    anomaly_prob = (len(anomalies) / total_params) * 100 if total_params > 0 else 0.0
    return {"class_code": 1, "anomaly_prob": round(anomaly_prob, 2)}

@mcp.tool()
async def calculate_duration(start_ms: Annotated[float, Field(description="Время начала события в мс")], end_ms: Annotated[float, Field(description="Время окончания события в мс")]) -> float:
    """Вычисляет длительность интервала в мс по времени начала и окончания."""
    return end_ms - start_ms

@mcp.tool()
async def calculate_qt_interval(qrs_onset_ms: Annotated[float, Field(description="Время начала QRS-комплекса в мс")], t_end_ms: Annotated[float, Field(description="Время окончания T-волны в мс")]) -> float:
    """Вычисляет длительность QT-интервала на основе времени начала QRS и окончания T."""
    return t_end_ms - qrs_onset_ms

@mcp.tool()
async def calculate_qtc_bazett(qt_interval_ms: Annotated[float, Field(description="Интервал QT в мс")], rr_interval_ms: Annotated[float, Field(description="Интервал RR в мс")]) -> float:
    """Вычисляет корректированный QT (QTc) по формуле Базетта."""
    if rr_interval_ms <= 0: return 0.0
    rr_interval_s = rr_interval_ms / 1000
    qtc = qt_interval_ms / (rr_interval_s ** 0.5)
    return round(qtc, 2)

### Запуск vLLM Server

Запускаем LLM в фоновом режиме.

In [3]:
import os
from google.colab import userdata

os.environ["HF_TOKEN"] = userdata.get('clab')
if os.environ["HF_TOKEN"]:
    print("HF_TOKEN установлен, vLLM сможет аутентифицироваться на Hugging Face")
else:
    print("HF_TOKEN не найден, проверьте секрет в Colab")

HF_TOKEN установлен, vLLM сможет аутентифицироваться на Hugging Face


In [7]:
!VLLM_ATTENTION_BACKEND=TRITON_ATTN_VLLM_V1 nohup python -m vllm.entrypoints.openai.api_server \
    --model "microsoft/Phi-4-mini-instruct" \
    --host "0.0.0.0" \
    --port 8000 \
    --trust-remote-code \
    --max-model-len 4096 \
    --enable-auto-tool-choice \
    --tool-call-parser phi4_mini_json \
    --gpu-memory-utilization 0.8 > vllm.log 2>&1 &

In [10]:
# !!! Тут лучше подождать

# import time
# print("Waiting for vLLM server to start...")
# time.sleep(600)
# print("vLLM server should be running. Check vllm.log for status.")
!cat vllm.log

2025-09-25 05:29:48.681946: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1758778188.718808   37741 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1758778188.730989   37741 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1758778188.755451   37741 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1758778188.755487   37741 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1758778188.755491   37741 computation_placer.cc:177] computation placer alr

### Модели ответа

Удачный ответ:
```json
{
    "class": 1,                         # 1 - аномалия, 0 - здоров
    "analysis": "Потому что больной",   # Анализ
    "anomaly_prob": 100                 # Вероятность в %
}
```

Неудачный ответ:
```json
{
    "error": "bad_request",       
    "reason": "Потому что ...",   # Проблемы с данными / Вывод не возможен
}
```

In [11]:
import json
from typing import Optional, Literal
from pydantic import BaseModel, Field, model_validator, ValidationError

class FinalResponse(BaseModel):
    class_code: Optional[int] = Field(None, alias="class", ge=0, le=1)
    analysis: Optional[str] = Field(None, alias="analysis")
    anomaly_prob: Optional[float] = Field(None)
    error: Optional[Literal["bad_request"]] = Field(None)
    reason: Optional[str] = Field(None)

    @model_validator(mode='after')
    def check_exclusive_fields(self) -> 'FinalResponse':
        success_fields = self.class_code is not None and self.analysis is not None and self.anomaly_prob is not None
        error_fields = self.error is not None and self.reason is not None
        if success_fields and error_fields:
            raise ValueError("Only success or error fields can be filled, not both.")
        if not success_fields and not error_fields:
            raise ValueError("Either success or error fields must be filled.")
        return self

### Системный промпт



In [12]:
SYSTEM_PROMPT = """
Ты — интеллектуальный помощник для анализа ЭКГ, предназначенный для использования врачом. Твоя задача — принять числовые параметры ЭКГ, вызвать инструменты для всех доступных вычислений и предоставить структурированный ответ в формате JSON.

**ОБЩИЕ ПРАВИЛА:**
1. **Независимость запросов:** Каждый запрос новый и независимый.
2. **Частичные данные:** Работай даже если предоставлены только отдельные параметры. Если вычислить какой-либо интервал невозможно — не включай его в анализ, не придумывай значения.
3. **Вычисления через инструменты:**
   - `P_duration = p_end - p_onset`
   - `QRS_duration = qrs_end - qrs_onset`
   - `QT = t_end - qrs_onset`
   - `QTc = QT / sqrt(RR / 1000)`
4. **Приоритет инструментов:** Сначала вызывай инструменты вычисления интервалов и других признаков, затем скрининг классификатор.
5. Модель скрининга должна дать `class` и `anomaly_prob`. **Эту оценку включи в `analysis`, но не копируй её как финальный результат.** Твоя задача — дать финальный `class` и `anomaly_prob` на основе собственного анализа, с учётом всех параметров и вывода локальной модели.
6. **Формат ответа:** JSON должен содержать:
   - `class` (0 — в пределах нормы, 1 — отклонение)
   - `analysis` — текстовое объяснение с соотношением к нормам
   - `anomaly_prob` — доля параметров вне нормы среди всех доступных, от 0 до 100
   - `error` и `reason` — только при невозможности анализа
7. **Единицы измерения:** интервалы — миллисекунды (мс), углы — градусы.
8. **Сообщение о диагнозе:** В `analysis` всегда указывай, что это **предварительный скрининг**, не диагноз, и при необходимости рекомендовать консультацию специалиста.
9. Анализ производи на русском языке.
**РЕФЕРЕНСНЫЕ ЗНАЧЕНИЯ:**
- RR интервал: 600–1000 мс (ниже — тахикардия, выше — брадикардия)
- P-duration: 60–120 мс
- QRS-duration: 60–110 мс
- QTc: для мужчин <450 мс, для женщин <470 мс
- Оси: P: 0–75°, QRS: -30–90°, T: 0–90°

**АНАЛИЗ И ВЫЧИСЛЕНИЯ:**
- Сравни каждый параметр с нормой и указывай отклонения.
- Если параметр в пределах нормы — упомяни кратко.
- Для частичных данных анализируй только доступные параметры.
- **Автоматически вычисляй `anomaly_prob`** как `(число параметров вне нормы / число всех доступных параметров)`.

**ПРИМЕРЫ:**

**Пример 1 (Простой анализ):**
* User: "RR 1150 мс, QRS 90 мс."
* Assistant (JSON):
```json
{
  "class": 1,
  "analysis": "Оценка screening модели: class=1, anomaly_prob=65%. Интервал RR составляет 1150 мс, что выше нормы (600–1000 мс) и может указывать на брадикардию. Длительность QRS в пределах нормы.",
  "anomaly_prob": 50.0
}
```

**Пример 2 (Вычисление QT и QTc):**

* User: "мужчина, rr 600, qrs_onset_ms 100, t_end_ms 500"
* Assistant (JSON):

```json
{
  "class": 1,
  "analysis": "Оценка screening модели: class=1, anomaly_prob=86%. Вычисленный интервал QT = 400 мс, QTc = 516.4 мс, что превышает норму для мужчин (<450 мс).",
  "anomaly_prob": 100.0
}
```

**Пример 3 (Вычисление QRS-duration):**

* User: "qrs_onset_ms 200, qrs_end ms 310"
* Assistant (JSON):

```json
{
  "class": 0,
  "analysis": "Оценка screening модели: class=0, anomaly_prob=13%. Вычисленная длительность QRS = 110 мс, в пределах нормы.",
  "anomaly_prob": 0.0
}
```

**Пример 4 (Частичные данные):**

* User: "t_axis 100"
* Assistant (JSON):

```json
{
  "class": 1,
  "analysis": "Оценка screening модели: class=1, anomaly_prob=63%. Ось T = 100°, что выше нормы (0–90°). Скрининг предварительный, рекомендуется консультация специалиста.",
  "anomaly_prob": 100.0
}
```

**Пример 5 (Некорректный запрос):**

* User: "Проверь мое сердце"
* Assistant (JSON):

```json
{
  "error": "bad_request",
  "reason": "Запрос не содержит числовых параметров ЭКГ, необходимых для анализа."
}
```
"""

### Оркестратор (ECGAgent)

Инициализирует соединения с vLLM и MCP-сервером, преобразует схемы инструментов в понятный для LLM формат и обрабатывает логику вызова инструментов.

In [13]:
from openai import AsyncOpenAI
from fastmcp import Client as MCP

def _mcp_to_openai(tools):
    openai_tools = []
    for t in tools:
        parameters = t.inputSchema
        parameters.pop('title', None)
        parameters.pop('description', None)
        openai_tools.append({
            "type": "function",
            "function": { "name": t.name, "description": t.description or "", "parameters": parameters }
        })
    return openai_tools

class ECGAgent:
    def __init__(self, mcp_cmd: FastMCP, llm_url: str = "http://localhost:8000/v1", model: str = "microsoft/Phi-4-mini-4k-instruct"):
        self.mcp = MCP(mcp_cmd)
        self.llm = AsyncOpenAI(base_url=llm_url, api_key="dummy")
        self.model = model
        self.tools = None

    async def __aenter__(self):
        await self.mcp.__aenter__()
        await self.llm.__aenter__()
        self.tools = _mcp_to_openai(await self.mcp.list_tools())
        print("--- Loaded tools schema for LLM ---")
        return self

    async def __aexit__(self, *exc):
        await self.mcp.__aexit__(*exc)
        await self.llm.__aexit__(*exc)

    async def ask(self, prompt: str, system_prompt: str) -> str:
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ]
        initial_response = await self.llm.chat.completions.create(
            model=self.model, messages=messages, tools=self.tools, tool_choice="auto"
        )
        response_message = initial_response.choices[0].message
        messages.append(response_message)

        if getattr(response_message, "tool_calls", None):
            for tool_call in response_message.tool_calls:
                function_name = tool_call.function.name
                function_args = json.loads(tool_call.function.arguments)
                print(f"--- Calling tool: {function_name} with args: {function_args} ---")

                result = await self.mcp.call_tool(function_name, function_args)
                # function_response = json.dumps(result[0])
                tool_output = result.content[0].text if result.content else ""
                print(f"--- Tool response: {tool_output} ---")
                messages.append({
                    "tool_call_id": tool_call.id, "role": "tool", "name": function_name, "content": tool_output
                })

        final_response_schema = FinalResponse.model_json_schema()
        final_response = await self.llm.chat.completions.create(
            model=self.model,
            messages=messages,
            extra_body={ "response_format": { "type": "json_object", "schema": final_response_schema } },
            tools=None,
            tool_choice=None
        )
        return final_response.choices[0].message.content

  return datetime.utcnow().replace(tzinfo=utc)


### Основной цикл ввода

In [14]:
import asyncio
import nest_asyncio
import sys

async def run_case(agent, query: str):
    print(f"\n--- Running Case: '{query}' ---")
    llm_response_str = await agent.ask(query, SYSTEM_PROMPT)

    print("\n--- LLM Response (Guaranteed JSON) ---")
    print(llm_response_str)
    print("--------------------------------------\n")

    try:
        validated_result = FinalResponse.model_validate_json(llm_response_str)
        if validated_result.error:
            print(f"Result: Bad Request -> {validated_result.reason}\n")
        else:
            print("Result: Validation successful.")
            print(f"Class: {'Anomaly' if validated_result.class_code == 1 else 'Normal'}")
            print(f"Anomaly Probability: {validated_result.anomaly_prob}")
            print(f"Analysis: {validated_result.analysis}\n")
    except ValidationError as e:
        print(f"Result: Pydantic logical validation error: {e}")


async def main():

    print("\n--- Initializing ECG Agent ---")
    try:
        async with ECGAgent(
            mcp_cmd=mcp,
            llm_url="http://localhost:8000/v1",
            model="microsoft/Phi-4-mini-instruct"
        ) as agent:
            # Автотесты
            test_queries = [
                "RR 1150 мс, QRS 90 мс.",
                "Женщина, p_onset 100, p_end 230, rr 800.",
                "Проверь мое сердце",
                "qrs_duration 100, rr_interval 750, gender male"
            ]
            for i, query in enumerate(test_queries):
                print(f"\n=== Test Case {i+1} ===")
                await run_case(agent, query)

            # Ручной режим
            while True:
                user_query = await asyncio.to_thread(input, "Enter ECG parameters (or 'exit' to quit): ")
                if user_query.lower() == 'exit':
                    break
                await run_case(agent, user_query)

    except Exception as e:
        print(f"An unexpected error occurred during agent execution: {e}")
    finally:
        print("--- Script finished ---")


nest_asyncio.apply()
asyncio.run(main())


--- Initializing ECG Agent ---
--- Loaded tools schema for LLM ---

=== Test Case 1 ===

--- Running Case: 'RR 1150 мс, QRS 90 мс.' ---


  return datetime.utcnow().replace(tzinfo=utc)



--- LLM Response (Guaranteed JSON) ---
{
  "class": 1,
  "analysis": "Оценка screening модели: class=1, anomaly_prob=50%. Интервал RR составляет 1150 мс, что выше нормы (600–1000 мс) и может указывать на брадикардию. Длительность QRS в пределах нормы.",
  "anomaly_prob": 50.0
}
--------------------------------------

Result: Validation successful.
Class: Anomaly
Anomaly Probability: 50.0
Analysis: Оценка screening модели: class=1, anomaly_prob=50%. Интервал RR составляет 1150 мс, что выше нормы (600–1000 мс) и может указывать на брадикардию. Длительность QRS в пределах нормы.


=== Test Case 2 ===

--- Running Case: 'Женщина, p_onset 100, p_end 230, rr 800.' ---

--- LLM Response (Guaranteed JSON) ---
{
  "class": 0,
  "analysis": "Оценка screening модели: class=0, anomaly_prob=0%. Интервал RR составляет 800 мс, что находится в пределах нормы (600–1000 мс). Длина проводящей волны P составляет 130 мс, что также находится в пределах нормы (60–120 мс).",
  "anomaly_prob": 0.0
}
---------