Потренируемся самостоятельно писать многослойный перцептрон для работы с текстами.

Возьмем для этого датасет про юридические тексты. В этом датасете есть описания дел, а в качестве цп - то, что с делами произошло.

In [None]:
!wget https://raw.githubusercontent.com/rsuh-python/mag2022/main/CL/term02/06-Embeddings/legal_text_classification.csv

--2025-01-15 09:45:07--  https://raw.githubusercontent.com/rsuh-python/mag2022/main/CL/term02/06-Embeddings/legal_text_classification.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 68202412 (65M) [text/plain]
Saving to: ‘legal_text_classification.csv.1’


2025-01-15 09:45:09 (59.1 MB/s) - ‘legal_text_classification.csv.1’ saved [68202412/68202412]



Для начала напишем бейзлайн - логистическую регрессию. Возьмем в качестве признаков только текст - описание самого дела (case_text). Целевую переменную, очевидно, нужно превратить в чиселки (OHE).

- проверьте данные на пропуски
- проверьте баланс классов - это очень важно!
- используйте TF-IDF
- не забудьте использовать LabelEncoder
- логистической регрессии может понадобиться выставить solver='liblinear'
- если не помните, как работать с несбалансированными датасетами, просмотрите наши конспекты - точно где-то было (на худой конец документация к логрегу)

In [None]:
import pandas as pd

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score

In [None]:
data = pd.read_csv('legal_text_classification.csv')
data.head()

Unnamed: 0,case_id,case_outcome,case_title,case_text
0,Case1,cited,Alpine Hardwood (Aust) Pty Ltd v Hardys Pty Lt...,Ordinarily that discretion will be exercised s...
1,Case2,cited,Black v Lipovac [1998] FCA 699 ; (1998) 217 AL...,The general principles governing the exercise ...
2,Case3,cited,Colgate Palmolive Co v Cussons Pty Ltd (1993) ...,Ordinarily that discretion will be exercised s...
3,Case4,cited,Dais Studio Pty Ltd v Bullett Creative Pty Ltd...,The general principles governing the exercise ...
4,Case5,cited,Dr Martens Australia Pty Ltd v Figgins Holding...,The preceding general principles inform the ex...


In [None]:
# your code here
print("Пропущенные значения:\n", data.isnull().sum())
print("\nРаспределение класса:\n", data['case_outcome'].value_counts(normalize=True))
data['case_text'] = data['case_text'].fillna('')

Пропущенные значения:
 case_id           0
case_outcome      0
case_title        0
case_text       176
dtype: int64

Распределение класса:
 case_outcome
cited            0.489053
referred to      0.175465
applied          0.097979
followed         0.090294
considered       0.068521
discussed        0.040985
distinguished    0.024335
related          0.004523
affirmed         0.004523
approved         0.004323
Name: proportion, dtype: float64


In [None]:
X = data['case_text']
y = data['case_outcome']

le = LabelEncoder()
y = le.fit_transform(y)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=12345)

tfidf = TfidfVectorizer()
X_train_tfidf = tfidf.fit_transform(X_train)
X_test_tfidf = tfidf.transform(X_test)

In [None]:
lr = LogisticRegression(solver='liblinear', class_weight='balanced')
lr.fit(X_train_tfidf, y_train)

In [None]:
y_pred = lr.predict(X_test_tfidf)
print("\nClassification Report:")
print(classification_report(y_test, y_pred))


Classification Report:
              precision    recall  f1-score   support

           0       0.23      0.73      0.35        22
           1       0.27      0.26      0.27       476
           2       0.14      0.32      0.20        22
           3       0.68      0.66      0.67      2488
           4       0.28      0.30      0.29       330
           5       0.23      0.43      0.30       211
           6       0.23      0.47      0.31       111
           7       0.36      0.28      0.31       441
           8       0.50      0.36      0.42       879
           9       0.16      0.59      0.25        17

    accuracy                           0.50      4997
   macro avg       0.31      0.44      0.34      4997
weighted avg       0.52      0.50      0.50      4997



Если все сделали как я, должна получиться средняя f-score в районе 0.5.

Теперь давайте попробуем написать нейронную сетку по аналогии с тетрадкой про твиттер из прошлого семинара.

In [None]:
import numpy as np
from string import punctuation
from collections import Counter
from sklearn.utils import shuffle, class_weight
from sklearn.preprocessing import LabelEncoder

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim



class_weight - очень полезная для нас штука. Можно вычислить веса классов автоматически с ее помощью:

In [None]:
# первый аргумент - какие веса высчитывать, второй - какие у нас классы, третий - какие их частоты
yweights = class_weight.compute_class_weight('balanced', classes=np.unique(data.case_outcome), y=data.case_outcome)

Заметьте, что возвращает оно np.array.

Нужно написать:

