# Viterbi Encoding and Decoding
Youn-Long Lin, Department of Computer Science, National Tsing Hua University, Taiwan


In [1]:
# Utilities from open source
# StackOverflow http://stackoverflow.com/questions/7396849/convert-binary-to-ascii-and-vice-versa
# Converting text to bits and back

import binascii

def text_to_bits(text, encoding='utf-8', errors='surrogatepass'):
    """Encode *text* and return its bytes as a string of '0'/'1' characters.

    Every encoded byte contributes exactly eight characters, most significant
    bit first, so the result is always ``8 * len(encoded_bytes)`` long.
    Formatting byte by byte (rather than going through one big integer)
    keeps leading NUL bytes and makes empty input yield ``''`` instead of
    raising ``ValueError``.

    Args:
        text: The string to convert.
        encoding: Codec passed to ``str.encode``.
        errors: Error handler passed to ``str.encode``.

    Returns:
        A string of '0' and '1' characters.
    """
    data = text.encode(encoding, errors)
    return ''.join(format(byte, '08b') for byte in data)

def text_from_bits(bits, encoding='utf-8', errors='surrogatepass'):
    """Decode a string of '0'/'1' characters back into text.

    The number of bytes is derived from ``len(bits)`` (rounded up to a whole
    byte), so leading zero bytes survive the conversion; sizing the result
    from the integer value alone would silently drop them.  An empty bit
    string decodes to ``''`` instead of raising ``ValueError``.

    Args:
        bits: String of '0' and '1' characters, most significant bit first.
        encoding: Codec passed to ``bytes.decode``.
        errors: Error handler passed to ``bytes.decode``.

    Returns:
        The decoded text.

    Raises:
        ValueError: If *bits* is not a valid base-2 literal.
        UnicodeDecodeError: If the bytes are not valid in *encoding*.
    """
    if not bits:
        return ''
    n_bytes = (len(bits) + 7) // 8
    return int(bits, 2).to_bytes(n_bytes, 'big').decode(encoding, errors)

def int2bytes(i):
    """Return the big-endian bytes of integer *i* via its hexadecimal form.

    ``unhexlify`` needs an even number of hex digits, so an odd-length
    rendering is left-padded with a single '0'.  Zero becomes ``b'\\x00'``.
    """
    digits = '%x' % i
    if len(digits) % 2:
        digits = '0' + digits
    return binascii.unhexlify(digits)


# Introduce Noise (say 1% error rate)

import random

# Add random noise to a list of code words
def add_noise(error_rate, x, seed=10000):
    """Return a copy of the code words in *x* with bits flipped at random.

    Each character of each token is flipped independently with probability
    *error_rate*: '1' becomes '0', and any other character becomes '1'.

    The noise comes from a private ``random.Random(seed)`` instance, so a
    given (error_rate, x, seed) always produces the same result and the
    module-level ``random`` generator used by callers is left untouched.
    The default seed reproduces the sequence of the previously hard-coded
    ``random.seed(10000)``.

    Args:
        error_rate: Per-bit flip probability (compared against
            ``random()``, which lies in [0.0, 1.0)).
        x: Iterable of strings made of '0'/'1' characters.
        seed: Seed for the private generator.

    Returns:
        A new list of strings, one per token of *x*, same lengths.
    """
    rng = random.Random(seed)
    noisy = []
    for token in x:
        # Exactly one random draw per bit, in order, so results stay
        # reproducible for a given seed.
        noisy.append(''.join(
            ('0' if bit == '1' else '1') if rng.random() < error_rate else bit
            for bit in token))
    return noisy

# Return the Hamming distance between two binary strings of '0' and '1'
def ham_dist(a, b):
    """Count the positions at which sequences *a* and *b* differ.

    Positions are taken from *a*: every index of *a* is compared against the
    same index of *b*, so *b* must be at least as long as *a* (otherwise an
    ``IndexError`` is raised) and any extra tail of *b* is ignored.
    """
    return sum(1 for pos in range(len(a)) if a[pos] != b[pos])

