In [6]:
#"https://github.com/jpowie01/HMM_Viterbi_BaumWelch"
from typing import List, Tuple
import enum
import random
import numpy as np
import pandas as pd

class utils:
    def weighted_random(probabilities: np.ndarray) -> int:
        probability_so_far = 0.0
        random_value = random.random()
        for value, probability in enumerate(probabilities):
            probability_so_far += probability
            if probability_so_far > random_value:
                return value

        # By default return N-th possible value
        return probabilities.shape[0] - 1


AVAILABLE_DICES = 2
class DiceType(enum.IntEnum):
    FAIR = 0
    LOADED = 1


class Dice:
    def __init__(self, probabilities: np.ndarray) -> None:
        self.probabilities = probabilities

    def get_next_value(self) -> int:
        return utils.weighted_random(self.probabilities)


class Croupier:
    def __init__(self, fair_dice: Dice, loaded_dice: Dice,
                 initial_dice_probability: np.ndarray, transition_matrix: np.ndarray) -> None:
        self._fair_dice = fair_dice
        self._loaded_dice = loaded_dice
        self._initial_dice_probability = initial_dice_probability
        self._transition_matrix = transition_matrix
        self._current_dice = utils.weighted_random(self._initial_dice_probability)

    def get_next_value(self) -> Tuple[int, int]:
        # Try to switch between dices
        self._current_dice = utils.weighted_random(self._transition_matrix[self._current_dice])

        # Let's roll the dice!
        if self._current_dice == DiceType.FAIR:
            return self._current_dice, self._fair_dice.get_next_value()
        else:
            return self._current_dice, self._loaded_dice.get_next_value()

    def get_observations(self, number_of_observations: int) -> Tuple[np.ndarray, List[int]]:
        used_dices = []
        observations = []
        for i in range(number_of_observations):
            used_dice, dice_wall = self.get_next_value()
            used_dices.append(used_dice)
            observations.append(dice_wall)
        return np.array(observations), used_dices

    
def calculate_viterbi_probabilities(observations: np.ndarray, fair_dice: Dice, loaded_dice: Dice,
                                    initial_dice_probability: np.ndarray, transition_matrix: np.ndarray) -> np.ndarray:
    observations_rows = observations.shape[0]

    # Initial probabilities of each dices depends on random choice of croupier and probability of an observation
    viterbi_probabilities = np.zeros([observations_rows, AVAILABLE_DICES])
    viterbi_probabilities[0] = np.array([
        initial_dice_probability[DiceType.FAIR] * fair_dice.probabilities[observations[0]],
        initial_dice_probability[DiceType.LOADED] * loaded_dice.probabilities[observations[0]],
    ])

    # Iteratively compute dice probabilities based on previous observations and probabilities
    for t, observation in enumerate(observations[1:], 1):
        viterbi_probabilities[t] = np.array([
            fair_dice.probabilities[observation] * np.max([
                # Previously we've used fair dice and now we use fair dice
                viterbi_probabilities[t-1][DiceType.FAIR] * transition_matrix[DiceType.FAIR, DiceType.FAIR],
                # Previously we've used loaded dice and now we use fair dice
                viterbi_probabilities[t-1][DiceType.LOADED] * transition_matrix[DiceType.LOADED, DiceType.FAIR],
            ]),
            loaded_dice.probabilities[observation] * np.max([
                # Previously we've used fair dice and now we use loaded dice
                viterbi_probabilities[t-1][DiceType.FAIR] * transition_matrix[DiceType.FAIR, DiceType.LOADED],
                # Previously we've used loaded dice and now we use loaded dice
                viterbi_probabilities[t-1][DiceType.LOADED] * transition_matrix[DiceType.LOADED, DiceType.LOADED],
            ]),
        ])

        # Normalize current probabilities, so that values won't be too low over time
        viterbi_probabilities[t] /= np.sum(viterbi_probabilities[t])

    return viterbi_probabilities

