In [None]:
import reactionmodel.load
import glob
import os
import re
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import NamedTuple
from reactionmodel.specification import SimulationSpecification

from simulators import SIMULATORS
import test

@dataclass
class SimulatorArguments:
    """Time settings that are shared by every simulation run in this notebook.

    Attributes:
        t_span: passed to the simulator as its time-span argument.
        t_eval: passed to the simulator as its `discontinuities` keyword.
    """
    # NOTE(review): both fields are annotated as tuple, but the module-level
    # TEST_ARGUMENTS instance is built with a numpy array for t_eval. Dataclasses
    # do not enforce annotations, so either works at runtime.
    t_span: tuple
    t_eval: tuple

# Time settings used for every run: span (0, 50) with the 51 evenly spaced points 0, 1, ..., 50.
TEST_ARGUMENTS = SimulatorArguments(
    t_span=(0.0, 50.0),
    t_eval=np.linspace(0, 50, 51),
)

# Working directory at the moment this cell ran; a later cell chdir()s back to it.
inital_cwd = os.getcwd()

In [None]:
def do_simulations(s, n=200, seed=None):
    """Run `n` independent simulations of one specification.

    Args:
        s: simulation specification. This function reads `s.simulator`, `s.model`,
            `s.initial_condition`, `s.parameters` and `s.simulation_options`.
        n: number of runs to perform.
        seed: optional seed forwarded to `np.random.default_rng`. With the default
            (None) the generator is seeded from OS entropy, exactly as before;
            passing a value makes the whole batch of runs reproducible.

    Returns:
        A list with one entry per run, each being whatever the selected simulator
        function returned.
    """
    simulator = s.simulator
    forward_time = SIMULATORS[simulator]
    # One generator is shared by all runs, so a fixed seed pins the entire batch.
    rng = np.random.default_rng(seed)
    initial_condition = s.model.make_initial_condition(s.initial_condition)
    # Work on a copy: the hybrid branch below pops and adds keys.
    simulation_options = s.simulation_options.copy()

    if simulator == 'hybrid':
        # Imported only when the hybrid simulator is actually requested.
        import hybrid
        # The 'partition' option is a path; the simulator wants the loaded partition function instead.
        partition_path = simulation_options.pop('partition')
        partition_scheme = hybrid.load_partition_scheme(partition_path)
        simulation_options['partition_function'] = partition_scheme.partition_function

    k = s.model.get_k(parameters=s.parameters, jit=True)

    results = []
    for i in range(n):
        # Progress indicator: prints the index of the run that is about to start.
        print(i)
        result = forward_time(
            initial_condition,
            TEST_ARGUMENTS.t_span,
            k,
            s.model.stoichiometry(),
            s.model.rate_involvement(),
            rng,
            discontinuities=TEST_ARGUMENTS.t_eval,
            **simulation_options,
        )
        results.append(result)
    return results

def align_results(results, time, target_indices, species_names):
    """Sample every simulation result at the recorded time closest to each requested time.

    Args:
        results: iterable of simulation results exposing `t_history` (recorded times)
            and `y_history` (indexed as `[species_index, time_index]`).
        time: iterable of requested time points.
        target_indices: row indices into `y_history` of the species to extract.
        species_names: column labels for the extracted species, in the same order
            as `target_indices`.

    Returns:
        A list with one DataFrame per result. Each frame is indexed by 'time' (the
        recorded time that was closest to the requested one, rounded to 5 decimals)
        and has one column per entry of `species_names`.
    """
    column_labels = ['time', *species_names]
    indexed_results = []
    for run in results:
        recorded_times = run.t_history
        rows = []
        for requested in time:
            # position of the recorded time nearest to the requested time
            nearest = np.argmin(np.abs(requested - recorded_times))
            species_values = [run.y_history[row, nearest] for row in target_indices]
            rows.append((recorded_times[nearest], *species_values))
        frame = pd.DataFrame.from_records(rows, columns=column_labels)
        # rounding removes floating point noise so that frames from different runs share index labels
        frame['time'] = np.round(frame['time'], 5)
        indexed_results.append(frame.set_index('time'))
    return indexed_results

