First, implement the Viterbi algorithm for finding the
optimal state (tag) sequence given the sequence of observations (words). We
suggest you test your implementation on a small example for which you
know the correct tag sequence, such as Eisner’s Ice Cream HMM from
the lecture. Make sure your Viterbi algorithm runs properly on the example
before you proceed to the next step.

## Viterbi Algorithm

In [7]:
import numpy as np

In [99]:
# Small hand-written HMM (two hidden states, three observation symbols)
# used to try out viterbi before training on real data.
states = ['h', 'c']  # hidden state labels; position = row in Im, Tm and Em
states2ind = {label: row for row, label in enumerate(states)}
obs = [3, 1, 3]  # observation sequence to decode
obs2ind = {str(symbol): col for col, symbol in enumerate((1, 2, 3))}  # keyed by str(symbol)

Im = np.array([0.8, 0.2])  # initial probabilities, one per state
Tm = np.array([
    [0.7, 0.3],
    [0.4, 0.6],
])  # row = current state, column = next state
Em = np.array([
    [0.2, 0.4, 0.4],
    [0.5, 0.4, 0.1],
])  # row = state, column = observation symbol (see obs2ind)

In [100]:
str(obs[0])

'3'

In [12]:
# perform sanity check to confirm if all the probabilities corresponding to a state sum to 1 

def perform_sanity_check(Im, Tm, Em):
    """Verify that the HMM parameters are normalised probability tables.

    Args:
        Im: Initial state probabilities; must sum to 1.
        Tm: Transition matrix; every row must sum to 1.
        Em: Emission matrix; every row must sum to 1.

    Raises:
        AssertionError: If any of the sums differs from 1 beyond floating
            point rounding (NaN sums also fail).
    """
    checks = (
        ("initial probabilities", np.sum(Im)),
        ("transition matrix rows", np.sum(Tm, axis=1)),
        ("emission matrix rows", np.sum(Em, axis=1)),
    )
    for label, totals in checks:
        # Compare with a tolerance: sums of floats rarely equal 1 exactly.
        # Raised explicitly so the check is not stripped under `python -O`.
        if not np.allclose(totals, 1.0):
            raise AssertionError(
                "{} do not sum to 1: {}".format(label, totals)
            )
    
perform_sanity_check(Im, Tm, Em)

In [8]:
# viterbi algo for POS tagging i.e. tagger implementation 
def viterbi(states, obs, Im, Tm, Em, obs2ind):
    """Return the most probable hidden-state sequence for ``obs``.

    Scores are accumulated as log-probabilities, so long observation
    sequences do not underflow to zero.

    Args:
        states: Sequence of K state labels; position i labels row i of
            ``Im``, ``Tm`` and ``Em``.
        obs: Sequence of T observations. Each one is looked up in
            ``obs2ind`` by its ``str()`` form.
        Im: Length-K initial state probabilities.
        Tm: K x K transition probabilities; row = previous state,
            column = current state.
        Em: K x N emission probabilities; row = state, column = observation.
        obs2ind: Mapping from ``str(observation)`` to a column of ``Em``.

    Returns:
        A list with one state label per observation (empty for empty
        ``obs``).
    """
    n_states, n_obs = len(states), len(obs)
    if n_obs == 0:
        return []

    Im = np.asarray(Im, dtype=float)
    Tm = np.asarray(Tm, dtype=float)
    Em = np.asarray(Em, dtype=float)

    def log_emission(observation):
        """Log emission score of ``observation`` for every state."""
        col = obs2ind.get(str(observation))
        if col is None:
            # Unknown observation: crude smoothing, emission probability 1
            # for every state, i.e. it does not influence the decision.
            return np.zeros(n_states)
        # Only the needed column is logged; Em can be very wide.
        return np.log(Em[:, col])

    # log(0) = -inf is the intended score for impossible events.
    with np.errstate(divide='ignore'):
        log_Tm = np.log(Tm)

        # log_Vp[s, t]: best log score of any path ending in state s at time t.
        log_Vp = np.full((n_states, n_obs), -np.inf)
        # back[s, t]: previous state on that best path (-1: no predecessor).
        back = np.full((n_states, n_obs), -1, dtype=int)

        log_Vp[:, 0] = np.log(Im) + log_emission(obs[0])

        # transition for t = 1, ..., T-1
        for t in range(1, n_obs):
            # cand[i, j]: score of being in state i at t-1 and moving to j.
            cand = log_Vp[:, t - 1][:, np.newaxis] + log_Tm
            back[:, t] = np.argmax(cand, axis=0)
            log_Vp[:, t] = np.max(cand, axis=0) + log_emission(obs[t])

    # Start from the best final state and follow the back-pointers leftwards.
    path = np.empty(n_obs, dtype=int)
    path[-1] = np.argmax(log_Vp[:, -1])
    for t in range(n_obs - 1, 0, -1):
        path[t - 1] = back[path[t], t]

    return [states[i] for i in path]
    
    
    