def calculate_forward_probabilities(observations: np.ndarray, fair_dice: Dice, loaded_dice: Dice,
                                    initial_dice_probability: np.ndarray, transition_matrix: np.ndarray) -> np.ndarray:
    observations_rows = observations.shape[0]

    # Initial probabilities for alpha equals probability of random choice of a given dice and roll of given observation
    alpha_probabilities = np.zeros([observations_rows, AVAILABLE_DICES])
    alpha_probabilities[0] = np.array([
        initial_dice_probability[DiceType.FAIR] * fair_dice.probabilities[observations[0]],
        initial_dice_probability[DiceType.LOADED] * loaded_dice.probabilities[observations[0]],
    ])

    # Iteratively compute probabilities based on previous ones
    for t, observation in enumerate(observations[1:], 1):
        alpha_probabilities[t] = np.array([
            fair_dice.probabilities[observation] * np.sum([
                # Previously we've used fair dice and now we use fair dice
                alpha_probabilities[t-1][DiceType.FAIR] * transition_matrix[DiceType.FAIR, DiceType.FAIR],
                # Previously we've used loaded dice and now we use fair dice
                alpha_probabilities[t-1][DiceType.LOADED] * transition_matrix[DiceType.LOADED, DiceType.FAIR]
            ]),
            loaded_dice.probabilities[observation] * np.sum([
                # Previously we've used fair dice and now we use loaded dice
                alpha_probabilities[t-1][DiceType.FAIR] * transition_matrix[DiceType.FAIR, DiceType.LOADED],
                # Previously we've used loaded dice and now we use loaded dice
                alpha_probabilities[t-1][DiceType.LOADED] * transition_matrix[DiceType.LOADED, DiceType.LOADED]
            ]),
        ])

        # Normalize current probabilities, so that values won't be too low over time
        alpha_probabilities[t] /= np.sum(alpha_probabilities[t])

    return alpha_probabilities


def calculate_backward_probabilities(observations: np.ndarray, fair_dice: Dice, loaded_dice: Dice,
                                     initial_dice_probability: np.ndarray, transition_matrix: np.ndarray) -> np.ndarray:
    observations_rows = observations.shape[0]

    # Initial probabilities for beta equals to 1 for each dice. Keep in mind that we need to iterate from the back!
    beta_probabilities = np.zeros([observations_rows, AVAILABLE_DICES])
    beta_probabilities[observations_rows-1] = np.array([1, 1])

    for t, observation in reversed(list(enumerate(observations))[1:]):
        beta_probabilities[t-1] = np.array([
            np.sum([
                # In the observation T croupier used fair dice and fair dice in T-1
                transition_matrix[DiceType.FAIR, DiceType.FAIR] * fair_dice.probabilities[observation]
                  * beta_probabilities[t][DiceType.FAIR],
                # In the observation T croupier used loaded dice and fair dice in T-1
                transition_matrix[DiceType.FAIR, DiceType.LOADED] * loaded_dice.probabilities[observation]
                  * beta_probabilities[t][DiceType.LOADED],
            ]),
            np.sum([
                # In the observation T croupier used fair dice and loaded dice in T-1
                transition_matrix[DiceType.LOADED, DiceType.FAIR] * fair_dice.probabilities[observation]
                  * beta_probabilities[t][DiceType.FAIR],
                # In the observation T croupier used loaded dice and loaded dice in T-1
                transition_matrix[DiceType.LOADED, DiceType.LOADED] * loaded_dice.probabilities[observation]
                  * beta_probabilities[t][DiceType.LOADED],
            ]),
        ])

        # Again, as previously, normalize all probabilities, so that values won't be too low over time
        beta_probabilities[t-1] /= np.sum(beta_probabilities[t-1])

    return beta_probabilities


