In [None]:
# numpy == 1.21.5
import numpy as np
# scipy == 1.7.3
import scipy.linalg
# to make nice plots
import matplotlib.pyplot as plt
# optional -- to track how long a simulation will take
from tqdm import tqdm

from steinberg_utils_3vertex import *

# Plot features

In [None]:
# plot formatting
plt.rc("text", usetex=False) # renders LaTeX more quickly
plt.rc("font", family = "serif",size=14) # font specifications
plt.rc("figure",figsize=(14,12)) # figure size
%config InlineBackend.figure_format = 'retina' # retina-display quality

# Sampling parameters for the 3-vertex graph $K$

Parameters are defined as $10^x$, where $x$ is uniformly sampled from $(-3, 3)$. The user can specify the endpoints of this range to sample parameter space more broadly, but $-3$ and $3$ are set as the default parameters.

In [None]:
params = random_parameters()

In [None]:
print(params)

[1.08206414e-03 1.01092706e-01 9.93254641e-01 6.15405297e-02
 2.82741566e+01 5.45289597e+00]


The parameters can be determined such that they satisfy the cycle condition on $K$, initializing the graph in an equlibrium steady state.

In [None]:
params = equilibrium_parameters()

In [None]:
print(params)

[3.14027381e-03 3.73496505e+00 4.34784835e+01 4.91502712e-01
 4.99053431e-03 6.70992400e-02]


To generate the figure in the paper, we will manually assign values for the edge labels of the 3-vertex graph. The parameters for the 3-vertex graph are listed in the following order: $[a, b, d, c, f, e]$

In [None]:
#params = [0.08833331, 0.44953403, 0.58597936, 0.02356496, 0.00747019, 0.75898155]

Next, we compute the Laplacian matrix $\mathcal{L}(K)$ for this specific parameterzation of $K$, and the steady state distribution $\pi(K)$ as computed through the Matrix-Tree Theorem.

In [None]:
L = Laplacian_K(params)
print(L)

[[-8.13080813e-03  3.73496505e+00  6.70992400e-02]
 [ 3.14027381e-03 -4.72134485e+01  4.91502712e-01]
 [ 4.99053431e-03  4.34784835e+01 -5.58601952e-01]]


In [None]:
pi = steady_state_MTT_K(params)
print(pi)

[9.30045511e-01 7.81961150e-04 6.91725277e-02]


We can also calculate the cycle affinity $\tilde{A}(C)$ for $K$. The cycle affinity  quantifies the departure from equilibrium that arises from the cycle $C$. We take the absolute value of the cycle affinity in our calculation.

In [None]:
cycle_affinity_K(params)

0.0

# Testing response of a KLD signature to increasing thermodynamic force

Another class of signatures uses the KL-divergence to detect irreversibility. Here, we are going to test whether or not the KL-divergence between the probability of observing a stochastic trajectory and its time-reversed trajectory varies monotonically with increasing thermodynamic force. We will use the mathematical conventions laid out in Martínez et al. 2019.

The entropy production rate $\dot{S}$ can be estimated by the KL-divergence between the probability of observing a stochastic trajectory $\gamma_t$ of length $t$ and the probability to observe the time-reversed trajectory $\tilde{\gamma_t}$.

$$ \dot{S} \geq \dot{S}_{KLD} \equiv \lim_{t \to \infty} \frac{k_B}{t} \mathcal{D} [ P(\gamma_t) || P(\tilde{\gamma_t}) ], $$

where $\mathcal{D}[ p || q ] \equiv \sum_x p(x) \ln{p(x)/q(x)}$ is the KL-divergence between probability distributions $p$ and $q$. We assume that $k_B = 1$.

In Martínez et al 2019, the authors claim that $\dot{S}_{KLD}$ can be expressed as the sum of two entropy productions.

$$\dot{S}_{KLD} = \dot{S}_{aff} + \dot{S}_{WTD}$$

where

$$ \dot{S}_{aff} = \frac{1}{\mathcal{T}} \sum_{\alpha \beta} p_{\beta \alpha} R_\alpha \ln{p_{\beta \alpha}/p_{\alpha \beta}} $$

and

$$\dot{S}_{WTD} = \frac{1}{\mathcal{T}} \sum_{\alpha \beta \mu} p_{\mu \beta} p_{\beta \alpha} R_\alpha \mathcal{D} [\Psi(t | \beta \to \mu ) || \Psi (t | \beta \to \alpha)]$$

In [None]:
from scipy.stats import rv_continuous
from scipy.integrate import quad
from scipy.optimize import minimize

