# Практическая работа по анализу текста

В качестве метрики качества используйте отчет о классификации https://scikit-learn.org/1.5/modules/generated/sklearn.metrics.classification_report.html

## 0. init

In [None]:
import os
import platform
import random
import re
import string
from collections import Counter, defaultdict
from datetime import datetime

import matplotlib.pyplot as plt
import nltk
import numpy as np
import optuna
import pandas as pd
import pyLDAvis
import pyLDAvis.gensim_models as gensimvis
import pymorphy3
import razdel
import seaborn as sns
import shap
import shap.plots as sp
from catboost import CatBoostClassifier
from gensim.corpora import Dictionary
from gensim.models import CoherenceModel, LdaModel
from nltk.corpus import stopwords
from pandarallel import pandarallel
from sklearn.calibration import LabelEncoder
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.manifold import TSNE
from sklearn.metrics import classification_report, f1_score
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.svm import SVC
from sklearn.utils.class_weight import compute_class_weight
from toolz import compose
from toolz.curried import map as cmap, pluck, sliding_window
from tqdm.notebook import tqdm
from umap import UMAP
from wordcloud import WordCloud

In [None]:
# Fetch the NLTK stop-word corpora that are read later via `stopwords.words(...)`.
nltk.download("stopwords")

# Register `progress_apply` on pandas objects and initialise pandarallel
# (`parallel_apply`); both are used by the text preprocessing cell.
tqdm.pandas()
pandarallel.initialize(progress_bar=True)

# OS name; the preprocessing cell uses it to choose serial vs parallel apply.
system_platform = platform.system()

# Registry of experiment results keyed by experiment name. Each value is a dict with:
RESULT = {}  # name: accuracy_train, accuracy_test, model, params,
#                    X_train, X_test, y_train, y_test, vectorizer, cl_rep_train, cl_rep_test

In [3]:
# Single seed shared by every random component below.
SEED = 1234

# Vocabulary size cap passed to the vectorizers; None means "no cap".
MAX_FEATURES = None

# Document-frequency bounds passed to the vectorizers. MIN_DF is an integer
# here, MAX_DF a float; scikit-learn reads an int as an absolute document count
# and a float as a proportion of documents.
MIN_DF = 10
MAX_DF = 0.5

# Optuna search budget and number of cross-validation folds per trial.
N_TRIALS = 5
CV_FOLDS = 2

# Search range for CatBoost `iterations`; MAX_ITERS must exceed MIN_ITERS.
MIN_ITERS = 100
MAX_ITERS = 300

THEME_COUNT_START = 9 # smallest number of LDA topics tried
THEME_COUNT_END = 12 # largest number of LDA topics tried (inclusive)

# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably only affects processes spawned later - confirm.
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
os.environ["PYTHONHASHSEED"] = str(SEED)

# Seed the stdlib and NumPy global generators.
random.seed(SEED)
np.random.seed(SEED)

In [4]:
def mean_score_from_class_report(map: dict, score_kind: str = "f1-score"):
    """Average one metric over the rows of a classification-report dict.

    Args:
        map: Mapping of row label -> dict of metrics, e.g. the result of
            ``classification_report(..., output_dict=True)``. The parameter
            name shadows the builtin but is kept so keyword callers still work.
        score_kind: Metric key read from every row.

    Returns:
        Arithmetic mean of ``score_kind`` over all dict-like rows. Aggregate
        rows that are present (e.g. "macro avg") are averaged like any other
        row.

    Raises:
        ValueError: if a row does not provide ``score_kind``.
        ZeroDivisionError: if the mapping has no metric rows at all.
    """
    scores = []

    for key, entry in map.items():
        # Scalar summary entries (sklearn stores "accuracy" as a bare float)
        # have no per-metric breakdown, so they cannot contribute to the mean.
        if isinstance(entry, (int, float)):
            continue

        try:
            scores.append(entry[score_kind])
        except (KeyError, TypeError, IndexError) as err:
            print(entry)
            raise ValueError(f"score {score_kind} for key {key} isn't found") from err

    return sum(scores) / len(scores)


def print_results():
    """Print the stored train and test accuracy of every experiment in ``RESULT``."""
    for model_name, record in RESULT.items():
        print(f"Model: {model_name}")
        print(f"Train accuracy: {record['accuracy_train']}")
        print(f"Test accuracy: {record['accuracy_test']}")
        print()

In [None]:
# Wall-clock start; the last cell subtracts it from the end time.
start_dt = datetime.now()

# Load the dataset; later cells read its "text" and "category" columns.
df = pd.read_csv("./train.csv", encoding="utf-8")

df.head(5)

## 1. Обработка данных

In [6]:
# Concatenate every raw text into one string for corpus-level word statistics.
all_text = " ".join(df["text"])

# Lower-cased word tokens (runs of word characters) and their frequencies.
words = re.findall(r"\b\w+\b", all_text.lower())
word_counts = Counter(words)

# Frequency table, most frequent words first.
word_counts_df = pd.DataFrame(
    word_counts.items(), columns=["word", "count"]
).sort_values(by="count", ascending=False)

In [7]:
# Lowercase Russian alphabet. "ё" (U+0451) lies outside the contiguous а..я
# code-point range, so it is appended explicitly.
ru_alph_low = [chr(i) for i in range(ord("а"), ord("я") + 1)] + ["ё"]
# Lowercase basic Latin alphabet a..z.
en_alph_low = [chr(i) for i in range(ord("a"), ord("z") + 1)]

In [8]:
replace_tokens = {
    "км": "километр",
    "г": "год",
    "мск": "москва",
    "д": "день",
    "р": "руб",
    "рубль": "руб",
    "₽": "руб",
    "америка": "сша",
    "msk": "москва",
    "дек": "декабрь",
    "спб": "петербург",
    "spb": "петербург",
    "санктпетербург": "петербург",
}

