In [None]:
import utils
import io
import numpy as np
import math
import scipy
import mpmath
import matplotlib.pyplot as plt
import seaborn as sns
import bayesflow as bf
import tensorflow as tf
import tensorflow_probability as tfp
# Hide every GPU device from TensorFlow so that all computation runs on CPU
tf.config.set_visible_devices([], 'GPU')

# Project plotting helper; it returns two objects kept as `cor` and `pal`
# (presumably a color dictionary and a palette -- TODO confirm in utils)
cor, pal = utils.matplotlib_style()

# Seed a NumPy Generator; the prior functions below draw from `rng`.
# NOTE: this does not seed NumPy's legacy global RNG (`np.random.*`) nor
# TensorFlow's random state.
RANDOM_SEED = 42
rng = np.random.default_rng(RANDOM_SEED)

# Two-state log 

In [20]:
def prior_fn():
    """Draw a single parameter vector from the prior, on the log scale.

    Three draws are taken, in this order, from the module-level generator
    ``rng``:

    * ``k_on``  -- absolute value of a Normal(mean=1E-4, sd=10) draw,
    * ``k_off`` -- absolute value of a Normal(mean=1E-4, sd=10) draw,
    * ``r``     -- 1E-3 plus the absolute value of 10 times a standard
      Cauchy draw.

    Returns
    -------
    numpy.ndarray
        float32 array of shape (3,) holding ``[log k_on, log k_off, log r]``.
    """
    # Folded-normal draws for the k_on and k_off rates
    on_rate = np.abs(rng.normal(1E-4, 10))
    off_rate = np.abs(rng.normal(1E-4, 10))
    # Folded, rescaled Cauchy draw for the transcription rate; the 1E-3
    # offset keeps the value strictly positive before taking the log
    tx_rate = 1E-3 + np.abs(10 * rng.standard_cauchy())

    # Stack the rates, move to the log scale and downcast to single precision
    rates = np.array([on_rate, off_rate, tx_rate])
    return np.log(rates).astype(np.float32)


# Parameter labels, listed in the same order as the vector returned by
# `prior_fn`; they are reused as axis labels by the diagnostic plots below
param_names = ['log_k_on', 'log_k_off', 'log_r']

# Wrap the prior function in a BayesFlow Prior object so that it can be
# combined with a simulator into a generative model
prior = bf.simulation.Prior(
    prior_fun=prior_fn,
    param_names=param_names,
)

In [21]:
def likelihood_fn(params, n_obs=10_000, m_range=range(0, 3_000)):
    """Simulate counts from the two-state model for one parameter vector.

    Parameters
    ----------
    params : array-like of length 3
        ``[log k_on, log k_off, log r]``; the values are exponentiated
        before being handed to ``utils.two_state_log_probability``.
    n_obs : int, default 10_000
        Number of counts to draw.
    m_range : sequence of int, default ``range(0, 3_000)``
        Support on which the distribution is evaluated and sampled. The
        probabilities are renormalized over this support, so any mass
        outside of it is ignored.

    Returns
    -------
    numpy.ndarray
        float32 array of shape ``(n_obs, 1)`` with the sampled counts.

    Notes
    -----
    Sampling uses the module-level seeded generator ``rng`` rather than
    NumPy's legacy global RNG, so that simulations are reproducible from
    ``RANDOM_SEED`` together with the prior draws.
    """
    # Unpack parameters (stored on the log scale)
    k_on, k_off, r = np.exp(params)
    # Compute the log probability over m_range
    logP = utils.two_state_log_probability(m_range, k_on, k_off, r)
    # Convert log probabilities to probabilities
    P = np.exp(logP)
    # Renormalize so the weights sum to one. This is necessary because the
    # support is truncated and because of numerical precision issues.
    P /= P.sum()
    # Draw the counts with the seeded generator so results are reproducible
    u = rng.choice(np.asarray(m_range), size=n_obs, p=P)
    # Add a trailing axis: (n_obs,) -> (n_obs, 1)
    u = np.expand_dims(u, axis=1)
    # Return the samples as float32
    return np.float32(u)

