# Project: Stochastic and Spatial Models

## 0. Init

Most code will reference algorithms and practices as described in "Modeling infectious diseases in humans and animals" (MID) by M. Keeling and P. Rohani.

In [None]:
import subprocess

# Install script with all dependencies
subprocess.run(["pip", "install", "--no-input", "python-slugify", "numpy<1.27", "matplotlib", "scipy", "numba"]) 

In [None]:
# WARNING: Comment if code doesn't run
# Use jit to compile and optimize Python code
from numba import jit

# Arrays and analysis
import numpy as np
import scipy as sp
from scipy.integrate import solve_ivp
from scipy.optimize import curve_fit
from scipy.ndimage import convolve1d

# Plotting and config
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = [6, 3]
plt.rcParams['lines.linewidth'] = 1
plt.rcParams['figure.constrained_layout.use'] = True

# Misc imports
from slugify import slugify
from functools import partial
import os

# Important directories
FIG_DIR = 'fig/'
DATA_DIR = 'data/'
DUMP_DIR = 'dump/'

def save_fig(title):
    """Save figure under normalized name."""
    plt.savefig(f'{FIG_DIR}/{slugify(title)}.png', bbox_inches='tight')

def props_fig(props_dict):
    """Update current Axes with the given dictionary of properties."""
    plt.gca().update(props_dict)

def set_ax_props(ax, props_dict):
    """Update Axes with the given dictionary of properties."""
    ax.update(props_dict)

def create_dirs(path):
    """Create directory, do nothing if it exists."""
    os.makedirs(path, exist_ok=True)

create_dirs(FIG_DIR)
create_dirs(DATA_DIR)
create_dirs(DUMP_DIR)

## 1. Gillespie’s Direct Algorithm and Stochastic Hallmarks

### 1.1 Implement Gillespies algorithm

In [None]:
# WARNING: Comment if code doesn't run
# Use jit to compile and optimize Python code
@jit(nopython=True)
def SIR_GDA(y0, params, t_max):
    """Implementation of Gillespie's Direct Algorithm (GDA)
    for the standard SIR model. Written after Box 6.3 in MID (p 201)."""
    # np.random.seed()
    X, Y, Z = y0
    beta, gamma, mu = params
    
    # 1.
    # Events.
    ps = np.array([
        (1, 0, 0),    # birth
        (-1, 1, 0),   # transmission
        (0, -1, 1),   # recovery
        (-1, 0, 0),   # death_X
        (0, -1, 0),   # death_Y
        (0, 0, -1),   # death_Z
    ])

    # Bookkeeping variables.
    t = 0
    ts = []
    ys = []
    while t < t_max:    
        # 2.
        N = np.sum(np.array([X, Y, Z]))

        # Rates in the same order as their corresponding events.
        Rs = np.array([mu*N, beta*X*Y/N, gamma*Y, mu*X, mu*Y, mu*Z])
        
        # 3.
        R_total = np.sum(Rs)
        
        # 4.
        rand_1 = np.random.rand()
        dt = -1 / R_total * np.log(rand_1)
        
        # 5.
        rand_2 = np.random.rand()
        P = rand_2 * R_total
        
        # 6.
        R_cum = np.cumsum(Rs)
        
        # Find index of event p.
        p_idx = np.searchsorted(R_cum, P)
        
        # Add event result to current state.
        p = ps[p_idx]
        X, Y, Z = np.array([X, Y, Z]) + p
        
        # 7.
        t += dt

        # Bookkeep results.
        ts.append(t); ys.append([X, Y, Z])

        # Early exit in case of extinction.
        if Y == 0:
            return np.array(ts), np.array(ys)

    return np.array(ts), np.array(ys)

In [None]:
def SIR_D(t, y0, params):
    """Deterministic, density-dependent implementation of the SIR model
    with demography as described in MID 2.1.2."""
    X, Y, Z = y0
    beta, gamma, mu = params

    N = X + Y + Z
    dx = mu*N - beta*X*Y/N - mu*X
    dy = beta*X*Y/N - gamma*Y - mu*Y
    dz = gamma*Y - mu*Z
    
    return dx, dy, dz

In [None]:
def MA(series, filter_length):
    """Calculate the simple moving average (SMA) of a series over 
    its columns using convolution."""
    kernel = np.ones(filter_length) / filter_length
    
    return convolve1d(series, kernel, mode='nearest', axis=0)

In [None]:
def equal_time(x_equal, xp, fp):
    """Interpolate to convert discrete event to continuous time with equal timesteps."""
    
    return np.interp(ts_discrete, xp, fp)

In [None]:
title="Comparison of GDA with deterministic SIR model"
t_max = 300
ts_discrete = np.linspace(0, t_max, 1000)
params = (0.3, 0.1, 1/80)
N = 1000
y0 = (N-10, 10, 0)
num_runs = 100