In [9]:
names = [
    "александр",
    "андрей",
    "марк",
    "ян",
    "камил",
    "сергей",
    "карлос",
    "мария",
    "дмитрий",
    "уильямс",
    "даниил",
    "данил",
    "джеймс",
    "игорь",
    "саша",
    "денис",
    "светлана",
    "александра",
]

In [10]:
countries = [
    "us",
    "usa",
    "сша",
    "россия",
    "russia",
    "rus",
    "бразилия",
    "турция",
    "вьетнам",
    "германия",
    "польша",
    "австралия",
    "франция",
    "ссср",
]

In [11]:
cities = [
    "казань",
    "урал",
    "москва",
    "майами",
    "бостон",
    "санкт",
    "петербург",
    "екатеринбург",
]

In [12]:
months = [
    "январь",
    "февраль",
    "март",
    "апрель",
    "май",
    "июнь",
    "июль",
    "август",
    "сентябрь",
    "октябрь",
    "ноябрь",
    "декабрь",
]

In [13]:
punctuation = [i for i in string.punctuation] + [
    "",
    "⠀",
    "―",
    "⸺",
    "⸻",
    "—",
    "–",
    "‑",
    "‐",
    "−",
    "-",
    "–",
]

In [14]:
dt = [
    "время",
    "год",
    "месяц",
    "неделя",
    "день",
    "час",
    "минута",
    "послезавтра",
    "завтра",
    "сегодня",
    "вчера",
    "позавчера",
]

In [15]:
useless = [
    "это",
    "такой",
    "который",
    "весь",
    "ваш",
    "наш",
    "все",
    "всё",
    "еще",
    "ещё",
    "даже",
    "пока",
    "свой",
    "этот",
    "снова",
    "хотя",
    "либо",
    "каждый",
    "также",
    "твой",
    "поэтому",
    "чтобы",
    "ранее",
    "нужно",
    "далее",
    "наиболее",
]

In [16]:
verbs = [
    "мочь",
    "смочь",
    "быть",
    "стать",
    "сказать",
    "смотреть",
    "описать",
    "думать",
    "говорить",
    "продать",
    "указать",
    "работать",
    "рассказать",
    "провести",
    "получить",
    "выиграть",
    "пройти",
    "начать",
    "добавить",
    "написать",
    "считать",
    "взять",
    "иметь",
    "писать",
    "купить",
    "являться",
    "хотеть",
    "играть",
    "сделать",
    "делать",
    "сыграть",
    "знать",
    "выйти",
]

In [17]:
adj = [
    "новый",
    "хороший",
    "большой",
    "следующий",
    "самый",
    "готовый",
    "некоторый",
    "любой",
    "данный",
    "дорогой",
    "московский",
    "первый",
    "второй",
]

In [18]:
nouns = [
    "очко",
    "матч",
    "команда",
    "турнир",
    "чемпионат",
    "чемпион",
    "победитель",
    "проигрыш",
    "победа",
    "поражение",
    "ссылка",
    "компания",
    "информация",
    "новинка",
    "рука",
    "тело",
    "рекорд",
    "встреча",
    "мир",
    "результат",
    "игра",
    "игрок",
    "тренер",
    "друг",
    "человек",
    "строчка",
    "финал",
]

In [None]:
def detect_outliers(word_counts_df):
    """Collect words that look like noise rather than ordinary vocabulary.

    A word from the ``"word"`` column is selected when it mixes letters from
    ``en_alph_low`` and ``ru_alph_low``, contains a digit, or consists only of
    whitespace.

    Args:
        word_counts_df: Table whose ``"word"`` column holds the candidates.

    Returns:
        List of selected words, in the order they appear in the column.
    """

    def _is_outlier(word):
        mixes_scripts = any(char in en_alph_low for char in word) and any(
            char in ru_alph_low for char in word
        )
        if mixes_scripts:
            return True
        if any(char.isdigit() for char in word):
            return True
        return word.isspace()

    return [word for word in word_counts_df["word"] if _is_outlier(word)]


# Rebuild the raw-text word statistics (same computation as the earlier cell,
# but sorted with the rarest words first).
all_text = " ".join(df["text"])

words = re.findall(r"\b\w+\b", all_text.lower())
word_counts = Counter(words)

word_counts_df = pd.DataFrame(
    word_counts.items(), columns=["word", "count"]
).sort_values(by="count", ascending=True)

# Noise words found in the corpus; they are appended to the stop-word list below.
outliers = detect_outliers(word_counts_df)

print(len(outliers), outliers)

In [20]:
# Hand-curated stop-word list: single letters, punctuation marks, the thematic
# word lists defined above, the detected outlier words and a few extra tokens.
custom_stop_words = (
    ru_alph_low
    + en_alph_low
    + punctuation
    + months
    + names
    + countries
    + cities
    + dt
    + useless
    + verbs
    + adj
    + nouns
    + outliers
    + [
        "руб",
        "офф",
        "очень",
        "id",
        "ска",
        "млн",
        "го",
        "ло",
        "вк",
        "яндекс",
        "pro",
    ]
)

In [21]:
# Final stop-word set: NLTK Russian and English lists plus the custom list.
# A set is used because it is queried once per token during preprocessing.
stop_words = set(
    stopwords.words("russian") + stopwords.words("english") + custom_stop_words
)

In [None]:
morph = pymorphy3.MorphAnalyzer()


def filter_token(token: str):
    """Return ``token`` with every entry of ``punctuation`` removed from it."""
    cleaned = token

    for mark in punctuation:
        cleaned = cleaned.replace(mark, "")

    return cleaned


