# Parameter Fitting All Genes In Preliminary Dataset

##### Modules

In [1]:
import os
os.chdir(os.path.dirname(os.getcwd()))

import numpy as np
from data_handler import DataHandler
from droplet_model import DropletModel, ReparameterisedDropletModel, DropletModelUnknownInitialConditions
from parameter_fitting import ParameterFitting
from plots import Plots
import matplotlib.pyplot as plt
import pints
import pints.plot
from scipy.integrate import odeint
from numba import njit
import seaborn as sns
from cycler import cycle
import scipy.stats as st
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd
import json

plotter = Plots()


##### Functions and Methods

In [42]:
def plot_mle_solution(
    model, 
    data,
    data_times,
    param_mles,
    sigma_mles,
    z_value = 1.96,
    max_size_shown=None,
    fig_size=(8,5),
    noise_model="additive normal"
):
    
    fig, ax = plt.subplots(figsize=fig_size)
    
    model_params = param_mles


    num_model_times = 200
    model_times = np.linspace(4, 14, num_model_times)

    num_outputs = model.n_outputs()
    if max_size_shown is None:
        max_size_shown = num_outputs
    
    opt_model_sol = model.simulate(params=model_params, times=model_times)


    palette = sns.color_palette("Set1", n_colors=max_size_shown)
    colour_cycle = cycle(palette)

    for n in range(max_size_shown):

        opt_model_sol_n = opt_model_sol[:, n]

        if noise_model == "additive normal":
            model_lower = opt_model_sol_n - z_value * sigma_mles[n]
            model_upper = opt_model_sol_n + z_value * sigma_mles[n]
        elif noise_model == "multiplicative normal":
            model_lower = opt_model_sol_n * (1 - z_value * sigma_mles[n]) 
            model_upper = opt_model_sol_n * (1+ z_value * sigma_mles[n])
        elif noise_model == "lognormal":
            model_lower = opt_model_sol_n * np.exp(- z_value * sigma_mles[n])
            model_upper = opt_model_sol_n * np.exp(z_value * sigma_mles[n])
        else:
            raise ValueError("noise model not valid")


        plot_colour = next(colour_cycle)
        ax.plot(data_times, data[:, n], marker='s', markersize=10, markeredgecolor='k', linestyle='--', label='data', c=plot_colour)
        ax.plot(model_times, opt_model_sol_n, linewidth=4, label='model', c=plot_colour)
        ax.fill_between(model_times,model_lower,model_upper,color=plot_colour,alpha=0.1,label=f'95% CI')

    ax.set_xlabel(r'$t$ (days)')
    ax.set_ylabel(r'$\phi_n(t)$')
    plt.show()



def generate_synthetic_data(
    model,
    model_params,
    data_times,
    sigma_values,
    noise_factor=None,
    noise_model="additive normal",
    rng=None
):
    
    model_sol = model.simulate(model_params, data_times)
    N = model.n_outputs()
    T = len(data_times)


    if sigma_values is None:

        sigma_values = noise_factor * np.std(model_sol, axis=0)

    if rng is None:
        rng = np.random.default_rng()

    eps = rng.normal(loc=0.0, scale=sigma_values.reshape(1, N), size=(T, N))

    if noise_model == "additive normal":

        synthetic_prediction = model_sol + eps
        
    elif noise_model == "multiplicative normal":

        synthetic_prediction = model_sol * (1 + eps)

    elif noise_model == "lognormal":

        synthetic_prediction = model_sol * np.exp(eps)

    return sigma_values, synthetic_prediction



def get_sigma_MLEs(
    model, 
    data, 
    mle_params, 
    data_times, 
    noise_model="additive normal"
):

    opt_model_sol = model.simulate(mle_params, data_times)
    residuals = opt_model_sol - data

    n_t = len(data_times)

    if noise_model == "additive normal":

        sigma_MLEs = np.sqrt((1 / n_t) * np.sum(residuals**2, axis=0))

    elif noise_model == "multiplicative normal":

        sigma_MLEs = np.sqrt((1 / n_t) * np.sum(residuals**2 / opt_model_sol**2, axis=0))

    elif noise_model == "lognormal":

        log_residuals = np.log(data) - np.log(opt_model_sol)

        log_residuals = np.nan_to_num(log_residuals, nan=0)

        sigma_MLEs = np.sqrt((1 / n_t) * np.sum(log_residuals**2, axis=0))

    return sigma_MLEs



