In [None]:
import re, json, numpy as np, pandas as pd
from pathlib import Path
from typing import Dict, List
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, f1_score, confusion_matrix
from scipy.sparse import hstack
import joblib

# Конфиг пути и тд

In [None]:
DATA_CSV = "data/train.csv"      # колонки: text,label
TEXT_COL = "text"
LABEL_COL = "label"
MAJOR_CLASS = "ОДУ"              # мажорный класс (переименуй под свои данные)
OUT_DIR = Path("artifacts"); OUT_DIR.mkdir(exist_ok=True)

# Лексиконы
- Лексиконы - ключевые слова (правила) по которым определяем какие классы какой службе соответствуют.
- Это не жесткие правила а лишь помощь классификатору

In [None]:
LEX: Dict[str, List[str]] = {
    "Сантехника": [r"\bтруба\b", r"\bпрорвал[оась]\b", r"\bтеч[её]\b", r"\bзатоп(ил[аи]|ило)\b",
                   r"\b(гвс|хвс|вода)\b"],
    "Электрика":  [r"\bкорот(ит|нуло)\b", r"\bзамыкан(ие|ия)\b", r"\bискрит\b", r"\bвыбил[аи]\b.*\bпробк[и]\b",
                   r"\bщиток\b", r"\bавтомат(ы)?\b"],
    "Отопление":  [r"\bнет отоплен", r"\bбатаре(я|и) (холод|лед)", r"\bрадиатор\b", r"\bкотел"],
    "Лифт":       [r"\bлифт\b", r"\bзастрял\b", r"\bне работает лифт\b"],
    "ОДУ":        [r"\bподъезд\b", r"\bдвор\b", r"\bуборк[аи]\b", r"\bсосед(и|ям|ями)\b", r"\bшум\b"],
}

# МАШИН ЛЕРНИНГ!!!

In [None]:
def normalize_text(s: str) -> str:
    s = s.lower()
    s = re.sub(r"\s+", " ", s).strip()
    return s

## `build_rule_feats(texts: List[str], classes: List[str]) -> np.ndarray`

**Назначение:**  
Генерирует дополнительные "правильные" (rule-based) признаки для каждого текста на основе заранее заданных словарей/регулярных выражений (`LEX`).  
Каждый признак — это количество совпадений текста с шаблонами, характерными для определённого класса.

**Параметры:**
- `texts` (`List[str]`): Список текстовых обращений пользователей.
- `classes` (`List[str]`): Список всех классов (меток), для которых будут считаться совпадения.

**Возвращает:**
- `np.ndarray` формы `(N, C)`:
  - `N` — количество текстов,
  - `C` — количество классов.  
  Каждое значение `M[i, j]` — количество паттернов из словаря класса `classes[j]`, найденных в тексте `texts[i]`.

**Алгоритм работы:**
1. Для каждого класса из `classes` берутся регулярные выражения из глобального словаря `LEX` и компилируются для ускорения поиска.
2. Создаётся матрица признаков `M` из нулей (`float32`).
3. Для каждого текста и каждого класса считается, сколько паттернов найдено в тексте.
4. Результирующая матрица возвращается в виде `numpy.ndarray`.

**Пример:**
```python
texts = ["Прорвало трубу в подвале", "Выбило пробки"]
classes = ["Сантехника", "Электрика"]

M = build_rule_feats(texts, classes)
# M[0,0] может быть >0 (нашли слова про трубы)
# M[0,1] = 0 (нет электрических терминов)
```

In [None]:
def build_rule_feats(texts: List[str], classes: List[str]):
    # матрица N x C: сколько паттернов класса сработало в тексте
    comp = {c: [re.compile(p, re.I) for p in LEX.get(c, [])] for c in classes}
    M = np.zeros((len(texts), len(classes)), dtype=np.float32)
    for i, t in enumerate(texts):
        for j, c in enumerate(classes):
            if comp[c]:
                M[i, j] = sum(1 for pat in comp[c] if pat.search(t))
    return M

##`pick_with_majority_threshold(proba: np.ndarray, classes: np.ndarray, major: str, tau: float) -> np.ndarray`

**Назначение:**

Постобработка предсказаний модели для борьбы с дисбалансом классов.
Если модель выбрала мажорный класс (major), но уверенность ниже порога (tau),
заменяет предсказание на второй по вероятности класс.

**Параметры:**

 - `proba (np.ndarray):` Матрица вероятностей формы (N, C) — результат predict_proba().

 - `classes (np.ndarray):` Массив с названиями классов в том же порядке, что в proba.

 - `major (str):` Имя мажорного класса, вероятность которого нужно проверять.

 - `tau (float):` Порог уверенности для мажорного класса (0 < tau ≤ 1).

