# Exercise Sheet 7: Markov Chains

In this exercise sheet, we will simulate a Markov chain. In the first part, we will consider a pure Python based implementation where a single particle jumps from one position to another of the lattice, where all transitions to neighboring states have the same probability. Then, we will add probabilities for the transitions. Finally, the implementation will be parallelized to run many chains in parallel.

Keep in mind that you can get the documentation of modules and functions using the `help()` function.

## Testing
To each task tests are provided. You can use these tests to practice test driven development (TDD), but please note that using them will likely make the tasks easier.
Please also not that the tests may not be exhaustive, i.e. even when passing all tests, your solution can still be imperfect.

In [None]:
# This cell is for setup the testing. Please execute.
import utils

# Use unittest asserts
import unittest

t = unittest.TestCase()

## 1. Random moves in a lattice

In this exercise, we will simulate the propagation of particles in a graph composed of 8 states (denoted by letters A-H) and stored in the variable `S` defined in the cell below. The lattice is the following:

![](lattice.png)

The particle starts in state `A` and then jumps randomly from its current state to one of its neighbors, all with the same probability. Note that it cannot stay at the current position. The dictionary `T` defined in the cell below encode such transition behavior.

In [None]:
# List of states
S = list("ABCDEFGH")

# Dictionary of transitions
T = {
    "A": "BE",
    "B": "AFC",
    "C": "BGD",
    "D": "CH",
    "E": "AF",
    "F": "EBG",
    "G": "FCH",
    "H": "GD",
}
print(f"States: {S}")
print(f"Transitions: {T}")

Using pure Python (i.e. no `numpy` functions), set the initial state to `A` and run it for 1999 iterations. Return the sequence of states visited by the particle as a list. Set the random seed of the module `random` to value `123` using the function `random.seed` before starting the simulation in order to produce deterministic results.

In [None]:
import random

In [None]:
def simulate(transitions: dict) -> list[str]:
    """
    Simulates a Markov chain defined by the above transitions.
    This function always sets the random seed to `123`. All simulations start with
    the initial state `A`. It always simulates 2000 steps including the initial state.
    
    Args:
        transitions: A dictionary with eight keys [A-H]. For each key a string is mapped as
                     its value. Each of those strings can only contain the letters [A-H] each
                     letter can only appear once. `'A': 'BE'` means that from state `A` we 
                     can reach the states `B` and `E` and no other state.
                     
    Returns:
        list: A list of states (a string containing one of the letters [A-H])
              that were visited during the simulation
    """


### Please enter your solution here ###



In [None]:
# Tests
def test_simulate():
    X = simulate(T)

    # Print the first 10 states
    print(f"First 10 visited states --> {X[:10]}")
    t.assertIsInstance(X, list, "The state sequence must be a list")
    t.assertEqual(len(X), 2000)
    t.assertIsInstance(X[0], str, "The state sequence must only contain strings")
    t.assertEqual(X[0], "A", "The state sequence must start with A")
    t.assertTrue(all(x in S for x in X), "Your state sequence contains an invalid state")
    t.assertEqual(set(S), set(X), "Your list should contain each state at least once")

test_simulate()

Implement a function that returns a list of the relative frequencies of each state.

In [None]:
def compute_histogram(valid_states: list[str], state_sequence: list[str]) -> list[float]:
    """
    Returns a list of percentages relating to how many times each state
    has been visited according to the `state_sequence` list.

    Args:
        valid_states: A list of all valid states
        state_sequence: A sequence of states for which we want to calculate the frequencies
        
    Returns:
        list: A list of length 8. Contains the percentage `[0-1]` of occurrences of each state
              in the `state_sequence`.
    """

### Please enter your solution here ###



In [None]:
def test_compute_histogram():
    X = simulate(T)
    h = compute_histogram(S, X)
    print(f"frequencies -> {h}")
    t.assertIsInstance(h, list)
    # Check if the histogram is a valid probability distribution
    print(f"sum -> {sum(h)}")
    t.assertAlmostEqual(sum(h), 1.0)

    t.assertTrue(all(f < 0.2 for f in h))

test_compute_histogram()