# Viterbi Encoding
# A finite state machine that produces a two-bit code word for every one-bit input,
# driven by the state table fsm and starting from the initial state s0
def viterbi_enc(fsm, s0, x):
    """Run the input symbols of *x* through the state machine *fsm*.

    Args:
        fsm: Mapping ``fsm[state][symbol]`` -> dict with keys ``'out'``
            (the emitted code word) and ``'ns'`` (the next state).
        s0: State the machine starts in.
        x: Iterable of input symbols; each must be a key of ``fsm[state]``.

    Returns:
        List of emitted code words, one per input symbol.
    """
    state = s0
    encoded = []
    for symbol in x:
        transition = fsm[state][symbol]
        encoded.append(transition['out'])
        state = transition['ns']
    return encoded

# Viterbi Decoding
def viterbi_dec(fsm_rev, s0, y):
    """Find the state path whose outputs are closest (Hamming) to *y*.

    Args:
        fsm_rev: Mapping ``fsm_rev[state][prev_state]`` -> dict with keys
            ``'in'`` (input symbol causing prev_state -> state) and ``'out'``
            (code word emitted on that transition).
        s0: Assumed initial state; it starts at cost 0, every other state
            starts at cost 9999.
        y: Sequence of received code words.

    Returns:
        A 4-tuple ``(dp_array, path, ml, ml_in)``:
        dp_array -- list of ``len(y) + 1`` dicts, state -> accumulated cost;
        path     -- dict, state -> surviving state sequence ending there;
        ml       -- code words emitted along the lowest-cost path;
        ml_in    -- input symbols along the lowest-cost path.
    """
    states = list(fsm_rev.keys())

    # Stage 0: only the declared start state is free of penalty.
    dp_array = [{s: (0 if s == s0 else 9999) for s in states}]
    path = {s: [s] for s in states}

    for step, received in enumerate(y):
        prev_cost = dp_array[step]
        stage_cost = {}
        survivors = {}
        for s in states:
            candidates = []
            for s_prev, edge in fsm_rev[s].items():
                expected = edge['out']
                # Hamming distance between the received and expected words,
                # indexed over the received word.
                mismatches = sum(1 for k in range(len(received))
                                 if received[k] != expected[k])
                candidates.append((prev_cost[s_prev] + mismatches, s_prev))
            # Tuple ordering: lowest cost wins, ties go to the smaller
            # predecessor state.
            min_dist, min_state = min(candidates)
            stage_cost[s] = min_dist
            survivors[s] = path[min_state] + [s]
        dp_array.append(stage_cost)
        path = survivors

    final_cost = dp_array[-1]
    # Ties on final cost are broken by comparing the path lists themselves.
    best_dist, best_p = min((final_cost[s], path[s]) for s in final_cost)

    # Walk the winning path as (state, predecessor) pairs.
    hops = list(zip(best_p[1:], best_p[:-1]))
    ml = [fsm_rev[cur][prev]['out'] for cur, prev in hops]
    ml_in = [fsm_rev[cur][prev]['in'] for cur, prev in hops]

    return dp_array, path, ml, ml_in

In [2]:
# Test run: encode a short bit string, corrupt it, then decode both the
# clean and the corrupted code-word sequences and print the decoder state.

# Finite state machine for Viterbi encoding:
# fsm[present_state][input_bit] -> next state ('ns') and code word ('out')
fsm = {
    'a': {'0': {'ns': 'a', 'out': '00'}, '1': {'ns': 'b', 'out': '11'}},
    'b': {'0': {'ns': 'c', 'out': '10'}, '1': {'ns': 'd', 'out': '01'}},
    'c': {'0': {'ns': 'a', 'out': '11'}, '1': {'ns': 'b', 'out': '00'}},
    'd': {'0': {'ns': 'c', 'out': '01'}, '1': {'ns': 'd', 'out': '10'}},
}

# Inverse description of the same machine for decoding:
# fsm_reverse[state][previous_state] -> input bit ('in') and code word ('out')
fsm_reverse = {
    'a': {'a': {'in': '0', 'out': '00'}, 'c': {'in': '0', 'out': '11'}},
    'b': {'a': {'in': '1', 'out': '11'}, 'c': {'in': '1', 'out': '00'}},
    'c': {'b': {'in': '0', 'out': '10'}, 'd': {'in': '0', 'out': '01'}},
    'd': {'b': {'in': '1', 'out': '01'}, 'd': {'in': '1', 'out': '10'}},
}

x = '0100110101100'
print('Input String=', x)

