In [1]:
# pretty printing our distributions
import pandas as pd
from tabulate import tabulate
import numpy as np

In [2]:
def generate_sequence(states, sequence_length):
  all_sequences = []
  nodes = []
  depth = sequence_length
  def gen_seq_recur(states,nodes,depth):
    if depth == 0:
      #print nodes
      all_sequences.append(nodes)
    else:
      for state in states:
        temp_nodes = list(nodes)
        temp_nodes.append(state)
        gen_seq_recur(states, temp_nodes, depth-1)

  gen_seq_recur(states, [],depth)

  return all_sequences

In [3]:
def score_sequences(sequences, initial_probs, transition_probs, emission_probs, obs):

  best_score = -1
  best_sequence = None

  sequence_scores = []
  for seq in sequences:
    total_score = 1
    total_score_breakdown = []
    first = True
    for i in range(len(seq)):
      state_score = 1
      # compute transitition probability score
      if first == True:
        state_score *= initial_probs[seq[i]]
        # reset first flag
        first = False
      else:
        state_score *= transition_probs[seq[i] + "|" + seq[i-1]]
        # add to emission probability score
        state_score *= emission_probs[obs[i] + "|" + seq[i]]
        # update the total score
        #print state_score
        total_score *= state_score
        total_score_breakdown. append(total_score)

    if(total_score > best_score):
      best_score = total_score
      best_sequence = total_score_breakdown

    sequence_scores.append(total_score)

  return best_sequence, sequence_scores

In [4]:
def pretty_print_probs(distribs):
    rows = set()
    cols = set()
    for val in distribs.keys():
        temp = val.split("|")
        rows.add(temp[0].strip())  # strip whitespace around states
        cols.add(temp[1].strip())  # strip whitespace around states

    rows = list(rows)
    cols = list(cols)
    df = []

    for i in range(len(rows)):
        temp = []
        for j in range(len(cols)):
            # Attempt to find the key in both formats
            key_candidate = f"{rows[i]}|{cols[j]}"
            if key_candidate in distribs:
                temp.append(distribs[key_candidate])
            else:
                # Try the version with spaces around the pipe
                key_candidate = f"{rows[i]} | {cols[j]}"
                if key_candidate in distribs:
                    temp.append(distribs[key_candidate])
                else:
                    temp.append('N/A')  # If neither key is found, append 'N/A'
        df.append(temp)

    I = pd.Index(rows, name="rows")
    C = pd.Index(cols, name="cols")
    df = pd.DataFrame(data=df, index=I, columns=C)

    print(tabulate(df, headers='keys', tablefmt='psql'))


In [5]:
def initializeSequences(states,prob_dict, initial_probs,_obs):
  # Generate list of sequences
  transition_probs = prob_dict[0]
  emission_probs = prob_dict[1]
  seqLen = len(_obs)
  seqs = generate_sequence(states, seqLen)
  # Score sequences
  best_seq, seq_scores = score_sequences(seqs, initial_probs, transition_probs, emission_probs,_obs)

  return (seqLen, seqs, best_seq, seq_scores)

In [6]:
def viterbi(states, prob_dict, initial_probs, _obs):
    # Generate list of sequences
    n = len(states)
    transition_probs = prob_dict[0]
    emission_probs = prob_dict[1]
    viterbi_list = []
    cache = {}

    for state in states:
        # Remove spaces around the pipe when accessing the emission probabilities
        viterbi_list.append(initial_probs[state] * emission_probs[_obs[0] + "|" + state])

    for i, ob in enumerate(_obs):
        if i == 0: continue
        temp_list = [None] * n
        for j, state in enumerate(states):
            x = -1
            for k, prob in enumerate(viterbi_list):
                val = prob * transition_probs[state + "|" + states[k]] * emission_probs[ob + "|" + state]
                if (x < val):
                    x = val
                    cache[str(i) + "-" + state] = [states[k], val]
            temp_list[j] = x
        viterbi_list = [x for x in temp_list]

    return cache, viterbi_list


In [7]:
def viterbi_backward(states,cache,viterbi_list):

  num_states = len(states)
  n = len(cache)//num_states
  best_sequence = []
  best_sequence_breakdown=[ ]
  x = states[np.argmax(np.asarray(viterbi_list))]
  best_sequence.append(x)

  for i in range(n,0,-1):
    val = cache[str(i)+'-'+x][1]
    x = cache[str(i)+'-'+x][0]
    best_sequence = [x] + best_sequence
    best_sequence_breakdown = [val]+best_sequence_breakdown

  return best_sequence, best_sequence_breakdown

