In [2]:
import os
import random
from copy import deepcopy
from typing import Tuple

import torch
from rich.progress import track
from torch import Tensor
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchmetrics import F1Score
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    DataCollatorWithPadding,
    get_scheduler,
)

from datasets import load_dataset

In [3]:
# Single seed shared by every random number generator used in the notebook.
RANDOM_SEED = 42

# Exported through the environment as a string.
os.environ["PYTHONHASHSEED"] = str(RANDOM_SEED)

# Python's built-in generator first, then the PyTorch generators.
random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)

In [4]:
# Name of the dataset passed to load_dataset (movie reviews with two sentiment classes).
DATASET_NAME = "rotten_tomatoes"
# Checkpoint used for both the tokenizer and the sequence-classification model.
CHECKPOINT_NAME = "lvwerra/distilbert-imdb"

## Скачаем датасет, сделаем предобработку

Будем использовать датасет `rotten_tomatoes` с отзывами на фильмы. В датасете два класса — негативный и позитивный отзыв.

In [5]:
# Load every split of the dataset named by DATASET_NAME (train / validation / test).
dataset = load_dataset(DATASET_NAME)

Downloading builder script:   0%|          | 0.00/5.03k [00:00<?, ?B/s]

Downloading metadata:   0%|          | 0.00/2.02k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/7.25k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/488k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/8530 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1066 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1066 [00:00<?, ? examples/s]

Токенизируем датасет токенизатором, соответствующим чекпойнту модели (`AutoTokenizer` загружает `DistilBertTokenizerFast`).

In [6]:
# Load the tokenizer from the same checkpoint that the model is later loaded from.
tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT_NAME)

Downloading (…)okenizer_config.json:   0%|          | 0.00/333 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

In [7]:
def tokenize(dataset):
    """Tokenize the ``"text"`` field of ``dataset`` with the notebook-level tokenizer.

    Args:
        dataset: Mapping with a ``"text"`` entry; it is called through
            ``map(..., batched=True)`` below.

    Returns:
        The tokenizer output for those texts, with truncation enabled.
    """
    texts = dataset["text"]
    return tokenizer(texts, truncation=True)


# Apply the tokenizer to every split, processing examples in batches.
tokenized_dataset = dataset.map(tokenize, batched=True)

Map:   0%|          | 0/8530 [00:00<?, ? examples/s]

Map:   0%|          | 0/1066 [00:00<?, ? examples/s]

Map:   0%|          | 0/1066 [00:00<?, ? examples/s]

In [8]:
# Display the tokenized splits to check their features and row counts.
tokenized_dataset

DatasetDict({
    train: Dataset({
        features: ['text', 'label', 'input_ids', 'attention_mask'],
        num_rows: 8530
    })
    validation: Dataset({
        features: ['text', 'label', 'input_ids', 'attention_mask'],
        num_rows: 1066
    })
    test: Dataset({
        features: ['text', 'label', 'input_ids', 'attention_mask'],
        num_rows: 1066
    })
})

Удалим лишние поля, чтобы они не занимали память, и переименуем `label` в `labels`.

In [9]:
# Drop the raw text column and rename the target column to "labels".
tokenized_dataset = tokenized_dataset.remove_columns(["text"]).rename_column(
    "label", "labels"
)
# Switch the dataset's output format to "torch".
tokenized_dataset.set_format("torch")

## Обучаем классификатор текстов на основе DistilBERT, предобученной на датасете IMDb

In [10]:
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Number of distinct label values present in the training split.
train_labels = tokenized_dataset["train"]["labels"].tolist()
num_classes = len(set(train_labels))

batch_size = 64

Делаем даталоадеры

In [11]:
# The collator pads each batch and returns PyTorch tensors.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer, return_tensors="pt")

# One loader per split; only the training split is shuffled.
dataloaders = {
    split: DataLoader(
        tokenized_dataset[split],
        shuffle=(split == "train"),
        batch_size=batch_size,
        collate_fn=data_collator,
    )
    for split in ("train", "validation", "test")
}