# Encode message x; 'a' is the initial state.
y = viterbi_enc(fsm, 'a', x)
print('Clean Msg=', y)

# Flip bits of the encoded message at a 1% error rate.
z = add_noise(0.01, y)
print('Dirty Msg=', z)

print('   ')

# Decode the clean message first (baseline), then the noisy one.
runs = (
    (y, 'Decoded Clean Msg=', 'Decoded Clean Str='),
    (z, 'Decoded from Dirty Msg=', 'Decoded from Dirty Str='),
)
for run_no, (received, msg_label, str_label) in enumerate(runs):
    if run_no:
        print("  ")  # separator between the two reports

    result, path, ml, ml_in = viterbi_dec(fsm_reverse, 'a', received)

    # Accumulated cost per state, one line per stage.
    for state in result:
        print(state)
    print("      ")
    # Surviving path for every end state.
    for p, survivor in path.items():
        print(p, survivor)
    print("      ")
    print(msg_label, ml)
    print(str_label, ''.join(ml_in))


Input String= 0100110101100
Clean Msg= ['00', '11', '10', '11', '11', '01', '01', '00', '10', '00', '01', '01', '11']
Dirty Msg= ['00', '11', '10', '10', '11', '01', '01', '00', '00', '00', '01', '01', '11']
   
{'b': 9999, 'c': 9999, 'd': 9999, 'a': 0}
{'b': 2, 'c': 10000, 'd': 10000, 'a': 0}
{'b': 0, 'c': 3, 'd': 3, 'a': 2}
{'b': 3, 'c': 0, 'd': 2, 'a': 3}
{'b': 2, 'c': 3, 'd': 3, 'a': 0}
{'b': 0, 'c': 3, 'd': 3, 'a': 2}
{'b': 3, 'c': 2, 'd': 0, 'a': 3}
{'b': 3, 'c': 0, 'd': 2, 'a': 3}
{'b': 0, 'c': 3, 'd': 3, 'a': 2}
{'b': 3, 'c': 0, 'd': 2, 'a': 3}
{'b': 0, 'c': 3, 'd': 3, 'a': 2}
{'b': 3, 'c': 2, 'd': 0, 'a': 3}
{'b': 3, 'c': 0, 'd': 2, 'a': 3}
{'b': 2, 'c': 3, 'd': 3, 'a': 0}
      
b ['a', 'a', 'b', 'c', 'a', 'b', 'd', 'c', 'b', 'c', 'b', 'd', 'c', 'b']
c ['a', 'a', 'b', 'c', 'a', 'b', 'd', 'c', 'b', 'c', 'b', 'd', 'd', 'c']
d ['a', 'a', 'b', 'c', 'a', 'b', 'd', 'c', 'b', 'c', 'b', 'd', 'd', 'd']
a ['a', 'a', 'b', 'c', 'a', 'b', 'd', 'c', 'b', 'c', 'b', 'd', 'c', 'a']
      
Dec

In [3]:
# Integration with Huffman coding
'''
1. original text --> h_tree
2. h_tree --> h_dict
3. original text, h_dict --> h_code
4. h_code --> viterbi_encoded msg
5. viterbi_encoded msg, noise --> noised msg
6. noised msg --> viterbi decoded msg
7. viterbi_decoded msg, h_tree --> decoded text
'''

import huffman as hf

# Encoder table: fsm[present_state][input_bit] -> next state and code word
fsm = {
    'a': {'0': {'ns': 'a', 'out': '00'}, '1': {'ns': 'b', 'out': '11'}},
    'b': {'0': {'ns': 'c', 'out': '10'}, '1': {'ns': 'd', 'out': '01'}},
    'c': {'0': {'ns': 'a', 'out': '11'}, '1': {'ns': 'b', 'out': '00'}},
    'd': {'0': {'ns': 'c', 'out': '01'}, '1': {'ns': 'd', 'out': '10'}},
}

# Decoder table: fsm_reverse[state][previous_state] -> input bit and code word
fsm_reverse = {
    'a': {'a': {'in': '0', 'out': '00'}, 'c': {'in': '0', 'out': '11'}},
    'b': {'a': {'in': '1', 'out': '11'}, 'c': {'in': '1', 'out': '00'}},
    'c': {'b': {'in': '0', 'out': '10'}, 'd': {'in': '0', 'out': '01'}},
    'd': {'b': {'in': '1', 'out': '01'}, 'd': {'in': '1', 'out': '10'}},
}