**Возвращает:**

 - `np.ndarray формы (N,):` Индексы классов после применения порогового правила.

**Алгоритм работы:**

  - Определяется индекс мажорного класса (idx_major).

  - Для каждого предсказания:

       - Если топ-1 класс — мажорный и его вероятность < tau:

           - Класс заменяется на второй по вероятности (argsort по убыванию).

       - Иначе — остаётся топ-1 класс.

   - Возвращается массив индексов выбранных классов.

**Пример:**
```python
proba = np.array([
    [0.6, 0.3, 0.1],  # Мажорный класс с низкой уверенностью
    [0.9, 0.05, 0.05] # Мажорный класс с высокой уверенностью
])
classes = np.array(["ОДУ", "Сантехника", "Электрика"])
out_idx = pick_with_majority_threshold(proba, classes, major="ОДУ", tau=0.7)
# out_idx[0] будет указывать на "Сантехника", out_idx[1] останется "ОДУ"
```



In [None]:
def pick_with_majority_threshold(proba: np.ndarray, classes: np.ndarray, major: str, tau: float):
    idx_major = list(classes).index(major)
    top1 = proba.argmax(1)
    out = top1.copy()
    for i in range(len(out)):
        if classes[out[i]] == major and proba[i, idx_major] < tau:
            # берём 2-й по вероятности
            out[i] = np.argsort(proba[i])[::-1][1]
    return out

In [None]:
df = pd.read_csv(DATA_CSV)
df = df[[TEXT_COL, LABEL_COL]].dropna()
df[TEXT_COL] = df[TEXT_COL].astype(str).map(normalize_text)
X_train, X_val, y_train, y_val = train_test_split(
    df[TEXT_COL].values, df[LABEL_COL].values, test_size=0.2, stratify=df[LABEL_COL], random_state=42
)

In [None]:
# === 1) TF-IDF по словам (1-2 граммы) ===
tfidf = TfidfVectorizer(ngram_range=(1,2), min_df=3, max_df=0.9)
Xtr_tfidf = tfidf.fit_transform(X_train)
Xva_tfidf = tfidf.transform(X_val)

In [None]:
# === 2) Rule-feats ===
classes_sorted = sorted(pd.unique(df[LABEL_COL]))
Xtr_rule = build_rule_feats(list(X_train), classes_sorted)
Xva_rule = build_rule_feats(list(X_val), classes_sorted)
# совместим sparse + dense
Xtr = hstack([Xtr_tfidf, Xtr_rule])
Xva = hstack([Xva_tfidf, Xva_rule])

In [None]:
# === 3) Веса классов и/или sample_weight ===
# class_weight="balanced" уже хорошо; дополнительно можно снизить вклад мажора:
sample_weight = np.ones(len(y_train), dtype=np.float32)
sample_weight[np.array(y_train) == MAJOR_CLASS] = 0.5  # подстрой

In [None]:
clf = LogisticRegression(
        max_iter=2000, class_weight="balanced", n_jobs=-1, solver="lbfgs", multi_class="auto"
    )
    clf.fit(Xtr, y_train, sample_weight=sample_weight)


In [None]:
proba = clf.predict_proba(Xva)
y_pred_plain = clf.classes_[proba.argmax(1)]

In [None]:
best_tau, best_f1, best_pred = 0.0, -1.0, y_pred_plain
for tau in np.linspace(0.4, 0.75, 8):  # сетка порогов
    pred_idx = pick_with_majority_threshold(proba, clf.classes_, MAJOR_CLASS, tau)
    y_pred = clf.classes_[pred_idx]
    f1 = f1_score(y_val, y_pred, average="macro")
    if f1 > best_f1:
        best_tau, best_f1, best_pred = float(tau), float(f1), y_pred

In [None]:
print("\n=== Без порога ===")
print(classification_report(y_val, y_pred_plain, digits=3))
print("\n=== С порогом для мажора (τ=%.2f) — macro-F1=%.3f ===" % (best_tau, best_f1))
print(classification_report(y_val, best_pred, digits=3))
print("Confusion matrix:\n", confusion_matrix(y_val, best_pred, labels=clf.classes_))

# Экспорт модели

In [None]:
joblib.dump({"tfidf": tfidf, "clf": clf, "classes": clf.classes_, "tau_major": best_tau,
             "major_class": MAJOR_CLASS, "lex": LEX},
            OUT_DIR / "service_clf.joblib")
(OUT_DIR / "report.json").write_text(json.dumps({
    "tau_major": best_tau,
    "macro_f1": best_f1}, ensure_ascii=False, indent=2), encoding="utf-8")
print("\nSaved:", OUT_DIR / "service_clf.joblib")