In [1]:
!pip install transformers conllu datasets



In [2]:
from datasets import load_dataset , get_dataset_config_names
import torch
import torch.nn as nn
import torch.optim as optim

In [3]:
# Fetching en_ewts train, validation, test splits
train_dataset = load_dataset('universal_dependencies', 'en_ewt', split = 'train')
validation_dataset = load_dataset('universal_dependencies', 'en_ewt', split = 'validation')
test_dataset = load_dataset('universal_dependencies', 'en_ewt', split = 'test')

In [6]:
# Find total pos

total_pos = set()
total_sentences = train_dataset.num_rows

for i in range(0,total_sentences):
  for pos in train_dataset[i]['xpos']:
    total_pos.add(pos)

total_pos

{'$',
 "''",
 ',',
 '-LRB-',
 '-RRB-',
 '.',
 ':',
 'ADD',
 'AFX',
 'CC',
 'CD',
 'DT',
 'EX',
 'FW',
 'GW',
 'HYPH',
 'IN',
 'JJ',
 'JJR',
 'JJS',
 'LS',
 'MD',
 'NFP',
 'NN',
 'NNP',
 'NNPS',
 'NNS',
 None,
 'PDT',
 'POS',
 'PRP',
 'PRP$',
 'RB',
 'RBR',
 'RBS',
 'RP',
 'SYM',
 'TO',
 'UH',
 'VB',
 'VBD',
 'VBG',
 'VBN',
 'VBP',
 'VBZ',
 'WDT',
 'WP',
 'WP$',
 'WRB',
 'XX',
 '``'}

Preprocessing data

UD dataset has arond 50 xpos.For the sake of simplicity I am prepocessing data to have only 6 labels.

1. NOUN - NN
2. ADJECTIVE - JJ
3. VERB - VB
4. INTERJECTION - IN
5. ADVERB - ADV
6. OTHERS - OTHERS

In [7]:
total_sentences = train_dataset.num_rows
train_X = [row['tokens'] for row in train_dataset]
train_Y = [row['xpos'] for row in train_dataset]

def preprocess(train_Y):
  for i in range(0,len(train_Y)):
    for j in range(0, len(train_Y[i])):
      pos = train_Y[i][j]
      if(pos in ['NN', 'NNP', 'NNS']):
        pos = 'NN'
      elif(pos in ['VB', 'VBD', 'VBG']):
        pos = 'VB'
      elif(pos != 'JJ' and pos != 'IN' and pos != 'RB'):
        pos = 'OTHERS'
      train_Y[i][j] = pos
  return train_Y
  
train_Y = preprocess(train_Y)

In [8]:
# Find total pos
from collections import defaultdict

total_pos = set()
total_sentences = train_dataset.num_rows

for i in range(0,total_sentences):
  for pos in train_Y[i]:
    total_pos.add(pos)

total_pos= sorted(total_pos)
print(total_pos)

pos_index = defaultdict()
for i in range(len(total_pos)):
  pos_index [total_pos[i]] = i

print(pos_index)

['IN', 'JJ', 'NN', 'OTHERS', 'RB', 'VB']
defaultdict(None, {'IN': 0, 'JJ': 1, 'NN': 2, 'OTHERS': 3, 'RB': 4, 'VB': 5})


Featuraization

We define some static features to convert the input token as a feature
1. If the given token is a 'NOUN' then the corresponding feature is true else false {0, 1}
2. Similarly for other pos tags - 4 others
3. Transition features : 1 if given token is a NOUN and previous token is VERB.
4. Similarly for other combinations - 24 others


In [9]:
def featurize_word(word, pos, prev_pos):
  feature = [0 for i in range(42)]

  for i in range(0, len(total_pos)):
    if(pos == total_pos[i]):
      feature[i]=1

  # transition features
  for i in range(0,len(total_pos)):
    for j in range(0,len(total_pos)):
      if(total_pos[i] == pos and total_pos[j] == prev_pos):
        feature[len(total_pos)-1+(i+1)*(j+1)] = 1

  return feature

def featurize_sentence(sentence,pos):
  # Sentence is a list of tokens
  sentence_feature = [];
  for i in range(0,len(sentence)):
    if(i == 0):
      sentence_feature.append(featurize_word(sentence[i],pos[i],'OTHER'))
    else:
      sentence_feature.append(featurize_word(sentence[i],pos[i],pos[i-1]))
  return sentence_feature

def featurize_dataset(train_X, train_Y):
  dataset_features = [];
  for i in range(0, len(train_X)):
    dataset_features.append(featurize_sentence(train_X[i], train_Y[i]))
  return dataset_features

