# Оценка качества поиска (BM25 + kNN + ColBERT) через сервис `/search`

Этот ноутбук:
- берёт тестовые запросы из `test_queries_bank_docs.json`;
- для каждого запроса вызывает HTTP‑эндпоинт `POST http://127.0.0.1:8000/search` твоего сервиса;
- сортирует полученные документы по убыванию поля `_colbert_score`;
- считает метрики `precision@k`, `recall@k` и `nDCG@k` на уровне чанка и файла.

По умолчанию индекс берётся из переменной окружения `OPENSEARCH_INDEX` (если она не задана — `makar_ozon`), но можно подставить любой `index_name` (например, `makar_ozon_semantic`).


In [None]:
import os
from dotenv import load_dotenv

# load_dotenv() is called before the environment is read below
# (presumably so that values from a local .env file are visible — confirm).
load_dotenv()

# Yandex API key and folder id, taken from the process environment.
YANDEX_API_KEY = os.environ.get("YANDEX_API_KEY")
YANDEX_FOLDER_ID = os.environ.get("YANDEX_FOLDER_ID")

# Both values are mandatory: abort right away when one is missing or empty.
if not YANDEX_API_KEY:
    raise ValueError("YANDEX_API_KEY must be set in environment variables")
elif not YANDEX_FOLDER_ID:
    raise ValueError("YANDEX_FOLDER_ID must be set in environment variables")

'https://llm.api.cloud.yandex.net/foundationModels/v1/textEmbedding'

In [None]:
import os
import math
import json
import subprocess
from typing import List, Dict, Any
from dotenv import load_dotenv

# load_dotenv() is called before the environment is read below
# (presumably so that values from a local .env file are visible — confirm).
load_dotenv()

# OpenSearch connection settings; only the index name has a fallback value.
OPENSEARCH_HOST = os.environ.get("OPENSEARCH_HOST")
OPENSEARCH_PORT = os.environ.get("OPENSEARCH_PORT")
OPENSEARCH_INDEX = os.environ.get("OPENSEARCH_INDEX", "makar_ozon")

# Host and port are mandatory: abort right away when one is missing or empty.
if not OPENSEARCH_HOST:
    raise ValueError("OPENSEARCH_HOST must be set in environment variables")
elif not OPENSEARCH_PORT:
    raise ValueError("OPENSEARCH_PORT must be set in environment variables")

# The environment yields a string; keep the port as an integer from here on.
OPENSEARCH_PORT = int(OPENSEARCH_PORT)

# Evaluation parameters used by the evaluation functions further down.
TOP_K = 20  # sent as "size" in the /search request
KS = (1, 3, 5, 10, 15, 20)  # cutoffs at which the metrics are reported
QUERIES_PATH = "test_queries_bank_docs.json"
SEARCH_URL = "http://127.0.0.1:8000/search"

# Echo the effective connection settings.
print(f"OPENSEARCH_HOST: {OPENSEARCH_HOST}")
print(f"OPENSEARCH_PORT: {OPENSEARCH_PORT}")
print(f"OPENSEARCH_INDEX: {OPENSEARCH_INDEX}")

OPENSEARCH_HOST: localhost
OPENSEARCH_PORT: 9200
OPENSEARCH_INDEX: makar_ozon


In [None]:
import os

# Yandex API key and folder id, taken from the process environment.
YANDEX_API_KEY = os.environ.get("YANDEX_API_KEY")
YANDEX_FOLDER_ID = os.environ.get("YANDEX_FOLDER_ID")

# Both values are mandatory: abort right away when one is missing or empty.
if not YANDEX_API_KEY:
    raise ValueError("YANDEX_API_KEY must be set in environment variables")
elif not YANDEX_FOLDER_ID:
    raise ValueError("YANDEX_FOLDER_ID must be set in environment variables")

In [None]:
import os
from dotenv import load_dotenv

# load_dotenv() is called before the environment is read below
# (presumably so that values from a local .env file are visible — confirm).
load_dotenv()

# OpenSearch connection settings; only the index name has a fallback value.
OPENSEARCH_HOST = os.environ.get("OPENSEARCH_HOST")
OPENSEARCH_PORT = os.environ.get("OPENSEARCH_PORT")
OPENSEARCH_INDEX = os.environ.get("OPENSEARCH_INDEX", "makar_ozon")