In [None]:
print("Defining the generative model...")

# Wrap the per-draw likelihood function in a BayesFlow Simulator
simulator = bf.simulation.Simulator(simulator_fun=likelihood_fn)

# Combine prior and simulator into a single generative model that returns
# parameter draws together with the data simulated from them
model = bf.simulation.GenerativeModel(prior, simulator)

# Define summary network as a DeepSet producing 32 summary statistics
# per simulated data set
summary_net = bf.networks.DeepSet(summary_dim=32)

# Define the conditional invertible network. The number of parameters is
# read from the last axis of a single prior draw rather than hard-coded.
# NOTE: `prior(1)` consumes one set of draws from `rng`.
inference_net = bf.inference_networks.InvertibleNetwork(
    num_params=prior(1)["prior_draws"].shape[-1],
)

In [None]:
# Prior predictive check: number of (parameter, data set) pairs to simulate
n_draws = 50
# Draw samples from the generative model
model_draws = model(n_draws)

# Initialize figure
fig, ax = plt.subplots(1, 1, figsize=(1.75, 1.5))

# Overlay one ECDF per simulated data set; each data set is flattened to 1D
# before plotting
for m in range(n_draws):
    sns.ecdfplot(model_draws["sim_data"][m, :, :].flatten(), ax=ax)

# Set x-axis to log scale.
# NOTE(review): the simulator's support starts at 0, and zero counts cannot
# be drawn on a log axis -- confirm this is acceptable for the figure.
ax.set_xscale("log")

# Label axis
ax.set_xlabel("UMI counts")
ax.set_ylabel("ECDF")

In [None]:
# Assemble the amortizer that combines the summary network and inference
# network into a single amortized posterior approximator
amortizer = bf.amortizers.AmortizedPosterior(
    inference_net, summary_net,
)

# Assemble the trainer with the amortizer and generative model.
# NOTE(review): no training call is visible in this notebook; presumably the
# network weights are restored from `checkpoint_path` -- confirm that the
# checkpoint exists before trusting the diagnostics below.
trainer = bf.trainers.Trainer(
    amortizer=amortizer,
    generative_model=model,
    checkpoint_path="./two_state_log_bayesflow"
)

In [None]:
# Simulate 500 fresh data sets and pass them through the trainer's
# configurator to obtain the input format expected by the amortizer
test_sims = trainer.configurator(model(500))
# Forward pass through the amortizer; only the first returned element
# (the latent samples) is kept
z_samples, _ = amortizer(test_sims)
# Pairwise 2D view of the latent samples
f = bf.diagnostics.plot_latent_space_2d(z_samples)

In [None]:
# Obtain 100 posterior samples for each simulated data set in test_sims
posterior_samples = amortizer.sample(test_sims, n_samples=100)
# Simulation-based calibration histograms, comparing the posterior samples
# against the parameters that generated each data set (10 bins per parameter)
f = bf.diagnostics.plot_sbc_histograms(
    posterior_samples,
    test_sims["parameters"],
    num_bins=10,
    param_names=param_names,
)

In [None]:
# Simulation-based calibration ECDF plot, reusing the 100-sample posterior
# draws computed for the SBC histograms. `difference=True` is passed so the
# ECDF difference is shown rather than the raw ECDF.
f = bf.diagnostics.plot_sbc_ecdf(
    posterior_samples, 
    test_sims["parameters"], 
    difference=True,
    param_names=param_names,
)

In [None]:
# Draw a larger set of 1000 posterior samples per simulated data set
post_samples = amortizer.sample(test_sims, n_samples=1000)
# Parameter recovery plot: posterior samples against the parameters that
# generated each data set
f = bf.diagnostics.plot_recovery(
    post_samples,
    test_sims["parameters"],
    param_names=param_names,
)

