#  POS-tagging 

In [1]:
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns

In [2]:
import nltk
nltk.download('brown')
nltk.download('universal_tagset')

[nltk_data] Downloading package brown to /Users/chess1812/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     /Users/chess1812/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


True

In [3]:
from nltk.corpus import brown
sents = list(brown.tagged_sents(tagset='universal'))

##  Preprocessing

First of all let's  see what we actually have. 

In [4]:
sents[0]

[('The', 'DET'),
 ('Fulton', 'NOUN'),
 ('County', 'NOUN'),
 ('Grand', 'ADJ'),
 ('Jury', 'NOUN'),
 ('said', 'VERB'),
 ('Friday', 'NOUN'),
 ('an', 'DET'),
 ('investigation', 'NOUN'),
 ('of', 'ADP'),
 ("Atlanta's", 'NOUN'),
 ('recent', 'ADJ'),
 ('primary', 'NOUN'),
 ('election', 'NOUN'),
 ('produced', 'VERB'),
 ('``', '.'),
 ('no', 'DET'),
 ('evidence', 'NOUN'),
 ("''", '.'),
 ('that', 'ADP'),
 ('any', 'DET'),
 ('irregularities', 'NOUN'),
 ('took', 'VERB'),
 ('place', 'NOUN'),
 ('.', '.')]

In [5]:
tags = set()

for sent in sents:
    for (word, tag) in sent: 
        tags.add(tag)

print(f'tag types: {tags}')

tag types: {'ADV', 'ADJ', 'CONJ', 'NOUN', '.', 'X', 'PRON', 'ADP', 'DET', 'PRT', 'NUM', 'VERB'}


Split data on train and test

In [6]:
from sklearn.model_selection import train_test_split
sents_train, sents_test = train_test_split(sents, test_size=0.2, random_state=42)

For some algorithms it may be better to transform tag type to numeric label 

In [7]:
name_to_index = {}
index_to_name = {}
for idx, tag in enumerate(tags):
    name_to_index[tag] = idx
    index_to_name[idx] = tag
    
name_to_index

{'ADV': 0,
 'ADJ': 1,
 'CONJ': 2,
 'NOUN': 3,
 '.': 4,
 'X': 5,
 'PRON': 6,
 'ADP': 7,
 'DET': 8,
 'PRT': 9,
 'NUM': 10,
 'VERB': 11}

In [8]:
y_train = []
tags_train = []
for sent in sents_train:
    for (_, tag) in sent:
        y_train.append(name_to_index[tag])
        tags_train.append(tag)
        
y_train =  np.array(y_train)

In [9]:
y_test = []
tags_test = []
for sent in sents_test:
    for (_, tag) in sent:
        y_test.append(name_to_index[tag])
        tags_test.append(tag)
        
y_test =  np.array(y_test)

In [10]:
from itertools import chain
X_train = [word for word, tag in list(chain(*sents_train))]
X_test = [word  for word, tag in  list(chain(*sents_test)) ]

Some stats about corpora:

In [11]:
unique, counts = np.unique(y_train, return_counts=True)

print('tag frequency: ')
for unq, cnt in zip(unique, counts ):
    print(f'{index_to_name[unq]}: { 100 * cnt / len(y_train):.2f}%')

tag frequency: 
ADV: 4.84%
ADJ: 7.20%
CONJ: 3.30%
NOUN: 23.75%
.: 12.70%
X: 0.11%
PRON: 4.24%
ADP: 12.48%
DET: 11.79%
PRT: 2.58%
NUM: 1.29%
VERB: 15.71%


In [12]:
vocab = {}
for sent in sents_train:
    for (word, tag) in sent:
        if word in vocab:
            vocab[word].add(tag)
        else:
            vocab[word] = {tag}
            
print(f'vocab size = {len(vocab)}')
print(f'number of words = {len(y_train)}')

vocab size = 50595
number of words = 929265