#print(viterbi(states, obs, Im, Tm, Em, obs2ind))

### Training. 
Second, learn the parameters of your HMM from data, i.e. the
initial, transition, and emission probabilities. Implement a maximum
likelihood training procedure for supervised learning of HMMs.

In [17]:
# first load and read the corpus - fetch unique words, POS-tags, all_words, all_tags 
# ek emission prob matrix, transition prob matrix, initial prob
# viterbi 
# some preprocessing?
type(reader_corpus)

nltk.corpus.reader.conll.ConllCorpusReader

In [1]:
from nltk.corpus.reader import ConllCorpusReader, TaggedCorpusReader

def read_corpus(path='de-utb/de-train.tt', columntypes=('words', 'pos')):
    """Open a column-formatted corpus file and print its token count.

    Args:
        path: Corpus file, relative to the current directory. Defaults to
            the training data.
        columntypes: Column layout handed to ``ConllCorpusReader``.

    Returns:
        The ``ConllCorpusReader`` for ``path``.
    """
    corpus = ConllCorpusReader('.', path, columntypes=columntypes)
    token_count = len(corpus.tagged_words())
    print(token_count)
    return corpus
    
reader_corpus = read_corpus()

264906


In [2]:
def obj_dict_index(list_obj):
    """Map each item of ``list_obj`` to its position in the list.

    If an item occurs more than once, its last position is kept.
    """
    return {item: position for position, item in enumerate(list_obj)}

def record_states_from_corpus(reader_corpus_obj):
    """Collect the distinct POS tags of a corpus and index them.

    Args:
        reader_corpus_obj: Corpus reader whose ``tagged_words()`` yields
            ``(word, tag)`` pairs.

    Returns:
        ``(states, state_index)``: the list of unique tags and a dict that
        maps each tag to its position in that list.
    """
    # Read from the reader that was passed in, not from a module-level
    # corpus, so the function works for any corpus.
    # NOTE: the order comes from set iteration and is not sorted.
    states = list({tag for _word, tag in reader_corpus_obj.tagged_words()})
    state_index = obj_dict_index(states)
    return states, state_index

def record_obs_from_corpus(reader_corpus_obj):
    """Collect the distinct words of a corpus and index them.

    Args:
        reader_corpus_obj: Corpus reader whose ``tagged_words()`` yields
            ``(word, tag)`` pairs.

    Returns:
        ``(obs, obs_index)``: the list of unique words and a dict that maps
        each word to its position in that list.
    """
    # Read from the reader that was passed in, not from a module-level
    # corpus, so the function works for any corpus.
    # NOTE: the order comes from set iteration and is not sorted.
    obs = list({word for word, _tag in reader_corpus_obj.tagged_words()})
    obs_index = obj_dict_index(obs)
    return obs, obs_index

In [3]:
# Build the tag inventory and the vocabulary of the training corpus, each
# together with a label -> index lookup.
# NOTE: this rebinds `states` and `obs`, which the toy-example cell also assigns.
states, state_index = record_states_from_corpus(reader_corpus)
obs, obs_index = record_obs_from_corpus(reader_corpus)

In [4]:
#states = list(([tup[1] for tup in reader_corpus.tagged_words()])) # list of all POS tags
#states.sort() # the following order will be followed in all the matrices
#state_index = dict(zip(states, range(0,len(states)))) # states_2_index
state_index