def z_score_for_mean(aligned_results, check_data):
    """Compare the simulated mean of each species with reference data via z-scores.

    For every species the z-score at each time point is
    `sqrt(n) * (simulated_mean - reference_mean) / reference_sd`, where `n` is the
    number of simulation runs (`len(aligned_results)`).

    Args:
        aligned_results: non-empty list of DataFrames, one per simulation run, each
            indexed by time with one column per species.
        check_data: reference table with columns `<species>-mean` and `<species>-sd`.
            If it has a 'time' column, rows are matched to the simulated statistics
            by time (rounded to 5 decimals, like the simulated index); otherwise its
            existing index is used for alignment.

    Returns:
        `(results_to_check, z_ts)`: a DataFrame with `<species>-mean` and
        `<species>-sd` columns computed across runs, and a dict mapping each species
        name to its Series of z-scores.

    Raises:
        ValueError: if `aligned_results` is empty.
    """
    # Columns without a dash (such as 'time') do not name a species statistic.
    target_species = sorted({c.split('-')[0] for c in check_data.columns if len(c.split('-')) > 1})

    # The sample size is the number of runs; do not rely on a notebook-level global.
    n_runs = len(aligned_results)
    if n_runs == 0:
        raise ValueError("z_score_for_mean needs at least one simulation result")

    # Side-by-side concat yields one column per (run, species) with repeated species labels.
    df = pd.concat(aligned_results, axis=1)
    # Group the repeated labels on the transposed frame; groupby(axis=1) is deprecated in pandas.
    by_species = df.T.groupby(level=0)
    results_to_check = pd.concat(
        [by_species.mean().T.add_suffix('-mean'), by_species.std().T.add_suffix('-sd')],
        axis=1,
    )

    # Align reference rows on time rather than on row number, which only coincides
    # with time when the check times are exactly 0, 1, 2, ...
    if 'time' in check_data.columns:
        reference = check_data.set_index(check_data['time'].astype(float).round(5))
    else:
        reference = check_data

    # https://github.com/sbmlteam/sbml-test-suite/blob/release/cases/stochastic/DSMTS-userguide-31v2.pdf
    scale = np.sqrt(n_runs)
    z_ts = {}
    for species in target_species:
        deviation = results_to_check[f'{species}-mean'] - reference[f'{species}-mean']
        # A reference sd of 0 yields NaN or inf at that time point.
        z_ts[species] = deviation / reference[f'{species}-sd'] * scale

    return results_to_check, z_ts

class TestResult(NamedTuple):
    """Outcome of comparing one batch of simulations against one check file."""
    # Per-time '<species>-mean' / '<species>-sd' statistics computed from the simulations.
    results_df: pd.DataFrame
    # Reference table as loaded from the check CSV.
    check_df: pd.DataFrame
    # Maps species name -> z-scores of the simulated mean against the reference mean.
    z_scores_for_mean_by_species: dict

def single_test(specification, check_path, **kwargs):
    """Simulate one specification and compare it with the reference data in one CSV.

    Args:
        specification: simulation specification, forwarded to `do_simulations`.
        check_path: path of the check CSV. It must have a 'time' column and
            '<species>-<statistic>' columns.
        **kwargs: forwarded unchanged to `do_simulations`.

    Returns:
        A `TestResult` holding the simulated statistics, the loaded check data and
        the per-species z-scores.
    """
    check_data = pd.read_csv(check_path)
    results = do_simulations(specification, **kwargs)

    # Species are named by the text before the first dash; dash-free columns (e.g. 'time') are ignored.
    split_columns = (column.split('-') for column in check_data.columns)
    desired_species = {parts[0] for parts in split_columns if len(parts) > 1}

    # Translate species names into their positions within the model.
    model_species_names = [species.name for species in specification.model.species]
    targets = [model_species_names.index(name) for name in desired_species]

    aligned = align_results(results, check_data['time'], targets, desired_species)
    results_table, z_ts = z_score_for_mean(aligned, check_data)

    return TestResult(
        results_df=results_table,
        check_df=check_data,
        z_scores_for_mean_by_species=z_ts,
    )

def run_tests_with_checks(root, specifications, **kwargs):
    """Run `single_test` for every check directory found under `<root>/checks/`.

    Each sub-directory of `<root>/checks/` is named after a specification identifier
    and must contain exactly one file matching `check*.csv`.

    Args:
        root: directory containing the `checks` folder.
        specifications: mapping from identifier to a `(specification, config_dir)`
            pair (e.g. a `SpecTuple`); only the specification is used here.
        **kwargs: forwarded unchanged to `single_test`.

    Returns:
        Dict mapping each check directory path to the result of `single_test`.

    Raises:
        KeyError: if a check directory has no matching entry in `specifications`.
        AssertionError: if a check directory does not hold exactly one check CSV.
    """
    tests_to_do = []
    for check_dir in sorted(glob.glob(os.path.join(root, 'checks', '*/'))):
        # normpath strips the trailing separator left by the '*/' pattern, so that
        # basename is the directory name regardless of the platform's separator.
        check_dir = os.path.normpath(check_dir)
        identifier = os.path.basename(check_dir)
        specification, _ = specifications[identifier]
        # Pair the specification with the directory that holds its check data
        # (not with the configuration directory stored in the specification tuple).
        tests_to_do.append((specification, check_dir))
    print(f"Performing tests with benchmark data for {len(tests_to_do)}/{len(specifications)} combinations.")

    test_results = {}
    for specification, check_dir in tests_to_do:
        # each check directory contains 1 CSV file with a name like check{SBML_TEST_NUMBER}.csv
        check_files = glob.glob(os.path.join(check_dir, 'check*.csv'))
        assert len(check_files) == 1, f"Check directory {check_dir} had {len(check_files)} check csv files, expected exactly 1. I don't know what to do"
        test_results[check_dir] = single_test(specification, check_files[0], **kwargs)
    return test_results