As we shall see, the main problem in this task is to classificate some tricky words which may have different meaning. So, we have to determine its meaning from the context. Let's look at these words a little bit

In [13]:
tricky_words = set()
for word in vocab:
    if len(vocab[word]) > 1:
        tricky_words.add(word)

print(f'percentage of tricky words in vocab {100 * len(tricky_words) / len(vocab):.4f}%')

amount_of_tricky_words  = len([word for word in X_train if word in  tricky_words])
print(f'percentage of tricky words in train corpora {100 * amount_of_tricky_words / len(X_train):.4f}%')

percentage of tricky words in vocab 6.2832%
percentage of tricky words in train corpora 43.8904%


In [14]:
print('examples of tricky words:')
examples_number = 5
for word,_ in zip(tricky_words, range(examples_number)):
    print(f'{word}: {vocab[word]}')

examples of tricky words:
pay: {'NOUN', 'VERB'}
getting: {'NOUN', 'VERB'}
process: {'NOUN', 'VERB'}
cooler: {'ADJ', 'NOUN'}
bronze: {'ADJ', 'NOUN'}


## Baseline

First idea that comes to mind is just remember is to use  **Most Frequent Class** model (assigning each token to the class
it occurred in most often in the training set). For sure, in this way we will have a problem with words with different meaning but this at least model is a good baseline.

In [15]:
from sklearn.base import BaseEstimator, ClassifierMixin

class MostFrequentClassifier(BaseEstimator, ClassifierMixin):
    
    def __init__(self, n_classes, unk_class = None):
        self.vocab = {}
        self.n_classes = n_classes
        self.unk_class = unk_class
        self.aprior_prob = np.zeros(self.n_classes)
        
        
    def fit(self, X, y):       
        if self.unk_class is None:
            unique, counts = np.unique(y, return_counts=True)
            for unq, count in zip(unique, counts):
                self.aprior_prob[unq] = count / sum(counts)
            self.unk_class = self.aprior_prob.argmax()
        else:
            self.aprior_prob[self.unk_class] = 1.0
               
        for word,tag in zip(X,y):
            if word in self.vocab:
                self.vocab[word][tag]+=1
            else:
                self.vocab[word] = [0] * self.n_classes
                self.vocab[word][tag] = 1
                
        for word in self.vocab:
            self.vocab[word] = np.array(self.vocab[word]) / sum(self.vocab[word])
    
        return self
    
    
    def predict(self, X):
        y_pred = [0]*len(X)
        for idx,word in enumerate(X):
            if word in self.vocab:
                y_pred[idx] = self.vocab[word].argmax()
            else:
                y_pred[idx]= self.unk_class
        return y_pred

    
    def predict_proba(self, X):
        y_pred = [0]*len(X)
        for idx,word in enumerate(X):
            if word in self.vocab:
                y_pred[idx] = self.vocab[word]
            else:
                y_pred[idx]= self.aprior_prob
        return y_pred

In [78]:
baseline_clf = MostFrequentClassifier(n_classes = len(set(y_train)))
baseline_clf.fit(X_train, y_train)

MostFrequentClassifier(n_classes=12)

In [79]:
from sklearn.metrics import accuracy_score
y_pred = baseline_clf.predict(X_train)
print(f'baseline score for train {100 * accuracy_score(y_train,y_pred):.3f}%')

y_pred = baseline_clf.predict(X_test)
print(f'baseline score for test {100 * accuracy_score(y_test,y_pred):.3f}%')

baseline score for train 95.722%
baseline score for test 94.512%


Hm, it's not so bad, aready. But we will try to improve this score.

Firstly, let's try to change  unk_class:

In [72]:
baseline_unk_clf = MostFrequentClassifier(n_classes = len(set(y_train)), unk_class = name_to_index['X'])
baseline_unk_clf.fit(X_train, y_train)

MostFrequentClassifier(n_classes=12, unk_class=5)

