In [None]:
# This notebook was run in the Google Colab environment.
# Quietly (-q) install the two extra packages requested here: catboost and fasttext.
%pip install catboost fasttext -q

# Домашнее задание "NLP. Часть 1"

In [None]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

In [None]:
def seed_everything(seed: int):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and every CUDA
    device) and configures cuDNN for deterministic execution.

    Args:
        seed: Value applied to all generators.
    """
    random.seed(seed)
    # Exported for child processes; the hash seed of the already running
    # interpreter is fixed at start-up and is not changed by this assignment.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds all visible GPUs; silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN auto-tune (and therefore vary) the convolution
    # algorithm between runs, which contradicts the deterministic flag above.
    torch.backends.cudnn.benchmark = False

seed_everything(42)

In [None]:
def normalize_pretokenize_text(text: str) -> List[str]:
    """Lower-case ``text`` and split it into word tokens.

    A token is a maximal run of ``\\w`` characters delimited by word
    boundaries, so punctuation and whitespace are dropped.

    Args:
        text: Raw input string.

    Returns:
        The lower-cased tokens in their order of appearance.
    """
    return re.findall(r'\b\w+\b', text.lower())

In [None]:
# This block is for tests only
# Toy three-sentence corpus; the self-check functions of the tasks below
# receive it (or the vocabulary built from it) as their fixture.
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    """Build a sorted vocabulary and its word -> index lookup.

    Args:
        texts: Documents to tokenize with ``normalize_pretokenize_text``.

    Returns:
        ``(vocab, vocab_index)`` where ``vocab`` holds the unique tokens in
        sorted order and ``vocab_index[word]`` is the position of ``word``
        in ``vocab``.
    """
    unique_words = set()
    for document in texts:
        unique_words.update(normalize_pretokenize_text(document))

    vocab = sorted(unique_words)
    vocab_index = dict(zip(vocab, range(len(vocab))))
    return vocab, vocab_index

# Sorted unique tokens of the toy corpus and their word -> index lookup.
vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [None]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[int]:
    """Encode ``text`` as a binary presence vector over the vocabulary.

    Args:
        text: Raw input string; tokenized with ``normalize_pretokenize_text``.
        vocab: Vocabulary words; its length defines the vector size.
        vocab_index: Mapping word -> position in the vector. Built from
            ``vocab`` when omitted.

    Returns:
        A list of ``0``/``1`` integers; position ``vocab_index[word]`` is ``1``
        when ``word`` occurs in ``text``. Out-of-vocabulary tokens are ignored.

    Raises:
        ValueError: If neither ``vocab`` nor ``vocab_index`` is provided.
    """
    if vocab is None and vocab_index is None:
        raise ValueError("one_hot_vectorization requires `vocab` or `vocab_index`")
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    # Without `vocab`, assume the index is dense (0..len-1) -- TODO confirm with callers.
    size = len(vocab) if vocab is not None else len(vocab_index)

    # Plain ints honour the declared List[int] return type (the values still
    # compare equal to the 0.0/1.0 floats produced previously).
    embedding = [0] * size
    for word in normalize_pretokenize_text(text):
        # Dict lookup is O(1); scanning the `vocab` list was O(V) per token.
        idx = vocab_index.get(word)
        if idx is not None:
            embedding[idx] = 1

    return embedding

def test_one_hot_vectorization(
    vocab: List[str],
    vocab_index: Dict[str, int]
) -> bool:
    """Self-check for ``one_hot_vectorization``.

    Verifies that the result is a list of vocabulary length and that every
    in-vocabulary word of the sample sentence is marked with ``1``.

    Returns:
        ``True`` when all checks pass (a PASSED line is printed), ``False``
        otherwise; an exception is reported with a FAILED line.
    """
    sample = "the quick brown fox"
    try:
        vector = one_hot_vectorization(sample, vocab, vocab_index)

        is_valid = isinstance(vector, list) and len(vector) == len(vocab)
        if is_valid:
            is_valid = all(
                vector[vocab_index[token]] == 1
                for token in normalize_pretokenize_text(sample)
                if token in vocab_index
            )
        if not is_valid:
            return False

        print("One-Hot-Vectors test PASSED")
        return True
    except Exception as error:
        print(f"One-Hot-Vectors test FAILED: {error}")
        return False

In [None]:
# Run the one-hot self-check on the toy vocabulary; the cell fails if it returns False.
assert test_one_hot_vectorization(vocab, vocab_index)

One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [None]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    """Count how often each token occurs in ``text``.

    Args:
        text: Raw input string; tokenized with ``normalize_pretokenize_text``.

    Returns:
        A ``Counter`` (a ``dict`` subclass) mapping token -> number of
        occurrences.
    """
    return Counter(normalize_pretokenize_text(text))

def test_bag_of_words_vectorization() -> bool:
    """Self-check for ``bag_of_words_vectorization``.

    Verifies that the result is a ``dict`` holding the expected count for each
    word of a fixed sample (and ``0`` for a word that does not occur).

    Returns:
        ``True`` when all checks pass (a PASSED line is printed), ``False``
        otherwise; an exception is reported with a FAILED line.
    """
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        expected_counts = {'the': 2, 'quick': 1, 'brown': 3, 'nonexistent': 0}
        for word, expected in expected_counts.items():
            if result.get(word, 0) != expected:
                return False

        # Message previously read "Bad-of-Words".
        print("Bag-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [None]:
# Run the bag-of-words self-check; the cell fails if it returns False.
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [None]:
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
    """Compute the TF-IDF vector of ``text`` over ``vocab``.

    ``tf(word) = count(word in text) / len(tokens of text)`` and
    ``idf(word) = ln(len(corpus) / df(word))`` where ``df`` is the number of
    corpus documents containing ``word``.

    Args:
        text: Document to vectorize.
        corpus: Documents used to estimate document frequencies.
        vocab: Vocabulary words; defines the order and length of the vector.
        vocab_index: Unused; accepted for signature parity with the other
            vectorizers.

    Returns:
        One float per vocabulary word. The weight is ``0.0`` for words absent
        from ``text`` and for words that occur in no corpus document (the
        former ``1e-9`` smoothing gave such words an arbitrary ~20x weight).
        A text without tokens yields an all-zero vector.

    Raises:
        ValueError: If ``corpus`` or ``vocab`` is ``None``.
    """
    if corpus is None or vocab is None:
        raise ValueError("tf_idf_vectorization requires both `corpus` and `vocab`")

    tokens = normalize_pretokenize_text(text)
    if not tokens:
        # Nothing to count; also avoids dividing by len(tokens) == 0.
        return [0.0] * len(vocab)
    term_counts = Counter(tokens)
    n_tokens = len(tokens)

    # Document frequency: each document contributes at most once per word.
    # NOTE: recomputed on every call, i.e. O(len(corpus)) work per text.
    doc_freq = Counter()
    for doc in corpus:
        doc_freq.update(set(normalize_pretokenize_text(doc)))
    n_docs = len(corpus)

    vector = []
    for word in vocab:
        count = term_counts.get(word, 0)
        df = doc_freq.get(word, 0)
        if count == 0 or df == 0:
            vector.append(0.0)
        else:
            vector.append((count / n_tokens) * math.log(n_docs / df))
    return vector

def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    """Self-check for ``tf_idf_vectorization``.

    Verifies that the result is a list of vocabulary length whose elements
    are all ``float``.

    Returns:
        ``True`` when all checks pass (a PASSED line is printed), ``False``
        otherwise; an exception is reported with a FAILED line.
    """
    try:
        vector = tf_idf_vectorization("the quick brown", corpus, vocab, vocab_index)

        shape_ok = isinstance(vector, list) and len(vector) == len(vocab)
        if not shape_ok or any(not isinstance(value, float) for value in vector):
            return False

        print("TF-IDF test PASSED")
        return True
    except Exception as error:
        print(f"TF-IDF test FAILED: {error}")
        return False

In [None]:
# Run the TF-IDF self-check on the toy corpus; the cell fails if it returns False.
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
$$PPMI(word, context) = \max(0, PMI(word, context))$$
$$PMI(word, context) = \log \frac{P(word, context)}{P(word) P(context)} = \log \frac{N(word, context) \cdot |(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр), а $|(word, context)|$ -- общее число пар (слово, контекст) в корпусе

In [None]:
from collections import defaultdict
from typing import List, Dict
import numpy as np
from scipy.sparse import coo_matrix, csr_matrix

def _build_ppmi_matrix(corpus: List[str], vocab_index: Dict[str, int], vocab_size: int, window_size: int):
    """Return the sparse (vocab_size, vocab_size) PPMI matrix of ``corpus``, or ``None`` if no word pair co-occurs."""
    rows, cols = [], []
    for doc in corpus:
        # Map tokens to vocabulary ids once; None marks out-of-vocabulary tokens.
        token_ids = [vocab_index.get(token) for token in normalize_pretokenize_text(doc)]
        n_tokens = len(token_ids)
        for i, center_idx in enumerate(token_ids):
            if center_idx is None:
                continue
            # Symmetric window of `window_size` tokens on each side of position i.
            for j in range(max(0, i - window_size), min(n_tokens, i + window_size + 1)):
                context_idx = token_ids[j]
                if j == i or context_idx is None:
                    continue
                rows.append(center_idx)
                cols.append(context_idx)

    if not rows:
        return None

    counts = np.ones(len(rows), dtype=np.float32)
    # The COO -> CSR conversion sums duplicate (row, col) entries into counts.
    cooc = coo_matrix((counts, (rows, cols)), shape=(vocab_size, vocab_size), dtype=np.float32).tocsr().tocoo()

    total = cooc.sum()
    p_w = np.asarray(cooc.sum(axis=1)).ravel() / total
    p_c = np.asarray(cooc.sum(axis=0)).ravel() / total
    p_wc = cooc.data / total
    # Small epsilon guards against a zero denominator.
    denom = p_w[cooc.row] * p_c[cooc.col] + 1e-12
    ppmi = np.maximum(0, np.log2(p_wc / denom))
    return csr_matrix((ppmi, (cooc.row, cooc.col)), shape=(vocab_size, vocab_size), dtype=np.float32)


def ppmi_vectorization(text: str, corpus: List[str], vocab: List[str], vocab_index: Dict[str, int], window_size: int = 2) -> List[float]:
    """Represent ``text`` as the mean PPMI row of its in-vocabulary tokens.

    A word/context co-occurrence matrix is counted over ``corpus`` with a
    symmetric window, converted to PPMI (``max(0, log2 PMI)``), and the rows
    belonging to the tokens of ``text`` are averaged.

    Args:
        text: Document to vectorize.
        corpus: Documents used to count co-occurrences.
        vocab: Vocabulary words; its length is the vector size.
        vocab_index: Mapping word -> row/column index of the matrix.
        window_size: Number of neighbours considered on each side of a token.

    Returns:
        A list of ``len(vocab)`` floats. An all-zero list is returned when the
        corpus, vocabulary or index is empty, when ``text`` has no
        in-vocabulary token, or when the corpus yields no co-occurrence. All
        paths return a ``list``; the early exits used to return ``np.ndarray``.

    Note:
        The PPMI matrix is rebuilt from ``corpus`` on every call.
    """
    if not corpus or not vocab or not vocab_index:
        return [0.0] * (len(vocab) if vocab else 0)
    vocab_size = len(vocab)

    # Resolve the text first so the matrix is not built for nothing.
    valid_indices = [vocab_index[w] for w in normalize_pretokenize_text(text) if w in vocab_index]
    if not valid_indices:
        return [0.0] * vocab_size

    ppmi_mat = _build_ppmi_matrix(corpus, vocab_index, vocab_size, window_size)
    if ppmi_mat is None:
        return [0.0] * vocab_size

    # np.asarray(...).ravel() works whether .mean() yields np.matrix or ndarray.
    vector = np.asarray(ppmi_mat[valid_indices].mean(axis=0)).ravel()
    return vector.astype(np.float32).tolist()

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    """Self-check for ``ppmi_vectorization``.

    Verifies that the result is a list of vocabulary length whose elements
    are all ``float``.

    Returns:
        ``True`` when all checks pass (a PASSED line is printed), ``False``
        otherwise; an exception is reported with a FAILED line.
    """
    try:
        vector = ppmi_vectorization("quick brown fox", corpus, vocab, vocab_index)

        shape_ok = isinstance(vector, list) and len(vector) == len(vocab)
        if not shape_ok or any(not isinstance(value, float) for value in vector):
            return False

        print("PPMI test PASSED")
        return True
    except Exception as error:
        print(f"PPMI test FAILED: {error}")
        return False

In [None]:
# Run the PPMI self-check on the toy corpus; the cell fails if it returns False.
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [None]:
# !wget https://dl.fbaipublicfiles.com/fasttext/vectors-wiki/wiki.en.zip
# !unzip wiki.en.zip

In [None]:
# Dedicated name: a later cell binds the generic global `model` to BERT.
fasttext_model = fasttext.load_model('wiki.en.bin')

# Models loaded on demand through `model_path`, keyed by path, so each file is
# read from disk at most once per cell execution.
_FASTTEXT_MODELS: Dict[str, Any] = {}

def get_fasttext_embeddings(text: str, model_path: str = None, model: Any = fasttext_model) -> List[np.ndarray]:
    """Return one fastText vector per token of ``text``.

    Args:
        text: Raw input string; tokenized with ``normalize_pretokenize_text``.
        model_path: Optional path of a fastText binary. When given, that model
            is loaded (and cached) and takes precedence over ``model``.
        model: Loaded fastText model; defaults to the one loaded in this cell.

    Returns:
        A list with the result of ``model.get_word_vector`` for each token, in
        token order; empty when ``text`` contains no token.
    """
    if model_path is not None:
        if model_path not in _FASTTEXT_MODELS:
            _FASTTEXT_MODELS[model_path] = fasttext.load_model(model_path)
        model = _FASTTEXT_MODELS[model_path]

    return [model.get_word_vector(word) for word in normalize_pretokenize_text(text)]

In [None]:
# Smoke test: displays the list of per-token vectors for a two-word input with leading spaces.
get_fasttext_embeddings("  my friends")

[array([-1.26858622e-01,  1.52882934e-01,  1.49032772e-01,  3.92690450e-02,
        -1.30524158e-01, -3.80693674e-02, -1.62300408e-01, -2.76634097e-03,
         1.21185914e-01,  1.42024562e-01,  4.11207974e-01, -2.44587511e-01,
        -9.64097083e-02,  2.38146663e-01,  8.28057975e-02, -1.26340926e-01,
         6.91213086e-02,  8.52295905e-02, -7.36847520e-04,  7.16888234e-02,
        -8.18991661e-02,  1.00517511e-01, -1.16089121e-01, -7.65723884e-02,
        -1.08819604e-02,  9.11623016e-02, -1.83051862e-02,  8.34907368e-02,
        -2.89088726e-01,  2.27737129e-01, -7.89996535e-02,  3.03535312e-01,
        -1.69834763e-01,  1.47010624e-01, -5.41614830e-01, -1.04691982e-02,
        -1.70235902e-01, -1.41128808e-01, -1.41191676e-01, -3.36246461e-01,
         2.03713253e-01, -1.27543211e-02, -9.01129246e-02,  6.05605021e-02,
         2.20914185e-01,  1.35715351e-01,  2.11866438e-01,  1.99898273e-01,
        -3.12961221e-01, -1.15871273e-01, -8.84860605e-02, -5.90643644e-01,
        -2.5

In [None]:
# Loaded (tokenizer, model) pairs keyed by checkpoint name, so that
# `get_bert_embeddings` can honour `model_name` without reloading per call.
_BERT_CACHE: Dict[str, Tuple[Any, Any]] = {}

def _load_bert(model_name: str) -> Tuple[BertTokenizer, BertModel]:
    """Load (once) and return the tokenizer/model pair for ``model_name``."""
    if model_name not in _BERT_CACHE:
        bert_tokenizer = BertTokenizer.from_pretrained(model_name)
        bert_model = BertModel.from_pretrained(model_name)
        _BERT_CACHE[model_name] = (bert_tokenizer, bert_model)
    return _BERT_CACHE[model_name]

# Load the default checkpoint when the cell runs, as before.
_load_bert('bert-base-uncased')

def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:
    """Embed ``text`` with a BERT checkpoint.

    Args:
        text: Raw input string; inputs longer than 512 tokens are truncated.
        model_name: Checkpoint name passed to ``from_pretrained`` (cached).
        pool_method: How the token states are reduced to one vector:
            ``'cls'`` -- last-layer hidden state of the first ([CLS]) token;
            ``'mean'`` -- mean of the last-layer hidden states of all tokens;
            ``'pooler'`` -- the model's ``pooler_output`` (what this function
            returned before ``pool_method`` was honoured).

    Returns:
        A 1-D NumPy array holding the pooled embedding.

    Raises:
        ValueError: If ``pool_method`` is not one of the values above.
    """
    bert_tokenizer, bert_model = _load_bert(model_name)

    # truncation=True makes max_length effective and silences the tokenizer warning.
    inputs = bert_tokenizer(text, return_tensors='pt', truncation=True, max_length=512)

    with torch.no_grad():
        outputs = bert_model(**inputs)

    if pool_method == 'cls':
        pooled = outputs.last_hidden_state[0, 0]
    elif pool_method == 'mean':
        # A single sequence is encoded without padding, so a plain mean suffices.
        pooled = outputs.last_hidden_state[0].mean(dim=0)
    elif pool_method == 'pooler':
        pooled = outputs.pooler_output[0]
    else:
        raise ValueError(f"Unknown pool_method: {pool_method}")

    return pooled.numpy()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [None]:
from datasets import concatenate_datasets
class Vectorizer:
    """Turns a balanced sample of a labelled text dataset into feature vectors.

    The vocabulary and the reference corpus are fitted on the "train" split
    and reused for every other split, so features of all splits share the
    same columns and the same corpus statistics.
    """

    # Size of the zero vector used for texts without tokens.
    # NOTE(review): presumably the dimension of the loaded fastText model -- confirm.
    FASTTEXT_DIM = 300

    def __init__(self):
        self.vocab = None
        self.vocab_index = None
        # Texts of the train split; corpus for TF-IDF / PPMI statistics.
        self.corpus = None

    @staticmethod
    def _build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
        """Return the sorted unique tokens of ``texts`` and their word -> index map."""
        unique_words = set()
        for text in texts:
            unique_words.update(normalize_pretokenize_text(text))
        vocab = sorted(unique_words)
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
        return vocab, vocab_index

    def _vectorize_text(self, text: str, vectorizer_type: str) -> List:
        """Vectorize a single text with the requested method."""
        if vectorizer_type == "one_hot":
            return one_hot_vectorization(text, self.vocab, self.vocab_index)
        if vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            return [bow_dict.get(word, 0) for word in self.vocab]
        if vectorizer_type == "tfidf":
            return tf_idf_vectorization(text, self.corpus, self.vocab, self.vocab_index)
        if vectorizer_type == "ppmi":
            return ppmi_vectorization(text, self.corpus, self.vocab, self.vocab_index)
        if vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text)
            if embeddings:
                # Sentence vector = mean of the word vectors.
                return np.mean(embeddings, axis=0).tolist()
            return [0.0] * self.FASTTEXT_DIM
        if vectorizer_type == "bert":
            return get_bert_embeddings(text).tolist()
        raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")

    def vectorize_dataset(
        self,
        dataset_name: str = "imdb",
        vectorizer_type: str = "bow",
        split: str = "train",
        sample_size: int = 100
    ) -> Tuple[Any, List, List]:
        """Load a class-balanced sample of a dataset split and vectorize it.

        Args:
            dataset_name: Name passed to ``datasets.load_dataset``.
            vectorizer_type: One of ``"one_hot"``, ``"bow"``, ``"tfidf"``,
                ``"ppmi"``, ``"fasttext"``, ``"bert"``.
            split: Dataset split. ``"train"`` (re)fits the vocabulary and the
                reference corpus; any other split reuses the fitted ones.
            sample_size: Total number of examples; ``sample_size // 2`` are
                drawn from each of the labels 0 and 1.

        Returns:
            ``(vocab, vectors, labels)``: the fitted vocabulary, one feature
            vector per kept text, and the label of each kept text.

        Raises:
            ValueError: If a non-train split is requested before "train" was
                vectorized, or ``vectorizer_type`` is unknown.
        """
        dataset = datasets.load_dataset(dataset_name, split=split)
        per_class = sample_size // 2
        class_0 = dataset.filter(lambda x: x['label'] == 0).shuffle(seed=42).select(range(per_class))
        class_1 = dataset.filter(lambda x: x['label'] == 1).shuffle(seed=42).select(range(per_class))
        dataset = concatenate_datasets([class_0, class_1]).shuffle(seed=42)

        # Filter texts and labels together so both lists stay aligned even
        # when an example is dropped for having blank text.
        texts, labels = [], []
        for item in dataset:
            text = item.get('text')
            if text and text.strip() and 'label' in item:
                texts.append(text)
                labels.append(item['label'])

        if split == "train":
            self.vocab, self.vocab_index = self._build_vocab(texts)
            self.corpus = texts
        elif self.vocab is None:
            raise ValueError("Can not vectorize test data before train")

        vectorized_data = [self._vectorize_text(text, vectorizer_type) for text in texts]
        # Return the fitted vocabulary (previously the unrelated global `vocab`).
        return self.vocab, vectorized_data, labels

In [None]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold

# Shared instance: the train split must be vectorized before the test split
# because it fits the vocabulary reused by the latter.
vectorizer = Vectorizer()

def train(
    embeddings_method="bow",
    val_size=0.2,
    cv_folds=5,
    random_state=42
):
    """Train and evaluate a CatBoost classifier on IMDB for one embedding method.

    Vectorizes the train and test splits, fits CatBoost on a stratified
    train/validation split, prints the classification report on the test
    split and the cross-validated F1 on the full train sample.

    Args:
        embeddings_method: Vectorizer type understood by
            ``Vectorizer.vectorize_dataset``.
        val_size: Fraction of the train sample held out as validation set.
        cv_folds: Number of cross-validation folds.
        random_state: Seed for the train/validation split and for CatBoost,
            making repeated runs comparable.

    Returns:
        Dict with ``test_accuracy``, ``test_f1``, ``cv_f1_mean`` and
        ``cv_f1_std``.
    """
    _, X, y = vectorizer.vectorize_dataset("imdb", embeddings_method, "train")
    _, X_test, y_test = vectorizer.vectorize_dataset("imdb", embeddings_method, "test")

    X_train, X_val, y_train, y_val = train_test_split(
        X, y,
        test_size=val_size,
        shuffle=True,
        stratify=y,
        random_state=random_state
    )

    model = CatBoostClassifier(
        iterations=50,
        learning_rate=0.1,
        random_seed=random_state,
        verbose=False
    )

    # NOTE(review): early_stopping_rounds equals `iterations`, so the patience
    # window cannot elapse within the iteration budget -- consider lowering it.
    model.fit(X_train, y_train, eval_set=(X_val, y_val), early_stopping_rounds=50, verbose=False)
    y_pred = model.predict(X_test)
    print(f"<==========> Embedding method: {embeddings_method} <==========>")
    print(classification_report(y_test, y_pred))

    cv_scores = cross_val_score(
        model,
        X, y,
        cv=cv_folds,
        scoring='f1'
    )
    print(f"Mean CV F1: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}\n\n\n")

    return {
        "test_accuracy": accuracy_score(y_test, y_pred),
        "test_f1": f1_score(y_test, y_pred),
        "cv_f1_mean": cv_scores.mean(),
        "cv_f1_std": cv_scores.std(),
    }


In [None]:
# Train and report one model per embedding method, in the listed order.
for embeddings_method in ["bow", "one_hot", "tfidf", "ppmi", "fasttext", "bert"]:
    train(embeddings_method=embeddings_method)

              precision    recall  f1-score   support

           0       0.61      0.54      0.57        50
           1       0.59      0.66      0.62        50

    accuracy                           0.60       100
   macro avg       0.60      0.60      0.60       100
weighted avg       0.60      0.60      0.60       100

Mean CV F1: 0.6070 ± 0.1117



              precision    recall  f1-score   support

           0       0.67      0.60      0.63        50
           1       0.64      0.70      0.67        50

    accuracy                           0.65       100
   macro avg       0.65      0.65      0.65       100
weighted avg       0.65      0.65      0.65       100

Mean CV F1: 0.5855 ± 0.1064



              precision    recall  f1-score   support

           0       0.64      0.76      0.70        50
           1       0.71      0.58      0.64        50

    accuracy                           0.67       100
   macro avg       0.68      0.67      0.67       100
weighted avg

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


              precision    recall  f1-score   support

           0       0.58      0.60      0.59        50
           1       0.58      0.56      0.57        50

    accuracy                           0.58       100
   macro avg       0.58      0.58      0.58       100
weighted avg       0.58      0.58      0.58       100

Mean CV F1: 0.6412 ± 0.1229