def process_token(token: str):
    """Canonicalise one lemma or mark it for removal.

    The token is mapped through ``replace_tokens`` (abbreviations and aliases)
    and "ё" is folded to "е". The sentinel ``"tokenoid"`` is returned instead
    when the result is a stop word or when the original token mixes letters
    from ``en_alph_low`` and ``ru_alph_low``.
    """
    new_token: str = replace_tokens.get(token, token).replace("ё", "е")

    if new_token in stop_words:
        return "tokenoid"

    mixes_scripts = any(char in en_alph_low for char in token) and any(
        char in ru_alph_low for char in token
    )
    if mixes_scripts:
        return "tokenoid"

    return new_token


def process_text(text):
    """Tokenise, clean and lemmatise one document.

    Steps: tokenise with razdel and drop tokens that are stop words, strip
    punctuation characters, drop tokens for which any pymorphy parse carries
    an ``UNKN`` tag, lemmatise with the first parse, then canonicalise each
    lemma with ``process_token`` and drop those marked with its sentinel.

    Args:
        text: Raw document text.

    Returns:
        The surviving lemmas joined by single spaces (possibly an empty string).
    """
    tokens = [
        filter_token(token.text)
        for token in razdel.tokenize(text)
        if token.text not in stop_words
    ]

    filtered_tokens = []

    for token in tokens:
        # Parse once and reuse the result for both the UNKN check and the lemma.
        parses = morph.parse(token)

        if "UNKN" in "".join([kind.tag._str for kind in parses]):
            continue

        filtered_token = process_token(parses[0].normal_form)

        # `process_token` marks rejected tokens with the "tokenoid" sentinel.
        if "tokenoid" not in filtered_token:
            filtered_tokens.append(str(filtered_token))

    return " ".join(filtered_tokens)


# Preprocess every document: serial apply with a progress bar on Windows,
# pandarallel's parallel apply on every other platform.
if system_platform == "Windows":
    df["processed_text"] = df["text"].progress_apply(process_text)
else:
    df["processed_text"] = df["text"].parallel_apply(process_text)

In [23]:
# Word statistics recomputed on the preprocessed texts.
all_text = " ".join(df["processed_text"])

words = re.findall(r"\b\w+\b", all_text.lower())
word_counts = Counter(words)

## 2. EDA

In [None]:
# Documents whose preprocessed text ended up empty.
sentence_outliers = df[df["processed_text"] == ""]
print(f"Sentence outliers detected: {sentence_outliers}\n")

# NOTE(review): dropna() removes rows with missing values only; rows whose
# processed_text is an empty string are presumably kept - confirm this is intended.
df = df.dropna()

# Number of documents per category.
class_distribution = df["category"].value_counts()
print(class_distribution)

plt.figure(figsize=(10, 6))
sns.barplot(x=class_distribution.index, y=class_distribution.values)
plt.xlabel("category")
plt.ylabel("count")
plt.show()

In [None]:
# One word cloud per category, built from the preprocessed texts of that category.
for cls in df["category"].unique():
    text = " ".join(df[df["category"] == cls]["processed_text"])

    wordcloud = WordCloud(
        width=400,
        height=200,
        collocations=False,
        background_color="white",
    ).generate(text)

    plt.figure(figsize=(10, 6))
    plt.imshow(wordcloud)
    plt.title(str(cls))
    plt.axis("off")
    plt.show()

In [26]:
# Features are the preprocessed texts, the target is the category label.
X = df["processed_text"]
y = df["category"]

# Seeded, stratified train/test split reused by every experiment below.
X_train_src, X_test_src, y_train_src, y_test_src = train_test_split(
    X,
    y,
    random_state=SEED,
    stratify=y,
)

выводы:
1. балансировать классы не обязательно: количество объектов в классах примерно одинаковое
2. пустые предложения отсутствуют

## 3. Тематическое моделирование

In [27]:
def compute_coherence_values(
    dictionary,
    corpus,
    texts,
    limit: int,
    passes: int,
    chunksize: int,
    iterations: int,
    start: int,
    step: int,
    eval_every=None,
):
    """Train one LDA model per topic count and score each with c_v coherence.

    Parameters
    ----------
    dictionary : Gensim dictionary built from ``texts``
    corpus : Gensim bag-of-words corpus
    texts : List of tokenised input texts
    limit : Largest number of topics to try (inclusive)
    passes, chunksize, iterations, eval_every : Forwarded to ``LdaModel``
    start : Smallest number of topics to try
    step : Increment between consecutive topic counts

    Returns
    -------
    model_list : List of fitted LDA models, one per topic count
    coherence_values : c_v coherence of each model, in the same order
    """
    # The lookup is kept for its side effect: gensim appears to populate
    # ``id2token`` only on first item access (pattern from the gensim tutorial).
    dictionary[0]
    id2word = dictionary.id2token

    fitted = []

    for topic_count in range(start, limit + 1, step):
        lda = LdaModel(
            corpus=corpus,
            id2word=id2word,
            chunksize=chunksize,
            alpha="auto",
            eta="auto",
            iterations=iterations,
            num_topics=topic_count,
            passes=passes,
            eval_every=eval_every,
        )
        scorer = CoherenceModel(
            model=lda,
            texts=texts,
            dictionary=dictionary,
            coherence="c_v",
        )
        fitted.append((lda, scorer.get_coherence()))

    model_list = [lda for lda, _ in fitted]
    coherence_values = [score for _, score in fitted]

    return model_list, coherence_values

In [28]:
# LDA training settings forwarded to `compute_coherence_values`.
chunksize = 5000
passes = 10
iterations = 150
eval_every = None
# Range of topic counts to evaluate (inclusive on both ends).
start = THEME_COUNT_START
limit = THEME_COUNT_END
step = 1