def baum_welch(multiple_observations: List[np.ndarray], epochs: int = 50) -> Tuple[Dice, Dice, np.ndarray, np.ndarray]:
    # Initialize first dice parameters
    first_dice_probabilities = np.random.rand(6)
    first_dice_probabilities /= first_dice_probabilities.sum()  # Make probabilities sum up to 1.0
    first_dice = Dice(first_dice_probabilities)

    # Initialize second dice parameters
    second_dice_probabilities = np.random.rand(6)
    second_dice_probabilities /= second_dice_probabilities.sum()  # Make probabilities sum up to 1.0
    second_dice = Dice(second_dice_probabilities)

    # Initialize probabilities which were used to randomize first dice
    initial_dice_probability = np.random.rand(2)
    initial_dice_probability /= initial_dice_probability.sum()  # Make probabilities sum up to 1.0

    # Initialize transition probabilities
    transition_matrix = np.random.rand(2, 2)
    transition_matrix /= np.sum(transition_matrix, axis=1)[:, None]  # Make probabilities sum up to 1.0 for each dice

    # Train our Hidden Markov Model in epochs
    for epoch in range(epochs):
        # Prepare placeholders for collecting computations from all observations
        new_transition_matrix = np.zeros((2, 2))
        new_initial_dice_probability = np.zeros(2)
        new_first_dice_probabilities = np.zeros(6)
        new_second_dice_probabilities = np.zeros(6)
        fitness = 0

        # In each epoch walk through all collected observations
        for observations in multiple_observations:
            if np.ndim(multiple_observations) == 1:
                observations_rows = 1
            else:
                observations_rows = observations.shape[0]

            # E Step - use forward and backward pass to calculate alphas and betas
            alpha_probabilities = calculate_forward_probabilities(observations, first_dice, second_dice,
                                                                  initial_dice_probability, transition_matrix)
            beta_probabilities = calculate_backward_probabilities(observations, first_dice, second_dice,
                                                                  initial_dice_probability, transition_matrix)
            aposteriori_probabilities = np.multiply(alpha_probabilities, beta_probabilities)

            # Collect probabilities for each observations which can be defined as P(O|parameters)
            probability_of_observation = initial_dice_probability[0] * beta_probabilities[0][0] \
                                         + initial_dice_probability[1] * beta_probabilities[0][1]
            fitness += probability_of_observation

            # xi (Greek Letter) parameters are known as probability of being in state `i` at time `t` and in
            # state `j` at time `t+1`, given the model and the sequence of observations
            xi = np.zeros([observations_rows, 2, 2])
            for t in range(observations_rows-1):
                denominator = (alpha_probabilities[t][0] * beta_probabilities[t][0] + alpha_probabilities[t][1] * beta_probabilities[t][1])
                xi[t, 0, 0] = (alpha_probabilities[t][0] * transition_matrix[0, 0]
                                * first_dice_probabilities[observations[t+1]] * beta_probabilities[t+1][0]) / denominator
                xi[t, 0, 1] = (alpha_probabilities[t][0] * transition_matrix[0, 1]
                                * second_dice_probabilities[observations[t+1]] * beta_probabilities[t+1][1]) / denominator
                xi[t, 1, 0] = (alpha_probabilities[t][1] * transition_matrix[1, 0]
                                * first_dice_probabilities[observations[t+1]] * beta_probabilities[t+1][0]) / denominator
                xi[t, 1, 1] = (alpha_probabilities[t][1] * transition_matrix[1, 1]
                                * second_dice_probabilities[observations[t+1]] * beta_probabilities[t+1][1]) / denominator

            # Gamma (another Greek Letter) parameters stands for probability of being in state `i` at time `t`
            gamma = np.zeros([observations_rows, 2])
            for t in range(observations_rows-1):
                gamma[t, 0] = np.sum(xi[t, 0, :])
                gamma[t, 1] = np.sum(xi[t, 1, :])

            # Compute initial probabilities for this observation and add it for later use
            new_initial_dice_probability += gamma[0] / np.sum(gamma[0])

            # Compute transition matrix and add it up to for later use
            new_transition_matrix[0, 0] += np.sum(xi[:, 0, 0]) / np.sum(gamma[:, 0]) / probability_of_observation
            new_transition_matrix[0, 1] += np.sum(xi[:, 0, 1]) / np.sum(gamma[:, 0]) / probability_of_observation
            new_transition_matrix[1, 0] += np.sum(xi[:, 1, 0]) / np.sum(gamma[:, 1]) / probability_of_observation
            new_transition_matrix[1, 1] += np.sum(xi[:, 1, 1]) / np.sum(gamma[:, 1]) / probability_of_observation

            # Compute probabilities for each dice (hidden states) and collect them for later use
            for wall in range(6):
                # Emission probabilities can be computed as expected number of times in state `j` and observing `k-th` wall on dice
                # divided by expected number of times in state `j`
                new_first_dice_probabilities[wall] = np.sum(np.take(aposteriori_probabilities[:, 0], np.where(observations == wall)))
                new_first_dice_probabilities[wall] /= np.sum(aposteriori_probabilities[:, 0]) / probability_of_observation
                new_second_dice_probabilities[wall] = np.sum(np.take(aposteriori_probabilities[:, 1], np.where(observations == wall)))
                new_second_dice_probabilities[wall] /= np.sum(aposteriori_probabilities[:, 1]) / probability_of_observation

        # Normalize all values for probabilities, so that they won't blow up quickly...
        initial_dice_probability = new_initial_dice_probability / np.sum(new_initial_dice_probability)
        transition_matrix = new_transition_matrix / np.sum(new_transition_matrix, axis=1)[:, None]
        first_dice_probabilities = new_first_dice_probabilities / np.sum(new_first_dice_probabilities)
        second_dice_probabilities = new_second_dice_probabilities / np.sum(new_second_dice_probabilities)

        # Put dice probabilities into Dice instances
        first_dice = Dice(first_dice_probabilities)
        second_dice = Dice(second_dice_probabilities)

        # Some logging is always good :)
        print('= EPOCH #{} ='.format(epoch))
        print('Transition Matrix:', transition_matrix)
        print('Initial Dice Probability:', initial_dice_probability)
        print('First Dice:', first_dice_probabilities)
        print('Second Dice:', second_dice_probabilities)
        print('Fitness:', fitness)
        print()

    return first_dice, second_dice, initial_dice_probability, transition_matrix

