In [8]:
# 1:54:49.48 to get a functioning encoding with just compartmental constraints.  Needs more time on probing the parameters
# 0 - 32:00.68: Setup basic encoding
# 32:00.68 - 48:00: Setup z3 solver and get results
# 48:00 - 1:27:15: Setup solution extraction
# 1:27:15 - 1:54:49.48: Setup solution plotting and explore 8 parameter bound settings to test
# 1:54:49.48 - 2:09:43.40: Cleanup code, comments, and improve plotting to interpolate
# 2:09:43.40 - 2:17:19.51: Add dreal and test

from pysmt.shortcuts import And, Or, Plus, Minus, Times, Div, REAL, LE, LT, GE, GT, Equals, Symbol, Real, Solver
from pysmt.fnode import  FNode
from typing import Dict
import pandas as pd
from decimal import Decimal
import numpy as np


def dataframe(assignment: Dict["FNode", object], state_variables, parameters, timepoints) -> pd.DataFrame:
    """Turn a solver assignment into a DataFrame indexed by timepoint.

    A symbol named ``<state>_<time>`` (for example ``S_010``) whose prefix is
    listed in ``state_variables`` fills the row for ``int(<time>)`` in that
    state's column. Every other symbol (for example ``beta``) is treated as
    time-invariant and its value is repeated on every row.

    Args:
        assignment: Mapping from symbols (objects exposing ``symbol_name()``)
            to numeric values accepted by ``float()``, such as ``Fraction``.
        state_variables: Names of the time-indexed state variables.
        parameters: Names that are always treated as constants, even when they
            contain an underscore. May be empty or ``None``.
        timepoints: Ordered timepoints; they become the index of the result.

    Returns:
        A float DataFrame with one column per state variable and one per
        constant symbol. State values absent from ``assignment`` are NaN.

    Raises:
        ValueError: If a state symbol refers to a time not in ``timepoints``.
    """
    timepoints = list(timepoints)
    # Row lookup by timepoint; setdefault keeps the first row for duplicates,
    # which matches what list.index() would have returned.
    row_of = {}
    for row, t in enumerate(timepoints):
        row_of.setdefault(t, row)
    constant_names = set(parameters or ())
    timeseries = {sv: [None] * len(timepoints) for sv in state_variables}

    for symbol, value in assignment.items():
        name = symbol.symbol_name()
        # Split on the LAST underscore so names may themselves contain "_".
        prefix, sep, suffix = name.rpartition("_")
        time = None
        if sep and name not in constant_names and prefix in state_variables:
            try:
                time = int(suffix)
            except ValueError:
                time = None  # suffix is not a time index: treat as a constant

        if time is None:
            timeseries[name] = [float(value)] * len(timepoints)
            continue

        if time not in row_of:
            raise ValueError(f"{name} refers to time {time}, which is not in timepoints")
        timeseries[prefix][row_of[time]] = float(value)

    return pd.DataFrame(timeseries, index=timepoints).astype(float)

def reindex_and_interpolate(df, new_index):
    """Return ``df`` reindexed onto ``new_index`` with gaps filled linearly.

    Rows of ``new_index`` that are absent from ``df`` start out as NaN and are
    then filled by pandas' ``'linear'`` interpolation. ``df`` itself is not
    modified.
    """
    expanded = df.reindex(index=new_index)
    return expanded.interpolate(method='linear')


def plot_results(values, timepoints):
    """Print and plot the S, I and R trajectories of a solver assignment.

    Args:
        values: Mapping from symbols to values, as returned by ``run_solver``.
        timepoints: Ordered integer timepoints used by the encoding.

    Returns:
        The Axes object produced by ``DataFrame.plot``.
    """
    timepoints = list(timepoints)
    results: pd.DataFrame = dataframe(values, ["S", "I", "R"], ["beta", "gamma"], timepoints)

    # One sample per unit of time with both endpoints included. The count is
    # derived from the span so a non-zero first timepoint is handled too.
    start, end = timepoints[0], timepoints[-1]
    newindex = np.linspace(start, end, int(end - start) + 1)
    results = reindex_and_interpolate(results, newindex)

    # Parameters are constant columns, so the first row holds their value;
    # .iloc avoids depending on the label 0 being present in the index.
    print(f"beta = {results['beta'].iloc[0]}, gamma = {results['gamma'].iloc[0]}")
    print(results[["S", "I", "R"]])
    ax = results[["S", "I", "R"]].plot()
    # set_xlabel is a method: assigning to it would replace it and leave the
    # axis unlabeled.
    ax.set_xlabel("Time")
    return ax