def index_labels(train_Y):
  # converting the string labels to numbers / indices
  indexed_labels = []
  for i in range(len(train_Y)):
    indexed_labels.append([pos_index[label] for label in train_Y[i]])
  return indexed_labels

In [10]:
featurized_train_X = featurize_dataset(train_X, train_Y)
indexed_train_Y = index_labels(train_Y)

Baseline model - Most frequent baseline

The baseline model is defined as follows:
1. If the word appeared previously in test dataset , assign the the most frequent label for that word in test dataset
2. For new word assign the overall most frequent label

In [22]:
from collections import defaultdict

# Overall most frequent label
def overall_most_frequent_label(train_Y):
  freq_dict = defaultdict(int)
  for i in range(0, len(train_Y)):
    for j in range(0, len(train_Y[i])):
      freq_dict[train_Y[i][j]]+=1
  frequency_sorted_list = sorted(freq_dict, key= lambda l: (-freq_dict[l], l))
  print(f'labels in the order of frequencies : {frequency_sorted_list}')
  return frequency_sorted_list[0]

overall_most_frequent_label(train_Y)


labels in the order of frequencies : ['OTHERS', 'NN', 'IN', 'VB', 'JJ', 'RB']


'OTHERS'

In [11]:
from collections import Counter

# token wise most frequent label
def tokenwise_most_frequent_label(train_X, train_Y):
  freq_dict = defaultdict(list)
  for i in range(0, len(train_X)):
    for j in range(0, len(train_X[i])):
      freq_dict[train_X[i][j]].append(train_Y[i][j])
  tokenwise_frequent_label = defaultdict(int)
  for key, values in freq_dict.items():
    most_frequent_label = Counter(values).most_common(1)[0][0]
    tokenwise_frequent_label[key] = most_frequent_label
  return tokenwise_frequent_label
 

tokenwise_frequent_labels = tokenwise_most_frequent_label(train_X, train_Y)
print(len(tokenwise_frequent_labels))

20132


In [17]:
class BaseModel():
    def __init__(self, overall_most_frequent_label, tokenwise_most_frequent_label):
      self.find_overall_most_frequent_label = overall_most_frequent_label
      self.find_tokenwise_most_frequent_label = tokenwise_most_frequent_label

    def predict(self, input):
      # input is a sentence -- a list of words/tokens
      output = []
      for word in input:
        if(word in self.tokenwise_most_frequent_label.keys()):
          output.append(self.tokenwise_most_frequent_label[word])
        else:
          output.append(self.overall_most_frequent_label)
      return output

    def train(self, train_X, train_Y):
      self.overall_most_frequent_label = self.find_overall_most_frequent_label(train_Y)
      self.tokenwise_most_frequent_label = self.find_tokenwise_most_frequent_label(train_X, train_Y)

Evaluation metrics

For this I have appended the ground truths for each sentencese into a list. Same for predicted outputs.
Them calculate performance metrics on them.

In [18]:
from sklearn import metrics


def evaluate_metrics(predicted_labels, ground_truth):
  # accuracy
  accuracy = metrics.accuracy_score(ground_truth, predicted_labels)
  print(f'accuracy : {accuracy}')

  # macroprecision
  macro_precision = metrics.precision_score(ground_truth, predicted_labels, average = 'macro')
  print(f'macro precision : {macro_precision}')

  # macrorecall
  macro_recall = metrics.recall_score(ground_truth, predicted_labels, average = 'macro')
  print(f'macro recall : {macro_recall}')

  # microprecision
  micro_precision  = metrics.precision_score(ground_truth, predicted_labels, average = 'micro')
  print(f'micro precision : {micro_precision}')

  # microrecall.
  micro_recall  = metrics.recall_score(ground_truth, predicted_labels, average = 'micro')
  print(f'micro recall : {micro_recall}')

In [23]:
base_model = BaseModel(overall_most_frequent_label, tokenwise_most_frequent_label)
base_model.train(train_X, train_Y)

total_sentences = validation_dataset.num_rows
validation_X = [row['tokens'] for row in validation_dataset]
validation_Y = [row['xpos'] for row in validation_dataset]
validation_Y = preprocess(validation_Y)

prediction_Y = []
for i in range(0, len(validation_X)):
  prediction_Y.append(base_model.predict(validation_X[i]))

labels in the order of frequencies : ['OTHERS', 'NN', 'IN', 'VB', 'JJ', 'RB']


In [24]:
appended_prediction_labels = []
for p in prediction_Y:
  appended_prediction_labels+=p
appended_ground_truth = []
for g in validation_Y:
  appended_ground_truth+=g
evaluate_metrics(appended_prediction_labels, appended_ground_truth)

accuracy : 0.8603893555224115
macro precision : 0.8775669633288364
macro recall : 0.817050865808997
micro precision : 0.8603893555224115
micro recall : 0.8603893555224115