Using the above `compute_histogram` function, produce a bar plot using `matplotlib` (`matplotlib.pyplot.bar`) showing the fraction of the time the particle is found in a given state, averaged over the whole simulation. Do **not** call plt.show in the function (it is automatically called because of `%matplotlib inline`

In [None]:
import matplotlib.pyplot as plt

In [None]:
def plot_histogram(valid_states: list[str], frequencies: list[float]):

    """
    Plots a bar graph of a provided histogram.

    Args:
        valid_states: The list of states
        frequencies: The frequency of each state
    """

### Please enter your solution here ###



In [None]:
# Plot the histogram of the above defined sequence X
X = simulate(T)
h = compute_histogram(S, X)
plot_histogram(S, h)

## 2. Adding a special state
Suppose now that the rule (defined by the transition dictionary) is modified such that every time the particle is in state `F`, it always moves to `E` in the next step.

* Modify the code to handle this special case, and create a bar plot for the new states distribution. Make sure to not modify the original transition dictionary. To achieve this, you can have a look at the `copy` module, but every working solution is accepted.

In [None]:
def modify_transitions(transitions: dict) -> dict:
    """
    Creates a modified transition dictionary without modifying the provided one.

    This function creates a new transition dictionary such that from state `F` the only
    possible following state is `E`.

    Args:
        transitions: A dictionary that describes the possible transitions from each state
        
    Returns:
        dict: A modified transition dict where from state `F` only state `E` can follow
    """

### Please enter your solution here ###



In [None]:
new_T = modify_transitions(T)
new_X = simulate(new_T)
h = compute_histogram(S, new_X)
plot_histogram(S, h)

In [None]:
# Tests
def test_modify_transitions():
    new_T = modify_transitions(T)
    print(f"new_T['F'] = {new_T['F']}")
    print(f"T['F'] = {T['F']}")
    t.assertIsInstance(new_T, dict)
    t.assertIsNot(T, new_T, "T and new_T should not be the same instance")

test_modify_transitions()

## 3. Exact solution to the previous exercise

For simple Markov chains, a number of statistics can be obtained analytically from the structure of the transition model, in particular, by analysis of the transition matrix. The transition matrix is a square matrix $T$ of size $n \times n$ where $n$ is the number of states. The entry $T_{ij}$ is the probability of transitioning from state $i$ to state $j$ in one step. The rows of the matrix sum to 1.

The first step we need to take is to convert the dictionary based representation of our Markov chain, into a transition matrix.

* Implement the function `to_matrix`. The function computes the transition matrices associated with the models of exercise 1 and 2 (make sure that each row in these matrices sums to 1).

**Hint**: You can notice that for each state, the transitions towards other states are always listed from left to right in the dictionary `T`. Also note that characters A-H can be mapped to integer values using the Python function `ord()`, thus, giving a direct relation between state names and indices of the transition matrix.

In [None]:
import numpy as np

In [None]:
def to_matrix(transition: dict) -> np.ndarray:

    """

    Converts a transition dictionary into a transition matrix. The first row
    represents the probability of moving from the first state to every state.

    If the state dict is irreflexive (we cannot go from one state to the same
    state) the sum of the diagonal is 0.

    The sum of each row should be 1.

    All the elements in the matrix are values in [0-1].

    Args:
        transition: A dictionary describing the possible transitions from each state.

    Returns:
        np.ndarray: The transition matrix (ndim=2) that represents the same
                    (uniform) transitions as the transition dict
    """

### Please enter your solution here ###



In [None]:
def test_to_matrix():
    matrix_T = to_matrix(T)
    print(matrix_T.round(2))

    t.assertIsInstance(matrix_T, np.ndarray)
    np.testing.assert_allclose(np.sum(matrix_T, axis=1), 1.0)

test_to_matrix()

## 4. Adding non-uniform transition probabilities

We consider the original lattice defined by the variable `T`. We set transition probabilities for each state to be such that:

1. For states that have only two connections, the probability of transitioning to either connection is equal.
2. For states that have three connections, the probability of moving to the middle state, as seen in the string that defines the transitions in dictionary `T`, is 0.5 while the probability of moving to the left state is twice the probability of moving right.


### 4.1 State string to index
Build a function that converts the string state into a numeric index

In [None]:
def state_string_to_index(state: str) -> int:
    """
    Converts the state string into a numerical index, where:
    'A' -> 0
    'B' -> 1
    ...
    'H' -> 7

    Args:
        state: A state string in [A-H] with len(state) == 1
        
    Returns:
        int: The index of the state in [0-7]
    """

### Please enter your solution here ###



In [None]:
# Test state_string_to_index
def test_state_string_to_index():
    A_idx = state_string_to_index("A")
    t.assertNotIsInstance(A_idx, float)
    t.assertEqual(A_idx, 0)

test_state_string_to_index()

### 4.2 Build transition matrix
Now implement the `build_transition_matrix` according to the rules defined above.

In [None]:
def build_transition_matrix(transition: dict) -> np.ndarray:
    """
    Builds a transition matrix from a transition dictionary, similarly to
    `to_matrix` function. However, this function does not create a uniform
    distribution among the following states.

    If a state is connected to two other states, the distribution is uniform.

    If a state is connected to three other states, then moving to the middle
    state should have a 50% chance and moving left twice as much as moving right.

    Like in the `to_matrix` function the sum of each row should be 1.

    Args:
        transition: A dictionary describing the possible transitions from each state
        
    Returns:
        np.ndarray: A transition matrix
    """

### Please enter your solution here ###



In [None]:
# Test build_transition_matrix
def test_build_transition_matrix():
    P = build_transition_matrix(T)
    print(P.round(3))

    t.assertIsInstance(P, np.ndarray)
    np.testing.assert_allclose(P.sum(axis=1), 1.0)
    np.testing.assert_allclose(P[1, 2], 0.16666667)

test_build_transition_matrix()

## 5. Simulation for multiple particles

Now that we have a transition matrix, we can simulate the evolution of multiple particles in the system efficiently. We want to check if we can approximate the stationary distribution, by simulating 1000 particles for 500 time steps.

### 5.1 Simulate 1000 particles
Implement the function `simulate_1000` which will simulate 1000 particles for 500 steps. The function estimates the stationary distribution by calculating the distribution of the 1000 particles after the 500 steps.

* The initial state of these particles is pseudo-random and given by the function `utils.getinitialstate()`.
* Using the function `utils.mcstep()`, simulate this system for 500 time steps.
* Estimate the stationary distribution by looking at the distribution of these particles in state space after 500 time steps.

For reproducibility, give seed values to the function utils.mcstep corresponding to the current time step of the simulation (i.e. from 0 to 499).

In [None]:
def simulate_1000(transition: dict) -> np.ndarray:
    """
    Simulates 1000 particles for 500 time steps, in order to approximate
    the stationary distribution.

    Args:
        transition: A transition dict, that will be converted into a transition matrix using the
                   `build_transition_matrix` function
    Returns:
        np.ndarray: The estimated stationary distribution vector (ndim=1)

    """

### Please enter your solution here ###



In [None]:
# Tests
def test_simulate_1000():
    stationary_distribution = simulate_1000(T)
    print(stationary_distribution)
    t.assertIsInstance(stationary_distribution, np.ndarray)
    t.assertEqual(stationary_distribution.shape, (8,))
    np.testing.assert_allclose(np.sum(stationary_distribution), 1)

test_simulate_1000()

In [None]:
# For reference this is the actual stationary distribution
P = build_transition_matrix(T)
print(utils.getstationary(build_transition_matrix(T)).round(4))

### 5.2 Rebuild utils.getstationary using simple NumPy functions
We want to compare our simulated results with the actual stationary distribution.
Therefore, implement the function `getstationary`, which solves the system of equations using Gaussian elimination. For practice, please solely use 'primitive' NumPy functions (this excludes anything that can solve a system of equations with one function call like `np.linalg.solve`, `np.linalg.pinv`, `np.linalg.inv`, ...).
The comments will help you with the implementation step by step. For reference and testing, a 'high-level' implementation can be found in `utils.getstationary`.
If you do not have prior experience with Gaussian elimination, we recommend some reading before starting with the exercise, e.g., by reviewing the respective [Wikipedia page](https://en.wikipedia.org/wiki/Gaussian_elimination).

In [None]:
def getstationary(P: np.array) -> np.ndarray:
    """
    Returns the stationary distribution of a markov chain with transition matrix P.   

    Args:
        P: np.ndarray: The transition matrix     
    Returns:
        np.ndarray: The stationary distribution vector (ndim=1)  
    """
    n = P.shape[0]
    
    # Step 1: Prepare system matrix
    
    # From transition matrix, construct A and b such that A @ x = b
    A = np.concatenate([(P.T - np.identity(n)), np.ones([1, n])], axis=0)
    b = np.array([0] * n + [1])

    # We will solve A.T A x = A.T b for x. Therefore, compute the matrix products for A (transposed) with A and b, respectively.

### Please enter your solution here ###


    # Form the system matrix for Gaussian elimination by incorporating the matrix products above into a single tensor.

### Please enter your solution here ###

    
    # Step 2: Forward Elimination

    # Row by row, transform the system matrix into the row echelon form:
    for i in range(n):
        # Make the diagonal of the system matrix contain all 1's

### Please enter your solution here ###

    
        # Make the elements below the diagonal 0:
        # Column by column, subtract row i+1 with a multiple of row i
        for k in range(i+1, n):

### Please enter your solution here ###

    
    # Step 3: Backward Substitution
    
    x = np.zeros(n)
    # Get the solution for every variable of the system of equations:
    # From the last to the first row, insert the already-known variables (rows of x)
    # to solve the row's equation for a further variable.

### Please enter your solution here ###

    
    # Since we augmented the matrix with an additional row, we only need the first 8 elements of x
    x = x[:8]

    return x

In [None]:
# Tests
def test_getstationary():
    P = build_transition_matrix(T)
    stat_distr_utils = utils.getstationary(P)
    print(stat_distr_utils.round(4))
    stat_distr = getstationary(P)
    print(stat_distr.round(4))
    
    np.testing.assert_allclose(stat_distr_utils, stat_distr)

test_getstationary()