# Tokenised training documents, their dictionary and bag-of-words corpus.
# Only the training split is used to fit the topic models.
texts = [text.split() for text in X_train_src]
dictionary = Dictionary(texts)
corpus = [dictionary.doc2bow(text) for text in texts]

model_list, coherence_values = compute_coherence_values(
    limit=limit,
    passes=passes,
    start=start,
    step=step,
    chunksize=chunksize,
    iterations=iterations,
    dictionary=dictionary,
    corpus=corpus,
    texts=texts,
)

In [None]:
# Plot c_v coherence against the number of topics that were tried.
x = range(start, limit + 1, step)

plt.plot(x, coherence_values)
plt.xlabel("Num Topics")
plt.ylabel("Coherence score")
# The labels must be a sequence of strings: a bare parenthesised string is
# iterated character by character, which labelled the line just "c".
plt.legend(("coherence_values",), loc="best")
plt.show()

In [None]:
# Keep the LDA model with the highest coherence (the first one on ties).
best_model_index = coherence_values.index(max(coherence_values))
best_model = model_list[best_model_index]

# Interactive pyLDAvis view of the selected model.
lda_display = gensimvis.prepare(best_model, corpus, dictionary, sort_topics=False)
pyLDAvis.display(lda_display)

In [None]:
def theme_vectorizer(
    model: LdaModel,
    dictionary: Dictionary,
    X_train: pd.Series,
    X_test: pd.Series,
):
    """Represent documents by their LDA topic distribution.

    Each whitespace-tokenised document is converted to bag-of-words with
    ``dictionary`` and passed through ``model``; the resulting
    ``(topic_id, probability)`` pairs are written into a dense vector of
    length ``model.num_topics``. Topics the model does not report stay 0.

    Returns:
        Tuple ``(X_train_vectors, X_test_vectors)`` of NumPy arrays with one
        row per document.
    """

    def _infer(documents):
        # Topic inference for every document, in input order.
        return [model[dictionary.doc2bow(document.split())] for document in documents]

    train_topics = _infer(X_train)
    test_topics = _infer(X_test)

    topic_count = model.num_topics

    def _densify(per_document_topics):
        rows = []
        for topic_probs in per_document_topics:
            row = np.zeros(topic_count)
            for topic_id, probability in topic_probs:
                row[topic_id] = probability
            rows.append(row)
        return np.array(rows)

    return _densify(train_topics), _densify(test_topics)


# Topic-distribution features from the best LDA model for both splits.
X_train_vectors, X_test_vectors = theme_vectorizer(
    model=best_model,
    dictionary=dictionary,
    X_train=X_train_src,
    X_test=X_test_src,
)

# Baseline classifier trained on the topic vectors.
svm = SVC(probability=True, random_state=SEED)
svm.fit(X_train_vectors, y_train_src)

y_train_pred = svm.predict(X_train_vectors)
y_test_pred = svm.predict(X_test_vectors)

# Classification reports as dicts so the accuracy can be stored in RESULT.
cl_rep_train = classification_report(y_train_src, y_train_pred, output_dict=True)
cl_rep_test = classification_report(y_test_src, y_test_pred, output_dict=True)

In [None]:
# Register the LDA + SVC baseline. It has no tuned params and no sklearn
# vectorizer, so both fields are None.
RESULT["svc"] = {
    "accuracy_train": cl_rep_train["accuracy"],
    "accuracy_test": cl_rep_test["accuracy"],
    "model": svm,
    "params": None,
    "X_train": X_train_vectors,
    "X_test": X_test_vectors,
    "y_train": y_train_src,
    "y_test": y_test_src,
    "vectorizer": None,
    "cl_rep_train": cl_rep_train,
    "cl_rep_test": cl_rep_test,
}

print(cl_rep_train["accuracy"], cl_rep_test["accuracy"])

## 4. Градиентный бустинг

In [29]:
# Encode category labels as integers; the encoder is fitted on the training
# labels only and then applied to the test labels.
label_encoder = LabelEncoder()

y_train_encoded = label_encoder.fit_transform(y_train_src)
y_test_encoded = label_encoder.transform(y_test_src)

In [33]:
# CatBoost device selection: "GPU" on Windows hosts, "CPU" everywhere else.
task_type = "GPU" if platform.system() == "Windows" else "CPU"

def objective(trial, X, y, cv_folds=5):
    """Optuna objective: mean weighted F1 of CatBoost over stratified CV folds.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.
        X: Feature matrix that supports row indexing with an index array.
        y: Label array indexable the same way.
        cv_folds: Number of stratified folds.

    Returns:
        Mean of the per-fold weighted F1 scores.
    """
    params = {
        "iterations": trial.suggest_int("iterations", MIN_ITERS, MAX_ITERS),
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.1, log=True),
        "depth": trial.suggest_int("depth", 7, 15),
        "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-8, 100.0, log=True),
        "random_strength": trial.suggest_float("random_strength", 1e-8, 10.0, log=True),
        "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 1, 30),
        "loss_function": "MultiClass",
    }

    splitter = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=SEED)

    def _fold_score(train_idx, valid_idx):
        # Fit on the training part; the validation part drives early stopping
        # and is also the part the fold is scored on.
        X_fit, X_val = X[train_idx], X[valid_idx]
        y_fit, y_val = y[train_idx], y[valid_idx]

        classifier = CatBoostClassifier(
            **params,
            verbose=0,
            task_type=task_type,
            random_state=SEED,
        )
        classifier.fit(
            X_fit,
            y_fit,
            eval_set=[(X_val, y_val)],
            early_stopping_rounds=50,
            verbose=False,
        )

        return f1_score(y_val, classifier.predict(X_val), average="weighted")

    scores = [
        _fold_score(train_idx, valid_idx)
        for train_idx, valid_idx in splitter.split(X, y)
    ]

    return np.mean(scores)


