In [None]:
# List the working directory (presumably to check that the dataset file is present).
!ls

In [4]:
from tqdm.auto import tqdm
from collections import defaultdict, OrderedDict
import numpy as np
from collections import Counter
import random, itertools
from sklearn.model_selection import KFold
import json
from multiprocessing.pool import Pool

import warnings
warnings.filterwarnings("ignore") 

### Dataset class

In [None]:
class txt_data():
    '''
    Loads a corpus of `word_TAG` tokens (one sentence per line) and splits it into train/val.

    Attributes set by __init__:
        path: path of the corpus file
        data: list of sentences; each sentence is a list of (pos_tag, lowercased word) tuples
        train / val: the leading `split` fraction of `data` and the remainder
        pos_tags: sorted list of the unique pos_tags found in `data`
        vocab: sorted list of the unique (lowercased) words found in `data`
    '''
    def __init__(self,path,split=0.9,shuffle=False):
        '''
        path: path to dataset
        split: train/val split (fraction of sentences that goes to train)
        shuffle: shuffle the dataset randomly before splitting or no.
        '''
        self.path = path

        # list of sentences of list of tuples defining pos_tag and words.
        self.data = self.___sent_to_tags___(self.___open___(self.path))
        if shuffle: random.shuffle(self.data)
        cut = int(len(self.data)*split)
        self.train = self.data[:cut]
        self.val = self.data[cut:]

        # Sorted so that row/column order of every matrix built from these lists is
        # reproducible across kernel restarts (plain set iteration order is not).
        self.pos_tags = sorted({pair[0] for sent in self.data for pair in sent}) # list of unique pos_tags
        self.vocab = sorted({pair[1] for sent in self.data for pair in sent}) # list of vocab

    def ___open___(self,path=None):
        '''
        To read a text file
        path: path to the text file; defaults to self.path when omitted

        return: list of lines (line endings included)
        '''
        if path is None:
            path = self.path
        with open(path, "r") as f:
            return f.readlines()

    def ___sent_to_tags___(self,sents):
        '''
        converts raw sentences to tags and words as tuples

        sents: iterable of strings, each a whitespace separated sequence of `word_TAG` tokens

        return: list (one entry per input string) of lists of (tag, lowercased word) tuples.
        Tokens without an underscore carry no tag and are skipped.
        '''
        tagged = []
        for line in tqdm(sents):
            pairs = []
            for token in line.split():
                parts = token.split('_')
                if len(parts) < 2:
                    continue # untagged token: nothing to learn from it
                pairs.append((parts[1], parts[0].lower()))
            tagged.append(pairs)
        return tagged

### Converting emission/transition count matrices to probabilities

In [None]:
def prob(matrix): 
    '''
    Converting an emission/transition matrix to probabilities, row by row.
    matrix: list of lists format

    Each row is divided by its own sum; a row whose sum is 0 is kept unchanged.

    return: prob (2D numpy array)
    '''
    normalised_rows = []
    for row in np.array(matrix):
        total = np.sum(row)
        normalised_rows.append(row if total == 0 else row/total)
    return np.array(normalised_rows)

### Smoothing

In [None]:
def smoothing(matrix, name='simple'):
    '''
    Smoothing operation to handle zero values
    matrix: 2D array-like of counts (copied into a float64 numpy array)
    name: name of smoothing technique

    simple ('simple'): every zero is replaced by 0.000000001*matrix.mean()
    simple good turing ('simple-gt'): per row, 0.7 is subtracted from every non-zero
        entry and every zero is set to (entries equal to 1)/(row length); zeros that
        are still left afterwards are replaced by 0.0000000001*matrix.mean()

    return: smoothed matrix (numpy array)
    raises: NotImplementedError for any other name
    '''
    matrix = np.array(matrix, dtype= np.float64)

    if name not in ('simple', 'simple-gt'):
        raise NotImplementedError

    if name == 'simple':
        print(f"Using simple smoothing method")
        zero_cells = matrix == 0
        matrix[zero_cells] = 0.000000001*matrix.mean()
        return matrix

    print(f"Using simple good turing smoothing method")
    for row_idx in range(matrix.shape[0]):
        row = matrix[row_idx,:] # a view: writes below land in `matrix`
        singleton_share = np.count_nonzero(row == 1)/len(row) # measured before discounting
        seen = row != 0
        row[seen] = row[seen] - 0.7
        row[row == 0] = singleton_share

    # rows without any singleton still contain zeros at this point
    matrix[matrix == 0] = 0.0000000001*matrix.mean()
    return matrix

