# Работа с текстом

В этом домашнем задании вам предстоит поработать с текстовыми данными и научиться находить спам сообщения!

In [186]:
import inspect
import math
import random
import re
from collections import Counter, defaultdict
from string import punctuation

import numpy as np
from nltk import SnowballStemmer, download
from nltk.corpus import stopwords
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split

In [187]:
download("stopwords")

[nltk_data] Downloading package stopwords to /home/wrdx/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [188]:
def set_seed(seed=42):
    np.random.seed(seed)
    random.seed(seed)


# Этой функцией будут помечены все места, которые необходимо дозаполнить
# Это могут быть как целые функции, так и отдельные части внутри них
# Всегда можно воспользоваться интроспекцией и найти места использования этой функции :)
def todo():
    stack = inspect.stack()
    caller_frame = stack[1]
    function_name = caller_frame.function
    line_number = caller_frame.lineno
    raise NotImplementedError(f"TODO at {function_name}, line {line_number}")


SEED = 0xC0FFEE
set_seed(SEED)

In [189]:
def read_dataset(filename):
    x, y = [], []
    with open(filename, encoding="utf-8") as file:
        for line in file:
            cl, sms = re.split(r"^(ham|spam)[\t\s]+(.*)$", line)[1:3]
            x.append(sms)
            y.append(cl)
    return x, y

In [190]:
X, y = read_dataset("spam.txt")

In [191]:
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.9, random_state=SEED, stratify=y)

In [192]:
for x_, y_ in zip(X_train[:5], y_train[:5]):
    print(f"{y_}: {x_}")

ham: Two fundamentals of cool life: "Walk, like you are the KING"...! OR "Walk like you Dont care,whoever is the KING"!... Gud nyt
ham: Haha... Where got so fast lose weight, thk muz go 4 a month den got effect... Gee,later we go aust put bk e weight.
ham: I wish things were different. I wonder when i will be able to show you how much i value you. Pls continue the brisk walks no drugs without askin me please and find things to laugh about. I love you dearly.
ham: Tmr then ü brin lar... Aiya later i come n c lar... Mayb ü neva set properly ü got da help sheet wif ü...
ham: For many things its an antibiotic and it can be used for chest abdomen and gynae infections even bone infections.


In [193]:
Counter(y_train)

Counter({'ham': 4344, 'spam': 672})

## Bag of Words (2 балла)

Реализуйте простой подсчет слов в тексте, в качестве токенизатора делите по пробелу, убрав перед этим все знаки пунктуации и приведя к нижнему регистру.

После этого обучите простую логистическую модель, измерьте ее качество и сделайте выводы.

In [194]:
class BagOfWords:
    def __init__(self, vocabulary_size: int = 1000):
        """Init Bag-of-Words instance

        Args:
            vocabulary_size: maximum number of tokens in vocabulary
        """
        self._vocabulary_size = vocabulary_size
        self._vocabulary: dict[str, int] = None

    def _tokenize(self, sentence: str) -> list[str]:
        sentence = re.sub(r'[^\w\s]', '', sentence.lower())
        return sentence.split()
        
    def fit(self, sentences: list[str]):
        """Fit Bag-of-Words based on list of sentences"""
        tokens = []
        for sent in sentences:
            tokens += self._tokenize(sent)
        counter = Counter(tokens)
        
        common = counter.most_common(self._vocabulary_size)
        self._vocabulary = {token: i for i, (token, _) in enumerate(common)}

    def transform(self, sentences: list[str]) -> np.ndarray:
        """Vectorize texts using built vocabulary

        Args:
            sentences: list of sentences to vectorize

        Return:
            transformed texts, matrix of (n_sentences, vocab_size)
        """
        if self._vocabulary is None:
            raise RuntimeError("Fit before transforming!")

        vectors = np.zeros((len(sentences), self._vocabulary_size))
        
        for i, sentence in enumerate(sentences):
            tokens = self._tokenize(sentence)
            for token in tokens:
                if token in self._vocabulary:
                    token_idx = self._vocabulary[token]
                    vectors[i, token_idx] += 1
                    
        return vectors

    def fit_transform(self, sentences: list[str]) -> np.ndarray:
        self.fit(sentences)
        return self.transform(sentences)

