In [1]:
# ruff: noqa
import numpy as np
import sympy
from scipy.optimize import minimize


def check_is_stochastic_matrix(
    matrix: np.ndarray, check_has_absorption_states: bool = False
) -> None:
    if not np.all(matrix >= 0):
        raise ValueError(
            f"The given matrix\n{matrix}\nis not a stochastic matrix as at least one entry is negative."
        )

    if not np.allclose(matrix.sum(axis=1), 1):
        raise ValueError(
            f"The given matrix\n{matrix}\nis not a stochastic matrix as all rows probabilities does not sum to 1."
        )

    if not matrix.shape[0] == matrix.shape[1]:
        raise ValueError(
            f"The given matrix\n{matrix}\nis not a stochastic matrix as it is not a square matrix."
        )

        # if check_has_absorption_states:
        pass


def generate_symbol(i: int, j: int) -> sympy.core.symbol.Symbol:
    return sympy.symbols(f"x{i}{j}")


def gen_symbolic_variant(
    transition_matrix: np.ndarray[float, int],
) -> np.ndarray[np.float64 | sympy.core.symbol.Symbol]:
    T = transition_matrix
    P = np.empty_like(T, dtype=object)
    for i in range(T.shape[0]):
        for j in range(T.shape[1]):
            if 0 < T[i, j] < 1:
                P[i, j] = generate_symbol(i, j)
            else:
                P[i, j] = T[i, j]
    return P


def _derive_stochastic_matrix_conds(
    symbolic_transition_matrix: np.ndarray[np.float64 | sympy.core.symbol.Symbol],
) -> list[sympy.core.relational.Equality | sympy.logic.boolalg.BooleanTrue]:
    equations = []
    for row in symbolic_transition_matrix:
        if any(isinstance(element, sympy.Symbol) for element in row):
            row_sum_eq = sympy.Eq(sum(row), 1)
            equations.append(row_sum_eq)
    return equations


def _derive_stationary_dist_eqs(
    symbolic_absorption_probs: np.ndarray[object], absorption_probs: np.ndarray
) -> list[sympy.core.relational.Equality | sympy.logic.boolalg.BooleanTrue]:
    return [sympy.Eq(lhs, rhs) for lhs, rhs in zip(symbolic_absorption_probs, absorption_probs)]

In [2]:
### INPUT ###
T = np.asarray(
    [
        [0, 0.5, 0.5, 0, 0],
        [0, 0, 0.5, 0.5, 0],
        [0, 0, 0, 0.5, 0.5],
        [0, 0, 0, 1, 0],
        [0, 0, 0, 0, 1],
    ]
)
x0 = [1, 0, 0, 0, 0]
absorption_probs = np.array([0, 0, 0, 0.7, 0.3])

# derived
N = T.shape[0]
###########################
check_is_stochastic_matrix(T)
P = gen_symbolic_variant(T)

# replace symbols in P by expressions in terms of free variables
# delta = sympy.Matrix(T) - P
# Frobenius norm of dela

symbolc_absorption_probs = x0 @ np.linalg.matrix_power(P, N)

stationary_dist_eqs = _derive_stationary_dist_eqs(symbolc_absorption_probs, absorption_probs)
stochastic_matrix_conds = _derive_stochastic_matrix_conds(P)[
    :-1
]  # all but one to avoid over-constraining

symb_solutions = sympy.solve(stationary_dist_eqs + stochastic_matrix_conds, dict=True)
assert len(symb_solutions) == 1, "Multiple solutions"
symb_solutions = symb_solutions[0]
symb_solutions

{x01: 0.1*(10.0*x24 - 3.0)/(x13*x24),
 x02: 0.1*(10.0*x13*x24 - 10.0*x24 + 3.0)/(x13*x24),
 x12: 1.0 - x13,
 x23: 1.0 - x24}

In [3]:
from typing import Any


def _derive_free_variables(
    symb_transition_matrix: np.ndarray[np.float64 | sympy.core.symbol.Symbol],
    symb_solutions: dict[sympy.core.symbol.Symbol, Any],
) -> set[sympy.core.symbol.Symbol]:
    all_variables = set().union(*(sympy.Matrix(row).free_symbols for row in symb_transition_matrix))
    dependent_variables = set(symb_solutions.keys())
    free_variables = all_variables - dependent_variables
    return free_variables


free_variables = _derive_free_variables(P, symb_solutions)
free_variables

{x13, x24}

In [4]:
from typing import Callable


def _lambdify_dep_vars(symb_solutions) -> dict[sympy.core.symbol.Symbol, Callable[..., float]]:
    dep_var = {}
    for var, expr in symb_solutions.items():
        args = sorted(expr.free_symbols, key=lambda s: s.name)
        dep_var[var] = sympy.lambdify(args, expr, modules="numpy")
    return dep_var


lamdified_dep_vars = _lambdify_dep_vars(symb_solutions)
lamdified_dep_vars

{x01: <function _lambdifygenerated(x13, x24)>,
 x02: <function _lambdifygenerated(x13, x24)>,
 x12: <function _lambdifygenerated(x13)>,
 x23: <function _lambdifygenerated(x24)>}

