In [131]:
# %load mlmc.py
import numpy
import numpy.linalg

class WeakConvergenceFailure(Exception):
    pass

def mlmc(Lmin, Lmax, N0, eps, mlmc_fn, alpha_0, beta_0, gamma):
    """
    Multi-level Monte Carlo estimation.

    (P, Nl) = mlmc(...)

    Outputs:
      P:  value
      Nl: number of samples on each level
    Inputs:
      Lmin: minimum level of refinement  >= 2
      Lmax: maximum level of refinement  >= Lmin
      N0:   initial number of samples    >  0
      eps:  desired accuracy (rms error) >  0

      alpha: weak error is  O(2^{-alpha*l})
      beta:  variance is    O(2^{-beta*l})
      gamma: sample cost is O(2^{gamma*l})  > 0

      If alpha, beta are not positive then they will be estimated.

      mlmc_fn: the user low-level routine. Its interface is
        sums = mlmc_fn(l, N)
      with inputs
        l = level
        N = number of paths
      and a numpy array of outputs
        sums[0] = sum(Y)
        sums[1] = sum(Y**2)
      where Y are iid samples with expected value
        E[P_0]            on level 0
        E[P_l - P_{l-1}]  on level l > 0
    """

    # Check arguments

    if Lmin < 2:
        raise ValueError("Need Lmin >= 2")
    if Lmax < Lmin:
        raise ValueError("Need Lmax >= Lmin")
    if N0 <= 0 or eps <= 0 or gamma <= 0:
        raise ValueError("Need N0 > 0, eps > 0, gamma > 0")

    # Initialisation

    alpha = max(0, alpha_0)
    beta  = max(0, beta_0)

    theta = 0.25

    L = Lmin

    Nl   = numpy.zeros(L+1)
    suml = numpy.zeros((2, L+1))
    dNl  = N0*numpy.ones(L+1)

    while sum(dNl) > 0:

        # update sample sums

        for l in range(0, L+1):
            if dNl[l] > 0:
                sums       = mlmc_fn(l, int(dNl[l]))
                Nl[l]      = Nl[l] + dNl[l]
                suml[0, l] = suml[0, l] + sums[0]
                suml[1, l] = suml[1, l] + sums[1]

        # compute absolute average and variance

        ml = numpy.abs(       suml[0, :]/Nl)
        Vl = numpy.maximum(0, suml[1, :]/Nl - ml**2)

        # fix to cope with possible zero values for ml and Vl
        # (can happen in some applications when there are few samples)

        for l in range(3, L+2):
            ml[l-1] = max(ml[l-1], 0.5*ml[l-2]/2**alpha)
            Vl[l-1] = max(Vl[l-1], 0.5*Vl[l-2]/2**beta)

        # use linear regression to estimate alpha, beta if not given
        if alpha_0 <= 0:
            A = numpy.ones((L, 2)); A[:, 0] = range(1, L+1)
            x = numpy.linalg.solve(A, numpy.log2(ml[1:]))
            alpha = max(0.5, -x[0])

        if beta_0 <= 0:
            A = numpy.ones((L, 2)); A[:, 0] = range(1, L+1)
            x = numpy.linalg.solve(A, numpy.log2(Vl[1:]))
            beta = max(0.5, -x[0])

        # set optimal number of additional samples

        Cl = 2**(gamma*numpy.arange(0, L+1))
        Ns = numpy.ceil( numpy.sqrt(Vl/Cl) * sum(numpy.sqrt(Vl*Cl)) / ((1-theta)*eps**2) )
        dNl = numpy.maximum(0, Ns-Nl)

        # if (almost) converged, estimate remaining error and decide
        # whether a new level is required

        if sum(dNl > 0.01*Nl) == 0:
            rem = ml[L] / (2.0**alpha - 1.0)

            if rem > numpy.sqrt(theta)*eps:
                if L == Lmax:
                    raise WeakConvergenceFailure("Failed to achieve weak convergence")
                else:
                    L = L + 1
                    Vl = numpy.append(Vl, Vl[-1] / 2.0**beta)
                    Nl = numpy.append(Nl, 0.0)
                    suml = numpy.column_stack([suml, [0, 0]])

                    Cl = 2**(gamma*numpy.arange(0, L+1))
                    Ns = numpy.ceil( numpy.sqrt(Vl/Cl) * sum(numpy.sqrt(Vl*Cl)) / ((1-theta)*eps**2) )
                    dNl = numpy.maximum(0, Ns-Nl)

    # finally, evaluate the multilevel estimator
    P = sum(suml[0,:]/Nl)

    return (P, Nl)


# Gene Transcription Model

\begin{align*}
G &\xrightarrow[]{25} G + 25 \\
M &\xrightarrow[]{1000} M + P \\
P + P & \xrightarrow[]{0.001} D \\
M &\xrightarrow[]{0.1} \emptyset \\
P &\xrightarrow[]{1} \emptyset
\end{align*}

