In [2]:
import math
import sys
import random as r
import numpy as np

def make_default(length):
    rands = [r.random() for _ in range(length)]
    for i in range(len(rands)):
        rands[i] = rands[i]/sum(rands)

    return rands


def make_sample(states, tp, ep, length):
    rand = r.random()
    numbers = ""
    hiddens = ""
    if rand < 0.5:
        hiddens += "F"
        numbers += make_number(ep["F"])
    else:
        hiddens += "L"
        numbers += make_number(ep["F"])

    for _ in range(length-1):
        rand_1 = r.random()
        if rand_1 < tp[hiddens[-1]]["F"]:
            hiddens += "F"
            numbers += make_number(ep["F"])
        else:
            hiddens += "L"
            numbers += make_number(ep["L"])

    # print("実際\n{}".format(hiddens))
    return hiddens, numbers


def make_number(prob):
    rand = r.random()
    if rand < prob["1"]:
        return "1"
    rand -= prob["1"]
    if rand < prob["2"]:
        return "2"
    rand -= prob["2"]
    if rand < prob["3"]:
        return "3"
    rand -= prob["3"]
    if rand < prob["4"]:
        return "4"
    rand -= prob["4"]
    if rand < prob["5"]:
        return "5"
    rand -= prob["5"]
    if rand < prob["6"]:
        return "6"
    rand -= prob["6"]


def viterbi(observs, states, sp, tp, ep):
    T = {}
    for st in states:
        T[st] = (0 + sp[st] + ep[st][observs[0]], [st])
    for ob in observs[1:]:
        T = next_state(ob, states, T, tp, ep)

    return T


def next_state(ob, states, T, tp, ep):
    U = {}
    for next_s in states:
        U[next_s] = (-float('inf'), [])
        for now_s in states:
            p = ep[next_s][ob] + T[now_s][0] + tp[now_s][next_s]
            if p > U[next_s][0]:
                U[next_s] = [p, T[now_s][1]+[next_s]]
    return U


def calculate(predict, real):
    array = []
    for i in range(len(real)):
        if predict[i] == real[i]:
            array.append(1)
        else:
            array.append(0)

    accuracy = sum(array)/len(array)

    # print("予測\n{}".format(predict))
    return accuracy



In [3]:
from scipy.special import logsumexp

def forward_log(observed, states, tp, ep):
    # 状態 FとLが格納される
    T = {}
    Ts = []
    
    # 初期化
    T["F"] = 0
    T["L"] = -float("inf")
        
    # 再帰
    for ob in observed:
        T = next_log(ob, states, T, tp, ep)
        Ts.append(T)
        
    # 終了処理
    U = {}
    l = [T["F"]+tp["F"]["F"],T["L"]+tp["L"]["F"]]
    U["F"] = logsumexp(l)

    l = [T["F"]+tp["F"]["L"], T["L"]+tp["L"]["L"]]
    U["L"] = logsumexp(l)
    

    return Ts,U


def next_log(ob, states, T, tp, ep):
    # 状態 FとLが格納される
    U = {}
    
    l = [T["F"]+tp["F"]["F"],T["L"]+tp["L"]["F"]]
    U["F"] = ep["F"][ob]+logsumexp(l)
    
    l = [T["F"]+tp["F"]["L"], T["L"]+tp["L"]["L"]]
    U["L"] = ep["L"][ob]+logsumexp(l)

    return U


def back_log(observed, states, tp, ep):
    # 状態 FとLが格納される
    T = {}
    Ts = []
    # 初期化
    T["F"] = tp["F"]["F"]
    T["L"] = tp["L"]["F"]
    
    Ts.insert(0,T)

    # 再帰
    for ob in observed[:-1]:
        T = back_l(ob, states, T, tp, ep)
        Ts.insert(0,T)
        
        
    # 終了処理
    U = {}
    
    l = [tp["F"]["F"]+T["F"]+ep["F"][observed[-1]],tp["F"]["L"]+T["L"]+ep["L"][observed[-1]]]
    U["F"] = logsumexp(l)
    
    l = [tp["L"]["F"]+T["F"]+ep["F"][observed[-1]], tp["L"]["L"]+T["L"]+ep["L"][observed[-1]]]
    U["L"] = logsumexp(l)

    return Ts,U


