In [1]:
import numpy as np
import scipy, time
import warnings
import matplotlib.pyplot as plt
from random import random
from collections import Counter
from scipy.interpolate import interp1d
from scipy import stats, integrate

warnings.filterwarnings('ignore')

#returns the step function value at each increment of the CDF
def cumcount_reduced(arr):
    '''Cumulative count of a array'''
    sorted_arr = np.array(sorted(arr))
    counts = np.zeros(len(arr))
    
    rolling_count = 0
    for idx, elem in enumerate(sorted_arr):
        rolling_count += 1
        counts[idx] = rolling_count

    counts /= len(counts)
    counts -= (1 / (2 * len(counts)))

    return (sorted_arr, counts)

#takes two datasets to estimate the relative entropy between their PDFs
#we use eps=10^-11, but it could be defined as < the minimal interval between data points
def KLD_PerezCruz(P, Q, eps=1e-11):
    '''KL divergence calculation'''
    P = sorted(P)
    Q = sorted(Q)
    
    P_positions, P_counts = cumcount_reduced(P)
    Q_positions, Q_counts = cumcount_reduced(Q)
    
    #definition of x_0 and x_{n+1}
    x_0 = np.min([P_positions[0], Q_positions[0]]) - 1
    P_positions = np.insert(P_positions, 0, [x_0])
    P_counts = np.insert(P_counts, 0, [0])
    Q_positions = np.insert(Q_positions, 0, [x_0])
    Q_counts = np.insert(Q_counts, 0, [0])
    
    x_np1 = np.max([P_positions[-1], Q_positions[-1]]) + 1
    P_positions = np.append(P_positions, [x_np1])
    P_counts = np.append(P_counts, [1])
    Q_positions = np.append(Q_positions, [x_np1])
    Q_counts = np.append(Q_counts, [1])
    
    f_P = interp1d(P_positions, P_counts)
    f_Q = interp1d(Q_positions, Q_counts) 
    
    X = P_positions[1:-2]
    values = (f_P(X) - f_P(X - eps)) / (f_Q(X) - f_Q(X - eps))
    filt = ((values != 0.) & ~(np.isinf(values)) & ~(np.isnan(values)))
    values_filter = values[filt]
    out = (np.sum(np.log(values_filter)) / len(values_filter)) - 1.

    return out


def cofactor(A,i,j):
    sub_A = np.delete(A,i,0)     # Delete i-th row
    sub_A = np.delete(sub_A,j,1) # Delete j-th column
    M_ij = np.linalg.det(sub_A)  # Minor of the element at ith row and jth column
    return M_ij

def column(matrix, i):
    return [row[i] for row in matrix]

def hidden_gillespie(states, vis_states, rates, tmax):
    W = np.copy(rates)
    
    #occupation prob
    pvec = []
    for n in states:
        pvec.append(0)
    
    #exit rates
    for n in states:
        W[n,n] = -sum(column(rates,n)) 
    
    #steady-state
    pss = [] 
    for n in states:
        elem = ((-1)**(n+1))*cofactor(W,1,n)
        pss.append(elem)
    norm = np.sum(pss)
    for n in states:
        pss[n] /= norm
    
    #initial state
    x = np.random.choice(states, p = pss)
    vis_traj = []
    hidd_traj = [[x,0]]
    
    #gillespie evolution
    t = 0
    int_trans_time = 0
    while t < tmax:        
        #time
        tau = np.log(random())/W[x,x]
        t += tau
        int_trans_time += tau
        pvec[x] += tau/tmax

        #next state
        probs = column(W,x)/(-W[x,x])
        probs = np.delete(probs, x)
        y = np.random.choice(np.delete(states, x), p = probs)
        
        #DELETE
        hidd_traj.append([[x,y], tau, t])
        
        #if visible, prints
        if([x,y] in vis_states):
            vis_traj.append([[x,y], int_trans_time])
            int_trans_time = 0
            
        x = y
        #traj_ell, traj_t = tuple(zip(*traj1))
    return vis_traj