### emission matrix

In [None]:
def e_matrix(data,vocab,pos_tags,smooth='simple-gt'):
    '''
    emission matrix: count of words given the pos_tag, smoothed and row-normalised

    data: list of sentences, each a list of (pos_tag, word) tuples
    vocab: list of vocab (defines the column order)
    pos_tags: list of pos_tags (defines the row order)
    smooth: name of the smoothing technique passed on to `smoothing`

    return: pos_tags (unchanged) and a (len(pos_tags), len(vocab)) numpy array
    '''
    # group the words under their tag: {'AT': ['the', 'the', 'a'], ...}
    words_by_tag = {}
    for sentence in data:
        for pair in sentence:
            words_by_tag.setdefault(pair[0], []).append(pair[1])

    count_rows = [] # one row of word counts per tag
    for tag in tqdm(pos_tags):
        if tag not in words_by_tag:
            count_rows.append([0 for _ in vocab]) # tag never seen in `data`
            continue
        freq = Counter(words_by_tag[tag])
        count_rows.append([freq[word] for word in vocab]) # the vocab loop dominates the run time

    return pos_tags, prob(smoothing(count_rows,name=smooth))

### transition matrix

In [None]:
def t_matrix(data, pos_tags, n_gram=2, smooth='simple-gt'):
    '''
    transition matrix: count of current pos_tag given the previous (n_gram - 1) pos_tags,
    smoothed and row-normalised, plus the start distribution.

    data = list of sentences, each a list of (pos_tag, word) tuples
    pos_tags = list of pos_tags (defines the column order)
    n_gram = look back + 1, i.e. length of the tag windows that are counted
    smooth: name of the smoothing technique passed on to `smoothing`

    return:
        keys: space-joined history strings, one per row of the transition matrix
              (cartesian product of pos_tags repeated n_gram-1 times)
        transition probabilities: numpy array of shape (len(keys), len(pos_tags))
        start probabilities: 1D numpy array over pos_tags

    The start distribution is estimated from the FIRST tag of every sentence. It used to be
    estimated from every tag in the corpus, which is the unigram tag frequency rather than
    the probability of a tag opening a sentence (how `decoding` applies it to the first word).
    '''

    data = [[j[0]  for j in i] for i in data] # list of tags for each sentence: [['AT', 'NN', CS', '.'],[],] 
    #list of n_grams of tags, for 2 grams [['AT', 'NN'], ['NN', 'MD'], ['MD', 'HV'],[],]
    tags = [i[j:j+n_gram] for i in data for j in range(len(i) - n_gram + 1)]
    print(f" Number of {n_gram} grams present in the dataset: {len(tags)/1000000} million")
    # every possible history of n_gram-1 tags, joined with a space
    keys = [' '.join(i) for i in itertools.product(pos_tags, repeat=n_gram-1)]
    keys.append("<s>") # start symbol <s>, always the last row

    # history -> Counter of the tag that follows it (counting directly avoids storing every occurrence)
    tags_ = {i: Counter() for i in keys}
    for i in tags:
        tags_[' '.join(i[:-1])][i[-1]] += 1
    # sentence-initial tags only; empty sentences contribute nothing
    tags_['<s>'] = Counter(i[0] for i in data if i)

    t_matrix = [] # transition count matrix: one row per key, one column per pos_tag
    for i in tqdm(keys):
        count = tags_[i]
        t_matrix.append([count[j] for j in pos_tags])

    return keys[:-1], prob(smoothing(t_matrix[:-1],name=smooth)), prob(smoothing([t_matrix[-1]],name=smooth))[0]

# Viterbi algorithm