def back_l(ob, states, T, tp, ep):
    # 状態 FとLが格納される
    U = {}
    
    l = [tp["F"]["F"]+T["F"]+ep["F"][ob],tp["F"]["L"]+T["L"]+ep["L"][ob]]
    U["F"] = logsumexp(l)
    
    l = [tp["L"]["F"]+T["F"]+ep["F"][ob], tp["L"]["L"]+T["L"]+ep["L"][ob]]
    U["L"] = logsumexp(l)

    return U


In [4]:
def forward_scaling(observed, states, tp, ep):
    T = {}
    Ts = []
    Sis = []
    
    
    # 初期化
    T["F"] = 1
    T["L"] = 0
        
    # 再帰
    for ob in observed:
        T,si = next_scale(ob, states, T, tp, ep)
        Sis.append(si)
        Ts.append(T)
        
    # 終了処理

    return Ts,Sis


def next_scale(ob, states, T, tp, ep):
    # 状態 FとLが格納される
    U = {}
    si = 0
    S = {}
    
    S["F"] = ep["F"][ob]*(T["F"]*tp["F"]["F"]+T["L"]*tp["L"]["F"])
    
    S["L"] = ep["L"][ob]*(T["F"]*tp["F"]["L"]+T["L"]*tp["L"]["L"])
    
    si = S["F"]+S["L"]
    
    U["F"] = (1/si)*S["F"]
    U["L"] = (1/si)*S["L"]

    return U,si

def back_scaling(observed, states, tp, ep):
    T = {}
    Ts = []
    Sis = []
    
    # 初期化
    T["F"] = tp["F"]["F"]
    T["L"] = tp["L"]["F"]
    
    Ts.insert(0,T)
        
    # 再帰
    for ob in observed[:-1]:
        T,si = back_s(ob, states, T, tp, ep)
        Sis.insert(0,si)
        Ts.insert(0,T)
        
    # 終了処理
    si = 0
    S = {}
    
    S["F"] = tp["F"]["F"]*T["F"]*ep["F"][ob]+tp["F"]["L"]*T["L"]*ep["L"][ob]
    S["L"] = tp["L"]["F"]*T["F"]*ep["F"][ob]+tp["L"]["L"]*T["L"]*ep["L"][ob]
    si = S["F"]+S["L"]
    
    Sis.insert(0,si)

    return Ts,Sis

def back_s(ob, states, T, tp, ep):
    # 状態 FとLが格納される
    U = {}
    si = 0
    S = {}
    
    
    S["F"] = tp["F"]["F"]*T["F"]*ep["F"][ob]+tp["F"]["L"]*T["L"]*ep["L"][ob]
    S["L"] = tp["L"]["F"]*T["F"]*ep["F"][ob]+tp["L"]["L"]*T["L"]*ep["L"][ob]
    
    si = S["F"]+S["L"]
    
    U["F"] = (1/si)*S["F"]
    U["L"] = (1/si)*S["L"]

    return U,si

In [5]:
def Scaling():
    count = 0
    predict_e = {'F': {'1': 5/20, '2': 3/20, '3': 2/20, '4': 8/20, '5': 1/20, '6': 1/20},
     'L': {'1': 1/20, '2': 2/20, '3': 3/20, '4': 8/20, '5':1/20, '6': 5/20}}
    predict_a = {"F": {"F": 0.5, "L": 0.5},
         "L": {"F": 0.5, "L": 0.5}}
    while True:
        count += 1

        F,Sis_F = forward_scaling(observed, states,
                                 predict_a, predict_e)

        B,Sis_B = back_scaling(''.join(list(reversed(observed))), states,
                                 predict_a, predict_e)

        logP = 0
        for si in Sis_F:
            logP+=np.log(si)

        logP_ = 0
        for si in Sis_B:
            logP_+=np.log(si)

        assert np.abs(logP-logP_)<1

        E = {"F": {str(i): 0 for i in range(1, 7)}, "L": {
            str(i): 0 for i in range(1, 7)}}

        for k in states:
            for i,ob in enumerate(observed):
                E[k][ob]+=F[i][k]*B[i][k]

        A = {"F": {"F": 0, "L": 0},
             "L": {"F": 0, "L": 0}}

        for k in states:
            for l in states:
                for i in range(len(observed[:-1])):
                    A[k][l]+=F[i][k]*predict_a[k][l]*predict_e[l][observed[i+1]]*B[i+1][l]/Sis_B[i+1]


        predict_a = {"F": {"F": 0, "L": 0},
             "L": {"F": 0, "L": 0}}

        predict_e = {"F": {str(i): 0 for i in range(1, 7)}, "L": {
            str(i): 0 for i in range(1, 7)}}


        for k in states:
            sum_a = 0
            for l in states:
                sum_a+=A[k][l]
            for l in states:
                predict_a[k][l] = A[k][l]/sum_a

        for k in states:
            sum_e = 0
            for b in range(1, 7):
                sum_e+=E[k][str(b)]
            for b in range(1, 7):
                predict_e[k][str(b)] = E[k][str(b)]/sum_e

        clear_output(wait=True)
        print("{}回目 \n対数尤度 {}".format(count,logP))

        for k,v in predict_a.items():
            print(f"状態{k}から")
            for k_,v_ in v.items():
                print(f"状態{k_}の遷移確率:{v_}")

        for k,v in predict_e.items():
            print(f"状態{k}から")
            for k_,v_ in v.items():
                print(f"目{k_}の出現確率:{v_}")
                
        print("-----------------")
        print("コピペ用")
        print(predict_a)
        print(predict_e)

