In [1]:
# Install the two third-party packages imported in the next cell
# (`polyglot.mapping.Embedding` and `conllu.parse`); -q keeps pip quiet.
!pip install -q polyglot
!pip install -q conllu

In [2]:
import torch
from torch import nn
from torch import optim
import numpy as np
from polyglot.mapping import Embedding
from conllu import parse
from tqdm import tqdm
from collections import Counter

In [3]:
# Mount Google Drive in the Colab runtime and switch into the project folder,
# so that the relative paths used by the following cells resolve against it.
from google.colab import drive
drive.mount('/content/drive')
%cd "/content/drive/MyDrive/DL for NLP project/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/DL for NLP project


In [4]:
# Word embeddings read from a text file in the project folder
# (presumably pretrained English polyglot vectors -- confirm against the file).
embeds = Embedding.from_glove('polyglot/en.polyglot.txt')
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
def _read_conllu(path):
    """Read the CoNLL-U file at `path` and return the result of `conllu.parse`."""
    # CoNLL-U files are UTF-8 by specification; say so explicitly instead of
    # relying on the platform's default text encoding.
    with open(path, encoding="utf-8") as conllu_file:
        return parse(conllu_file.read())


# English treebank splits: training, development and test.
train_data = _read_conllu("languages/en/en-ud-train.conllu")
dev_data = _read_conllu("languages/en/en-ud-dev.conllu")
test_data = _read_conllu("languages/en/en-ud-test.conllu")

In [6]:
def build_indexes(data):
    """Build the word, character and label vocabularies of a dataset.

    Args:
        data: iterable of sentences, each an iterable of tokens that can be
            indexed with 'form' and 'upos'.

    Returns:
        A tuple (w2i, c2i, l2i) of dicts mapping lower-cased word forms,
        characters of those forms, and 'upos' labels to consecutive integer
        ids in order of first appearance. Id 0 of the word and character
        vocabularies is "_UNK"; character ids 1 and 2 are the word boundary
        markers "<w>" and "</w>".
    """
    word_ids = {"_UNK": 0}
    char_ids = {"_UNK": 0, "<w>": 1, "</w>": 2}
    label_ids = {}
    for sentence in data:
        for token in sentence:
            form = token['form'].lower()
            # setdefault only inserts unseen keys; the new id is the current
            # vocabulary size, which keeps the ids dense.
            word_ids.setdefault(form, len(word_ids))
            for symbol in form:
                char_ids.setdefault(symbol, len(char_ids))
            label_ids.setdefault(token['upos'], len(label_ids))
    return word_ids, char_ids, label_ids
# Vocabularies are built from the training split only.
w2i, c2i, l2i = build_indexes(train_data)

In [7]:
def build_embedding_matrix(w2i, embeds, embedding_dim=64):
    """Assemble the initial word-embedding matrix for a vocabulary.

    Args:
        w2i: dict mapping each word to its row index in the matrix.
        embeds: object whose `get(word)` returns the word's vector, or None
            when the word is unknown.
        embedding_dim: length of every vector returned by `embeds`
            (defaults to 64, the value that used to be hard-coded).

    Returns:
        Float tensor of shape (len(w2i), embedding_dim). Rows of words found
        in `embeds` hold their vector; all other rows are drawn uniformly
        from [0, 1).
    """
    # Allocated uninitialised: every index listed in w2i is written below.
    embedding_matrix = torch.FloatTensor(size=(len(w2i), embedding_dim))
    for word, index in w2i.items():
        # The lookup is lower-cased, so the "_UNK" entry is searched as "_unk".
        embedding = embeds.get(word.lower())
        if embedding is not None:
            embedding_matrix[index] = torch.FloatTensor(embedding)
        else:
            embedding_matrix[index] = torch.rand(embedding_dim)
    return embedding_matrix
# One row per entry of w2i; later passed to POS_Tagger as its word-embedding weights.
embedding_matrix = build_embedding_matrix(w2i, embeds)

In [8]:
# Frequency bin of every lower-cased training word form: the natural log of
# its number of occurrences in train_data, truncated to an integer.
word_counts = Counter(
    token['form'].lower()
    for sentence in train_data
    for token in sentence
)
freqbin = {word: int(np.log(count)) for word, count in word_counts.items()}

