In [1]:
import numpy as np
import pickle as pkl
import matplotlib.pyplot as plt

In [2]:
np.set_printoptions(precision=2, linewidth=320)
plt.close('all')
alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
   

In [3]:

def load_data(fname):

    with open('lettres1.pkl', 'rb') as f:
        data = pkl.load(f, encoding='latin1') 
    X = np.array(data.get('letters')) # récupération des données sur les lettres
    Y = np.array(data.get('labels')) # récupération des étiquettes associées 
    nCl=26
    return data,X,Y,nCl

In [4]:
data, X, Y, nCl = load_data('lettres1.pkl')

In [5]:
#data

#  1 Apprentissage d'un modèle connaissant les états ##

In [6]:
# Hypothèse Gauche-Droite
def discretisation( X, n_etats = 10 ) :
    intervalle = 360. / n_etats
    return np.array([ np.floor( x / intervalle ) for x in X ])

In [7]:
#teste
xd = discretisation(X)

# Hypothèse Gauche-Droite

In [8]:
def seq_Xd(Xd,N):
    return np.floor(np.linspace(0,N-.00000001,len(Xd)))
    
def initGD(X,N):
    return np.array([ seq_Xd(x,N) for x in X ])


In [9]:
#teste

# 10 etats possibles pour la variable d'observation X
# 3  etats possibles pour la variable d'état caché 

#X0 = [ 1.  9.  8.  8.  8.  8.  8.   9.  3.  4.  5.  6.  6.  6.  7.  7.  8.  9.  0.  0.  0.  1.  1.]
#S0 = [ 0.  0.  0.  0.  0.  0.  1.   1.  1.  1.  1.  1.  2.  2.  2.  2.  2.  3.  3.  3.  3.  3.  3.]
S= initGD(xd,5)
#print(xd[0])
#print(S[0])

### Apprentissage

In [10]:
 #version 2 
def learnHMM(x, s, N, K, initTo0=True):
    
    if np.shape(s) != np.shape(x):
        raise Exception("Invalid Data")
        
    if initTo0:
        A  = np.zeros((N,N))
        B  = np.zeros((N,K))
        Pi = np.zeros(N)
    else:
        eps = 1e-8
        A  = np.ones((N,N))*eps
        B  = np.ones((N,K))*eps
        Pi = np.ones(N)*eps

    for i,ss in enumerate(s):

        Pi[int(s[i][0])] += 1
        
    for i in range(len(s)):
        for j in range(1,len(s[i])):
            A[int(s[i][j-1])][int(s[i][j])] += 1
            
    for ss,xx in zip(s,x):
        for i,j in zip(ss,xx):
            B[int(i)][int(j)] +=1
            
    A /= np.maximum(A.sum(1).reshape(N,1),1)
    B /= np.maximum(B.sum(1).reshape(N,1),1)
    Pi/= Pi.sum()

    return ( Pi, A, B )

In [11]:
#teste
K = 10 # discrétisation (=10 observations possibles)
N = 5  # 5 états possibles (de 0 à 4 en python) 
Pi, A, B = learnHMM(xd[Y=='a'],S[Y=='a'],N,K)
print(Pi)
print(A)
print(B)

[ 1.  0.  0.  0.  0.]
[[ 0.79  0.21  0.    0.    0.  ]
 [ 0.    0.76  0.24  0.    0.  ]
 [ 0.    0.    0.77  0.23  0.  ]
 [ 0.    0.    0.    0.76  0.24]
 [ 0.    0.    0.    0.    1.  ]]
[[ 0.06  0.02  0.    0.    0.    0.    0.    0.04  0.49  0.4 ]
 [ 0.    0.04  0.    0.13  0.09  0.13  0.02  0.09  0.41  0.09]
 [ 0.    0.    0.    0.02  0.12  0.5   0.31  0.04  0.    0.  ]
 [ 0.07  0.    0.    0.    0.    0.    0.26  0.33  0.2   0.15]
 [ 0.73  0.12  0.    0.    0.    0.    0.    0.02  0.02  0.12]]