In [8]:
obs_test = pd.read_csv("../datasets/markov/hmm_pb2.csv", header=None).values
obs_test = obs_test - 1
np.set_printoptions(precision = 3)
print('+----------------------------+')
print('|    Baum-Welch Algorithm    |')
print('+----------------------------+')

first_dice, second_dice, initial_dice_probability, transition_matrix = baum_welch(obs_test, epochs = 100)

+----------------------------+
|    Baum-Welch Algorithm    |
+----------------------------+
= EPOCH #0 =
Transition Matrix: [[0.494 0.506]
 [0.361 0.639]]
Initial Dice Probability: [0.978 0.022]
First Dice: [0.228 0.188 0.278 0.055 0.018 0.233]
Second Dice: [0.176 0.211 0.13  0.287 0.194 0.001]
Fitness: 0.5229485494781139

= EPOCH #1 =
Transition Matrix: [[0.474 0.526]
 [0.349 0.651]]
Initial Dice Probability: [9.998e-01 1.507e-04]
First Dice: [0.23  0.191 0.278 0.058 0.018 0.225]
Second Dice: [0.174 0.21  0.127 0.291 0.198 0.001]
Fitness: 0.46057342876458024

= EPOCH #2 =
Transition Matrix: [[0.457 0.543]
 [0.34  0.66 ]]
Initial Dice Probability: [1.000e+00 9.782e-07]
First Dice: [0.229 0.189 0.279 0.057 0.018 0.229]
Second Dice: [0.175 0.211 0.128 0.289 0.196 0.001]
Fitness: 0.4626077910068489

= EPOCH #3 =
Transition Matrix: [[0.44  0.56 ]
 [0.331 0.669]]
