In [1]:
# robot localization
import numpy as np

T = np.array([[0,1/3,0,1/2,0,0],
              [1/2,0,1,0,1/3,0],
              [0,1/3,0,0,0,0],
              [1/2,0,0,0,1/3,0],
              [0,1/3,0,1/2,0,1],
              [0,0,0,0,1/3,0]])

Owall = np.diag([1/2, 1/4, 1/2, 1/4, 0, 1/4])
Ohash = np.diag([0, 0, 1/4, 1/4, 1/4, 1/2])

v, w = np.linalg.eig(T)
pi = w[:,1] / np.sum(w[:,1])
print("eigenvalues:", v)
print("stationary:", pi)

X0 = np.array([0,0,0,0,1,0])
X1e1 = Owall@T@X0
X1e1 = X1e1 / np.sum(X1e1)
print("X1e1 =", X1e1)

X2e1e2 = Ohash@T@X1e1
X2e1e2 = X2e1e2 / np.sum(X2e1e2)
print("X2e1e2 =", X2e1e2)

# (E,X1,X2) sequences with nonzero probability: BC, BE, DE, FE
BC = T[2,1]
BE = T[4,1]
DE = T[4,3]
FE = T[4,5]
joint = np.array([BC,BE,DE,FE])/(BC+BE+DE+FE)
print("X1X2e1e2 =", joint)

b = T.T@Ohash@np.ones(6)
X1e1e2 = X1e1*b/(np.sum(X1e1*b))
print("X1e1e2 from f * b =", X1e1e2)
print("X1e1e2 from joint =", np.array([0,joint[0]+joint[1],0,joint[2],0,joint[3]]))

eigenvalues: [-1.          1.         -0.5        -0.33333333  0.33333333  0.5       ]
stationary: [0.16666667 0.25       0.08333333 0.16666667 0.25       0.08333333]
X1e1 = [0.         0.33333333 0.         0.33333333 0.         0.33333333]
X2e1e2 = [0.         0.         0.15384615 0.         0.84615385 0.        ]
X1X2e1e2 = [0.15384615 0.15384615 0.23076923 0.46153846]
X1e1e2 from f * b = [0.         0.30769231 0.         0.23076923 0.         0.46153846]
X1e1e2 from joint = [0.         0.30769231 0.         0.23076923 0.         0.46153846]


In [2]:
import numpy as np

def read_sentence(f):
  sentence = []
  while True:
    line = f.readline()
    if not line or line == '\n':
      return sentence
    line = line.strip()
    word, tag = line.split("\t", 1)
    sentence.append((word, tag))

def read_corpus(file):
  f = open(file, 'r', encoding='utf-8')
  sentences = []
  while True:
    sentence = read_sentence(f)
    if sentence == []:
      return sentences
    sentences.append(sentence)

POS = ['ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'INTJ', 'NOUN', 'NUM', 
        'PART', 'PRON', 'PROPN', 'PUNCT', 'SCONJ', 'SYM', 'VERB', 'X']

In [3]:
def learn_model(data):
  num_pos = len(POS)
  pos_counts = np.zeros(num_pos)
  X0 = np.zeros(num_pos)
  Tprob = np.zeros((num_pos,num_pos))
  Oprob = {}

  for sent in data:
    for i in range(len(sent)):
      word = sent[i][0]
      pos = POS.index(sent[i][1])
      pos_counts[pos] += 1

      if i == 0: X0[pos] += 1
      if i < len(sent)-1:
        Tprob[POS.index(sent[i+1][1]), pos] += 1
      if word not in Oprob:
        Oprob[word] = np.zeros(num_pos)
      (Oprob[word])[pos] += 1

  X0 = X0 / np.sum(X0)
  Tprob = Tprob / np.sum(Tprob, axis=0)
  for word, counts in Oprob.items():
    Oprob[word] = counts / pos_counts

  return X0, Tprob, Oprob

In [4]:
def viterbi_forward(X0, Tprobs, Oprobs, obs):
  m = X0
  pointers = np.zeros((len(obs), len(X0)), dtype='int')
  for i in range(len(obs)):
    Tm = Tprobs * np.tile(m,(len(X0),1))
    m = (Oprobs[obs[i]] if obs[i] in Oprobs else 1) * np.amax(Tm, axis=1)
    pointers[i] = np.argmax(Tm, axis=1)
  return m, pointers

def viterbi_backward(m, pointers):
  p = np.argmax(m)
  seq = [POS[p]]
  for i in range(pointers.shape[0]-1, 0, -1):
    p = pointers[i,p]
    seq.append(POS[p])
  seq.reverse()
  return seq

In [5]:
def evaluate_viterbi(X0, T, O, data):
  correct = 0
  num_words = 0

  for sentence in data:
    words = [sent[0] for sent in sentence]
    pos = [sent[1] for sent in sentence]
    m, p = viterbi_forward(X0, T, O, words)
    predict = viterbi_backward(m, p)
    correct += sum(1 for i,j in zip(pos, predict) if i==j)
    num_words += len(words)
  
  return correct/num_words

train = read_corpus('train.upos.tsv')
test = read_corpus('test.upos.tsv')
X0, T, O = learn_model(train)
print("Train accuracy:", evaluate_viterbi(X0, T, O, train))
print("Test accuracy:", evaluate_viterbi(X0, T, O, test))

Train accuracy: 0.9527044529268305
Test accuracy: 0.8832131330437901


In [6]:
def forward(X0, Tprobs, Oprobs, obs):
  a = X0
  for i in range(len(obs)):
    a = (Oprobs[obs[i]] if obs[i] in Oprobs else 1) * (Tprobs @ a)
  return a

def backward(Tprobs, Oprobs, obs, k):
  b = np.ones(Tprobs.shape[0])
  for i in range(len(obs)-1, k, -1):
    b = Tprobs.T @ ((Oprobs[obs[i]] if obs[i] in Oprobs else 1) * b)
  return b

def forward_backward(X0, Tprobs, Oprobs, obs, k):
  f = forward(X0, Tprobs, Oprobs, obs[:k+1])
  b = backward(Tprobs, Oprobs, obs, k)
  return f*b/(np.sum(f*b))

In [7]:
def expected_emissions(X0, Tprobs, Oprobs, data):
  expO = {}
  total = np.zeros(len(X0))

  for obs in data:
    for k in range(len(obs)):
      if obs[k] not in expO:
        expO[obs[k]] = np.zeros(len(X0))
      gamma = forward_backward(X0, Tprobs, Oprobs, obs, k)
      expO[obs[k]] += gamma
      total += gamma

  for word, counts in expO.items():
    expO[word] = counts/total

  return expO

obs = [[pair[0] for pair in sentence] for sentence in [test[0]]]
Onew = expected_emissions(X0, T, O, obs)
print(Onew)

{'What': array([0.        , 0.        , 0.        , 0.        , 0.        ,
       0.07883701, 0.        , 0.        , 0.        , 0.        ,
       0.93862028, 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        ]), 'if': array([0.        , 0.02132942, 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.99871082, 0.        ,
       0.        , 0.        ]), 'Google': array([0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.63967522, 0.        , 0.        , 0.        ,
       0.        , 0.        ]), 'Morphed': array([0.12187301, 0.02587711, 0.5086282 , 0.99806667, 0.9674151 ,
       0.01997999, 0.02880006, 0.37986506, 0.30991731, 0.9714963 ,
       0.00515111, 0.1654484 , 0.14666717, 0.00104646, 0.14593278,
       0.97421953, 0.03689384]), 'Into': array([0.        , 0