fig, ax = plt.subplots()

Xs = []
Ys = []
Zs = []
for i in range(num_runs):
    ts, ys = SIR_GDA(y0=y0, params=params, t_max=t_max)

    # Interpolate time axis for statistics
    Xs.append(equal_time(ts, ys[:, 0], t_max))
    Ys.append(equal_time(ts, ys[:, 1], t_max))
    Zs.append(equal_time(ts, ys[:, 2], t_max))
    
    # Plot the results for each trajectory
    plt.plot(ts, ys[:, 0], color='r', alpha=0.1)
    plt.plot(ts, ys[:, 1], color='g', alpha=0.1)
    plt.plot(ts, ys[:, 2], color='b', alpha=0.1)

ts_det = np.linspace(0, t_max, 200)
sol = solve_ivp(SIR_D, [0, t_max], y0, args=(params,), dense_output=True)
ys = sol.sol(ts_det).T

plt.plot(ts_det, ys[:, 0], color='#b30000', linewidth=3, label='Susceptible')
plt.plot(ts_det, ys[:, 1], color='#006600', linewidth=3, label='Infected')
plt.plot(ts_det, ys[:, 2], color='#001f5b', linewidth=3, label='Recovered')

plt.legend(loc="upper right")
props_fig({"xlabel": "t (days)", "ylabel": "population", "title": title})
save_fig(title)
plt.show()
plt.close()

In [None]:
X_mean, X_std = np.mean(Xs, axis=0), np.std(Xs, axis=0)
Y_mean, Y_std = np.mean(Ys, axis=0), np.std(Ys, axis=0)
Z_mean, Z_std = np.mean(Zs, axis=0), np.std(Zs, axis=0)

title="Comparison of means"

fig, ax = plt.subplots()

plt.plot(ts_det, ys[:, 0], color='r', label='Exact S')
plt.plot(ts_det, ys[:, 1], color='g', label='Exact I')
plt.plot(ts_det, ys[:, 2], color='b', label='Exact R')

plt.plot(ts_discrete, X_mean, color='r', linestyle="--", label='Mean S')
plt.plot(ts_discrete, Y_mean, color='g', linestyle="--", label='Mean I')
plt.plot(ts_discrete, Z_mean, color='b', linestyle="--", label='Mean R')

plt.legend(loc="upper right")
props_fig({"xlabel": "t (days)", "ylabel": "population", "title": title})
save_fig(title)

plt.show()

title="Comparison of stds"

fig, ax = plt.subplots()

plt.fill_between(ts_discrete, X_mean - X_std, X_mean + X_std, color='r', alpha=0.3)
plt.fill_between(ts_discrete, Y_mean - Y_std, Y_mean + Y_std, color='g', alpha=0.3)
plt.fill_between(ts_discrete, Z_mean - Z_std, Z_mean + Z_std, color='b', alpha=0.3)

plt.plot(ts_discrete, X_mean, color='r', linestyle="--", label='Mean S')
plt.plot(ts_discrete, Y_mean, color='g', linestyle="--", label='Mean I')
plt.plot(ts_discrete, Z_mean, color='b', linestyle="--", label='Mean R')

plt.legend(loc="upper right")
props_fig({"xlabel": "t (days)", "ylabel": "population", "title": title})
save_fig(title)

plt.show()

In [None]:
title="Comparison of std after smoothing"
t_max = 300
ts_discrete = np.linspace(0, t_max, 1000)
params = (0.3, 0.1, 1/80)
N = 1000
y0 = (N-10, 10, 0)
num_runs = 100

fig, ax = plt.subplots()

Xs = []
Ys = []
Zs = []

Xs_ma = []
Ys_ma = []
Zs_ma = []
for i in range(num_runs):
    ts, ys = SIR_GDA(y0=y0, params=params, t_max=t_max)

    # Interpolate time axis for statistics
    Xs.append(equal_time(ts, ys[:, 0], t_max))
    Ys.append(equal_time(ts, ys[:, 1], t_max))
    Zs.append(equal_time(ts, ys[:, 2], t_max))

    ys = MA(ys, 1101)

    Xs_ma.append(equal_time(ts, ys[:, 0], t_max))
    Ys_ma.append(equal_time(ts, ys[:, 1], t_max))
    Zs_ma.append(equal_time(ts, ys[:, 2], t_max))

plt.plot(ts_discrete, np.std(Ys, axis=0), label="Stddev I (unsmoothed)")
plt.plot(ts_discrete, np.std(Ys_ma, axis=0), label="Stddev I (smoothed)")

plt.legend()
props_fig({"xlabel": "t (days)", "ylabel": "stdev", "title": title})
save_fig(title)

plt.show()

