In [1]:
import math
import json
import numpy as np
import pickle
import sys
import pandas as pd
import tqdm

from pathlib import Path
from scipy.optimize import fmin_l_bfgs_b
from utils.corpus import Corpus
from utils.dataset import Dataset
from utils.features import Features
from utils.parser import Parser
from utils.history import History

In [21]:
# configure path
text = '--file assets/train2.wtag --n-pairs 500 --n-unigrams 32 --n-bigrams 250 --n-trigrams 500 --n-prefixes 100 --n-suffixes 100 --n-prev-w-curr-t 100 --n-next-w-curr-t 100'
text = text.split()
text.insert(0,str(Path().absolute())+"\\viterbi algorithm")
print(text)
sys.argv = text

['C:\\Users\\danie\\Desktop\\קורסים\\עיבוד שפה טבעית\\תרגילי בית\\תרגיל 1\\ie097215-hw1-master\\ie097215-hw1-master\\viterbi algorithm', '--file', 'assets/train2.wtag', '--n-pairs', '500', '--n-unigrams', '32', '--n-bigrams', '250', '--n-trigrams', '500', '--n-prefixes', '100', '--n-suffixes', '100', '--n-prev-w-curr-t', '100', '--n-next-w-curr-t', '100']


In [22]:
# creates features 
opts = Parser.train()
corpus = Corpus(opts.file)
corpus.filter(
        pairs=opts.n_pairs, unigrams=opts.n_unigrams, bigrams=opts.n_bigrams,
        trigrams=opts.n_trigrams, prefixes=opts.n_prefixes, suffixes=opts.n_suffixes,
        pairs_prev_w_curr_t=opts.n_prev_w_curr_t, pairs_next_w_curr_t=opts.n_next_w_curr_t
    )

features = Features()
features.from_data(corpus)
opts.ds_tags = list(corpus.unigrams.keys())
dataset = Dataset(opts.file, features, opts.ds_tags)

processing corpus: 100%|████████████████████████████████████████████████████████████| 250/250 [00:01<00:00, 161.87it/s]
processing dataset: 100%|████████████████████████████████████████████████████████████| 250/250 [03:31<00:00,  1.18it/s]


In [4]:
def get_S(n, labels):
    # creates labels for each step in the algoritem
    S = []
    for i in range(n+3):
        if i<=1:
            S.append('*')
        elif i == n+2:
            S.append('.')
        else:
            S.append(labels)
    return S

In [5]:
def k_largest_index_argsort(A, k):
    #return k largest index in a 2d matrix
    idx = np.argsort(A.ravel())[:-k-1:-1]
    return np.column_stack(np.unravel_index(idx, A.shape))

In [7]:
def get_prob_base(labels, sentence, t_2_labels, t_1_labels, ind, weights, calc_ind):
    ''' 
    calculates the probability for each possible history
    labels - list of all possible labale for the current word
    sentence - current sentence
    t_2_labels - list of all possible labale for the t-2 word
    t_1_labels - list of all possible labale for the t-1 word
    ind - current word index
    calc_ind = boolean matrix for Beam search of shape(len(t_2_labels), len(t_1_labels))
    '''
    e_f = np.zeros((len(t_2_labels), len(t_1_labels), len(labels)))
    e_f_sum = np.zeros((len(t_2_labels), len(t_1_labels)))
    for i in range(len(t_2_labels)):
        for j in range(len(t_1_labels)):
            for q in range(len(labels)):
                if calc_ind[i,j] == 1:
                    tags = (t_2_labels[i], t_1_labels[j], labels[q])
                    hist = History(sentence, tags, ind)
                    f = features.to_vec(history=hist)
                    e_f[i, j, q] = np.exp(f @ weights)
    e_f_sum = np.sum(e_f, axis=2)
    prob = e_f/e_f_sum[..., None]
    
    np.nan_to_num(prob, copy=False)
    return prob

In [8]:
def Viterbi(sentence, weights, features, labels, Beam=0):
    #forward
    n = len(sentence)
    l = len(labels)
    pi = np.zeros((n, l, l))
    bp = np.zeros((n, l, l))
    S = get_S(n, labels)
    for k in range(n):
        if k == 0:
            #pi[-1] = 1
            prob = get_prob_base(S[k+2], sentence, S[k], S[k+1], k, weights, np.ones((1,1)))
            pi[k,0,:] = np.max(prob, axis=0)
        else:
            prob = get_prob_base(S[k+2], sentence, S[k], S[k+1], k, weights, to_calc)
            temp = pi[k-1,:,:,np.newaxis]*prob
            pi[k,:,:] = np.max(temp, axis=0)
            bp[k,:,:] = np.argmax(temp, axis=0)

        to_calc = np.zeros((len(S[k+1]), len(S[k+2])))
        if Beam == 0:
            to_calc += 1
        else:
            c = k_largest_index_argsort(pi[k,:,:], Beam)
            to_calc[c[:,0],c[:,1]] = 1
    #backward
    t = np.zeros(n).astype(int).tolist()
    # argmax change the matric into vectot and then returns the ind of the max
    ind = np.argmax(pi[-1,:,:])
    # calc the row of the argmax
    t[n-2] = math.floor(ind/pi[-1,:,:].shape[0])
    # calc the column
    t[n-1] = ind-t[n-2]*pi[-1,:,:].shape[0]
    for k in range(3,n+1):
        t[n-k] = bp[n-k+2, t[n-k+1], t[n-k+2]].astype(int)
    return t, pi, bp

In [9]:
# load weights and all words and tags for accuracy testing
weights = pd.read_pickle("models/05-17_19_03_43_trained_weights_data_2.pkl")[0]
line_words = []
line_tags = []
with open(sys.argv[2], "r") as f:
        for line in tqdm.tqdm(f.readlines(), desc="processing dataset"):
            pairs = [tuple(s.split("_")) for s in line.split()]

            line_words.append(list(list(zip(*pairs))[0]))
            line_tags.append(np.array(list(list(zip(*pairs))[1])))


processing dataset: 100%|█████████████████████████████████████████████████████████| 250/250 [00:00<00:00, 22768.40it/s]


In [10]:
#predict
predict = []
tags = np.array(opts.ds_tags)
i = 0
correct = 0
total = 0
for sentance in tqdm.tqdm(line_words):
    pred, pi, bp = Viterbi(sentance, weights, features, opts.ds_tags, 2)
    predict.append(pred)
    correct += np.sum(tags[pred]==line_tags[i])
    total += line_tags[i].shape[0]
    i += 1


  prob = e_f/e_f_sum[..., None]
100%|████████████████████████████████████████████████████████████████████████████████| 250/250 [09:17<00:00,  2.23s/it]


In [23]:
accuracy = correct/total
print(f"Train accuracy: {accuracy * 100 :.2f}%")

Train accuracy: 87.20%