Linear chain CRF model
This is a simple implementation of a linear chain conditional random field model in Python.
Two major components of the model is:
1. Viterbi decoding - Used to optimally find the best tag sequence for given word sequence (features)
2. Forward recurrence - Used to optimally compute log(Z-score)

Both of these are very similar in the sense the use dynamic programming to compute the solution

In [26]:
import torch
import torch.nn as nn
import math
import torch.optim as optim

class LinearChainCRF(nn.Module):
    def __init__(self, num_tags, num_features):
        super().__init__()
        self.num_tags = num_tags
         # Initialize weights
        self.feature_weights = nn.Parameter(torch.randn(num_features))

    def _featurize_word(self, curr_tag, prev_tag, base_feature):
        feature = torch.zeros_like(base_feature)
        feature += base_feature

        # we are featurizing the word considering its tag as curr_tag and previous tags as prev_tag
        feature[curr_tag] = 1
        feature[self.num_tags + curr_tag * prev_tag] = 1
        return feature

    def _forward_alg(self, features):
        # this function is used to compute the log of  z-score / denominator of p(y|x) / partition function for loss estimation
        # using forward recurrance

        num_tags = self.num_tags
        alpha = torch.full((1, num_tags), fill_value=-10000)

        # we didnot use a special bos tag here but instead considered it as OTHERS tag , its score is zero 
        # since it shouldnot influence the overall scores and be the base case / starting point of our sequence
        # this is done by making other values higly negative, so that when we look for argmax or max scores 
        # this will be the result. Tried to use -math.inf but it was having adverse affect on scores

        alpha[0][3] = 0  # 'OTHERS' tag index 3 as start

        for i in range(1, len(features)):
            alpha_m = []
            for curr_tag in range(num_tags):
                # calculating the possible scores for current word considering current tag and all possible previous tags
                s_m = torch.stack([alpha[0][prev_tag] + torch.dot(self.feature_weights, self._featurize_word(curr_tag, prev_tag, features[i])) for prev_tag in range(num_tags)])
                alpha_m.append(torch.logsumexp(s_m, dim=0).view(1))

            # alpha_m to be used for next word
            # its an optimized way - 
            # we are not storing all the alphas for each word with each tag but only alphas corresponding to previous word for all tags
            # because thats essentaially the sum of all possible path scores ending at that word tag pair
            # and the only thing needed as we move forward to next word.
            alpha = torch.cat(alpha_m).view(1, -1)

        return torch.logsumexp(alpha, dim=1)

    def _score_sequence(self, features, tags):
        # this function is used to compute the log of numerator of p(y|x) for estimating loss

        score = torch.zeros(1)
        for feature in features:
            score += torch.dot(self.feature_weights, feature)
        return score

    def neg_log_likelihood_loss(self, features, tags):
        # loss function for our model

        denominator = self._forward_alg(features)
        numerator = self._score_sequence(features, tags)
        return -(numerator - denominator)

    def _viterbi(self, features):
        # this function estimates the best tag sequence for a sequence
        # very similar to forward recurrence but here we store all the best previous tags (in back pointers) for a word tag pair to backtrack and find best tag sequence (through back tracking)
        num_tags = self.num_tags

        viterbi_vars = torch.full((1, num_tags), -10000)
        # This is v1 as others are highly negative
        viterbi_vars[0][3] = 0  # 'OTHERS' tag index 3 as start
        # used to backtrack the best sequence
        backpointers = []

        for i in range(len(features)):
            bptrs_t = []
            viterbi_vars_t = []
            for curr_tag in range(num_tags):
                
                # calculating the possible scores till the current word considering current tag and all possible previous tags
                s_m = torch.stack([viterbi_vars[0][prev_tag] + torch.dot(self.feature_weights, self._featurize_word(curr_tag, prev_tag, features[i])) for prev_tag in range(num_tags)])

                # maximum score and the best previous tag for the current tag
                # bm, vm according to algorithm 11 - eisensteins notes
                best_tag_id = torch.argmax(s_m).item()
                best_tag_score = torch.max(s_m)

                # for backtracking
                bptrs_t.append(best_tag_id)
                viterbi_vars_t.append(best_tag_score.view(1))

            # viterbi variables for the current word
            viterbi_vars = torch.cat(viterbi_vars_t).view(1, -1)
            backpointers.append(bptrs_t)

        # Trace back the best path
        best_path = [torch.argmax(viterbi_vars).item()]
        for bptrs_t in reversed(backpointers):
            best_path.append(bptrs_t[best_path[-1]])

        best_path.reverse()
        return best_path[1:]  # Skip the starting 'OTHERS' tag

    def decode(self, features):
        return self._viterbi(features)