In [73]:
y_pred = baseline_unk_clf.predict(X_train)
print(f'baseline with default X as unk score for train {100 * accuracy_score(y_train,y_pred):.3f}')

y_pred = baseline_unk_clf.predict(X_test)
print(f'baseline score default X as unk  for test {100 * accuracy_score(y_test,y_pred):.3f}')

baseline with default X as unk score for train 95.722
baseline score default X as unk  for test 93.085


As mentioned earlier.  the problem is that we do not use context and simply defined tag as most frequent class that we've seen. 

For instance word 'calls' has probabilities:

In [19]:
baseline_clf.vocab['calls']

array([0.        , 0.        , 0.        , 0.33333333, 0.        ,
       0.01754386, 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.64912281])

But our model just says that I am not care about context, 'calls' is a verb in any case.

In [20]:
index_to_name[baseline_clf.predict(['calls'])[0]]

'VERB'

## kNN ?

One of my thoughts was to use kNN when we face words with different meanings. We will fit kNN with words in which we are confident. And than if we face tricky word we will use our kNN model together with MFC classifier. For sure for kNN model we will use embeddings.

In [23]:
import gensim.downloader 
embeddings = gensim.downloader.load("glove-wiki-gigaword-100")   #"fasttext-wiki-news-subwords-300"



In [35]:
from sklearn.neighbors import KNeighborsClassifier

class MostFrequenWithkNNClassifier(BaseEstimator, ClassifierMixin):
    
    def __init__(self, n_classes, embeddings, alpha = 0.999,  beta = 0.5,  n_neighbors = 1, unk_class = None):
        self.embeddings = embeddings
        self.alpha = alpha
        self.beta = beta 
        self.neigh = KNeighborsClassifier(n_neighbors= n_neighbors )
        self.vocab = {}
        self.n_classes =  n_classes
        self.aprior_prob = np.zeros(self.n_classes)
        self.data_for_knn = []
        self.labels_for_knn = []
        self.unk_class = unk_class
            
            
    def fit(self, X, y):
        if self.unk_class is None:
            unique, counts = np.unique(y, return_counts=True)
            for unq, count in zip(unique, counts):
                self.aprior_prob[unq] = count / sum(counts)
            self.unk_class = self.aprior_prob.argmax()
        else:
            self.aprior_prob[self.unk_class] = 1.0
            
        for word,tag in zip(X,y):
            if word in self.vocab:
                self.vocab[word][tag]+=1
            else:
                self.vocab[word] = [0] * self.n_classes
                self.vocab[word][tag] = 1
                              
        for word, label in zip(X,y):
            self.vocab[word] = np.array(self.vocab[word]) / sum(self.vocab[word])
            if np.max(self.vocab[word]  > self.alpha) and word in self.embeddings:
                self.data_for_knn.append(self.embeddings[word])
                self.labels_for_knn.append(label)
                
            self.neigh.fit(self.data_for_knn, self.labels_for_knn)
        return self
    
    
    def predict(self, X):
        y_pred = [0]*len(X)
        self.unk_class = self.aprior_prob.argmax()
        for idx,word in enumerate(X):
            if word in self.vocab:
                if np.max(self.vocab[word]  < self.alpha) and word in self.embeddings:
                    knn_pred = neigh.predict_proba([self.embeddings[word]])[0]
                    y_pred[idx] = (self.beta * self.vocab[word] + (1 - beta) * knn_pred).argmax()
                else:
                    y_pred[idx] = self.vocab[word].argmax()
            else:
                if word in self.embeddings:
                    knn_pred = neigh.predict_proba([self.embeddings[word]])[0] 
                    y_pred[idx]= (self.beta * self.aprior_prob + (1 - beta) * knn_pred).argmax()
                else:
                     y_pred[idx]= self.unk_class                    
        return y_pred

    
    def predict_proba(self, X):
        y_pred = [0]*len(X)
        for idx,word in enumerate(X):
            if word in self.vocab:
                if np.max(self.vocab[word]  < self.alpha) and word in self.embeddings:
                    knn_pred = neigh.predict_proba([self.embeddings[word]])[0]
                    y_pred[idx] = (self.beta * self.vocab[word] + (1 - beta) * knn_pred)
                else:
                    y_pred[idx] = self.vocab[word]
            else:
                if word in self.embeddings:
                    knn_pred = neigh.predict_proba([self.embeddings[word]])[0] 
                    y_pred[idx]= (self.beta * self.aprior_prob + (1 - beta) * knn_pred)
                else:
                     y_pred[idx]= self.aprior_prob                  
        return y_pred