We gather the quantities into a vector
$$ X_t = \begin{pmatrix}
\text{# of $M$ molecues at time $t$} \\
\text{# of $P$ molecues at time $t$} \\
\text{# of $D$ molecues at time $t$}
\end{pmatrix} $$

In [199]:
import numpy as np

class Reaction(object):
    def __init__(self, reactants, products, ease):
        """
        reactants and products should be a list of integers
        """
        self.reactants = reactants
        self.products = products
        self.change = np.array(products) - np.array(reactants)
        self.ease = ease
        
    def rate(self, molecular_counts):
        """
        molecular_counts[n, k] should give the number of molecules of
        type k in the nth path. Returns rates such that rates[n] is the rate
        this reaction in the nth path.
        """
        N, _ = molecular_counts.shape
        rate = self.ease * np.ones(N)
        
        for reactant, amount in enumerate(self.reactants):
            for i in range(amount):
                rate = rate * (molecular_counts[:,reactant] - i)
        
        return rate

def calculate_rates(reactions, paths):
    """
    reactions: a list of R reactions
    paths: a N * K array, paths[n, i] is the number of molecules of type i in path n
    
    returns a N * R array rates[n, j] is the rate of reaction j in path n
    """
    return np.array([reaction.rate(paths) for reaction in reactions]).T

def tau_leaping_mlmc(reactions, initial_counts, T, n0, M, N, l):
    """
    reactions: a list of reactions of class Reaction
    initial_counts: a R array of initial molecule
    T: final time
    n0: number of steps in layer 0
    M: refinement factor
    N: total number of paths to use
    l: level for MLMC
    
    return sums, an array with 2 entries, sums[0] is the sum of estimators,
    sums[1] is the sum of the square estimators
    """
    max_paths_per_loop = 1000
    
    R = initial_counts.size # number of reactants
    changes = np.array([reaction.change for reaction in reactions])
    # changes[i, j] would give the change in numbers of molecule j in the ith reaction
    
    n = n0 * (M ** l)
    h = T / n
    
    sums = np.zeros((2, R))
    
    for N0 in range(0, N, max_paths_per_loop):
        N1 = min(max_paths_per_loop, N - N0) # number of paths this loop will use

        if l == 0:
            X_f = initial_counts * np.ones((N1, R)) # molecular counts
            X_c = np.zeros((N1, R)) # course count is just 0 for the lowest level

            for _ in range(n):
                rates = calculate_rates(reactions, X_f)
                reactions_occured = np.random.poisson(h * rates)
                X_f += reactions_occured @ changes
                X_f = np.maximum(X_f, 0) # to handle negative molecular counts
        
        if l >= 1:
            X_c = initial_counts * np.ones((N1, R)) # coarse path molecular counts
            X_f = initial_counts * np.ones((N1, R)) # fine path molecular counts
            
            for i in range(n):
                rates_f = calculate_rates(reactions, X_f)
                if i % M == 0:
                    rates_c = calculate_rates(reactions, X_c) # update coarse rates only every m steps
                
                R_abs = np.random.poisson(h * np.abs(rates_f - rates_c))
                R_min = np.random.poisson(h * np.minimum(rates_f, rates_c))

                R_f = R_min + np.where(rates_f > rates_c, R_abs, 0)
                R_c = R_min + np.where(rates_c > rates_f, R_abs, 0)
                
                X_f += R_f @ changes
                X_c += R_c @ changes
                
                X_f = np.maximum(X_f, 0)
                X_c = np.maximum(X_c, 0)
                
        diff = X_f - X_c
        sums += np.array([diff, diff**2]).sum(axis=1)
         
    return np.array(sums)

reactions = [([1, 0, 0, 0], [1, 1, 0, 0], 25),
             ([0, 1, 0, 0], [0, 1, 1, 0], 1000),
             ([0, 0, 2, 0], [0, 0, 0, 1], 0.001),
             ([0, 1, 0, 0], [0, 0, 0, 0], 0.1),
             ([0, 0, 1, 0], [0, 0, 0, 0], 1)]

reactions = [Reaction(*reaction) for reaction in reactions]

T = 1 # terminal time
n0 = 9 # initial number of time steps
initial_counts = np.array([1, 0, 0, 0])
M = 3 # refinement factor
Lmin = 2
Lmax = 100
N0 = 100
eps = 100
alpha = np.log2(M)
beta = np.log2(M)
gamma = np.log2(M)

def mlmc_fn(l, N):
    """a wrapper for the tau_leaping_mlmc method"""
    return tau_leaping_mlmc(reactions, initial_counts, T, n0, M, N, l)[:,-1]

mlmc(Lmin, Lmax, N0, eps, mlmc_fn, alpha, beta, gamma)

(3587.27987654321, array([162., 100., 100.,   2.]))