In [5]:
def _derive_free_var_init_values(
    free_variables: set[sympy.core.symbol.Symbol],
    T: np.ndarray[float],
    P: np.ndarray[np.float64 | sympy.core.symbol.Symbol],
) -> dict[sympy.core.symbol.Symbol, float]:
    assert T.shape == P.shape

    free_var_value_mapping = {}
    for i in range(T.shape[0]):
        for j in range(T.shape[1]):
            if isinstance(P[i, j], sympy.Symbol) and P[i, j] in free_variables:
                free_var_value_mapping[P[i, j]] = T[i, j]

    return free_var_value_mapping


free_var_value_mapping = _derive_free_var_init_values(free_variables, T, P)
free_var_value_mapping


len(free_var_value_mapping)

2

In [6]:
def _derive_lambda_arg_names(lambda_func: Callable) -> tuple[str, ...]:
    return lambda_func.__code__.co_varnames


def _derive_lambda_arg_values(
    free_var_value_mapping, arg_names: tuple[str, ...]
) -> tuple[float, ...]:
    return tuple((free_var_value_mapping[sympy.Symbol(var)] for var in arg_names))


def evaluate_symbolic_matrix(
    symbolic_transition_matrix: np.ndarray[object],
    lamdified_dep_vars: dict[sympy.core.symbol.Symbol, Callable[..., float]],
    free_var_value_mapping: dict[sympy.core.symbol.Symbol, float],
):
    evaluated_matrix = np.zeros_like(symbolic_transition_matrix, dtype=float)

    for i in range(symbolic_transition_matrix.shape[0]):
        for j in range(symbolic_transition_matrix.shape[1]):
            element: np.float64 | sympy.core.symbol.Symbol = symbolic_transition_matrix[i, j]

            if element in lamdified_dep_vars:
                assert isinstance(element, sympy.core.symbol.Symbol)
                func = lamdified_dep_vars[element]
                arg_names = _derive_lambda_arg_names(func)
                args = _derive_lambda_arg_values(free_var_value_mapping, arg_names)
                evaluated_matrix[i, j] = func(*args)
            elif element in free_var_value_mapping:
                assert isinstance(element, sympy.core.symbol.Symbol)
                evaluated_matrix[i, j] = free_var_value_mapping[element]
            else:
                evaluated_matrix[i, j] = element if isinstance(element, (int, float)) else 0

    return evaluated_matrix


P_evaluated = evaluate_symbolic_matrix(P, lamdified_dep_vars, free_var_value_mapping)
P_evaluated

array([[0. , 0.8, 0.2, 0. , 0. ],
       [0. , 0. , 0.5, 0.5, 0. ],
       [0. , 0. , 0. , 0.5, 0.5],
       [0. , 0. , 0. , 1. , 0. ],
       [0. , 0. , 0. , 0. , 1. ]])

In [7]:
P

array([[0.0, x01, x02, 0.0, 0.0],
       [0.0, 0.0, x12, x13, 0.0],
       [0.0, 0.0, 0.0, x23, x24],
       [0.0, 0.0, 0.0, 1.0, 0.0],
       [0.0, 0.0, 0.0, 0.0, 1.0]], dtype=object)

In [8]:
def objective(
    free_var_vals: np.ndarray,
    free_var_value_mapping: dict,
    symbolic_transition_matrix: np.ndarray,
    lamdified_dep_vars: dict[sympy.Symbol, Callable[..., float]],
    original_transition_matrix: np.ndarray,
) -> float:
    for i, var in enumerate(free_var_value_mapping.keys()):
        free_var_value_mapping[var] = free_var_vals[i]

    P_evaluated = evaluate_symbolic_matrix(
        symbolic_transition_matrix, lamdified_dep_vars, free_var_value_mapping
    )

    frobenius_norm = np.linalg.norm(P_evaluated - original_transition_matrix, "fro")
    return frobenius_norm


initial_values = tuple(free_var_value_mapping.values())
EPSILON = 10e-9
bounds = [(0 + EPSILON, 1 - EPSILON) for _ in range(len(free_var_value_mapping))]
result = minimize(
    objective,
    initial_values,
    args=(free_var_value_mapping, P, lamdified_dep_vars, T),
    bounds=bounds,
)

from copy import deepcopy

free_var_value_mapping_optimal = deepcopy(free_var_value_mapping)
for i, var in enumerate(free_var_value_mapping.keys()):
    free_var_value_mapping_optimal[var] = result.x[i]

perturbed_P = evaluate_symbolic_matrix(P, lamdified_dep_vars, free_var_value_mapping_optimal)

# sanity check that the new transition matrix does indeed have the target absorption probabilities
perturbed_P_abs_prob = x0 @ np.linalg.matrix_power(perturbed_P, 5)
assert all(
    np.isclose(x, y) for x, y in zip(perturbed_P_abs_prob, absorption_probs)
), "Resulting transition matrix does not have the target absorption probabilities"


perturbed_P

array([[0.        , 0.52572715, 0.47427285, 0.        , 0.        ],
       [0.        , 0.        , 0.47427264, 0.52572736, 0.        ],
       [0.        , 0.        , 0.        , 0.58541252, 0.41458748],
       [0.        , 0.        , 0.        , 1.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        , 1.        ]])

In [9]:
T

array([[0. , 0.5, 0.5, 0. , 0. ],
       [0. , 0. , 0.5, 0.5, 0. ],
       [0. , 0. , 0. , 0.5, 0.5],
       [0. , 0. , 0. , 1. , 0. ],
       [0. , 0. , 0. , 0. , 1. ]])