def run_solver(solver, formula=None):
    """Assert a formula on ``solver`` and extract a satisfying assignment.

    Args:
        solver: Solver object providing ``add_assertion``, ``solve`` and
            ``get_model``.
        formula: Formula to solve. Defaults to the notebook-level
            ``consistency`` encoding, so existing ``run_solver(solver)`` calls
            keep working.

    Returns:
        A dict mapping each free variable of the formula to its constant value
        in the model, or ``None`` (after printing "Unsat") when the solver
        reports no solution. Variables whose value cannot be read are left out
        of the dict and reported on stdout.
    """
    if formula is None:
        formula = consistency  # encoding built by the notebook's encoding cell

    solver.add_assertion(formula)
    if not solver.solve():
        print("Unsat")
        return None

    model = solver.get_model()
    values = {}
    skipped = []
    for var in formula.get_free_variables():
        try:
            values[var] = model.get_value(var).constant_value()
        except Exception as exc:
            # Best effort: keep whatever can be read, but do not hide the gap.
            skipped.append(f"{var} ({type(exc).__name__})")
    if skipped:
        print(f"Skipped {len(skipped)} variable(s) with no readable value: {', '.join(skipped)}")
    return values



In [9]:
# Generate model diagram

# from funman.api.run import Runner
# import json

# amr_model = "../../resources/amr/petrinet/amr-examples/sir.json"
# m = Runner().get_model(amr_model)
# g = m[0].to_dot()
# g.render("sir.pdf")

In [10]:
# Configure these bounds (lower, upper) to configure the parameter space.
# The encoding cell constrains each parameter with lower <= value < upper,
# so the upper bound is exclusive.

beta_bounds = (0.0, 0.01)
gamma_bounds = (0.0, 0.2)

# Set these values for the initial state (S, I and R at time 0).
# Their sum is used as the population size by the encoding cell.

S_0_value = 0.999
I_0_value = 0.001
R_0_value = 0


# Timepoints: the encoding cell builds range(0, max_time + step_size, step_size),
# i.e. 0, step_size, ..., max_time when max_time is a multiple of step_size.
# step_size is also used as the dt factor in the transition equations.
step_size = 10
max_time = 100

In [18]:
# The main encoding code: initial state, parameter bounds, transitions,
# constraints, and the conjunction of the active assertions.

def time_format(t):
    """Format an integer time as a zero-padded, at-least-three-digit string."""
    return f"{t:03d}"


################################################################################
################# Initial States ###############################################
################################################################################

# One REAL symbol per compartment at time 0 (named S_000, I_000, R_000).
S_0, I_0, R_0 = (
    Symbol(f"{compartment}_{time_format(0)}", REAL)
    for compartment in ("S", "I", "R")
)

population_size = sum([S_0_value, I_0_value, R_0_value])

# SIR Model Initial State: pin every time-0 symbol to its configured value.
initial_state = And([
    Equals(symbol, Real(start_value))
    for symbol, start_value in zip(
        (S_0, I_0, R_0),
        (S_0_value, I_0_value, R_0_value),
    )
])

################################################################################
################# Parameters     ###############################################
################################################################################


# Parameters
beta = Symbol("beta", REAL)
gamma = Symbol("gamma", REAL)


# Each parameter is confined to the half-open interval [lower, upper).
parameters = And([
    And(LE(Real(lower), param), LT(param, Real(upper)))
    for param, (lower, upper) in ((beta, beta_bounds), (gamma, gamma_bounds))
])


# Times at which state symbols exist, spaced step_size apart.
timepoints = [*range(0, max_time + step_size, step_size)]


################################################################################
################# Transitions ##################################################
################################################################################

def S_now(t):
    """REAL symbol for compartment S at time t."""
    return Symbol(f"S_{time_format(t)}", REAL)


def S_next(t):
    """REAL symbol for compartment S one step after time t."""
    return Symbol(f"S_{time_format(t + step_size)}", REAL)


def I_now(t):
    """REAL symbol for compartment I at time t."""
    return Symbol(f"I_{time_format(t)}", REAL)


