Необходимые импорты

In [4]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.optim import Adam

from tqdm import tqdm

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score

import re
import pandas as pd

import transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers.onnx import FeaturesManager

from prettytable import PrettyTable

  from .autonotebook import tqdm as notebook_tqdm


Импорт класса для обработки текста

In [3]:
from text_preproccessor import TextPreproccessor

Структура данных для датасета

In [19]:
class DataSet(torch.utils.data.Dataset):
    def __init__(self, texts, labels, tokenizer_path="cointegrated/rubert-tiny2"):
        super(DataSet, self).__init__()
        
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)

        self.texts = texts
        self.labels = labels

        self.text_preprocessor = TextPreproccessor()

    def tokenize(self, text: str):
        t = self.tokenizer(text, padding='max_length', truncation=True, return_tensors='pt')

        input_ids = t['input_ids']
        token_type_ids = t['token_type_ids']
        attention_mask = t['attention_mask']
        
        return input_ids.squeeze(), token_type_ids.squeeze(), attention_mask.squeeze()

    def __getitem__(self, idx: int):
        text = self.texts[idx]
        label = self.labels[idx]

        text = self.text_preprocessor.preproccess_text(text)

        input_ids, token_type_ids, attention_mask = self.tokenize(text)

        return input_ids, token_type_ids, attention_mask, label
    
    def __len__(self):
        return len(self.texts)



TinyBert Классификатор

In [6]:
class TinyBertClassificator(nn.Module):
    def __init__(self, num_labels=1, dropout=0.25, embedding_size=312, model="cointegrated/rubert-tiny2"):
        super(TinyBertClassificator, self).__init__()

        self.bert_model = AutoModelForSequenceClassification.from_pretrained(model, num_labels=num_labels)
        
        self.dropout = nn.Dropout(dropout)

        self.linear = nn.Linear(embedding_size, num_labels)

        self.softmax = nn.Softmax()

        self.reLU = nn.ReLU()
    
    def forward(self, input_ids, token_type_ids, attention_mask):
        output = self.bert_model(input_ids=input_ids, attention_mask=attention_mask, return_dict=False)[0]

        return output


Обучение сети

In [7]:
def train(model: TinyBertClassificator, train_dataloader: DataLoader, loss_f, optimizer, batch_size=4, train=True):
    model.train(train)

    train_loss = 0

    for i, batch in enumerate(tqdm(train_dataloader)):
        input_ids, token_type_ids, attention_mask, labels = (t.cpu() for t in batch)

        output = model(input_ids, token_type_ids, attention_mask)

        batch_loss = loss_f(output, labels)

        if train:
            optimizer.zero_grad()  # обнуляем градиенты
            batch_loss.backward()  # вычисляем градиенты
            optimizer.step()  # подправляем параметры

        train_loss = batch_loss.item()
    
    return train_loss / len(train_dataloader)


def test(model: TinyBertClassificator, test_dataloader: DataLoader, loss_f, batch_size=4):
    total_loss_test = 0
    predicts = []
    y_true = []

    with torch.no_grad():
        for batch in tqdm(test_dataloader):
            input_ids, token_type_ids, attention_mask, labels = (t.cpu() for t in batch)

            output = model(input_ids, token_type_ids, attention_mask)

            predicts.extend(output.argmax(-1).tolist())
            y_true.extend(labels.tolist())

            batch_loss = loss_f(output, labels)
            total_loss_test += batch_loss.item()

    total_loss_test = total_loss_test / len(test_dataloader)

    accuracy = accuracy_score(y_true, predicts)
    balanced_accuracy = balanced_accuracy_score(y_true, predicts)
    f1 = f1_score(y_true, predicts, average='weighted')

    metrics = {"Accuracy": accuracy,
            "Balanced_accuracy": balanced_accuracy,
            "F1-score": f1,
            "Test loss": total_loss_test }

    return metrics


In [8]:
def get_texts_from_excel_file(file_path: str) -> pd.DataFrame:
    return pd.read_excel(file_path).dropna()

Получение данных из файла

In [9]:
TEXTS_FILEPATH = 'texts.xlsx'

texts_df = get_texts_from_excel_file(TEXTS_FILEPATH)


Подготовка данных

In [10]:
class_counts = texts_df['name'].value_counts()

#Считаем встречаемость классов в датасете
name2count = {name:count for name,count in zip(class_counts.index, class_counts.values)}

In [11]:
#Добавляем в табличку столбец вхождений класса
texts_df['class_count'] = texts_df['name'].apply(lambda x: name2count[x])

In [12]:
data = texts_df[texts_df['class_count'] > 1][['description', 'name']]

In [13]:
name2class_id = {name: idx for idx, name in enumerate(data['name'].unique())}
#Добавляем в табличку столбец с индексами классов
data['class_id'] = data['name'].apply(lambda x: name2class_id[x])
data.drop(['name'], axis=1, inplace=True)

In [14]:
X = data['description'].values
y = data['class_id'].values
#Делим датасет на тренировочную и тестовую выборки, учитывая распределение классов
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)


In [15]:
batch_size = 4                              #Размер батча
epochs = 40                                 #Итерации обучения
n_classes = len(data['class_id'].unique())  #Количество классов в датасете

In [16]:
#Создаем объекты тренировочного и тестового датасетов
train_dataset = DataSet(X_train, y_train)
test_dataset = DataSet(X_test, y_test)

#Оборачиваем в dataloader-ы
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size)


Downloading: 100%|██████████| 401/401 [00:00<00:00, 91.6kB/s]
Downloading: 100%|██████████| 1.03M/1.03M [00:01<00:00, 732kB/s] 
Downloading: 100%|██████████| 1.66M/1.66M [00:02<00:00, 606kB/s] 
Downloading: 100%|██████████| 112/112 [00:00<00:00, 29.2kB/s]


In [17]:
model = TinyBertClassificator(n_classes).cpu()
criterion = nn.CrossEntropyLoss().cpu()
optimizer = Adam(model.parameters(), lr=0.5)


Downloading: 100%|██████████| 715/715 [00:00<00:00, 285kB/s]
Downloading: 100%|██████████| 112M/112M [00:11<00:00, 9.88MB/s] 
Some weights of the model checkpoint at cointegrated/rubert-tiny2 were not used when initializing BertForSequenceClassification: ['cls.seq_relationship.bias', 'cls.predictions.decoder.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSeq

In [49]:
table = PrettyTable(["Accuracy", "Balanced_accuracy", "F1-score", "Test loss"])

for epoch in range(epochs):
    train_loss = train(model, train_dataloader, criterion,
                       optimizer, batch_size=batch_size)
    metrics = test(model, test_dataloader, criterion, batch_size=batch_size)
    
    table.add_row([metrics["Accuracy"], metrics["Balanced_accuracy"], metrics["F1-score"], metrics["Test loss"]])

print(table)

100%|██████████| 1886/1886 [9:05:33<00:00, 17.36s/it]    
100%|██████████| 472/472 [44:21<00:00,  5.64s/it]
 48%|████▊     | 897/1886 [4:14:35<4:40:41, 17.03s/it]


KeyboardInterrupt: 

In [18]:
table

NameError: name 'table' is not defined