In [195]:
def get_bow_size(cls, params=None, metric=accuracy_score):
    sizes = [10, 50, 100, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 5000]
    best_score, best_size = 0, 0
    params = params if params else dict()
    for size in sizes:
        
        bow = cls(vocabulary_size=size, **params)
        
        X_train_bow = bow.fit_transform(X_train)
        X_test_bow = bow.transform(X_test)
        
        log_reg = LogisticRegression()
        log_reg.fit(X_train_bow, y_train)
        y_pred = log_reg.predict(X_test_bow)
        score = metric(y_test, y_pred)
        
        if score > best_score:
            best_score, best_size = score, size
    return best_size

In [196]:
bow_best_size = get_bow_size(BagOfWords)
bow_best_size

3000

In [197]:
bow = BagOfWords(vocabulary_size=bow_best_size)
X_train_bow = bow.fit_transform(X_train)
X_test_bow = bow.transform(X_test)

X_train_bow.shape, X_test_bow.shape

((5016, 3000), (558, 3000))

In [198]:
model = LogisticRegression()
model.fit(X_train_bow, y_train)

y_pred = model.predict(X_test_bow)
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

         ham       0.99      1.00      1.00       483
        spam       1.00      0.95      0.97        75

    accuracy                           0.99       558
   macro avg       1.00      0.97      0.98       558
weighted avg       0.99      0.99      0.99       558



## Обработка текста (1 балл)