# Let's try the opening of "A Tale of Two Cities"
text = 'It was the best of times, it was the worst of times, it was the age of wisdom, it was the age of foolishness, it was the epoch of belief, it was the epoch of incredulity, it was the season of Light, it was the season of Darkness, it was the spring of hope, it was the winter of despair, we had everything before us, we had nothing before us, we were all going direct to Heaven, we were all going direct the other way – in short, the period was so far like the present period, that some of its noisiest authorities insisted on its being received, for good or for evil, in the superlative degree of comparison only.'

print("Text Length", len(text))
print(text)

# --- Huffman side -----------------------------------------------------------
# NOTE(review): `hf` is the project-local huffman module; the shapes of
# h_tree / h_dict / h_code are whatever that module returns -- confirm there.
h_tree = hf.gen_huffman_tree(text)
print("h_tree", h_tree)

# gen_huffman_dict fills the dictionary passed as its third argument.
h_dict = {}
hf.gen_huffman_dict(h_tree[0], '', h_dict)
print("h_dict length", len(h_dict))
for symbol in h_dict:
    print(symbol, h_dict[symbol])

h_code = hf.huffman_enc(h_dict, text)
print("Huffman-encoded h_code length", len(h_code))
print(h_code)

# --- Into Viterbi part ------------------------------------------------------
# Each element of h_code is fed to the encoder as one input symbol.
y = viterbi_enc(fsm, 'a', h_code)
print('Clean Msg=', y)

z = add_noise(0.01, y)
print('Dirty Msg=', z)

print('   ')

result, path, ml, ml_in = viterbi_dec(fsm_reverse, 'a', z)

for state in result:
    print(state)
print("      ")
for p, survivor in path.items():
    print(p, survivor)
print("      ")

# The recovered input bits, joined, are what the Huffman decoder consumes.
to_be_huff_dec = ''.join(ml_in)
print('Decoded Dirty Msg=', ml)
print('Decoded Dirty Str=', to_be_huff_dec)

# --- Leaving Viterbi part ---------------------------------------------------
dec_text = hf.huffman_dec(h_tree, to_be_huff_dec)
print("Decoded text length", len(dec_text))
print(dec_text)