In [9]:
class Character_Encoder(nn.Module):
    """Encode one word, given as a sequence of character ids, into a vector.

    The character ids are embedded and fed through a single-layer
    bidirectional LSTM; the final hidden states of the two directions are
    flattened into one vector.
    """

    def __init__(self, vocab_size, embedding_dim=100, hidden_size=100):
        """
        Args:
            vocab_size: number of distinct character ids.
            embedding_dim: size of each character embedding (default 100).
            hidden_size: LSTM hidden size per direction (default 100).
        """
        super(Character_Encoder, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.bilstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_size, num_layers=1, bidirectional=True)

    def forward(self, chars: torch.Tensor):
        """Encode a single word.

        Args:
            chars: 1-D tensor of character ids, shape (N_CHARS,).

        Returns:
            1-D tensor of shape (2 * hidden_size,).
        """
        embedded = self.embedding(chars)  # (N_CHARS, EMB)
        # The LSTM is not batch_first, so insert a batch axis of size 1.
        _, (final_hidden, _) = self.bilstm(embedded.view(len(chars), 1, -1))
        # final_hidden is (2, 1, HIDDEN): flatten both directions together.
        return final_hidden.view(-1)

In [17]:
class POS_Tagger(nn.Module):
    """Sentence tagger with two output heads: POS tags and word-frequency bins.

    Every token is represented by its word embedding concatenated with the
    output of a character-level encoder. The token sequence goes through a
    single-layer bidirectional LSTM whose outputs feed two linear layers.
    """

    def __init__(self, embedding_matrix, vocab_size, freq_max, n_tags=17, hidden_size=100):
        """
        Args:
            embedding_matrix: (N_WORDS, EMB) float tensor with the initial
                word embeddings; they stay trainable.
            vocab_size: number of character ids, passed to Character_Encoder.
            freq_max: number of outputs of the frequency-bin head.
            n_tags: number of outputs of the POS head (default 17, the value
                that used to be hard-coded).
            hidden_size: hidden size per direction of the sentence LSTM.
        """
        super(POS_Tagger, self).__init__()
        # freeze=False keeps the pretrained vectors trainable.
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False)
        # No device placement here: the sub-module follows the parent when the
        # caller moves the whole model with .to(...), like every other layer.
        self.characterbased = Character_Encoder(vocab_size)
        # Input width is derived instead of hard-coded as 264 = 64 + 2 * 100.
        char_dim = 2 * self.characterbased.bilstm.hidden_size
        self.bilstm = nn.LSTM(input_size=embedding_matrix.shape[1] + char_dim, hidden_size=hidden_size,
                              num_layers=1, bidirectional=True)
        self.pos_tagger = nn.Linear(in_features=2 * hidden_size, out_features=n_tags)
        self.freqbin = nn.Linear(in_features=2 * hidden_size, out_features=freq_max)

    def forward(self, tokens: torch.Tensor, char_lists: list):
        """Score one sentence.

        Args:
            tokens: 1-D tensor of word ids, shape (N_TOKENS,).
            char_lists: list with one list of character ids per token.

        Returns:
            Tuple (pos_tags, freq) of unnormalised scores with shapes
            (N_TOKENS, n_tags) and (N_TOKENS, freq_max).
        """
        embedded = self.embedding(tokens)  # (N_TOKENS, EMB)
        # Character tensors are created on the device of the input rather
        # than on a module-level global.
        char_encoded = torch.stack([
            self.characterbased(torch.tensor(char_list, device=tokens.device))
            for char_list in char_lists
        ])  # (N_TOKENS, CHAR)
        concatted = torch.cat((embedded, char_encoded), dim=1)  # (N_TOKENS, EMB + CHAR)
        # The LSTM is not batch_first, so the sentence is a batch of size 1.
        bilstm_out, _ = self.bilstm(concatted.view(len(embedded), 1, -1))
        features = bilstm_out.view(len(embedded), -1)  # (N_TOKENS, 2 * HIDDEN)
        pos_tags = self.pos_tagger(features)
        freq = self.freqbin(features)
        return pos_tags, freq