# Host and port are mandatory: abort right away when one is missing or empty.
if not OPENSEARCH_HOST:
    raise ValueError("OPENSEARCH_HOST must be set in environment variables")
elif not OPENSEARCH_PORT:
    raise ValueError("OPENSEARCH_PORT must be set in environment variables")

# The environment yields a string; keep the port as an integer from here on.
OPENSEARCH_PORT = int(OPENSEARCH_PORT)

# Evaluation parameters used by the evaluation functions further down.
TOP_K = 20  # sent as "size" in the /search request
KS = (1, 3, 5, 10, 15, 20)  # cutoffs at which the metrics are reported
QUERIES_PATH = "test_queries_bank_docs.json"
SEARCH_URL = "http://127.0.0.1:8000/search"

# Echo the effective connection settings, one per line.
print(
    f"OPENSEARCH_HOST: {OPENSEARCH_HOST}",
    f"OPENSEARCH_PORT: {OPENSEARCH_PORT}",
    f"OPENSEARCH_INDEX: {OPENSEARCH_INDEX}",
    sep="\n",
)

OPENSEARCH_HOST: localhost
OPENSEARCH_PORT: 9200
OPENSEARCH_INDEX: makar_ozon


In [None]:
async def eval_queries_colbert(
    index_name: str = OPENSEARCH_INDEX,
    top_k: int = TOP_K,
    ks: List[int] = list(KS),
) -> None:
    """Run every query from ``QUERIES_PATH`` through ``POST /search`` and print metrics.

    Each JSON record must provide ``question``, ``source`` and ``chunk_id``.
    Two groups of metrics are printed for every cutoff in ``ks``:

    - DOC: the expected chunk itself (``source`` and ``chunk_id`` both match);
    - FILE: any returned chunk of the expected file (only ``source`` matches).

    Returned documents are re-sorted by ``_colbert_score`` in descending
    order; documents without a score are placed after all scored ones.

    Only the first matching rank per query is used, i.e. every query is
    treated as having exactly one relevant item.  Consequently precision@k is
    ``1/k`` on a hit, and the value reported as nDCG is ``1/log2(1 + rank)``
    (the ideal DCG of a single relevant item is 1).

    Requests that fail (curl error, invalid JSON, non-object response) are
    logged and counted as misses: they stay in the denominator, and a warning
    with their number is printed in the summary.

    Note: the defaults are evaluated once, when the function is defined, so
    rebinding ``OPENSEARCH_INDEX``/``TOP_K``/``KS`` afterwards does not change
    them; pass the arguments explicitly instead.  The function is declared
    ``async`` but the ``curl`` calls it makes are blocking.

    Args:
        index_name: value sent as ``index_name`` in the request payload.
        top_k: value sent as ``size`` in the request payload.
        ks: cutoffs for which the metrics are reported.

    Raises:
        KeyError: if a query record lacks one of the required keys.
    """
    with open(QUERIES_PATH, "r", encoding="utf-8") as f:
        data: List[Dict[str, Any]] = json.load(f)

    total = len(data)
    print(f"Loaded {total} queries from {QUERIES_PATH}")
    if total == 0:
        # Nothing to average over; the ratios below would divide by zero.
        print("No queries to evaluate")
        return

    hits_doc_at_k = {k: 0 for k in ks}
    prec_doc_at_k_sum = {k: 0.0 for k in ks}
    ndcg_doc_at_k_sum = {k: 0.0 for k in ks}

    hits_file_at_k = {k: 0 for k in ks}
    prec_file_at_k_sum = {k: 0.0 for k in ks}
    ndcg_file_at_k_sum = {k: 0.0 for k in ks}

    failed = 0

    # Everything except the JSON body is identical for every request.
    curl_prefix = [
        "curl",
        "-sS",
        "-X",
        "POST",
        SEARCH_URL,
        "-H",
        "accept: application/json",
        "-H",
        "Content-Type: application/json",
        "-d",
    ]

    for ex in data:
        q = ex["question"]
        target_source = ex["source"]
        target_chunk = ex["chunk_id"]

        payload = {
            "query": q,
            "size": top_k,
            "index_name": index_name,
            "use_hyde": False,
            "use_colbert": True,
        }

        res = subprocess.run(
            curl_prefix + [json.dumps(payload, ensure_ascii=False)],
            capture_output=True,
            text=True,
        )
        if res.returncode != 0:
            print("curl error:", res.stderr[:200])
            failed += 1
            continue

        try:
            resp = json.loads(res.stdout)
        except json.JSONDecodeError as e:
            print("JSON decode error:", e, "raw:", res.stdout[:300])
            failed += 1
            continue

        if not isinstance(resp, dict):
            # Valid JSON, but not an object: `.get` below would raise.
            print("Unexpected response (not a JSON object):", res.stdout[:300])
            failed += 1
            continue

        docs = resp.get("documents") or []

        # Scored documents first (highest score first), unscored ones last.
        docs = sorted(
            docs,
            key=lambda d: (
                d.get("_colbert_score") is not None,
                d.get("_colbert_score") or 0.0,
            ),
            reverse=True,
        )

        # 1-based rank of the first exact-chunk match and first same-file match.
        hit_doc_rank = None
        hit_file_rank = None
        for rank, d in enumerate(docs, 1):
            if d.get("source") != target_source:
                continue
            if hit_file_rank is None:
                hit_file_rank = rank
            if hit_doc_rank is None and d.get("chunk_id") == target_chunk:
                hit_doc_rank = rank
            if hit_doc_rank is not None:
                # The file rank is necessarily set by now; nothing left to find.
                break

        for k in ks:
            if hit_doc_rank is not None and hit_doc_rank <= k:
                hits_doc_at_k[k] += 1
                prec_doc_at_k_sum[k] += 1.0 / k
                ndcg_doc_at_k_sum[k] += 1.0 / math.log2(1 + hit_doc_rank)

            if hit_file_rank is not None and hit_file_rank <= k:
                hits_file_at_k[k] += 1
                prec_file_at_k_sum[k] += 1.0 / k
                ndcg_file_at_k_sum[k] += 1.0 / math.log2(1 + hit_file_rank)

    print(
        f"Evaluated {total} queries with TOP_K={top_k}, index='{index_name}' "
        f"(ColBERT enabled via /search)",
    )
    if failed:
        print(f"WARNING: {failed} of {total} requests failed and were counted as misses")

    for k in ks:
        recall_doc = hits_doc_at_k[k] / total
        precision_doc = prec_doc_at_k_sum[k] / total
        ndcg_doc = ndcg_doc_at_k_sum[k] / total

        recall_file = hits_file_at_k[k] / total
        precision_file = prec_file_at_k_sum[k] / total
        ndcg_file = ndcg_file_at_k_sum[k] / total

        print(
            f"k={k}: DOC  hits={hits_doc_at_k[k]}/{total} | "
            f"recall@{k}={recall_doc:.3f} | precision@{k}={precision_doc:.3f} | ndcg@{k}={ndcg_doc:.3f}",
        )
        print(
            f"      FILE hits={hits_file_at_k[k]}/{total} | "
            f"recall_file@{k}={recall_file:.3f} | precision_file@{k}={precision_file:.3f} | ndcg_file@{k}={ndcg_file:.3f}",
        )