In [8]:
def get_probs_ex2():
  transition_probs = {
      'Noun|Noun': 0.1, 'Noun|Verb': 0.1, 'Noun|Determiner': 0.8,
      'Verb|Noun': 0.8, 'Verb|Verb': 0.1, 'Verb|Determiner': 0.1,
      'Determiner|Noun': 0.1, 'Determiner|Verb': 0.8, 'Determiner|Determiner': 0.1
  }
  emission_probs = {
      'Bob|Noun': 0.9, 'ate|Noun': 0.05, 'the|Noun': 0.05, 'fruit|Noun': 0.9,
      'Bob|Verb': 0.05, 'ate|Verb': 0.9, 'the|Verb': 0.05, 'fruit|Verb': 0.05,
      'Bob|Determiner': 0.05, 'ate|Determiner': 0.05, 'the|Determiner': 0.9, 'fruit|Determiner': 0.05
  }
  return transition_probs, emission_probs

# generate new sequences
states2 = ['Noun','Verb' , 'Determiner' ]

initial_probs2 = {'Noun':0.9,'Verb':0.05,'Determiner':0.05}

transition_probs, emission_probs = get_probs_ex2()

print("Initial Distributions")
print(initial_probs2)
print("\nTransition Probabilities")
pretty_print_probs(transition_probs)
print("\nEmission Probabilities")
pretty_print_probs(emission_probs)

Initial Distributions
{'Noun': 0.9, 'Verb': 0.05, 'Determiner': 0.05}

Transition Probabilities
+------------+--------------+--------+--------+
| rows       |   Determiner |   Verb |   Noun |
|------------+--------------+--------+--------|
| Determiner |          0.1 |    0.8 |    0.1 |
| Verb       |          0.1 |    0.1 |    0.8 |
| Noun       |          0.8 |    0.1 |    0.1 |
+------------+--------------+--------+--------+

Emission Probabilities
+--------+--------------+--------+--------+
| rows   |   Determiner |   Verb |   Noun |
|--------+--------------+--------+--------|
| fruit  |         0.05 |   0.05 |   0.9  |
| Bob    |         0.05 |   0.05 |   0.9  |
| the    |         0.9  |   0.05 |   0.05 |
| ate    |         0.05 |   0.9  |   0.05 |
+--------+--------------+--------+--------+


In [9]:
obs = ['Bob','ate','the','fruit' ]

# print results
print("\nScores")

# Generate list of sequences
sequence_length, sequences, best_sequence,sequence_scores = initializeSequences(states2,get_probs_ex2(),initial_probs2,obs)

# Display sequence scores
# Total number of sequences are 3^4 = 81
for i in range(len(sequences)):
  print("Sequence:%-60s Score:%0.6f" % (sequences[i],sequence_scores[i]) )

# Display the winning score
best_seq_n = np.argmax(sequence_scores)
obs = ['Bob','ate','the','fruit' ]

# print results
print("\nScores")

# Generate list of sequences
sequence_length, sequences, best_sequence,sequence_scores = initializeSequences(states2,get_probs_ex2(),initial_probs2,obs)

# Display sequence scores
# Total number of sequences are 3^4 = 81
for i in range(len(sequences)):
  print("Sequence:%-60s Score:%0.6f" % (sequences[i],sequence_scores[i]) )

# Display the winning score
best_seq_n = np.argmax(sequence_scores)
print("\nBest Sequence:",sequences[best_seq_n],best_sequence)
print ()
cache, _ = viterbi(states2,get_probs_ex2(),initial_probs2,obs) # corrected line
print("Path generated by viterbi algorithm by reducing computation ... ",cache, sep="\n")
print()
print("Finding the best hidden sequence from the path generated by viterbi algorithm ... ")
print(viterbi_backward(states2,cache,_),np.max(np.asarray(_))) # corrected line
print("Finding the best hidden sequence from the path generated by viterbi algorithm ... ")
print(viterbi_backward(states2,cache,1),np.max(np.asarray(1)))


Scores
Sequence:['Noun', 'Noun', 'Noun', 'Noun']                             Score:0.000002
Sequence:['Noun', 'Noun', 'Noun', 'Verb']                             Score:0.000001
Sequence:['Noun', 'Noun', 'Noun', 'Determiner']                       Score:0.000000
Sequence:['Noun', 'Noun', 'Verb', 'Noun']                             Score:0.000018
Sequence:['Noun', 'Noun', 'Verb', 'Verb']                             Score:0.000001
Sequence:['Noun', 'Noun', 'Verb', 'Determiner']                       Score:0.000008
Sequence:['Noun', 'Noun', 'Determiner', 'Noun']                       Score:0.000324
Sequence:['Noun', 'Noun', 'Determiner', 'Verb']                       Score:0.000002
Sequence:['Noun', 'Noun', 'Determiner', 'Determiner']                 Score:0.000002
Sequence:['Noun', 'Verb', 'Noun', 'Noun']                             Score:0.000324
Sequence:['Noun', 'Verb', 'Noun', 'Verb']                             Score:0.000144
Sequence:['Noun', 'Verb', 'Noun', 'Determiner']          