# Тематизация жалоб без эмбеддингов: LDA и LSI+KMeans

Этот ноутбук содержит полностью рабочие пайплайны без использования эмбеддингов HuggingFace:
- LDA (CountVectorizer → LatentDirichletAllocation)
- LSI (TF‑IDF → TruncatedSVD) + KMeans

Включено:
- Очистка текста (минимальная, с заменой PII)
- Обучение, предсказание, извлечение ключевых слов для тем/кластеров
- Метрики: LDA перплексия, silhouette для KMeans
- Сохранение/загрузка моделей
- Демонстрация на примерах

Подойдёт как стартовый шаблон для кластеризации обращений клиентов.


In [None]:
# При необходимости раскомментируйте для установки
# %pip install scikit-learn pandas numpy joblib razdel

import re
import os
import json
import joblib
import numpy as np
import pandas as pd

from typing import List, Tuple, Dict, Optional

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation, TruncatedSVD
from sklearn.preprocessing import Normalizer
from sklearn.pipeline import make_pipeline
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

# Токены, которые не должны попадать в названия тем/топ-термины
BLOCKED_TERMS = {
    "amount", "card", "date", "email",
    "pii_amount", "pii_card", "pii_date", "pii_email",
}



In [None]:
# Загрузка данных

def load_texts_from_csv(csv_path: str, text_col: str = "text") -> List[str]:
    """Загрузить столбец с текстами из CSV.
    Ожидается, что в файле есть колонка с названием text_col.
    """
    df = pd.read_csv(csv_path)
    assert text_col in df.columns, f"В CSV нет колонки '{text_col}'"
    texts = df[text_col].astype(str).fillna("").tolist()
    return texts

# Пример использования:
# texts = load_texts_from_csv("complaints.csv", text_col="complaint")



In [None]:
# Очистка текста

PII_EMAIL = re.compile(r"\S+@\S+")
PII_CARD = re.compile(r"\b\d{12,16}\b")
PII_DATE = re.compile(r"\d{1,2}[./-]\d{1,2}[./-]\d{2,4}")
PII_AMOUNT = re.compile(r"\d+[ ,\.]?\d*\s?(₽|р\.?|rub|\$|usd|eur)?", re.IGNORECASE)

RE_MULTISPACE = re.compile(r"\s{2,}")


def clean_text(text: str) -> str:
    """Минимальная нормализация и замена PII."""
    t = str(text).lower()
    t = PII_EMAIL.sub("<email>", t)
    t = PII_CARD.sub("<card>", t)
    t = PII_DATE.sub("<date>", t)
    t = PII_AMOUNT.sub("<amount>", t)
    t = RE_MULTISPACE.sub(" ", t).strip()
    return t


def clean_corpus(texts: List[str]) -> List[str]:
    return [clean_text(t) for t in texts]



In [None]:
# Пайплайн LDA