In [36]:
freq_knn_clf =  MostFrequenWithkNNClassifier( len(set(y_train)),embeddings, alpha = 0.97, n_neighbors = 3)

But unfortunately, Besides our idea do not use context too, it also works very slow due to the number of words, that we have. 

In [37]:
freq_knn_clf.fit(X_train, y_train)

KeyboardInterrupt: 

After some waiting hours I decided to stop that.

In [None]:
#y_pred = freq_knn_clf(X_train)
#print(f'baseline with default X score for train {accuracy_score(y_train,y_pred)}')

#y_pred = freq_knn_clf(X_test)
#print(f'baseline score default X for test {accuracy_score(y_test,y_pred)}')

## Random Forest & LogisticRegression for context

Second idea that comes to my mind is to build some classificator which having information about context will predict word tag. For simplicity we will build context features from the results of MFC algorithm. And than using these features we will fit some classifier such as random forest or logistics regression.

Feature extractor:

In [38]:
def neigh_info(sents, clf, n_classes):
    feature_vectors = []
    for sent in sents:
        words = [word for word, _ in sent]
        predictions = clf.predict_proba(words)
        if len(sent) == 1:
            feature_vectors.append(np.hstack( (np.zeros(n_classes),np.zeros(n_classes))))
            continue
        feature_vectors.append( np.hstack( (np.zeros(n_classes), predictions[0])) )
        for i in range(1, len(predictions) - 1):
            feature_vectors.append(np.hstack( (predictions[i-1], predictions[i+1])) )
        feature_vectors.append( np.hstack( (predictions[-1], np.zeros(n_classes))) )
    return np.array(feature_vectors)

In [39]:
train_features = neigh_info(sents_train, baseline_clf, n_classes = len(set(y_train)))
test_features = neigh_info(sents_test, baseline_clf, n_classes = len(set(y_train)))

In [40]:
train_features[35]

array([0.   , 0.   , 0.   , 1.   , 0.   , 0.   , 0.   , 0.   , 0.   ,
       0.   , 0.   , 0.   , 0.   , 0.   , 0.   , 0.992, 0.   , 0.   ,
       0.   , 0.   , 0.   , 0.   , 0.   , 0.008])

Okay, now it's time to fit some classificators. Let's start with LogisticRegression

In [74]:
from sklearn.linear_model import LogisticRegression

lr_clf =  LogisticRegression(random_state=42, max_iter=1000)
lr_clf.fit(train_features, y_train)

LogisticRegression(max_iter=1000, random_state=42)

In [75]:
y_pred = lr_clf.predict(train_features)
print(f'logistic regression train score : {100 * accuracy_score(y_train,y_pred):.3f}%')

y_pred = lr_clf.predict(test_features)
print(f'logistic regression test score {100 * accuracy_score(y_test,y_pred):.3f}%')

logistic regression train score : 52.043%
logistic regression test score 51.282%


What about random forest?

In [76]:
from sklearn.ensemble import RandomForestClassifier

rf_clf = RandomForestClassifier(random_state=42)
rf_clf.fit(train_features, y_train)

RandomForestClassifier(random_state=42)

In [77]:
y_pred = rf_clf.predict(train_features)
print(f'random forest train score: {100 * accuracy_score(y_train,y_pred):.3f}%')