def train_catboost_with_optuna(
    name: str,
    vectorizer,
    X_train,
    y_train,
    X_test,
    y_test,
    n_trials: int = 100,
    cv_folds: int = 5,
    params: dict = None,
    timeout: float = None,
    force: bool = False,
):
    """Tune (if needed), train and register a CatBoost classifier.

    Parameter resolution:
      * ``force=False`` and ``params`` given: train with ``params`` as is.
      * ``force=False``, no ``params``, ``name`` already in ``RESULT``: print
        the stored accuracies and return the stored model without retraining.
      * otherwise (including ``force=True`` with empty ``params``): run an
        Optuna study and train with its best parameters.

    Args:
        name: Key under which the experiment is stored in ``RESULT``.
        vectorizer: Feature transformer that produced ``X_train``/``X_test``;
            only stored alongside the results.
        X_train, y_train: Training features and encoded labels.
        X_test, y_test: Test features and encoded labels.
        n_trials: Number of Optuna trials.
        cv_folds: Number of CV folds used by ``objective``.
        params: Ready-made CatBoost parameters; skips the search when provided.
        timeout: Optional time limit for the Optuna study, in seconds.
        force: Ignore a cached ``RESULT`` entry and retrain.

    Returns:
        The fitted (or cached) ``CatBoostClassifier``.
    """

    def go_study():
        # Seed the sampler so the search is reproducible like the rest of the
        # pipeline (models and CV splits already use SEED).
        study = optuna.create_study(
            direction="maximize",
            sampler=optuna.samplers.TPESampler(seed=SEED),
        )

        study.optimize(
            lambda trial: objective(
                trial=trial,
                X=X_train,
                y=y_train,
                cv_folds=cv_folds,
            ),
            n_trials=n_trials,
            timeout=timeout,
            show_progress_bar=True,
        )

        return study.best_params

    if force:
        if not params:
            params = go_study()
    elif params is None:
        if name in RESULT:
            print(
                f"name: {name}, train: {RESULT[name]['accuracy_train']}, test: {RESULT[name]['accuracy_test']}"
            )
            return RESULT[name]["model"]

        params = go_study()

    final_model = CatBoostClassifier(
        **params,
        verbose=0,
        task_type=task_type,
        random_state=SEED,
    )

    final_model.fit(X_train, y_train)

    y_train_pred = final_model.predict(X_train)
    y_test_pred = final_model.predict(X_test)

    cl_rep_train = classification_report(y_train, y_train_pred, output_dict=True)
    cl_rep_test = classification_report(y_test, y_test_pred, output_dict=True)

    RESULT[name] = {
        "accuracy_train": cl_rep_train["accuracy"],
        "accuracy_test": cl_rep_test["accuracy"],
        "model": final_model,
        "params": params,
        "X_train": X_train,
        "X_test": X_test,
        "y_train": y_train,
        "y_test": y_test,
        "vectorizer": vectorizer,
        "cl_rep_train": cl_rep_train,
        "cl_rep_test": cl_rep_test,
    }

    # Single quotes inside the f-string keep this valid on Python < 3.12.
    print(
        f"name: {name}, train: {cl_rep_train['accuracy']}, test: {cl_rep_test['accuracy']}"
    )

    return final_model

1. В методах преобразования данных пробуйте различные параметры, в поисках лучших для решения текущей задачи
2. Не забывайте про подбор параметров у самого бустинга с помощью optuna

https://forecastegy.com/posts/catboost-hyperparameter-tuning-guide-with-optuna/

### Мешок слов

In [None]:
# Bag-of-words (unigram counts) features; the vocabulary is fitted on the
# training split only.
vectorizer = CountVectorizer(
    max_features=MAX_FEATURES,
    max_df=MAX_DF,
    min_df=MIN_DF,
)

X_train = vectorizer.fit_transform(X_train_src)
X_test = vectorizer.transform(X_test_src)

print(X_train.shape, MIN_DF, MAX_DF)

# force=True: this experiment is always re-tuned and retrained.
train_catboost_with_optuna(
    name="bow",
    vectorizer=vectorizer,
    X_train=X_train,
    y_train=y_train_encoded,
    X_test=X_test,
    y_test=y_test_encoded,
    n_trials=N_TRIALS,
    cv_folds=CV_FOLDS,
    # Alternative: reuse earlier tuned parameters instead of searching again:
    # params=RESULT["bow"]["params"] if "bow" in RESULT else None,
    force=True,
)

In [None]:
print_results()

### Мешок слов + n-grams

In [None]:
# Bag-of-words over unigrams and bigrams.
vectorizer = CountVectorizer(
    ngram_range=(1, 2),
    max_features=MAX_FEATURES,
    max_df=MAX_DF,
    min_df=MIN_DF,
)

X_train = vectorizer.fit_transform(X_train_src)
X_test = vectorizer.transform(X_test_src)

# Without force=True a cached "bow_n" entry in RESULT is reused.
train_catboost_with_optuna(
    name="bow_n",
    vectorizer=vectorizer,
    X_train=X_train,
    y_train=y_train_encoded,
    X_test=X_test,
    y_test=y_test_encoded,
    n_trials=N_TRIALS,
    cv_folds=CV_FOLDS,
)

### Мешок слов + m-skip-n-grams