def get_opt_parameters(
    log_likelihood,
    opt_lower_bounds,
    opt_upper_bounds,
    log_transform_opt=True,
    opt_method=pints.CMAES,
    opt_initial_position=None,
    display_log=True,
    num_optimisations=10,
    display_initial_positions=False
):
    """
    Run `num_optimisations` optimisations and return the best params found (as numpy array).
    If opt_initial_position is None: sample a fresh random start for each run.
    If opt_initial_position is provided: use the same start for every run.
    """

    opt_bounds = pints.RectangularBoundaries(opt_lower_bounds, opt_upper_bounds)
    log_prior = pints.UniformLogPrior(opt_bounds)

    if log_transform_opt:
        transformation = pints.LogTransformation(n_parameters=len(opt_lower_bounds))
    else:
        transformation = None

    best_score = -np.inf
    best_params = None

    for j in range(1, num_optimisations + 1):
        # pick initial position for this run (do not overwrite user-supplied opt_initial_position)
        if opt_initial_position is None:
            # log_prior.sample(1) often returns an array shaped (1, n); take [0] to get 1D
            init = np.asarray(log_prior.sample(1)).reshape(-1)
        else:
            init = np.asarray(opt_initial_position).reshape(-1)

        if display_initial_positions:
            print(f"Initial position: {init}")

        opt = pints.OptimisationController(
            log_likelihood,
            init,
            boundaries=opt_bounds,
            method=opt_method,
            transformation=transformation
        )

        opt.set_log_to_screen(display_log)
        # opt.set_max_unchanged_iterations(200, 1e-11)

        opt_params, opt_score = opt.run()

        if num_optimisations > 1:
            print(f"Iteration {j}/{num_optimisations}: opt_params: {opt_params} ; score: {opt_score}")

        # compare scores (assumes larger is better for your objective - typical for log-likelihood)
        if opt_score > best_score:
            best_score = opt_score
            best_params = np.asarray(opt_params)

    if best_params is None:
        raise RuntimeError("No optimisation results produced.")
    

    return best_params



def get_mcmc_chains(
    log_likelihood,
    apply_log_transform = False,
    mcmc_method = pints.HaarioBardenetACMC,
    max_mcmc_iterations = 30000,
    num_chains = 4,
    prior_lower_bounds = [1e1, 1e-10, 1e-2],
    prior_upper_bounds = [1e4, 1e0, 1e1],
    specified_log_prior=None,
    initial_mcmc_points=None,
    log_to_screen=True
):

    if specified_log_prior is None:
        
        prior_bounds = pints.RectangularBoundaries(prior_lower_bounds, prior_upper_bounds)
        log_prior = pints.UniformLogPrior(prior_bounds)

    else:

        log_prior = specified_log_prior


    if initial_mcmc_points is None:
        initial_mcmc_points = log_prior.sample(num_chains)


    mcmc_transform = None
    if apply_log_transform:

        mcmc_transform = pints.LogTransformation(n_parameters=log_likelihood._n_parameters)

    log_posterior = pints.LogPosterior(log_likelihood, log_prior)

    mcmc = pints.MCMCController(log_posterior, num_chains, initial_mcmc_points, method=mcmc_method, transformation=mcmc_transform)
    mcmc.set_max_iterations(max_mcmc_iterations)
    # mcmc.set_parallel(True)
    mcmc.set_log_to_screen(log_to_screen)

    chains = mcmc.run()

    return chains



def get_standardised_residuals(
    model,
    data,
    data_times,
    mle_params,
    noise_model
):
    
    model_sol = model.simulate(mle_params, data_times)

    sigma_mles = get_sigma_MLEs(
        model=model,
        data=data,
        mle_params=mle_params,
        data_times=data_times,
        noise_model=noise_model
    )

    if noise_model == "additive normal":

        residuals = data - model_sol
        std_residuals = residuals / sigma_mles
        
    elif noise_model == "multiplicative normal":

        residuals = (data - model_sol) / model_sol
        std_residuals = residuals / sigma_mles

    elif noise_model == "lognormal":

        residuals = np.log(data) - np.log(model_sol)
        std_residuals = residuals / sigma_mles

    return std_residuals

##### Log-likelihoods