In [None]:
def decoding(obs): 
    '''
    Viterbi decoding: "https://en.wikipedia.org/wiki/Viterbi_algorithm"

    obs: list of words in the sentence

    Reads the model from module-level globals:
        pos_tags: list of states
        s_prob[state]: start probability
        e_prob[state][word]: emission probability
        t_prob[history][state]: transition probability; history is a single tag for
            n_gram == 2 and "tag_before_last last_tag" for n_gram > 2
        t_2[last_tag][state]: bigram transitions used for the first steps when n_gram > 2
        n_gram: look back + 1

    return: most probable tag sequence (list with one tag per word); [] for an empty sentence

    The path is recovered by following the back-pointers from the best final state.
    (Previously the final state's own back-pointer was read at every time step, which
    is not the Viterbi path.)
    For n_gram > 2 the history of a predecessor is taken from its single best path, so
    the search is an approximation of exact higher-order Viterbi.
    NOTE(review): probabilities are multiplied directly, so long sentences may underflow.
    '''

    global pos_tags, t_tag, s_prob, t_prob, e_prob, n_gram, t_2

    if len(obs) == 0:
        return []

    curr_prob = {s:s_prob[s]*e_prob[s][obs[0]] for s in pos_tags} # first word/time-step prob
    back_ptr = [] # back_ptr[i-1][state] = best predecessor of `state` at time-step i

    for i in range(1, len(obs)):
        prev_prob = curr_prob
        curr_prob = {}
        step_ptr = {}
        for curr_state in pos_tags:
            if i > n_gram - 1 and n_gram > 2: # for the n_gram case
                before_last = back_ptr[-1] # best predecessor of each candidate last_state
                max_prob, state = max(((prev_prob[last_state]*t_prob[f"{before_last[last_state]} {last_state}"][curr_state]*e_prob[curr_state][obs[i]], last_state) 
                                           for last_state in pos_tags))
            elif n_gram > 2: # simple look back at the last word 
                max_prob, state = max(((prev_prob[last_state]*t_2[last_state][curr_state]*e_prob[curr_state][obs[i]], last_state) 
                                           for last_state in pos_tags))
            else:
                max_prob, state = max(((prev_prob[last_state]*t_prob[last_state][curr_state]*e_prob[curr_state][obs[i]], last_state) 
                                           for last_state in pos_tags))
            curr_prob[curr_state] = max_prob
            step_ptr[curr_state] = state
        back_ptr.append(step_ptr)

    # find the final largest probability (first state wins ties, in pos_tags order)
    max_prob = -1
    best_state = None
    for s in pos_tags:
        if curr_prob[s] > max_prob:
            best_state = s
            max_prob = curr_prob[s]
    if best_state is None: # no state beat the sentinel (e.g. empty pos_tags)
        return None

    # walk the back-pointers from the last time-step to the first
    max_path = [best_state]
    for step_ptr in reversed(back_ptr):
        max_path.append(step_ptr[max_path[-1]])
    max_path.reverse()

    return max_path



# Experimentation

In [None]:
#creating dataset
# Loads the corpus with txt_data's defaults (split=0.9, shuffle=False).

path = "brown.txt"
brown = txt_data(path)
train, val = brown.train, brown.val
# tag set and vocabulary are taken over the whole dataset (train and val together)
pos_tags, vocab = brown.pos_tags, brown.vocab

In [None]:
# Build the cross-validation folds. Tag set and vocabulary are shared by all folds.
n_splits = 3
cv = KFold(n_splits=n_splits, random_state=42, shuffle=True)
folds = {} # stores the data of train and test for n_split folds
for r, (train_idx, test_idx) in tqdm(enumerate(cv.split(brown.data)), total = n_splits):
    # Index directly instead of testing `k in index_array` for every sentence (quadratic).
    train = [brown.data[k] for k in train_idx]
    test = [brown.data[k] for k in test_idx]
    folds[f"fold_{r}"]= {
                            'train': train,
                            'test': test,
                            'pos_tags': brown.pos_tags,
                            'vocab': brown.vocab
    }

## Training loop