Добавьте на этапе токенизатора удаление стоп-слов и стемминг, для этого можно воспользоваться [`SnowballStemmer`](https://www.nltk.org/api/nltk.stem.SnowballStemmer.html) из библиотеки `nltk`.

⚠️ `nltk` уже довольно устаревшая библиотека и скорее не рекомендуется ее использовать, однако в учебных целях более чем достаточно.

Обучите логистическую регрессию, попробуйте по-разному комбинировать стемминг и удаление стоп-слов, сделайте выводы.

In [199]:
class BagOfWordsStem(BagOfWords):
    def __init__(
        self,
        vocabulary_size: int,
        language: str = "english",
        ignore_stopwords: bool = True,
        remove_stopwords: bool = True,
    ):
        super().__init__(vocabulary_size)
        if remove_stopwords and not ignore_stopwords:
            raise ValueError("To remove stop-words they should be ignored by stemmer")
        self._stemmer = SnowballStemmer(language)
        self._stopwords = set(stopwords.words(language))
        self._remove_stopwords = remove_stopwords

    def _tokenize(self, sentence: str) -> list[str]:
        tokens = super()._tokenize(sentence)
        result = []
        
        for token in tokens:
            if self._remove_stopwords and token in self._stopwords:
                continue
            result.append(self._stemmer.stem(token))
        return result

In [200]:
best_size = get_bow_size(BagOfWordsStem, {"ignore_stopwords": True, "remove_stopwords": True})
best_size

500

In [201]:
bow = BagOfWordsStem(vocabulary_size=best_size, ignore_stopwords=True, remove_stopwords=True)
X_train_bow = bow.fit_transform(X_train)
X_test_bow = bow.transform(X_test)

X_train_bow.shape, X_test_bow.shape

((5016, 500), (558, 500))

In [202]:
model = LogisticRegression()
model.fit(X_train_bow, y_train)

y_pred = model.predict(X_test_bow)
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

         ham       0.99      1.00      0.99       483
        spam       0.99      0.95      0.97        75

    accuracy                           0.99       558
   macro avg       0.99      0.97      0.98       558
weighted avg       0.99      0.99      0.99       558



## TF-IDF (2 балла)

Доработайте предыдущий класс до полноценного Tf-Idf, затем, аналогично, проведите эксперименты с логистической регрессией.

In [208]:
class TFIDFVectorizer:
    def __init__(
        self,
        vocabulary_size: int,
        language: str = "english",
        ignore_stopwords: bool = True,
        remove_stopwords: bool = True,
        use_idf: bool = False,
    ):
        self._vocabulary_size = vocabulary_size
        self._vocabulary = None
        self._idf = None
        self._use_idf = use_idf

        # Логику с токенизацией можно вынести в отдельный класс!
        if remove_stopwords and not ignore_stopwords:
            raise ValueError("To remove stop-words they should be ignored by stemmer")
        self._stemmer = SnowballStemmer(language)
        self._stopwords = set(stopwords.words(language))
        self._remove_stopwords = remove_stopwords
        self._ignore_stopwords = ignore_stopwords

    def _tokenize(self, sentence: str) -> list[str]:
        sentence = re.sub(r'[^\w\s]', '', sentence.lower())
        tokens = [token for token in sentence.split(' ') if token]
        result = []
        
        for token in tokens:
            is_stopword = token in self._stopwords

            if self._remove_stopwords and is_stopword:
                continue 

            if is_stopword and self._ignore_stopwords:
                result.append(self._stemmer.stem(token))
            else:
                result.append(token)

        return result
        
    def fit(self, sentences: list[str]):
        """Build vocabulary and compute IDF"""
        term_freq = defaultdict(int)
        doc_freq = defaultdict(int)
        total_docs = len(sentences)
        
        for sentence in sentences:
            tokens = self._tokenize(sentence)
            unique_tokens = set(tokens)
            
            for token in unique_tokens:
                doc_freq[token] += 1
                
            for token in tokens:
                term_freq[token] += 1
        
        sorted_terms = sorted(doc_freq.items(), key=lambda x: x[1], reverse=True)
        top_terms = [term for term, _ in sorted_terms[:self._vocabulary_size]]
        
        self._vocabulary = {term: idx for idx, term in enumerate(top_terms)}
        
        if self._use_idf:
            self._idf = {}
            for term in self._vocabulary:
                self._idf[term] = np.log((total_docs + 1) / (doc_freq.get(term, 0) + 1) + 1)


    def transform(self, sentences: list[str]) -> np.ndarray:
        """Transform sentences to TF-IDF vectors"""
        vectors = np.zeros((len(sentences), len(self._vocabulary)))
        
        for i, sentence in enumerate(sentences):
            tokens = self._tokenize(sentence)
            total_terms = len(tokens)
                
            term_counts = defaultdict(int)
            for token in tokens:
                if token in self._vocabulary:
                    term_counts[token] += 1
            
            for term, count in term_counts.items():
                tf = count / total_terms
                
                if self._use_idf:
                    tfidf = tf * self._idf[term]
                else:
                    tfidf = tf
                
                term_idx = self._vocabulary[term]
                vectors[i, term_idx] = tfidf
                
        return vectors

    def fit_transform(self, sentences: list[str]) -> np.ndarray:
        self.fit(sentences)
        return self.transform(sentences)

In [209]:
tfidf = TFIDFVectorizer(vocabulary_size=4000, remove_stopwords=True, use_idf=True)
X_train_tfidf = tfidf.fit_transform(X_train)
X_test_tfidf = tfidf.transform(X_test)

X_train_tfidf.shape, X_test_tfidf.shape

((5016, 4000), (558, 4000))

In [210]:
model = LogisticRegression()
model.fit(X_train_tfidf, y_train)

y_pred = model.predict(X_test_tfidf)
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

         ham       0.98      1.00      0.99       483
        spam       0.97      0.84      0.90        75

    accuracy                           0.97       558
   macro avg       0.97      0.92      0.94       558
weighted avg       0.97      0.97      0.97       558



## NaiveBayes (5 баллов)

Наивный байесовский классификатор — это простой и эффективный алгоритм машинного обучения, основанный на теореме Байеса с наивным предположением независимости признаков.

### Формула Байеса

$$
P(A|B) = \frac{P(B|A) \cdot P(A)}{P(B)}
$$

В контексте классификации текста это значит: $P(класс | документ) \propto P(класс) \cdot P(документ | класс)$

Почему "наивность"? Потому что предпологаем, что все слова независимы:

$$
P(w_1, w_2, \dots | class) = P(w_1 | class) \cdot P(w_2 | class) \cdot \dots
$$

### Классификация текста

Таким образом, для классификации текста необходимо:

1. Вычислить априорную вероятность класса: $P(class)$, доля документов с таким классом
2. Вычислить правдоподобие: $P(text | class) = \prod_{i=1}^n P(w_i | class)$

_Примечание:_ $P(w_i | class)$ — это частота слова в данном классе относительно всех слов в классе, при этом зачастую добавляют сглаживание Лапласа в качестве регуляризатора
$$
P(w_i | class) = \frac{\text{частота слова в классе} + \alpha}{\text{сумма всех слов в классе} + \alpha \cdot |V|}
$$

После этого, необходимо выбрать наиболее вероятный класс для данного текста:

$$
class = \arg \max\limits_{c} \Big[ P(c) \cdot P(text | c) \Big] = \arg \max\limits_{c} \Big[ \log P(c) + \sum_{i=1}^n \log P(w_i | c) \Big]
$$

### Реализация

`fit(X, y)` - оценивает параметры распределения `p(x|y)` для каждого `y`.

`log_proba(X)` - для каждого элемента набора `X` считает логарифм вероятности отнести его к каждому классу.

In [211]:
class NaiveBayes:

    def __init__(self, alpha: float = 1.0):
        """
        Args:
            alpha: regularization coefficient
        """
        self.alpha = alpha
        self._classes = None  # [n classes]
        self._vocab_size = None  # int
        self._log_p_y = None  # [n classes]
        self._log_p_x_y = None  # [n classes, vocab size]

    def fit(self, features: np.ndarray, targets: list[str]):
        """Estimate p(x|y) and p(y) based on data

        Args:
            features, [n samples; vocab size]: input features
            targets, [n samples]: targets
        """
        targets = np.array(targets)
        self._classes = np.unique(targets)
        self._vocab_size = features.shape[1]
        
        class_counts = np.array([np.sum(targets == c) for c in self._classes])
        self._log_p_y = np.log(class_counts / len(targets))
        
        self._log_p_x_y = np.zeros((len(self._classes), self._vocab_size))
        
        for i, c in enumerate(self._classes):
            class_docs = features[targets == c]
            word_counts = np.sum(class_docs, axis=0)
            total_words = np.sum(word_counts)
            self._log_p_x_y[i] = np.log((word_counts + self.alpha) / (total_words + self.alpha * self._vocab_size))

    def predict(self, features: np.ndarray) -> np.ndarray:
        """Predict class for each sample

        Args:
            features, [n samples; vocab size]: feature to predict
        Return:
            classes, [n samples]: predicted class
        """
        log_proba = self.log_proba(features)
        return self._classes[np.argmax(log_proba, axis=1)]

    def log_proba(self, features: np.ndarray) -> np.ndarray:
        """Calculate p(y|x) for each class and each sample

        Args:
            features, [n samples; vocab size]: feature to predict
        Return:
            classes, [n samples;  n classes]: log proba for each class
        """
        if self._vocab_size is None:
            raise RuntimeError("Fit classifier before predicting something")
        if features.shape[1] != self._vocab_size:
            raise RuntimeError(
                f"Unexpected size of vocabulary, expected {self._vocab_size}, actual {features.shape[1]}"
            )
        log_proba = np.zeros((features.shape[0], len(self._classes)))
        
        for i in range(len(self._classes)):
            class_log_proba = features @ self._log_p_x_y[i]
            log_proba[:, i] = self._log_p_y[i] + class_log_proba
            
        return log_proba

In [212]:
bow = BagOfWordsStem(vocabulary_size=bow_best_size, remove_stopwords=True)
X_train_bow = bow.fit_transform(X_train)
X_test_bow = bow.transform(X_test)

X_train_bow.shape, X_test_bow.shape

((5016, 3000), (558, 3000))

In [213]:
model = NaiveBayes(alpha=1.0)
model.fit(X_train_bow, y_train)

y_pred = model.predict(X_test_bow)
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

         ham       0.99      0.99      0.99       483
        spam       0.92      0.96      0.94        75

    accuracy                           0.98       558
   macro avg       0.96      0.97      0.97       558
weighted avg       0.98      0.98      0.98       558

