In [2]:
# Данный ноутбук использовал окружение google-colab
%pip install catboost fasttext -q

# Домашнее задание "NLP. Часть 1"

In [3]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

In [4]:
def seed_everything(seed: int):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and CUDA), exports
    ``PYTHONHASHSEED`` and configures cuDNN for reproducible runs.

    Args:
        seed: Seed value shared by all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not only the current one; this is a no-op without CUDA.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different kernels from run to
    # run, which defeats the deterministic setting above, so it must stay off.
    torch.backends.cudnn.benchmark = False

seed_everything(42)

In [5]:
def normalize_pretokenize_text(text: str) -> List[str]:
    """Lower-case ``text`` and split it into word tokens.

    Args:
        text: Raw input string.

    Returns:
        The runs of regex word characters found in the lower-cased text, in
        order of appearance; punctuation and whitespace are dropped.
    """
    word_pattern = r'\b\w+\b'
    return re.findall(word_pattern, text.lower())

In [6]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    """Build a sorted vocabulary and a word -> index lookup from raw texts.

    Args:
        texts: Raw documents; each is tokenized with ``normalize_pretokenize_text``.

    Returns:
        ``(vocab, vocab_index)`` where ``vocab`` is the alphabetically sorted list
        of distinct tokens and ``vocab_index`` maps each token to its position
        in ``vocab``.
    """
    unique_words = set()
    for raw_text in texts:
        unique_words.update(normalize_pretokenize_text(raw_text))
    vocab = sorted(unique_words)
    vocab_index = dict(zip(vocab, range(len(vocab))))
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

In [7]:
vocab_index

{'and': 0,
 'are': 1,
 'brown': 2,
 'dog': 3,
 'dogs': 4,
 'fox': 5,
 'foxes': 6,
 'jump': 7,
 'jumps': 8,
 'lazy': 9,
 'never': 10,
 'over': 11,
 'quick': 12,
 'quickly': 13,
 'the': 14}

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [50]:
def one_hot_vectorization(text: str, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[np.ndarray]:
    """Encode every in-vocabulary token of ``text`` as a one-hot vector.

    Args:
        text: Raw text; tokenized with ``normalize_pretokenize_text``.
        vocab: Vocabulary list; only its length is used (it fixes the vector size).
            When omitted, the size is taken from ``vocab_index``.
        vocab_index: Mapping word -> position in the vocabulary. When omitted,
            it is derived from ``vocab``.

    Returns:
        One float array per token that is present in ``vocab_index``, in token
        order. Out-of-vocabulary tokens are skipped, so the list can be shorter
        than the token sequence.

    Raises:
        ValueError: If the text has tokens but neither ``vocab`` nor
            ``vocab_index`` was supplied.
    """
    tokens = normalize_pretokenize_text(text)
    if not tokens:
        return []

    if vocab is None and vocab_index is None:
        raise ValueError("one_hot_vectorization needs `vocab` or `vocab_index`")
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    vector_size = len(vocab) if vocab is not None else len(vocab_index)

    result = []
    for word in tokens:
        if word not in vocab_index:
            # Unknown words have no slot in the vector; skip without allocating.
            continue
        binary_word = np.zeros(vector_size)
        binary_word[vocab_index[word]] = 1
        result.append(binary_word)
    return result

def test_one_hot_vectorization(corpus, vocab, vocab_index) -> bool:
    """Smoke-test ``one_hot_vectorization`` on a fixed four-word sentence.

    Args:
        corpus: Unused; kept so all test helpers share one signature.
        vocab: Vocabulary list passed to the vectorizer.
        vocab_index: Word -> index mapping passed to the vectorizer.

    Returns:
        True when the result is a list of vectors of vocabulary length whose
        i-th vector has a 1 at the index of the i-th token; False otherwise,
        including when any exception is raised.
    """
    sample = "the quick brown fox"
    try:
        vectors = one_hot_vectorization(sample, vocab, vocab_index)
        print(vectors)
        if not isinstance(vectors, list):
            return False

        expected_length = len(vocab)
        if any(len(vector) != expected_length for vector in vectors):
            return False

        for position, token in enumerate(normalize_pretokenize_text(sample)):
            if token not in vocab_index:
                continue
            if vectors[position][vocab_index[token]] != 1:
                return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [51]:
assert test_one_hot_vectorization(test_corpus, vocab, vocab_index)

[array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.]), array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.]), array([0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]), array([0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.])]
One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [52]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    """Count how many times each token occurs in ``text``.

    Args:
        text: Raw text; tokenized with ``normalize_pretokenize_text``.

    Returns:
        A plain dict mapping each distinct token to its number of occurrences,
        with keys ordered by first appearance.
    """
    token_counts = Counter(normalize_pretokenize_text(text))
    return dict(token_counts)


def test_bag_of_words_vectorization() -> bool:
    """Smoke-test ``bag_of_words_vectorization`` on a sentence with repeated words.

    Returns:
        True when the result is a dict holding the expected count for every
        checked word (and nothing for an absent word); False otherwise,
        including when the vectorizer raises.
    """
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)
        print(result)

        if not isinstance(result, dict):
            return False

        expected_counts = {'the': 2, 'quick': 1, 'brown': 3, 'nonexistent': 0}
        for word, expected in expected_counts.items():
            if result.get(word, 0) != expected:
                return False

        # Fixed typo: the success message used to read "Bad-of-Words".
        print("Bag-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [53]:
assert test_bag_of_words_vectorization()

{'the': 2, 'quick': 1, 'brown': 3}
Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [54]:
def all_idf(corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None)->Dict[str, float]:
  """Compute the inverse document frequency of every vocabulary word.

  ``idf(word) = ln(len(corpus) / df(word))`` where ``df(word)`` is the number of
  documents that contain the word at least once.

  Args:
    corpus: Raw documents; each is tokenized with ``normalize_pretokenize_text``.
    vocab: Words to compute the IDF for.
    vocab_index: Unused; kept for a signature parallel to the other vectorizers.

  Returns:
    Dict mapping each vocabulary word to its IDF. Words that occur in no
    document get 0.0 instead of triggering a division by zero.
  """
  # Tokenize each document exactly once and count, per word, the number of
  # documents containing it (a set collapses repeats inside one document).
  document_frequency = Counter()
  for document in corpus:
    document_frequency.update(set(normalize_pretokenize_text(document)))

  n_documents = len(corpus)
  idf_all = {}
  for word in vocab:
    n_containing = document_frequency[word]
    idf_all[word] = np.log(n_documents / n_containing) if n_containing else 0.0
  return idf_all

def tf_idf_vectorization_optim(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None, idf_all:Dict[str, float]= None) -> Tuple[List[float], Dict[str, float]]:
  """TF-IDF vector of ``text`` that can reuse a precomputed IDF table.

  Args:
    text: Raw text; tokenized with ``normalize_pretokenize_text``.
    corpus: Documents used to compute the IDF table when ``idf_all`` is None.
    vocab: Vocabulary; the output has one component per word, in this order.
    vocab_index: Forwarded to ``all_idf``; not used otherwise.
    idf_all: Precomputed word -> IDF mapping. Computed from ``corpus`` when None.

  Returns:
    ``(vector, idf_all)``: the TF-IDF components (term frequency = count /
    number of tokens in ``text``) and the IDF table that was used, so callers
    can pass it back in for the next text.
  """
  if idf_all is None:
    idf_all = all_idf(corpus, vocab, vocab_index)

  tokens = normalize_pretokenize_text(text)
  token_counts = Counter(tokens)  # one pass instead of list.count per vocab word
  n_tokens = len(tokens)

  result = []
  for word in vocab:
    # A text without tokens has zero term frequency everywhere; guarding here
    # avoids the ZeroDivisionError the plain ratio would raise.
    tf = token_counts[word] / n_tokens if n_tokens else 0.0
    result.append(tf * idf_all[word])
  return result, idf_all


In [55]:
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
  """TF-IDF vector of ``text`` over ``vocab`` with IDF estimated from ``corpus``.

  Args:
    text: Raw text; tokenized with ``normalize_pretokenize_text``.
    corpus: Documents used to compute document frequencies.
    vocab: Vocabulary; the output has one component per word, in this order.
    vocab_index: Forwarded to ``all_idf``; not used otherwise.

  Returns:
    List with one ``tf * idf`` value per vocabulary word, where tf is the
    word's count divided by the number of tokens in ``text``.
  """
  tokens = normalize_pretokenize_text(text)
  token_counts = Counter(tokens)
  n_tokens = len(tokens)
  # Shared IDF helper: it maps words absent from the corpus to zero instead of
  # dividing by a zero document frequency.
  idf_all = all_idf(corpus, vocab, vocab_index)

  result = []
  for word in vocab:
    # Empty text -> zero term frequency rather than a division by zero.
    tf = token_counts[word] / n_tokens if n_tokens else 0.0
    result.append(tf * idf_all[word])
  return result
def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
  """Smoke-test ``tf_idf_vectorization`` on a fixed three-word text.

  Args:
    corpus: Documents forwarded to the vectorizer.
    vocab: Vocabulary list forwarded to the vectorizer.
    vocab_index: Word -> index mapping forwarded to the vectorizer.

  Returns:
    True when the result is a list of floats with one entry per vocabulary
    word; False otherwise, including when any exception is raised.
  """
  try:
    scores = tf_idf_vectorization("the quick brown", corpus, vocab, vocab_index)
    print(scores)
    well_formed = (
      isinstance(scores, list)
      and len(scores) == len(vocab)
      and all(isinstance(score, float) for score in scores)
    )
    if not well_formed:
      return False
    print("TF-IDF test PASSED")
    return True
  except Exception as e:
    print(f"TF-IDF test FAILED: {e}")
    return False

In [56]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

[np.float64(0.0), np.float64(0.0), np.float64(0.13515503603605478), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.0), np.float64(0.13515503603605478), np.float64(0.0), np.float64(0.13515503603605478)]
TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
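$$PPMI(word, context) = \max(0, PMI(word, context))$$
$$PMI(word, context) = \log \frac{P(word, context)}{P(word) \, P(context)} = \log \frac{N(word, context) \cdot |(word, context)|}{N(word) \, N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр), $|(word, context)|$ -- общее число пар (слово, контекст) в корпусе, а $N(word)$ и $N(context)$ -- число пар, в которых встречается соответственно слово $word$ и контекст $context$.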

In [57]:
def ppmi_vectorization_optim(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2,
    co_matrix: np.ndarray = None,
    total_count: int = 0,
    individual_count: Dict[int, int] = None
) -> List[float]:
    """PPMI text vector built from precomputed co-occurrence statistics.

    The vector is the average of the PPMI rows of the in-vocabulary tokens of
    ``text``, where ``PPMI[i, j] = max(0, log2(p_ij / (p_i * p_j)))`` and all
    probabilities are counts divided by ``total_count``.

    Only the rows of words that occur in ``text`` are computed (vectorized with
    NumPy), instead of rebuilding the full matrix in a Python loop per call.

    Args:
        text: Raw text; tokenized with ``normalize_pretokenize_text``.
        corpus: Used only when ``co_matrix`` is None, to compute the statistics.
        vocab: Used only when ``co_matrix`` is None.
        vocab_index: Mapping word -> row/column index of ``co_matrix``.
        window_size: Used only when ``co_matrix`` is None.
        co_matrix: Square co-occurrence count matrix. When None, it is computed
            with ``get_co_occurrence_matrix(corpus, vocab, vocab_index, window_size)``
            together with ``total_count`` and ``individual_count``.
        total_count: Normalizer that turns counts into probabilities.
        individual_count: Mapping vocabulary index -> occurrence count; indices
            missing from it are treated as count 0.

    Returns:
        A list of floats with one component per column of ``co_matrix``; all
        zeros when ``text`` has no in-vocabulary token or ``total_count`` is
        not positive.
    """
    if co_matrix is None:
        co_matrix, total_count, individual_count = get_co_occurrence_matrix(
            corpus, vocab, vocab_index, window_size
        )

    n_features = co_matrix.shape[1]
    text_vector = np.zeros(n_features)

    row_ids = [
        vocab_index[word]
        for word in normalize_pretokenize_text(text)
        if word in vocab_index
    ]
    if not row_ids or total_count <= 0:
        return text_vector.tolist()

    # Marginal probability of every vocabulary index.
    counts = np.zeros(n_features)
    for idx, count in (individual_count or {}).items():
        counts[idx] = count
    probs = counts / total_count

    # PPMI rows for the distinct words of the text only.
    unique_ids = sorted(set(row_ids))
    row_position = {idx: pos for pos, idx in enumerate(unique_ids)}
    p_joint = co_matrix[unique_ids] / total_count
    p_row = probs[unique_ids][:, None]
    p_col = probs[None, :]
    # Cells without co-occurrence (or with a zero marginal) keep PPMI = 0.
    valid = (p_joint > 0) & (p_row > 0) & (p_col > 0)
    ppmi_rows = np.zeros_like(p_joint)
    ratio = p_joint[valid] / (p_row * p_col)[valid]
    ppmi_rows[valid] = np.maximum(np.log2(ratio), 0.0)

    # Accumulate in token order so repeated words contribute once per occurrence.
    for idx in row_ids:
        text_vector += ppmi_rows[row_position[idx]]
    text_vector /= len(row_ids)
    return text_vector.tolist()

In [58]:
def get_co_occurrence_matrix(
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> Tuple[np.ndarray, int, Dict[int, int]]:
    """Count windowed word co-occurrences over a corpus.

    Args:
        corpus: Raw documents; each is tokenized with ``normalize_pretokenize_text``.
        vocab: Vocabulary; its length fixes the matrix size.
        vocab_index: Mapping word -> row/column index.
        window_size: Number of tokens inspected on each side of the centre token.

    Returns:
        ``(co_occurrence, total_count, individual_count)``:
        ``co_occurrence[i, j]`` counts how often word j appears inside the
        window of word i (neighbours equal to the centre word are not counted),
        ``total_count`` is the number of in-vocabulary tokens seen, and
        ``individual_count`` maps a vocabulary index to its token count.
    """
    n_words = len(vocab)
    co_occurrence = np.zeros((n_words, n_words))
    total_count = 0
    individual_count = {}

    for document in corpus:
        tokens = normalize_pretokenize_text(document)
        n_tokens = len(tokens)
        for position, center in enumerate(tokens):
            if center not in vocab_index:
                continue
            center_id = vocab_index[center]

            left = max(0, position - window_size)
            right = min(position + window_size + 1, n_tokens)
            for neighbour in tokens[left:right]:
                if neighbour != center and neighbour in vocab_index:
                    co_occurrence[center_id, vocab_index[neighbour]] += 1

            total_count += 1
            individual_count[center_id] = individual_count.get(center_id, 0) + 1

    return co_occurrence, total_count, individual_count

def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:
    """PPMI text vector with co-occurrence statistics computed from ``corpus``.

    Builds the co-occurrence counts with ``get_co_occurrence_matrix`` and
    delegates the PPMI computation and averaging to
    ``ppmi_vectorization_optim``, so both entry points treat out-of-vocabulary
    tokens the same way (they are skipped and the average runs over the
    in-vocabulary tokens only).

    Args:
        text: Raw text to vectorize.
        corpus: Documents used to count co-occurrences.
        vocab: Vocabulary; the output has one component per word.
        vocab_index: Mapping word -> vocabulary index.
        window_size: Number of tokens inspected on each side of a word.

    Returns:
        List of floats with one component per vocabulary word.
    """
    co_matrix, total_count, individual_count = get_co_occurrence_matrix(
        corpus, vocab, vocab_index, window_size
    )
    return ppmi_vectorization_optim(
        text,
        corpus,
        vocab,
        vocab_index,
        window_size,
        co_matrix=co_matrix,
        total_count=total_count,
        individual_count=individual_count,
    )



def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    """Smoke-test ``ppmi_vectorization`` on a fixed three-word text.

    Args:
        corpus: Documents forwarded to the vectorizer.
        vocab: Vocabulary list forwarded to the vectorizer.
        vocab_index: Word -> index mapping forwarded to the vectorizer.

    Returns:
        True when the result is a list of floats with one entry per vocabulary
        word; False otherwise, including when any exception is raised.
    """
    try:
        vector = ppmi_vectorization("quick brown fox", corpus, vocab, vocab_index)
        print(vector)
        well_formed = (
            isinstance(vector, list)
            and len(vector) == len(vocab)
            and all(isinstance(component, float) for component in vector)
        )
        if not well_formed:
            return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [59]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

[1.1949875002403854, 1.723308333814104, 2.0566416671474372, 0.0, 1.1949875002403854, 2.3899750004807707, 2.3899750004807707, 0.0, 2.723308333814104, 0.0, 0.0, 1.1949875002403854, 2.0566416671474372, 0.0, 1.3333333333333333]
PPMI test PASSED


In [60]:
vocab_index

{'and': 0,
 'are': 1,
 'brown': 2,
 'dog': 3,
 'dogs': 4,
 'fox': 5,
 'foxes': 6,
 'jump': 7,
 'jumps': 8,
 'lazy': 9,
 'never': 10,
 'over': 11,
 'quick': 12,
 'quickly': 13,
 'the': 14}

## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [20]:
!pip install gensim -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m27.9/27.9 MB[0m [31m26.7 MB/s[0m eta [36m0:00:00[0m
[?25h

https://www.geeksforgeeks.org/nlp/word-embeddings-using-fasttext/

In [62]:
from gensim.models import FastText
import numpy as np

def train_fasttext_model(texts, vector_size=100, window_size=2, min_count=1, workers=4,
                         epochs=10, sg=1, model_path="fasttext.model"):
    """Train a gensim FastText model on raw texts and save it to disk.

    Args:
        texts: Raw documents; each is tokenized with ``normalize_pretokenize_text``.
        vector_size: Passed to ``FastText`` as ``vector_size``.
        window_size: Passed to ``FastText`` as ``window``.
        min_count: Passed to ``FastText`` as ``min_count``.
        workers: Passed to ``FastText`` as ``workers``.
        epochs: Passed to ``FastText`` as ``epochs`` (previously hard-coded to 10).
        sg: Passed to ``FastText`` as ``sg`` (previously hard-coded to 1).
        model_path: File the trained model is saved to (previously hard-coded
            to "fasttext.model", which is also the default path that
            ``get_fasttext_embeddings`` loads from).

    Returns:
        The trained ``FastText`` model.
    """
    tokenized_texts = [normalize_pretokenize_text(text) for text in texts]

    model = FastText(
        sentences=tokenized_texts,
        vector_size=vector_size,
        window=window_size,
        min_count=min_count,
        workers=workers,
        sg=sg,
        epochs=epochs
    )

    model.save(model_path)
    return model
def get_fasttext_model(sample_size=50):
  """Train (and save) a FastText model on a shuffled sample of the IMDB train split.

  Args:
    sample_size: Number of reviews to train on; clamped to the dataset size.

  Returns:
    The model returned by ``train_fasttext_model`` (previously it was assigned
    to a local variable and dropped, so the function returned None).
  """
  dataset = datasets.load_dataset("imdb", split="train").shuffle(seed=42)
  dataset = dataset.select(range(min(sample_size, len(dataset))))
  train_texts = [item['text'] for item in dataset]
  return train_fasttext_model(train_texts)
get_fasttext_model(500)

In [63]:
# Most recently loaded FastText model, keyed by (path, file modification time) so
# that a model re-trained and re-saved under the same path is picked up again.
_FASTTEXT_MODEL_CACHE: Dict[Tuple[str, float], Any] = {}


def get_fasttext_embeddings(
    text: str,
    model_path: str = "fasttext.model"
) -> List[np.ndarray]:
    """Look up a FastText vector for every token of ``text``.

    The model is loaded from ``model_path`` once and reused on later calls
    instead of being read from disk for every text.

    Args:
        text: Raw text; tokenized with ``normalize_pretokenize_text``.
        model_path: Path of a model saved with gensim's ``FastText.save``.

    Returns:
        One vector per token, in token order. An empty list when the model
        cannot be loaded (a warning describing the failure is emitted) or when
        the text has no tokens.
    """
    import warnings

    try:
        cache_key = (model_path, os.path.getmtime(model_path))
        model = _FASTTEXT_MODEL_CACHE.get(cache_key)
        if model is None:
            model = FastText.load(model_path)
            # Keep a single model in memory; older entries are stale anyway.
            _FASTTEXT_MODEL_CACHE.clear()
            _FASTTEXT_MODEL_CACHE[cache_key] = model
    except Exception as e:
        # Best-effort contract is preserved (empty result), but the reason is
        # surfaced instead of being silently discarded.
        warnings.warn(f"Could not load FastText model from {model_path!r}: {e}")
        return []

    return [model.wv[word] for word in normalize_pretokenize_text(text)]

Берт

In [64]:
# (tokenizer, model) pairs keyed by model name, so the pretrained weights are
# instantiated once per kernel session rather than once per text.
_BERT_CACHE: Dict[str, Tuple[Any, Any]] = {}


def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:
    """Embed ``text`` with a pretrained BERT model.

    Args:
        text: Raw text; truncated by the tokenizer to at most 512 tokens.
        model_name: Name passed to ``BertTokenizer.from_pretrained`` and
            ``BertModel.from_pretrained``.
        pool_method: ``'cls'`` returns the last hidden state at position 0;
            ``'mean'`` returns the attention-mask-weighted mean of the last
            hidden states.

    Returns:
        1-D NumPy array holding the pooled embedding of the text.

    Raises:
        ValueError: If ``pool_method`` is neither ``'cls'`` nor ``'mean'``
            (previously this surfaced as an ``UnboundLocalError``).
    """
    if pool_method not in ('cls', 'mean'):
        raise ValueError(f"Unknown pool_method: {pool_method!r} (expected 'cls' or 'mean')")

    if model_name not in _BERT_CACHE:
        tokenizer = BertTokenizer.from_pretrained(model_name)
        model = BertModel.from_pretrained(model_name)
        model.eval()
        _BERT_CACHE[model_name] = (tokenizer, model)
    tokenizer, model = _BERT_CACHE[model_name]

    with torch.no_grad():
        inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
        last_hidden_states = model(**inputs).last_hidden_state

        if pool_method == 'cls':
            embeddings = last_hidden_states[:, 0, :]
        else:
            mask = inputs['attention_mask'].unsqueeze(-1).to(last_hidden_states.dtype)
            embeddings = (last_hidden_states * mask).sum(dim=1) / mask.sum(dim=1)

    return embeddings.squeeze(0).detach().numpy()

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [68]:
def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 50,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> Tuple[Any, List, List]:
    """Load a text-classification split and turn every text into a feature vector.

    Args:
        dataset_name: Name passed to ``datasets.load_dataset``.
        vectorizer_type: One of ``"one_hot"``, ``"bow"``, ``"tfidf"``,
            ``"ppmi"``, ``"fasttext"``, ``"bert"``.
        split: Dataset split. For ``"train"`` the vocabulary is rebuilt from the
            sampled texts (any ``vocab`` passed in is ignored); for ``"test"``
            a ``vocab`` must be supplied.
        sample_size: Number of shuffled examples to keep (seed 42); a falsy
            value keeps the whole split.
        vocab: Vocabulary to vectorize against (non-train splits).
        vocab_index: Word -> index mapping; derived from ``vocab`` when omitted.

    Returns:
        ``(vocab, vectorized_data, labels)`` where ``vectorized_data[i]`` is the
        feature vector of the i-th kept text and ``labels[i]`` is its label.

    Raises:
        ValueError: For an unknown ``vectorizer_type``, or when
            ``split == "test"`` and no ``vocab`` is given.
    """
    known_vectorizers = ("one_hot", "bow", "tfidf", "ppmi", "fasttext", "bert")
    if vectorizer_type not in known_vectorizers:
        # Fail before the dataset is loaded rather than on the first text.
        raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")

    dataset = datasets.load_dataset(dataset_name, split=split)

    if sample_size:
        dataset = dataset.shuffle(seed=42).select(range(min(sample_size, len(dataset))))

    # Filter texts and labels together: filtering them independently would shift
    # the labels against the texts as soon as one blank text is dropped.
    texts, labels = [], []
    for item in dataset:
        if 'text' in item and 'label' in item and item['text'].strip():
            texts.append(item['text'])
            labels.append(item['label'])

    if split == "train":
        vocab, vocab_index = build_vocab(texts)
        print(f"Размерчик чловаря 0_о: {len(vocab)}")
    elif split == "test" and vocab is None:
        raise ValueError("Для теста должен быть передан словарь (0-0)")

    if vocab is not None and vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    # NOTE: the IDF table / co-occurrence counts are estimated from the texts of
    # the split being vectorized, not from the training split.
    idf_all = None
    if vectorizer_type == "tfidf":
        idf_all = all_idf(texts, vocab, vocab_index)
    elif vectorizer_type == "ppmi":
        co_matrix, total_count, individual_count = get_co_occurrence_matrix(texts, vocab, vocab_index)

    vectorized_data = []
    rows_without_embedding = []  # fastText rows to back-fill with zeros
    for text in texts:
        if vectorizer_type == "one_hot":
            word_vectors = one_hot_vectorization(text, vocab, vocab_index)
            if word_vectors:
                avg_vector = np.mean(word_vectors, axis=0)
                vectorized_data.append(avg_vector.tolist())
            else:
                vectorized_data.append([0] * len(vocab))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vectorized_data.append([bow_dict.get(word, 0) for word in vocab])
        elif vectorizer_type == "tfidf":
            vector, _ = tf_idf_vectorization_optim(text, texts, vocab, vocab_index, idf_all)
            vectorized_data.append(vector)
        elif vectorizer_type == "ppmi":
            vectorized_data.append(
                ppmi_vectorization_optim(
                    text, texts, vocab, vocab_index, 2, co_matrix, total_count, individual_count
                )
            )
        elif vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text)
            if embeddings:
                vectorized_data.append(np.mean(embeddings, axis=0).tolist())
            else:
                rows_without_embedding.append(len(vectorized_data))
                vectorized_data.append(None)
        else:  # "bert"
            vectorized_data.append(get_bert_embeddings(text).tolist())

    if rows_without_embedding:
        # Size the zero vectors like the real embeddings so the rows stack into a
        # rectangular array; 300 is only the fallback when no text was embedded.
        embedding_dim = next(
            (len(vector) for vector in vectorized_data if vector is not None), 300
        )
        for row in rows_without_embedding:
            vectorized_data[row] = [0] * embedding_dim

    return vocab, vectorized_data, labels

In [69]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold
def train(
    embeddings_method="bow",
    test_size=0.2,
    val_size=0.2,
    cv_folds=5,
    sample_size=50
):
    """Train and evaluate a CatBoost classifier on IMDB for one vectorizer.

    Features come from ``vectorize_dataset``: the "train" split is used for
    fitting and cross-validation, the "test" split (vectorized with the train
    vocabulary) for the final metrics. Progress and metrics are printed.

    Args:
        embeddings_method: Vectorizer name forwarded to ``vectorize_dataset``.
        test_size: Currently unused; kept for backward compatibility.
        val_size: Currently unused; kept for backward compatibility.
        cv_folds: Number of shuffled K-fold splits for cross-validation.
        sample_size: Number of examples drawn from each split (previously fixed
            at the ``vectorize_dataset`` default of 50).

    Returns:
        ``(model, accuracy, f1)``: the classifier fitted on the whole training
        sample and its accuracy and F1 score on the test sample.
    """
    print(f"\n{embeddings_method.upper()}")
    print('dataset')
    vocab, X_train, y_train = vectorize_dataset(
        "imdb", embeddings_method, "train", sample_size=sample_size
    )
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    _, X_test, y_test = vectorize_dataset(
        "imdb", embeddings_method, "test",
        sample_size=sample_size, vocab=vocab, vocab_index=vocab_index
    )

    X_train = np.array(X_train)
    y_train = np.array(y_train)
    X_test = np.array(X_test)
    y_test = np.array(y_test)

    model = CatBoostClassifier(
        iterations=50,
        learning_rate=0.1,
        depth=6,
        random_seed=42,
        verbose=False
    )
    print('fit')
    model.fit(X_train, y_train, verbose=False)
    print('cross')

    # Cross-validation uses the training sample only.
    kf = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
    cv_scores = cross_val_score(model, X_train, y_train, cv=kf, scoring='accuracy')

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    print(f"Cross-validation Accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test F1-score: {f1:.4f}")

    return model, accuracy, f1

In [70]:
# Run the full train/evaluate pipeline once per vectorization method. The
# (model, accuracy, f1) tuple returned by train() is discarded here; only the
# metrics it prints are kept in the cell output.
for embeddings_method in ["bow", "one_hot", "tfidf", "ppmi", "fasttext", "bert"]:
    train(embeddings_method=embeddings_method)


BOW
dataset
Размерчик чловаря 0_о: 3128
fit
cross
Cross-validation Accuracy: 0.6800 (+/- 0.0800)
Test Accuracy: 0.6400
Test F1-score: 0.6400

ONE_HOT
dataset
Размерчик чловаря 0_о: 3128
fit
cross
Cross-validation Accuracy: 0.8000 (+/- 0.1265)
Test Accuracy: 0.6200
Test F1-score: 0.5581

TFIDF
dataset
Размерчик чловаря 0_о: 3128
fit
cross
Cross-validation Accuracy: 0.6400 (+/- 0.0980)
Test Accuracy: 0.6400
Test F1-score: 0.5909

PPMI
dataset
Размерчик чловаря 0_о: 3128
fit
cross
Cross-validation Accuracy: 0.5800 (+/- 0.1497)
Test Accuracy: 0.4800
Test F1-score: 0.2353

FASTTEXT
dataset
Размерчик чловаря 0_о: 3128
fit
cross
Cross-validation Accuracy: 0.7000 (+/- 0.1265)
Test Accuracy: 0.4800
Test F1-score: 0.2353

BERT
dataset
Размерчик чловаря 0_о: 3128
fit
cross
Cross-validation Accuracy: 0.7800 (+/- 0.1960)
Test Accuracy: 0.7600
Test F1-score: 0.7143