In [31]:
class AdditivenormalLogLikelihood(pints.ProblemLogLikelihood):

    def __init__(self, problem, use_profile=True, eps=1e-12, mask_threshold=0.0):
        super(AdditivenormalLogLikelihood, self).__init__(problem)

        # Get number of times, number of outputs
        self._nt = len(self._times)
        self._no = problem.n_outputs()

        # Add parameters to problem

        self.use_profile = use_profile

        if self.use_profile:
            self._n_parameters = problem.n_parameters()
        else:
            self._n_parameters = problem.n_parameters() + self._no

        self._eps = eps
        self._mask_threshold = mask_threshold

        # Pre-calculate parts

        # For a log-normal sampling distribution any data points being below
        # zero would mean that the log-likelihood is always -infinity
        # Boolean mask: True for columns with NO zeros

        vals = self._values

        mask = np.all(vals > self._mask_threshold, axis=0)

        # Indices of size classes kept
        kept_indices = np.where(mask)[0]

        # New vals array with only those size classes
        self.values_filtered = vals[:, mask]

        self._no_filtered = self.values_filtered.shape[1]



    def __call__(self, x):

        sigmas = np.asarray(x[-self._no:])

        if self.use_profile:

            soln = self._problem.evaluate(x[:-self._no])

        else:

            soln = self._problem.evaluate(x)

        ll = 0.0

        for i in range(0, self._no_filtered):

            model_n = soln[:, i]
            data_n = self.values_filtered[:, i]

            res = (data_n - model_n + self._eps)**2

            if self.use_profile:

                sigma_n = np.sqrt((1 / self._nt) * np.sum(res))

            else:

                sigma_n = sigmas[i]

            term1 = -0.5 * self._nt * np.log(2 * np.pi)

            term2 = -0.5 * self._nt * np.log(sigma_n**2 + self._eps)

            term3 = -0.5 * self._nt

            ll_n = term1 + term2 + term3

            # print(f"n = {i+1}, ll_n = {ll_n}")

            ll += ll_n

        return ll



class MultiplicativenormalLogLikelihood(pints.ProblemLogLikelihood):

    def __init__(self, problem, use_profile=True, eps=1e-12, mask_threshold=0.0):
        super(MultiplicativenormalLogLikelihood, self).__init__(problem)

        # Get number of times, number of outputs
        self._nt = len(self._times)
        self._no = problem.n_outputs()

        self.use_profile = use_profile

        if self.use_profile:
            self._n_parameters = problem.n_parameters()
        else:
            self._n_parameters = problem.n_parameters() + self._no

        self._eps = eps
        self._mask_threshold = mask_threshold

        # Pre-calculate parts

        # For a log-normal sampling distribution any data points being below
        # zero would mean that the log-likelihood is always -infinity
        # Boolean mask: True for columns with NO zeros

        vals = self._values

        mask = np.all(vals > self._mask_threshold, axis=0)

        # Indices of size classes kept
        kept_indices = np.where(mask)[0]

        # New vals array with only those size classes
        self.values_filtered = vals[:, mask]

        self._no_filtered = self.values_filtered.shape[1]



    def __call__(self, x):

        sigmas = np.asarray(x[-self._no:])

        if self.use_profile:

            soln = self._problem.evaluate(x[:-self._no])

        else:

            soln = self._problem.evaluate(x)

        ll = 0.0

        for i in range(0, self._no_filtered):

            model_n = soln[:, i]
            data_n = self.values_filtered[:, i]
            # model_n_shape = model_n.shape
            # eps_array = self._eps * np.ones(model_n_shape)

            res = (data_n - model_n)**2 / (model_n**2 + self._eps)


            # print(res.shape)

            if self.use_profile:

                sigma_n = np.sqrt((1 / self._nt) * np.sum(res))

            else:

                sigma_n = sigmas[i]

            # term1 = -0.5 * self._nt * np.log(2 * np.pi)

            # term2 = -0.5 * self._nt * np.log(sigma_n**2 + self._eps)

            # term3 = -0.5 * np.sum(np.log(model_n**2 + self._eps))

            # term4 = -0.5 * self._nt

            # ll_n = term1 + term2 + term3 + term4

            ll_n = -0.5 * np.sum(
                (data_n - model_n)**2 / (sigma_n**2 * model_n**2 + self._eps)
                + np.log(sigma_n**2 * model_n**2 + self._eps)
                + np.log(2 * np.pi)
            )

            # print(f"n = {i+1}, ll_n = {ll_n}")

            ll += ll_n

        return ll
    


