<a href="https://colab.research.google.com/github/sudotouchwoman/math-misc/blob/main/notebooks/dtmc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from functools import reduce
from io import StringIO
from itertools import repeat

import graphviz
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

In [None]:
# helper function to visualize matrices
def show_matrix(A: np.ndarray, zmax: float = 1.0, zmin: float = 0.0, title: str = "") -> go.Figure:
    return px.imshow(A, zmax=zmax, zmin=zmin, color_continuous_scale="magma", title=title)


## **Problem 1**

In [None]:
# load data from text file
P = np.genfromtxt(StringIO("""0.19, 0.16, 0.22, 0.17, 0.26
0.18, 0.24, 0.16, 0.24, 0.18
0.23, 0.17, 0.19, 0.21, 0.20
0.22, 0.21, 0.19, 0.22, 0.16
0.18, 0.17, 0.16, 0.24, 0.25"""), delimiter=",")

# have a look at the matrix itself
show_matrix(P)

In [None]:
# visualize the transition matrix as a directed weighted graph

f = graphviz.Digraph("dmc-task-1", filename="dmc-1.gv", engine="dot")
f.attr(rankdir="LR", size="9", title="Markov Chain", labelloc="t")
f.attr("node", shape="circle", color="crimson", fillcolor="crimson", style="filled")
f.attr("edge", color="gainsboro")

for i, weights in enumerate(P):
    for j, weight in filter(lambda x: x[-1] > 0, enumerate(weights)):
        f.edge(f"S_{i}", f"S_{j}", label=f"{weight}")

f

In [None]:
# check the ergodicity property:
# 1) chain is irreducible (i.e., all states are recurrent and constitute a single closed communication class,
# the graph is basically fully connected)
# 2) chain is aperiodic given that self-loops are present
# all conditions are met, thus the Markov process is ergodic

# simple ergodicity check: approximate the limiting distribution
k = 10
P_lim = reduce(lambda x, p: x @ p, repeat(P, times=k))

In [None]:
# try to obtain stationary distribution as a
# left eigenvector of transition matrix

laplacian = P - np.eye(*P.shape)
show_matrix(laplacian, zmax=None, zmin=None)

In [None]:
n = laplacian.shape[0]
pi_0 = np.atleast_2d(np.zeros(n))

# add a row representing the probability contraint: all entries in
# the solution should sum up to 1
A = np.r_[laplacian.T, np.ones_like(pi_0)]
b = np.zeros(n + 1)
b[-1] = 1.

# have a look at final shape of the system
A, b

In [None]:
# solve the overdetermined system in MSE sense
sol, residuals, rank, sigmas = np.linalg.lstsq(A, b, rcond=None)

# one can observe that the stationary distribution was indeed found
# and matches the limiting distribution, as expected
print(f"{sol=}")
print(f"{residuals=}")

In [None]:
# ~ equals to the solution above
P_lim[0]

In [None]:
# one can tweak parameters below and
# explore the behaviour of the system
n_experiments = 200
n_steps = 100
gen = np.random.default_rng(seed=30)

In [None]:
# generate random initial distributions
# and have a look on them
random_pi_0 = gen.random((20, *sol.shape))
random_pi_0 /= random_pi_0.sum(-1).reshape(-1, 1)

show_matrix(np.atleast_2d(random_pi_0))

In [None]:
# simulate system for given number of steps
# all rows do converge to the limiting distribution
random_pi_lim = reduce(lambda x, p: x @ p, repeat(P, times=n_steps), random_pi_0)
show_matrix(np.atleast_2d(random_pi_lim))

In [None]:
from typing import Tuple

gen = np.random.default_rng(seed=30)
random_pi_0 = gen.choice(n, size=(n_experiments), replace=True)
identity = np.eye(n)


def emulate_propagation(A: np.ndarray, curr: Tuple[int, np.ndarray]) -> np.ndarray:
    """
    Emulates stochastic process by randomly selecting
    next step (weighted by probas)
    """
    idx, p = curr
    prev_state = A[idx]
    ohe_pi_prev = identity[prev_state]
    probas = ohe_pi_prev @ p
    # https://stackoverflow.com/questions/47722005/vectorizing-numpy-random-choice-for-given-2d-array-of-probabilities-along-an-a
    A[idx + 1] = (probas.cumsum(1) > gen.random(probas.shape[0])[:,None]).argmax(1)
    return A


pi_consecutive = np.zeros((n_steps, n_experiments), dtype=int)
pi_consecutive[0] = random_pi_0

pi_consecutive = reduce(
    emulate_propagation, enumerate(repeat(P, times=n_steps - 1)), pi_consecutive
)

pi_consecutive.shape

In [None]:
df = pd.DataFrame(pi_consecutive, columns=[f"exp_{i}" for i in range(n_experiments)])
df.head()