Initial Dice Probability: [1.000e+00 6.452e-09]
First Dice: [0.227 0.186 0.279 0.056 0.017 0.235]
Second Dice: [0.177 0.212 0.13

= EPOCH #33 =
Transition Matrix: [[1.443e-08 1.000e+00]
 [9.982e-06 1.000e+00]]
Initial Dice Probability: [1.000e+00 3.323e-56]
First Dice: [7.255e-05 4.870e-05 1.161e-04 1.098e-05 3.090e-06 9.997e-01]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5000046946547404

= EPOCH #34 =
Transition Matrix: [[5.891e-10 1.000e+00]
 [5.242e-06 1.000e+00]]
Initial Dice Probability: [1.000e+00 3.094e-57]
First Dice: [6.586e-06 4.421e-06 1.054e-05 9.966e-07 2.805e-07 1.000e+00]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5000024918471625

= EPOCH #35 =
Transition Matrix: [[1.313e-11 1.000e+00]
 [2.752e-06 1.000e+00]]
Initial Dice Probability: [1.00e+00 2.88e-58]
First Dice: [3.280e-07 2.202e-07 5.251e-07 4.961e-08 1.397e-08 1.000e+00]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5000013103174782

= EPOCH #36 =
Transition Matrix: [[1.570e-13 1.000e+00]
 [1.445e-06 1.000e+00]]
Initial Dice Probability: [1.000e+00 2.682e-59]
First Dice: [8.786e-09 5.898

= EPOCH #62 =
Transition Matrix: [[1.799e-161 1.000e+000]
 [7.656e-014 1.000e+000]]
Initial Dice Probability: [1.000e+00 4.179e-86]
First Dice: [1.373e-147 9.216e-148 2.198e-147 2.048e-148 5.846e-149 1.000e+000]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5000000000000364

= EPOCH #63 =
Transition Matrix: [[6.131e-171 1.000e+000]
 [4.019e-014 1.000e+000]]
Initial Dice Probability: [1.00e+00 3.89e-87]
First Dice: [1.051e-156 7.056e-157 1.682e-156 1.567e-157 4.476e-158 1.000e+000]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5000000000000192

= EPOCH #64 =
Transition Matrix: [[1.097e-180 1.000e+000]
 [2.110e-014 1.000e+000]]
Initial Dice Probability: [1.000e+00 3.622e-88]
First Dice: [4.224e-166 2.836e-166 6.762e-166 6.294e-167 1.799e-167 1.000e+000]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5000000000000101

= EPOCH #65 =
Transition Matrix: [[1.031e-190 1.000e+000]
 [1.108e-014 1.000e+000]]
Initial Dice Probability: [1.000e+00 3.372

= EPOCH #95 =
Transition Matrix: [[0.00e+00 1.00e+00]
 [4.46e-23 1.00e+00]]
Initial Dice Probability: [1.000e+000 3.948e-120]
First Dice: [0. 0. 0. 0. 0. 1.]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5

= EPOCH #96 =
Transition Matrix: [[0.000e+00 1.000e+00]
 [2.341e-23 1.000e+00]]
Initial Dice Probability: [1.000e+000 3.676e-121]
First Dice: [0. 0. 0. 0. 0. 1.]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5

= EPOCH #97 =
Transition Matrix: [[0.000e+00 1.000e+00]
 [1.229e-23 1.000e+00]]
Initial Dice Probability: [1.000e+000 3.422e-122]
First Dice: [0. 0. 0. 0. 0. 1.]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5

= EPOCH #98 =
Transition Matrix: [[0.000e+00 1.000e+00]
 [6.453e-24 1.000e+00]]
Initial Dice Probability: [1.000e+000 3.186e-123]
First Dice: [0. 0. 0. 0. 0. 1.]
Second Dice: [0.197 0.202 0.189 0.195 0.124 0.093]
Fitness: 0.5

= EPOCH #99 =
Transition Matrix: [[0.000e+00 1.000e+00]
 [3.388e-24 1.000e+00]]
Initial Dice Pro