Text Length 613
It was the best of times, it was the worst of times, it was the age of wisdom, it was the age of foolishness, it was the epoch of belief, it was the epoch of incredulity, it was the season of Light, it was the season of Darkness, it was the spring of hope, it was the winter of despair, we had everything before us, we had nothing before us, we were all going direct to Heaven, we were all going direct the other way – in short, the period was so far like the present period, that some of its noisiest authorities insisted on its being received, for good or for evil, in the superlative degree of comparison only.
h_tree [(613, ((250, ((119, (' ',)), (131, ((62, ((28, ('a',)), (34, ((17, ((8, ((4, ((2, ((1, ('D',)), (1, ('I',)))), (2, ('k',)))), (4, ((2, ((1, ('H',)), (1, ('.',)))), (2, ((1, ('L',)), (1, ('–',)))))))), (9, ((4, ('y',)), (5, ('b',)))))), (17, (',',)))))), (69, ('e',)))))), (363, ((168, ((81, ((39, ((19, ('f',)), (20, ((10, ((5, ('v',)), (5, ('u',)))), (10, ('p',

NameError: name 'text' is not defined

In [None]:
# This cell shows where bits were corrupted: the text is converted to raw
# bits (no Huffman step), encoded, corrupted, decoded, and the positions
# that differ from the clean code words are marked with 'D'.
# It reuses fsm / fsm_reverse as defined by an earlier cell.

st = "It was the best of times, it was the worst of times, it was the age of wisdom, it was the age of foolishness, it was the epoch of belief, it was the epoch of incredulity, it was the season of Light, it was the season of Darkness, it was the spring of hope, it was the winter of despair, we had everything before us, we had nothing before us, we were all going direct to Heaven, we were all going direct the other way – in short, the period was so far like the present period, that some of its noisiest authorities insisted on its being received, for good or for evil, in the superlative degree of comparison only."

b_st = text_to_bits(st)

print("B_st length", len(b_st))
print(b_st)

# Encode, add 1% noise, and decode again.
y = viterbi_enc(fsm, 'a', b_st)
z = add_noise(0.01, y)
result, path, ml, ml_in = viterbi_dec(fsm_reverse, 'a', z)

print(ml_in)

# Turn the recovered bits back into text.
dec_b_st = ''.join(ml_in)
dec_msg = text_from_bits(dec_b_st)
print("Decode Msg =", dec_msg)

# 'D' marks code words changed by the noise (clean vs. received) ...
print(['' if sent == received else 'D' for sent, received in zip(y, z)])
# ... and code words the decoder failed to restore (clean vs. decoded).
print(['' if sent == decoded else 'D' for sent, decoded in zip(y, ml)])


### Viterbi Decoding of General HMM

In [None]:
import math

def viterbi_dp(states, start_prob, tran_prob, emit_prob, obs):
    """Dynamic-programming search for the most probable HMM state paths.

    Stage 0 holds the start probabilities.  For each observation, stage
    ``t + 1`` of state ``s`` is the maximum over predecessors ``q`` of
    ``stage[t][q] * tran_prob[q][s] * emit_prob[q][obs[t]]`` -- the
    observation is charged to the predecessor state.  Ties are resolved by
    tuple comparison, i.e. in favour of the larger predecessor state.

    Args:
        states: Re-iterable collection of state labels.
        start_prob: Mapping state -> initial probability.
        tran_prob: Mapping ``tran_prob[from_state][to_state]``.
        emit_prob: Mapping ``emit_prob[state][observation]``.
        obs: Sequence of observations.

    Returns:
        ``(dp_array, path)``: a list of ``len(obs) + 1`` dicts mapping state
        to probability, and a dict mapping each state to the best state
        sequence ending in it.
    """
    dp_array = [{s: start_prob[s] for s in states}]
    path = {s: [s] for s in states}

    for t, symbol in enumerate(obs):
        previous = dp_array[t]
        stage = {}
        extended = {}
        for s in states:
            scored = [
                (previous[q] * tran_prob[q][s] * emit_prob[q][symbol], q)
                for q in states
            ]
            value, state = max(scored)
            stage[s] = value
            extended[s] = path[state] + [s]
        dp_array.append(stage)
        path = extended

    return dp_array, path


In [None]:
# A two-state test case for viterbi_dp
states = ('H', 'F')
observations = (
    'normal', 'cold', 'dizzy', 'normal',
    'dizzy', 'normal', 'dizzy', 'normal',
)
start_prob = {'H': 0.6, 'F': 0.4}
# tran_prob[from_state][to_state]
tran_prob = {
    'H': {'H': 0.7, 'F': 0.3},
    'F': {'H': 0.4, 'F': 0.6},
}
# emit_prob[state][observation]
emit_prob = {
    'H': {'normal': 0.7, 'cold': 0.1, 'dizzy': 0.2},
    'F': {'normal': 0.1, 'cold': 0.6, 'dizzy': 0.3},
}

result, path = viterbi_dp(states, start_prob, tran_prob, emit_prob, observations)

# One line per stage: probability of the best path into each state.
for r in result:
    print(r)
# Best state sequence ending in each state.
for p, best in path.items():
    print(p, best)

In [None]:
# A three-state test case for viterbi_dp
states = ('Healthy', 'Fever', 'Hyper')
observations = (
    'normal', 'cold', 'dizzy', 'normal',
    'dizzy', 'normal', 'dizzy', 'normal',
)
start_prob = {'Healthy': 0.5, 'Fever': 0.3, 'Hyper': 0.2}
# tran_prob[from_state][to_state]
tran_prob = {
    'Healthy': {'Healthy': 0.6, 'Fever': 0.3, 'Hyper': 0.1},
    'Fever': {'Healthy': 0.5, 'Fever': 0.3, 'Hyper': 0.2},
    'Hyper': {'Healthy': 0.1, 'Fever': 0.2, 'Hyper': 0.7},
}
# emit_prob[state][observation]
emit_prob = {
    'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
    'Hyper': {'normal': 0.2, 'cold': 0.5, 'dizzy': 0.3},
}

result, path = viterbi_dp(states, start_prob, tran_prob, emit_prob, observations)

# One line per stage: probability of the best path into each state.
for r in result:
    print(r)
# Best state sequence ending in each state.
for p, best in path.items():
    print(p, best)