y_pred = rf_clf.predict(test_features)
print(f'random forest test score:  {100 * accuracy_score(y_test,y_pred):.3f}%')

random forest train score: 64.360%
random forest test score:  59.630%


Okay, it's better than LogisticRegression. Also we may tune some paramentrs. For instance **max_features**.

In [51]:
from sklearn.model_selection import GridSearchCV
parameters = {'max_features' :['auto', 'sqrt', 'log2'] }
rf = RandomForestClassifier(random_state=42, n_estimators = 200)
best_rf = GridSearchCV(rf, parameters)
best_rf.fit(train_features, y_train)

GridSearchCV(estimator=RandomForestClassifier(n_estimators=200,
                                              random_state=42),
             param_grid={'max_features': ['auto', 'sqrt', 'log2']})

In [53]:
 print(best_rf.best_params_)

{'max_features': 'auto'}


For sure we may tune other paraments too. But let's do it in anouther time.

You may think that we waste time because results are poor. But wait, we predict part of speach using only information about context but what will be if we combite two ideas together...

In [54]:
from sklearn.base import BaseEstimator, ClassifierMixin

class BlendingClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, clf1, clf2, alpha = 0.5):
        self.clf1 = clf1
        self.clf2 = clf2
        self.alpha = alpha
        
    
    def fit(self, X1,X2, y):  
        self.clf1.fit(X1,y)
        self.clf2.fit(X2,y)
        return self
    
    
    def predict(self, X1,X2):
        return np.argmax(self.alpha * np.array(self.clf1.predict_proba(X1)) +
                (1 - self.alpha)* np.array(self.clf2.predict_proba(X2)), axis = 1)

    
    def predict_proba(self, X1,X2):
        return (self.alpha * np.array(self.clf1.predict_proba(X1)) +
                (1 - self.alpha)* np.array(self.clf2.predict_proba(X2)))

In [55]:
clf1 = MostFrequentClassifier(n_classes = len(set(y_train)))
clf2 = RandomForestClassifier(random_state=42, n_estimators = 200)

mix_clf = BlendingClassifier( clf1,clf2  )
mix_clf.fit(X_train, train_features, y_train)

BlendingClassifier(clf1=MostFrequentClassifier(n_classes=12),
                   clf2=RandomForestClassifier(n_estimators=200,
                                               random_state=42))

In [56]:
y_pred = mix_clf.predict(X_train,train_features)
print(f'BlendingClassifier train score: {100 * accuracy_score(y_train,y_pred):.3f}%')

y_pred = mix_clf.predict(X_test, test_features)
print(f'BlendingClassifier test score:  {100 * accuracy_score(y_test,y_pred):.3f}%')

BlendingClassifier train score: 97.357%
BlendingClassifier test score:  95.389%


We improve our results approximately on $1\%$ !

**Conclusions:** 

1) If we want to improve baseline results we should use context. And even such simple idea as we present may increase score. 

2) Also we may improve our BlendingClassifier by tuning parametr alpha. 

## Hidden Markov Model

Finally we will use HMM model. 

In [57]:
from collections import Counter, defaultdict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
import random

First of all we need to define transition probabilities, between tags. 

In [58]:
tags_count = Counter(tags_train)

In [59]:
tags_count

Counter({'NOUN': 220736,
         '.': 118045,
         'DET': 109545,
         'VERB': 146006,
         'ADP': 115998,
         'CONJ': 30622,
         'ADJ': 66949,
         'PRON': 39400,
         'ADV': 44968,
         'PRT': 23976,
         'NUM': 11963,
         'X': 1057})

In [60]:
tag_bigrams =  Counter( [(tags_train[i], tags_train[i+1]) for i in range(0,len(tags_train)-2,2)])

In [61]:
tag_starts = Counter([sent[0][1] for sent in sents_train] ) #number of times a tag occured at the start
tag_ends = Counter([sent[-1][1]  for sent in sents_train] ) #number of times a tag occured at the end

