In [6]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import pickle
import logging


class HMM(object):
    """
    Here is a simple implementation of a Hidden Markov Models. This implementation assumes that we have:
    * A: Transition matrix (probabilities)
    * B: Emission matrix (probabilities)
    * pi: Initial matrix (probabilities)
    * N: Number of Hidden States
    * V: Set of observations
    * T: Lenght of observation sequence
    * O: Observation sequence
    """
    
    def __init__(self, n=None, m=None, states=None, observations=None):
        """
        * param n: size of hidden states set
        * param m: size of distincts observations
        """
        self.a = None
        self.b = None
        self.pi = None
        self.params = (self.a, self.b, self.pi)
        
        if n is not None and m is not None:
            self.n = n
            self.m = m
        else:
            raise ValueError('You must provide number of hidden state (n) and number of distincts observations (m)')
        
        if not states:
            raise ValueError('You must provide states list')
        if len(states) != self.n:
            raise ValueError('Incorrect size of states list')
        
        # self.states = dict([(v, k) for k, v in list(enumerate(states))])
        self.states = dict(enumerate(states))
        
        if not observations:
            raise ValueError('You must provide observations list')
        if len(observations) != self.m:
            raise ValueError('Incorrect size of observations list')
            
        self.observations = dict([(v, k) for k, v in list(enumerate(observations))])
    
    def save_params(self):
        """
        function uses to save model parameters in external file with extension(.ph)
        """
        with open('params.ph', 'wb') as params_file:
            pickle.dump(self.params, params_file)
    
    def set_params_from_file(self, file_path):
        try:
            with open(file_path, 'rb') as params_file:
                params_tmp = pickle.load(params_file)
                self.set_params(params_tmp)
        except Exception as e:
            logging.error(f'Something went wrong: {str(e)}')
        
    
    def set_params(self, a=None, b=None, pi=None):
        """
        function uses to set models parameters
        * param a: np.array
        * param b: np.array
        * param pi: np.array
        
        If all parameters are None, they will be set randomly
        """
        if a is not None and b is not None and pi is not None:
            # Define random parameters
            if isinstance(a, np.ndarray):
                if a.shape == (self.n, self.n):
                    self.a = a
                else:
                    raise ValueError(f'Error: Please provide A parameter with a shape, expected: ({self.n}, {self.n})')
            else:
                raise TypeError(f'Error: expected numpy.ndarray as argument')
            
            if isinstance(b, np.ndarray):
                if b.shape == (self.n, self.m):
                    self.b = b
                else:
                    raise ValueError(f'Error: Please provide B parameter with a shape, expected: ({self.n}, {self.m})')
            else:
                raise Type(f'Expected numpy.ndarray as argument')
            
            if isinstance(pi, np.ndarray):
                if pi.shape == (self.n,):
                    self.pi = pi
                else:
                    raise ValueError(f'Please provide PI parameter with a shape, expected: ({self.n},)')
            else:
                raise TypeError(f'Expected numpy.ndarray as argument')
        else:
            # Random initialization
            rg = np.random.default_rng()

            # Transition matrix
            self.a = rg.random((self.n, self.n))
            self.a = self.a/self.a.sum(axis=1, keepdims=True)
            
            # Emission matrix
            self.b = rg.random((self.n, self.m))
            self.b = self.b/self.b.sum(axis=1, keepdims=True)
            
            # Transition matrix
            self.pi = rg.random(self.n)
            self.pi = self.pi/self.pi.sum()
        self.params = (self.a, self.b, self.pi)
    
    def _check_observation(self, observation):
        """
        function use to check validity of observation sequence and state list
        """
        if observation is not None:
            valid_obs = all(obs in self.observations for obs in observation)
            return valid_obs
        return None
        
    
    def _forward_algorithm(self, observation: list):
        """
        implementation of forward algorithm to compute probability P(O|H)
        * param observation: Observation sequence for which we want likehood
        """
        if not self._check_observation(observation):
            raise ValueError('Your sequence contains symbol which aren\'t in observation list')
            
        alphas = np.zeros((self.n, len(observation)))
        alphas[:, 0] = self.pi * self.b[:, self.observations[observation[0]]]
        for o in range(1, len(observation)):
            alphas[:, o] = (alphas[:, o-1] @ self.a) * self.b[:, self.observations[observation[o]]]
        
        return alphas
    
    def _backward_algorithm(self, observation: list):
        """
        implementation of backward algorithm to compute probability of P(O|H)
        """
        betas = np.zeros((self.n, len(observation)))
        betas[:, -1] = 1
        for o in range(len(observation)-2, -1, -1):
            betas[:, o] = (betas[:, o+1] @ self.a) * self.b[:, self.observations[observation[o]]]
        
        return betas
        
    def likehood(self, observation: list):
        """
        function to return likehood of an observation sequence
        * param observation: Observation sequence for which we want likehood
        """
        alphas = self._forward_algorithm(observation)
        return alphas[:, -1].sum()
        
    
    def _viterbi_algorithm(self, observation: list):
        """
        function to return the most probable hidden sequence for the given observation
        """
        if not self._check_observation(observation):
            raise ValueError('Your sequence contains symbol which aren\'t in observation list')
        # Initial path, viterbi_matrix
        best_path = np.zeros(len(observation))
        best_path_matrix = np.zeros((self.n, len(observation))) # Which will contains the viterbi value at step i.
        viterbi_matrix = np.zeros((self.n, len(observation))) # Which will contains only argmax for each pseudo path
        
        # Computation
        viterbi_matrix[:, 0] = self.pi * self.b[:, self.observations[observation[0]]]
        for o in range(1, len(observation)):
            for t in range(self.n):
                
                viterbi_matrix[t, o] = np.max(viterbi_matrix[:, o-1] * self.a[:, t]) * self.b[t, self.observations[observation[o]]]
                best_path_matrix[t, o] = np.argmax(viterbi_matrix[:, o-1] * self.a[:, t])
        
        # Optimal path, proba
        best_path_prob = np.max(viterbi_matrix[:, o])     # Get probability at time T(with sequence of lenght T)
        best_path[o] = np.argmax(viterbi_matrix[:, o])    # Start reverse to get optimal hidden sequence, by getting the state of the max prob
        for obs in range(o-1, -1, -1):
            best_path[obs] = best_path_matrix[int(best_path[obs+1]), obs+1]
        
        return best_path, best_path_prob
    
    def decoding(self, observation: list):
        """
        function to return the pretty best path for the observation sequence
        """
        best_path, best_prob = self._viterbi_algorithm(observation)
        best_path = [self.states[int(i)] for i in best_path]
        return ' -> '.join(best_path), f'With: {best_prob}'
    
    def baum_welch_algorithm(self, observation: list, iterations=10):
        """
        function to learn (A, B) with baum-welch algorithm (EM)
        """
        for iteration in range(iterations):
            max_a = self.a
            max_b = self.b
            xi = np.zeros((self.n, self.n))
            xi_over_time = list()
            gamma_over_time = list()
            gamma = np.zeros((self.n, len(observation)))
            alphas = self._forward_algorithm(observation)
            betas = self._backward_algorithm(observation)
            
            # Estimation-Step (E-Step)
            for t in range(len(observation)-1): # iterate over all observation
                # print('alphas: ', alphas[:, t], 'betas: ', betas[:, t+1])
                for i in range(self.n):
                    for j in range(self.n):
                        numerator = alphas[i, t] * self.a[i, j] * self.b[j, self.observations[observation[t+1]]] * betas[j, t+1]
                        xi[i, j] = numerator/alphas[:, -1].sum()
                        # print(f'Time {t} --> {xi[i, j]} for {i} to {j}')
                
                xi_over_time.append(xi)
                # gamma_over_time.append((observation[t], gamma))
                # print(f'Gamma {t}\n{gamma_over_time}')
            gamma = (alphas * betas)/self.likehood(observation)
            # Maximization-Step (M-Step)
            for i in range(self.n):
                for j in range(self.n):
                    max_a[i, j] = sum([xi[i, j] for xi in xi_over_time])/sum([xi[i, :].sum() for xi in xi_over_time])
           

            for obs, position in self.observations.items():
                for j in range(self.n):

                    max_b[j, position] = sum([gamma[j, t] for seq in range(len(observation)) if observation[seq] == obs])/gamma[j, :].sum()
    
            #print(f'Iteration {iteration}\n{max_a}')
            print(f'Iteration {iteration}\n{max_b}')