def I_next(t):
    """REAL symbol for compartment I one step after time t."""
    return Symbol(f"I_{time_format(t + step_size)}", REAL)


def R_now(t):
    """REAL symbol for compartment R at time t."""
    return Symbol(f"R_{time_format(t)}", REAL)


def R_next(t):
    """REAL symbol for compartment R one step after time t."""
    return Symbol(f"R_{time_format(t + step_size)}", REAL)


# Step length as a real constant, used as the multiplier in every update.
dt = Real(float(step_size))


def S_Trans(t):
    """S_next = S_now - beta * S_now * I_now * dt."""
    outflow = Times([beta, S_now(t), I_now(t), dt])
    return Equals(S_next(t), Minus(S_now(t), outflow))


def I_Trans(t):
    """I_next = I_now + (beta * S_now * I_now - gamma * I_now) * dt."""
    inflow_rate = Times([beta, S_now(t), I_now(t)])
    outflow_rate = Times(gamma, I_now(t))
    net_change = Times(Minus(inflow_rate, outflow_rate), dt)
    return Equals(I_next(t), Plus(I_now(t), net_change))


def R_Trans(t):
    """R_next = R_now + gamma * I_now * dt."""
    inflow = Times(Times(gamma, I_now(t)), dt)
    return Equals(R_next(t), Plus(R_now(t), inflow))


def Trans(t):
    """Conjunction of the three compartment updates leaving time t."""
    return And(S_Trans(t), I_Trans(t), R_Trans(t))


# The last timepoint has no successor, so it starts no transition.
All_Trans = And([Trans(t) for t in timepoints[:-1]])


################################################################################
################# Constraints ##################################################
################################################################################

# At every timepoint each compartment is non-negative and the three
# compartments sum to the population size.
compartmental_constraint = And([
    And(
        LE(Real(0.0), S_now(t)),
        LE(Real(0.0), I_now(t)),
        LE(Real(0.0), R_now(t)),
        Equals(Plus([S_now(t), I_now(t), R_now(t)]), Real(population_size)),
    )
    for t in timepoints
])

# 10m to add and check
I_peak = (0.45, 0.55)  # (lower, upper) bound on I inside the peak window
I_peak_t = (45.0, 55.0)  # (start, end) of the peak window in time, inclusive
# Note: And([]) over an empty window adds no constraint at all, so the window
# must contain at least one timepoint for peak_I to have any effect.
peak_I = And([
    And(
        LE(Real(I_peak[0]), I_now(t)),
        # The upper bound on I comes from I_peak; using I_peak_t[1] (a time)
        # here would leave I effectively unbounded from above.
        LT(I_now(t), Real(I_peak[1])),
    )
    for t in timepoints
    if I_peak_t[0] <= t <= I_peak_t[1]
])


################################################################################
################# Combine Assertions ###########################################
################################################################################

# Single formula asserted by run_solver. compartmental_constraint is currently
# disabled; uncomment its line below to include it.
consistency = And(
    initial_state,
    parameters,
    All_Trans,
    # compartmental_constraint,
    peak_I,
)
# consistency.serialize()

In [12]:
# # Solve encoding
# with Solver() as solver:
#     values = run_solver(solver)
    
# if values:
#     plot_results(values, timepoints)


In [19]:

# Use dreal instead of z3
# This cell solves the `consistency` encoding with the solver named "dreal"
# and plots the result when a model is found.
from pysmt.logics import QF_NRA

from funman_dreal.solver import ensure_dreal_in_pysmt

# NOTE(review): presumably registers the dreal backend with pysmt so that
# Solver(name="dreal") resolves; confirm against funman_dreal. It must run
# before the Solver(...) call below.
ensure_dreal_in_pysmt()

# Options passed to the solver through `solver_options`.
# NOTE(review): the meaning of each key is defined by the dreal wrapper, not
# by this notebook; confirm there before tuning.
opts = {
        "dreal_precision": 1e-3,
        "dreal_log_level": "none",
        "dreal_mcts": True,
    }
with Solver(name="dreal",
            logic=QF_NRA,
            solver_options=opts
            ) as solver:
    values = run_solver(solver)
    
# run_solver prints "Unsat" and returns None when no model exists, in which
# case plotting is skipped.
if values:
    plot_results(values, timepoints)


Unsat