### 1.2 Investigate Simulation Variability and Negative Co-variance

In [None]:
def errorbar_plot(x, ys, fig_props):
    """Create an errorbar plot where observations over x 
    are collected in the rows of ys."""
    fig, ax = plt.subplots()
    
    ax.errorbar(x, np.mean(ys, axis=1), np.std(ys, axis=1), marker='.', color='black', capsize=4, linestyle='--')
    # Always use scientific notation.
    ax.ticklabel_format(style='scientific', axis='y', scilimits=(0, 0))
    props_fig(fig_props)
    save_fig(fig_props["title"])
    
    plt.show()

In [None]:
t_max = 20
ts_interpolate = np.linspace(0, t_max, 10000)

y0 = (9990, 10, 0)
num_runs = 20

gamma = 0.1
betas = np.linspace(0.3, 2, 13)

all_covs = []
all_vars = []
for beta in betas:
    params = (beta, gamma, 1/80)

    covs = []
    Is_interpolate = []
    for _ in range(num_runs):
        ts, ys = SIR_GDA(y0=y0, params=params, t_max=t_max)

        S = ys[:, 0]
        I = ys[:, 1]
    
        covs.append(np.cov(S, I)[0][1])
        Is_interpolate.append(np.interp(ts_interpolate, ts, I))

    all_covs.append(covs)
    all_vars.append(np.var(Is_interpolate, axis=1))

In [None]:
errorbar_plot(betas, all_variances, {
    "xlabel": "beta", 
    "ylabel": "variance", 
    "title": "Mean variance of I for varying beta"
})

errorbar_plot(betas, all_covs, {
    "xlabel": "beta", 
    "ylabel": "covariance", 
    "title": "Mean covariance of S and I for varying beta"
})

In [None]:
t_max = 100
num_runs = 20

gamma = 0.1
beta = 0.3
params = (beta, gamma, 1/80)

Ns = np.linspace(1000, 20000, 21)

all_covs = []
all_variances = []
for N in Ns:
    y0 = (N - 10, 10, 0)
    
    covs = []
    variances = []
    for _ in range(num_runs):
        ts, ys = SIR_GDA(y0=y0, params=params, t_max=t_max)
    
        covs.append(np.cov(ys[:, 0], ys[:, 1])[0][1])
        variances.append(np.var(ys[:, 1]))

    all_covs.append(covs)
    all_variances.append(variances)

In [None]:
errorbar_plot(Ns, all_variances, {
    "xlabel": "N", 
    "ylabel": "variance", 
    "title": "Mean variance of I for varying N"
})

errorbar_plot(Ns, all_covs, {
    "xlabel": "N", 
    "ylabel": "covariance", 
    "title": "Mean covariance of S and I for varying N"
})

### 1.3 Stochastic Resonance and Increased Transients

In [None]:
def get_endemic_equilibrium(params):
    """Get the endemic equilibrium for the given setup."""
    beta, gamma, mu = params
    R0 = beta / (gamma + mu)

    Sp = 1/R0
    Ip = mu/beta*(R0 - 1)
    Rp = 1 - 1/R0 - mu/beta*(R0 - 1)

    return Sp, Ip, Rp

def normalize_population(ys, N):
    """Normalize the given values."""
    return ys / N

def plot_trajectory(params, yss, N):
    Sp, Ip, Rp = get_endemic_equilibrium(params)

    fig, ax = plt.subplots()
    ax.set_aspect("equal")
    
    # Triangle bounds.
    plt.plot([0, 1, 0, 0], [0, 0, 1, 0], linewidth=1, color="gray", zorder=0)
    
    for ys in yss:
        ys = normalize_population(ys, N)
        plt.plot(ys[:, 0], ys[:, 1], alpha=0.2, color="black", linewidth=0.5)

    plt.plot(Sp, Ip, marker="o", color="red", markeredgecolor="white")

    plt.show()

N = 10000
y0 = (N-10, 10, 0)
params = (0.3, 0.1, 1/80)

yss = []
for _ in range(10):
    ts, ys = SIR_GDA(y0, params, 300)

    yss.append(ys)

plot_trajectory(params, yss, N)

### 1.4 Extinction events and Critical Community Size

In [None]:
def is_extinct(ys):
    """Check whether disease is extinct"""
    last_val = ys[:, 1][-1]
    
    return last_val == 0

In [None]:
t_max = 2000
params = (0.12, 0.1, 1/80)
N = 5000
y0 = (N-10, 10, 0)

extinction_times = []
for _ in range(20):
    ts, ys = SIR_GDA(y0, params, t_max)

    extinct = is_extinct(ys)
        
    print(extinct, ts[-1])

    if extinct:
        extinction_times.append(ts[-1])

print("Mean extinction time:", np.mean(extinction_times))