#  Viterbi (en log)

In [12]:
xd[0]

array([ 1.,  9.,  8.,  8.,  8.,  8.,  8.,  9.,  3.,  4.,  5.,  6.,  6.,  6.,  7.,  7.,  8.,  9.,  0.,  0.,  0.,  1.,  1.])

In [13]:
def viterbi(O, Pi, A, B):
    
    #rappel : la fonction argmax returne l'indice de l'element max dans un tableau depuis l'axis 
    
   
    delta = np.zeros((len(A), len(O)))
    psi   = np.zeros((len(A), len(O)))

    # initialisation
    #formule du cours 
    # δ0(i) = πi . bi(x0) on passe au log:   long(πi) +log(bi(O(0)))
    # Ψ0(i) = −1 Note: -1 car non utilisé normalement
    
    #B[:,int(O[0])] renvoit la colonne correspondante a la valeur de O[0]
    # O[0] pour la sequence x vaut 1 donc la colonne selectionnée est la colonne 1 
    
    
    delta[:, 0] = np.log(B[:, int(O[0])]) + np.log(Pi)
    psi[:, 0] = -1

    
    # Recursion
    #δt(j) = [maxiδt−1(i)+log aij ]+log bj(xt)
    #Ψt(j) = arg max i∈[1, N] δt−1(i)+log aij
    
    for t in range(1, len(O)):
        for j in range(len(A)):
            tmp = delta[:, int(t-1)] + np.log(A[:, j])
            delta[j, t] = max(tmp) + np.log(B[j, int(O[t])])
            psi[j, t] = np.argmax(tmp)

    # Termination
    #S⋆=maxiδT−1(i)
    
    S = max(delta[:, -1])

    # Path
    #s⋆T−1 = arg max i δT−1(i)
    #s⋆t   = Ψt+1 (s⋆t+1)
    
    T = len(O)
    s = [0] * T
    s[T - 1] = np.argmax(delta[:, int(t)])
    for t in range(T - 2, 1, - 1):
        s[t] = psi[:, int(t + 1)][int(s[t + 1])]

    return np.int64(s), S

In [14]:
#teste

path,proba = viterbi(xd[0],Pi,A,B)
path



array([0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4], dtype=int64)

# [OPT] Probabilité d'une séquence d'observation

In [33]:
def methode_alpha(x,Pi,A,B):

    mat = np.zeros((len(A),len(X)))
   
    for i in range(len(mat)):
        mat[i][0] = Pi[i] * B[i][int(x[0])]

    for t in range(1,len(x)):
        for j in range(len(A)):
            tmp=0
            for i in range(len(A)):
                tmp += np.log(mat[i][t-1]) * np.log(A[i][j])
               # print(np.log(tmp))
            tmp = tmp * np.log(B[j][int(x[t])])
            #print(tmp)
            mat[j][t] = tmp
    
    proba = np.sum(mat[:,-1])
    
    print(mat)
    print(proba)

methode_alpha(xd[0],Pi,A,B)

[[ 0.02  -inf   nan ...,  0.    0.    0.  ]
 [ 0.    -inf   nan ...,  0.    0.    0.  ]
 [ 0.    -inf   nan ...,  0.    0.    0.  ]
 [ 0.    -inf   nan ...,  0.    0.    0.  ]
 [ 0.     nan   nan ...,  0.    0.    0.  ]]
0.0


  if sys.path[0] == '':
  
  if sys.path[0] == '':
  if sys.path[0] == '':