class LdaTopicModel:
    def __init__(
        self,
        n_topics: int = 10,
        max_df: float = 0.95,
        min_df: int = 5,
        ngram_range: Tuple[int, int] = (1, 2),
        random_state: int = 42,
    ) -> None:
        self.n_topics = n_topics
        self.vectorizer = CountVectorizer(
            stop_words="russian",
            max_df=max_df,
            min_df=min_df,
            ngram_range=ngram_range,
        )
        self.lda = LatentDirichletAllocation(
            n_components=n_topics,
            learning_method="batch",
            max_iter=20,
            random_state=random_state,
            n_jobs=-1,
        )
        self.is_fitted = False

    def fit(self, texts: List[str]) -> "LdaTopicModel":
        cleaned = clean_corpus(texts)
        X = self.vectorizer.fit_transform(cleaned)
        self.doc_topic = self.lda.fit_transform(X)
        self.is_fitted = True
        return self

    def transform(self, texts: List[str]) -> np.ndarray:
        assert self.is_fitted, "Сначала вызовите fit()"
        cleaned = clean_corpus(texts)
        X = self.vectorizer.transform(cleaned)
        return self.lda.transform(X)

    def predict(self, texts: List[str]) -> Tuple[np.ndarray, np.ndarray]:
        dist = self.transform(texts)
        return dist.argmax(axis=1), dist.max(axis=1)

    def get_topic_top_words(self, top_n: int = 10) -> Dict[int, List[str]]:
        assert self.is_fitted, "Сначала вызовите fit()"
        feature_names = self.vectorizer.get_feature_names_out()
        topics: Dict[int, List[str]] = {}
        for i, comp in enumerate(self.lda.components_):
            top_idx = comp.argsort()[::-1]
            words: List[str] = []
            for j in top_idx:
                w = feature_names[j]
                if w in BLOCKED_TERMS:
                    continue
                words.append(w)
                if len(words) >= top_n:
                    break
            topics[i] = words
        return topics

    def to_dataframe(self, texts: List[str]) -> pd.DataFrame:
        assert self.is_fitted, "Сначала вызовите fit()"
        preds, conf = self.predict(texts)
        return pd.DataFrame({"text": texts, "topic_id": preds, "topic_confidence": conf})

    def perplexity(self, texts: Optional[List[str]] = None) -> float:
        assert self.is_fitted, "Сначала вызовите fit()"
        if texts is None:
            texts = self._last_fit_corpus if hasattr(self, "_last_fit_corpus") else None
        cleaned = clean_corpus(texts) if texts is not None else None
        X = self.vectorizer.transform(cleaned) if cleaned is not None else None
        if X is None:
            raise ValueError("Передайте texts для оценки перплексии")
        return float(self.lda.perplexity(X))

    def save(self, path: str) -> None:
        os.makedirs(path, exist_ok=True)
        joblib.dump(self.vectorizer, os.path.join(path, "vectorizer.joblib"))
        joblib.dump(self.lda, os.path.join(path, "lda.joblib"))
        meta = {"n_topics": self.n_topics}
        with open(os.path.join(path, "meta.json"), "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, path: str) -> "LdaTopicModel":
        with open(os.path.join(path, "meta.json"), "r", encoding="utf-8") as f:
            meta = json.load(f)
        obj = cls(n_topics=meta["n_topics"])
        obj.vectorizer = joblib.load(os.path.join(path, "vectorizer.joblib"))
        obj.lda = joblib.load(os.path.join(path, "lda.joblib"))
        obj.is_fitted = True
        return obj



In [None]:
# Пайплайн LSI (TF-IDF + SVD) + KMeans

from collections import defaultdict

class LsiKMeansTopicModel:
    def __init__(
        self,
        n_components: int = 100,
        n_clusters: int = 10,
        max_df: float = 0.95,
        min_df: int = 5,
        ngram_range: Tuple[int, int] = (1, 2),
        random_state: int = 42,
    ) -> None:
        self.n_components = n_components
        self.n_clusters = n_clusters
        self.tfidf = TfidfVectorizer(
            stop_words="russian",
            max_df=max_df,
            min_df=min_df,
            ngram_range=ngram_range,
        )
        self.svd = TruncatedSVD(n_components=n_components, random_state=random_state)
        self.lsi = make_pipeline(self.svd, Normalizer(copy=False))
        self.kmeans = KMeans(n_clusters=n_clusters, n_init="auto", random_state=random_state)
        self.is_fitted = False

    def fit(self, texts: List[str]) -> "LsiKMeansTopicModel":
        cleaned = clean_corpus(texts)
        X_tfidf = self.tfidf.fit_transform(cleaned)
        X_lsi = self.lsi.fit_transform(X_tfidf)
        self.labels_ = self.kmeans.fit_predict(X_lsi)
        self.is_fitted = True
        return self

    def transform(self, texts: List[str]) -> np.ndarray:
        assert self.is_fitted, "Сначала вызовите fit()"
        cleaned = clean_corpus(texts)
        X_tfidf = self.tfidf.transform(cleaned)
        return self.lsi.transform(X_tfidf)

    def predict(self, texts: List[str]) -> np.ndarray:
        X_lsi = self.transform(texts)
        return self.kmeans.predict(X_lsi)

    def to_dataframe(self, texts: List[str]) -> pd.DataFrame:
        assert self.is_fitted, "Сначала вызовите fit()"
        labels = self.predict(texts)
        return pd.DataFrame({"text": texts, "topic_id": labels})

    def silhouette(self, texts: List[str]) -> float:
        assert self.is_fitted, "Сначала вызовите fit()"
        X_lsi = self.transform(texts)
        return float(silhouette_score(X_lsi, self.kmeans.predict(X_lsi)))

    def get_cluster_terms(self, texts: List[str], top_n: int = 10) -> Dict[int, List[str]]:
        """c-TF-IDF по агрегированным документам кластеров.
        Передайте исходные тексты корпуса (те же, на которых обучали).
        """
        assert self.is_fitted, "Сначала вызовите fit()"
        cleaned = clean_corpus(texts)
        # Аггрегируем по кластерам
        cluster_docs: Dict[int, List[str]] = defaultdict(list)
        for t, lab in zip(cleaned, self.labels_):
            cluster_docs[lab].append(t)
        agg_texts = [" ".join(cluster_docs[i]) if i in cluster_docs else "" for i in range(self.n_clusters)]
        # Считаем c-TF-IDF
        ctfidf = TfidfVectorizer(stop_words="russian", ngram_range=(1, 2), min_df=2)
        C = ctfidf.fit_transform(agg_texts)
        feat = ctfidf.get_feature_names_out()
        terms: Dict[int, List[str]] = {}
        for i in range(self.n_clusters):
            row = C[i]
            scores = row.toarray().ravel()
            order = scores.argsort()[::-1]
            words: List[str] = []
            for j in order:
                w = str(feat[j])
                if w in BLOCKED_TERMS:
                    continue
                words.append(w)
                if len(words) >= top_n:
                    break
            terms[i] = words
        return terms

    def save(self, path: str) -> None:
        os.makedirs(path, exist_ok=True)
        joblib.dump(self.tfidf, os.path.join(path, "tfidf.joblib"))
        joblib.dump(self.svd, os.path.join(path, "svd.joblib"))
        joblib.dump(self.lsi, os.path.join(path, "lsi_pipeline.joblib"))
        joblib.dump(self.kmeans, os.path.join(path, "kmeans.joblib"))
        meta = {"n_components": self.n_components, "n_clusters": self.n_clusters}
        with open(os.path.join(path, "meta.json"), "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, path: str) -> "LsiKMeansTopicModel":
        with open(os.path.join(path, "meta.json"), "r", encoding="utf-8") as f:
            meta = json.load(f)
        obj = cls(n_components=meta["n_components"], n_clusters=meta["n_clusters"])
        obj.tfidf = joblib.load(os.path.join(path, "tfidf.joblib"))
        obj.svd = joblib.load(os.path.join(path, "svd.joblib"))
        obj.lsi = joblib.load(os.path.join(path, "lsi_pipeline.joblib"))
        obj.kmeans = joblib.load(os.path.join(path, "kmeans.joblib"))
        obj.is_fitted = True
        return obj



In [None]:
# Подбор числа тем/кластеров

def lda_try_topics(texts: List[str], topics_list: List[int]) -> pd.DataFrame:
    """Перебор числа тем для LDA и оценка перплексии (ниже — лучше)."""
    cleaned = clean_corpus(texts)
    cv = CountVectorizer(stop_words="russian", max_df=0.95, min_df=5, ngram_range=(1, 2))
    X = cv.fit_transform(cleaned)
    rows = []
    for k in topics_list:
        lda = LatentDirichletAllocation(n_components=k, learning_method="batch", max_iter=15, random_state=42, n_jobs=-1)
        lda.fit(X)
        perp = float(lda.perplexity(X))
        rows.append({"n_topics": k, "perplexity": perp})
    return pd.DataFrame(rows).sort_values("n_topics")


def lsi_kmeans_try_k(texts: List[str], k_list: List[int], n_components: int = 100) -> pd.DataFrame:
    """Перебор k для KMeans на LSI-признаках и оценка silhouette (выше — лучше)."""
    cleaned = clean_corpus(texts)
    tfidf = TfidfVectorizer(stop_words="russian", max_df=0.95, min_df=5, ngram_range=(1, 2))
    X_tfidf = tfidf.fit_transform(cleaned)
    lsi = make_pipeline(TruncatedSVD(n_components=n_components, random_state=42), Normalizer(copy=False))
    X_lsi = lsi.fit_transform(X_tfidf)
    rows = []
    for k in k_list:
        km = KMeans(n_clusters=k, n_init="auto", random_state=42)
        labels = km.fit_predict(X_lsi)
        sil = float(silhouette_score(X_lsi, labels))
        rows.append({"k": k, "silhouette": sil})
    return pd.DataFrame(rows).sort_values("k")



In [None]:
# Демонстрация на примерах

example_texts = [
    "Клиент не согласен со списанием 525,41 р. за обслуживание карты (годовая комиссия) и просит вернуть средства",
    "Перевод через систему не поступил получателю. Клиент просит сделать розыск и вернуть средства на карту",
    "Была заблокирована карта без предупреждения, деньги недоступны, прошу разблокировать и вернуть доступ",
    "Нужен перерасчет возврата страховки по досрочно погашенному кредиту, официальный ответ не получен",
    "В мобильном приложении нет данных по лимиту карты несмотря на уведомление",
]

# 1) LDA
lda_model = LdaTopicModel(n_topics=10)
lda_model.fit(example_texts)
lda_topics_df = lda_model.to_dataframe(example_texts)
print("Топ-слова по темам (LDA):")
print(pd.DataFrame.from_dict(lda_model.get_topic_top_words(top_n=8), orient="index"))

display(lda_topics_df)

# 2) LSI+KMeans
lsi_model = LsiKMeansTopicModel(n_components=100, n_clusters=10)
lsi_model.fit(example_texts)
lsi_topics_df = lsi_model.to_dataframe(example_texts)
print("\nТоп-термины по кластерам (LSI+KMeans):")
print(pd.DataFrame.from_dict(lsi_model.get_cluster_terms(example_texts, top_n=8), orient="index"))

display(lsi_topics_df)



In [None]:
# Применение к новым жалобам + сохранение/загрузка

new_texts = [
    "Прошу вернуть годовую комиссию, не было уведомления о списании",
    "Перевод на казахстанскую карту не дошёл, сделайте розыск",
]

lda_pred_ids, lda_pred_conf = lda_model.predict(new_texts)
print("LDA предсказания:")
print(pd.DataFrame({"text": new_texts, "topic_id": lda_pred_ids, "confidence": lda_pred_conf}))

lsi_pred_ids = lsi_model.predict(new_texts)
print("\nLSI+KMeans предсказания:")
print(pd.DataFrame({"text": new_texts, "topic_id": lsi_pred_ids}))

# Сохранение
lda_model.save("models/lda_model")
lsi_model.save("models/lsi_kmeans_model")

# Загрузка
lda_loaded = LdaTopicModel.load("models/lda_model")
lsi_loaded = LsiKMeansTopicModel.load("models/lsi_kmeans_model")

_ = lda_loaded.predict(["Возврат страховки по кредиту не произведён"])  # smoke-test
_ = lsi_loaded.predict(["Карта заблокирована без предупреждения"])  # smoke-test



## Пост-обработка: мэппинг тем в бизнес‑рубрики

- Создаем словарь соответствия `исходная_тема → бизнес‑тема`.
- LDA: суммируем вероятности по темам, входящим в одну бизнес‑тему, и берём максимум.
- Пересчитываем ключевые термины по финальным бизнес‑темам (c‑TF‑IDF).
- Анализируем размер финальных тем (шт. и % от корпуса).


In [None]:
from collections import Counter

def apply_lda_mapping(lda_model: LdaTopicModel, texts: List[str], topic_to_business: Dict[int, str]) -> pd.DataFrame:
    """Возвращает финальную бизнес-метку и уверенность (сумма вероятностей) по документу."""
    dist = lda_model.transform(texts)
    final_labels: List[str] = []
    final_conf: List[float] = []
    for row in dist:
        agg: Dict[str, float] = {}
        for topic_id, p in enumerate(row):
            label = topic_to_business.get(topic_id, f"topic_{topic_id}")
            agg[label] = agg.get(label, 0.0) + float(p)
        best_label, best_prob = max(agg.items(), key=lambda x: x[1])
        final_labels.append(best_label)
        final_conf.append(best_prob)
    return pd.DataFrame({"text": texts, "final_label": final_labels, "final_conf": final_conf})


def apply_kmeans_mapping(lsi_model: LsiKMeansTopicModel, texts: List[str], cluster_to_business: Dict[int, str]) -> pd.DataFrame:
    labels = lsi_model.predict(texts)
    final_labels = [cluster_to_business.get(int(l), f"cluster_{int(l)}") for l in labels]
    return pd.DataFrame({"text": texts, "final_label": final_labels})


def compute_business_terms(texts: List[str], labels: List[str], top_n: int = 10) -> Dict[str, List[str]]:
    """Считает ключевые термины для финальных бизнес‑тем (c‑TF‑IDF)."""
    cleaned = clean_corpus(texts)
    groups: Dict[str, List[str]] = {}
    for t, lab in zip(cleaned, labels):
        groups.setdefault(lab, []).append(t)
    business_names = sorted(groups.keys())
    agg_texts = [" ".join(groups[name]) for name in business_names]
    vec = TfidfVectorizer(stop_words="russian", ngram_range=(1, 2), min_df=2)
    M = vec.fit_transform(agg_texts)
    feat = vec.get_feature_names_out()
    terms: Dict[str, List[str]] = {}
    for idx, name in enumerate(business_names):
        row = M[idx]
        scores = row.toarray().ravel()
        order = scores.argsort()[::-1]
        words: List[str] = []
        for j in order:
            w = str(feat[j])
            if w in BLOCKED_TERMS:
                continue
            words.append(w)
            if len(words) >= top_n:
                break
        terms[name] = words
    return terms


def business_size_report(labels: List[str]) -> pd.DataFrame:
    cnt = Counter(labels)
    total = sum(cnt.values())
    rows = [{"business_label": k, "count": v, "share": v / total} for k, v in sorted(cnt.items(), key=lambda x: -x[1])]
    return pd.DataFrame(rows)

# Пример использования (заполните mapping под ваши рубрики)
# Мэппинг для LDA: исходная_тема -> бизнес‑тема
lda_mapping_example = {i: f"Тема_{i}" for i in range(lda_model.lda.n_components)}
lda_final_df = apply_lda_mapping(lda_model, example_texts, lda_mapping_example)
lda_terms_final = compute_business_terms(example_texts, lda_final_df["final_label"].tolist(), top_n=8)
print("Финальные бизнес‑темы (LDA):")
print(business_size_report(lda_final_df["final_label"].tolist()))
print(pd.DataFrame.from_dict(lda_terms_final, orient="index"))

# Мэппинг для LSI+KMeans: кластер -> бизнес‑тема
kmeans_mapping_example = {i: f"Тема_{i}" for i in range(lsi_model.n_clusters)}
kmeans_final_df = apply_kmeans_mapping(lsi_model, example_texts, kmeans_mapping_example)
kmeans_terms_final = compute_business_terms(example_texts, kmeans_final_df["final_label"].tolist(), top_n=8)
print("\nФинальные бизнес‑темы (LSI+KMeans):")
print(business_size_report(kmeans_final_df["final_label"].tolist()))
print(pd.DataFrame.from_dict(kmeans_terms_final, orient="index"))