In [None]:
import nest_asyncio

# NOTE(review): presumably needed so that `await` works while the notebook
# kernel's event loop is already running — confirm it is still required.
nest_asyncio.apply()

# Run the chunk- and file-level evaluation with its default arguments.
# A top-level `await` like this is not valid in a plain Python module.
await eval_queries_colbert()

Loaded 28 queries from test_queries_bank_docs.json
Evaluated 28 queries with TOP_K=20, index='makar_ozon' (ColBERT enabled via /search)
k=1: DOC  hits=3/28 | recall@1=0.107 | precision@1=0.107 | ndcg@1=0.107
      FILE hits=18/28 | recall_file@1=0.643 | precision_file@1=0.643 | ndcg_file@1=0.643
k=3: DOC  hits=5/28 | recall@3=0.179 | precision@3=0.060 | ndcg@3=0.152
      FILE hits=22/28 | recall_file@3=0.786 | precision_file@3=0.262 | ndcg_file@3=0.728
k=5: DOC  hits=11/28 | recall@5=0.393 | precision@5=0.079 | ndcg@5=0.240
      FILE hits=22/28 | recall_file@5=0.786 | precision_file@5=0.157 | ndcg_file@5=0.728
k=10: DOC  hits=17/28 | recall@10=0.607 | precision@10=0.061 | ndcg@10=0.308
      FILE hits=22/28 | recall_file@10=0.786 | precision_file@10=0.079 | ndcg_file@10=0.728