In [6]:
def Log():
    count = 0
    predict_e = {'F': {'1': np.log(5/20), '2': np.log(3/20), '3': np.log(2/20), '4': np.log(8/20), '5': np.log(1/20), '6': np.log(1/20)},
         'L': {'1': np.log(1/20), '2': np.log(2/20), '3': np.log(3/20), '4': np.log(8/20), '5': np.log(1/20), '6': np.log(5/20)}}
    predict_a = {"F": {"F": np.log(0.5), "L": np.log(0.5)},
         "L": {"F": np.log(0.5), "L": np.log(0.5)}}
    while True:
        count += 1

        F,logP_F = forward_log(observed, states,
                                 predict_a, predict_e)

        B,logP_B = back_log(''.join(list(reversed(observed))), states,
                                 predict_a, predict_e)

        assert np.abs(logP_F["F"]-logP_B["F"])<0.1

        logP = logP_F["F"]

        E = {"F":{},"L":{}}

        for k in states:
            nums = {"1":[],"2":[],"3":[],"4":[],"5":[],"6":[]}
            for i,ob in enumerate(observed):
                nums[ob].append(F[i][k]+B[i][k])
            for key in nums.keys():
                E[k][key] = logsumexp(nums[key])-logP

        assert np.sum([len(x) for x in nums.values()])==L

        A = {"F": {},
             "L": {}}

        for k in states:
            for l in states:
                lis = []
                for i,_ in enumerate(observed[:-1]):
                    lis.append(F[i][k]+predict_a[k][l]+predict_e[l][observed[i+1]]+B[i+1][l])
                A[k][l]=logsumexp(lis)-logP



        predict_a = {"F": {"F": 0, "L": 0},
             "L": {"F": 0, "L": 0}}

        predict_e = {"F": {str(i): 0 for i in range(1, 7)}, "L": {
            str(i): 0 for i in range(1, 7)}}


        for k in states:
            sum_a = 0
            lis = []
            for l in states:
                lis.append(A[k][l])
            sum_a = logsumexp(lis)
            for l in states:
                predict_a[k][l] = A[k][l]-sum_a

        for k in states:
            sum_e = 0
            lis = []
            for b in range(1, 7):
                lis.append(E[k][str(b)])
            sum_e = logsumexp(lis)
            for b in range(1, 7):
                predict_e[k][str(b)] = E[k][str(b)]-sum_e

                
        clear_output(wait=True)
        print("{}回目\n対数尤度 {}".format(count,logP))

        for k,v in predict_a.items():
            print(f"状態{k}から")
            for k_,v_ in v.items():
                print(f"状態{k_}の遷移確率:{np.exp(v_)}")

        for k,v in predict_e.items():
            print(f"状態{k}から")
            for k_,v_ in v.items():
                print(f"目{k_}の出現確率:{np.exp(v_)}")

        print("-----------------")
        print("コピペ用")
        print(predict_a)
        print(predict_e)

In [7]:
import numpy as np
from IPython.display import clear_output

L = 30000
COUNT = 100
TYPE = "log"

states = ("F", "L")
# start_prob = {"F": 0.5, "L": 0.5}