class LognormalLogLikelihood(pints.ProblemLogLikelihood):

    def __init__(self, problem, use_profile=True, eps=1e-12, mask_threshold=0.0):
        super(LognormalLogLikelihood, self).__init__(problem)

        # Get number of times, number of outputs
        self._nt = len(self._times)
        self._no = problem.n_outputs()

        self.use_profile = use_profile

        if self.use_profile:
            self._n_parameters = problem.n_parameters()
        else:
            self._n_parameters = problem.n_parameters() + self._no

        self._eps = eps
        self._mask_threshold = mask_threshold

        # Pre-calculate parts

        # For a log-normal sampling distribution any data points being below
        # zero would mean that the log-likelihood is always -infinity
        # Boolean mask: True for columns with NO zeros

        vals = self._values

        mask = np.all(vals > self._mask_threshold, axis=0)

        # Indices of size classes kept
        kept_indices = np.where(mask)[0]

        # New vals array with only those size classes
        self.values_filtered = vals[:, mask]

        self._no_filtered = self.values_filtered.shape[1]



    def __call__(self, x):

        sigmas = np.asarray(x[-self._no:])

        if self.use_profile:

            soln = self._problem.evaluate(x[:-self._no])

        else:

            soln = self._problem.evaluate(x)

        ll = 0.0

        for i in range(0, self._no_filtered):

            model_n = soln[:, i]
            data_n = self.values_filtered[:, i]

            log_res = (np.log(data_n + self._eps) - np.log(model_n + self._eps))**2

            if self.use_profile:

                sigma_n = np.sqrt((1 / self._nt) * np.sum(log_res))

            else:

                sigma_n = sigmas[i]

            term1 = -0.5 * self._nt * np.log(2 * np.pi)

            term2 = -0.5 * self._nt * np.log(sigma_n**2 + self._eps)

            term3 = -0.5 * np.sum(np.log(data_n**2 + self._eps))

            term4 = -0.5 * self._nt

            ll_n = term1 + term2 + term3 + term4

            # print(f"n = {i+1}, ll_n = {ll_n}")

            ll += ll_n

        return ll



def get_log_likelihood(
    problem,
    noise_model,
    use_profile=True,
    mask_threshold=1        
):


    if noise_model == "additive normal":

        log_likelihood = AdditivenormalLogLikelihood(
            problem=problem,
            use_profile=use_profile,
            eps=1e-12,
            mask_threshold=mask_threshold
        )

    elif noise_model == "multiplicative normal":

        log_likelihood = MultiplicativenormalLogLikelihood(
            problem=problem,
            use_profile=use_profile,
            mask_threshold=mask_threshold
        )

    elif noise_model == "lognormal":

        log_likelihood = LognormalLogLikelihood(
            problem=problem,
            use_profile=use_profile,
            mask_threshold=mask_threshold
        )

    else:

        raise ValueError(f"Noise model, {noise_model}, is not a valid nosie model!")





    return log_likelihood

## Fitting pipeline

##### Hyperparameters

In [55]:
# noise_model = "additive normal"

dataset = "full dataset"

noise_model = "additive normal"

data_handler = DataHandler(dataset=dataset)


##### Get optimum parameters

In [62]:
plot_result = False

data_times = data_handler.data_times

pool_ids = [1]

# pool_ids = [2] ###################################################

for pool_id in pool_ids:

    print(f"Pool {pool_id}\n")

    opt_params_df_rows = []

    write_file_path = Path(f"../Results/{noise_model}/{dataset}/pool {pool_id}/opt_params.csv")


    # all_gene_ids = [g for g in data_handler.get_pool_gene_ids(pool_id=pool_id) if g != "NA"]

    all_gene_ids = data_handler.get_pool_gene_ids(pool_id=pool_id)

    num_genes = len(all_gene_ids)

    # all_gene_ids = ["NTC1"]

    # all_gene_ids=['gene_1']

    for i, gene_id in enumerate(all_gene_ids):

        print(f"{i+1}/{num_genes} : {gene_id}")

        data_array = data_handler.get_gene_average_array(pool_id=pool_id, gene_id=gene_id, max_bin_number=51) #[:4]

        # data_times=[4,6,8,11]


        data_initial_conditions = data_array[0]

        data_model = DropletModel(initial_conditions=data_initial_conditions)

        data_problem = pints.MultiOutputProblem(model=data_model, times=data_times, values=data_array)

        data_log_likelihood = get_log_likelihood(
            problem=data_problem,
            noise_model=noise_model,
            mask_threshold=1
        )



        opt_lower_bounds = np.asarray([1e1, 1e-10, 1e-4])
        opt_upper_bounds = np.asarray([1e4, 5e-3, 1e0])

        data_opt_params = get_opt_parameters(
            log_likelihood=data_log_likelihood,
            opt_lower_bounds=opt_lower_bounds,
            opt_upper_bounds=opt_upper_bounds,
            log_transform_opt=True,
            opt_method=pints.CMAES,
            display_log=False,
            display_initial_positions=True
        )

        print(f"Estimated parameters: {data_opt_params}")

        df_row = {
            "gene_id": gene_id,
            "b": data_opt_params[0],
            "f": data_opt_params[1],
            "g": data_opt_params[2]
        }

        sigma_mles = get_sigma_MLEs(
            model=data_model,
            data=data_array,
            mle_params=data_opt_params,
            data_times=data_times,
            noise_model=noise_model
        )

        for i, sigma in enumerate(sigma_mles):

            df_row[f"sigma_{i+1}"] = sigma

        opt_params_df_rows.append(df_row)

    Path(f"../Results/{noise_model}/{dataset}/pool {pool_id}").mkdir(parents=True, exist_ok=True)

    opt_params_df = pd.DataFrame(opt_params_df_rows)
    opt_params_df.to_csv(write_file_path, index=False)

    test_sol = data_model.simulate(data_opt_params, np.linspace(4, 14, 200))

    if plot_result:
        plotter.plot_data_and_model_timecourse(test_sol, data_array,data_times)
        