In [62]:
tag_ends

Counter({'.': 44923,
         'NOUN': 727,
         'ADV': 18,
         'DET': 14,
         'VERB': 76,
         'ADJ': 25,
         'PRON': 4,
         'NUM': 65,
         'ADP': 8,
         'CONJ': 2,
         'PRT': 9,
         'X': 1})

Also we need to define all words which occur from each tag. 

In [63]:
def pair_counts(tags, words):
    dct = defaultdict(lambda: defaultdict(int))
    for tag, word in zip(tags, words):
        dct[tag][word] += 1
    return dct

In [64]:
tag_words_count = pair_counts(tags_train,X_train)

Now it's time to put all in hmm model.

In [65]:
basic_model = HiddenMarkovModel(name="hmm for POS-tagging")
     
to_pass_states = []
for tag, words_dict in tag_words_count.items():
    total = float(sum(words_dict.values()))
    distribution = {word: count/total for word, count in words_dict.items()}
    tag_emissions = DiscreteDistribution(distribution)
    tag_state = State(tag_emissions, name=tag)
    to_pass_states.append(tag_state)
    
basic_model.add_states(to_pass_states) 

In [66]:
start_prob={}
for tag in tags:
    start_prob[tag]=tag_starts[tag]/ float(sum(tag_starts.values()))
for tag_state in to_pass_states:
    basic_model.add_transition(basic_model.start, tag_state, start_prob[tag_state.name])
    
end_prob={}
for tag in tags:
    end_prob[tag] = tag_ends[tag] /float(sum( tag_ends.values())) 
for tag_state in to_pass_states:
    basic_model.add_transition(tag_state,basic_model.end, end_prob[tag_state.name])
    
transition_prob_pair={}
for key in tag_bigrams.keys():
    transition_prob_pair[key]=tag_bigrams.get(key) / tags_count[key[0]]
    
for tag_state in to_pass_states :
    for next_tag_state in to_pass_states:
        if (tag_state.name,next_tag_state.name) not in transition_prob_pair: # prob of this  transition = 0
            transition_prob_pair[(tag_state.name,next_tag_state.name)] = 0.0
        basic_model.add_transition(tag_state,next_tag_state,transition_prob_pair[(tag_state.name,next_tag_state.name)])
basic_model.bake()

Also we need to define what we should do with unknown words, and how do decode words into part of speach. 

In [67]:
def replace_unknown(sequence, vocab):   
    return [w if w in vocab else 'nan' for w in sequence]

def simplify_decoding(sequence, model):    
    _, state_path = model.viterbi(replace_unknown(sequence, vocab))
    return [state[1].name for state in state_path[1:-1]]

In [68]:
def hmm_predict(X, model):
    y_pred = []
    for observations in X:
        most_likely_tags = simplify_decoding(observations, model)
        y_pred.append(most_likely_tags)
    return list(chain(*y_pred))

Okay, let's look what happens from that.

In [69]:
only_words = lambda sent: [word for (word,tag) in sent]
observations_train = [only_words(sent) for sent in sents_train ]
observations_test = [only_words(sent) for sent in sents_test]

In [70]:
hmm_training_acc = accuracy_score(tags_train, hmm_predict(observations_train, basic_model) )
print("training accuracy basic hmm model: {:.3f}%".format(100 * hmm_training_acc))
hmm_testing_acc = accuracy_score(tags_test, hmm_predict(observations_test, basic_model) )
print("testing accuracy basic hmm model: {:.3f}%".format(100 * hmm_testing_acc))

training accuracy basic hmm model: 97.542%
testing accuracy basic hmm model: 96.021%


Finally hmm model archives best result.

**Conclusions:** 

 In this notebook we try to solve pos-tagging problem. We have seen that the main difficulty is to classify polysemous words. Thus, we try to fit some models that will look at the context in order to solve this challenge.