- функцию для предобработки текста, которая получает сырой текст и возвращает список токенов
- создать словарь word2id
- и обратный ему id2word

In [None]:
# your code here
def preprocess_text(text):
    text = text.lower()
    text = ''.join([char for char in text if char not in punctuation])
    tokens = text.split()
    return tokens

In [None]:
all_words = []
for text in data['case_text']:
    all_words.extend(preprocess_text(text))

In [None]:
word_counts = Counter(all_words)
vocab = ['<PAD>', '<UNK>'] + [word for word, count in word_counts.most_common() if count > 1]
word2id = {word: idx for idx, word in enumerate(vocab)}
id2word = {idx: word for word, idx in word2id.items()}

Лучше это все, конечно, запускать в колабе... не забудьте там выбрать T4 GPU в рантайме

In [None]:
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
DEVICE

device(type='cpu')

Нужно написать класс для нашего датасета (можно беспощадно копипастить из тетрадки про твиттер)

In [None]:
class LegalDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]

        tokens = preprocess_text(text)
        token_ids = torch.LongTensor([word2id.get(token, word2id['<UNK>']) for token in tokens])
        # Return single label value instead of array
        return token_ids, torch.LongTensor([label])[0]

    def collate_fn(self, batch):
        texts, labels = zip(*batch)
        texts_padded = pad_sequence(texts, batch_first=True, padding_value=word2id['<PAD>'])
        # Stack labels directly without squeeze
        return texts_padded, torch.stack(labels)

In [None]:
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(data['case_outcome'])

In [None]:
X_train, X_val, y_train, y_val = train_test_split(
    data['case_text'].values,
    y,
    test_size=0.2,
    random_state=12345,
    stratify=y
)


In [None]:
yweights = torch.FloatTensor(
    class_weight.compute_class_weight('balanced',
                                    classes=np.unique(y),
                                    y=y_train)
)

In [None]:
train_dataset = LegalDataset(X_train, y_train)
val_dataset = LegalDataset(X_val, y_val)

train_sampler = RandomSampler(train_dataset)
val_sampler = SequentialSampler(val_dataset)

In [None]:
# your code here
train_iterator = DataLoader(train_dataset, collate_fn = train_dataset.collate_fn, sampler=train_sampler, batch_size=1024)

In [None]:
# your code here
val_iterator = DataLoader(val_dataset, collate_fn=val_dataset.collate_fn, sampler=val_sampler, batch_size=1024)

Ну и наконец напишем архитектуру. Модель при инициализации должна принимать размер словаря и эмбеддинга. У нас в датасете 10 классов, поэтому, в отличие от тетрадки про твиттер, нужно использовать Softmax и возвращать вероятности классов. В качестве лосса подойдет кросс-энтропия (я ее уже за вас вписала вместе с весами классов).

In [None]:
'''class MLP(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.fc1 = nn.Linear(embedding_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)  # 10 классов
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        embedded = self.embedding(x)
        pooled = torch.mean(embedded, dim=1)

        x = F.relu(self.fc1(pooled))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return F.softmax(x, dim=1)'''

'class MLP(nn.Module):\n    def __init__(self, vocab_size, embedding_dim):\n        super().__init__()\n        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)\n        self.fc1 = nn.Linear(embedding_dim, 128)\n        self.fc2 = nn.Linear(128, 64)\n        self.fc3 = nn.Linear(64, 10)  # 10 классов\n        self.dropout = nn.Dropout(0.3)\n\n    def forward(self, x):\n        embedded = self.embedding(x)\n        pooled = torch.mean(embedded, dim=1)\n\n        x = F.relu(self.fc1(pooled))\n        x = self.dropout(x)\n        x = F.relu(self.fc2(x))\n        x = self.dropout(x)\n        x = self.fc3(x)\n        return F.softmax(x, dim=1)'

In [None]:
class MLP(nn.Module):

    def __init__(self, vocab_size, embedding_dim):

        super().__init__()
        # указываем в атрибутах класса, какие слои и активации нам понадобятся
        self.embedding = nn.Embedding(vocab_size, embedding_dim) # передаем матрицы с ванхотами
        self.emb2h = nn.Linear(embedding_dim, 50)
        self.act1 = nn.ReLU()
        self.dropout = nn.Dropout(p=0.1)
        self.h2out = nn.Linear(50, 10)
        #self.act2 = nn.Softmax()  потому что потом используем кросс энтропию


    def forward(self, text): #необходимый метод,  в нем указываем, как именно связываются слои/активации между собой
        # batch_size x seq_len
        #ipdb.set_trace()
        embedded = self.embedding(text)   # переводим последовательность индексов в последовательность эмбеддингов
        # batch_size x seq_len x embedding_dim

        mean_emb = torch.mean(embedded, dim=1) # считаем средний эмбеддинг предложения  это тиипа его вектор/ усреднили эмбеддинги слов получили эмбеддинг предложения
        # batch_size x embedding_dim
        hidden = self.emb2h(mean_emb) # пропускаем эмбеддинг через полносвязный слой
        # batch_size x 10
        hidden = self.act1(hidden)
        # batch_size x 10
        hidden = self.dropout(hidden)
        # batch_size x 10
        out = self.h2out(hidden) # возвращаем одно число для каждого семпла
        # batch_size x 1
        #proba = self.act2(out) # пропускаем число через СОФТМАКС
        return out
        # batch_size x 1