In [None]:
from typing import List


def plot_simulation_trajectories(df: pd.DataFrame, columns: List[str]) -> go.Figure:
    fig = go.Figure()
    for col in columns:
        fig.add_scatter(x=df.index, y=df[col], name=col, line=dict(dash="dot"))
    return fig

In [None]:
plot_simulation_trajectories(df, columns=[f"exp_{i}" for i in range(0, n_experiments, 20)])

In [None]:
# compute transition frequencies from statistics
P_freq = np.zeros_like(P)


def fill_p_freq(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    P_freq[x, y] += 1
    return y

_ = reduce(fill_p_freq, pi_consecutive)

unique, counts = np.unique(pi_consecutive, return_counts=True)
px.bar(x=unique, y=counts, title="Value Counts (task 1)")

In [None]:
counts = np.zeros((n_experiments, n))

for i, experiment in enumerate(pi_consecutive.T):
    un, cnts = np.unique(experiment, return_counts=True)
    counts[i][un] = cnts

counts = counts / counts.sum(-1).reshape(-1, 1)
px.imshow(
    counts.T,
    title=f"Conditional state frequencies for {n_experiments} experiments, {n_steps} timesteps",
    height=600,
    color_continuous_scale="magma",
)


In [None]:
show_matrix(P).show()
show_matrix(P_lim).show()

In [None]:
# what is the proper way of filling the frequency matrix?
empty = P_freq.sum(-1) <= 1e-10

P_freq_normed = P_freq
P_freq_normed[empty, empty] = 1
P_freq_normed /= P_freq_normed.sum(-1).reshape(-1, 1)

show_matrix(P_freq_normed)

In [None]:
def count_freqs(pi_cons: np.ndarray) -> np.ndarray:
    counts = np.zeros((n_steps, n))

    for i, step in enumerate(pi_consecutive):
        un, cnts = np.unique(step, return_counts=True, axis=0)
        counts[i][un] = cnts

    return counts


def show_frequency_barplot(counts: np.ndarray) -> go.Figure:
    cols = [f"S_{i}" for i in range(n)]
    df = pd.DataFrame(counts, columns=cols)
    df["timestep"] = df.index
    return px.bar(df, x="timestep", y=cols, title=f"State Frequencies per step")


def compute_stats(counts: np.ndarray) -> pd.DataFrame:
    cols = [f"S_{i}" for i in range(n)]
    return pd.DataFrame(counts, columns=cols).describe()

In [None]:
counts = count_freqs(pi_consecutive)
show_matrix(counts.T, zmax=None, zmin=None, title="State Frequencies per step (heatmap)").show()
show_frequency_barplot(counts).show()
stats = compute_stats(counts)
px.bar(stats.T, y=["std"], title="Per State Statistics").show()
stats

In [None]:
# perform the same experiment, but start from state 1 in
# almost all cases: the system stabilizes within couple of steps
# and distribution converges to the one on the previous figure
pi_consecutive = np.zeros((n_steps, n_experiments), dtype=int)
pi_consecutive[0] = np.array([0, 1, 2, 3, 4] + [1] * (n_experiments - 5))
pi_consecutive = reduce(
    emulate_propagation, enumerate(repeat(P, times=n_steps - 1)), pi_consecutive
)

counts = count_freqs(pi_consecutive)
show_matrix(counts.T, zmax=None, zmin=None, title="State Frequencies per step (heatmap)").show()
show_frequency_barplot(counts).show()

## **Problem 2**

In [None]:
# load data from text file
P = np.genfromtxt(
    StringIO(
        """0.12, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.88, 0.00, 0.00
0.00, 0.14, 0.00, 0.26, 0.29, 0.00, 0.00, 0.00, 0.00, 0.16, 0.00, 0.15, 0.00, 0.00, 0.00
0.12, 0.00, 0.24, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.64, 0.00, 0.00
0.00, 0.13, 0.00, 0.19, 0.00, 0.00, 0.00, 0.00, 0.00, 0.24, 0.00, 0.17, 0.00, 0.27, 0.00
0.00, 0.00, 0.00, 0.00, 0.24, 0.00, 0.12, 0.40, 0.00, 0.00, 0.24, 0.00, 0.00, 0.00, 0.00
0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.24, 0.00, 0.00, 0.00, 0.00, 0.52, 0.24
0.00, 0.00, 0.00, 0.00, 0.24, 0.00, 0.00, 0.40, 0.00, 0.00, 0.36, 0.00, 0.00, 0.00, 0.00
0.00, 0.00, 0.00, 0.00, 0.24, 0.00, 0.26, 0.16, 0.00, 0.00, 0.34, 0.00, 0.00, 0.00, 0.00
0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.24, 0.00, 0.00, 0.00, 0.00, 0.64, 0.12
0.26, 0.22, 0.00, 0.12, 0.00, 0.00, 0.00, 0.00, 0.00, 0.22, 0.00, 0.18, 0.00, 0.00, 0.00
0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.12, 0.64, 0.00, 0.00, 0.24, 0.00, 0.00, 0.00, 0.00
0.00, 0.23, 0.00, 0.31, 0.00, 0.00, 0.00, 0.00, 0.00, 0.21, 0.00, 0.25, 0.00, 0.00, 0.00
0.34, 0.00, 0.34, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.32, 0.00, 0.00
0.00, 0.00, 0.00, 0.00, 0.00, 0.23, 0.00, 0.00, 0.21, 0.00, 0.00, 0.00, 0.00, 0.27, 0.29
0.00, 0.00, 0.00, 0.00, 0.00, 0.24, 0.00, 0.00, 0.12, 0.00, 0.00, 0.00, 0.00, 0.52, 0.12"""
    ),
    delimiter=",",
)
# changed 0.22 to 0.23 on position (11, 1) so that row entries sum up to 1
n = len(P)

In [None]:
# visualize the transition matrix as a directed weighted graph

f = graphviz.Digraph("dmc-task-2", filename="dmc-2.gv", engine="dot")
f.attr(rankdir="LR", size="9", title="Markov Chain", labelloc="t")
f.attr("node", shape="circle", color="crimson", fillcolor="crimson", style="filled")
f.attr("edge", color="gainsboro")

for i, weights in enumerate(P):
    for j, weight in filter(lambda x: x[-1] > 0, enumerate(weights)):
        f.edge(f"S_{i}", f"S_{j}", label=f"{weight}")

f

In [None]:
# rearrange the indices so that states forming a communication class
# become neighbours
clusteds_sorted = np.array([1, 3, 9, 11, 0, 2, 12, 4, 6, 7, 10, 5, 8, 13, 14])
P_renamed = P[:, clusteds_sorted][clusteds_sorted]

In [None]:
# for now, deduce these classes manually
recurrent_classes = (np.array([0, 2, 12]), np.array([4, 6, 7, 10]), np.array([5, 8, 13, 14]))

In [None]:
# visualizes transition diagram with nodes renamed
f = graphviz.Digraph("dmc-task-2", filename="dmc-2-renamed.gv", engine="dot")
f.attr(rankdir="LR", size="9", title="Markov Chain", labelloc="t")
f.attr("edge", color="gainsboro")
f.attr("node", shape="circle", style="filled")

for color, nodes in zip(("#f5f5dc", "crimson", "teal", "plum"), (range(4), range(4, 4+3), range(4+3, 4+3+4), range(4+3+4, 4+3+4+4))):
    f.attr("node", color=color, fillcolor=color)
    for node in nodes:
        f.node(f"S_{node}")

for i, weights in enumerate(P_renamed):
    for j, weight in filter(lambda x: x[-1] > 0, enumerate(weights)):
        f.edge(f"S_{i}", f"S_{j}", label=f"{weight}")

f

In [None]:
show_matrix(P, title="Transition matrix (initial view)").show()
show_matrix(P_renamed, title="Transition matrix (node indices rearranged)").show()

In [None]:
# create transition matrix for transient states only
# replace recurrent classes with absorbing states
# (i.e., self-loop probability for such states is 1.0)
transient_trunc = np.array([0, 1, 2, 3, 4, 7, 13])
trunc_size = len(transient_trunc)
transient_size = 4

P_trans_trunc = P_renamed[:, transient_trunc][transient_trunc]
P_trans_trunc[transient_size:, transient_size:] = np.eye(trunc_size - transient_size)

In [None]:
show_matrix(
    P_trans_trunc,
    title="Transition matrix for transient communication class \
(recurrent classes replaced with absorbing states)",
)


In [None]:
k = 10  # arbitraty
P_trans_lim = reduce(lambda x, p: x @ p, repeat(P_trans_trunc, times=10))
show_matrix(P_trans_lim, title="Limiting distribution for transient states")

In [None]:
# assemble the limiting transition matrix
# from separate blocks corresponding to recurrent communication
# classes (i.e., the 3 clusters)
P_lim_partial = np.zeros_like(P)

for recurrent in recurrent_classes:
    P_lim_class = P[:, recurrent][recurrent]
    P_lim_class = reduce(lambda x, p: x @ p, repeat(P_lim_class, times=k))
    P_lim_partial[np.ix_(recurrent, recurrent)] = P_lim_class

# once again, sort w.r.t. communcation classes
P_lim_partial_renamed = P_lim_partial[:, clusteds_sorted][clusteds_sorted]
# note that the first 4 rows are still empty in this matrix
show_matrix(P_lim_partial_renamed, title="Limiting distribution (only recurrent states)")

In [None]:
# extract probabilities to "sink" into recurrent classes
# when starting in the transient one
P_trans_to_recurrent = P_trans_lim[:transient_size, transient_size:]
show_matrix(
    P_trans_to_recurrent,
    title="Probas for transient state to \
'sink' to recurrent states",
).show()

# extract limiting probas per recurrent class
P_lim_per_cluster = P_lim_partial_renamed[[4, 7, 11]]
show_matrix(
    P_lim_per_cluster, title="Transition probas for each recurrent class"
).show()

# multiplication product for these 2 matrices can be
# interpreted as per-cluster probas smoothed by
# probas to escape to a particular cluster
show_matrix(
    P_trans_to_recurrent @ P_lim_per_cluster,
    title="Product of two matrices above, \
per-cluster probas smoothed out by 'sink' probas",
).show()


In [None]:
# assemble the matrix from 2 steps above
P_lim_partial_renamed[:transient_size] = P_trans_to_recurrent @ P_lim_per_cluster
P_lim_partial_renamed[:transient_size, :transient_size] = P_trans_lim[:transient_size, :transient_size]

In [None]:
# viola, we obtained a matrix very simular to the one obtained by
# repeated multiplication if the entire transition matrix, hence
# the problem dimensionality was reduced
show_matrix(P_lim_partial_renamed, title="Finally, assembled limiting transition matrix").show()

In [None]:
# for sake of simplicity, work with the
# rearranged version of this matrix
P = P_renamed

k = 10
P_lim = reduce(lambda x, p: x @ p, repeat(P, times=k))
show_matrix(P_lim, title=f"Transition matrix P^{k}")

In [None]:
# try to obtain stationary distribution as a
# left eigenvector of transition matrix

laplacian = P - np.eye(*P.shape)
show_matrix(laplacian, zmax=None, zmin=None)

In [None]:
n_experiments = 200
n_steps = 100
gen = np.random.default_rng(seed=30)

random_pi_0 = gen.random((20, n))
random_pi_0 /= random_pi_0.sum(-1).reshape(-1, 1)

# simulate system for given number of steps
random_pi_lim = reduce(lambda x, p: x @ p, repeat(P, times=n_steps), random_pi_0)
show_matrix(np.atleast_2d(random_pi_lim))

In [None]:
from typing import Tuple

gen = np.random.default_rng(seed=30)
random_pi_0 = gen.choice(n, size=(n_experiments), replace=True)
identity = np.eye(n)


def emulate_propagation(A: np.ndarray, curr: Tuple[int, np.ndarray]) -> np.ndarray:
    idx, p = curr
    prev_state = A[idx]
    ohe_pi_prev = identity[prev_state]
    # print(f"{prev_state.shape=}, {ohe_pi_prev.shape=}")
    probas = ohe_pi_prev @ p
    # print(f"{probas.shape=}")
    A[idx + 1] = (probas.cumsum(1) > gen.random(probas.shape[0])[:,None]).argmax(1)
    return A


pi_consecutive = np.zeros((n_steps, n_experiments), dtype=int)
pi_consecutive[0] = random_pi_0

pi_consecutive = reduce(
    emulate_propagation, enumerate(repeat(P, times=n_steps - 1)), pi_consecutive
)

In [None]:
df = pd.DataFrame(pi_consecutive, columns=[f"exp_{i}" for i in range(n_experiments)])

In [None]:
plot_simulation_trajectories(df, columns=[f"exp_{i}" for i in range(0, n_experiments, 10)])

In [None]:
P_freq = np.zeros_like(P)


def fill_p_freq(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    P_freq[x, y] += 1
    return y

_ = reduce(fill_p_freq, pi_consecutive)

In [None]:
unique, counts = np.unique(pi_consecutive, return_counts=True)

In [None]:
px.bar(x=unique, y=counts, title="Value Counts (task 2)")

In [None]:
counts = count_freqs(pi_consecutive)
show_matrix(
    counts.T,
    zmax=None,
    zmin=None,
    title=f"Conditional state frequencies for {n_experiments} experiments, {n_steps} timesteps",
).show()
show_frequency_barplot(counts).show()
counts = count_freqs(pi_consecutive)
stats = compute_stats(counts)
px.bar(stats.T, y=["std"], title="Per State Statistics").show()
stats

In [None]:
# one can observe that system tends to eventually sink into one of
# recurrent communication classes

In [None]:
# construct transition matrix approximation from transition frequencies
empty = P_freq.sum(-1) <= 1e-10

P_freq_normed = P_freq
P_freq_normed[empty, empty] = 1
P_freq_normed /= P_freq_normed.sum(-1).reshape(-1, 1)

show_matrix(P_freq_normed)