In [None]:
# Train an HMM on every fold, decode held-out sentences with Viterbi and score the result.
#
# `decoding` reads the model through module-level names (pos_tags, s_prob, t_prob, e_prob,
# n_gram, t_2), which this cell assigns. The worker pool is created AFTER those names are
# set for the current fold, so the forked workers see the current model and the cell runs
# correctly the first time.
# NOTE(review): this relies on worker processes inheriting the notebook's globals (fork
# start method) — confirm on platforms that use spawn.

n_gram = 2 # markov assumption length: if it's 2 we need trigrams, thus value would be 3 (assumption length + 1) 
exp_results = {} # saving the results in the dict

N_WORKERS = 4                 # number of decoding worker processes
MAX_VAL_SENTENCES = 3         # debug limit on decoded held-out sentences per fold; None decodes all
STOP_AFTER_FIRST_FOLD = True  # debug switch: evaluate only the first fold; False evaluates all

if n_gram not in (2, 3):
    raise ValueError(f"n_gram must be 2 or 3, got {n_gram}")


def _precision_recall_per_class(conf_m):
    '''Per-class precision and recall from a square confusion matrix (rows: true, columns: predicted); 0 where a class total is 0.'''
    predicted_totals = np.sum(conf_m,axis=0)
    true_totals = np.sum(conf_m,axis=1)
    precisions = [conf_m[i,i]/predicted_totals[i] if predicted_totals[i] != 0 else 0 for i in range(len(predicted_totals))]
    recalls = [conf_m[i,i]/true_totals[i] if true_totals[i] != 0 else 0 for i in range(len(predicted_totals))]
    return precisions, recalls


def _f1(precision, recall):
    '''Harmonic mean of precision and recall; 0.0 when both are 0.'''
    total = precision + recall
    return 2*((precision*recall)/total) if total != 0 else 0.0