In [None]:
class SkipGramVectorizer(CountVectorizer):
    """Count vectorizer whose features are 1-skip bigrams.

    For every window of three consecutive tokens the first and third token are
    joined by a space; the middle token is skipped. Documents with fewer than
    three tokens produce no features. The custom analyzer does not use the
    ``ngram_range`` or ``stop_words`` settings.
    """

    def build_analyzer(self):
        """Return a callable mapping a raw document to its skip-gram features."""
        preprocess = self.build_preprocessor()
        tokenize = self.build_tokenizer()
        # `compose` applies right to left: decode -> preprocess -> tokenize.
        return lambda doc: self._word_skip_grams(
            compose(
                tokenize,
                preprocess,
                self.decode,
            )(doc)
        )

    def _word_skip_grams(self, tokens):
        """Lazily yield "<first> <third>" for each 3-token sliding window."""
        return compose(cmap(" ".join), pluck([0, 2]), sliding_window(3))(tokens)


# Bag-of-words over skip-grams produced by SkipGramVectorizer.
vectorizer = SkipGramVectorizer(
    max_features=MAX_FEATURES,
    max_df=MAX_DF,
    min_df=MIN_DF,
)

X_train = vectorizer.fit_transform(X_train_src)
X_test = vectorizer.transform(X_test_src)

# Without force=True a cached "bow_n_m" entry in RESULT is reused.
train_catboost_with_optuna(
    name="bow_n_m",
    vectorizer=vectorizer,
    X_train=X_train,
    y_train=y_train_encoded,
    X_test=X_test,
    y_test=y_test_encoded,
    n_trials=N_TRIALS,
    cv_folds=CV_FOLDS,
)

### TF-IDF

In [None]:
# TF-IDF features over unigrams.
vectorizer = TfidfVectorizer(
    max_features=MAX_FEATURES,
    max_df=MAX_DF,
    min_df=MIN_DF,
)

X_train = vectorizer.fit_transform(X_train_src)
X_test = vectorizer.transform(X_test_src)

# Without force=True a cached "tfidf" entry in RESULT is reused.
train_catboost_with_optuna(
    name="tfidf",
    vectorizer=vectorizer,
    X_train=X_train,
    y_train=y_train_encoded,
    X_test=X_test,
    y_test=y_test_encoded,
    n_trials=N_TRIALS,
    cv_folds=CV_FOLDS,
)

In [None]:
print_results()

### TF-IDF + n-grams

In [None]:
# TF-IDF features over unigrams and bigrams.
vectorizer = TfidfVectorizer(
    ngram_range=(1, 2),
    max_features=MAX_FEATURES,
    max_df=MAX_DF,
    min_df=MIN_DF,
)

X_train = vectorizer.fit_transform(X_train_src)
X_test = vectorizer.transform(X_test_src)

# Without force=True a cached "tfidf_n" entry in RESULT is reused.
train_catboost_with_optuna(
    name="tfidf_n",
    vectorizer=vectorizer,
    X_train=X_train,
    y_train=y_train_encoded,
    X_test=X_test,
    y_test=y_test_encoded,
    n_trials=N_TRIALS,
    cv_folds=CV_FOLDS,
)

### TF-IDF + m-skip-n-grams

In [None]:
class SkipGramVectorizerTf(TfidfVectorizer):
    """TF-IDF vectorizer whose features are 1-skip bigrams.

    Every window of three consecutive tokens contributes the first and third
    token joined by a space; the middle token is skipped.
    """

    def build_analyzer(self):
        """Return a callable mapping a raw document to its skip-gram features."""
        preprocess = self.build_preprocessor()
        tokenize = self.build_tokenizer()

        def analyze(doc):
            decoded = self.decode(doc)
            tokens = tokenize(preprocess(decoded))
            return self._word_skip_grams(tokens)

        return analyze

    def _word_skip_grams(self, tokens):
        """Lazily yield "<first> <third>" for each 3-token sliding window."""
        return (
            " ".join((window[0], window[2])) for window in sliding_window(3, tokens)
        )


# TF-IDF over skip-grams produced by SkipGramVectorizerTf.
vectorizer = SkipGramVectorizerTf(
    max_features=MAX_FEATURES,
    max_df=MAX_DF,
    min_df=MIN_DF,
)

X_train = vectorizer.fit_transform(X_train_src)
X_test = vectorizer.transform(X_test_src)

# Without force=True a cached "tfidf_n_m" entry in RESULT is reused.
train_catboost_with_optuna(
    name="tfidf_n_m",
    vectorizer=vectorizer,
    X_train=X_train,
    y_train=y_train_encoded,
    X_test=X_test,
    y_test=y_test_encoded,
    n_trials=N_TRIALS,
    cv_folds=CV_FOLDS,
)

In [None]:
print_results()

### Генерация искусственных данных и балансировка классов

Выберите лучшее представление данных, опираясь на метрику. Попробуйте сбалансировать классы с помощью весов (параметр catboost), если выше этого не делали. Попробуйте сгенерировать новые данные для классов, в которых меньше всего объектов. Генерация представляет собой семплирование токенов из всего множества токенов определенного класса. Обучите модель на новом датасете, сравните качество.

1. признаки предложений (длина предложений, наличие орф.знаков, количество предложений в тексте)
2. обработка опечаток (расстояние между словом нормальным и словом с опечаткой)
3. склеивание с частицей не.

In [None]:
# Pick the experiment with the highest test accuracy among everything stored
# in RESULT except the LDA + SVC baseline; the first one wins on ties.
catboost_names = [name for name in RESULT if name != "svc"]

best_catboost_model_name = max(
    catboost_names,
    key=lambda name: RESULT[name]["accuracy_test"],
    default=None,
)

if best_catboost_model_name is None:
    mx_score = -np.inf
else:
    mx_score = RESULT[best_catboost_model_name]["accuracy_test"]

best_catboost_model_name

In [None]:
# балансировка