In [None]:
# Posterior z-score versus contraction diagnostic, computed from the
# 1000-sample posterior draws and the generating parameters
f = bf.diagnostics.plot_z_score_contraction(
    post_samples, test_sims["parameters"], param_names=param_names
)

# Two-state log constraint

In [3]:
def prior_fn():
    """Draw one log-scale parameter vector with ``r`` tied to ``k_off``.

    Instead of drawing the transcription rate directly, the prior is placed
    on the ratio ``r / k_off`` and ``log r`` is reconstructed from it. Three
    draws are taken, in this order, from the module-level generator ``rng``:

    * ``k_on``      -- absolute value of a Normal(mean=1E-4, sd=10) draw,
    * ``k_off``     -- absolute value of a Normal(mean=1E-4, sd=10) draw,
    * ``r / k_off`` -- 1E-3 plus the absolute value of a standard Cauchy
      draw.

    Returns
    -------
    numpy.ndarray
        float32 array of shape (3,) holding ``[log k_on, log k_off, log r]``.
    """
    # Folded-normal draws for the k_on and k_off rates, taken to log scale
    on_draw = rng.normal(1E-4, 10)
    log_k_on = np.log(np.abs(on_draw))
    off_draw = rng.normal(1E-4, 10)
    log_k_off = np.log(np.abs(off_draw))

    # Folded Cauchy draw for the ratio r / k_off, offset to stay positive
    ratio = 1E-3 + np.abs(rng.standard_cauchy())
    log_ratio = np.log(ratio)

    # log r = log(r / k_off) + log k_off
    log_r = log_ratio + log_k_off

    log_params = np.array([log_k_on, log_k_off, log_r])
    return log_params.astype(np.float32)


# Parameter labels, listed in the same order as the vector returned by
# `prior_fn`; they are reused as axis labels by the diagnostic plots below
param_names = ['log_k_on', 'log_k_off', 'log_r']

# Wrap the constrained prior function in a BayesFlow Prior object.
# NOTE: this rebinds the module-level name `prior`.
prior = bf.simulation.Prior(
    prior_fun=prior_fn,
    param_names=param_names,
)

In [4]:
def likelihood_fn(params, n_obs=10_000, m_range=range(0, 3_000)):
    """Simulate counts from the two-state model for one parameter vector.

    Parameters
    ----------
    params : array-like of length 3
        ``[log k_on, log k_off, log r]``; the values are exponentiated
        before being handed to ``utils.two_state_log_probability``.
    n_obs : int, default 10_000
        Number of counts to draw.
    m_range : sequence of int, default ``range(0, 3_000)``
        Support on which the distribution is evaluated and sampled. The
        probabilities are renormalized over this support, so any mass
        outside of it is ignored.

    Returns
    -------
    numpy.ndarray
        float32 array of shape ``(n_obs, 1)`` with the sampled counts.

    Notes
    -----
    Sampling uses the module-level seeded generator ``rng`` rather than
    NumPy's legacy global RNG, so that simulations are reproducible from
    ``RANDOM_SEED`` together with the prior draws.
    """
    # Unpack parameters (stored on the log scale)
    k_on, k_off, r = np.exp(params)
    # Compute the log probability over m_range
    logP = utils.two_state_log_probability(m_range, k_on, k_off, r)
    # Convert log probabilities to probabilities
    P = np.exp(logP)
    # Renormalize so the weights sum to one. This is necessary because the
    # support is truncated and because of numerical precision issues.
    P /= P.sum()
    # Draw the counts with the seeded generator so results are reproducible
    u = rng.choice(np.asarray(m_range), size=n_obs, p=P)
    # Add a trailing axis: (n_obs,) -> (n_obs, 1)
    u = np.expand_dims(u, axis=1)
    # Return the samples as float32
    return np.float32(u)

In [None]:
print("Defining the generative model...")

