In [4]:
import re
from collections import defaultdict, Counter
import math
import pandas as pd
import pickle
from typing import List, Dict
import random

In [5]:
def preprocess_text(text: str) -> str:
    """Normalize raw text for tokenization.

    The text is lower-cased, every run of characters other than Cyrillic
    letters, Latin letters, digits and ``%`` is replaced by a single space,
    and surrounding whitespace is stripped.

    Args:
        text: Input string. ``None`` or an empty string yields ``""``.

    Returns:
        The normalized string with tokens separated by single spaces.
    """
    text = (text or "").lower()
    # "ё" (U+0451) lies outside the contiguous "а-я" range, so it has to be
    # listed explicitly; otherwise words such as "ещё" are cut apart.
    text = re.sub(r"[^а-яёa-z0-9%]+", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def tokenize(text: str):
    """Return the list of tokens of *text*.

    The text is first normalized with ``preprocess_text`` and then split on
    whitespace.
    """
    normalized = preprocess_text(text)
    tokens = normalized.split()
    return tokens

In [7]:
class MultinomialNaiveBayes:
    """Multinomial Naive Bayes text classifier with additive smoothing.

    Texts are turned into tokens with the module-level ``tokenize`` helper.
    After ``fit`` the model stores, per class, a log prior and a smoothed
    log likelihood for every vocabulary token.

    Attributes:
        alpha: Additive smoothing constant.
        vocab: Set of all tokens seen during training.
        class_doc_counts: Number of training documents per class.
        class_token_counts: Per-class token frequency counters.
        class_total_tokens: Total number of tokens per class.
        class_log_prior: Log prior per class.
        class_log_likelihood: Per-class mapping token -> log likelihood.
        classes: Class labels in order of first appearance.
        trained: ``True`` once ``fit`` has completed successfully.
    """

    def __init__(self, alpha: float = 1.0):
        """Create an untrained model with smoothing constant *alpha*."""
        self.alpha = float(alpha)
        self.trained = False
        self._reset()

    def _reset(self):
        """Drop all learned statistics (used by ``__init__`` and ``fit``)."""
        self.vocab = set()
        self.class_doc_counts = Counter()
        self.class_token_counts = defaultdict(Counter)
        self.class_total_tokens = Counter()
        self.class_log_prior = {}
        self.class_log_likelihood = {}
        self.classes = []

    def fit(self, texts, labels):
        """Train the model from parallel iterables of texts and labels.

        Any previously learned state is discarded first.

        Raises:
            ValueError: If no training examples were supplied.
        """
        # Mark the model as untrained up front so that a failed refit does
        # not leave an empty model flagged as usable.
        self.trained = False
        self._reset()

        for text, label in zip(texts, labels):
            self.class_doc_counts[label] += 1
            token_counts = Counter(tokenize(text))
            self.vocab.update(token_counts)
            self.class_token_counts[label].update(token_counts)
            self.class_total_tokens[label] += sum(token_counts.values())

        if not self.class_doc_counts:
            raise ValueError("Нет обучающих данных (labels пустые).")

        self.classes = list(self.class_doc_counts.keys())
        n_docs = sum(self.class_doc_counts.values())
        self.class_log_prior = {
            c: math.log(self.class_doc_counts[c] / n_docs) for c in self.classes
        }

        vocab_size = len(self.vocab)
        if vocab_size == 0:
            # No tokens at all: only the priors carry information.
            self.class_log_likelihood = {c: {} for c in self.classes}
            self.trained = True
            return

        for c in self.classes:
            denom = self.class_total_tokens[c] + self.alpha * vocab_size
            token_counts = self.class_token_counts[c]
            self.class_log_likelihood[c] = {
                w: math.log((token_counts.get(w, 0) + self.alpha) / denom)
                for w in self.vocab
            }

        self.trained = True

    def _require_trained(self):
        """Raise ``RuntimeError`` unless the model has been fitted."""
        if not self.trained:
            raise RuntimeError("Модель не обучена. Вызовите fit() перед predict()")

    def _score_counts(self, counts, c):
        """Return the unnormalized log posterior of class *c* for token counts."""
        logp = self.class_log_prior.get(c, math.log(1e-12))
        vocab_size = len(self.vocab)
        if not counts or vocab_size == 0:
            # Without evidence (empty text or empty vocabulary) the posterior
            # is just the prior.
            return logp
        denom = self.class_total_tokens[c] + self.alpha * vocab_size
        unseen_logp = math.log(self.alpha / denom)
        likelihood = self.class_log_likelihood.get(c, {})
        for w, cnt in counts.items():
            # Out-of-vocabulary tokens receive the smoothed "zero count" mass.
            logp += cnt * likelihood.get(w, unseen_logp)
        return logp

    def _score(self, text, c):
        """Return the unnormalized log posterior of class *c* for *text*.

        Raises:
            RuntimeError: If the model has not been trained.
        """
        self._require_trained()
        return self._score_counts(Counter(tokenize(text)), c)

    def _class_scores(self, text):
        """Return ``{class: log score}`` for *text*, tokenizing it only once."""
        self._require_trained()
        counts = Counter(tokenize(text))
        return {c: self._score_counts(counts, c) for c in self.classes}

    def predict(self, texts):
        """Predict the most probable class.

        Args:
            texts: A single string or an iterable of strings.

        Returns:
            A single label for a string input, otherwise a list of labels.

        Raises:
            RuntimeError: If the model has not been trained.
        """
        single = isinstance(texts, str)
        if single:
            texts = [texts]
        preds = []
        for text in texts:
            scores = self._class_scores(text)
            preds.append(max(scores, key=scores.get))
        return preds[0] if single else preds

    def predict_proba(self, texts):
        """Return normalized class probabilities.

        Args:
            texts: A single string or an iterable of strings.

        Returns:
            A ``{class: probability}`` dict for a string input, otherwise a
            list of such dicts.

        Raises:
            RuntimeError: If the model has not been trained.
        """
        single = isinstance(texts, str)
        if single:
            texts = [texts]
        results = []
        for text in texts:
            scores = self._class_scores(text)
            # Subtract the maximum before exponentiating to avoid underflow.
            top = max(scores.values())
            exps = {c: math.exp(score - top) for c, score in scores.items()}
            total = sum(exps.values())
            results.append({c: value / total for c, value in exps.items()})
        return results[0] if single else results

    # ---- optional helpers ----
    def save(self, path: str):
        """Serialize the model state to *path* using pickle."""
        with open(path, "wb") as f:
            pickle.dump({
                "alpha": self.alpha,
                "vocab": self.vocab,
                "class_doc_counts": self.class_doc_counts,
                "class_token_counts": self.class_token_counts,
                "class_total_tokens": self.class_total_tokens,
                "class_log_prior": self.class_log_prior,
                "class_log_likelihood": self.class_log_likelihood,
                "classes": self.classes,
                "trained": self.trained
            }, f)

    @classmethod
    def load(cls, path: str):
        """Load a model previously written by ``save`` and return a new instance.

        SECURITY: ``pickle.load`` can execute arbitrary code; only load files
        that come from a trusted source.
        """
        with open(path, "rb") as f:
            data = pickle.load(f)
        obj = cls(alpha=data.get("alpha", 1.0))
        obj.vocab = data.get("vocab", set())
        obj.class_doc_counts = data.get("class_doc_counts", Counter())
        obj.class_token_counts = data.get("class_token_counts", defaultdict(Counter))
        obj.class_total_tokens = data.get("class_total_tokens", Counter())
        obj.class_log_prior = data.get("class_log_prior", {})
        obj.class_log_likelihood = data.get("class_log_likelihood", {})
        obj.classes = data.get("classes", [])
        obj.trained = data.get("trained", False)
        return obj

    def train_and_evaluate(self,
           texts: List[str],
           labels: List[str],
           test_size: float = 0.2,
           random_state: int = 1,
           return_misclassified: int = 20
    ) -> Dict:
        """Split the data, train on one part and evaluate on the other.

        Args:
            texts: Documents.
            labels: Labels, one per document.
            test_size: Fraction of the data held out for testing, in (0, 1).
            random_state: Seed for the shuffle that precedes the split.
            return_misclassified: Maximum number of misclassified examples
                to include in the report.

        Returns:
            A dict with keys ``accuracy``, ``confusion`` (mapping
            ``(true, pred)`` to a count), ``n_test``, ``n_train`` and
            ``misclassified`` (list of ``{"text", "true", "pred"}`` dicts).

        Raises:
            ValueError: If ``test_size`` is outside (0, 1), if ``texts`` and
                ``labels`` differ in length, or if the training part is empty.
        """
        if not (0.0 < test_size < 1.0):
            raise ValueError("test_size должен быть в (0,1)")
        if len(texts) != len(labels):
            raise ValueError("texts и labels должны быть одинаковой длины")

        n = len(texts)
        indices = list(range(n))
        # A private generator yields the same permutation as seeding the
        # module-level one, but leaves the global random state untouched.
        random.Random(random_state).shuffle(indices)
        split = int(n * (1 - test_size))
        train_idx = indices[:split]
        test_idx = indices[split:]

        X_train = [texts[i] for i in train_idx]
        y_train = [labels[i] for i in train_idx]
        X_test = [texts[i] for i in test_idx]
        y_test = [labels[i] for i in test_idx]

        self.fit(X_train, y_train)
        preds = self.predict(X_test)

        correct = sum(1 for p, t in zip(preds, y_test) if p == t)
        accuracy = correct / len(y_test) if y_test else None

        conf = Counter(zip(y_test, preds))

        mis = []
        for tx, tr, pr in zip(X_test, y_test, preds):
            if len(mis) >= return_misclassified:
                break
            if tr != pr:
                mis.append({"text": tx, "true": tr, "pred": pr})

        return {
            "accuracy": accuracy,
            "confusion": dict(conf),
            "n_test": len(y_test),
            "n_train": len(y_train),
            "misclassified": mis
        }

if __name__ == "__main__":
    # Demo: load a labelled phrase dataset, train the classifier on a random
    # split of it and print the evaluation report.

    # --- load the dataset ---
    try:
        df = pd.read_csv("phrases_labels.csv", encoding="utf-8")
        print("Файл phrases_labels.csv успешно загружен.")

        # Normalize column names to "text" / "label".
        if not {"text", "label"}.issubset(df.columns):
            if {"phrase", "label"}.issubset(df.columns):
                df = df.rename(columns={"phrase": "text"})
            elif {"phrase", "tag"}.issubset(df.columns):
                df = df.rename(columns={"phrase": "text", "tag": "label"})
            else:
                raise ValueError(
                    "CSV должен содержать 'text' и 'label', "
                    "('phrase','label') или ('phrase','tag')."
                )

        # --- prepare the data ---
        # Rows with a missing text or label cannot be used; empty CSV cells
        # are read as float NaN, which would break text preprocessing.
        df = df.dropna(subset=["text", "label"])
        # Force string texts so that numeric-looking cells are tokenized too.
        texts = df["text"].astype(str).tolist()
        labels = df["label"].tolist()

        # --- create the model ---
        model = MultinomialNaiveBayes(alpha=1.0)

        # --- train + evaluate ---
        report = model.train_and_evaluate(
            texts=texts,
            labels=labels,
            test_size=0.2,
            random_state=42,
            return_misclassified=10
        )

        print("\n=== РЕЗУЛЬТАТЫ ===")
        print("Train size:", report["n_train"])
        print("Test size:", report["n_test"])
        print("Accuracy:", report["accuracy"])
        print("\nConfusion matrix:")
        for k, v in report["confusion"].items():
            print(f"{k}: {v}")

        print("\nОшибки (если были):")
        for item in report["misclassified"]:
            print(f"TEXT: {item['text']}")
            print(f"  true = {item['true']}, pred = {item['pred']}")
            print()

    except FileNotFoundError:
        print("Файл phrases_labels.csv не найден в этой директории.")
    except Exception as exc:
        # Top-level boundary of the demo script: report the problem instead
        # of dumping a traceback.
        print("Ошибка:", exc)

Файл phrases_labels.csv успешно загружен.

=== РЕЗУЛЬТАТЫ ===
Train size: 82
Test size: 21
Accuracy: 0.6190476190476191

Confusion matrix:
('not_spam', 'spam'): 7
('spam', 'spam'): 11
('spam', 'not_spam'): 1
('not_spam', 'not_spam'): 2

Ошибки (если были):
TEXT: ответьте, пожалуйста
  true = not_spam, pred = spam

TEXT: пожалуйста подтвердите
  true = not_spam, pred = spam

TEXT: акт выполненных работ
  true = not_spam, pred = spam

TEXT: работа на дому
  true = spam, pred = not_spam

TEXT: совместная работа
  true = not_spam, pred = spam

TEXT: план работ
  true = not_spam, pred = spam

TEXT: предлагаю встретиться
  true = not_spam, pred = spam

TEXT: версия релиза
  true = not_spam, pred = spam