In [33]:
def train(model, features, labels, epochs=10, learning_rate=0.01):
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        for i, (feature, label) in enumerate(zip(features, labels)):
            model.zero_grad()
            feature = torch.tensor(feature, dtype=torch.float32)
            label = torch.tensor(label)
            loss = model.neg_log_likelihood_loss(feature, label)
            loss.backward()
            optimizer.step()

        print(f'Epoch {epoch+1}: Loss = {loss.item()}')

In [27]:
def train(model, features, labels, epochs=10, learning_rate=0.01):
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        total_loss = 0
        for i, (feature, label) in enumerate(zip(features, labels)):
            # Reset gradients for each batch - in this case for each sentence
            model.zero_grad()  
            feature_tensor = torch.tensor(feature, dtype=torch.float32)
            label_tensor = torch.tensor(label, dtype=torch.long)
            loss = model.neg_log_likelihood_loss(feature_tensor, label_tensor)
            total_loss += loss.item()
            loss.backward()
            # Update model parameters  
            optimizer.step()  

        average_loss = total_loss / len(labels)
        print(f"Epoch {epoch+1}: Average loss = {average_loss}")

In [28]:
# Train

num_tags = 6
num_features = 42
model = LinearChainCRF(num_tags, num_features)

train(model,featurized_train_X[:100], indexed_train_Y[:100], epochs = 10)

Epoch 1: Average loss = 19.246506974697112
Epoch 2: Average loss = -32.66696103334427
Epoch 3: Average loss = -65.9540945839882
Epoch 4: Average loss = -95.06311148166657
Epoch 5: Average loss = -122.72813519001006
Epoch 6: Average loss = -149.76245324850083
Epoch 7: Average loss = -176.47458614349364
Epoch 8: Average loss = -202.99815746068955
Epoch 9: Average loss = -229.39877597332
Epoch 10: Average loss = -255.71189512014388


In [29]:
# evaluation on validation split

total_sentences = validation_dataset.num_rows
validation_X = [row['tokens'] for row in validation_dataset]
validation_Y = [row['xpos'] for row in validation_dataset]
validation_Y = preprocess(validation_Y)
featurized_validation_X = featurize_dataset(validation_X, validation_Y)
indexed_validation_Y = index_labels(validation_Y)

appended_prediction_labels = []
for p in featurized_validation_X:
  appended_prediction_labels+=model.decode(torch.Tensor(p))
appended_ground_truth = []
for g in indexed_validation_Y:
  appended_ground_truth+=g
evaluate_metrics(appended_prediction_labels, appended_ground_truth)

accuracy : 0.6780359525865453
macro precision : 0.6749695130037922
macro recall : 0.7298550060360395
micro precision : 0.6780359525865453
micro recall : 0.6780359525865453


In [30]:
# evaluation on test split

# evaluation on validation split

total_sentences = test_dataset.num_rows
test_X = [row['tokens'] for row in test_dataset]
test_Y = [row['xpos'] for row in test_dataset]
test_Y = preprocess(test_Y)
featurized_test_X = featurize_dataset(test_X, test_Y)
indexed_test_Y = index_labels(test_Y)

appended_prediction_labels = []
for p in featurized_test_X:
  appended_prediction_labels+=model.decode(torch.Tensor(p))
appended_ground_truth = []
for g in indexed_test_Y:
  appended_ground_truth+=g
evaluate_metrics(appended_prediction_labels, appended_ground_truth)

accuracy : 0.6767863462975343
macro precision : 0.6709451551784911
macro recall : 0.728455310109875
micro precision : 0.6767863462975343
micro recall : 0.6767863462975343


In [33]:
def pos_tagger(model, sentence, tags):
  featurized_sentence = featurize_sentence(sentence, tags)
  predicted_labels = model.decode(torch.Tensor(featurized_sentence))
  predicted_tags = [total_pos[i] for i in predicted_labels]
  return predicted_tags


In [34]:
# find tag for a sentence
import random

i = random.randint(0, len(validation_X)-1)
tags = pos_tagger(model, validation_X[i], validation_Y[i])

for j in range(len(tags)):
  print(f'word: {validation_X[i][j]} original: {validation_Y[i][j]}, predicted: {tags[j]}')

word: Blooming original: VB, predicted: VB
word: onion original: NN, predicted: NN
word: , original: OTHERS, predicted: OTHERS
word: the original: OTHERS, predicted: VB
word: only original: JJ, predicted: JJ
word: reason original: NN, predicted: NN
word: to original: OTHERS, predicted: RB
word: visit original: VB, predicted: VB
word: this original: OTHERS, predicted: OTHERS
word: restaurant original: NN, predicted: OTHERS
word: . original: OTHERS, predicted: OTHERS
