# Our goal:
1) find the question in the knowledge base that best matches the user's query;
2) decline gracefully when the knowledge base has no suitable answer.

# 1) Initial baseline: BM25 + RapidFuzz + Semantic rerank
### 1.1. Candidate generation: BM25 (FTS) + RapidFuzz
- BM25 (SQLite FTS) → catches token matches well but handles paraphrases poorly.
- RapidFuzz (WRatio) → catches similar strings, but it is still mostly about "similar letters/words", not meaning.
### 1.2. Semantic rerank over the candidate pool (SBERT)
- take the query embedding,
- take each candidate's embedding,
- compute cosine similarity → sort the candidates by semantic closeness.
# 2) Hand-labeled test set and comparison
- A hand-labeled test set.
- Ranking metrics: Top-1, Top-3, MRR.
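The three ranking metrics can be computed in a few lines. The sketch below assumes each test query has exactly one correct `qa_id` and that the system returns a best-first list of candidate ids; the function name `ranking_metrics` and the toy data are illustrative, not taken from the notebook.

```python
def ranking_metrics(ranked_ids, expected_ids):
    """Top-1, Top-3 and MRR over a batch of queries.

    ranked_ids[i]  : candidate ids for query i, best first
    expected_ids[i]: the single correct id for query i
    """
    n = len(expected_ids)
    if n == 0:
        raise ValueError("need at least one query")
    top1 = top3 = rr_sum = 0.0
    for ranked, expected in zip(ranked_ids, expected_ids):
        if expected in ranked:
            rank = ranked.index(expected) + 1  # 1-based position
            top1 += rank == 1
            top3 += rank <= 3
            rr_sum += 1.0 / rank
        # a missing expected id contributes 0 to every metric
    return {"top1": top1 / n, "top3": top3 / n, "mrr": rr_sum / n}


# Toy data: hit at rank 1, hit at rank 3, and a miss.
metrics = ranking_metrics(
    ranked_ids=[[7, 2, 9], [4, 5, 1], [3, 8, 6]],
    expected_ids=[7, 1, 10],
)
```

MRR rewards a correct answer at rank 2 or 3 more than Top-3 alone does, which is why it is useful alongside the two hit-rate metrics.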
# 3) Building train_pairs.csv to train the reranker
- for each (ground-truth) qa_id, create several query variants (augment_queries): drop stop words, prepend "подскажите пожалуйста …" ("could you please tell me …"), append "что делать" ("what should I do"), introduce a typo (swap two characters);
- generate a candidate pool via BM25+Fuzzy;
- make sure the positive (qa_id) is in the pool;
- pick NEG_PER_POS "hard" negatives from the pool.
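A minimal sketch of the augmentation step is shown here. Only the name `augment_queries` and the four kinds of variants come from the notes above; the stop-word list, the helper `swap_typo`, the seeding, and the exact string templates are assumptions made for illustration.

```python
import random

# Hypothetical, deliberately tiny stop-word list for illustration only.
STOP_WORDS = {"и", "в", "на", "не", "как", "что", "ли", "же", "а", "по"}


def swap_typo(text, rng):
    """Swap two adjacent characters at a random position."""
    if len(text) < 2:
        return text
    i = rng.randrange(len(text) - 1)
    return text[:i] + text[i + 1] + text[i] + text[i + 2:]


def augment_queries(question, seed=0):
    """Return de-duplicated query variants for one FAQ question."""
    rng = random.Random(seed)
    base = question.strip().rstrip("?").strip()
    no_stop = " ".join(w for w in base.split() if w.lower() not in STOP_WORDS)
    variants = [
        question.strip(),                  # the original wording
        no_stop,                           # stop words removed
        f"подскажите пожалуйста {base}",   # polite prefix
        f"{base} что делать",              # "what should I do" suffix
        swap_typo(base, rng),              # one adjacent-character swap
    ]
    seen, out = set(), []
    for v in variants:
        if v and v not in seen:
            seen.add(v)
            out.append(v)
    return out


variants = augment_queries("Как получить справку об обучении?")
```

Each variant would then be paired with the source `qa_id` as a positive, and negatives would be drawn from the BM25+Fuzzy pool for the same variant.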
# 4) Training the reranker
- Logistic Regression (+ scaler)
- Features: BM25, fuzzy, semantic similarity, overlap, query lengths
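The feature list could be assembled per (query, candidate) pair roughly as follows. This is a sketch under assumptions: the notes do not say which overlap measure or which length features were used, so Jaccard token overlap, character length, and word count are illustrative choices, and `pair_features` is a hypothetical name. The three retrieval scores are assumed to be computed upstream.

```python
def pair_features(query, candidate, bm25_score, fuzzy_score, sem_sim):
    """Feature vector for one (query, candidate question) pair.

    bm25_score, fuzzy_score and sem_sim are assumed to be computed upstream.
    """
    q_tokens = set(query.lower().split())
    c_tokens = set(candidate.lower().split())
    union = q_tokens | c_tokens
    # Jaccard overlap of whitespace tokens (an assumed choice of "overlap")
    overlap = len(q_tokens & c_tokens) / len(union) if union else 0.0
    return [
        float(bm25_score),
        float(fuzzy_score),
        float(sem_sim),
        overlap,
        float(len(query)),          # query length in characters
        float(len(query.split())),  # query length in words
    ]


feats = pair_features(
    "как получить справку",
    "где получить справку об обучении",
    bm25_score=-3.2,
    fuzzy_score=85.0,
    sem_sim=0.74,
)
```

Because these features live on very different scales (BM25 scores, 0..100 fuzzy scores, 0..1 similarities, raw lengths), standardizing them before logistic regression is what the "+ scaler" in the list is for.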
# 5) Moving to dense retrieval
- RapidFuzz handles paraphrases poorly.
- encode every question in the knowledge base into embeddings,
- find the nearest ones by cosine similarity.

Models compared:

- sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
- ai-forever/sbert_large_nlu_ru

Conclusion:
- SBERT multilingual MiniLM gave the best metrics,
- BM25 alongside dense retrieval added nothing (BM25+Dense ≈ Dense)

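# 6) Final stage: Dense + FAISS + Reject (best result)
## 6.1. Preparing the retrieval infrastructure
- Load the precomputed embeddings from qa_vec.q_vec (or a_vec),
- normalize them (if needed),
- build a FAISS IndexFlatIP (cosine via inner product).
- Why FAISS: IndexFlatIP is an exact brute-force index, so each query is still an O(N) scan, but the scan is heavily optimized and fast at this scale; sub-linear search on large bases would need an approximate index (e.g. IVF or HNSW).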
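To illustrate "cosine via inner product", here is a NumPy stand-in for what an exact inner-product index computes. It does not use FAISS itself; the helper names `l2_normalize` and `flat_ip_search` and the toy 2-D vectors are assumptions for the example.

```python
import numpy as np


def l2_normalize(mat):
    """Scale each row to unit length (near-zero rows are left near zero)."""
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    return mat / np.maximum(norms, 1e-12)


def flat_ip_search(index_vecs, query_vecs, k):
    """Exact top-k by inner product; returns (scores, row_indices)."""
    scores = query_vecs @ index_vecs.T            # shape (n_queries, n_items)
    order = np.argsort(-scores, axis=1)[:, :k]    # best first
    return np.take_along_axis(scores, order, axis=1), order


# Toy 2-D "embeddings" standing in for real SBERT vectors.
base = l2_normalize(np.array([[1.0, 0.0], [0.0, 2.0], [3.0, 3.0]], dtype=np.float32))
query = l2_normalize(np.array([[2.0, 0.1]], dtype=np.float32))

scores, ids = flat_ip_search(base, query, k=2)
```

Once both sides are unit length, the inner product equals the cosine of the angle between the original vectors. With FAISS, the same two steps would typically be `faiss.IndexFlatIP(dim)` followed by `index.add(base)` and `index.search(query, k)`.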

## 6.2. Reject logic ("no answer")
Problem: even when top-1 accuracy is good, the top-1 result is sometimes just "the best of a bad lot".

So we added an external filter:
- take the similarity s1 of the best candidate,
- if s1 < sem_thr → REJECT and print "увы, ответа нет" ("sorry, no answer found").
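The filter itself is a one-line comparison. In this sketch, `answer_or_reject` is a hypothetical name, the input is assumed to be a best-first list of `(qa_id, similarity)` pairs such as a reranker would return, and the threshold value is made up.

```python
def answer_or_reject(scored, sem_thr):
    """scored: [(qa_id, similarity)] sorted best-first; returns qa_id or None."""
    if not scored:
        return None
    best_id, s1 = scored[0]
    return best_id if s1 >= sem_thr else None


SEM_THR = 0.6  # hypothetical value; the real one is tuned on DEV data

accepted = answer_or_reject([(42, 0.83), (7, 0.51)], SEM_THR)   # s1 above the threshold
rejected = answer_or_reject([(13, 0.41), (8, 0.40)], SEM_THR)   # best of a bad lot
```

Returning `None` keeps the decision separate from the user-facing message, so the caller chooses how to phrase the refusal.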

## 6.3. Choosing sem_thr from evidence
- split tests into DEV/TEST,
- on DEV, record s1 and correctness for every query,
- sweep thresholds (grid search),
- pick sem_thr
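The threshold sweep might look like the sketch below. The notes do not state the selection criterion, so the objective here is an assumption: a decision counts as right when a correct top-1 is accepted or an incorrect top-1 is rejected. The name `pick_threshold`, the grid, and the DEV records are all illustrative.

```python
def pick_threshold(dev, grid):
    """dev: list of (s1, top1_correct) pairs; returns (best_thr, best_score).

    Assumed objective: a decision is right when a correct top-1 is accepted
    or an incorrect top-1 is rejected.
    """
    best_thr, best_score = None, -1.0
    for thr in grid:
        right = sum((s1 >= thr) == correct for s1, correct in dev)
        score = right / len(dev)
        if score > best_score:  # strict '>' keeps the lowest threshold on ties
            best_thr, best_score = thr, score
    return best_thr, best_score


# Made-up DEV records: (top-1 similarity, was top-1 the right answer?)
dev = [(0.91, True), (0.78, True), (0.66, True),
       (0.58, False), (0.49, False), (0.72, False)]
grid = [i / 100 for i in range(30, 96, 5)]  # 0.30, 0.35, ..., 0.95

sem_thr, dev_score = pick_threshold(dev, grid)
```

The chosen value is then frozen and evaluated once on TEST, so the reported accuracy is not inflated by tuning on the same queries.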

# Summary:
Dense retrieval (SBERT MiniLM) + FAISS
- → top-1 similarity s1
- → if s1 is high enough → show the match
- → otherwise → "no answer in the knowledge base".

## Database

In [1]:
import re
import hashlib
import sqlite3
from datetime import datetime, UTC
import requests
from bs4 import BeautifulSoup

In [None]:
UA = "Mozilla/5.0 (compatible; QA-bot/1.0)"

URLS = [
    "https://ba.hse.ru/faq",
    "https://ma.hse.ru/faq",
    "https://www.hse.ru/dod/faq",
    "https://gsb.hse.ru/iec/faq",
    "https://www.hse.ru/sho/faq_ru",
    "https://studentcentre.hse.ru/faq",
    "https://volunteer.hse.ru/faq",
    "https://ba.hse.ru/faq-obrazovatelny-kredit-2020",
    "https://yandex.ru/profi/faq",
    "https://olymp.hse.ru/mmo/info",
    "https://www.hse.ru/studyspravka/ras_stud",
    "https://www.hse.ru/studyspravka/kontr",
    "https://electives.hse.ru/kpv",
    "https://www.hse.ru/studyspravka/practice",
    "https://www.hse.ru/studyspravka/engbach",
    "https://www.hse.ru/studyspravka/documents",
]

DB_PATH = "qa.db"


# ---------------- utils ----------------

def clean_url(url: str) -> str:
    return (url or "").strip().rstrip(":/ \t\r\n")


def norm(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())


def utc_stamp() -> str:
    return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")


def make_hash(question: str, answer_text: str, source_url: str) -> str:
    base = (norm(question) + "||" + norm(answer_text) + "||" + source_url).encode("utf-8")
    return hashlib.sha1(base).hexdigest()


def fetch_html(url: str) -> BeautifulSoup:
    url = clean_url(url)
    r = requests.get(
        url,
        headers={"User-Agent": UA, "Accept-Language": "ru,en;q=0.9"},
        timeout=30,
        allow_redirects=True,
    )
    r.raise_for_status()
    return BeautifulSoup(r.text, "html.parser")


def extract_text_with_links(node) -> str:
    """
    Converts HTML to text while preserving links:
    <a href="URL">text</a> -> text (URL)

    Works with bs4.Tag / bs4.BeautifulSoup / str / None.
    """
    if node is None:
        return ""

    # make a "copy" via the string form so nodes can be replaced safely
    s = str(node)
    soup = BeautifulSoup(s, "html.parser")

    for a in soup.find_all("a"):
        href = (a.get("href") or "").strip()
        text = a.get_text(" ", strip=True)

        if href:
            a.replace_with(f"{text} ({href})")
        else:
            a.replace_with(text)

    return norm(soup.get_text(" ", strip=True))


def make_item(
    *,
    page: str,
    question: str,
    answer_text: str,
    source_url: str,
    scraped_at: str,
    section: str | None = None,
    answer_html: str | None = None,
) -> dict:
    source_url = clean_url(source_url)
    question = norm(question)
    answer_text = norm(answer_text)

    return {
        "page": page,
        "section": section,
        "question": question,
        "answer_text": answer_text,
        "answer_html": answer_html,
        "source_url": source_url,
        "item_hash": make_hash(question, answer_text, source_url),
        "scraped_at": scraped_at,
    }


# database

def init_db(db_path: str = DB_PATH) -> sqlite3.Connection:
    con = sqlite3.connect(db_path)
    cur = con.cursor()

    cur.execute("""
    CREATE TABLE IF NOT EXISTS qa (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        page TEXT,
        section TEXT,
        question TEXT NOT NULL,
        answer_text TEXT NOT NULL,
        answer_html TEXT,
        source_url TEXT NOT NULL,
        item_hash TEXT NOT NULL UNIQUE,
        scraped_at TEXT NOT NULL
    );
    """)

    cur.execute("""
    CREATE VIRTUAL TABLE IF NOT EXISTS qa_fts USING fts5(
        question, answer_text, section, source_url,
        content='qa', content_rowid='id'
    );
    """)

    cur.executescript("""
    CREATE TRIGGER IF NOT EXISTS qa_ai AFTER INSERT ON qa BEGIN
      INSERT INTO qa_fts(rowid, question, answer_text, section, source_url)
      VALUES (new.id, new.question, new.answer_text, new.section, new.source_url);
    END;

    CREATE TRIGGER IF NOT EXISTS qa_ad AFTER DELETE ON qa BEGIN
      INSERT INTO qa_fts(qa_fts, rowid, question, answer_text, section, source_url)
      VALUES ('delete', old.id, old.question, old.answer_text, old.section, old.source_url);
    END;

    CREATE TRIGGER IF NOT EXISTS qa_au AFTER UPDATE ON qa BEGIN
      INSERT INTO qa_fts(qa_fts, rowid, question, answer_text, section, source_url)
      VALUES ('delete', old.id, old.question, old.answer_text, old.section, old.source_url);
      INSERT INTO qa_fts(rowid, question, answer_text, section, source_url)
      VALUES (new.id, new.question, new.answer_text, new.section, new.source_url);
    END;
    """)

    con.commit()
    return con


def insert_items(con: sqlite3.Connection, items: list[dict]) -> int:
    cur = con.cursor()
    inserted = 0
    for it in items:
        try:
            cur.execute("""
            INSERT INTO qa(page, section, question, answer_text, answer_html, source_url, item_hash, scraped_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                it["page"],
                it["section"],
                it["question"],
                it["answer_text"],
                it["answer_html"],
                it["source_url"],
                it["item_hash"],
                it["scraped_at"],
            ))
            inserted += 1
        except sqlite3.IntegrityError:
            pass
    con.commit()
    return inserted


def fetch_latest(con: sqlite3.Connection, limit: int = 10):
    cur = con.cursor()
    return cur.execute("""
      SELECT id, page, question, answer_text, source_url
      FROM qa
      ORDER BY id DESC
      LIMIT ?;
    """, (limit,)).fetchall()


def print_pretty_rows(rows: list[tuple], max_width_q=80, max_width_a=120):
    def cut(s: str, n: int) -> str:
        s = norm(s)
        return s if len(s) <= n else s[: n - 1] + "…"

    print("\n" + "=" * 140)
    print(f"{'ID':>4} | {'PAGE':<15} | {'QUESTION':<80} | {'ANSWER':<120}")
    print("-" * 140)
    for (id_, page, question, answer_text, source_url) in rows:
        print(f"{id_:>4} | {page:<15} | {cut(question, max_width_q):<80} | {cut(answer_text, max_width_a):<120}")
        print(f"     source: {source_url}")
        print("-" * 140)
    print("=" * 140 + "\n")


def search(con: sqlite3.Connection, query: str, k: int = 5):
    q = norm(query)
    cur = con.cursor()
    return cur.execute("""
      SELECT qa.id, qa.page, qa.question, qa.answer_text, qa.source_url, bm25(qa_fts) AS score
      FROM qa_fts
      JOIN qa ON qa_fts.rowid = qa.id
      WHERE qa_fts MATCH ?
      ORDER BY score
      LIMIT ?;
    """, (q, k)).fetchall()


# map source URLs to short page tags

def guess_page_tag(url: str) -> str:
    url = clean_url(url)
    if "ba.hse.ru" in url:
        return "ba"
    if "ma.hse.ru" in url:
        return "ma"
    if "www.hse.ru/sho/" in url:
        return "sho"
    if "www.hse.ru/dod/" in url:
        return "dod"
    if "studentcentre.hse.ru" in url:
        return "studentcentre"
    if "volunteer.hse.ru" in url:
        return "volunteer"
    if "electives.hse.ru" in url:
        return "electives"
    if "/studyspravka/" in url:
        return "studyspravka"
    if "yandex.ru/profi/faq" in url:
        return "yan"
    if "olymp.hse.ru/mmo/info" in url:
        return "olymp_hse"
    # All studyspravka pages (ras_stud, kontr, practice, engbach, documents)
    # share one tag, so no per-page branches are needed after this check.
    if "hse.ru/studyspravka" in url:
        return "studyspravka"
    if "hse.ru/kpv" in url:
        return "kpv"
    return "unknown"


# parsers

def parse_schemaorg_faq(url: str) -> list[dict]:
    soup = fetch_html(url)
    page = guess_page_tag(url)
    scraped_at = utc_stamp()

    items: list[dict] = []

    blocks = soup.select('div.faq[itemtype="https://schema.org/Question"]')
    if not blocks:
        blocks = soup.select('[itemscope][itemtype="https://schema.org/Question"]')

    for b in blocks:
        q_el = b.select_one('[itemprop="name"]')
        a_el = b.select_one('[itemprop="text"]') or b.select_one('[itemprop="acceptedAnswer"] [itemprop="text"]')
        if not q_el or not a_el:
            continue

        question = extract_text_with_links(q_el)
        answer_text = extract_text_with_links(a_el)
        answer_html = str(a_el)

        if not question or not answer_text:
            continue

        items.append(
            make_item(
                page=page,
                section=None,
                question=question,
                answer_text=answer_text,
                answer_html=answer_html,
                source_url=url,
                scraped_at=scraped_at,
            )
        )

    return items


def parse_foldable_faq(url: str) -> list[dict]:
    soup = fetch_html(url)
    page = guess_page_tag(url)
    scraped_at = utc_stamp()

    items: list[dict] = []

    blocks = soup.select(
        "div.incut_foldable_block__item, "
        "div.incut_foldable_block_item, "
        "div.foldable_block__item, "
        "div.foldable_block_item"
    )

    if not blocks:
        blocks = [h.find_parent("div") for h in soup.select("h3.foldable_control") if h.find_parent("div")]
        seen = set()
        uniq = []
        for b in blocks:
            key = id(b)
            if key not in seen:
                seen.add(key)
                uniq.append(b)
        blocks = uniq

    for b in blocks:
        q_span = b.select_one("h3.foldable_control span.link.pseudo_link")
        q_h3 = b.select_one("h3.foldable_control")
        q_node = q_span or q_h3
        if not q_node:
            continue

        q_clone = BeautifulSoup(str(q_node), "html.parser")
        for i_tag in q_clone.find_all("i"):
            i_tag.decompose()
        question = extract_text_with_links(q_clone)
        if not question:
            continue

        fold = b.select_one("div.foldable") or (q_h3.find_next("div", class_="foldable") if q_h3 else None)
        if not fold:
            continue

        ans = fold.select_one("div.with-indent") or fold
        answer_text = extract_text_with_links(ans)
        if not answer_text:
            continue

        items.append(
            make_item(
                page=page,
                section=None,
                question=question,
                answer_text=answer_text,
                answer_html=str(ans),
                source_url=url,
                scraped_at=scraped_at,
            )
        )

    return items


def parse_wdj_plashka_faq(url: str) -> list[dict]:
    soup = fetch_html(url)
    page = guess_page_tag(url)
    scraped_at = utc_stamp()

    items: list[dict] = []

    cards = soup.select("div.wdj-plashka__card")
    for c in cards:
        q_el = c.find("h3")
        if not q_el:
            continue

        question = extract_text_with_links(q_el)
        if not question:
            continue

        c_copy = BeautifulSoup(str(c), "html.parser")
        h3 = c_copy.find("h3")
        if h3:
            h3.decompose()

        answer_text = extract_text_with_links(c_copy)
        if not answer_text:
            continue

        items.append(
            make_item(
                page=page,
                section=None,
                question=question,
                answer_text=answer_text,
                answer_html=str(c_copy),
                source_url=url,
                scraped_at=scraped_at,
            )
        )

    return items


def parse_headings_article_faq(url: str, headings=("h2", "h3")) -> list[dict]:
    soup = fetch_html(url)
    page = guess_page_tag(url)
    scraped_at = utc_stamp()

    items: list[dict] = []

    h1 = soup.find("h1")
    container = h1.parent if h1 else soup

    hs = []
    for tag in headings:
        hs.extend(container.find_all(tag))
    hs = sorted(hs, key=lambda x: x.sourceline if x.sourceline is not None else 10**9)

    for h in hs:
        question = extract_text_with_links(h)
        if not question:
            continue

        parts_text = []
        parts_html = []

        for sib in h.next_siblings:
            if getattr(sib, "name", None) in headings:
                break
            if isinstance(sib, str) or not hasattr(sib, "get_text"):
                continue

            txt = extract_text_with_links(sib)
            if txt:
                parts_text.append(txt)
                parts_html.append(str(sib))

        answer_text = norm(" ".join(parts_text))
        answer_html = "".join(parts_html) if parts_html else None
        if not answer_text:
            continue

        items.append(
            make_item(
                page=page,
                section=None,
                question=question,
                answer_text=answer_text,
                answer_html=answer_html,
                source_url=url,
                scraped_at=scraped_at,
            )
        )

    return items


def parse_h3_article_faq(url: str) -> list[dict]:
    return parse_headings_article_faq(url, headings=("h3",))


def parse_engbach_faq(url: str) -> list[dict]:
    soup = fetch_html(url)
    page = guess_page_tag(url)
    scraped_at = utc_stamp()

    items: list[dict] = []

    questions = soup.select("p.important.question")
    for q in questions:
        question = extract_text_with_links(q)
        if not question:
            continue

        parts_text, parts_html = [], []
        for sib in q.next_siblings:
            if getattr(sib, "name", None) == "p" and "question" in (sib.get("class") or []):
                break
            if isinstance(sib, str) or not hasattr(sib, "get_text"):
                continue

            txt = extract_text_with_links(sib)
            if txt:
                parts_text.append(txt)
                parts_html.append(str(sib))

        answer_text = norm(" ".join(parts_text))
        if not answer_text:
            continue

        items.append(
            make_item(
                page=page,
                section=None,
                question=question,
                answer_text=answer_text,
                answer_html="".join(parts_html),
                source_url=url,
                scraped_at=scraped_at,
            )
        )

    return items


def parse_yandex_profi_faq(url: str) -> list[dict]:
    soup = fetch_html(url)
    page = guess_page_tag(url)
    scraped_at = utc_stamp()

    items: list[dict] = []

    # 1) old page layout
    spoiler_items = soup.select("div.lc-spoiler-item")
    for it in spoiler_items:
        q_el = (
            it.select_one(".lc-spoiler-item__header .lc-spoiler-item__title")
            or it.select_one(".lc-spoiler-item__title")
        )
        if not q_el:
            continue

        question = extract_text_with_links(q_el)
        if not question:
            continue

        a_wrap = it.select_one(".lc-spoiler-item__text") or it
        answer_text = extract_text_with_links(a_wrap)
        if not answer_text:
            continue

        items.append(
            make_item(
                page=page,
                section=None,
                question=question,
                answer_text=answer_text,
                answer_html=str(a_wrap),
                source_url=url,
                scraped_at=scraped_at,
            )
        )

    if items:
        return items

    # 2) new page layout: aria-controls -> div#id
    headers = soup.select('[class*="spoiler-item__header"][aria-controls], [aria-controls][role="button"]')
    for h in headers:
        ctrl = h.get("aria-controls")
        if not ctrl:
            continue

        question = extract_text_with_links(h)
        if not question:
            continue

        a_wrap = soup.find(id=ctrl)
        if not a_wrap:
            continue

        answer_text = extract_text_with_links(a_wrap)
        if not answer_text:
            continue

        items.append(
            make_item(
                page=page,
                section=None,
                question=question,
                answer_text=answer_text,
                answer_html=str(a_wrap),
                source_url=url,
                scraped_at=scraped_at,
            )
        )

    if items:
        return items

    # 3) fallback: treat headings as an article
    return parse_headings_article_faq(url, headings=("h2", "h3"))


# main

def main():
    con = init_db(DB_PATH)
    total_inserted = 0

    for url in URLS:
        url = clean_url(url)

        if "www.hse.ru/sho/faq_ru" in url:
            items = parse_foldable_faq(url)

        elif "studentcentre.hse.ru/faq" in url:
            items = parse_wdj_plashka_faq(url)

        elif "volunteer.hse.ru/faq" in url or "olymp.hse.ru/mmo/info" in url:
            items = parse_h3_article_faq(url)

        elif "yandex.ru/profi/faq" in url:
            items = parse_yandex_profi_faq(url)

        elif "/studyspravka/" in url:
            if "engbach" in url:
                items = parse_engbach_faq(url)
            else:
                items = parse_headings_article_faq(url, headings=("h2", "h3"))

        elif "electives.hse.ru/kpv" in url:
            items = parse_headings_article_faq(url, headings=("h2", "h3"))

        else:
            items = parse_schemaorg_faq(url)

        ins = insert_items(con, items)
        total_inserted += ins
        print(f"[OK] {url}: found={len(items)} inserted={ins}")

    rows = fetch_latest(con, limit=10)
    print_pretty_rows(rows)

    # example_query = "Мне прислали приглашение на программу"
    # hits = search(con, example_query, k=5)
    # if hits:
    #     print("Top matches for:", example_query)
    #     for (id_, page, q, a, url, score) in hits:
    #         print(f"- id={id_} page={page} score={score:.4f} q={norm(q)[:120]}")
    #         print(f"  a={norm(a)[:200]}...")
    #         print(f"  source={url}")
    # else:
    #     print("No matches for:", example_query)

    con.close()


if __name__ == "__main__":
    main()

[OK] https://ba.hse.ru/faq: found=113 inserted=112
[OK] https://ma.hse.ru/faq: found=80 inserted=80
[OK] https://www.hse.ru/dod/faq: found=165 inserted=165
[OK] https://gsb.hse.ru/iec/faq: found=12 inserted=12
[OK] https://www.hse.ru/sho/faq_ru: found=31 inserted=31
[OK] https://studentcentre.hse.ru/faq: found=9 inserted=9
[OK] https://volunteer.hse.ru/faq: found=12 inserted=12
[OK] https://ba.hse.ru/faq-obrazovatelny-kredit-2020: found=17 inserted=17
[OK] https://yandex.ru/profi/faq: found=51 inserted=51
[OK] https://olymp.hse.ru/mmo/info: found=13 inserted=13
[OK] https://www.hse.ru/studyspravka/ras_stud: found=7 inserted=7
[OK] https://www.hse.ru/studyspravka/kontr: found=12 inserted=12
[OK] https://electives.hse.ru/kpv: found=14 inserted=14
[OK] https://www.hse.ru/studyspravka/practice: found=1 inserted=1
[OK] https://www.hse.ru/studyspravka/engbach: found=6 inserted=6
[OK] https://www.hse.ru/studyspravka/documents: found=10 inserted=10


In [7]:
!pip install sentence-transformers rapidfuzz numpy

Collecting rapidfuzz
  Downloading rapidfuzz-3.14.3-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (12 kB)
Downloading rapidfuzz-3.14.3-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (3.2 MB)
Installing collected packages: rapidfuzz
Successfully installed rapidfuzz-3.14.3


In [2]:
import numpy as np


def ensure_vec_table(con: sqlite3.Connection):
    cur = con.cursor()
    cur.execute("""
    CREATE TABLE IF NOT EXISTS qa_vec (
        qa_id INTEGER PRIMARY KEY,
        emb BLOB NOT NULL,
        dim INTEGER NOT NULL
    );
    """)
    con.commit()

def emb_to_blob(vec: np.ndarray) -> bytes:
    # vec: float32 (dim,)
    return vec.astype(np.float32).tobytes()

def blob_to_emb(blob: bytes, dim: int) -> np.ndarray:
    return np.frombuffer(blob, dtype=np.float32, count=dim)
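
The BLOB round trip can be checked in isolation. This sketch uses an in-memory SQLite database with the same `qa_vec` schema; the sample vector is made up. Note that `np.frombuffer` returns a read-only view over the bytes, so call `.copy()` on the result if it needs to be modified.

```python
import sqlite3

import numpy as np

vec = np.array([0.1, -0.5, 0.25, 1.0], dtype=np.float32)

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE qa_vec (qa_id INTEGER PRIMARY KEY, emb BLOB NOT NULL, dim INTEGER NOT NULL)"
)
# Store the raw float32 bytes together with the dimension.
con.execute(
    "INSERT INTO qa_vec VALUES (?, ?, ?)",
    (1, vec.astype(np.float32).tobytes(), int(vec.shape[0])),
)

blob, dim = con.execute("SELECT emb, dim FROM qa_vec WHERE qa_id = 1").fetchone()
restored = np.frombuffer(blob, dtype=np.float32, count=dim)
con.close()
```

Four float32 values occupy 16 bytes, and the restored array matches the original exactly because no text conversion is involved.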


### Building embeddings

In [3]:
from sentence_transformers import SentenceTransformer

def build_embeddings(
    con: sqlite3.Connection,
    model_name: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
    batch_size: int = 64,
    rebuild: bool = False,
):
    """
    Computes embeddings for qa.question and stores them in qa_vec.
    normalize_embeddings=True -> cosine similarity = dot product.
    """
    ensure_vec_table(con)
    cur = con.cursor()

    if rebuild:
        cur.execute("DELETE FROM qa_vec;")
        con.commit()

    # select questions that do not have an embedding yet
    rows = cur.execute("""
        SELECT qa.id, qa.question
        FROM qa
        LEFT JOIN qa_vec ON qa_vec.qa_id = qa.id
        WHERE qa_vec.qa_id IS NULL
        ORDER BY qa.id;
    """).fetchall()

    if not rows:
        print("[emb] Nothing to do: all embeddings exist.")
        return

    ids = [r[0] for r in rows]
    questions = [r[1] for r in rows]

    model = SentenceTransformer(model_name)
    print(f"[emb] Model loaded: {model_name}")
    print(f"[emb] To encode: {len(questions)} questions")

    # encode in batches
    for start in range(0, len(questions), batch_size):
        batch_q = questions[start:start+batch_size]
        batch_ids = ids[start:start+batch_size]

        embs = model.encode(
            batch_q,
            batch_size=len(batch_q),
            normalize_embeddings=True,  # L2-normalized embeddings
            show_progress_bar=False
        )
        embs = np.asarray(embs, dtype=np.float32)
        dim = embs.shape[1]

        cur.executemany(
            "INSERT OR REPLACE INTO qa_vec(qa_id, emb, dim) VALUES (?, ?, ?)",
            [(int(i), emb_to_blob(embs[j]), int(dim)) for j, i in enumerate(batch_ids)]
        )
        con.commit()

    print("[emb] Done.")



### BM25 (FTS5)

A classic lexical search algorithm from the TF-IDF family.
It scores how well a document matches a query based on word matches.
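
To make the scoring concrete, here is a small pure-Python sketch of the Okapi BM25 formula. It is not SQLite's implementation: whitespace tokenization, the non-negative IDF variant, and the function name `bm25_scores` are assumptions for illustration, with the conventional defaults k1=1.2 and b=0.75.

```python
import math
from collections import Counter


def bm25_scores(query, docs, k1=1.2, b=0.75):
    """Okapi BM25 score of every document for a whitespace-tokenized query."""
    tokenized = [d.lower().split() for d in docs]
    n_docs = len(tokenized)
    avg_len = sum(len(t) for t in tokenized) / n_docs
    # document frequency: in how many documents each term occurs
    df = Counter(term for t in tokenized for term in set(t))

    scores = []
    for tokens in tokenized:
        tf = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue  # terms absent from the document add nothing
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            norm_len = 1 - b + b * len(tokens) / avg_len
            score += idf * tf[term] * (k1 + 1) / (tf[term] + k1 * norm_len)
        scores.append(score)
    return scores


docs = [
    "как получить справку об обучении",
    "как подать документы на общежитие",
    "где найти расписание занятий",
]
scores = bm25_scores("получить справку", docs)
```

Only the first document shares tokens with the query, so it is the only one with a non-zero score; a paraphrase with no shared tokens would score zero everywhere. In this sketch higher is better, whereas SQLite's `bm25()` returns values where lower means a better match, which is why the notebook queries sort by it in ascending order.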

In [6]:
import re

def norm_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())

def fts_query(text: str) -> str:
    """
    FTS5 MATCH does not tolerate punctuation or raw text well.
    Build a safe query from tokens (FTS5 implicitly ANDs space-separated tokens).
    """
    t = norm_text(text).lower()
    t = re.sub(r"[^\w\s]+", " ", t, flags=re.UNICODE)
    tokens = [x for x in t.split() if len(x) > 1]
    return " ".join(tokens[:20])

def bm25_candidates(con: sqlite3.Connection, query: str, k: int = 30) -> list[int]:
    q = fts_query(query)
    if not q:
        return []
    cur = con.cursor()
    rows = cur.execute("""
        SELECT qa.id
        FROM qa_fts
        JOIN qa ON qa_fts.rowid = qa.id
        WHERE qa_fts MATCH ?
        ORDER BY bm25(qa_fts)
        LIMIT ?;
    """, (q, k)).fetchall()
    return [r[0] for r in rows]

### RapidFuzz

A library for fast fuzzy string matching.
It measures how similar two strings are at the character level, not by meaning.
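
The difference between character-level and semantic similarity shows up in a tiny experiment. This sketch uses the standard library's `difflib.SequenceMatcher` as a stand-in so that it runs without extra packages; it is not RapidFuzz's WRatio, and the example strings are made up.

```python
from difflib import SequenceMatcher


def char_similarity(a, b):
    """Character-level similarity in [0, 1] (stdlib stand-in, not WRatio)."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


faq = "как получить справку об обучении"

# One dropped letter: almost every character still lines up.
typo_sim = char_similarity("как получть справку об обучении", faq)

# Same intent, different words: few characters line up.
paraphrase_sim = char_similarity("где взять бумагу что я студент", faq)
```

The typo scores close to 1 while the paraphrase scores far lower, even though a person would treat both as the same question. That gap is what the semantic stage is meant to close.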

In [8]:
from rapidfuzz import process, fuzz

def load_all_questions(con: sqlite3.Connection) -> list[tuple[int, str]]:
    cur = con.cursor()
    return cur.execute("SELECT id, question FROM qa ORDER BY id;").fetchall()

def fuzzy_candidates(
    query: str,
    all_q: list[tuple[int, str]],
    k: int = 30,
    min_score: int = 70
) -> list[int]:
    """
    Returns qa_ids ranked by string similarity.
    WRatio copes well with extra words and reordering.
    """
    questions = [q for (_id, q) in all_q]
    matches = process.extract(query, questions, scorer=fuzz.WRatio, limit=k)
    # matches: [(matched_text, score, idx), ...]
    out = []
    for _text, score, idx in matches:
        if score >= min_score:
            out.append(all_q[idx][0])
    return out

### Semantic reranking (Sentence-BERT)

Scores the semantic closeness between the user's question and the candidates produced by the previous stage (BM25 / RapidFuzz).

In [9]:
def get_embeddings_for_ids(con: sqlite3.Connection, ids: list[int]) -> dict[int, tuple[np.ndarray, int]]:
    """
    Returns {qa_id: (emb_vector, dim)} for a list of ids.
    """
    if not ids:
        return {}
    cur = con.cursor()
    qmarks = ",".join(["?"] * len(ids))
    rows = cur.execute(f"""
        SELECT qa_id, emb, dim
        FROM qa_vec
        WHERE qa_id IN ({qmarks});
    """, ids).fetchall()

    out = {}
    for qa_id, blob, dim in rows:
        out[int(qa_id)] = (blob_to_emb(blob, int(dim)), int(dim))
    return out

def semantic_rerank(
    query: str,
    candidate_ids: list[int],
    con: sqlite3.Connection,
    model: SentenceTransformer,
) -> list[tuple[int, float]]:
    """
    Returns [(qa_id, similarity)] sorted by descending similarity.
    cosine similarity = dot product because normalize_embeddings=True.
    """
    if not candidate_ids:
        return []

    id2emb = get_embeddings_for_ids(con, candidate_ids)
    # candidates without a stored embedding are simply skipped
    # list of (candidate id, embedding) pairs
    usable = [(i, id2emb[i][0]) for i in candidate_ids if i in id2emb]
    if not usable:
        return []

    q_emb = model.encode([query], normalize_embeddings=True, show_progress_bar=False)[0].astype(np.float32)

    scored = []
    for qa_id, emb in usable:
        sim = float(np.dot(q_emb, emb))  # in [-1, 1], usually 0..1
        scored.append((qa_id, sim))

    scored.sort(key=lambda x: x[1], reverse=True)
    return scored

### BM25 + RapidFuzz --> final choice by meaning (SBERT)

In [10]:
def fetch_rows_by_ids(con: sqlite3.Connection, ids: list[int]) -> list[tuple]:
    if not ids:
        return []
    cur = con.cursor()
    qmarks = ",".join(["?"] * len(ids))
    rows = cur.execute(f"""
        SELECT id, page, question, answer_text, source_url
        FROM qa
        WHERE id IN ({qmarks});
    """, ids).fetchall()
    # preserve the order given in ids
    by_id = {r[0]: r for r in rows}
    return [by_id[i] for i in ids if i in by_id]

def hybrid_search(
    con: sqlite3.Connection,
    query: str,
    all_q_cache: list[tuple[int, str]] | None = None,
    model: SentenceTransformer | None = None,
    bm25_k: int = 30,
    fuzzy_k: int = 30,
    fuzzy_min_score: int = 70,
    final_k: int = 3,
):
    """
    Returns:
      - best_row: (id, page, question, answer_text, source_url) | None
      - top_rows: list of the same tuples (final_k)
      - debug: dict with per-stage candidates and scores
    """
    q = norm_text(query)
    if not q:
        return None, [], {"reason": "empty_query"}

    # 1) BM25 candidates
    bm_ids = bm25_candidates(con, q, k=bm25_k)

    # 2) fuzzy candidates
    if all_q_cache is None:
        all_q_cache = load_all_questions(con)
    fz_ids = fuzzy_candidates(q, all_q_cache, k=fuzzy_k, min_score=fuzzy_min_score)

    # 3) merge (deduplicated, keeping priority: BM25 -> Fuzzy)
    seen = set()
    candidate_ids = []
    for _id in bm_ids + fz_ids:
        if _id not in seen:
            seen.add(_id)
            candidate_ids.append(_id)

    # If there are no candidates at all, fall back to the top fuzzy matches without a threshold
    if not candidate_ids:
        fz_ids2 = fuzzy_candidates(q, all_q_cache, k=10, min_score=0)
        candidate_ids = fz_ids2

    # 4) final semantic rerank
    if model is None:
        model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

    scored = semantic_rerank(q, candidate_ids, con, model)
    top_ids = [i for i, sim in scored[:final_k]]

    top_rows = fetch_rows_by_ids(con, top_ids)
    best_row = top_rows[0] if top_rows else None

    debug = {
        "bm25_candidates": bm_ids[:10],
        "fuzzy_candidates": fz_ids[:10],
        "candidate_count": len(candidate_ids),
        "semantic_top": scored[:10],  # (id, sim)
    }
    return best_row, top_rows, debug

### test main

In [None]:
# def demo():
#     db_path = "qa.db"
#     con = sqlite3.connect(db_path)

#     # 1) one-off: compute embeddings
#     build_embeddings(con)

#     # 2) question cache for rapidfuzz (speeds things up)
#     all_q = load_all_questions(con)

#     # 3) load the model once
#     model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

#     while True:
#         try:
#             text = input(">>> ")
#             best, top, dbg = hybrid_search(con, text, all_q_cache=all_q, model=model)
#             if not best:
#                 print("Nothing found")
#                 continue

#             print("\n BEST:")
#             print("Q:", best[2])
#             print("A:", best[3], "...")
#             print("SRC:", best[4])

#             print("\nTop-3:")
#             for r in top:
#                 print("-", r[2])


#         except KeyboardInterrupt:
#             break

#     con.close()

# if __name__ == "__main__":
#     demo()


[emb] Model loaded: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2
[emb] To encode: 552 questions
[emb] Done.


## Building the test set

In [None]:
import csv
import sqlite3
from pathlib import Path

DB_PATH = "qa.db"
OUT = "tests.csv"

HEADER = ["query", "expected_qa_id", "comment"]


def open_writer(path: str):
    """
    Open the CSV for appending:
    - if the file exists → append to it
    - if not → create it and write the header
    """
    p = Path(path)
    exists = p.exists()

    f = p.open("a", newline="", encoding="utf-8")
    writer = csv.writer(f)

    if not exists:
        writer.writerow(HEADER)
        f.flush()

    return f, writer


def continue_labeling(n_items: int = 20):
    con = sqlite3.connect(DB_PATH)
    cur = con.cursor()

    f, writer = open_writer(OUT)

    print(f"\n Writing: {Path(OUT).resolve()}")
    print("Empty input = finish current FAQ item\n")

    try:
        for i in range(n_items):
            row = cur.execute("""
                SELECT id, question
                FROM qa
                ORDER BY RANDOM()
                LIMIT 1
            """).fetchone()

            qa_id, faq_question = row

            print("\n" + "=" * 90)
            print(f"[{i+1}/{n_items}] expected_qa_id = {qa_id}")
            print("FAQ QUESTION:")
            print(faq_question)

            while True:
                query = input("query> ").strip()
                if not query:
                    break

                comment = input("comment (опечатка / перефраз / лишние слова / точно): ").strip()
                writer.writerow([query, qa_id, comment])
                f.flush()
                print("saved")

    except KeyboardInterrupt:
        print("\n Interrupted by user")

    finally:
        f.close()
        con.close()
        print("\n Done. File safely closed.")


continue_labeling(n_items=20)


✍️  Writing to: /content/tests.csv
Empty input = finish current FAQ item


[1/20] expected_qa_id = 24
FAQ QUESTION:
В каких случаях мои документы не примут?
query> когда могут не принять мои документы 
comment (опечатка / перефраз / лишние слова / точно): перефраз
✔ saved
query> почему мои документ не приняли
comment (опечатка / перефраз / лишние слова / точно): перефраз
✔ saved
query> В каких случаях мои документы не примут
comment (опечатка / перефраз / лишние слова / точно): точно
✔ saved
query> 

[2/20] expected_qa_id = 111
FAQ QUESTION:
Будет ли отличаться диплом от других бакалаврских программ?
query> мой диплом будет отличаться от других дипломов бакалавриата
comment (опечатка / перефраз / лишние слова / точно): перефраз
✔ saved
query> есть ли отличие этого диплома бакалавра от других
comment (опечатка / перефраз / лишние слова / точно): перефраз
✔ saved
query> удет ли отличаться диплом от других бакалаваских програм
comment (опечатка / перефраз / лишние слова / точно): опечатк

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
OUT = "/content/drive/MyDrive/hse_qa_bot/tests_2.csv"

## Comparing all methods on the test set

In [None]:
import csv
import sqlite3
import math
from collections import defaultdict

from sentence_transformers import SentenceTransformer

DB_PATH = "qa.db"
TESTS_CSV = "/content/tests-2.csv"

def rank_of(expected_id: int, predicted_ids: list[int]) -> int | None:
    """1-based rank, or None if not found"""
    try:
        return predicted_ids.index(expected_id) + 1
    except ValueError:
        return None

def metrics_from_ranks(ranks: list[int | None], k: int = 3):
    n = len(ranks)
    top1 = sum(1 for r in ranks if r == 1) / n if n else 0.0
    topk = sum(1 for r in ranks if (r is not None and r <= k)) / n if n else 0.0
    mrr = sum((1.0 / r) if r else 0.0 for r in ranks) / n if n else 0.0
    return top1, topk, mrr

def read_tests(path: str):
    tests = []
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for row in rdr:
            tests.append((
                row["query"],
                int(row["expected_qa_id"]),
                row.get("comment","")
            ))
    return tests

def predict_bm25(con, query, k=3):
    return bm25_candidates(con, query, k=50)[:k]

def predict_fuzzy_only(con, query, all_q, k=3):
    # return the top-k by fuzzy score
    fz = fuzzy_candidates(query, all_q, k=50, min_score=65)
    return fz[:k]

def predict_bm25_fuzzy(con, query, all_q, k=3):
    bm = bm25_candidates(con, query, k=50)
    fz = fuzzy_candidates(query, all_q, k=50, min_score=65)
    seen = set()
    cand = []
    for i in bm + fz:
        if i not in seen:
            seen.add(i)
            cand.append(i)
    return cand[:k]

def predict_hybrid(con, query, all_q, model, k=3):
    bm = bm25_candidates(con, query, k=50)
    fz = fuzzy_candidates(query, all_q, k=50, min_score=65)

    seen = set()
    cand = []
    for i in bm + fz:
        if i not in seen:
            seen.add(i)
            cand.append(i)

    # fallback when the pool is completely empty
    if not cand:
        cand = fuzzy_candidates(query, all_q, k=50, min_score=0)

    scored = semantic_rerank(query, cand, con, model)
    top_ids = [i for (i, sim) in scored[:k]]
    return top_ids

def main():
    con = sqlite3.connect(DB_PATH)
    tests = read_tests(TESTS_CSV)
    all_q = load_all_questions(con)
    model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

    ranks = {
        "BM25": [],
        "Fuzzy": [],
        "BM25+Fuzzy": [],
        "BM25+Fuzzy+Semantic": []
    }

    # failure log
    failures = defaultdict(list)

    for query, expected_id, comment in tests:
        pred_bm = predict_bm25(con, query, k=3)
        pred_fz = predict_fuzzy_only(con, query, all_q, k=3)
        pred_bm_fz = predict_bm25_fuzzy(con, query, all_q, k=3)
        pred_hyb = predict_hybrid(con, query, all_q, model, k=3)

        r_bm = rank_of(expected_id, pred_bm)
        r_fz = rank_of(expected_id, pred_fz)
        r_bm_fz = rank_of(expected_id, pred_bm_fz)
        r_hyb = rank_of(expected_id, pred_hyb)

        ranks["BM25"].append(r_bm)
        ranks["Fuzzy"].append(r_fz)
        ranks["BM25+Fuzzy"].append(r_bm_fz)
        ranks["BM25+Fuzzy+Semantic"].append(r_hyb)

        if r_hyb is None:
            failures[comment or "no_comment"].append((query, expected_id, pred_hyb))

    print("\nRESULTS on", len(tests), "tests")
    print("-"*72)
    # column width 19 fits the longest name, "BM25+Fuzzy+Semantic"
    print(f"{'Model':<19} | {'Top-1':>6} | {'Top-3':>6} | {'MRR':>6}")
    print("-"*72)

    for name in ["BM25", "Fuzzy", "BM25+Fuzzy", "BM25+Fuzzy+Semantic"]:
        top1, top3, mrr = metrics_from_ranks(ranks[name], k=3)
        print(f"{name:<19} | {top1:>6.3f} | {top3:>6.3f} | {mrr:>6.3f}")

    # a few diagnostics
    if failures:
        print("\nSome HYBRID failures by comment type:")
        for c, items in failures.items():
            print(f"- {c}: {len(items)}")
        # one example
        ex = next(iter(failures.values()))
        print("\nExample failure:")
        q, eid, preds = ex[0]
        print("query:", q)
        print("expected_id:", eid)
        print("predicted_top3:", preds)

    con.close()

if __name__ == "__main__":
    main()


RESULTS on 91 tests
------------------------------------------------------------------------
Model               |  Top-1 |  Top-3 |    MRR
------------------------------------------------------------------------
BM25                |  0.110 |  0.110 |  0.110
Fuzzy               |  0.143 |  0.143 |  0.143
BM25+Fuzzy          |  0.154 |  0.154 |  0.154
BM25+Fuzzy+Semantic |  0.440 |  0.527 |  0.480

Some HYBRID failures by comment type:
- перефраз: 38
- опечатка: 4
- no_comment: 1

Example failure:
query: когда действует трехсторонний договор
expected_id: 91
predicted_top3: [82, 210, 538]
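
BM25 shows identical Top-1, Top-3 and MRR in the table above, so every query it got right was matched at rank 1 and the rest were not matched within the top 3 at all. One plausible cause, assuming `qa_fts` is an FTS5 table and the MATCH string is built by joining tokens with spaces (as the `fts_query` helper later in this notebook does): FTS5 treats space-separated tokens as an implicit AND, so a paraphrase containing a single word that is absent from the FAQ entry returns no rows. The sketch below builds an OR query instead. `fts_query_or` is a hypothetical helper, not part of the notebook, and whether it helps on this data is untested.

```python
import re

def fts_query_or(text: str, max_tokens: int = 20) -> str:
    """Build an FTS5 MATCH string in which any single token may match (OR semantics)."""
    cleaned = re.sub(r"[^\w\s]+", " ", (text or "").lower())
    tokens = [t for t in cleaned.split() if len(t) > 1]
    # Quote each token so FTS5 reads it as a plain string, never as an operator.
    return " OR ".join(f'"{t}"' for t in tokens[:max_tokens])

print(fts_query_or("Когда действует трехсторонний договор?"))
```

With OR semantics BM25 returns many more rows, so the ranking then depends on the BM25 score itself rather than on an all-or-nothing match.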


## Building the training dataset

In [None]:
import os
import re
import csv
import random
import sqlite3
from typing import Dict, List, Tuple

import numpy as np
from rapidfuzz import process, fuzz
from sentence_transformers import SentenceTransformer
from sklearn.model_selection import train_test_split

# Paths / basic parameters
DB_PATH = "qa.db"
OUT_CSV = "train_pairs.csv"
MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"

SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Candidate pool sizes and dataset balance
BM25_K = 200
FUZZY_K = 200
NEG_PER_POS = 20
AUG_PER_Q = 3
MAX_FTS_TOKENS = 20

REBUILD_CSV = True

CSV_FIELDS = [
    "split",
    "qa_id",
    "query",
    "candidate_id",
    "label",
    "bm25_inv",
    "fuzzy",
    "semantic",
    "overlap",
    "len_ratio",
    "len_diff",
]

def norm_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())

def tokenize(s: str) -> List[str]:
    s = norm_text(s).lower()
    s = re.sub(r"[^\w\s]+", " ", s, flags=re.UNICODE)
    return [t for t in s.split() if len(t) > 1]

def overlap_ratio(q_tokens: List[str], c_tokens: List[str]) -> float:
    if not q_tokens or not c_tokens:
        return 0.0
    q, c = set(q_tokens), set(c_tokens)
    return len(q & c) / max(1, len(q))

def fts_query(text: str, max_tokens: int = MAX_FTS_TOKENS) -> str:
    t = norm_text(text).lower()
    t = re.sub(r"[^\w\s]+", " ", t, flags=re.UNICODE)
    tokens = [x for x in t.split() if len(x) > 1]
    return " ".join(tokens[:max_tokens])

def load_all_questions(con: sqlite3.Connection) -> List[Tuple[int, str]]:
    cur = con.cursor()
    return [(int(i), q) for i, q in cur.execute("SELECT id, question FROM qa ORDER BY id;").fetchall()]

def fetch_questions_by_ids(con: sqlite3.Connection, ids: List[int]) -> Dict[int, str]:
    if not ids:
        return {}
    cur = con.cursor()
    qmarks = ",".join(["?"] * len(ids))
    rows = cur.execute(f"SELECT id, question FROM qa WHERE id IN ({qmarks});", ids).fetchall()
    return {int(i): q for i, q in rows}

# embeddings
def blob_to_emb(blob: bytes, dim: int) -> np.ndarray:
    return np.frombuffer(blob, dtype=np.float32, count=dim)

def get_embeddings_for_ids(con: sqlite3.Connection, ids: List[int]) -> Dict[int, np.ndarray]:
    if not ids:
        return {}
    cur = con.cursor()
    qmarks = ",".join(["?"] * len(ids))
    rows = cur.execute(f"""
        SELECT qa_id, emb, dim
        FROM qa_vec
        WHERE qa_id IN ({qmarks});
    """, ids).fetchall()
    return {int(qa_id): blob_to_emb(blob, int(dim)) for qa_id, blob, dim in rows}

def semantic_sims(con: sqlite3.Connection, query: str, ids: List[int], model: SentenceTransformer) -> Dict[int, float]:
    id2emb = get_embeddings_for_ids(con, ids)
    if not id2emb:
        return {}
    q_emb = model.encode([query], normalize_embeddings=True, show_progress_bar=False)[0].astype(np.float32)
    return {i: float(np.dot(q_emb, emb)) for i, emb in id2emb.items()}


def bm25_candidates_with_scores(con: sqlite3.Connection, query: str, k: int = BM25_K) -> List[Tuple[int, float]]:
    q = fts_query(query)
    if not q:
        return []
    cur = con.cursor()
    rows = cur.execute("""
        SELECT qa.id, bm25(qa_fts) AS score
        FROM qa_fts
        JOIN qa ON qa_fts.rowid = qa.id
        WHERE qa_fts MATCH ?
        ORDER BY score
        LIMIT ?;
    """, (q, k)).fetchall()
    return [(int(i), float(s)) for i, s in rows]

def fuzzy_candidates_with_scores(query: str, all_q: List[Tuple[int, str]], k: int = FUZZY_K) -> List[Tuple[int, float]]:
    questions = [q for (_id, q) in all_q]
    matches = process.extract(query, questions, scorer=fuzz.WRatio, limit=k)
    return [(int(all_q[idx][0]), float(score)) for _text, score, idx in matches]

def make_candidate_pool(con: sqlite3.Connection, query: str, all_q: List[Tuple[int, str]],
                        bm25_k: int = BM25_K, fuzzy_k: int = FUZZY_K) -> Tuple[List[int], Dict[int, float], Dict[int, float]]:
    # Merge the BM25 and fuzzy candidates into a single pool without duplicates
    bm = bm25_candidates_with_scores(con, query, k=bm25_k)
    fz = fuzzy_candidates_with_scores(query, all_q, k=fuzzy_k)

    bm25_map = {i: s for i, s in bm}
    fuzzy_map = {i: s for i, s in fz}

    seen = set()
    pool = []
    for i, _ in bm:
        if i not in seen:
            seen.add(i); pool.append(i)
    for i, _ in fz:
        if i not in seen:
            seen.add(i); pool.append(i)

    return pool, bm25_map, fuzzy_map


def build_features_for_query(con: sqlite3.Connection,
                            query: str,
                            candidate_ids: List[int],
                            bm25_map: Dict[int, float],
                            fuzzy_map: Dict[int, float],
                            sem_map: Dict[int, float]) -> np.ndarray:
    q_tok = tokenize(query)
    id2q = fetch_questions_by_ids(con, candidate_ids)

    X = []
    for cid in candidate_ids:
        cand_q = id2q.get(cid, "")
        c_tok = tokenize(cand_q)

        bm25 = bm25_map.get(cid, None)
        fz = fuzzy_map.get(cid, 0.0)
        sem = sem_map.get(cid, 0.0)

        bm25_inv = -bm25 if bm25 is not None else 0.0
        len_ratio = min(len(q_tok), len(c_tok)) / max(1, max(len(q_tok), len(c_tok)))
        len_diff = abs(len(q_tok) - len(c_tok)) / max(1, len(q_tok))

        X.append([bm25_inv, fz/100.0, sem, overlap_ratio(q_tok, c_tok), len_ratio, len_diff])

    return np.asarray(X, dtype=np.float32)


STOPWORDS = set("и в во на по о об от до за для или а но что как когда где куда".split())

def augment_queries(base_q: str, n: int = AUG_PER_Q) -> List[str]:
    toks = tokenize(base_q)
    out = {base_q}

    toks2 = [t for t in toks if t not in STOPWORDS]
    if toks2:
        out.add(" ".join(toks2))

    out.add("подскажите пожалуйста " + base_q)
    out.add(base_q + " что делать")

    def typo_word(w: str) -> str:
        if len(w) < 4: return w
        i = random.randint(1, len(w) - 2)
        return w[:i] + w[i+1] + w[i] + w[i+2:]

    for _ in range(max(0, n - 2)):
        if toks:
            j = random.randrange(len(toks))
            tw = toks[:]
            tw[j] = typo_word(tw[j])
            out.add(" ".join(tw))

    out_list = list(out)
    random.shuffle(out_list)
    return out_list[:max(1, n)]


def init_csv(path: str = OUT_CSV, rebuild: bool = False):
    if rebuild and os.path.exists(path):
        os.remove(path)
    if not os.path.exists(path):
        with open(path, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=CSV_FIELDS)
            w.writeheader()

def append_rows(path: str, rows: List[dict]):
    with open(path, "a", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=CSV_FIELDS)
        w.writerows(rows)


# build the dataset
def build_train_pairs_csv(con: sqlite3.Connection,
                          all_q: List[Tuple[int, str]],
                          split_ids: Dict[str, List[int]],
                          model: SentenceTransformer):
    id2question = {i: q for i, q in all_q}

    for split_name, ids in split_ids.items():
        print(f"[BUILD] split={split_name} qa_ids={len(ids)}")
        batch_rows = []
        written = 0

        for qa_id in ids:
            base_q = id2question[qa_id]
            queries = augment_queries(base_q, n=AUG_PER_Q) if split_name == "train" else [base_q]

            for query in queries:
                pool, bm25_map, fuzzy_map = make_candidate_pool(con, query, all_q, BM25_K, FUZZY_K)

                # make sure the positive is present in the pool
                if qa_id not in pool:
                    pool.insert(0, qa_id)

                sem_map = semantic_sims(con, query, pool, model)

                # negatives: a random sample from the head of the pool
                # (BM25 hits come first, then fuzzy matches)
                negs = [cid for cid in pool if cid != qa_id]
                negs = negs[:NEG_PER_POS * 3]
                if len(negs) > NEG_PER_POS:
                    negs = random.sample(negs, NEG_PER_POS)

                candidates = [qa_id] + negs
                X = build_features_for_query(con, query, candidates, bm25_map, fuzzy_map, sem_map)
                labels = [1] + [0] * len(negs)

                for cid, lab, feats in zip(candidates, labels, X):
                    batch_rows.append({
                        "split": split_name,
                        "qa_id": qa_id,
                        "query": query,
                        "candidate_id": cid,
                        "label": int(lab),
                        "bm25_inv": float(feats[0]),
                        "fuzzy": float(feats[1]),
                        "semantic": float(feats[2]),
                        "overlap": float(feats[3]),
                        "len_ratio": float(feats[4]),
                        "len_diff": float(feats[5]),
                    })

                if len(batch_rows) >= 5000:
                    append_rows(OUT_CSV, batch_rows)
                    written += len(batch_rows)
                    batch_rows = []

        if batch_rows:
            append_rows(OUT_CSV, batch_rows)
            written += len(batch_rows)

        print(f"[BUILD] split={split_name} written_rows={written}")


con = sqlite3.connect(DB_PATH)
all_q = load_all_questions(con)
ids = [i for i, _ in all_q]

train_ids, test_ids = train_test_split(ids, test_size=0.2, random_state=SEED)
train_ids, val_ids  = train_test_split(train_ids, test_size=0.2, random_state=SEED)
print(f"[SPLIT] train={len(train_ids)} val={len(val_ids)} test={len(test_ids)}")

st_model = SentenceTransformer(MODEL_NAME)

init_csv(OUT_CSV, rebuild=REBUILD_CSV)
if REBUILD_CSV:
    split_ids = {"train": train_ids, "val": val_ids, "test": test_ids}
    build_train_pairs_csv(con, all_q, split_ids, st_model)
    print(f"[OK] dataset saved to {OUT_CSV}")

con.close()
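
The cell above draws negatives at random from the first `NEG_PER_POS * 3` pool entries. If "hard" negatives are wanted in the stricter sense (the wrong candidates that look most like the query), one option is to rank the pool by semantic similarity and keep the top ones. This is a minimal sketch, not what the notebook does: `pick_hard_negatives` is a hypothetical helper, and `sem_map` is assumed to have the same shape as the output of `semantic_sims`.

```python
from typing import Dict, List

def pick_hard_negatives(pool: List[int], positive_id: int,
                        sem_map: Dict[int, float], n: int) -> List[int]:
    """Return the n non-positive candidates with the highest similarity to the query."""
    negatives = [cid for cid in pool if cid != positive_id]
    # Candidates without a stored embedding get -1.0, the lowest possible cosine.
    negatives.sort(key=lambda cid: sem_map.get(cid, -1.0), reverse=True)
    return negatives[:n]

# Illustrative ids and similarities only.
pool = [91, 82, 210, 538, 7]
sem_map = {91: 0.93, 82: 0.88, 210: 0.41, 538: 0.77}
print(pick_hard_negatives(pool, positive_id=91, sem_map=sem_map, n=2))
```

The hardest negatives may include near-duplicate FAQ entries that are arguably correct answers, so mixing in some random negatives can be the safer choice.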

## Training the reranker on train_pairs.csv with logistic regression

In [None]:
import csv
import sqlite3
import re
from typing import Dict, List, Tuple, Optional

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.linear_model import LogisticRegression


DB_PATH = "qa.db"
OUT_CSV = "train_pairs.csv"
MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"

BM25_K = 200
FUZZY_K = 200
MAX_FTS_TOKENS = 20


def norm_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())

# tokens are only needed for the overlap/length features, so keep this as simple as possible
def tokenize(s: str):
    s = norm_text(s).lower()
    s = re.sub(r"[^\w\s]+", " ", s, flags=re.UNICODE)
    return [t for t in s.split() if len(t) > 1]

def overlap_ratio(q_tokens, c_tokens):
    if not q_tokens or not c_tokens:
        return 0.0
    q, c = set(q_tokens), set(c_tokens)
    return len(q & c) / max(1, len(q))

def fts_query(text: str, max_tokens: int = MAX_FTS_TOKENS) -> str:
    t = norm_text(text).lower()
    t = re.sub(r"[^\w\s]+", " ", t, flags=re.UNICODE)
    tokens = [x for x in t.split() if len(x) > 1]
    return " ".join(tokens[:max_tokens])

def rank_of(expected_id: int, predicted_ids: List[int]) -> Optional[int]:
    try:
        return predicted_ids.index(expected_id) + 1
    except ValueError:
        return None

def metrics_from_ranks(ranks: List[Optional[int]], k: int = 3):
    n = len(ranks)
    top1 = sum(1 for r in ranks if r == 1) / n if n else 0.0
    topk = sum(1 for r in ranks if (r is not None and r <= k)) / n if n else 0.0
    mrr = sum((1.0 / r) if r else 0.0 for r in ranks) / n if n else 0.0
    return top1, topk, mrr

# database
def load_all_questions(con: sqlite3.Connection) -> List[Tuple[int, str]]:
    cur = con.cursor()
    return [(int(i), q) for i, q in cur.execute("SELECT id, question FROM qa ORDER BY id;").fetchall()]

def fetch_questions_by_ids(con: sqlite3.Connection, ids: List[int]) -> Dict[int, str]:
    if not ids:
        return {}
    cur = con.cursor()
    qmarks = ",".join(["?"] * len(ids))
    rows = cur.execute(f"SELECT id, question FROM qa WHERE id IN ({qmarks});", ids).fetchall()
    return {int(i): q for i, q in rows}

# embeddings
def blob_to_emb(blob: bytes, dim: int) -> np.ndarray:
    return np.frombuffer(blob, dtype=np.float32, count=dim)

def get_embeddings_for_ids(con: sqlite3.Connection, ids: List[int]) -> Dict[int, np.ndarray]:
    if not ids:
        return {}
    cur = con.cursor()
    qmarks = ",".join(["?"] * len(ids))
    rows = cur.execute(f"""
        SELECT qa_id, emb, dim
        FROM qa_vec
        WHERE qa_id IN ({qmarks});
    """, ids).fetchall()
    return {int(qa_id): blob_to_emb(blob, int(dim)) for qa_id, blob, dim in rows}

def semantic_sims(con: sqlite3.Connection, query: str, ids: List[int], model: SentenceTransformer) -> Dict[int, float]:
    id2emb = get_embeddings_for_ids(con, ids)
    if not id2emb:
        return {}
    q_emb = model.encode([query], normalize_embeddings=True, show_progress_bar=False)[0].astype(np.float32)
    return {i: float(np.dot(q_emb, emb)) for i, emb in id2emb.items()}

# BM25
def bm25_candidates_with_scores(con: sqlite3.Connection, query: str, k: int = 200):
    q = fts_query(query)
    if not q:
        return []
    cur = con.cursor()
    rows = cur.execute("""
        SELECT qa.id, bm25(qa_fts) AS score
        FROM qa_fts
        JOIN qa ON qa_fts.rowid = qa.id
        WHERE qa_fts MATCH ?
        ORDER BY score
        LIMIT ?;
    """, (q, k)).fetchall()
    return [(int(i), float(s)) for i, s in rows]

# fuzzy
from rapidfuzz import process, fuzz
def fuzzy_candidates_with_scores(query: str, all_q: List[Tuple[int, str]], k: int = 200):
    questions = [q for (_id, q) in all_q]
    matches = process.extract(query, questions, scorer=fuzz.WRatio, limit=k)
    return [(int(all_q[idx][0]), float(score)) for _text, score, idx in matches]

def make_candidate_pool(con, query, all_q, bm25_k=200, fuzzy_k=200):
    bm = bm25_candidates_with_scores(con, query, k=bm25_k)
    fz = fuzzy_candidates_with_scores(query, all_q, k=fuzzy_k)

    bm25_map = {i: s for i, s in bm}
    fuzzy_map = {i: s for i, s in fz}

    seen = set()
    pool = []
    for i, _ in bm:
        if i not in seen:
            seen.add(i); pool.append(i)
    for i, _ in fz:
        if i not in seen:
            seen.add(i); pool.append(i)

    return pool, bm25_map, fuzzy_map

def build_features_for_query(con, query, candidate_ids, bm25_map, fuzzy_map, sem_map):
    q_tok = tokenize(query)
    id2q = fetch_questions_by_ids(con, candidate_ids)

    X = []
    for cid in candidate_ids:
        cand_q = id2q.get(cid, "")
        c_tok = tokenize(cand_q)

        bm25 = bm25_map.get(cid, None)
        fz = fuzzy_map.get(cid, 0.0)
        sem = sem_map.get(cid, 0.0)

        bm25_inv = -bm25 if bm25 is not None else 0.0
        len_ratio = min(len(q_tok), len(c_tok)) / max(1, max(len(q_tok), len(c_tok)))
        len_diff = abs(len(q_tok) - len(c_tok)) / max(1, len(q_tok))

        X.append([bm25_inv, fz/100.0, sem, overlap_ratio(q_tok, c_tok), len_ratio, len_diff])

    return np.asarray(X, dtype=np.float32)

def rerank_with_clf(con, query, pool_ids, bm25_map, fuzzy_map, sem_map, clf, k=3):
    X = build_features_for_query(con, query, pool_ids, bm25_map, fuzzy_map, sem_map)
    scores = clf.predict_proba(X)[:, 1]
    order = np.argsort(-scores)
    return [pool_ids[i] for i in order[:k]]


def load_train_split_ids_from_csv(path=OUT_CSV):
    # lets us quickly evaluate on the val/test FAQ entries using the same ids as in the CSV
    ids = {"train": set(), "val": set(), "test": set()}
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for row in rdr:
            ids[row["split"]].add(int(row["qa_id"]))
    return {k: sorted(v) for k, v in ids.items()}

def load_Xy_from_csv(path=OUT_CSV, split="train"):
    X, y = [], []
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for row in rdr:
            if row["split"] != split:
                continue
            X.append([
                float(row["bm25_inv"]),
                float(row["fuzzy"]),
                float(row["semantic"]),
                float(row["overlap"]),
                float(row["len_ratio"]),
                float(row["len_diff"]),
            ])
            y.append(int(row["label"]))
    return np.asarray(X, dtype=np.float32), np.asarray(y, dtype=np.int32)

# train
X_tr, y_tr = load_Xy_from_csv(OUT_CSV, split="train")
print("[TRAIN] X:", X_tr.shape, "pos:", int(y_tr.sum()), "neg:", int((y_tr==0).sum()))

clf = LogisticRegression(max_iter=2000, class_weight="balanced")
clf.fit(X_tr, y_tr)

# eval
con = sqlite3.connect(DB_PATH)
all_q = load_all_questions(con)
id2question = {i: q for i, q in all_q}
st_model = SentenceTransformer(MODEL_NAME)

split_ids = load_train_split_ids_from_csv(OUT_CSV)
val_ids = split_ids["val"]
test_ids = split_ids["test"]

def eval_on_ids(ids):
    ranks = []
    for qa_id in ids:
        query = id2question[qa_id]
        pool, bm25_map, fuzzy_map = make_candidate_pool(con, query, all_q, bm25_k=BM25_K, fuzzy_k=FUZZY_K)
        if qa_id not in pool:
            pool.insert(0, qa_id)
        sem_map = semantic_sims(con, query, pool, st_model)
        top3 = rerank_with_clf(con, query, pool, bm25_map, fuzzy_map, sem_map, clf, k=3)
        ranks.append(rank_of(qa_id, top3))
    return metrics_from_ranks(ranks, k=3)

v_top1, v_top3, v_mrr = eval_on_ids(val_ids)
t_top1, t_top3, t_mrr = eval_on_ids(test_ids)

print("\nRERANKER QUICK CHECK (FAQ exact queries)")
print(f"VAL : Top-1={v_top1:.3f} Top-3={v_top3:.3f} MRR={v_mrr:.3f}")
print(f"TEST: Top-1={t_top1:.3f} Top-3={t_top3:.3f} MRR={t_mrr:.3f}")

con.close()

[TRAIN] X: (22176, 6) pos: 1056 neg: 21120

RERANKER QUICK CHECK (FAQ exact queries)
VAL : Top-1=0.989 Top-3=1.000 MRR=0.994
TEST: Top-1=0.964 Top-3=1.000 MRR=0.980


## Test set (logistic regression)

Metrics:

Top-1=0.648  Top-3=0.835  MRR=0.733
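
For reference, here is a toy calculation of the three metrics, using the same formulas as the notebook's `metrics_from_ranks`. A rank of `None` means the expected id was not among the returned top-k, so the MRR reported here is effectively MRR@k. The ranks below are made up for illustration.

```python
from typing import List, Optional, Tuple

def metrics_from_ranks(ranks: List[Optional[int]], k: int = 3) -> Tuple[float, float, float]:
    """Top-1 accuracy, Top-k accuracy and mean reciprocal rank over 1-based ranks."""
    n = len(ranks)
    if n == 0:
        return 0.0, 0.0, 0.0
    top1 = sum(1 for r in ranks if r == 1) / n
    topk = sum(1 for r in ranks if r is not None and r <= k) / n
    # A missing rank contributes 0 to the reciprocal-rank sum.
    mrr = sum(1.0 / r if r else 0.0 for r in ranks) / n
    return top1, topk, mrr

ranks = [1, 2, None, 3]  # four queries: hit at 1, at 2, a miss, hit at 3
top1, top3, mrr = metrics_from_ranks(ranks, k=3)
print(f"Top-1={top1:.3f}  Top-3={top3:.3f}  MRR={mrr:.3f}")
```

Here MRR is (1 + 1/2 + 0 + 1/3) / 4 ≈ 0.458, which sits between Top-1 (0.250) and Top-3 (0.750), as it must whenever ranks are cut at k.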


In [None]:
import csv
import re
import sqlite3
from collections import defaultdict

import numpy as np
from rapidfuzz import process, fuzz
from sentence_transformers import SentenceTransformer


DB_PATH = "qa.db"
TESTS_CSV = "tests-2.csv"
MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"

BM25_K = 400
FUZZY_K = 400
MAX_FTS_TOKENS = 20


def norm_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())

def tokenize(s: str):
    s = norm_text(s).lower()
    s = re.sub(r"[^\w\s]+", " ", s, flags=re.UNICODE)
    return [t for t in s.split() if len(t) > 1]

def overlap_ratio(q_tokens, c_tokens):
    if not q_tokens or not c_tokens:
        return 0.0
    q, c = set(q_tokens), set(c_tokens)
    return len(q & c) / max(1, len(q))

def fts_query(text: str, max_tokens: int = MAX_FTS_TOKENS) -> str:
    t = norm_text(text).lower()
    t = re.sub(r"[^\w\s]+", " ", t, flags=re.UNICODE)
    tokens = [x for x in t.split() if len(x) > 1]
    return " ".join(tokens[:max_tokens])


def rank_of(expected_id: int, predicted_ids: list[int]):
    try:
        return predicted_ids.index(expected_id) + 1
    except ValueError:
        return None

def metrics_from_ranks(ranks, k=3):
    n = len(ranks)
    top1 = sum(1 for r in ranks if r == 1) / n if n else 0.0
    topk = sum(1 for r in ranks if (r is not None and r <= k)) / n if n else 0.0
    mrr = sum((1.0 / r) if r else 0.0 for r in ranks) / n if n else 0.0
    return top1, topk, mrr


def load_all_questions(con):
    cur = con.cursor()
    return [(int(i), q) for i, q in cur.execute("SELECT id, question FROM qa ORDER BY id;").fetchall()]

def fetch_questions_by_ids(con, ids):
    if not ids: return {}
    cur = con.cursor()
    qmarks = ",".join(["?"]*len(ids))
    rows = cur.execute(f"SELECT id, question FROM qa WHERE id IN ({qmarks});", ids).fetchall()
    return {int(i): q for i, q in rows}

def blob_to_emb(blob: bytes, dim: int) -> np.ndarray:
    return np.frombuffer(blob, dtype=np.float32, count=dim)

def get_embeddings_for_ids(con, ids):
    if not ids: return {}
    cur = con.cursor()
    qmarks = ",".join(["?"]*len(ids))
    rows = cur.execute(f"""
        SELECT qa_id, emb, dim
        FROM qa_vec
        WHERE qa_id IN ({qmarks});
    """, ids).fetchall()
    return {int(i): blob_to_emb(blob, int(dim)) for i, blob, dim in rows}

def semantic_sims(con, query, ids, model):
    id2emb = get_embeddings_for_ids(con, ids)
    if not id2emb:
        return {}
    q_emb = model.encode([query], normalize_embeddings=True, show_progress_bar=False)[0].astype(np.float32)
    return {i: float(np.dot(q_emb, emb)) for i, emb in id2emb.items()}


def bm25_candidates_with_scores(con, query, k=BM25_K):
    q = fts_query(query)
    if not q:
        return []
    cur = con.cursor()
    rows = cur.execute("""
        SELECT qa.id, bm25(qa_fts) AS score
        FROM qa_fts
        JOIN qa ON qa_fts.rowid = qa.id
        WHERE qa_fts MATCH ?
        ORDER BY score
        LIMIT ?;
    """, (q, k)).fetchall()
    return [(int(i), float(s)) for i, s in rows]

def fuzzy_candidates_with_scores(query, all_q, k=FUZZY_K):
    questions = [q for (_id, q) in all_q]
    matches = process.extract(query, questions, scorer=fuzz.WRatio, limit=k)
    return [(int(all_q[idx][0]), float(score)) for _text, score, idx in matches]

def make_candidate_pool(con, query, all_q, bm25_k=BM25_K, fuzzy_k=FUZZY_K, max_pool=500):
    bm = bm25_candidates_with_scores(con, query, k=bm25_k)
    fz = fuzzy_candidates_with_scores(query, all_q, k=fuzzy_k)

    bm25_map = {i: s for i, s in bm}
    fuzzy_map = {i: s for i, s in fz}

    seen = set()
    pool = []
    for i, _ in bm:
        if i not in seen:
            seen.add(i); pool.append(i)
    for i, _ in fz:
        if i not in seen:
            seen.add(i); pool.append(i)

    pool = pool[:max_pool]
    return pool, bm25_map, fuzzy_map


def build_features_for_query(con, query, candidate_ids, bm25_map, fuzzy_map, sem_map):
    q_tok = tokenize(query)
    id2q = fetch_questions_by_ids(con, candidate_ids)

    X = []
    for cid in candidate_ids:
        cand_q = id2q.get(cid, "")
        c_tok = tokenize(cand_q)

        bm25 = bm25_map.get(cid, None)
        fz = fuzzy_map.get(cid, 0.0)
        sem = sem_map.get(cid, 0.0)

        bm25_inv = -bm25 if bm25 is not None else 0.0
        len_ratio = min(len(q_tok), len(c_tok)) / max(1, max(len(q_tok), len(c_tok)))
        len_diff = abs(len(q_tok) - len(c_tok)) / max(1, len(q_tok))

        X.append([bm25_inv, fz/100.0, sem, overlap_ratio(q_tok, c_tok), len_ratio, len_diff])

    return np.asarray(X, dtype=np.float32)

def rerank_with_clf(con, query, pool_ids, bm25_map, fuzzy_map, sem_map, clf, k=3):
    X = build_features_for_query(con, query, pool_ids, bm25_map, fuzzy_map, sem_map)
    scores = clf.predict_proba(X)[:, 1]
    order = np.argsort(-scores)
    return [pool_ids[i] for i in order[:k]]


def read_tests(path):
    tests = []
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for row in rdr:
            tests.append((row["query"], int(row["expected_qa_id"]), (row.get("comment","") or "")))
    return tests


def main():
    if "clf" not in globals():
        raise RuntimeError("clf not found. Run the training cell (the one that calls clf.fit(...)) first")

    con = sqlite3.connect(DB_PATH)

    all_q = load_all_questions(con)
    st_model = SentenceTransformer(MODEL_NAME)
    tests = read_tests(TESTS_CSV)

    ranks = []
    by_comment = defaultdict(list)

    for query, expected_id, comment in tests:
        pool, bm25_map, fuzzy_map = make_candidate_pool(con, query, all_q, bm25_k=BM25_K, fuzzy_k=FUZZY_K, max_pool=500)

        if expected_id not in pool:
            ranks.append(None)
            by_comment[(comment or "no_comment").strip().lower()].append(None)
            continue

        sem_map = semantic_sims(con, query, pool, st_model)
        top3 = rerank_with_clf(con, query, pool, bm25_map, fuzzy_map, sem_map, clf, k=3)

        r = rank_of(expected_id, top3)
        ranks.append(r)
        by_comment[(comment or "no_comment").strip().lower()].append(r)

    top1, top3, mrr = metrics_from_ranks(ranks, k=3)
    print("\nRERANKER on tests-2.csv (BM25_K=400 FUZZY_K=400 max_pool=500)")
    print(f"Top-1={top1:.3f}  Top-3={top3:.3f}  MRR={mrr:.3f}")

    print("\nBreakdown by comment (n>=3):")
    for c, rs in sorted(by_comment.items(), key=lambda x: -len(x[1])):
        if len(rs) < 3:
            continue
        c_top1, c_top3, c_mrr = metrics_from_ranks(rs, k=3)
        print(f"- {c:<12} n={len(rs):>3}  Top-1={c_top1:.3f}  Top-3={c_top3:.3f}  MRR={c_mrr:.3f}")

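    # NOTE: ranks holds None both when expected_id is missing from the pool and when it
    # is in the pool but outside the top-3, so this value equals Top-3, not true pool recall.
    recall = sum(1 for r in ranks if r is not None) / len(ranks) if ranks else 0.0
    print(f"\nCandidate Recall (expected_id in pool) = {recall:.3f}")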

    con.close()

if __name__ == "__main__":
    main()



RERANKER on tests-2.csv (BM25_K=400 FUZZY_K=400 max_pool=500)
Top-1=0.648  Top-3=0.835  MRR=0.733

Breakdown by comment (n>=3):
- перефраз     n= 72  Top-1=0.639  Top-3=0.819  MRR=0.722
- опечатка     n= 10  Top-1=0.500  Top-3=0.800  MRR=0.617
- точно        n=  8  Top-1=1.000  Top-3=1.000  MRR=1.000

Candidate Recall (expected_id in pool) = 0.835
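
Note that the `Candidate Recall` figure above coincides with Top-3 (0.835). In the evaluation loop, `ranks` holds `None` both when the expected id never reached the pool and when it was in the pool but outside the top 3, so counting non-`None` ranks measures Top-3 rather than pool recall. A minimal sketch of keeping the two apart, assuming a hypothetical `summarize` helper and toy inputs (neither is part of the notebook):

```python
from typing import List, Optional, Tuple


def summarize(results: List[Tuple[bool, Optional[int]]], k: int = 3) -> dict:
    """Each item is (in_pool, rank): in_pool says whether the expected id reached
    the candidate pool, rank is its 1-based position in the top-k or None."""
    n = len(results)
    if n == 0:
        return {"pool_recall": 0.0, "top1": 0.0, "topk": 0.0, "mrr": 0.0}
    return {
        # Pool recall counts pool membership only, regardless of the final rank.
        "pool_recall": sum(1 for in_pool, _ in results if in_pool) / n,
        "top1": sum(1 for _, r in results if r == 1) / n,
        "topk": sum(1 for _, r in results if r is not None and r <= k) / n,
        "mrr": sum(1.0 / r for _, r in results if r is not None) / n,
    }


# Toy data: four queries, one pool miss and one pool hit left outside the top-3.
toy = [(True, 1), (True, 2), (True, None), (False, None)]
stats = summarize(toy)
print(stats)
```

On the toy data the pool recall (3 of 4) and Top-3 (2 of 4) differ, which is exactly the distinction the single `ranks` list cannot express.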


## Next step: dense semantic retrieval (Sentence-BERT / E5) instead of RapidFuzz

## Two embedding models compared:

"sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2" and "ai-forever/sbert_large_nlu_ru". The first scores much higher: Top-1 0.648 vs 0.538 and Top-3 0.890 vs 0.758 on the 91 test queries.

Next to dense retrieval, BM25 adds nothing (BM25+Dense scores are identical to Dense alone), so it can be dropped.
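
One caveat when reading the BM25+Dense row: if the union of BM25 and dense candidates is reranked by the same cosine similarity that produced the dense candidates, its top-k cannot differ from the dense top-k (ties aside), whatever BM25 returns. The sketch below illustrates this on random vectors. The vectors, the `cosine_topk` helper and the stand-in BM25 ids are all made up for illustration.

```python
import math
import random
from typing import Dict, List

random.seed(0)


def unit(v: List[float]) -> List[float]:
    """L2-normalize a vector so that the dot product equals cosine similarity."""
    norm = math.sqrt(sum(x * x for x in v)) or 1.0
    return [x / norm for x in v]


def cosine_topk(sims: Dict[int, float], pool: List[int], k: int) -> List[int]:
    """Return the k ids from pool with the highest similarity."""
    return sorted(pool, key=lambda i: -sims[i])[:k]


dim, n_docs = 8, 200
docs = {i: unit([random.gauss(0, 1) for _ in range(dim)]) for i in range(n_docs)}
query = unit([random.gauss(0, 1) for _ in range(dim)])
sims = {i: sum(a * b for a, b in zip(vec, query)) for i, vec in docs.items()}

dense_top = cosine_topk(sims, list(docs), k=50)  # dense candidate list
bm25_stub = random.sample(list(docs), 50)        # arbitrary stand-in for BM25 hits

# Union with order-preserving de-duplication, then rerank by cosine.
pool = list(dict.fromkeys(bm25_stub + dense_top))
hybrid_top3 = cosine_topk(sims, pool, k=3)

print(hybrid_top3 == dense_top[:3])
```

For BM25 to influence the result, its score would have to enter the final ordering itself rather than only the candidate pool.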

In [12]:
import csv
import re
import sqlite3
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional

import numpy as np
from sentence_transformers import SentenceTransformer

DB_PATH = "qa.db"
TESTS_CSV = "tests-2.csv"

BM25_K_CAND = 200
DENSE_K_CAND = 200
TOPK_EVAL = 3

MAX_FTS_TOKENS = 20

def norm_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip())

def fts_query(text: str, max_tokens: int = MAX_FTS_TOKENS) -> str:
    t = norm_text(text).lower()
    t = re.sub(r"[^\w\s]+", " ", t, flags=re.UNICODE)
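    tokens = [x for x in t.split() if len(x) > 1]
    # NOTE: SQLite FTS treats space-separated tokens as an implicit AND, so only rows
    # containing every token can match.
    return " ".join(tokens[:max_tokens])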

def rank_of(expected_id: int, predicted_ids: List[int]) -> Optional[int]:
    try:
        return predicted_ids.index(expected_id) + 1
    except ValueError:
        return None

def metrics_from_ranks(ranks: List[Optional[int]], k: int = 3):
    n = len(ranks)
    top1 = sum(1 for r in ranks if r == 1) / n if n else 0.0
    topk = sum(1 for r in ranks if (r is not None and r <= k)) / n if n else 0.0
    mrr  = sum((1.0 / r) if r else 0.0 for r in ranks) / n if n else 0.0
    return top1, topk, mrr

def read_tests(path: str):
    tests = []
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for row in rdr:
            tests.append((
                row["query"],
                int(row["expected_qa_id"]),
                (row.get("comment","") or "")
            ))
    return tests

def load_all_questions(con: sqlite3.Connection) -> List[Tuple[int, str]]:
    cur = con.cursor()
    return [(int(i), q) for i, q in cur.execute("SELECT id, question FROM qa ORDER BY id;").fetchall()]

def bm25_candidates(con: sqlite3.Connection, query: str, k: int = 200) -> List[int]:
    q = fts_query(query)
    if not q:
        return []
    cur = con.cursor()
    rows = cur.execute("""
        SELECT qa.id, bm25(qa_fts) AS score
        FROM qa_fts
        JOIN qa ON qa_fts.rowid = qa.id
        WHERE qa_fts MATCH ?
        ORDER BY score
        LIMIT ?;
    """, (q, k)).fetchall()
    return [int(i) for i, _score in rows]

@dataclass
class DenseIndex:
    ids: np.ndarray
    embs: np.ndarray

def build_dense_index(all_q: List[Tuple[int, str]], model: SentenceTransformer, batch_size: int = 256) -> DenseIndex:
    ids = np.asarray([i for i, _ in all_q], dtype=np.int32)
    texts = [q for _, q in all_q]

    embs = model.encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=True,
        normalize_embeddings=True
    ).astype(np.float32)

    return DenseIndex(ids=ids, embs=embs)

def dense_candidates(index: DenseIndex, model: SentenceTransformer, query: str, k: int = 200) -> List[int]:
    q_emb = model.encode([query], normalize_embeddings=True, show_progress_bar=False)[0].astype(np.float32)
    sims = index.embs @ q_emb  # cosine (because normalized)
    k = min(k, sims.shape[0])
    # top-k indices fast
    top_idx = np.argpartition(-sims, k-1)[:k]
    top_idx = top_idx[np.argsort(-sims[top_idx])]
    return [int(index.ids[i]) for i in top_idx]

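def dense_sims_for_ids(index: DenseIndex, model: SentenceTransformer, query: str, ids: List[int]) -> Dict[int, float]:
    """Cosine similarity between the query and a subset of ids, using the precomputed index."""
    if not ids:
        return {}
    q_emb = model.encode([query], normalize_embeddings=True, show_progress_bar=False)[0].astype(np.float32)
    # Map qa id -> row position in index.embs; ids missing from the index are skipped.
    id2pos = {int(i): pos for pos, i in enumerate(index.ids)}
    return {cid: float(index.embs[id2pos[cid]] @ q_emb) for cid in ids if cid in id2pos}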

def predict_bm25(con, query, k=3) -> List[int]:
    return bm25_candidates(con, query, k=BM25_K_CAND)[:k]

def predict_dense_only(index: DenseIndex, model, query, k=3) -> List[int]:
    return dense_candidates(index, model, query, k=DENSE_K_CAND)[:k]

def predict_bm25_dense(index: DenseIndex, id2pos: Dict[int, int], con, model, query, k=3) -> List[int]:
    """
    Candidate generation: union(BM25 top M, Dense top N)
    Rerank: cosine similarity (dense) on union
    """
    bm = bm25_candidates(con, query, k=BM25_K_CAND)
    ds = dense_candidates(index, model, query, k=DENSE_K_CAND)

    seen = set()
    pool = []
    for i in bm + ds:
        if i not in seen:
            seen.add(i)
            pool.append(i)

    if not pool:
        return []

    q_emb = model.encode([query], normalize_embeddings=True, show_progress_bar=False)[0].astype(np.float32)
    sims = []
    for cid in pool:
        pos = id2pos.get(cid)
        if pos is None:
            sim = -1.0
        else:
            sim = float(index.embs[pos] @ q_emb)
        sims.append(sim)

    order = np.argsort(-np.asarray(sims, dtype=np.float32))
    return [pool[i] for i in order[:k]]

def eval_models():
    con = sqlite3.connect(DB_PATH)
    tests = read_tests(TESTS_CSV)
    all_q = load_all_questions(con)

    # Both models are listed so that the cell reproduces the two result tables shown below.
    model_names = [
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        "ai-forever/sbert_large_nlu_ru",
    ]

    for mname in model_names:
        print("\n" + "="*90)
        print("MODEL:", mname)
        print("="*90)

        model = SentenceTransformer(mname)

        index = build_dense_index(all_q, model, batch_size=256)

        id2pos = {int(i): int(pos) for pos, i in enumerate(index.ids)}

        ranks = {
            "BM25": [],
            "Dense": [],
            "BM25+Dense": [],
        }
        failures = defaultdict(list)

        for query, expected_id, comment in tests:
            pred_bm = predict_bm25(con, query, k=TOPK_EVAL)
            pred_ds = predict_dense_only(index, model, query, k=TOPK_EVAL)
            pred_bd = predict_bm25_dense(index, id2pos, con, model, query, k=TOPK_EVAL)

            r_bm = rank_of(expected_id, pred_bm)
            r_ds = rank_of(expected_id, pred_ds)
            r_bd = rank_of(expected_id, pred_bd)

            ranks["BM25"].append(r_bm)
            ranks["Dense"].append(r_ds)
            ranks["BM25+Dense"].append(r_bd)

            if r_bd is None:
                failures[(comment or "no_comment").strip().lower()].append((query, expected_id, pred_bd))

        print("\nRESULTS on", len(tests), "tests")
        print("-"*72)
        print(f"{'Mode':<14} | {'Top-1':>6} | {'Top-3':>6} | {'MRR':>6}")
        print("-"*72)
        for name in ["BM25", "Dense", "BM25+Dense"]:
            top1, top3, mrr = metrics_from_ranks(ranks[name], k=TOPK_EVAL)
            print(f"{name:<14} | {top1:>6.3f} | {top3:>6.3f} | {mrr:>6.3f}")

        if failures:
            print("\nSome BM25+Dense failures by comment type:")
            for c, items in sorted(failures.items(), key=lambda x: -len(x[1]))[:6]:
                print(f"- {c}: {len(items)}")
            # Show one example failure
            ex = next(iter(failures.values()))
            q, eid, preds = ex[0]
            print("\nExample failure:")
            print("query:", q)
            print("expected_id:", eid)
            print("predicted_top3:", preds)

    con.close()

if __name__ == "__main__":
    eval_models()


MODEL: sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2


RESULTS on 91 tests
------------------------------------------------------------------------
Mode           |  Top-1 |  Top-3 |    MRR
------------------------------------------------------------------------
BM25           |  0.110 |  0.110 |  0.110
Dense          |  0.648 |  0.890 |  0.753
BM25+Dense     |  0.648 |  0.890 |  0.753

Some BM25+Dense failures by comment type:
- перефраз: 7
- опечатка: 3

Example failure:
query: когда действиет трхсторонний дговон
expected_id: 91
predicted_top3: [70, 510, 473]

MODEL: ai-forever/sbert_large_nlu_ru




RESULTS on 91 tests
------------------------------------------------------------------------
Mode           |  Top-1 |  Top-3 |    MRR
------------------------------------------------------------------------
BM25           |  0.110 |  0.110 |  0.110
Dense          |  0.538 |  0.758 |  0.634
BM25+Dense     |  0.538 |  0.758 |  0.634

Some BM25+Dense failures by comment type:
- перефраз: 20
- опечатка: 2

Example failure:
query: условия трехстороннего договора
expected_id: 91
predicted_top3: [453, 470, 471]


## Final version

## Dense + FAISS + reject

1) Preparing the retrieval infrastructure
- Loads precomputed question embeddings from qa_vec.q_vec (or a_vec).
- Builds a FAISS index (IndexFlatIP: exact inner-product search, which equals cosine similarity on L2-normalized vectors).
2) Choosing the reject threshold on DEV
- Takes the test file tests-2.csv (query + expected_qa_id).
- Splits it into DEV/TEST.
- On DEV, computes the top-1 similarity s1 for every query.
- Runs a grid search over sem_thr so that precision among accepted answers is >= TARGET_PRECISION while coverage (the share of accepted queries) is as high as possible.
3) Evaluation on DEV and TEST + interactive mode
- Prints the metrics and starts a console: you type a question and get the best match plus an accept/reject decision based on the threshold.
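
The threshold selection in step 2 can be sketched on toy data as follows. The `(s1, is_correct)` pairs, the grid and the `pick_threshold` name are illustrative assumptions, not values from the notebook.

```python
from typing import List, Optional, Tuple


def pick_threshold(
    scored: List[Tuple[float, bool]],
    target_precision: float,
    grid: List[float],
) -> Optional[Tuple[float, float, float]]:
    """Return (threshold, precision, coverage) with maximal coverage among
    thresholds whose accepted-set precision meets the target, or None."""
    best = None
    n = len(scored)
    for thr in grid:
        accepted = [ok for s1, ok in scored if s1 >= thr]
        if not accepted:
            continue
        precision = sum(accepted) / len(accepted)
        coverage = len(accepted) / n
        if precision >= target_precision and (best is None or coverage > best[2]):
            best = (thr, precision, coverage)
    return best


# Toy DEV table: top-1 similarity and whether the top-1 prediction was correct.
toy = [(0.95, True), (0.90, True), (0.80, True), (0.70, False),
       (0.65, True), (0.50, False), (0.40, False), (0.30, False)]
grid = [round(0.10 + 0.05 * i, 2) for i in range(18)]  # 0.10 .. 0.95

result = pick_threshold(toy, target_precision=0.8, grid=grid)
print(result)
```

Coverage can only shrink as the threshold grows, so the strict `>` comparison keeps the lowest threshold that reaches the best coverage.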

In [1]:
!pip -q install faiss-cpu


In [5]:
import csv
import sqlite3
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional

import numpy as np
from sentence_transformers import SentenceTransformer
import faiss

DB_PATH = "qa.db"
TESTS_CSV = "tests-2.csv"

MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
WHICH_VEC = "q"

TOP_N = 50
TOPK_EVAL = 3
DEV_FRACTION = 0.7
RANDOM_SEED = 42

TARGET_PRECISION = 0.55    # target precision on DEV
SEM_GRID = np.arange(0.10, 0.951, 0.01)  # candidate thresholds for s1

REJECT_MESSAGE = "It looks like the knowledge base has no suitable answer. Try rephrasing the question or adding details."

np.random.seed(RANDOM_SEED)


def read_tests(path: str) -> List[Tuple[str, int, str]]:
    rows = []
    with open(path, newline="", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for r in rdr:
            rows.append((r["query"], int(r["expected_qa_id"]), (r.get("comment","") or "")))
    return rows

def split_dev_test(tests: List[Tuple[str,int,str]], dev_fraction=0.7, seed=42):
    idx = np.arange(len(tests))
    rng = np.random.default_rng(seed)
    rng.shuffle(idx)
    cut = int(len(idx) * dev_fraction)
    dev = [tests[i] for i in idx[:cut]]
    test = [tests[i] for i in idx[cut:]]
    return dev, test

def rank_of(expected_id: int, predicted_ids: List[int]) -> Optional[int]:
    try:
        return predicted_ids.index(expected_id) + 1
    except ValueError:
        return None

def metrics_from_ranks(ranks: List[Optional[int]], k: int = 3):
    n = len(ranks)
    top1 = sum(1 for r in ranks if r == 1) / n if n else 0.0
    topk = sum(1 for r in ranks if (r is not None and r <= k)) / n if n else 0.0
    mrr  = sum((1.0 / r) if r else 0.0 for r in ranks) / n if n else 0.0
    return top1, topk, mrr


def fetch_question_text(con: sqlite3.Connection, qa_id: int) -> str:
    cur = con.cursor()
    row = cur.execute("SELECT question FROM qa WHERE id=?;", (int(qa_id),)).fetchone()
    return row[0] if row else ""

def load_all_questions(con: sqlite3.Connection) -> Dict[int, str]:
    cur = con.cursor()
    rows = cur.execute("SELECT id, question FROM qa ORDER BY id;").fetchall()
    return {int(i): q for i, q in rows}

def load_all_embeddings(con: sqlite3.Connection, model_name: str, which_vec: str = "q"):
    """
    qa_vec schema:
      qa_id INTEGER PK
      model_name TEXT
      dim INTEGER
      q_vec BLOB
      a_vec BLOB
      updated_at TEXT

    Returns:
      ids: (N,) int64
      X:   (N, D) float32 L2-normalized
    """
    vec_col = "q_vec" if which_vec == "q" else "a_vec"

    cur = con.cursor()
    rows = cur.execute(
        f"""
        SELECT qa_id, dim, {vec_col}
        FROM qa_vec
        WHERE model_name = ?
        ORDER BY qa_id;
        """,
        (model_name,)
    ).fetchall()

    if not rows:
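        # Fallback: no rows matched model_name, so load every row unfiltered.
        # Caution: vectors produced by a different model are not comparable
        # with queries encoded by the model used at search time.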
        rows = cur.execute(
            f"SELECT qa_id, dim, {vec_col} FROM qa_vec ORDER BY qa_id;"
        ).fetchall()

    ids = []
    vecs = []
    for qa_id, dim, blob in rows:
        if blob is None:
            continue
        v = np.frombuffer(blob, dtype=np.float32, count=int(dim))
        ids.append(int(qa_id))
        vecs.append(v)

    if not vecs:
        raise RuntimeError("No embeddings found in qa_vec. Check that the table is populated and that q_vec/a_vec is not NULL.")

    X = np.vstack(vecs).astype(np.float32)

    norms = np.linalg.norm(X, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    X = X / norms

    ids = np.asarray(ids, dtype=np.int64)
    return ids, X


# FAISS
def build_faiss_index(X: np.ndarray):
    """
    Cosine similarity = inner product for L2-normalized vectors.
    """
    d = X.shape[1]
    index = faiss.IndexFlatIP(d)
    index.add(np.ascontiguousarray(X, dtype=np.float32))
    return index

def dense_topn(query: str, model: SentenceTransformer, index, ids: np.ndarray, top_n: int = 50):
    """
    Returns: (top_ids, top_sims)
    """
    q = model.encode([query], normalize_embeddings=True, show_progress_bar=False).astype(np.float32)
    sims, idxs = index.search(q, top_n)
    sims = sims[0]
    idxs = idxs[0]
    top_ids = [int(ids[i]) for i in idxs if i != -1]
    top_sims = [float(s) for s, i in zip(sims, idxs) if i != -1]
    return top_ids, top_sims

# REJECT
def accept_decision(s1: float, sem_thr: float) -> bool:
    return s1 >= sem_thr


# DEV TABLE + GRID SEARCH
def build_scoring_table(tests: List[Tuple[str,int,str]], model, index, ids, top_n: int):
    """
    For each query:
      pred1, s1, is_correct
    """
    table = []
    for query, expected_id, comment in tests:
        top_ids, top_sims = dense_topn(query, model, index, ids, top_n=max(2, top_n))
        if not top_ids:
            continue
        pred1 = top_ids[0]
        s1 = float(top_sims[0]) if top_sims else -1.0
        is_correct = 1 if pred1 == expected_id else 0
        table.append({
            "query": query,
            "expected_id": expected_id,
            "pred1": pred1,
            "s1": s1,
            "is_correct": is_correct,
            "comment": comment
        })
    return table

def grid_search_sem_thr(table: List[dict], target_precision: float):
    """
    Choose sem_thr maximizing coverage with precision_accept >= target_precision.
    Returns best tuple: (coverage, precision, sem_thr) or None
    """
    best = None
    n = len(table)
    for sem_thr in SEM_GRID:
        accepted = [r for r in table if accept_decision(r["s1"], sem_thr)]
        if not accepted:
            continue
        precision = sum(r["is_correct"] for r in accepted) / len(accepted)
        coverage = len(accepted) / n
        if precision >= target_precision:
            cand = (coverage, precision, float(sem_thr))
            if best is None or cand[0] > best[0]:
                best = cand
    return best

def eval_with_reject(tests: List[Tuple[str,int,str]], model, index, ids, sem_thr: float, k: int = 3, top_n: int = 50):
    """
    We report:
      precision_accept: accuracy of Top-1 among ACCEPTED
      coverage: fraction accepted
      retrieval metrics:
        - on all queries (treat rejected as None rank)
        - on accepted only
    """
    accepted_correct = 0
    accepted_total = 0
    ranks_all = []
    ranks_on_accepted = []

    for query, expected_id, comment in tests:
        top_ids, top_sims = dense_topn(query, model, index, ids, top_n=max(k, top_n))
        if not top_ids:
            ranks_all.append(None)
            continue

        s1 = float(top_sims[0]) if top_sims else -1.0
        ok = accept_decision(s1, sem_thr)

        if not ok:
            ranks_all.append(None)
            continue

        accepted_total += 1
        pred_topk = top_ids[:k]
        r = rank_of(expected_id, pred_topk)
        ranks_all.append(r)
        ranks_on_accepted.append(r)

        if pred_topk and pred_topk[0] == expected_id:
            accepted_correct += 1

    precision_accept = accepted_correct / accepted_total if accepted_total else 0.0
    coverage = accepted_total / len(tests) if tests else 0.0

    top1_all, topk_all, mrr_all = metrics_from_ranks(ranks_all, k=k)
    top1_acc, topk_acc, mrr_acc = metrics_from_ranks(ranks_on_accepted, k=k)

    return {
        "precision_accept": precision_accept,
        "coverage": coverage,
        "accepted_total": accepted_total,
        "top1_all": top1_all,
        "topk_all": topk_all,
        "mrr_all": mrr_all,
        "top1_on_accepted": top1_acc,
        "topk_on_accepted": topk_acc,
        "mrr_on_accepted": mrr_acc,
    }


def interactive_console(state: dict):
    """
    state must contain:
      con, model, index, ids, qa_text, sem_thr
    """
    con = state["con"]
    model = state["model"]
    index = state["index"]
    ids = state["ids"]
    qa_text = state["qa_text"]

    print("\nType a question and press Enter.")
    print("Commands: 'show' (show top candidates), 'thr <num>' (set sem_thr), 'exit'\n")

    last = {"query": None, "top_ids": None, "top_sims": None}

    while True:
        s = input("you> ").strip()
        if not s:
            continue
        if s.lower() in {"exit", "quit"}:
            break

        if s.lower().startswith("thr "):
            try:
                new_thr = float(s.split(maxsplit=1)[1])
                state["sem_thr"] = new_thr
                print(f"[OK] sem_thr set to {new_thr:.3f}")
            except Exception:
                print("Usage: thr 0.75")
            continue

        if s.lower() == "show":
            if not last["top_ids"]:
                print("No previous query yet.")
                continue
            print("\nTop candidates:")
            for i, (cid, sim) in enumerate(zip(last["top_ids"], last["top_sims"]), start=1):
                qtxt = qa_text.get(cid, "")
                print(f"{i:>2}. id={cid:<4} sim={sim:.3f} | {qtxt}")
            print("")
            continue

        query = s
        top_ids, top_sims = dense_topn(query, model, index, ids, top_n=TOP_N)
        if not top_ids:
            print("[REJECT]", REJECT_MESSAGE)
            continue

        s1 = float(top_sims[0])
        pred1 = int(top_ids[0])

        ok = accept_decision(s1, state["sem_thr"])

        print("\n--- RESULT ---")
        print(f"Best match: id={pred1} s1={s1:.3f} (sem_thr={state['sem_thr']:.3f})")
        print("Matched Q:", qa_text.get(pred1, ""))

        if not ok:
            print("[REJECT]", REJECT_MESSAGE)
            print("Reason: s1 < sem_thr")
            print("Tip: type 'show' to see other close candidates\n")
        else:
            # The answer text is not printed here (there is no answer column to read from).
            print("[ACCEPT] This looks like the best-matching question in the knowledge base.")
            print("Tip: now you can map id -> your answer storage (table/JSON/etc.)\n")

        last.update({"query": query, "top_ids": top_ids[:10], "top_sims": top_sims[:10]})


def main():
    con = sqlite3.connect(DB_PATH)

    print("[1] Load embeddings from qa_vec ...")
    ids, X = load_all_embeddings(con, model_name=MODEL_NAME, which_vec=WHICH_VEC)
    print(f"embeddings: X=({X.shape[0]}, {X.shape[1]})  which={WHICH_VEC}")

    print("[2] Build FAISS index ...")
    index = build_faiss_index(X)

    print("[3] Load dense model ...")
    model = SentenceTransformer(MODEL_NAME)

    print("[4] Load tests and split DEV/TEST ...")
    tests = read_tests(TESTS_CSV)
    dev, test = split_dev_test(tests, dev_fraction=DEV_FRACTION, seed=RANDOM_SEED)
    print(f"tests={len(tests)}  dev={len(dev)}  test={len(test)}")

    print("[5] Build scoring table on DEV ...")
    dev_table = build_scoring_table(dev, model, index, ids, top_n=TOP_N)
    print("dev_table rows:", len(dev_table))

    print("[6] Grid-search sem_thr on DEV (maximize coverage @ precision>=target) ...")
    best = grid_search_sem_thr(dev_table, target_precision=TARGET_PRECISION)

    if best is None:
        print(f"[NO BEST] None meet precision {TARGET_PRECISION:.2f} on DEV.")
        print("Try lowering TARGET_PRECISION or improving retrieval/model.")
        sem_thr = 0.55
        print(f"[FALLBACK] sem_thr={sem_thr:.2f}")
    else:
        coverage, precision, sem_thr = best
        print(f"[CHOSEN] sem_thr={sem_thr:.2f}  DEV precision={precision:.3f} coverage={coverage:.3f}")

    print("[7] Evaluate on DEV ...")
    dev_res = eval_with_reject(dev, model, index, ids, sem_thr=sem_thr, k=TOPK_EVAL, top_n=TOP_N)
    print("DEV:", dev_res)

    print("[8] Evaluate on TEST ...")
    test_res = eval_with_reject(test, model, index, ids, sem_thr=sem_thr, k=TOPK_EVAL, top_n=TOP_N)
    print("TEST:", test_res)

    qa_text = load_all_questions(con)

    state = {
        "con": con,
        "model": model,
        "index": index,
        "ids": ids,
        "qa_text": qa_text,
        "sem_thr": float(sem_thr),
    }

    try:
        interactive_console(state)
    finally:
        # Close the connection even if the console is interrupted (e.g. Ctrl+C).
        con.close()

if __name__ == "__main__":
    main()

[1] Load embeddings from qa_vec ...
embeddings: X=(552, 384)  which=q
[2] Build FAISS index ...
[3] Load dense model ...



[4] Load tests and split DEV/TEST ...
tests=91  dev=63  test=28
[5] Build scoring table on DEV ...
dev_table rows: 63
[6] Grid-search sem_thr on DEV (maximize coverage @ precision>=target) ...
[CHOSEN] sem_thr=0.54  DEV precision=0.550 coverage=0.952
[7] Evaluate on DEV ...
DEV: {'precision_accept': 0.55, 'coverage': 0.9523809523809523, 'accepted_total': 60, 'top1_all': 0.5238095238095238, 'topk_all': 0.8571428571428571, 'mrr_all': 0.6666666666666666, 'top1_on_accepted': 0.55, 'topk_on_accepted': 0.9, 'mrr_on_accepted': 0.7}
[8] Evaluate on TEST ...
TEST: {'precision_accept': 0.8928571428571429, 'coverage': 1.0, 'accepted_total': 28, 'top1_all': 0.8928571428571429, 'topk_all': 0.9285714285714286, 'mrr_all': 0.9107142857142857, 'top1_on_accepted': 0.8928571428571429, 'topk_on_accepted': 0.9285714285714286, 'mrr_on_accepted': 0.9107142857142857}

Type a question and press Enter.
Commands: 'show' (show top candidates), 'thr <num>' (set sem_thr), 'exit'

you> можно ли перевестись на другую

KeyboardInterrupt: Interrupted by user
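
The gap between DEV precision (0.550, 33 of 60 accepted) and TEST precision (0.893, 25 of 28 accepted) is worth reading with the sample sizes in mind. As a rough illustration, the sketch below computes a 95% Wilson score interval for both proportions. The `wilson_interval` helper is an assumption added here, not part of the notebook.

```python
import math
from typing import Tuple


def wilson_interval(successes: int, n: int, z: float = 1.96) -> Tuple[float, float]:
    """Wilson score interval for a binomial proportion (z=1.96 is roughly 95%)."""
    if n == 0:
        return 0.0, 1.0
    p = successes / n
    denom = 1.0 + z * z / n
    center = (p + z * z / (2 * n)) / denom
    half = z * math.sqrt(p * (1 - p) / n + z * z / (4 * n * n)) / denom
    return center - half, center + half


dev_lo, dev_hi = wilson_interval(33, 60)    # DEV: 33 correct of 60 accepted
test_lo, test_hi = wilson_interval(25, 28)  # TEST: 25 correct of 28 accepted
print(f"DEV  precision 0.550, interval [{dev_lo:.3f}, {dev_hi:.3f}]")
print(f"TEST precision 0.893, interval [{test_lo:.3f}, {test_hi:.3f}]")
```

With only 28 test queries the interval spans more than 0.2, so the TEST figure is best treated as a rough estimate rather than a stable measurement.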