In [None]:
import torch
import time
import copy
import numpy as np
from torch import nn
import pyconll
from sklearn.feature_extraction.text import CountVectorizer

***
<font size=5>
Get the train and test data. Tokenize it into character tokens. Vectorize it.
</font>

***

In [None]:
# Download cell, disabled: the whole snippet sits inside a string literal, so none of it runs.
# NOTE(review): the file saved as 'ru_syntagrus-ud-test.conllu' is fetched from the
# 'ru_syntagrus-ud-train-b.conllu' URL, not from a file named "test" — confirm this is intentional.
'''import wget
out = './data/ru_syntagrus-ud-train.conllu'
url = "https://raw.githubusercontent.com/UniversalDependencies/UD_Russian-SynTagRus/master/ru_syntagrus-ud-train-a.conllu"
wget.download(url, out)
out = './data/ru_syntagrus-ud-test.conllu'
url = "https://raw.githubusercontent.com/UniversalDependencies/UD_Russian-SynTagRus/master/ru_syntagrus-ud-train-b.conllu"
wget.download(url, out)'''

In [None]:
# Directory with the CoNLL-U files. File paths are built by plain string concatenation
# (data_dir + file name), so the trailing slash is required.
data_dir = './data/'

In [None]:
# Parse both CoNLL-U files into pyconll objects (one per split).
train_data = pyconll.load_from_file(f'{data_dir}ru_syntagrus-ud-train.conllu')
test_data = pyconll.load_from_file(f'{data_dir}ru_syntagrus-ud-test.conllu')

In [None]:
# Sanity check: show the word forms of the second training sentence.
print(' '.join(tok.form for tok in train_data[1]))

In [None]:
# Largest sentence (in tokens) and largest token (in characters) of the training data;
# these are later passed to vectorize() as the tensor dimensions.
MAX_SENT_LEN = max(map(len, train_data))
MAX_TOKEN_LEN = max(len(token.form) for sentence in train_data for token in sentence)
print(f"The longest sentence has {MAX_SENT_LEN} tokens")
print(f"The longest token has {MAX_TOKEN_LEN} characters")

In [None]:
# One lower-cased, space-joined string per sentence, for each split.
train_texts, test_texts = (
    [' '.join(tok.form for tok in sent).lower() for sent in corpus]
    for corpus in (train_data, test_data)
)

In [None]:
# Fit a character-level vocabulary on the training texts (already lower-cased, hence lowercase=False).
vect = CountVectorizer(lowercase=False, analyzer='char')
vect.fit_transform(train_texts)

# Reserve id 0 for the padding symbol: the character that currently owns id 0
# is moved to a fresh id at the end of the vocabulary, and '<PAD>' takes its place.
last_keyval = len(vect.vocabulary_)
zerok = next(char for char, char_id in vect.vocabulary_.items() if char_id == 0)
vect.vocabulary_.update({'<PAD>': 0, zerok: last_keyval})
print(f"Vocabulary has {len(vect.vocabulary_)} unique tokens")

In [None]:
# Collect the UPOS tags seen in the training data (tokens without a tag are skipped).
# The "tag unknown" label 'X' is pinned to id 0 explicitly rather than by assuming it
# sorts last: this keeps 'X' -> 0 even if 'X' never occurs in the training data, and
# guarantees the label_vocab['X'] fallback used by vectorize() always exists.
TAGS = ['X'] + sorted({token.upos for sent in train_data for token in sent if token.upos} - {'X'})
label2id = {label: idx for idx, label in enumerate(TAGS)}
print(f"There are total of {len(TAGS)} unique tags")

In [None]:
def vectorize(data, token_vocab, label_vocab, max_sentence_len, max_token_len):
    """Convert tagged sentences into zero-padded index tensors.

    Args:
        data: sized iterable of sentences; each sentence supports slicing and
            yields tokens exposing `.form` (string) and `.upos`.
        token_vocab: mapping character -> id; unknown characters map to 0.
        label_vocab: mapping tag -> id; must contain 'X', which is used for
            tags missing from the mapping.
        max_sentence_len: sentences are truncated to this many tokens.
        max_token_len: lower-cased token forms are truncated to this many characters.

    Returns:
        (data_tensor, labels_tensor), both torch.long:
        data_tensor has shape (len(data), max_sentence_len, max_token_len + 2),
        labels_tensor has shape (len(data), max_sentence_len).
    """
    num_sentences = len(data)
    # Two extra columns stay zero: one before and one after the characters of each token.
    data_tensor = torch.zeros((num_sentences, max_sentence_len, max_token_len + 2), dtype=torch.long)
    labels_tensor = torch.zeros((num_sentences, max_sentence_len), dtype=torch.long)

    for sent_idx, sentence in enumerate(data):
        for tok_idx, token in enumerate(sentence[:max_sentence_len]):
            char_ids = [token_vocab.get(char, 0) for char in token.form.lower()[:max_token_len]]
            if char_ids:
                # Characters start at column 1; column 0 is the leading zero marker.
                data_tensor[sent_idx, tok_idx, 1:len(char_ids) + 1] = torch.tensor(char_ids, dtype=torch.long)
            labels_tensor[sent_idx, tok_idx] = label_vocab.get(token.upos, label_vocab['X'])
    return data_tensor, labels_tensor