In [12]:
def train_model(
    model, lr_scheduler, optimizer, metric, num_epochs=25
) -> Tuple[list[float], float]:
    """Train ``model`` and score it on the validation split after every epoch.

    Reads the notebook-level ``dataloaders`` dict (keys ``"train"`` and
    ``"validation"``) and moves every batch to the notebook-level ``device``.

    Args:
        model: Called as ``model(**batch)``; the output must expose ``.loss``
            and ``.logits``.
        lr_scheduler: Scheduler stepped once after every optimizer step.
        optimizer: Optimizer that updates the parameters of ``model``.
        metric: Object providing ``reset()``, ``update(preds, target)`` and
            ``compute()``. Any state it already holds is discarded.
        num_epochs: Number of train + validation passes to run.

    Returns:
        ``(metrics, best_metric)``: the validation score of every epoch as
        plain floats, and the highest of them (``0.0`` when no epoch scored
        above zero or ``num_epochs`` is 0).
    """
    metrics = []
    best_metric = 0.0
    for epoch in track(range(num_epochs), total=num_epochs):
        for phase in ["train", "validation"]:
            torch.cuda.empty_cache()
            is_train = phase == "train"
            if is_train:
                model.train()
            else:
                model.eval()
                # Without a reset the metric keeps the predictions of earlier
                # epochs, so the reported score would be a running aggregate
                # instead of this epoch's validation score.
                metric.reset()

            for batch in dataloaders[phase]:
                batch = {k: v.to(device) for k, v in batch.items()}
                # Track gradients only while training; validation does not
                # need the autograd graph.
                with torch.set_grad_enabled(is_train):
                    outputs = model(**batch)

                if is_train:
                    optimizer.zero_grad()
                    outputs.loss.backward()
                    optimizer.step()
                    lr_scheduler.step()
                else:
                    predictions = torch.argmax(outputs.logits, -1)
                    metric.update(predictions, batch["labels"])

        # Convert to a Python float so the returned values have one type.
        epoch_metric = float(metric.compute())
        best_metric = max(best_metric, epoch_metric)
        metrics.append(epoch_metric)
        print(
            f"Epoch: {epoch}, Metric: {metric.__class__.__name__}: {epoch_metric:.4f}"
        )
    return metrics, best_metric

Инициализируем модель

In [13]:
# Load the sequence-classification model from the checkpoint and move it to the selected device.
model = AutoModelForSequenceClassification.from_pretrained(CHECKPOINT_NAME).to(device)

Downloading (…)lve/main/config.json:   0%|          | 0.00/735 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/268M [00:00<?, ?B/s]

Перед тем как обучать модель, нужно проверить качество исходной модели. Будем оценивать качество по F1-мере.

In [14]:
def test_model(model, metric):
    """Evaluate ``model`` on the test split and return the computed metric.

    Iterates over the notebook-level ``dataloaders["test"]`` and moves every
    batch to the notebook-level ``device``. ``metric`` is updated with the
    argmax of the logits against ``batch["labels"]``. It is not reset here,
    so pass a fresh metric unless accumulating on top of earlier updates is
    intended.

    Args:
        model: Called as ``model(**batch)``; the output must expose ``.logits``.
        metric: Object providing ``update(preds, target)`` and ``compute()``.

    Returns:
        The value of ``metric.compute()`` after the last test batch.
    """
    model.eval()
    # Release cached GPU memory once up front instead of on every batch.
    torch.cuda.empty_cache()
    # Inference needs no gradients; disabling autograd avoids building and
    # holding a graph for every batch.
    with torch.no_grad():
        for batch in dataloaders["test"]:
            batch = {k: v.to(device) for k, v in batch.items()}
            logits = model(**batch).logits
            metric.update(torch.argmax(logits, -1), batch["labels"])
    return metric.compute()

In [15]:
# Fresh metric instance, so the baseline score holds only these test predictions.
f1_score_fn_test = F1Score(task="binary", num_classes=num_classes).to(device)
test_f1_before = test_model(model, f1_score_fn_test)
print(f"Test F1 metric before training: {test_f1_before:.4f}")

You're using a DistilBertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Test F1 metric before training: 0.8285


In [16]:
# Training budget: the scheduler is given the total number of optimizer steps
# (one step per training batch, for every epoch).
num_epochs = 10
num_training_steps = len(dataloaders["train"]) * num_epochs

optimizer = Adam(model.parameters(), lr=1e-5)
# Linear schedule with no warm-up steps, spanning all training steps.
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)

# Metric handed to train_model for scoring the validation split.
f1_score_fn = F1Score(task="binary", num_classes=num_classes).to(device)

val_metrics, best_metric = train_model(
    model,
    lr_scheduler=lr_scheduler,
    optimizer=optimizer,
    metric=f1_score_fn,
    num_epochs=num_epochs,
)

Output()

In [17]:
# New metric instance, so the baseline predictions are not mixed into this score.
f1_score_fn_test = F1Score(task="binary", num_classes=num_classes).to(device)
test_f1_after = test_model(model, f1_score_fn_test)
print(f"Test F1 metric after training: {test_f1_after:.4f}")

Test F1 metric after training: 0.8414


Вывод: F1 на тестовой выборке вырос с 0.8285 до 0.8414 — качество немного увеличилось.