k=15: DOC  hits=18/28 | recall@15=0.643 | precision@15=0.043 | ndcg@15=0.317
      FILE hits=22/28 | recall_file@15=0.786 | precision_file@15=0.052 | ndcg_file@15=0.728
k=20: DOC  hits=22/28 | recall@20=0.786 | 

In [None]:
import os
import random
import requests
from typing import List

# Input corpus and output file for the generated query set.
CORPUS_PATH = "corpus.json"
OUT_PATH = "test_queries_corpus.json"

# Upper bound on the number of chunks to generate questions for.
N_QUESTIONS = 30

# Yandex completion API settings; model name and URL have fallback values.
YANDEX_API_KEY = os.environ.get("YANDEX_API_KEY")
YANDEX_FOLDER_ID = os.environ.get("YANDEX_FOLDER_ID")
YANDEX_LLM_MODEL = os.environ.get("YANDEX_LLM_MODEL", "yandexgpt-lite")
YANDEX_COMPLETION_URL = os.environ.get(
    "YANDEX_COMPLETION_URL",
    "https://llm.api.cloud.yandex.net/foundationModels/v1/completion",
)

# Both credentials are mandatory: abort when either is missing or empty.
if not YANDEX_API_KEY or not YANDEX_FOLDER_ID:
    raise ValueError("Нужно выставить YANDEX_API_KEY и YANDEX_FOLDER_ID в окружении")

# HTTP headers sent with every completion request.
headers = {
    "Authorization": "Api-Key " + YANDEX_API_KEY,
    "x-folder-id": YANDEX_FOLDER_ID,
    "Content-Type": "application/json",
}


def generate_question_for_chunk(text: str) -> str:
    """Ask the completion endpoint for one natural question about the given chunk.

    The chunk is stripped, flattened to a single line and cut to at most
    1200 characters before it is embedded into the (Russian) prompt.

    Args:
        text: chunk text to generate a question for.

    Returns:
        The text of the first alternative in the response, stripped.

    Raises:
        RuntimeError: if the response JSON does not have the expected
            ``result.alternatives[0].message.text`` structure.
        Exception: whatever ``requests.post`` / ``raise_for_status`` raise on
            transport or HTTP errors is propagated unchanged.
    """
    # Slicing is a no-op for strings that are already short enough.
    snippet = text.strip().replace("\n", " ")[:1200]

    instructions = (
        "Ты помогаешь составлять тестовые пользовательские запросы по текстам.\n"
        "Ниже дан фрагмент текста (чанк). Сформулируй ОДИН естественный, "
        "конкретный вопрос на русском языке, который пользователь мог бы задать, "
        "чтобы найти именно этот фрагмент. Не пиши ничего, кроме самого вопроса.\n\n"
    )
    prompt = f"{instructions}Текст чанка:\n{snippet}\n\nВопрос:"

    completion_options = {
        "stream": False,
        "temperature": 0.4,
        "maxTokens": 120,
    }
    request_body = {
        "modelUri": f"gpt://{YANDEX_FOLDER_ID}/{YANDEX_LLM_MODEL}/latest",
        "completionOptions": completion_options,
        "messages": [{"role": "user", "text": prompt}],
    }

    response = requests.post(
        YANDEX_COMPLETION_URL,
        headers=headers,
        json=request_body,
        timeout=60,
    )
    response.raise_for_status()
    data = response.json()

    try:
        first_alternative = data["result"]["alternatives"][0]
        return first_alternative["message"]["text"].strip()
    except Exception as exc:
        # Include the whole payload so that an unexpected shape can be diagnosed.
        raise RuntimeError(f"Bad completion response: {data}") from exc


# Load the whole corpus: a JSON list of chunk records.
with open(CORPUS_PATH, "r", encoding="utf-8") as f:
    corpus: List[Dict[str, Any]] = json.load(f)

print(f"Loaded {len(corpus)} chunks from {CORPUS_PATH}")

# Pick up to N_QUESTIONS distinct chunk positions at random
# (shuffle all positions, then keep the leading ones).
num_available = len(corpus)
num_to_sample = min(N_QUESTIONS, num_available)
all_indices = list(range(num_available))
random.shuffle(all_indices)
sample_indices = all_indices[:num_to_sample]