Pool 1

1/81 : gene_1
Initial position: [8.31954339e+03 3.15904259e-05 9.37749585e-01]
Iteration 1/10: opt_params: [1.79672678e+03 8.56220479e-05 6.92557622e-01] ; score: -429.6053237254116
Initial position: [5.73364917e+03 6.64012879e-04 5.04198369e-01]
Iteration 2/10: opt_params: [1.79672672e+03 8.56220459e-05 6.92557647e-01] ; score: -429.60532372541167
Initial position: [9.44915513e+03 2.94889052e-03 1.96347565e-01]
Iteration 3/10: opt_params: [1.79672676e+03 8.56220461e-05 6.92557622e-01] ; score: -429.6053237254116
Initial position: [3.28055182e+03 1.08727739e-03 7.66361256e-01]
Iteration 4/10: opt_params: [1.79672675e+03 8.56220458e-05 6.92557631e-01] ; score: -429.6053237254116
Initial position: [4.35406374e+03 3.86409507e-03 5.26460883e-01]
Iteration 5/10: opt_params: [1.79672676e+03 8.56220452e-05 6.92557637e-01] ; score: -429.6053237254116
Initial position: [7.20626414e+03 2.77417280e-03 8.87699690e-01]
Iteration 6/10: opt_params: [1.79672675e+03 8.56220466e-05 6.92557621e-0

-426.6710090075119
-426.6710090075119


##### Get MCMC chains using profile likelihood

In [6]:
data_times = data_handler.data_times

pool_ids = [1,2]

# pool_ids = [2]

for pool_id in pool_ids:

    print(f"Pool {pool_id}\n")

    opt_params_df_rows = []

    read_file_path = Path(f"../Results/{noise_model}/preliminary dataset/pool {pool_id}/opt_params.csv")

    opt_params_df = pd.read_csv(read_file_path, low_memory=False, keep_default_na=False)


    # all_gene_ids = [g for g in data_handler.get_pool_gene_ids(pool_id=pool_id) if g != "NA"]

    all_gene_ids = data_handler.get_pool_gene_ids(pool_id=pool_id)

    # all_gene_ids = ['NA']

    num_genes = len(all_gene_ids)

    # all_gene_ids = ["NTC1"]

    for i, gene_id in enumerate(all_gene_ids):

        print(f"{i+1}/{num_genes} : {gene_id}")

        opt_params_row = opt_params_df.loc[opt_params_df["gene_id"] == gene_id].iloc[0]

        opt_model_params = np.asarray([opt_params_row["b"], opt_params_row["f"], opt_params_row["g"]])

        sigma_cols = [c for c in opt_params_df.columns if c.startswith("sigma_")]
        opt_sigma_mles = np.asarray(opt_params_row[sigma_cols].tolist())

        data_array = data_handler.get_gene_average_array(pool_id=pool_id, gene_id=gene_id, max_bin_number=51)

        data_initial_conditions = data_array[0]

        data_model = DropletModel(initial_conditions=data_initial_conditions)

        data_problem = pints.MultiOutputProblem(model=data_model, times=data_times, values=data_array)

        data_log_likelihood = get_log_likelihood(
            problem=data_problem,
            noise_model=noise_model,
            mask_threshold=1
        )

        num_chains = 4

        model_prior_lower_bounds = opt_model_params / 2.0
        model_prior_upper_bounds = opt_model_params * 2.0

        log_prior = pints.UniformLogPrior(model_prior_lower_bounds, model_prior_upper_bounds)

        initial_mcmc_points = [list(opt_model_params) for _ in range(4)]

        apply_log_transform = True

        mcmc_chains = get_mcmc_chains(
            log_likelihood=data_log_likelihood,
            apply_log_transform=apply_log_transform,
            mcmc_method=pints.HaarioBardenetACMC,
            max_mcmc_iterations=20000,
            num_chains=num_chains,
            prior_lower_bounds=None,
            prior_upper_bounds=None,
            specified_log_prior=log_prior,
            initial_mcmc_points=initial_mcmc_points,
            log_to_screen=True
        )

        Path(f"../Results/{noise_model}/preliminary dataset/pool {pool_id}/{gene_id}/profile mcmc").mkdir(parents=True, exist_ok=True)

        for c in range(num_chains):

            chain = mcmc_chains[c]

            chain_df = pd.DataFrame(chain, columns=data_model.param_names)

            write_file_path = Path(f"../Results/{noise_model}/preliminary dataset/pool {pool_id}/{gene_id}/profile mcmc/chain {c+1}.csv")

            

            chain_df.to_csv(write_file_path, index=False)

        mcmc_metadata = {
            "gene_id": gene_id,
            "pool_id": pool_id,
            "opt_model_params": list(opt_model_params),
            "opt_sigma_mles": list(opt_sigma_mles),
            "num_chains": num_chains,
            "prior_lower_bounds": list(model_prior_lower_bounds),
            "prior_upper_bounds": list(model_prior_upper_bounds),
            "initial_mcmc_points": initial_mcmc_points,
            "apply_log_transform": apply_log_transform
        }

        json_write_file_path = Path(f"../Results/{noise_model}/preliminary dataset/pool {pool_id}/{gene_id}/profile mcmc/metadata.json")

        with open(json_write_file_path, "w") as f:
            json.dump(mcmc_metadata, f)

        

Pool 1

1/43 : NTC (avg)
Using Haario-Bardenet adaptive covariance MCMC
Generating 4 chains.
Running in sequential mode.
Iter. Eval. Accept.   Accept.   Accept.   Accept.   Time m:s
0     4      0         0         0         0          0:00.0
1     8      0         0         0.5       0          0:00.0
2     12     0.333     0         0.667     0.333      0:00.0
3     16     0.25      0         0.5       0.25       0:00.0
20    84     0.238     0.0952    0.143     0.286      0:00.2
40    164    0.268     0.122     0.195122  0.22       0:00.3
60    244    0.18      0.0984    0.131     0.213      0:00.4
80    324    0.198     0.0988    0.123     0.222      0:00.5
100   404    0.168     0.099     0.109     0.218      0:00.5
120   484    0.214876  0.116     0.0909    0.214876   0:00.7
140   564    0.227     0.121     0.0851    0.212766   0:00.8
160   644    0.23      0.124     0.0807    0.211      0:00.8
180   724    0.232     0.133     0.0939    0.215      0:01.0
Initial phase completed.


##### Least squares

In [4]:
def get_least_squares_parameters(
    opt_lower_bounds,
    opt_upper_bounds,
    log_transform_opt=True,
    opt_method=pints.CMAES,
    opt_initial_position=None,
    display_log=True,
    num_optimisations=10
):
    """
    Run `num_optimisations` optimisations and return the best params found (as numpy array).
    If opt_initial_position is None: sample a fresh random start for each run.
    If opt_initial_position is provided: use the same start for every run.
    """

    opt_bounds = pints.RectangularBoundaries(opt_lower_bounds, opt_upper_bounds)
    log_prior = pints.UniformLogPrior(opt_bounds)

    if log_transform_opt:
        transformation = pints.LogTransformation(n_parameters=len(opt_lower_bounds))
    else:
        transformation = None

    best_score = -np.inf
    best_params = None

    for j in range(1, num_optimisations + 1):
        # pick initial position for this run (do not overwrite user-supplied opt_initial_position)
        if opt_initial_position is None:
            # log_prior.sample(1) often returns an array shaped (1, n); take [0] to get 1D
            init = np.asarray(log_prior.sample(1)).reshape(-1)
        else:
            init = np.asarray(opt_initial_position).reshape(-1)

        score = pints.SumOfSquaresError(data_problem)

        opt = pints.OptimisationController(
            score,
            init,
            boundaries=opt_bounds,
            method=opt_method,
            transformation=transformation
        )

        opt.set_log_to_screen(display_log)
        # opt.set_max_unchanged_iterations(200, 1e-11)

        opt_params, opt_score = opt.run()

        if num_optimisations > 1:
            print(f"Iteration {j}/{num_optimisations}: opt_params: {opt_params} ; score: {opt_score}")

        # compare scores (assumes larger is better for your objective - typical for log-likelihood)
        if opt_score > best_score:
            best_score = opt_score
            best_params = np.asarray(opt_params)

    if best_params is None:
        raise RuntimeError("No optimisation results produced.")

    return best_params

In [6]:
data_times = data_handler.data_times

pool_ids = [1,2]

# pool_ids = [2] ###################################################

for pool_id in pool_ids:

    print(f"Pool {pool_id}\n")

    opt_params_df_rows = []

    write_file_path = Path(f"../Results/least squares/preliminary dataset/pool {pool_id}/opt_params.csv")


    # all_gene_ids = [g for g in data_handler.get_pool_gene_ids(pool_id=pool_id) if g != "NA"]

    all_gene_ids = data_handler.get_pool_gene_ids(pool_id=pool_id)

    num_genes = len(all_gene_ids)

    # all_gene_ids = ["NTC1"]

    for i, gene_id in enumerate(all_gene_ids):

        print(f"{i+1}/{num_genes} : {gene_id}")

        data_array = data_handler.get_gene_average_array(pool_id=pool_id, gene_id=gene_id, max_bin_number=51)

        data_initial_conditions = data_array[0]

        data_model = DropletModel(initial_conditions=data_initial_conditions)

        data_problem = pints.MultiOutputProblem(model=data_model, times=data_times, values=data_array)

        opt_lower_bounds = np.asarray([1e1, 1e-10, 1e-4])
        opt_upper_bounds = np.asarray([1e4, 1e-3, 1e0])

        data_opt_params = get_least_squares_parameters(
            opt_lower_bounds=opt_lower_bounds,
            opt_upper_bounds=opt_upper_bounds,
            log_transform_opt=True,
            display_log=False
        )

        print(f"Estimated parameters: {data_opt_params}")

        df_row = {
            "gene_id": gene_id,
            "b": data_opt_params[0],
            "f": data_opt_params[1],
            "g": data_opt_params[2]
        }



        sigma_mles = get_sigma_MLEs(
            model=data_model,
            data=data_array,
            mle_params=data_opt_params,
            data_times=data_times,
            noise_model="additive normal"
        )

        num_sigmas = len(sigma_mles)

        sigma_mle = (1 / num_sigmas) * np.sum(sigma_mles)

        df_row["sigma"] = sigma_mle

        opt_params_df_rows.append(df_row)

 

    Path(f"../Results/least squares/preliminary dataset/pool {pool_id}").mkdir(parents=True, exist_ok=True)

    opt_params_df = pd.DataFrame(opt_params_df_rows)
    opt_params_df.to_csv(write_file_path, index=False)

Pool 1

1/43 : NTC (avg)
Iteration 1/10: opt_params: [2.77266113e+03 1.31613702e-05 2.18942596e-01] ; score: 8614616.532546014
Iteration 2/10: opt_params: [2.77266112e+03 1.31613703e-05 2.18942595e-01] ; score: 8614616.532546015
Iteration 3/10: opt_params: [2.77266112e+03 1.31613703e-05 2.18942596e-01] ; score: 8614616.532546017
Iteration 4/10: opt_params: [2.77266111e+03 1.31613702e-05 2.18942595e-01] ; score: 8614616.532546017
Iteration 5/10: opt_params: [2.77266112e+03 1.31613703e-05 2.18942596e-01] ; score: 8614616.53254601
Iteration 6/10: opt_params: [2.77266112e+03 1.31613703e-05 2.18942595e-01] ; score: 8614616.532546015
Iteration 7/10: opt_params: [2.77266112e+03 1.31613703e-05 2.18942595e-01] ; score: 8614616.532546015
Iteration 8/10: opt_params: [2.77266111e+03 1.31613700e-05 2.18942598e-01] ; score: 8614616.53254602
Iteration 9/10: opt_params: [2.77266112e+03 1.31613701e-05 2.18942597e-01] ; score: 8614616.532546015
Iteration 10/10: opt_params: [2.77266112e+03 1.31613703e-05

---

### Synthetic data


In [None]:

# ----------------------------
# Model & synthetic data setup
# ----------------------------

param_names = ['b', 'f', 'g']
n_params = len(param_names)

initial_conditions = plotter.get_synthetic_initial_conditions()
data_times = np.array([4, 6, 8, 11, 14])
model_times = np.linspace(4, 14, 250)

timecourse_model = DropletModel(
    model_type='bfg',
    initial_conditions=initial_conditions,
    kernel='constant'
)

true_params = np.array([1e3, 1e-05, 2e-1])

param_bounds = [
    [1e1, 1e-10, 1e-2],
    [1e5, 1e-3,  1e2]
]

true_sol = timecourse_model.simulate(
    params=true_params,
    times=data_times
)

# ----------------------------
# Noise & replicate settings
# ----------------------------

noise_vals = np.linspace(0, 0.5, 21)
n_reps = 20  # number of synthetic replicates

# Storage: (noise, replicate, parameter)
all_opt_params = np.zeros((len(noise_vals), n_reps, n_params))

# ----------------------------
# Synthetic validation loop
# ----------------------------

for i, noise_frac in enumerate(noise_vals):

    # Scale noise by variability of true signal
    output_scale = true_sol.std(axis=0, keepdims=True)

    for r in range(n_reps):

        noise = np.random.normal(
            loc=0.0,
            scale=noise_frac * output_scale,
            size=true_sol.shape
        )

        data_array = true_sol + noise

        opt_params = parameter_fitting.do_optimisation_by_score(
            model=timecourse_model,
            data=data_array,
            times=data_times,
            num_optimisations=1,
            param_bounds=param_bounds,
            max_iterations=2000
        )

        all_opt_params[i, r, :] = opt_params

# ----------------------------
# Summary statistics
# ----------------------------

mean_params = all_opt_params.mean(axis=1)
std_params  = all_opt_params.std(axis=1)

# Relative error (bias)
relative_error = np.abs(mean_params - true_params) / true_params

# Empirical 95% confidence intervals
ci_lower = np.percentile(all_opt_params, 2.5, axis=1)
ci_upper = np.percentile(all_opt_params, 97.5, axis=1)

# Coefficient of variation
cv_params = std_params / mean_params

# ----------------------------
# Plotting
# ----------------------------

fig, axes = plt.subplots(3, 1, figsize=(10, 7), sharex=True)

factors = [1e3, 1e-5, 1e-1]
factor_exponents = [3, -5, -1]

palette = sns.color_palette("Set2", n_colors=3)
colour_cycle = cycle(palette)

for i, param_name in enumerate(param_names):

    colour = next(colour_cycle)
    ax = axes[i]

    mean_trans = mean_params[:, i] / factors[i]
    std_trans  = std_params[:, i]  / factors[i]
    true_trans = true_params[i]    / factors[i]

    # True parameter line
    ax.axhline(
        y=true_trans,
        c='k',
        linestyle='--',
        linewidth=2,
        label='True value'
    )

    # Mean ± SD error bars
    ax.errorbar(
        noise_vals,
        mean_trans,
        yerr=std_trans,
        fmt='-o',
        linewidth=3,
        capsize=4,
        c=colour
    )

    y_label_text = rf'${param_name} \; (\times 10^{{{factor_exponents[i]}}})$'
    ax.set_ylabel(y_label_text, fontsize=14, fontweight='bold')

    # Aesthetics
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['left'].set_linewidth(2)
    ax.spines['bottom'].set_linewidth(2)

    for tick in ax.get_xticklabels() + ax.get_yticklabels():
        tick.set_fontsize(13)
        tick.set_fontweight('bold')

axes[-1].set_xlabel('Noise factor', fontsize=14, fontweight='bold')

fig.suptitle(
    'Synthetic data validation: parameter robustness',
    fontsize=16,
    fontweight='bold'
)

plt.tight_layout()
plt.show()

# ----------------------------
# Optional: report identifiability limits
# ----------------------------

threshold = 0.3  # 30% relative error

for i, param_name in enumerate(param_names):

    exceeded = np.where(relative_error[:, i] > threshold)[0]

    if len(exceeded) > 0:
        crit_alpha = noise_vals[exceeded[0]]
        print(
            f"{param_name}: relative error exceeds {threshold*100:.0f}% "
            f"at noise factor ≈ {crit_alpha:.2f}"
        )
    else:
        print(
            f"{param_name}: relative error stays below "
            f"{threshold*100:.0f}% for all noise levels"
        )