def count_trans(vis_states, traj):
    #pair counting: transition in column followed by row
    num_vis_states = len(vis_states)
    counts = np.zeros((num_vis_states, num_vis_states))
    int_trans_times = [[[] for i in range(num_vis_states)] for i in range(num_vis_states)]

    #enumerate the visible states
    en_vis_states = list(enumerate(vis_states))

    #first transition
    pos1 = [x for x, y in en_vis_states if y == traj[0][0]][0]
    #counts[pos][pos] += 1

    #counts occurrence of pairs and inter-transition times
    for i in range(1, len(traj)):
        pos2 = [x for x, y in en_vis_states if y == traj[i][0]][0]
        counts[pos2][pos1] += 1
        int_trans_times[pos2][pos1].append(traj[i][1])
        pos1 = pos2

    return counts, int_trans_times

def EP_probs_KLD(counts, int_trans_times):
    #transform matrix of counts into conditional probabilities
    cond_prob = []
    for i in range(len(counts)):
        row = counts[i]
        rowsum = counts.sum(axis=0)
        cond_prob.append(np.divide(row, rowsum, where=rowsum!=0))
    
    #unconditional probabilities
    single_prob = mat1.sum(axis=1)
    single_prob /= single_prob.sum()
    
    #time reversal of matrices
    num_vis_states = int(len(counts)/2)
    tr_cond_prob = [[[] for i in range(len(counts))] for i in range(len(counts))]
    tr_int_trans_times = [[[] for i in range(len(counts))] for i in range(len(counts))]
    for m in range(num_vis_states):
        for n in range(2):
            for m2 in range(num_vis_states):
                for n2 in range(2):
                    tr_cond_prob[2*m+n][2*m2+n2] = cond_prob[2*m2+1-n2][2*m+1-n]
                    tr_int_trans_times[2*m+n][2*m2+n2] = int_trans_times[2*m2+1-n2][2*m+1-n]
                    
    #visible traffic
    K = sum(sum(counts))/sum(sum(sum(int_trans_times,[]),[]))
    
    #two contributions for the visible EP
    σell = 0
    σt = 0
    test = []
    for i in range(len(counts)):
        for j in range(len(counts)):
            σell += K*cond_prob[i][j]*single_prob[j]*np.log(cond_prob[i][j]/tr_cond_prob[i][j])
            #as discussed, I use half the data for each input because the KLD would fail for alternated transitions
            σt += K*cond_prob[i][j]*single_prob[j]*KLD_PerezCruz(int_trans_times[i][j][:len(int_trans_times[i][j])//2], tr_int_trans_times[i][j][-len(int_trans_times[i][j])//2:])        
    
    return σell, σt   

In the following, I generate a trajectory in state space using Gillespie and return only the visible transitions and respective inter-transitions times.

From such a trajectory, I organize in a matrix the conditional probabilities, in a vector the uncond. probabilities, and in a matrix the inter-transition times.

To use Pérez-Cruz's KLD we need two independent samples, therefore I perform a harmless trick: I split inter-transition time datasets in two to analyze KLDs because of "alternated" transitions $D[P(t\vert\ell,\bar{\ell}) \vert \vert P(t\vert\vert\ell,\bar{\ell})]$.

One more detail, the vis_states array has to be organized in such a way that a transition and its reverse are neighbors: $[[\ell_1],[\bar{\ell}_1],[\ell_2],[\bar{\ell}_2],\ldots]$

Finally, we evaluate Eqs. for $\sigma_\ell$ and $\sigma_t$ in arXiv:2203.07427 to obtain the visible entropy production rate $\sigma_\mathcal{L}$

For the following rates and choosing four transitions to observe, $\sigma_\ell = 0.529087$ and $\sigma_t = 0.0169746$

In [2]:
%%time
states = (0,1,2,3)
rates = np.array([[0, 3, 2, 2], [1, 0, 2, 1], [3, 1, 0, 3], [1, 2, 1, 0]])
vis_states = [[0,1], [1,0], [2,3], [3,2]]
tmax = 1000000

traj = hidden_gillespie(states, vis_states, rates, tmax)
mat1, mat2 = count_trans(vis_states, traj)
sell, st = EP_probs_KLD(mat1, mat2)
print(sell, st)

0.5284602646508627 0.014483521880225057
Wall time: 8min 34s