print(f"Will generate questions for {num_to_sample} random chunks")

results: List[Dict[str, Any]] = []

for idx in sample_indices:
    chunk_text = (corpus[idx].get("text") or "").strip()
    if not chunk_text:
        # Chunks without text are skipped, so fewer pairs may be produced.
        continue

    print(f"[{idx}] generating question…")
    try:
        question = generate_question_for_chunk(chunk_text)
    except Exception as exc:
        # Best effort: report the failure and move on to the next chunk.
        print(f"  error on chunk {idx}: {exc}")
    else:
        results.append(
            {
                "id": idx,
                "question": question,
                "chunk_text": chunk_text,
            }
        )

print(f"Generated {len(results)} query–chunk pairs")

# Persist the pairs as human-readable UTF-8 JSON.
with open(OUT_PATH, "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=2)

print(f"Saved to {OUT_PATH}")

Loaded 167 chunks from corpus.json
Will generate questions for 30 random chunks
[77] generating question…
[117] generating question…
[46] generating question…
[83] generating question…
[113] generating question…
[155] generating question…
[14] generating question…
[119] generating question…
[91] generating question…
[40] generating question…
[121] generating question…
[24] generating question…
[139] generating question…
[148] generating question…
[108] generating question…
[146] generating question…
[79] generating question…
[81] generating question…
[29] generating question…
[125] generating question…
[80] generating question…
[115] generating question…
[131] generating question…
[89] generating question…
[3] generating question…
[62] generating question…
[5] generating question…
[0] generating question…
[39] generating question…
[48] generating question…
Generated 30 query–chunk pairs
Saved to test_queries_corpus.json


In [None]:
async def eval_queries_corpus_text(
    index_name: str = OPENSEARCH_INDEX,
    top_k: int = TOP_K,
    ks: List[int] = list(KS),
    queries_path: str = "test_queries_corpus.json",
) -> None:
    """Evaluate ``POST /search`` against a query set with chunk-text ground truth.

    ``queries_path`` must contain a JSON list of records shaped like
    ``{"id": ..., "question": ..., "chunk_text": ...}``.  A returned document
    is relevant when its stripped ``text`` field equals the stripped
    ``chunk_text`` of the query.  A query whose ``chunk_text`` is empty can
    never be matched and is counted as a miss.

    Returned documents are re-sorted by ``_colbert_score`` in descending
    order; documents without a score are placed after all scored ones.

    Only the first matching rank per query is used, i.e. every query is
    treated as having exactly one relevant item.  Consequently precision@k is
    ``1/k`` on a hit, and the value reported as nDCG is ``1/log2(1 + rank)``
    (the ideal DCG of a single relevant item is 1).

    Requests that fail (curl error, invalid JSON, non-object response) are
    logged and counted as misses: they stay in the denominator, and a warning
    with their number is printed in the summary.

    Note: the defaults are evaluated once, when the function is defined, so
    rebinding ``OPENSEARCH_INDEX``/``TOP_K``/``KS`` afterwards does not change
    them; pass the arguments explicitly instead.  The function is declared
    ``async`` but the ``curl`` calls it makes are blocking.

    Args:
        index_name: value sent as ``index_name`` in the request payload.
        top_k: value sent as ``size`` in the request payload.
        ks: cutoffs for which the metrics are reported.
        queries_path: path of the JSON file with the test queries.

    Raises:
        KeyError: if a query record lacks ``question`` or ``chunk_text``.
    """
    with open(queries_path, "r", encoding="utf-8") as f:
        data: List[Dict[str, Any]] = json.load(f)

    total = len(data)
    print(f"Loaded {total} queries from {queries_path}")
    if total == 0:
        # Nothing to average over; the ratios below would divide by zero.
        print("No queries to evaluate")
        return

    hits_doc_at_k = {k: 0 for k in ks}
    prec_doc_at_k_sum = {k: 0.0 for k in ks}
    ndcg_doc_at_k_sum = {k: 0.0 for k in ks}

    failed = 0

    # Everything except the JSON body is identical for every request.
    curl_prefix = [
        "curl",
        "-sS",
        "-X",
        "POST",
        SEARCH_URL,
        "-H",
        "accept: application/json",
        "-H",
        "Content-Type: application/json",
        "-d",
    ]

    for ex in data:
        q = ex["question"]
        target_text = (ex["chunk_text"] or "").strip()

        payload = {
            "query": q,
            "size": top_k,
            "index_name": index_name,
            "use_hyde": False,
            "use_colbert": True,
        }

        res = subprocess.run(
            curl_prefix + [json.dumps(payload, ensure_ascii=False)],
            capture_output=True,
            text=True,
        )
        if res.returncode != 0:
            print("curl error:", res.stderr[:200])
            failed += 1
            continue

        try:
            resp = json.loads(res.stdout)
        except json.JSONDecodeError as e:
            print("JSON decode error:", e, "raw:", res.stdout[:300])
            failed += 1
            continue

        if not isinstance(resp, dict):
            # Valid JSON, but not an object: `.get` below would raise.
            print("Unexpected response (not a JSON object):", res.stdout[:300])
            failed += 1
            continue

        docs = resp.get("documents") or []

        # Scored documents first (highest score first), unscored ones last.
        docs = sorted(
            docs,
            key=lambda d: (
                d.get("_colbert_score") is not None,
                d.get("_colbert_score") or 0.0,
            ),
            reverse=True,
        )

        # 1-based rank of the first document whose text equals the target.
        hit_doc_rank = None
        if target_text:
            # An empty target would otherwise "match" any document that has
            # an empty or missing text field and produce a spurious hit.
            for rank, d in enumerate(docs, 1):
                if (d.get("text") or "").strip() == target_text:
                    hit_doc_rank = rank
                    break

        for k in ks:
            if hit_doc_rank is not None and hit_doc_rank <= k:
                hits_doc_at_k[k] += 1
                prec_doc_at_k_sum[k] += 1.0 / k
                ndcg_doc_at_k_sum[k] += 1.0 / math.log2(1 + hit_doc_rank)

    print(
        f"Evaluated {total} queries with TOP_K={top_k}, index='{index_name}' "
        f"(ground truth по тексту чанка из {queries_path})",
    )
    if failed:
        print(f"WARNING: {failed} of {total} requests failed and were counted as misses")

    for k in ks:
        recall_doc = hits_doc_at_k[k] / total
        precision_doc = prec_doc_at_k_sum[k] / total
        ndcg_doc = ndcg_doc_at_k_sum[k] / total

        print(
            f"k={k}: DOC  hits={hits_doc_at_k[k]}/{total} | "
            f"recall@{k}={recall_doc:.3f} | precision@{k}={precision_doc:.3f} | ndcg@{k}={ndcg_doc:.3f}",
        )

In [None]:
import nest_asyncio

TOP_K = 20
KS = (1, 3, 5)
QUERIES_PATH = "test_queries_bank_docs.json"
SEARCH_URL = "http://127.0.0.1:8000/search"

print("OPENSEARCH_HOST:", OPENSEARCH_HOST)
print("OPENSEARCH_PORT:", OPENSEARCH_PORT)
print("OPENSEARCH_INDEX:", OPENSEARCH_INDEX)
nest_asyncio.apply()

await eval_queries_corpus_text(
    index_name="makar_ozon_semantic",
    queries_path="test_queries_semantic.json",
)

OPENSEARCH_HOST: localhost
OPENSEARCH_PORT: 9200
OPENSEARCH_INDEX: makar_ozon
Loaded 35 queries from /Users/admin/СДЭК/test_queries_semantic.json
Evaluated 35 queries with TOP_K=20, index='makar_ozon_semantic' (ColBERT enabled via /search, corpus-json ground truth by chunk_text)
k=1: DOC  hits=28/35 | recall@1=0.800 | precision@1=0.800 | ndcg@1=0.800
k=3: DOC  hits=31/35 | recall@3=0.886 | precision@3=0.295 | ndcg@3=0.850
k=5: DOC  hits=33/35 | recall@5=0.943 | precision@5=0.189 | ndcg@5=0.872
k=10: DOC  hits=35/35 | recall@10=1.000 | precision@10=0.100 | ndcg@10=0.892
k=15: DOC  hits=35/35 | recall@15=1.000 | precision@15=0.067 | ndcg@15=0.892
k=20: DOC  hits=35/35 | recall@20=1.000 | precision@20=0.050 | ndcg@20=0.892