# Both splits are encoded with the same vocabularies and the same tensor sizes.
train_tensor, train_labels_tensor = vectorize(
    train_data,
    token_vocab=vect.vocabulary_,
    label_vocab=label2id,
    max_sentence_len=MAX_SENT_LEN,
    max_token_len=MAX_TOKEN_LEN,
)
test_tensor, test_labels_tensor = vectorize(
    test_data,
    token_vocab=vect.vocabulary_,
    label_vocab=label2id,
    max_sentence_len=MAX_SENT_LEN,
    max_token_len=MAX_TOKEN_LEN,
)

In [None]:
# Pair each feature tensor with its label tensor so that a DataLoader can
# serve them to the model in aligned mini-batches.
train_dataset, test_dataset = (
    torch.utils.data.TensorDataset(features, targets)
    for features, targets in ((train_tensor, train_labels_tensor),
                              (test_tensor, test_labels_tensor))
)

***
<font size=5>
    Model architecture
</font>

***


In [None]:
#A stack of 1D convolution layers
class StackedConv1d(torch.nn.Module):
    """Stack of residual 1D convolution blocks that preserves the input shape.

    Each block is Conv1d -> Dropout -> LeakyReLU, and its output is added to
    its input (residual connection).

    Args:
        num_features: number of input channels; every block keeps this count.
        num_layers: number of residual blocks.
        kernel_size: convolution kernel width (odd or even).
        dropout_probab: dropout probability inside each block.
    """
    def __init__(self, num_features, num_layers=1, kernel_size=3, dropout_probab=0.5):
        super().__init__()
        # padding='same' keeps the sequence length for any kernel size. The previous
        # padding=kernel_size//2 produced an output one element longer for even kernels,
        # which broke the residual addition. For odd kernels both paddings are identical.
        layers = [
            nn.Sequential(
                nn.Conv1d(num_features, num_features, kernel_size, padding='same'),
                nn.Dropout(dropout_probab),
                nn.LeakyReLU(),
            )
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(layers)

    def forward(self, x):
        """Apply all residual blocks to x of shape (N, num_features, L); the shape is unchanged."""
        for layer in self.layers:
            x = x + layer(x)
        return x

#POS tagger net that predicts POS of separate tokens without considering context
class TokenPOSTaggerNet(nn.Module):
    """Character-level tagger that scores every token independently of its neighbours.

    Pipeline: character embeddings -> StackedConv1d backbone -> max pooling over
    the character axis -> linear layer producing one score per label.

    Args:
        vocab_size: number of character ids (id 0 is the padding id).
        num_labels: number of output labels.
        emb_size: embedding size, also the channel count of the backbone.
        **kwargs: forwarded to StackedConv1d.
    """
    def __init__(self, vocab_size, num_labels, emb_size=32, **kwargs):
        super().__init__()
        self.char_embeddings = nn.Embedding(vocab_size, emb_size, padding_idx=0)
        self.backbone = StackedConv1d(emb_size, **kwargs)
        self.global_pooling = nn.AdaptiveMaxPool1d(1)
        self.out = nn.Linear(emb_size, num_labels)
        self.num_labels = num_labels

    def forward(self, tokens):
        """Map character ids (batch, sent_len, token_len) to scores (batch, num_labels, sent_len)."""
        n_sentences, n_tokens, n_chars = tokens.shape

        # Treat every token as an independent sample: (batch * sent_len, token_len).
        chars_per_token = tokens.view(n_sentences * n_tokens, n_chars)

        # Embedding gives (samples, token_len, emb_size); the convolutions expect
        # channels first, i.e. (samples, emb_size, token_len).
        char_vectors = self.char_embeddings(chars_per_token).transpose(1, 2)
        conv_features = self.backbone(char_vectors)

        # Max over the character axis turns per-character features into a
        # single vector per token: (samples, emb_size).
        token_vectors = self.global_pooling(conv_features).squeeze(-1)

        # Per-token label scores, regrouped into sentences: (batch, sent_len, num_labels).
        scores = self.out(token_vectors).view(n_sentences, n_tokens, self.num_labels)

        # The loss function expects the class axis right after the batch axis.
        return scores.transpose(1, 2)

***
Train the model
***

In [None]:
def train_model(model, 
                train_dataset, 
                test_dataset, 
                loss_fun=nn.functional.cross_entropy,
                lr=5e-3,
                num_epoch=10,
                batch_sz=64, 
                dev='cuda'):
    """Train `model` with Adam and keep a snapshot of the best epoch.

    After every epoch the model is evaluated on `test_dataset`; the mean
    validation loss drives a ReduceLROnPlateau scheduler and decides whether
    the current weights become the new best snapshot.

    Args:
        model: the nn.Module to train (trained in place and moved to the device).
        train_dataset: dataset yielding (samples, labels) pairs for training.
            Incomplete last batches are dropped.
        test_dataset: dataset yielding (samples, labels) pairs for validation.
            Every sample is used exactly once per epoch.
        loss_fun: callable(pred, labels) -> scalar tensor. The per-batch value is
            weighted by batch size, which assumes a mean-reduced loss.
        lr: initial learning rate for Adam.
        num_epoch: number of epochs.
        batch_sz: batch size for both loaders.
        dev: device to use when CUDA is available; otherwise 'cpu' is used.

    Returns:
        (best_loss, best_model): the lowest mean validation loss and a deep copy
        of the model taken at that epoch. If no epoch improved on infinity
        (e.g. num_epoch == 0), this is (inf, None).

    Raises:
        ValueError: if the validation loader yields no samples.
    """
    device = dev if torch.cuda.is_available() else 'cpu'
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    # The scheduler's `verbose` flag is deprecated in recent PyTorch releases;
    # learning-rate changes are reported manually after scheduler.step() instead.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt)
    data_loader_train = torch.utils.data.DataLoader(train_dataset, 
                                              batch_size=batch_sz, shuffle=True, drop_last=True)
    # Validation: no shuffling and no dropped samples, so the reported loss covers the
    # whole set and a set smaller than one batch does not end in a division by zero.
    data_loader_val = torch.utils.data.DataLoader(test_dataset, 
                                                 batch_size=batch_sz, shuffle=False, drop_last=False)
    model.to(device)
    best_loss = float('inf')
    best_model = None
    for epoch in range(num_epoch):
        st = time.perf_counter()
        model.train()
        for samples, labels in data_loader_train:
            samples, labels = samples.to(device), labels.to(device)
            pred = model(samples)
            loss_val = loss_fun(pred, labels)
            opt.zero_grad()
            loss_val.backward()
            opt.step()

        model.eval()
        total_loss = 0.0
        num_samples = 0
        with torch.no_grad():
            for samples, labels in data_loader_val:
                samples, labels = samples.to(device), labels.to(device)
                pred = model(samples)
                # Weight by batch size so a smaller final batch does not skew the mean.
                total_loss += float(loss_fun(pred, labels)) * len(samples)
                num_samples += len(samples)
        if num_samples == 0:
            raise ValueError("test_dataset is empty: cannot compute a validation loss")
        mean_loss = total_loss / num_samples
        print(f"Epoch {epoch} loss {mean_loss}, time {time.perf_counter()-st}.")

        if mean_loss < best_loss:
            # Record the new best value; otherwise every epoch would count as an improvement.
            best_loss = mean_loss
            best_model = copy.deepcopy(model)

        lrs_before = [group['lr'] for group in opt.param_groups]
        scheduler.step(mean_loss)
        for group_idx, (old_lr, group) in enumerate(zip(lrs_before, opt.param_groups)):
            if group['lr'] != old_lr:
                print(f"Epoch {epoch}: reducing learning rate of group {group_idx} to {group['lr']:.4e}.")
    return best_loss, best_model
    

In [None]:
best_tagger = TokenPOSTaggerNet(len(vect.vocabulary_), len(label2id), num_layers=3)
# Tensor.numel() counts the elements of each parameter directly; the former
# np.product call no longer exists in NumPy 2.0.
print('Number of params in the model: ', sum(t.numel() for t in best_tagger.parameters()))

In [None]:
# Train with the default hyper-parameters; the name best_tagger is rebound to the returned model.
best_loss, best_tagger = train_model(
    model=best_tagger,
    train_dataset=train_dataset,
    test_dataset=test_dataset,
)