def get_files(root, individual, collection, pattern):
    """Locate input files that are stored either as one file or as a folder of variants.

    Args:
        root: directory to look in.
        individual: file name to look for directly inside `root`.
        collection: sub-directory of `root` searched when `individual` is absent.
        pattern: glob pattern applied inside `collection`.

    Returns:
        `[<root>/<individual>]` if that file exists, otherwise the (possibly empty)
        list of paths matching `<root>/<collection>/<pattern>`.
    """
    single_path = os.path.join(root, individual)
    if not os.path.isfile(single_path):
        return glob.glob(os.path.join(root, collection, pattern))
    return [single_path]

class SpecTuple(NamedTuple):
    """A loaded simulation specification together with a path related to its configuration file."""
    # The specification object returned by reactionmodel.load.load_specification.
    specification: SimulationSpecification
    # NOTE: run_tests_from_dir stores the *directory* containing the configuration file here
    # (os.path.dirname of the config path), not the path of the file itself.
    config_path: str

def _identifier_part(tag, path):
    """Return `tag` followed by the digits of a file name like `model01.txt`, or '' if it has none."""
    # Match against the file name only, so digits in directory names can never leak into the
    # identifier; the raw string keeps `\.` a regex escape rather than an invalid string escape.
    match = re.search(r'[a-z]+([0-9]+)\.txt$', os.path.basename(path))
    return tag + match[1] if match else ''


def run_tests_from_dir(dir, **kwargs):
    """Load every model/parameter/config/initial-condition combination in `dir` and test it.

    Each of the four inputs is looked up with `get_files`: either a single file in
    `dir` (`model.txt`, `parameters.txt`, `config.txt`, `ic.txt`) or numbered files
    in a sub-directory (`models/model*.txt`, `parameters/parameters*.txt`,
    `configurations/config*.txt`, `initial_conditions/initial*.txt`).

    Every combination gets an identifier made of `m`, `c`, `p` and `i` followed by
    the number from the respective file name (parts whose file has no number are
    omitted), e.g. `p01i01`. The identifier is the key used by
    `run_tests_with_checks` to find the matching check data.

    Args:
        dir: root directory of one test case.
        **kwargs: forwarded unchanged to `run_tests_with_checks`.

    Returns:
        Whatever `run_tests_with_checks` returns for the loaded specifications.
    """
    model_paths  = get_files(dir, 'model.txt', 'models', 'model*.txt')
    params_paths = get_files(dir, 'parameters.txt', 'parameters', 'parameters*.txt')
    config_paths = get_files(dir, 'config.txt', 'configurations', 'config*.txt')
    ic_paths     = get_files(dir, 'ic.txt', 'initial_conditions', 'initial*.txt')

    specifications = {}
    for model_path in model_paths:
        for params_path in params_paths:
            for config_path in config_paths:
                for ic_path in ic_paths:
                    specification = reactionmodel.load.load_specification(model_path, params_path, config_path, ic_path)
                    # Use the numbers in the file names as a unique identifier for this combination.
                    # Later, all combinations that have check data are looked up by this identifier.
                    identifier = (
                        _identifier_part('m', model_path)
                        + _identifier_part('c', config_path)
                        + _identifier_part('p', params_path)
                        + _identifier_part('i', ic_path)
                    )
                    # If identifier == '', every input file lived directly in the root directory.
                    # NOTE(review): combinations that produce the same identifier overwrite each other here.
                    specifications[identifier] = SpecTuple(specification, os.path.dirname(config_path))
    return run_tests_with_checks(dir, specifications, **kwargs)

In [None]:
# Return to the directory captured when the first cell ran, so that the relative
# paths below resolve the same way every time this cell is re-run.
os.chdir(inital_cwd)
test_dir = "./tests/sbml-tests/"
# All test case directories; not used further in this cell.
tests = glob.glob(os.path.join(test_dir, 'sbml-*'))

# Test case to run (name of a directory inside test_dir).
target_test = "sbml-003-dimerisation"
# Alternative test case, kept for quick switching:
#target_test = "sbml-001-birth-death"
# Name of the sub-directory of <test case>/checks/ that holds the reference CSV.
target_check = "p01i01"

# Number of simulation runs handed to single_test.
n = 10

specification = reactionmodel.load.load_specification(*test.get_path_tuple(os.path.join(test_dir, target_test), target_check))
# Take the first CSV in the check directory; raises IndexError if there is none.
check_file = glob.glob(os.path.join(test_dir, target_test, 'checks', target_check, '*.csv'))[0]
test_result = single_test(specification, check_file, n=n)

# Analyze results

In [None]:
# Simulated '<species>-mean' and '<species>-sd' values per time point.
test_result.results_df

In [None]:
# Reference table loaded from the check CSV, for comparison with the simulated values.
test_result.check_df

In [None]:
# Print each species name followed by its series of z-scores.
for species, series in test_result.z_scores_for_mean_by_species.items():
    print(species, series, sep='\n')