In [None]:
batch, y = next(iter(train_iterator))

In [None]:
#пропустим через модель наш первый батч, чтобы проверить, что все работает
model = MLP(len(id2word), 5)
output = torch.argmax(model(batch), dim=1) # argmax из вероятностей сделает классы
output

tensor([1, 1, 1,  ..., 1, 1, 1])

Теперь нужно написать трейнлуп (лучше скопипастить откуда-нибудь), инициализировать нашу модель и запустить)

In [None]:
def train_loop(model, train_iterator, val_iterator, optimizer, criterion, num_epochs=70):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    criterion = criterion.to(device)

    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for batch, labels in train_iterator:
            batch = batch.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            output = model(batch)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Валидация
        model.eval()
        val_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch, labels in val_iterator:
                batch = batch.to(device)
                labels = labels.to(device)

                output = model(batch)
                loss = criterion(output, labels)
                val_loss += loss.item()

                _, predicted = torch.max(output, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        avg_train_loss = total_loss / len(train_iterator)
        avg_val_loss = val_loss / len(val_iterator)
        accuracy = 100 * correct / total

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}]')
            print(f'Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}')
            print(f'Validation Accuracy: {accuracy:.2f}%')

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_model.pt')

model = MLP(len(word2id), 5)
optimizer = optim.Adam(model.parameters(), lr=0.003)
criterion = nn.CrossEntropyLoss(weight=yweights)

model = model.to(DEVICE)
criterion = criterion.to(DEVICE)

train_loop(model, train_iterator, val_iterator, optimizer, criterion)

Epoch [10/100]
Train Loss: 2.3174, Val Loss: 2.3117
Validation Accuracy: 9.81%


KeyboardInterrupt: 

Скорее всего, вам понадобится учиться очень много эпох, чтобы предсказывать что-нибудь стоящее (эпох 100...), и, вероятнее всего, придется играться с архитектурой, чтобы получить приличное качество. На семинаре на эксперименты времени нет, поэтому добаловаться можно дома - и заодно попробовать подключить эмбеддинги w2v, например.

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def evaluate_model(model, test_iterator, criterion, label_encoder):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.eval()
    model = model.to(device)

    all_predictions = []
    all_labels = []
    total_loss = 0

    with torch.no_grad():
        for batch, labels in test_iterator:
            batch = batch.to(device)
            labels = labels.to(device)

            outputs = model(batch)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            _, predicted = torch.max(outputs, 1)

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Преобразуем числовые метки обратно в оригинальные классы
    pred_labels = label_encoder.inverse_transform(all_predictions)
    true_labels = label_encoder.inverse_transform(all_labels)

    # Выводим подробный отчет
    print("\nDetailed Classification Report:")
    print(classification_report(true_labels, pred_labels))

    # Создаем матрицу ошибок
    plt.figure(figsize=(12, 8))
    cm = confusion_matrix(true_labels, pred_labels)
    sns.heatmap(cm,
                annot=True,
                fmt='d',
                cmap='Blues',
                xticklabels=label_encoder.classes_,
                yticklabels=label_encoder.classes_)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.xticks(rotation=45)
    plt.yticks(rotation=45)
    plt.tight_layout()
    plt.show()

    # Вычисляем среднюю ошибку
    avg_loss = total_loss / len(test_iterator)
    print(f"\nAverage Loss: {avg_loss:.4f}")

    # Возвращаем предсказания и истинные метки для дальнейшего анализа если потребуется
    return pred_labels, true_labels

# Использование функции оценки:
print("Evaluating model on validation set...")
pred_labels, true_labels = evaluate_model(model, val_iterator, criterion, label_encoder)

# Дополнительный анализ по классам
class_accuracy = {}
for true, pred in zip(true_labels, pred_labels):
    if true not in class_accuracy:
        class_accuracy[true] = {'correct': 0, 'total': 0}
    class_accuracy[true]['total'] += 1
    if true == pred:
        class_accuracy[true]['correct'] += 1

print("\nAccuracy by class:")
for class_name in class_accuracy:
    accuracy = class_accuracy[class_name]['correct'] / class_accuracy[class_name]['total'] * 100
    print(f"{class_name}: {accuracy:.2f}%")