ans_e = {'F': {'1': 1/6, '2': 1/6, '3': 1/6, '4': 1/6, '5': 1/6, '6': 1/6},
         'L': {'1': 1/10, '2': 1/10, '3': 1/10, '4': 1/10, '5': 1/10, '6': 1/2}}
ans_a = {"F": {"F": 0.95, "L": 0.05},
         "L": {"F": 0.1, "L": 0.9}}

hiddens, observed = make_sample(states, ans_a, ans_e, L)

if TYPE=="log":
    Log()
else:
    Scaling()

29回目
対数尤度 -52378.1922210325
状態Fから
状態Fの遷移確率:0.6591855970834
状態Lの遷移確率:0.3408144029166001
状態Lから
状態Fの遷移確率:0.33268498312216377
状態Lの遷移確率:0.667315016877836
状態Fから
目1の出現確率:0.23186127821007216
目2の出現確率:0.19292144022350025
目3の出現確率:0.16229688400520736
目4の出現確率:0.18299965131920143
目5の出現確率:0.17437200932746844
目6の出現確率:0.055548736914549666
状態Lから
目1の出現確率:0.05388205184832653
目2の出現確率:0.1011174505134554
目3の出現確率:0.12185678598342417
目4の出現確率:0.10803643297426346
目5の出現確率:0.11514128175442023
目6の出現確率:0.49996599692610977
-----------------
コピペ用
{'F': {'F': -0.41675014971021795, 'L': -1.0764172228407674}, 'L': {'F': -1.1005592333690384, 'L': -0.404493055387773}}
{'F': {'1': -1.4616160249021828, '2': -1.64547221841684, '3': -1.8183280036279807, '4': -1.6982710315020864, '5': -1.7465642773638006, '6': -2.890494500966893}, 'L': {'1': -2.9209578463475747, '2': -2.291472561387037, '3': -2.104908809173777, '4': -2.2252867663907647, '5': -2.161595367657993, '6': -0.6932151890202487}}


KeyboardInterrupt: 

In [None]:
states = ("F", "L")
start_prob = {"F": np.log(0.5), "L": np.log(0.5)}
# transit_prob = {"F": {"F": np.log(0.95), "L": np.log(0.05)},
#                 "L": {"F": np.log(0.1), "L": np.log(0.9)}}
# emission_prob = {'F': {'1': np.log(1/6), '2': np.log(1/6), '3': np.log(1/6), '4': np.log(1/6), '5': np.log(1/6), '6': np.log(1/6)},
#                 'L': {'1': np.log(1/10), '2': np.log(1/10), '3': np.log(1/10), '4': np.log(1/10), '5': np.log(1/10), '6': np.log(1/2)}}

transit_prob = {'F': {'F': -0.048169757692534176, 'L': -3.05701208909349}, 'L': {'F': -2.4080421041650624, 'L': -0.09430113646492089}}
emission_prob = {'F': {'1': -1.825684891074232, '2': -1.7641865677497464, '3': -1.7940583783266248, '4': -1.7867216542063744, '5': -1.7732054475773822, '6': -1.8079718006329841}, 'L': {'1': -2.303521354998038, '2': -2.274151081672853, '3': -2.2842614329226034, '4': -2.3114439989828224, '5': -2.3452324150154436, '6': -0.6923132011458346}}

v = viterbi(observed, states,
            start_prob, transit_prob, emission_prob)

# post_pi = ""
# if TYPE=="log":
#     for i in range(len(observed)):
#         _pi = {"F": 0, "L": 0}
#         for st in states:
#             _pi[st] = F[i][st]+B[i][st]-logP
#         post_pi += max(_pi, key=_pi.get)
# else:
#     for i in range(len(observed)):
#         _pi = {"F": 0, "L": 0}
#         for st in states:
#             _pi[st] = F[i][st]*B[i][st]
#         post_pi += max(_pi, key=_pi.get)


viterbi_pi = ""
if v["F"][1] > v["L"][1]:
    viterbi_pi = "".join(v["F"][1])
else:
    viterbi_pi = "".join(v["L"][1])

accuracies = []
accuracies.append(calculate(viterbi_pi, hiddens))
# accuracies.append(calculate(post_pi, hiddens))


print("viterbi 精度:{}".format(accuracies[0]))
# print("posterior 精度:{}".format(accuracies[0]))