# Wrap the per-draw likelihood function in a BayesFlow Simulator
simulator = bf.simulation.Simulator(simulator_fun=likelihood_fn)

# Combine the constrained prior and the simulator into a generative model.
# NOTE: `simulator`, `model`, `summary_net` and `inference_net` are rebound
# here, replacing the objects built for the unconstrained model above.
model = bf.simulation.GenerativeModel(prior, simulator)

# Define summary network as a DeepSet producing 32 summary statistics
# per simulated data set
summary_net = bf.networks.DeepSet(summary_dim=32)

# Define the conditional invertible network. The number of parameters is
# read from the last axis of a single prior draw rather than hard-coded.
# NOTE: `prior(1)` consumes one set of draws from `rng`.
inference_net = bf.inference_networks.InvertibleNetwork(
    num_params=prior(1)["prior_draws"].shape[-1],
)

In [None]:
# Prior predictive check: number of (parameter, data set) pairs to simulate
n_draws = 50
# Draw samples from the generative model
model_draws = model(n_draws)

# Initialize figure
fig, ax = plt.subplots(1, 1, figsize=(1.75, 1.5))

# Overlay one ECDF per simulated data set; each data set is flattened to 1D
# before plotting
for m in range(n_draws):
    sns.ecdfplot(model_draws["sim_data"][m, :, :].flatten(), ax=ax)

# Set x-axis to log scale.
# NOTE(review): the simulator's support starts at 0, and zero counts cannot
# be drawn on a log axis -- confirm this is acceptable for the figure.
ax.set_xscale("log")

# Label axis
ax.set_xlabel("UMI counts")
ax.set_ylabel("ECDF")

In [None]:
# Assemble the amortizer that combines the summary network and inference
# network into a single amortized posterior approximator
amortizer = bf.amortizers.AmortizedPosterior(
    inference_net, summary_net,
)

# Assemble the trainer with the amortizer and generative model. A separate
# checkpoint directory is used for the constrained model.
# NOTE(review): no training call is visible in this notebook; presumably the
# network weights are restored from `checkpoint_path` -- confirm that the
# checkpoint exists before trusting the diagnostics below.
trainer = bf.trainers.Trainer(
    amortizer=amortizer,
    generative_model=model,
    checkpoint_path="./two_state_log_constraint_bayesflow"
)

In [None]:
# Simulate 500 fresh data sets and pass them through the trainer's
# configurator to obtain the input format expected by the amortizer
test_sims = trainer.configurator(model(500))
# Forward pass through the amortizer; only the first returned element
# (the latent samples) is kept
z_samples, _ = amortizer(test_sims)
# Pairwise 2D view of the latent samples
f = bf.diagnostics.plot_latent_space_2d(z_samples)

In [None]:
# Obtain 100 posterior samples for each simulated data set in test_sims
posterior_samples = amortizer.sample(test_sims, n_samples=100)
# Simulation-based calibration histograms, comparing the posterior samples
# against the parameters that generated each data set (10 bins per parameter)
f = bf.diagnostics.plot_sbc_histograms(
    posterior_samples,
    test_sims["parameters"],
    num_bins=10,
    param_names=param_names,
)

In [None]:
# Simulation-based calibration ECDF plot, reusing the 100-sample posterior
# draws computed for the SBC histograms. `difference=True` is passed so the
# ECDF difference is shown rather than the raw ECDF.
f = bf.diagnostics.plot_sbc_ecdf(
    posterior_samples,
    test_sims["parameters"],
    difference=True,
    param_names=param_names,
)

In [None]:
# Draw a larger set of 1000 posterior samples per simulated data set
post_samples = amortizer.sample(test_sims, n_samples=1000)
# Parameter recovery plot: posterior samples against the parameters that
# generated each data set
f = bf.diagnostics.plot_recovery(
    post_samples,
    test_sims["parameters"],
    param_names=param_names,
)