In [22]:
def tensorize_data(sentence):
    """Turn one parsed sentence into the inputs and gold labels of the tagger.

    Relies on the module-level w2i, c2i, l2i, freqbin and device.

    Args:
        sentence: iterable of tokens that can be indexed with 'form' and 'upos'.

    Returns:
        Tuple (tokens, char_lists, pos_gold, freq_gold):
        tokens, pos_gold and freq_gold are 1-D long tensors on `device` with
        one entry per token; char_lists holds, per token, the character ids
        of the lower-cased form wrapped in the '<w>' / '</w>' marker ids.
        Words and characters missing from the vocabularies map to id 0, and
        words missing from freqbin get bin 0.

    Raises:
        KeyError: if a token's 'upos' is not a key of l2i.
    """
    word_ids, char_lists, tag_ids, bins = [], [], [], []
    start_id, end_id = c2i['<w>'], c2i['</w>']
    for token in sentence:
        form = token['form'].lower()
        word_ids.append(w2i.get(form, 0))
        char_lists.append([start_id, *(c2i.get(symbol, 0) for symbol in form), end_id])
        tag_ids.append(l2i[token['upos']])
        bins.append(freqbin.get(form, 0))
    tokens = torch.LongTensor(word_ids).to(device)
    pos_gold = torch.LongTensor(tag_ids).to(device)
    freq_gold = torch.LongTensor(bins).to(device)
    return tokens, char_lists, pos_gold, freq_gold

In [19]:
def eval(model, data):
    """Compute the token-level POS accuracy of `model` on `data`.

    Note that this name shadows the built-in `eval` inside the notebook.

    Args:
        model: module called as model(tokens, char_lists) that returns the
            POS scores as the first element of a pair.
        data: iterable of sentences accepted by tensorize_data.

    Returns:
        Number of correctly tagged tokens divided by the total number of
        tokens (a 0-dim tensor, since the count is accumulated from tensors).
    """
    # Puts the model in evaluation mode; it is not switched back here.
    model.eval()
    n_correct = 0
    n_tokens = 0
    with torch.no_grad():
        for sentence in tqdm(data, desc="Evaluation"):
            tokens, char_lists, golden, _ = tensorize_data(sentence)
            pos_scores, _ = model(tokens, char_lists)
            # The highest-scoring tag of each token is the prediction.
            n_correct += (pos_scores.argmax(dim=1) == golden).sum()
            n_tokens += len(tokens)
    return n_correct / n_tokens

In [20]:
# Hyper-parameters.
n_epochs = 20
report_every = 1  # evaluate every `report_every` epochs
learning_rate = 0.1
variance = 0.2  # TODO: not used anywhere in this cell yet

# Arguments: word-embedding weights, size of the character vocabulary, and
# the number of frequency bins (largest bin id + 1).
model = POS_Tagger(embedding_matrix, len(c2i), max(freqbin.values())+1).to(device)

# The same cross-entropy criterion is applied to both output heads.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

for epoch in range(n_epochs):
    total_loss = 0.0
    model.train()
    # One optimizer step per sentence.
    for sentence in tqdm(train_data, desc="Training  "):
        optimizer.zero_grad()
        tokens, char_lists, pos_gold, freq_gold = tensorize_data(sentence)
        pos_pred, freq_pred = model(tokens, char_lists)
        # Joint objective: POS tagging loss plus frequency-bin prediction loss.
        loss = criterion(pos_pred, pos_gold) + criterion(freq_pred, freq_gold)
        # Accumulate a plain Python float: adding the loss tensor itself would
        # keep autograd history alive across the whole epoch.
        total_loss += loss.item()
        loss.backward()
        optimizer.step()

    # Evaluation on the training and development splits.
    if ((epoch + 1) % report_every) == 0:
        train_accuracy = eval(model, train_data)
        dev_accuracy = eval(model, dev_data)
        print('epoch: %d, loss: %.4f, train acc: %.2f, dev acc: %.2f' % (epoch, total_loss/len(train_data), train_accuracy, dev_accuracy))

Training  : 100%|██████████| 12543/12543 [03:10<00:00, 66.00it/s]
Evaluation: 100%|██████████| 12543/12543 [01:06<00:00, 188.02it/s]
Evaluation:   0%|          | 1/2002 [00:00<00:09, 204.78it/s]


KeyError: ignored

In [25]:
# Token-level accuracy of the current `model` on the test split.
test_accuracy = eval(model, test_data)
print(f"\nTest accuracy: {test_accuracy}")

Evaluation: 100%|██████████| 2077/2077 [00:08<00:00, 242.15it/s]


Test accuracy: 0.905841588973999



