In [None]:
!wget https://github.com/sagespl/nlp-masterclass/blob/main/modu%C5%82-07/rnn_data.zip?raw=true
!mv rnn_data.zip?raw=true rnn_data.zip
!unzip rnn_data.zip

#### Import pakietów

In [None]:
import os
import numpy
import torch

# Use the first GPU when one is available; otherwise fall back to the CPU so
# that the notebook still runs on machines without CUDA.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

#### Wczytywanie danych
Wczytujemy dane podzielone na trzy zbiory: treningowy (train), walidacyjny (dev) i testowy (test).

In [None]:
def load_data(path):
    """Read a tab-separated corpus file into a list of documents.

    The file holds one token per line as ``form<TAB>lemma<TAB>tag``;
    documents are separated by an empty line.

    Args:
        path: Path of the file to read.

    Returns:
        A list of documents, each a list of ``(form, lemma, tag)`` tuples.
        Blank lines and documents without any token are skipped.

    Raises:
        ValueError: If a non-blank line does not consist of exactly three
            tab-separated fields.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform default;
    # the data is expected to contain non-ASCII (Polish) characters.
    with open(path, encoding="utf-8") as f:
        data_text = f.read()
    out = []
    for doc in data_text.split("\n\n"):
        tok_list = []
        for tok in doc.split("\n"):
            # A trailing newline at end of file (or extra blank lines) would
            # otherwise produce an empty "token" that fails the unpacking below.
            if not tok.strip():
                continue
            form, lemma, tag = tok.split("\t")
            tok_list.append((form, lemma, tag))
        if tok_list:
            out.append(tok_list)
    return out


def get_label_set(data):
    """Return the sorted list of distinct tags found in ``data``.

    Args:
        data: Iterable of documents, each an iterable of tokens whose element
            at index 2 is the tag.

    Returns:
        A sorted list with every tag occurring at least once.
    """
    return sorted({token[2] for document in data for token in document})

# Load the three corpus files from the unpacked rnn_data directory.
train_data, dev_data, test_data = (
    load_data(os.path.join("rnn_data", file_name))
    for file_name in ("train.tab", "dev.tab", "test.tab")
)

# The label inventory comes from the training data only; "PAD" is appended
# last and is used for padded positions in a batch.
label_list = get_label_set(train_data) + ["PAD"]
label_to_ind = {label: ind for ind, label in enumerate(label_list)}
num_labels = len(label_list)

print("Kategorie dla klasyfikatora: ", label_list, "\n")
print("Długość zbioru treningowego (liczona w dokumentach): ", len(train_data))
print("Przykładowy dokument:")
# Show the first ten tokens of one training document as a sanity check.
for x in train_data[100][:10]:
    print(x)



#### Przygotowanie reprezentacji wektorowych

In [None]:
!wget http://dsmodels.nlp.ipipan.waw.pl/dsmodels/nkjp+wiki-forms-all-100-cbow-hs.txt.gz
!gunzip nkjp+wiki-forms-all-100-cbow-hs.txt.gz

from gensim.models import KeyedVectors

VEX = KeyedVectors.load_word2vec_format("nkjp+wiki-forms-all-100-cbow-hs.txt")

In [None]:
# Look up one sample vector to check the embeddings and read off their size.
kot = VEX["kot"]
num_feats = kot.shape[0]

print(kot)
print(type(kot), kot.dtype, kot.shape, sep=" ")

def w2v(form):
    """Return the embedding of ``form`` (lower-cased), or zeros if it is unknown.

    Args:
        form: Word form to look up.

    Returns:
        The vector stored in ``VEX`` for the lower-cased form, or a zero
        vector of length ``VEX.vector_size`` when the form is not in ``VEX``.
    """
    key = form.lower()
    if key in VEX:
        return VEX[key]
    # Out-of-vocabulary forms are represented by an all-zero vector.
    return numpy.zeros((VEX.vector_size,))

#### Definicja modelu

In [None]:
from torch import nn, cat, tanh

class ImprovedRecurrentModel(nn.Module):
    """Two-layer bidirectional LSTM tagger returning per-token label probabilities.

    Args:
        input_size: Size of each input token vector.
        state_size: Hidden size of the LSTM (per direction).
        output_size: Number of labels to predict.
        dropout: Dropout probability passed to ``nn.LSTM``.
    """

    NUM_LAYERS = 2
    NUM_DIRECTIONS = 2  # the LSTM is bidirectional

    def __init__(self, input_size, state_size, output_size, dropout):
        super(ImprovedRecurrentModel, self).__init__()
        self.state_size = state_size
        self.lstm = nn.LSTM(
            input_size,
            state_size,
            num_layers=self.NUM_LAYERS,
            batch_first=True,
            bidirectional=True,
            dropout=dropout,
        )
        # Forward and backward hidden states are concatenated, hence the factor.
        self.classifier = nn.Linear(state_size * self.NUM_DIRECTIONS, output_size)
        self.softmax = nn.Softmax(dim=2)

    def init_state(self, batch_size, device=None):
        """Create zero-filled initial hidden and cell states.

        Args:
            batch_size: Number of sequences in the batch.
            device: Device for the new tensors; defaults to the device of the
                model's parameters, so the states always match the model.

        Returns:
            A ``(state, cell)`` tuple, each of shape
            ``(NUM_LAYERS * NUM_DIRECTIONS, batch_size, state_size)``.
        """
        if device is None:
            device = next(self.parameters()).device
        shape = (self.NUM_LAYERS * self.NUM_DIRECTIONS, batch_size, self.state_size)
        state = torch.zeros(shape, device=device)
        cell = torch.zeros(shape, device=device)
        return state, cell

    def forward(self, input, lens):
        """Compute label probabilities for every position of a padded batch.

        Args:
            input: Padded batch of shape ``(batch, seq_len, input_size)``.
            lens: Sequence lengths, forwarded to ``pack_padded_sequence``
                (which, with its default settings, expects them sorted in
                decreasing order).

        Returns:
            Tensor of shape ``(batch, seq_len, output_size)`` with softmax
            probabilities over the labels (only the output, no state).
        """
        batch_size = input.shape[0]
        state, cell = self.init_state(batch_size, device=input.device)
        packed_input = nn.utils.rnn.pack_padded_sequence(input, lens, batch_first=True)
        packed_output, _ = self.lstm(packed_input, (state, cell))
        # total_length keeps the output as long as the padded input, even when
        # the longest sequence in the batch is shorter than the padding.
        hidden, _ = nn.utils.rnn.pad_packed_sequence(
            packed_output, batch_first=True, total_length=input.shape[1]
        )
        return self.softmax(self.classifier(hidden))

#### Definicja funkcji pomocniczych
Definiujemy funkcje do przetwarzania dokumentów na gotowe przykłady treningowe, funkcję trenującą oraz funkcję testującą model na jednym takim przykładzie, a także funkcje do interpretacji wyjścia sieci neuronowej i tagowania zdań w fazie użytkowej.

In [None]:
from torch import tensor

# Feature vector and label index used to fill padded positions in a batch.
PAD_VECTOR = numpy.zeros(VEX.vector_size)
PAD_TAG = label_to_ind["PAD"]



def examples_to_batch(examples, maxlen):
    """Turn a list of documents into padded, length-sorted batch tensors.

    Each document is truncated to ``maxlen`` tokens, its forms are embedded
    with ``w2v`` and its tags are mapped through ``label_to_ind``. Documents
    are ordered from longest to shortest and padded to the longest one with
    ``PAD_VECTOR`` / ``PAD_TAG``.

    Args:
        examples: List of documents, each a list of ``(form, lemma, tag)``.
        maxlen: Maximum number of tokens kept per document.

    Returns:
        ``(X, Y, lens, Y_mask)`` where ``X`` is a float tensor of shape
        ``(batch, seq, vector_size)`` on ``DEVICE``, ``Y`` a long tensor of
        label indices on ``DEVICE``, ``lens`` a CPU ``LongTensor`` with the
        true lengths, and ``Y_mask`` a bool tensor on ``DEVICE`` that is True
        wherever ``Y`` is not ``PAD_TAG``.

    Raises:
        ValueError: If ``examples`` is empty.
        KeyError: If a tag is missing from ``label_to_ind``.
    """
    if len(examples) == 0:
        raise ValueError("examples_to_batch() needs at least one example")

    # Truncate, then sort longest-first. The sort is stable, so documents of
    # equal length keep their original relative order.
    docs = sorted((ex[:maxlen] for ex in examples), key=len, reverse=True)
    doc_lens = [len(doc) for doc in docs]
    lens = torch.LongTensor(doc_lens)
    batch_len = max(doc_lens)

    # Fill pre-padded numpy buffers and convert once; building a tensor from
    # nested Python lists of arrays is far slower.
    feats = numpy.empty((len(docs), batch_len, VEX.vector_size), dtype=numpy.float32)
    feats[...] = PAD_VECTOR
    tag_ids = numpy.full((len(docs), batch_len), PAD_TAG, dtype=numpy.int64)
    for row, doc in enumerate(docs):
        for col, (form, _, tag) in enumerate(doc):
            feats[row, col] = w2v(form)
            tag_ids[row, col] = label_to_ind[tag]

    X = torch.from_numpy(feats).to(DEVICE)
    Y = torch.from_numpy(tag_ids).to(DEVICE)
    Y_mask = Y != PAD_TAG
    return X, Y, lens, Y_mask

def maskNLLLoss(out, target, mask):
    """Mean negative log-likelihood of the target labels over unmasked positions.

    Args:
        out: Probabilities of shape ``(batch, seq, num_labels)``.
        target: Label indices of shape ``(batch, seq)``.
        mask: Bool tensor of shape ``(batch, seq)``; only positions where it
            is True contribute to the loss.

    Returns:
        Scalar tensor with the mean of ``-log(p_target)`` over selected positions.
    """
    target_probs = torch.gather(out, 2, target.unsqueeze(2)).squeeze(2)
    # A probability that underflowed to exactly 0 would give log(0) = -inf and
    # poison the loss and gradients; clamp to the smallest positive normal
    # value, which leaves every non-zero normal probability untouched.
    target_probs = target_probs.clamp_min(torch.finfo(target_probs.dtype).tiny)
    crossEntropy = -torch.log(target_probs)
    loss = crossEntropy.masked_select(mask).mean()
    return loss


def train_on_batch(model, optimizer, criterion, X, Y, lens, Y_mask):
    """Perform one optimisation step on a single batch.

    Args:
        model: Module called as ``model(X, lens)``.
        optimizer: Optimizer updating the model's parameters.
        criterion: Callable ``criterion(output, Y, Y_mask)`` returning the loss.
        X, Y, lens, Y_mask: Batch tensors passed to the model and criterion.

    Returns:
        The batch loss as a Python number.
    """
    model.train()
    optimizer.zero_grad()
    batch_loss = criterion(model(X, lens), Y, Y_mask)
    batch_loss.backward()
    # Clip the total gradient norm to 1 before applying the update.
    nn.utils.clip_grad_norm_(model.parameters(), 1)
    optimizer.step()
    return batch_loss.item()

def test_on_batch(model, criterion, X, Y, lens, Y_mask):
    model.eval()
    optimizer.zero_grad()
    loss = 0
    output = model(X, lens)
    loss += criterion(output, Y, Y_mask)
    decision = output.topk(1, dim=2).indices.squeeze()
    correct = decision == Y
    total_tokens = lens.sum()
    correct_tokens = correct.masked_select(Y_mask).sum()
    return correct_tokens, total_tokens, loss.item()


def out_to_labels(out):
    """Map model output to label names.

    Args:
        out: Tensor indexed as ``(batch, seq, num_labels)``.

    Returns:
        ``(tags, indices)``: ``tags`` is a list (per sequence) of lists of
        label names taken from ``label_list``; ``indices`` is the tensor of
        top-1 label indices along dimension 2.
    """
    indices = out.topk(1, dim=2).indices
    tags = []
    for sequence in indices:
        tags.append([label_list[position] for position in sequence])
    return tags, indices
    

def tag_sentence(model, sent):
    """Tag a whitespace-tokenised sentence with the trained model.

    Args:
        model: Model called as ``model(X, lens)``.
        sent: Sentence as a string; tokens are separated by whitespace.

    Returns:
        ``(tokens, labels)``: the list of tokens and the predicted label for
        each of them. Both are empty when the sentence has no tokens.
    """
    model.eval()
    # split() without an argument ignores repeated/leading/trailing whitespace,
    # so no empty-string tokens are produced.
    tokens = sent.split()
    if not tokens:
        return [], []
    # examples_to_batch needs a tag for every token. Use one that is
    # guaranteed to be in the label inventory; a hard-coded name raises
    # KeyError whenever it is not among the known labels.
    placeholder_tag = label_list[0]
    example = [(tok, "_", placeholder_tag) for tok in tokens]
    X, _, lens, _ = examples_to_batch([example], len(tokens))
    # Pure inference: no gradients required.
    with torch.no_grad():
        output = model(X, lens)
    labels, _ = out_to_labels(output)
    return tokens, labels[0]

#### Trening i ewaluacja modelu

In [None]:
from tqdm.notebook import tqdm
import random


# Seed the RNGs used below (shuffling, weight initialisation, dropout) so
# that a full re-run of the notebook is repeatable.
SEED = 0
random.seed(SEED)
torch.manual_seed(SEED)

# The input size follows the embedding dimensionality instead of a literal,
# so swapping the embedding file does not silently break the model.
model = ImprovedRecurrentModel(VEX.vector_size, 50, num_labels, dropout=0.5).to(DEVICE)
learning_rate = 0.01
criterion = maskNLLLoss
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
epochs = 10
BATCH_SIZE = 64
MAXLEN = 60  # documents are truncated to this many tokens when batching

# Number of batches per split, rounded up so a final partial batch is included.
num_train_batches = (len(train_data) + BATCH_SIZE - 1) // BATCH_SIZE
num_dev_batches = (len(dev_data) + BATCH_SIZE - 1) // BATCH_SIZE
num_test_batches = (len(test_data) + BATCH_SIZE - 1) // BATCH_SIZE

for epoch in range(epochs):
    # Training: reshuffle the documents in place, then take one optimisation
    # step per mini-batch and accumulate the batch losses.
    random.shuffle(train_data)
    total_loss = 0
    # The index is named batch_idx (not `iter`) so the builtin iter() is not
    # shadowed for the rest of the notebook.
    for batch_idx in tqdm(range(num_train_batches)):
        examples = train_data[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
        X, Y, lens, Y_mask = examples_to_batch(examples, MAXLEN)
        loss = train_on_batch(model, optimizer, criterion, X, Y, lens, Y_mask)
        total_loss += loss
    print("train_loss: ", total_loss)

    # Evaluation on the dev set after every epoch (no gradients needed).
    dev_total = 0
    dev_correct = 0
    dev_loss = 0
    with torch.no_grad():
        for batch_idx in tqdm(range(num_dev_batches)):
            examples = dev_data[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
            X, Y, lens, Y_mask = examples_to_batch(examples, MAXLEN)
            correct_tokens, total_tokens, loss = test_on_batch(model, criterion, X, Y, lens, Y_mask)
            dev_loss += loss
            dev_total += total_tokens
            dev_correct += correct_tokens
        accuracy = "{:4.2f}%".format(((dev_correct/dev_total) * 100))
        print("dev acc: ", accuracy)
        print("dev loss: ", dev_loss)

# Final evaluation on the test set: accumulate token counts and summed loss
# over all batches, then report token-level accuracy.
test_total = 0
test_correct = 0
test_loss = 0
with torch.no_grad():
    # batch_idx rather than `iter`, to avoid shadowing the builtin iter().
    for batch_idx in tqdm(range(num_test_batches)):
        examples = test_data[batch_idx*BATCH_SIZE:(batch_idx+1)*BATCH_SIZE]
        X, Y, lens, Y_mask = examples_to_batch(examples, MAXLEN)
        correct_tokens, total_tokens, loss = test_on_batch(model, criterion, X, Y, lens, Y_mask)
        test_loss += loss
        test_total += total_tokens
        test_correct += correct_tokens

accuracy = "{:4.2f}%".format(((test_correct/test_total) * 100))
print("test acc: ", accuracy)
print("test loss: ", test_loss)
        

#### Sprawdzenie modelu na realnych danych

In [None]:
tokens, tags = tag_sentence(model, "mamy kwiaty dla mamy")
for token, tag in zip(tokens, tags):
    print(token, tag)