# Class balancing: retrain the best configuration with "balanced" class weights.
best_record = RESULT[best_catboost_model_name]

class_labels = np.unique(y_train_encoded)
class_weights = compute_class_weight(
    "balanced",
    classes=class_labels,
    y=y_train_encoded,
)
# Key the weights by the label values themselves, not by their position.
class_weights_dict = dict(zip(class_labels.tolist(), class_weights.tolist()))

params = best_record["params"] | {
    "class_weights": class_weights_dict,
}

# Same device and seed as the other CatBoost models so the runs are comparable.
model = CatBoostClassifier(**params, task_type=task_type, random_state=SEED)

model.fit(best_record["X_train"], y_train_encoded, silent=True)

y_train_pred = model.predict(best_record["X_train"])
y_test_pred = model.predict(best_record["X_test"])

cl_rep_train = classification_report(y_train_encoded, y_train_pred, output_dict=True)
cl_rep_test = classification_report(y_test_encoded, y_test_pred, output_dict=True)

RESULT["balanced"] = {
    "accuracy_train": cl_rep_train["accuracy"],
    "accuracy_test": cl_rep_test["accuracy"],
    "model": model,
    "params": params,
    "X_train": best_record["X_train"],
    "X_test": best_record["X_test"],
    "y_train": y_train_encoded,
    "y_test": y_test_encoded,
    # The features come from the best experiment, so its vectorizer is the one
    # to store (the global `vectorizer` is whichever was created last).
    "vectorizer": best_record["vectorizer"],
    "cl_rep_train": cl_rep_train,
    "cl_rep_test": cl_rep_test,
}

# Single quotes inside the f-string keep this valid on Python < 3.12.
print(
    f"name: balanced, train: {cl_rep_train['accuracy']}, test: {cl_rep_test['accuracy']}"
)

In [None]:
# генерация искусственных данных

# Synthetic data: categories with fewer documents than the mean class size are
# topped up with documents sampled from that category's own tokens.
# NOTE(review): tokens are drawn from the whole of `df`, including rows that
# later land in the test split - confirm this leakage is acceptable.
class_counts = df["category"].value_counts()
mean_count = class_counts.mean()
underrepresented = class_counts[class_counts < mean_count]

mean_token_len = defaultdict(list)  # category -> token count of each document
class_tokens = defaultdict(list)  # category -> every token of that category

for category, text in zip(df["category"], df["processed_text"]):
    tokens = text.split()
    mean_token_len[category].append(len(tokens))
    class_tokens[category].extend(tokens)

synthetic_texts = []
synthetic_labels = []

for category in underrepresented.index:
    samples_to_generate = int(mean_count - class_counts[category])
    tokens_pool = class_tokens[category]

    # Nothing to sample from; `random.choices` would fail on an empty pool.
    if not tokens_pool:
        continue

    avg_len = int(sum(mean_token_len[category]) / len(mean_token_len[category]))

    print(category, avg_len)

    # Keep the sampled length positive even for very short average documents.
    min_len = max(1, avg_len - 2)
    max_len = max(min_len, avg_len + 2)

    for _ in range(samples_to_generate):
        num_tokens = random.randint(min_len, max_len)
        synthetic_text = " ".join(random.choices(tokens_pool, k=num_tokens))
        synthetic_texts.append(process_text(synthetic_text))
        synthetic_labels.append(category)

print()
print(len(synthetic_texts), len(synthetic_labels), "\n")
print(synthetic_texts[:5], synthetic_labels[:5])

In [None]:
df

In [None]:
def get_text_features(text):
    """Compute simple length statistics for one document.

    Args:
        text: Document text; tokens are whitespace separated and sentences are
            separated by ".".

    Returns:
        Dict with ``token_count`` (number of tokens), ``avg_token_len`` (mean
        token length in characters) and ``sentence_length`` (mean number of
        tokens per non-blank sentence). The two means are 0.0 when there is
        nothing to average, instead of NaN with a NumPy RuntimeWarning.
    """
    tokens = text.split()
    sentence_lengths = [
        len(sentence.split()) for sentence in text.split(".") if sentence.strip()
    ]

    return {
        "token_count": len(tokens),
        "avg_token_len": np.mean([len(token) for token in tokens]) if tokens else 0.0,
        "sentence_length": np.mean(sentence_lengths) if sentence_lengths else 0.0,
        # A punctuation count is not useful here: punctuation has already been
        # stripped from the preprocessed text.
    }


# Original preprocessed documents followed by the synthetic ones, with labels.
df_augmented = pd.DataFrame(
    {
        "text": list(df["processed_text"]) + synthetic_texts,
        "category": list(df["category"]) + synthetic_labels,
    }
)

# One row of hand-crafted length features per document, in the same row order.
text_features = pd.DataFrame([get_text_features(text) for text in df_augmented["text"]])

# Append the feature columns next to the text and category columns.
df_augmented = pd.concat([df_augmented, text_features], axis=1)

df_augmented

In [None]:
from sklearn.base import clone

# `vectorizer` stays bound to the fitted vectorizer of the best experiment
# (later cells read its feature names). The augmented data gets an unfitted
# copy so the stored vectorizer is not refitted in place.
vectorizer = RESULT[best_catboost_model_name]["vectorizer"]
aug_vectorizer = clone(vectorizer)

X_train_aug, X_test_aug, y_train_aug, y_test_aug = train_test_split(
    df_augmented.drop("category", axis=1),
    df_augmented["category"],
    random_state=SEED,
    stratify=df_augmented["category"],
)

# A dedicated encoder keeps the shared `label_encoder` untouched.
aug_label_encoder = LabelEncoder()
y_train_aug_encoded = aug_label_encoder.fit_transform(y_train_aug)
y_test_aug_encoded = aug_label_encoder.transform(y_test_aug)