In [50]:
def methode_alpha(x,Pi,A,B):

    mat = np.zeros((len(A),len(x)))
   
    for i in range(len(mat)):
        mat[i][0] = Pi[i] * B[i][int(x[0])]
        
    print(mat[:, 0].sum())

    for t in range(1,len(x)):
        for j in range(len(A)):
            tmp=0
            for i in range(len(A)):
                tmp += mat[i][t-1]*A[i][j]*B[j][int(x[t])]
               # print(np.log(tmp))
             
            #print(tmp)
            tmp /= np.sum(mat[:,(t-1)])
            mat[j][t] = tmp

        #print(mat[:,t], (mat[:(t-1)]))
    proba = [np.sum(np.log(mat[:,-i]) ) for i in range(len(x))]
    for i in range(len(x)):
        tmp = np.sum(mat[:,i])
    print(mat)
    print(proba)

methode_alpha(xd[0],Pi,A,B)

0.0188679245283
[[  1.89e-02   3.14e-01   3.68e-01   3.07e-01   2.71e-01   2.47e-01   2.31e-01   1.77e-01   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00]
 [  0.00e+00   1.80e-02   9.81e-02   1.34e-01   1.55e-01   1.69e-01   1.79e-01   3.90e-02   4.01e-02   6.47e-02   6.69e-02   4.05e-03   2.68e-04   1.59e-05   3.91e-06   1.60e-06   2.16e-06   9.60e-07   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00]
 [  0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   9.00e-04   3.13e-02   2.06e-01   2.00e-01   1.95e-01   1.68e-01   2.00e-02   3.98e-03   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00]
 [  0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   0.00e+00   4.52e-02   8.39e-02   1.01e-01   1.40e-01   2.24e-01   1.44e-01   1.12e-01   3.90e-02   5.8



In [65]:
def methode_alpha(x,Pi,A,B):

    mat = np.zeros((len(A),len(x)))
   
    for i in range(len(mat)):
        mat[i][0] = Pi[i] * B[i][int(x[0])]
        
#    print(mat[:, 0].sum())

    for t in range(1,len(x)):
        for j in range(len(A)):
            tmp=0
            for i in range(len(A)):
                tmp += mat[i][t-1]*A[i][j]*B[j][int(x[t])]
               # print(np.log(tmp))
             
            #print(tmp)
            tmp /= np.sum(mat[:,(t-1)])
            mat[j][t] = tmp


    tmp=0
    for i in range(len(x)):
        tmp += np.log(np.sum(mat[:,i]))
    print(tmp)

methode_alpha(xd[0],Pi,A,B)

-34.8950422805


# Apprentissage complet (Baum-Welch simplifié) 

In [None]:
def baum_welch_simplifie( lv_lst, X, Y, N = 5, K = 10):
    # 1. Initialisation des états cachés arbitraire (eg méthode gauche-droite)
    nom_iter = 0
    alpha    = []
    Xd       = discretisation(X,K)
    q        = initGD(X,N)
    
    # 2. Tant que critère de convergence non atteint
    while True:
        if ( nom_iter > 2 ) and ( lv_lst[-1] - lv_lst[-2] < 0.0001 ):
            break
            
        lv        = 0
        nom_iter += 1
        alpha     = []
        for lettre in alphabet:
            # 1. Apprentissage des modèles
            ( Pi, A, B ) = learnHMM( Xd[Y==lettre], q[Y==lettre], N, K)
            # print "nom_iter: %d"%nom_iter, B
            alpha.append((Pi,A,B))
            # 2. Estimation des états cachés par Viterbi
            for [i] in np.argwhere(Y==lettre):
                ( p_est, s_est ) = viterbi(Xd[i],Pi,A,B)
                q[i] = s_est
                lv  += p_est
                
        lv_lst.append(lv)
                
    return alpha

traçage

In [None]:
####jai pas encore testé cette fonction 

def tracer_evolution_vraisemblance(lv_list):
    fig = plt.figure()
    x = range(len(lv_list))
    y = lv_list
    plt.plot( x, y )
    plt.xlabel("Nombre d'Iteration")
    plt.ylabel("Log Vraisemblance")
    plt.savefig("vraisemblance_regression.png")
    


#lv_lst = []
#baum_welch_simplifie( lv_lst, X, Y,)
#tracer_evolution_vraisemblance(lv_lst)



In [169]:
if t == tt: 
    print("yes")