In [None]:
class SemiMarkovProcess:
    def __init__(self, states, transition_matrix, waiting_time_distributions):
        """
        Initialize the semi-Markov process.

        Parameters:
        - states: list of states in the process.
        - transition_matrix: NxN matrix of transition probabilities between states.
        - waiting_time_distributions: Dictionary with state pairs (i, j) as keys and
          waiting time distribution functions as values.
        """
        self.states = states
        self.transition_matrix = np.array(transition_matrix)
        self.waiting_time_distributions = waiting_time_distributions
        
        self._validate_inputs()

    def _validate_inputs(self):
        """Validate the inputs for consistency."""
        assert self.transition_matrix.shape[0] == len(self.states), "Transition matrix size must match the number of states."
        assert np.allclose(self.transition_matrix.sum(axis=1), 1), "Rows of transition matrix must sum to 1."

    def sample_trajectory(self, initial_state, max_time):
        """
        Simulate a trajectory of the semi-Markov process.

        Parameters:
        - initial_state: Starting state.
        - max_time: Maximum simulation time.

        Returns:
        - trajectory: List of tuples [(state, time_spent), ...].
        """
        trajectory = []
        current_state = initial_state
        total_time = 0

        while total_time < max_time:
            next_state = np.random.choice(
                self.states, p=self.transition_matrix[self.states.index(current_state)]
            )
            waiting_time_dist = self.waiting_time_distributions.get((current_state, next_state))
            if waiting_time_dist is None:
                raise ValueError(f"No waiting time distribution defined for transition ({current_state}, {next_state}).")
            waiting_time = waiting_time_dist.rvs()

            if total_time + waiting_time > max_time:
                break

            trajectory.append((current_state, waiting_time))
            total_time += waiting_time
            current_state = next_state

        return trajectory

    def compute_entropy_production(self):
        """
        Compute the entropy production rate for the semi-Markov process.

        Returns:
        - S_aff: Affinity entropy production rate.
        - S_WTD: Waiting time distribution entropy production rate.
        """
        S_aff = 0
        S_WTD = 0

        for i, state_i in enumerate(self.states):
            for j, state_j in enumerate(self.states):
                if i != j and self.transition_matrix[i, j] > 0:
                    R_i = self._compute_stationary_distribution()[i]
                    S_aff += (
                        self.transition_matrix[i, j] * R_i
                        * np.log(self.transition_matrix[i, j] / self.transition_matrix[j, i])
                    )

                    for k, state_k in enumerate(self.states):
                        if k != i and self.transition_matrix[j, k] > 0:
                            waiting_time_ij = self.waiting_time_distributions.get((state_i, state_j))
                            waiting_time_jk = self.waiting_time_distributions.get((state_j, state_k))
                            if waiting_time_ij and waiting_time_jk:
                                S_WTD += (
                                    self.transition_matrix[j, k] * self.transition_matrix[i, j] * R_i
                                    * self._kullback_leibler(waiting_time_ij, waiting_time_jk)
                                )

        return S_aff, S_WTD

    def _compute_stationary_distribution(self):
        """Compute the stationary distribution of the Markov chain."""
        eigvals, eigvecs = np.linalg.eig(self.transition_matrix.T)
        stationary = eigvecs[:, np.isclose(eigvals, 1)]
        stationary = stationary / stationary.sum()
        return stationary.real.flatten()

    def _kullback_leibler(self, dist1, dist2):
        """
        Compute the Kullback-Leibler divergence between two distributions.

        Parameters:
        - dist1, dist2: Distribution objects with PDF methods.

        Returns:
        - KL divergence.
        """
        def integrand(x):
            p = dist1.pdf(x)
            q = dist2.pdf(x)
            return p * np.log(p / q) if p > 0 and q > 0 else 0

        return quad(integrand, 0, np.inf)[0]

In [None]:
# Example Usage
# Define custom waiting time distributions
class CustomExponential(rv_continuous):
    """Custom exponential distribution."""
    def __init__(self, rate):
        super().__init__()
        self.rate = rate

    def _pdf(self, x):
        return self.rate * np.exp(-self.rate * x) if x >= 0 else 0

    def rvs(self, size=1):
        return np.random.exponential(1 / self.rate, size=size)

In [None]:
# Define states and transition matrix
states = ["A", "B", "C"]
transition_matrix = [
    [0.0, 0.6, 0.4],
    [0.5, 0.0, 0.5],
    [0.3, 0.7, 0.0],
]

np.random.seed(42)  # For reproducibility
sampled_rates = np.random.uniform(low=0.5, high=2.0, size=6)

# Parameters for the log-normal distribution
mean = 0  # Mean of the logarithm of the rates
sigma = 0.5  # Standard deviation of the logarithm of the rates

# Sample rates from the log-normal distribution
np.random.seed(42)  # For reproducibility
log_sampled_rates = np.random.lognormal(mean=mean, sigma=sigma, size=6)

# Define waiting time distributions
waiting_time_distributions = {
    ("A", "B"): CustomExponential(rate=log_sampled_rates[0]),
    ("A", "C"): CustomExponential(rate=log_sampled_rates[1]),
    ("B", "A"): CustomExponential(rate=log_sampled_rates[2]),
    ("B", "C"): CustomExponential(rate=log_sampled_rates[3]),
    ("C", "A"): CustomExponential(rate=log_sampled_rates[4]),
    ("C", "B"): CustomExponential(rate=log_sampled_rates[5]),
}

# Create semi-Markov process and analyze it
smp = SemiMarkovProcess(states, transition_matrix, waiting_time_distributions)
trajectory = smp.sample_trajectory(initial_state="A", max_time=10)
S_aff, S_WTD = smp.compute_entropy_production()

print("Trajectory:", trajectory)
print("Affinity Entropy Production:", S_aff)
print("Waiting Time Distribution Entropy Production:", S_WTD)

Trajectory: [('A', array([1.56892368])), ('B', array([0.57494266])), ('C', array([3.93871907])), ('A', array([0.25577236])), ('C', array([0.2277769])), ('A', array([0.58032422])), ('B', array([0.24899814])), ('A', array([0.16098799])), ('C', array([0.51294942])), ('A', array([1.19971519])), ('B', array([0.52228909])), ('A', array([0.03710367])), ('B', array([0.08730314]))]
Affinity Entropy Production: 0.009906563952444797
Waiting Time Distribution Entropy Production: 0.0533403951121652