# Dense text features followed by the hand-crafted length features.
# `.toarray()` replaces the `.A` shortcut, which newer SciPy no longer provides.
X_train = np.hstack(
    [
        aug_vectorizer.fit_transform(X_train_aug["text"]).toarray(),
        X_train_aug.drop("text", axis=1).to_numpy(),
    ]
)

X_test = np.hstack(
    [
        aug_vectorizer.transform(X_test_aug["text"]).toarray(),
        X_test_aug.drop("text", axis=1).to_numpy(),
    ]
)

train_catboost_with_optuna(
    name="augmented",
    vectorizer=aug_vectorizer,
    X_train=X_train,
    y_train=y_train_aug_encoded,
    X_test=X_test,
    y_test=y_test_aug_encoded,
    n_trials=N_TRIALS,
    cv_folds=CV_FOLDS,
)

In [None]:
print_results()

### Понижение размерности

- уменьшите размерность векторов с помощью PCA, посмотрите, улучшается ли качество
- попробуйте несколько вариантов понижения размерности: от 90% до 50%
от изначального размера вектора.

In [49]:
def evaluate_with_pca(
    X_train_vec,
    X_test_vec,
    dim_reduction_ratios=(0.9, 0.7, 0.5),
):
    """Train CatBoost on PCA-reduced features for several reduction ratios.

    For every ratio a PCA with ``int(n_features * ratio)`` components (at least
    one) is fitted on the training matrix, both splits are projected, and a
    model is trained and stored in ``RESULT`` under
    ``"<best_catboost_model_name>_pca_<ratio>"``.

    Args:
        X_train_vec: Training feature matrix.
        X_test_vec: Test feature matrix with the same columns.
        dim_reduction_ratios: Fractions of the original dimensionality to keep.

    Returns:
        Dict mapping each ratio to ``{"accuracy_train": ..., "accuracy_test": ...}``.
    """
    results = {}
    n_features = X_train_vec.shape[1]

    for ratio in dim_reduction_ratios:
        # PCA rejects 0 components, which int() can yield for few features.
        n_components = max(1, int(n_features * ratio))
        # Seeded so randomised solvers give reproducible projections.
        vectorizer = PCA(n_components=n_components, random_state=SEED)

        X_train = vectorizer.fit_transform(X_train_vec)
        X_test = vectorizer.transform(X_test_vec)

        name = f"{best_catboost_model_name}_pca_{ratio}"

        train_catboost_with_optuna(
            name=name,
            vectorizer=vectorizer,
            X_train=X_train,
            y_train=y_train_encoded,
            X_test=X_test,
            y_test=y_test_encoded,
            n_trials=N_TRIALS,
            cv_folds=CV_FOLDS,
        )

        # The trainer always leaves an entry under `name` (fresh or cached).
        record = RESULT[name]
        results[ratio] = {
            "accuracy_train": record["accuracy_train"],
            "accuracy_test": record["accuracy_test"],
        }

    return results

In [None]:
evaluate_with_pca(
    RESULT[best_catboost_model_name]["X_train"],
    RESULT[best_catboost_model_name]["X_test"],
)

In [None]:
print_results()

## 5. Интерпретация результатов

https://shap.readthedocs.io/en/latest/example_notebooks/tabular_examples/tree_based_models/Catboost%20tutorial.html

In [51]:
# используя туториал выше, интерпретируйте результаты.
# Определите, какие слова влияют в каждом классе больше остальных.

In [None]:
shap.initjs()

In [None]:
# SHAP values of the best CatBoost model on its test features.
explainer = shap.Explainer(RESULT[best_catboost_model_name]["model"], seed=SEED)
sv = explainer(RESULT[best_catboost_model_name]["X_test"])

# NOTE(review): `vectorizer` is whatever that global is bound to when this cell
# runs; its vocabulary must match the columns the best model was trained on -
# confirm it has not been refitted or rebound by an earlier cell.
feature_names = (
    vectorizer.get_feature_names_out()
    if hasattr(vectorizer, "get_feature_names_out")
    else vectorizer.get_feature_names()
)

sv.feature_names = feature_names

# One beeswarm plot per class, showing the 10 most influential features.
for cat_id in np.unique(y_test_encoded):
    plt.figure()
    sp.beeswarm(sv[:, :, cat_id], max_display=10, show=False)
    plt.title(f"Category: {label_encoder.classes_[cat_id]}")
    plt.tight_layout()
    plt.show()

### Визуализация результатов

In [None]:
# С помощью методов понижения размерности T-SNE И U-MAP взгляните
# на получившиеся векторные представления данных

# 2-D embeddings of the best model's test features. Both reducers are seeded
# so the plots are reproducible, like the rest of the pipeline.
tsne = TSNE(n_components=2, init="random", random_state=SEED)
X_tsne = tsne.fit_transform(RESULT[best_catboost_model_name]["X_test"])

umap = UMAP(n_components=2, random_state=SEED)
X_umap = umap.fit_transform(RESULT[best_catboost_model_name]["X_test"])

# Side-by-side scatter plots coloured by the encoded test label.
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
scatter = plt.scatter(
    X_tsne[:, 0],
    X_tsne[:, 1],
    c=y_test_encoded,
)
plt.title("t-SNE Visualization")
plt.colorbar(scatter)

plt.subplot(1, 2, 2)
scatter = plt.scatter(
    X_umap[:, 0],
    X_umap[:, 1],
    c=y_test_encoded,
)
plt.title("UMAP Visualization")
plt.colorbar(scatter)

plt.tight_layout()
plt.show()

In [None]:
end_dt = datetime.now()

end_dt - start_dt