print(f"Number of folds-/-n_gram used: {len(folds)}-/-{n_gram}\n\n")
for fold, data in tqdm(folds.items()):
    save_dict = {}
    print(f"\n\n{'-'*15}: {fold} \n\nlength of train and test data : {len(data['train'])}/{len(data['test'])}")
    
    save_dict['n_gram'] =  n_gram
    
    train = data['train']
    val = data['test']
    pos_tags = data['pos_tags']
    vocab = data['vocab']
    
    print(f"Creating the emission and transition probability matrix: ")
    e_tag, e_prob_matrix = e_matrix(train, vocab, pos_tags) # emission prob and pos_tags
    print(f"Finished emission prob matrix")
    # transition prob and tags
    if n_gram == 3: # if n_gram is 3 we also need 2_gram prob for the first index
        _, t_prob_2, _ = t_matrix(train, pos_tags, 2) 
    t_tag, t_prob_matrix, start_probability = t_matrix(train, pos_tags, n_gram) 
    print(f"Finished tansition prob matrix")
    
    print(f"Creating emission and transition probability dict for O(1) searching")
    # creating emission and transition probability dict for O(1) searching
    if n_gram == 3: t_2 = {tag: dict(zip(pos_tags, row)) for tag, row in tqdm(zip(pos_tags, t_prob_2), total=len(pos_tags))}
    else: t_2 = None
    t_prob = {history: dict(zip(pos_tags, row)) for history, row in tqdm(zip(t_tag, t_prob_matrix), total=len(t_tag))}
    e_prob = {tag: dict(zip(vocab, row)) for tag, row in tqdm(zip(e_tag, e_prob_matrix), total=len(e_tag))}
    s_prob = dict(zip(pos_tags, start_probability))
    # s_prob = {i:1.0 for i in pos_tags} # if wanted the same starting prob for all pos_tags
    
    print(f"Starting viterbi decoding: ")
    
    true = list(val[:MAX_VAL_SENTENCES]) # sentences of (tag, word) pairs
    obs = [[pair[1] for pair in sentence] for sentence in true] # words only
    
    # Pool created here, after the model globals of THIS fold are in place; the context
    # manager also releases the workers at the end of the fold.
    with Pool(N_WORKERS) as pool:
        out = [list(path) for path in tqdm(pool.imap(decoding, obs), total=len(obs))]

    save_dict['true_val'] = true
    save_dict['pred_val'] = out
    
    print(f"starting metric calcualtion")
    true = [pair[0] for sentence in true for pair in sentence]
    out = [tag for path in out for tag in path]
    assert len(true) == len(out)

    #pos to index and index to pos dict for easy searching
    pos_index = {tag:idx for idx,tag in enumerate(pos_tags)}
    index_pos = {idx:tag for idx,tag in enumerate(pos_tags)}
    
    save_dict['pos_index'] = pos_index
    
    # true and predicted tags as indices
    expected = [pos_index[tag] for tag in true]
    predicted = [pos_index[tag] for tag in out]
    
    labels = sorted(pos_index.values())
    
    eps = 0

    # confusion counts: dummy[true label][predicted label]
    dummy = OrderedDict()
    for i in labels:
        dummy[str(i)] = {str(j) : eps for j in labels}
    for true_idx, pred_idx in zip(expected, predicted):
        dummy[str(true_idx)][str(pred_idx)] +=1
    
    save_dict['conf_labels'] = [k for k in dummy.keys()]
    # confusion matrix
    conf_m = np.array([[j for j in i.values()] for i in dummy.values()]) # confusion matrix
    save_dict['conf_matrix'] =  conf_m.tolist()
    
    #classwise 
    class_prec, class_rec = _precision_recall_per_class(conf_m)
    save_dict['classwise_prec_and_rec'] = [class_prec, class_rec]
          
    # macro: classes that were neither expected nor predicted are left out of the average
    ind = [i for i in range(len(conf_m)) if np.sum(conf_m[:,i]) == 0 and np.sum(conf_m[i,:]) == 0]
    conf_m = np.delete(conf_m,ind,1)
    conf_m = np.delete(conf_m,ind,0)
    macro_prec, macro_rec = _precision_recall_per_class(conf_m)
    save_dict['macro_prec_and_rec'] = [macro_prec, macro_rec]

    macro_p = sum(macro_prec)/len(macro_prec)
    macro_r = sum(macro_rec)/len(macro_rec)
    print("macro --",macro_p, macro_r)

    # macro f1 score
    f_1 = _f1(macro_p, macro_r)
    print(f" Macro F1 score: {f_1}")
    save_dict['macro_f1'] = f_1
    
    # micro precision and recall
    correct = sum([row[idx] for idx,row in enumerate(conf_m)])
    precision = correct/np.sum(conf_m,axis=0).sum()
    recall = correct/np.sum(conf_m,axis=1).sum()
    print("micro--",precision, recall)
    save_dict['micro_precision_and_recall'] = [precision.tolist(), recall.tolist()]
    
    # accuracy
    acc = len([i for i in range(len(expected)) if expected[i]==predicted[i]])/len(expected)
    print(f"Accuracy: {acc}")
    save_dict['accuracy'] = acc
    
    # micro f1 score
    f_1 = _f1(precision, recall)
    print(f"Micro F1 score: {f_1}")
    save_dict['micro_f1'] = f_1
    
    exp_results[fold] = save_dict
    if STOP_AFTER_FIRST_FOLD:
        break

#### Saving everything

In [None]:
# # saving results
# with open("exp_results.json", "w")as f:
#     json.dump(exp_results, f, indent=4)

### statistics of tag set

In [7]:
#loading saved results
# NOTE(review): this rebinds `exp_results` to the contents of stat.json. Cells that index
# exp_results[fold]['conf_matrix'] or exp_results['fold_0']['pred_val'] expect the per-fold
# dicts built by the training loop instead — confirm which file/variable is intended.
with open('markov_1/stat.json', 'r') as f:
    exp_results = json.load(f)

# exp_results['fold_0'].keys()

In [8]:
# Show the top-level keys of the loaded results.
exp_results.keys()

dict_keys(['key_0', 'key_1', 'key_2', 'key_3', 'key_4', 'key_5', 'key_6', 'key_7', 'key_8', 'key_9', 'key_10', 'key_11', 'key_12', 'key_13', 'key_14', 'key_15', 'vocab_len', 'pos_tag_len', 'max_len', 'min_len', 'average_len', 'standard_deviation_len', 'num_word_types_more_than_1_pos_tag', 'num_of_tag_vs_word_types', 'word_type_to_num_of_tag', 'tag_2_num_word_types', 'fold_0_scores_conf_m', 'fold_1_scores_conf_m', 'fold_2_scores_conf_m', 'p_a', 'r_a', 'f_a'])