IndentationError: unexpected indent (<ipython-input-6-5e50e7c03ad2>, line 221)

In [7]:
hmm = HMM(2, 2, states=['Sunshine', 'Rain'], observations=['Hot', 'Cold'])
hmm.set_params(np.array([[0.2, 0.8], [0.8, 0.2]]), np.array([[0.2, 0.8], [0.65, 0.35]]), np.array([0.3, 0.7]))
# t = hmm.likehood(['Hot', 'Hot', 'Cold', 'Hot', 'Cold', 'Cold', 'Hot', 'Hot', 'Hot', 'Cold'])
# hmm.decoding(['Hot', 'Hot', 'Cold', 'Hot', 'Hot', 'Cold', 'Cold', 'Cold', 'Cold', 'Hot', 'Hot', 'Hot', 'Cold'])

hmm.baum_welch_algorithm(['Hot', 'Hot', 'Cold', 'Hot', 'Hot', 'Cold', 'Cold', 'Cold', 'Cold', 'Hot', 'Hot', 'Hot', 'Cold'])

Iteration 0
[[0.11283335 0.0967143 ]
 [0.85402871 0.73202461]]
Iteration 1
[[0.41978114 0.35981241]
 [0.70272597 0.60233654]]
Iteration 2
[[0.41587047 0.35646041]
 [0.65555393 0.56190337]]
Iteration 3
[[0.37428237 0.32081346]
 [0.65189062 0.55876339]]
Iteration 4
[[0.32236485 0.27631272]
 [0.64346236 0.55153916]]
Iteration 5
[[0.25961712 0.22252896]
 [0.62975278 0.5397881 ]]
Iteration 6
[[0.17900856 0.15343591]
 [0.61635772 0.52830662]]
Iteration 7
[[0.08359741 0.07165492]
 [0.60680203 0.52011603]]
Iteration 8
[[0.01594399 0.01366628]
 [0.60099834 0.51514143]]
Iteration 9
[[4.86567755e-04 4.17058076e-04]
 [5.98168400e-01 5.12715771e-01]]