{'.': 2,
 'ADJ': 0,
 'ADP': 7,
 'ADV': 10,
 'CONJ': 11,
 'DET': 8,
 'NOUN': 1,
 'NUM': 5,
 'PRON': 3,
 'PRT': 4,
 'VERB': 6,
 'X': 9}

In [5]:
#obs = list(set([tup[0] for tup in reader_corpus.tagged_words()])) # list of unique observations
#obs.sort()
#obs_index = dict(zip(obs, range(0,len(obs)))) # obs_word_2_index
obs_index['Der']

21693

In [9]:
# Zero-filled tables, sized from the tag list (K) and the word list (N).
num_states, vocab_size = len(states), len(obs)
Im = np.zeros(num_states)                # initial-state table, length K
Tm = np.zeros((num_states, num_states))  # transition table, K x K
Em = np.zeros((num_states, vocab_size))  # emission table, K x N

In [10]:
# Maximum-likelihood training: count initial tags, tag-to-tag transitions and
# tag-to-word emissions over the tagged training sentences, then normalise.
for sent in reader_corpus.tagged_sents():
    prev_state_index = None  # no previous tag at the start of a sentence
    for word, tag in sent:
        hidden_state_index = state_index[tag]

        if prev_state_index is None:  # first word of the sentence
            Im[hidden_state_index] += 1
        else:
            Tm[prev_state_index, hidden_state_index] += 1

        Em[hidden_state_index, obs_index[word]] += 1
        prev_state_index = hidden_state_index

# Divide the counts by the corresponding totals to get probabilities.
# Im is divided by its own total (one count per non-empty sentence), which
# avoids a second pass over the corpus just to count sentences.
Im /= max(Im.sum(), 1)
for counts in (Tm, Em):
    row_totals = counts.sum(axis=1, keepdims=True)
    # A row without any counts stays all-zero instead of becoming NaN (0/0).
    row_totals[row_totals == 0] = 1
    counts /= row_totals

In [13]:
perform_sanity_check(Im, Tm, Em)

The test set (de-test.t) is a copy of the evaluation set with the tags stripped: you should
tag the test set using your tagger and then compare your results with the
gold-standard ones in the evaluation set. The corpus uses the 12-tag universal
POS tagset by Petrov et al. (2012).

In [125]:
#test_corpus = TaggedCorpusReader('.', r'de-utb/de-test.t')

#test_obs = record_obs_from_corpus(test_corpus)
#for sent in test_corpus.tagged_sents():
#    print(sent)
#    break

In [14]:
eval_corpus = ConllCorpusReader('.', 'de-utb/de-eval.tt', columntypes=('words', 'pos'))

# Run the tagger over every evaluation sentence. Only the words are passed
# to viterbi; the tags stored in the evaluation file are not used here.
predicted_tags = []
for sent in eval_corpus.tagged_sents():
    words = [token for token, _gold_tag in sent]
    predicted_tags.append(viterbi(states, words, Im, Tm, Em, obs_index))

In [15]:
predicted_tags

