# 1. Подготовка данных

In [1]:
# Импортируем библиотеки
import torch
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import f1_score

In [2]:
# Прочитаем файлы
def read_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
        sentences = []
        sentence = []
        for line in lines:
            line = line.strip()
            if not line:
                if sentence:
                    sentences.append(sentence)
                    sentence = []
            else:
                word, tag = line.split()
                sentence.append((word, tag))
        if sentence:
            sentences.append(sentence)
    return sentences

train_sentences = read_file('train.txt')
dev_sentences = read_file('dev.txt')
test_sentences = read_file('test.txt')

In [3]:
train_sentences[0]

[('"', 'O'),
 ('Если', 'O'),
 ('Миронов', 'B-PER'),
 ('занял', 'O'),
 ('столь', 'O'),
 ('оппозиционную', 'O'),
 ('позицию', 'O'),
 (',', 'O'),
 ('то', 'O'),
 ('мне', 'O'),
 ('представляется', 'O'),
 (',', 'O'),
 ('что', 'O'),
 ('для', 'O'),
 ('него', 'O'),
 ('было', 'O'),
 ('бы', 'O'),
 ('порядочным', 'O'),
 ('и', 'O'),
 ('правильным', 'O'),
 ('уйти', 'O'),
 ('в', 'O'),
 ('отставку', 'O'),
 ('с', 'O'),
 ('занимаемого', 'O'),
 ('им', 'O'),
 ('поста', 'O'),
 (',', 'O'),
 ('поста', 'O'),
 (',', 'O'),
 ('который', 'O'),
 ('предоставлен', 'O'),
 ('ему', 'O'),
 ('сегодня', 'O'),
 ('"', 'O'),
 ('Единой', 'B-ORG'),
 ('Россией', 'I-ORG'),
 ("''", 'O'),
 ('и', 'O'),
 ('никем', 'O'),
 ('больше', 'O'),
 ("''", 'O'),
 (',', 'O'),
 ('-', 'O'),
 ('заключает', 'O'),
 ('Исаев', 'B-PER'),
 ('.', 'O')]

In [4]:
# Построим словари
word2idx = {"<PAD>": 0, "<UNK>": 1}
tag2idx = {"<PAD>": 0}
for sentence in train_sentences:
    for word, tag in sentence:
        if word not in word2idx:
            word2idx[word] = len(word2idx)
        if tag not in tag2idx:
            tag2idx[tag] = len(tag2idx)

idx2tag = {idx: tag for tag, idx in tag2idx.items()}

In [5]:
idx2tag

{0: '<PAD>',
 1: 'O',
 2: 'B-PER',
 3: 'B-ORG',
 4: 'I-ORG',
 5: 'B-LOC',
 6: 'I-LOC',
 7: 'I-PER'}

In [6]:
# Конвертируем слова в индексы
def convert_to_ids(sentences, word2idx, tag2idx):
    sentences_ids = []
    for sentence in sentences:
        words_ids = [word2idx.get(word, word2idx["<UNK>"]) for word, _ in sentence]
        tags_ids = [tag2idx[tag] for _, tag in sentence]
        sentences_ids.append((words_ids, tags_ids))
    return sentences_ids

train_data_ids = convert_to_ids(train_sentences, word2idx, tag2idx)
dev_data_ids = convert_to_ids(dev_sentences, word2idx, tag2idx)
test_data_ids = convert_to_ids(test_sentences, word2idx, tag2idx)

In [7]:
# Создание класса датасета для загрузки данных и преобразование входных данных в тензоры

class NERDataset(Dataset):
    def __init__(self, sentences_ids):
        self.sentences_ids = sentences_ids

    def __len__(self):
        return len(self.sentences_ids)

    def __getitem__(self, idx):
        return torch.tensor(self.sentences_ids[idx][0]), torch.tensor(self.sentences_ids[idx][1])

def collate_fn(batch):
    sentences, tags = zip(*batch)
    sentences_padded = pad_sequence(sentences, batch_first=True, padding_value=word2idx["<PAD>"])
    tags_padded = pad_sequence(tags, batch_first=True, padding_value=tag2idx["<PAD>"])
    return sentences_padded, tags_padded

train_dataset = NERDataset(train_data_ids)
dev_dataset = NERDataset(dev_data_ids)
test_dataset = NERDataset(test_data_ids)

BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

# Выбор модели

In [8]:
# В качестве модели выберем BiLSTM
# BiLSTM является одной из разновидностей рекуррентных сетей. LSTM (Long Short-Term Memory) — это специализированная версия RNN, которая более устойчива к проблемам с градиентами,
# обычно возникающими при обучении стандартных RNN. BiLSTM (Bidirectional LSTM) — это LSTM, которая обрабатывает данные в двух направлениях (вперед и назад),
# что позволяет улавливать контекстную информацию лучше, чем обычная однонаправленная LSTM.

import torch.nn as nn

class BiLSTMNER(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate):
        super(BiLSTMNER, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))
        rnn_out, _ = self.rnn(embedded)
        logits = self.fc(self.dropout(rnn_out))
        return logits

In [9]:
# Подбор гиперпараметров
EMBEDDING_DIM = 100
HIDDEN_DIM = 128
OUTPUT_DIM = len(tag2idx)
DROPOUT = 0.5
N_EPOCHS = 10
LEARNING_RATE = 0.001

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = BiLSTMNER(len(word2idx), EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, DROPOUT).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss(ignore_index=tag2idx["<PAD>"])

# Для уменьшения lr во время обучения
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

# 3. Обучение модели

In [10]:
# Определим функции обучения и валидации
def train(model, iterator, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for words, tags in iterator:
        if isinstance(words, list) or isinstance(tags, list):
            raise TypeError(f"words type: {type(words)}, tags type: {type(tags)}")
        words, tags = words.to(device), tags.to(device)
        predictions = model(words)
        predictions_dim = predictions.shape[-1]
        predictions = predictions.view(-1, predictions_dim)
        tags = tags.view(-1)
        loss = criterion(predictions, tags)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for words, tags in iterator:
            predictions = model(words)
            predictions_dim = predictions.shape[-1]
            predictions = predictions.view(-1, predictions_dim)
            tags = tags.view(-1)
            loss = criterion(predictions, tags)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

In [11]:
# Обучение
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion)
    valid_loss = evaluate(model, dev_loader, criterion)
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best-model.pt')
    scheduler.step(valid_loss)

# 4. Тестирование модели

In [12]:
model.load_state_dict(torch.load('best-model.pt'))

def get_predictions(model, iterator):
    model.eval()
    all_predictions = []
    all_tags = []
    with torch.no_grad():
        for words, tags in iterator:
            predictions = model(words)
            _, top_class = predictions.topk(1, dim=2)
            all_predictions.extend(top_class.view(-1).tolist())
            all_tags.extend(tags.view(-1).tolist())
    return all_predictions, all_tags

predictions, labels = get_predictions(model, test_loader)
# Уберем <PAD> теги
predictions = [p for p, l in zip(predictions, labels) if l != tag2idx["<PAD>"]]
labels = [l for l in labels if l != tag2idx["<PAD>"]]

f1 = f1_score(labels, predictions, average='weighted')
print(f"F1 Score: {f1:.3f}")

F1 Score: 0.931


# 5. Вывод

С помощью подбора гиперпараметров нам удалось постоить отличную модель с показателем F1 Score: 0.931. Таким образом применение модели BiLSTM является наилучшим