In [None]:
# Display the stored entry for fold 0 (requires the 'fold_0_scores_conf_m' key to be present).
exp_results['fold_0_scores_conf_m']

In [6]:
# Corpus statistics plus per-fold macro scores recomputed from the stored confusion matrices.
# NOTE(review): the fold loop reads exp_results[k]['conf_matrix'] for every key k, so
# `exp_results` must map fold names to the per-fold dicts produced by the training loop.

def _class_precision_recall(conf_m):
    '''Per-class precision and recall from a square confusion matrix (rows: true, columns: predicted); 0 where a class total is 0.'''
    pre = np.sum(conf_m,axis=0)
    rec = np.sum(conf_m,axis=1)
    p = [conf_m[i,i]/pre[i] if pre[i] != 0 else 0 for i in range(len(pre))]
    r = [conf_m[i,i]/rec[i] if rec[i] != 0 else 0 for i in range(len(pre))]
    return p, r


stat_tag = {}

stat_tag['vocab_len'] = len(brown.vocab)

stat_tag['pos_tag_len'] = len(brown.pos_tags)

# sentence lengths in tokens
sent = np.array([len(i) for i in brown.data])

stat_tag['max_len'], stat_tag['min_len'], stat_tag['average_len'], stat_tag['standard_deviation_len'] = float(max(sent)), float(min(sent)), np.mean(sent), np.std(sent)

# word type -> number of distinct tags it occurs with
# (brown.vocab / brown.pos_tags are used directly so the cell does not depend on the
# `vocab` / `pos_tags` globals that the training loop rebinds)
tag = {s:set() for s in brown.vocab}

for i in brown.data:
    for j in i:
        tag[j[1]].add(j[0])

for s in tag.keys():
    tag[s] = len(tag[s])

a = sorted([i for i in tag.values() if i >1], reverse=True)
    
stat_tag['num_word_types_more_than_1_pos_tag'] = len(a)
stat_tag['num_of_tag_vs_word_types'] = Counter(a)
stat_tag['word_type_to_num_of_tag'] = sorted(tag.items(), key = lambda x: x[1], reverse=True)
print(f"Number of word types having more than 1 tag: {len(a)}")

# tag -> number of distinct word types it occurs with (`tag` is deliberately rebound here)
tag = {s:set() for s in brown.pos_tags}

for i in brown.data:
    for j in i:
        tag[j[0]].add(j[1])

for s in tag.keys():
    tag[s] = len(tag[s])

stat_tag['tag_2_num_word_types'] = sorted(tag.items(), key = lambda x: x[-1], reverse=True)

p_a, r_a, f_a = 0,0,0 # running sums of macro precision / recall / F1 over the folds

for k,value in exp_results.items():
    dummy = {}
    
    conf_m = np.array(value['conf_matrix'])
    dummy['conf_m'] = conf_m.tolist()
    #classwise 
    p, r = _class_precision_recall(conf_m)
    dummy['macro_classwise_prec_and_rec'] = [p, r]
    
    # macro: classes that were neither expected nor predicted are left out of the average
    ind = [i for i in range(len(conf_m)) if np.sum(conf_m[:,i]) == 0 and np.sum(conf_m[i,:]) == 0]
    conf_m = np.delete(conf_m,ind,1)
    conf_m = np.delete(conf_m,ind,0)
    p, r = _class_precision_recall(conf_m)
            
    macro_p, macro_r = sum(p)/len(p), sum(r)/len(r)
    dummy['macro_pre_rec'] = [macro_p, macro_r]
    print("macro --",macro_p, macro_r)
    p_a += macro_p
    r_a += macro_r
    # macro f1 score (0.0 when precision and recall are both 0)
    f_1 = 2*(macro_p*macro_r)/(macro_p+macro_r) if (macro_p+macro_r) != 0 else 0.0
    f_a += f_1
    print(f" Macro F1 score: {f_1}")
    dummy['macro_f1_score'] = f_1
    
    stat_tag[f"{k}_scores_conf_m"] = dummy