[['DET',
  'NOUN',
  'VERB',
  'ADP',
  'NOUN',
  '.',
  'CONJ',
  'PRON',
  'PRON',
  'ADP',
  'NOUN',
  '.'],
 ['PRON',
  'VERB',
  'ADV',
  'NUM',
  'VERB',
  '.',
  'VERB',
  'CONJ',
  'CONJ',
  'PRON',
  'DET',
  'NOUN',
  'ADV',
  'VERB',
  'VERB',
  'PRON',
  'ADV',
  'ADV',
  'ADP',
  'ADJ',
  'NOUN',
  'DET',
  'NOUN',
  'CONJ',
  'DET',
  'NOUN',
  'VERB',
  '.'],
 ['VERB', 'ADV', 'ADV', 'DET', 'NOUN', '.'],
 ['.', 'NOUN', 'NOUN', 'ADP', 'NOUN', 'VERB', '.', '.'],
 ['.', 'NOUN', 'NOUN', 'VERB', 'PRON', 'ADV', 'NUM', 'VERB', '.', '.'],
 ['.',
  'ADP',
  'DET',
  'NOUN',
  'CONJ',
  'NOUN',
  'DET',
  'ADJ',
  'NOUN',
  'VERB',
  'PRON',
  'ADP',
  'NOUN',
  'ADV',
  'ADV',
  'ADV',
  'ADP',
  'ADJ',
  'NOUN',
  'VERB',
  '.',
  '.'],
 ['.', 'DET', 'NOUN', 'NOUN', 'VERB', 'ADV', 'ADV', 'VERB', '.', '.'],
 ['NUM', 'NOUN', 'ADJ', 'ADV', 'VERB', 'VERB', 'VERB', '.'],
 ['DET', 'NOUN', 'VERB', 'PRON', 'VERB', '.'],
 ['ADV',
  'ADV',
  'VERB',
  '.',
  'PRON',
  'PRON',
  'ADP',
  'A

In [16]:
def write_tags_to_file(predicted_tags:list, test_file='./de-utb/de-test.t', tagged_file='de-utb/de-tagged.tt', encoding='utf-8'):
    """Write the words of ``test_file`` next to their predicted tags.

    The input holds one word per line, with blank lines between sentences.
    The output repeats every line as ``word<TAB>tag`` and keeps the blank
    lines.

    Args:
        predicted_tags: One list of tags per sentence, in file order.
        test_file: Path of the untagged input file.
        tagged_file: Path of the output file (overwritten).
        encoding: Text encoding used for both files.

    Raises:
        IndexError: If the file contains more sentences or words than
            ``predicted_tags`` provides tags for.
    """
    with open(test_file, encoding=encoding) as inp_file, \
            open(tagged_file, 'w', encoding=encoding) as op_file:
        sent_idx = 0
        word_idx = 0
        for line in inp_file:
            word = line.strip()  # remove any whitespace chars
            if not word:  # blank line marks the end of a sentence
                # Advance only after a sentence that actually had words, so
                # leading or repeated blank lines do not shift the alignment.
                if word_idx:
                    sent_idx += 1
                    word_idx = 0
                op_file.write("\n")
                continue
            tag = predicted_tags[sent_idx][word_idx]
            word_idx += 1
            op_file.write(word + "\t" + tag + "\n")
            

write_tags_to_file(predicted_tags)

In [81]:
eval_obs[1]

'schäumt'

In [82]:
eval_obs_in[eval_obs[1]]

1

In [73]:
Em[:][3]

array([1., 1., 1., ..., 1., 1., 1.])

In [158]:
# Dump the raw, untagged test file to inspect its layout.
# The encoding is given explicitly so that non-ASCII words are decoded the
# same way on every platform.
with open('de-utb/de-test.t', encoding='utf-8') as f:
    lines = f.read()
print(lines)

Der
Hauptgang
war
in
Ordnung
,
aber
alles
andere
als
umwerfend
.

Ich
habe
dort
2007
meinen
OWD
gemacht
und
weil
mir
das
Tauchen
so
gefiel
hab
ich
dort
noch
im
selben
Jahr
den
AOWD
und
den
Deep
drangehängt
.

Ist
ja
wohl
ein
Witz
.

"
Bitte
Termin
im
Internet
machen
.
"

"
Welcher
Kollege
hat
Ihnen
denn
99
gesagt
?
"

(
Bei
den
Damen
und
Herren
des
besagten
Mobilfunkanbieters
bin
ich
im
Gegensatz
dazu
bisher
ausschließlich
auf
arrogante
Unkenntnis
gestossen
)
.

(
Der
Shop
Thomann
ist
noch
sehr
vielversprechender
!
)

10
Euro
günstiger
selbst
hätte
kaufen
können
.

17.45
Uhr
hatte
ich
bestellt
.

Aber
mal
abwarten
,
was
sich
in
näherer
Zukunft
abspielt
....

Aber
über
die
Freundlichkeit
,
Zuverlässlichkeit
und
Komptenz
des
gesamten
Team
kann
man
nur
eines
Sagen
--
PEFERKTION

Absolut
empfehlenswert
ist
auch
der
Service
.

Alle
am
Wort
Gottes
interessierte
Personen
sind
herzlich
eingelanden
,
Gottesdienst
mit
Liedern
,
Gebeten
und
Predigten
mit
den
Mitgliedern
zu
feiern
.

Alles
in
Alle