stat_tag['p_a'] = p_a/len(exp_results.keys())
stat_tag['r_a'] = r_a/len(exp_results.keys())
stat_tag['f_a'] = f_a/len(exp_results.keys())
    
# index of the stored keys (key_0, key_1, ...) followed by the statistics themselves
dummy = {f"key_{i}":j for i,j in enumerate(stat_tag.keys())}
for k,v in stat_tag.items():
    dummy[k] = v

NameError: name 'brown' is not defined

In [None]:
# # saving results
# with open("stat.json", "w")as f:
#     json.dump(dummy, f,indent=4)

##### Plot the number of tags for every word type that has more than one tag, to get a perspective.

In [None]:
import matplotlib.pyplot as plt

# Use the stored word -> number-of-tags statistic. The name `tag` is rebound to the
# tag -> number-of-word-types mapping by the statistics cell, so reading it here would
# plot tags instead of words.
word_tag_counts = stat_tag['word_type_to_num_of_tag'] # list of (word, number of tags)
a = [n_tags for _, n_tags in word_tag_counts if n_tags > 1]

fig, ax = plt.subplots()
ax.scatter(range(len(a)), a)
ax.set(title="Word types with more than one tag", xlabel="word type (rank)", ylabel="number of distinct tags")

len(a), len(word_tag_counts)

# question 3
###### Which word types are most frequently tagged incorrectly by the HMM, and why?

In [None]:
# Flatten fold_0's per-sentence results into three parallel token-level lists.
fold_0 = exp_results['fold_0']
pred = list(itertools.chain.from_iterable(fold_0['pred_val']))      # predicted tags
true = [pair[0] for sentence in fold_0['true_val'] for pair in sentence] # true tags
word = [pair[1] for sentence in fold_0['true_val'] for pair in sentence] # words

In [None]:
a = '' # report text, extended by the following cells
# word type -> list of [[predicted tag], true tag] for every wrong prediction of that word
incorrect = {s: [] for s in set(word)}
for idx, true_tag in enumerate(true):
    predicted_tag = pred[idx]
    if predicted_tag == true_tag:
        continue
    incorrect[word[idx]].append([[predicted_tag], true_tag])

In [None]:
# word type -> number of wrong predictions, restricted to words with at least one error
wt_pw = {}
for w, errors in incorrect.items():
    if len(errors) > 0:
        wt_pw[w] = len(errors)
n_word_types = len(set(word))
a += f"Percentage of words incorrectly classified: {len(wt_pw)/n_word_types*100}\n\n\n"

In [None]:
# (word, errors) pairs ordered by number of errors, largest first
most_incorrectly = list(incorrect.items())
most_incorrectly.sort(key=lambda item: len(item[1]), reverse=True)

In [None]:
a+= f"Word types predicted incorrectly atleast 100 times are: \n\n"
z = [] # word types with at least 100 wrong predictions
for i in most_incorrectly:
    # most_incorrectly is sorted by error count (descending): stop at the first word
    # below the threshold, BEFORE reporting it.
    if len(i[1]) < 100:
        break
    a+=f"word type ({i[0].upper()}) predicted incorrectly {len(i[1])} times \n"
    z.append(i[0])
print("\n\n\n")

In [None]:
# (word, number of tags) for the word types that were most often predicted incorrectly
frequent_errors = set(z) # set membership instead of scanning the list for every word
z_ = [i for i in stat_tag['word_type_to_num_of_tag'] if i[0] in frequent_errors]

In [None]:
# Display the entries selected into z_.
z_

In [None]:
# For every threshold m, count the word types with at least m wrong predictions.
# `default=0` keeps the cell working when nothing was predicted incorrectly.
for m in range(1,max(wt_pw.values(), default=0)+1):
    c = sum(1 for v in wt_pw.values() if v >= m)
    # the comparison is >=, so the report says "at least", not "more than"
    a+= f"number of words predicted incorrect at least {m} times are {c}\n"
    
print("\n\n\n")

In [None]:
# #saving question 3 stats
# with open('q3.txt', "w") as f:
#     f.write(a)