# Generate a simulation of a population by running the envs_beta/BogoBetaEnv environment

JMA 14 March 2023

See also `generate_primary_sim.py`

In [0]:
# %reload_ext autoreload
# %autoreload 2
import math, os, re, sys
from pathlib import Path
import pandas as pd
import numpy as np
from numpy.random import default_rng
import datetime as dt

# For StructType
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Number of batches to run; each batch is one wrapped_patient(run_index) call
# that simulates PATIENTS patients and is saved to its own table.
NO_BATCHES = 10   # NOTE(review): the old note "100 X 10000 patients" does not match the values in this file -- confirm

# sys.path.append('/Workspace/Repos/joagosta@microsoft.com/bogovirus/RL_offline/envs_beta/')
# from beta_policies import BogoPolicies

In [0]:
# Alias so the space definitions below can be written as Dict({...});
# presumably stands in for the gym `Dict` space -- TODO confirm.
Dict = dict

class Box(object):
    """Closed interval [low, high] used to range-check a value."""

    def __init__(self, low = 0, high= 1, shape=(1,), dtype=np.float32) -> None:
        """Store the bounds; `shape` and `dtype` are accepted but not used."""
        self.low, self.high = low, high

    def not_within(self, v):
        """Return 'high' or 'low' when v lies outside the interval, else False."""
        # The upper bound is checked first; values on either bound count as inside.
        if v > self.high:
            return 'high'
        if v < self.low:
            return 'low'
        return False
        

# sys.path.append('./beta/')
# Get the set of policies

# import beta_simulation as sm

# Default value of the `sd` argument of BogoBetaEnv.reset (reset does not
# currently read that argument).
SEED = None       # Use a random seed each run
CONST_DOSE  = 0.7 # For test
# Diagnostic print level; it is reassigned to 0 further down, before the test run.
VERBOSE = False

def sigmoid(x):
    """Return a decreasing logistic curve centred at 0.6 with scale 0.1.

    The value is close to 1 for small x, exactly 0.5 at x == 0.6, and tends
    to 0 as x grows.
    """
    try:
        growth = math.exp(-(x - 0.6) / 0.1)
    except OverflowError:
        # exp() overflows for very negative x; the curve's limit there is 1.
        return 1.0
    return 1 - 1 / (1 + growth)

### Simulate environment.  
class BogoBetaEnv(object):
    """Gym-style simulation environment for one patient episode at a time.

    Create one instance and reuse it: `reset` starts a new patient, `step`
    advances one day using a caller-supplied policy, and `close` returns the
    records of the episode.
    """

    # Random generator shared by all instances (class attribute, unseeded).
    my_rng = default_rng(seed=None)

    NUM_COHORTS = 16
    MAX_INFECTION = 150
    # NOTE(review): the earlier comment said "doses from 0 to 1.5 in 0.1
    # increments", which does not match the value 3.0 -- confirm the intent.
    MAX_DOSE = 3.0
    SEVERITY_CEILING = 125  # Max expected severity.
    MAX_DAYS = 100
    SAMPLES = 10

    def __init__(self) -> None:
        """Set up spaces and rewards. Call this once and reuse it for all patient episodes."""
        self.action_space = Dict(
            {"Dose": Box(low=0.0, high=self.MAX_DOSE, shape=(1,), dtype=np.float32)}
        )
        # Ranges used by test_v() to report out-of-range state values.
        self.observation_space = Dict(
            {"infection": Box(low=0, high=self.MAX_INFECTION, shape=(1,), dtype=np.float32),
             "severity": Box(low=0, high=self.SEVERITY_CEILING, shape=(1,), dtype=np.float32),
             "cum_drug": Box(low=0, high=self.MAX_DOSE, shape=(1,), dtype=np.float32)}
        )
        # Immediate rewards
        self.one_day = -1
        self.recover = 100
        self.die = -100
        # Since the simulator doesn't observe the full state we keep it
        # internal to the object. There is no patient until reset() or
        # new_patient() is called, so start from explicit empty values.
        self.today = None
        self.stage = 0
        self.patient_results = []

    def test_v(self, the_var, v):
        """Print a warning when v is outside the_var's observation range; return v unchanged.

        Variables without an entry in `observation_space` are not checked.
        """
        def rnd(x, p):
            'replacement for the internal round function'
            e = math.pow(10, p)
            return int(e * x) / e

        space = self.observation_space.get(the_var, None)
        if space is not None:
            is_not = space.not_within(v)
            if is_not:
                # Careful - the spark function round masks the python builtin
                print(f'Out-of-range {the_var}:{rnd(v,3)} is {is_not}')
        return v

    ### local models

    def new_patient(self, patient_id, params= 0.7):
        """Return the day-0 record dict of a patient with a random infection and severity.

        Also resets the stage counter and the list of episode records, and
        overwrites `self.policy_params` with `params`.
        """
        self.stage = 0               # Not a state variable, but the sim tracks it
        self.patient_results = []
        # Used by the policy function, e.g. to randomize policy over patients.
        self.policy_params = params
        today = {
                'patient_id': patient_id,             # None of these variables are part of the state
                'cohort': patient_id % self.NUM_COHORTS,
                'day_number': self.stage,             # The stage
                # e.g. An array of length 1, of random integers, 20 <= rv < 40
                'infection': self.my_rng.integers(low=20, high=40, size=1)[0],
                'severity': self.my_rng.integers(low=10, high=30, size=1)[0],
                'cum_drug' : 0,
                'outcome':0,
                'efficacy': 0,
                'drug': 0,
                'reward' : 0
            }
        self.today = today
        return today

    def get_infection(self, yesterday):
        """Return yesterday's infection plus a random integer progression in [0, 10)."""
        # depends on: infection_prev
        progression = self.my_rng.integers(low=0, high=10, size=1)[0]
        return self.test_v('infection', yesterday['infection'] + progression)

    def get_severity(self, yesterday):
        """Return today's severity from yesterday's severity, infection and efficacy, plus noise."""
        # depends on: yesterday's severity, infection, & efficacy
        noise = self.my_rng.normal(loc=0, scale=1, size=1)[0]
        severity_next = yesterday['severity'] * 1.1 + \
                        yesterday['infection'] * 0.1 - \
                        yesterday['efficacy'] + \
                        noise
        return self.test_v('severity', severity_next)

    def get_cum_drug(self, yesterday, today, proportion=0.7):
        """Mix the previous cumulative drug level and today's dose in a (noisy) fixed proportion."""
        # depends on: cum_drug_prev, drug
        noise = self.my_rng.normal(loc=0, scale=0.01, size=1)[0]  # !!! surprisingly sensitive !!!
        r = proportion + noise  # larger value responds more slowly to changes in dose
        return self.test_v('cum_drug', yesterday['cum_drug'] * r + today['drug'] * (1 - r))

    def get_outcome(self, today) -> int:
        """Return -1 (die), 1 (recover) or 0 (stay) for today's record.

        Death is tested first: severity relative to SEVERITY_CEILING is
        compared against a noisy threshold around 1.0. Otherwise the patient
        recovers once infection reaches 100.
        """
        noise = self.my_rng.normal(loc=0, scale=0.1, size=1)[0]
        mortality_threshold = 1.0 + noise # my_rng.uniform(low=0.9, high=1.1)
        if (today['severity']/self.SEVERITY_CEILING > mortality_threshold):
            return -1   #'die'
        elif today['infection'] >= 100:
            return 1  # 'recover'
        else:
            return 0 # None

    def get_efficacy(self, today):
        """Return the amount by which severity will be reduced, from today's drug and cum_drug."""
        # Maybe this should be a proportion not a fixed amount? Severity can be negative this way.
        noise = self.my_rng.normal(loc=0, scale=0.4, size=1)[0]
        # efficacy = sigmoid( 12 * today['drug'] * today['cum_drug'] + noise )
        efficacy = 12 * today['drug'] * sigmoid( today['cum_drug'] ) + noise
        return efficacy

    def reward(self, today):
        """Map today's outcome to the immediate reward (reward shaping)."""
        if today['outcome'] == -1:  #'die':
            return self.die
        elif today['outcome'] == 1:  #'recover':
            return self.recover
        else:
            return self.one_day

    ### Simulation  Environment functions

    def reset(self, id_serial, sd=SEED, options=None) -> tuple:
        """Initialize a patient episode and return `(observation, info)`.

        `sd` and `options` are accepted but not used by this implementation.
        """
        # Set state variables.
        # The state is observable, so we use observation as the state.
        # Of course for a constant policy observability is moot.
        self.stage = 0
        self.today = self.yesterday = self.new_patient(patient_id=id_serial)

        info = {'stage': self.stage}   # Just a place to return additional info
                                       # that is not part of the state, e.g. for diagnostics
        return self.get_observation(), info

    def cycle(self, yesterday, day_number, policy):
        """Build today's record from yesterday's record and the policy's dose.

        `policy` is called as `policy(yesterday, today)` after infection and
        severity have been filled in, and must return the dose.
        """
        today = {
            'patient_id': yesterday['patient_id'],
            'cohort': yesterday['cohort'],
            'day_number': day_number,
        }
        # Note, the order these are called matters.
        today['infection'] = self.get_infection(yesterday)
        today['severity']  = self.get_severity(yesterday)
        today['drug']      = policy(yesterday, today)
        today['cum_drug']  = self.get_cum_drug(yesterday, today)
        today['efficacy']  = self.get_efficacy(today)
        today['outcome']   = self.get_outcome(today)
        today['reward']    = self.reward(today)
        return today

    def step(self, policy):
        """Advance one day and return `(observation, reward, terminated, info)`.

        `terminated` is True when the outcome is death or recovery.
        """
        self.stage += 1
        today = self.cycle(self.yesterday, self.stage, policy)
        self.today = self.yesterday = today
        info = {"stage": self.stage}
        self.patient_results.append(today)
        # Return only those things the RL solver can see.
        return self.get_observation(), today['reward'],  (today['outcome'] != 0) , info

    def close(self):
        """Finish an episode: return `(episode DataFrame, cumulative reward)`.

        An episode with no steps yields an empty DataFrame and a reward of 0.
        """
        # Note - to get the temporal df needed for causal learning join each
        #        row with its previous row.
        episode = pd.DataFrame(self.patient_results)
        if episode.empty:
            # No 'reward' column exists yet, so summing it would fail.
            return episode, 0
        cum_reward = episode['reward'].sum()
        return episode, cum_reward

    def get_observation(self):
        """Return the observable part of today's record as a dict."""
        # NOTE(review): the key is "Severity" while observation_space uses
        # "severity" -- kept as is for existing callers; confirm the intent.
        return {"Severity":self.today['severity']}

class BogoPolicies (BogoBetaEnv):
    """Dosing policies layered on the BogoBetaEnv simulator.

    Each policy is called with the `yesterday` and `today` record dicts and
    returns the dose for today.
    """

    def __init__(self, params=0) -> None:
        """Initialize the environment and store the policy parameter."""
        super().__init__()
        # Settings that may vary at the patient or other levels,
        # not a function of state.
        self.policy_params = params

    def const_policy(self, yesterday, today):
        """Return the stored policy parameter as the dose, ignoring both records."""
        return self.policy_params

    def standard_of_care_policy(self, yesterday, today):
        """Return a random dose scaled by today's severity, rounded down to a tenth."""
        drawn = self.my_rng.uniform(low=0.0, high=self.MAX_DOSE)
        scaled = drawn * today['severity'] / self.SEVERITY_CEILING
        tenths = math.floor(10 * scaled)
        return tenths / 10

    def completely_random_policy(self, yesterday, today):
        """Return a uniform random dose in [0, MAX_DOSE), rounded down to a tenth."""
        drawn = self.my_rng.uniform(low=0.0, high=self.MAX_DOSE)
        return math.floor(10 * drawn) / 10

    def dose_cohort_policy(self, yesterday, today):
        """Return a dose that grows linearly with the cohort number, up to MAX_DOSE."""
        highest_cohort = self.NUM_COHORTS - 1
        return self.MAX_DOSE * today['cohort'] / highest_cohort
 

In [0]:
### UDF to run a batch of patients at a time

# MAX_DOSE = 0.7 
# Print level read by test_patient_run: 2 prints the full daily record,
# 1 prints a one-line summary per day, any other value prints neither.
VERBOSE = 0
# Number of patient episodes simulated by each call to test_patient_run.
PATIENTS = 10

# patient_id,cohort,day_number,infection,severity,drug,cum_drug,efficacy,outcome,reward
# patient_schema = StructType([
#     StructField('patient_id', IntegerType(), True),
#     StructField('cohort', IntegerType(), True),
#     StructField('day_number', IntegerType(), True),
#     StructField('infection', FloatType(), True),
#     StructField('severity', FloatType(), True),
#     StructField('drug', FloatType(), True),
#     StructField('cum_drug', FloatType(), True),
#     StructField('efficacy', FloatType(), True),
#     StructField('outcome', IntegerType(), True),
#     StructField('reward', IntegerType(), True)
# ])
   
### a test run
def test_patient_run(env, policy):
    """Simulate PATIENTS episodes with `policy` and return all daily records.

    Patients are numbered 0 .. PATIENTS-1. Each episode runs until the
    environment reports termination or MAX_DAYS steps have been taken.

    Returns a pandas DataFrame with the records of every episode stacked in
    order (each episode keeps its own 0-based index), or an empty DataFrame
    when no episode was run.
    """
    episodes = []
    for the_patient in range(PATIENTS):
        observation, info = env.reset(id_serial=the_patient)
        print('\tpatient: ', the_patient)
        for _ in range(BogoPolicies.MAX_DAYS):
            observation, reward, terminated, info = env.step(policy)
            if VERBOSE == 2:
                print(env.today)
            elif VERBOSE == 1:
                print(f'i: {info} # obs: {env.get_observation()}, R: {reward}, end? {terminated}')
            if terminated:
                break
        episode_df, total_reward = env.close()
        print(f'reward {total_reward}')
        episodes.append(episode_df)
    if not episodes:
        # pd.concat raises on an empty list, so return the empty frame directly.
        return pd.DataFrame()
    # One concat at the end avoids re-copying the accumulated frame per patient.
    return pd.concat(episodes)

def wrapped_patient(run_index):
    """Simulate one batch with the standard-of-care policy and save it as a table.

    The batch records get a `run` column holding `run_index` and are written
    to the table `bogovirus_beta.xtst_{run_index}_of_{NO_BATCHES}`, replacing
    any existing table of that name. Returns None.
    """
    # strange - the namespaces are not visible in UDF?
    # sys.path.append('/Workspace/Repos/joagosta@microsoft.com/bogovirus/RL_offline/envs_beta/')
    # from beta_policies import BogoPolicies
    w_env = BogoPolicies()
    the_run = test_patient_run(w_env, w_env.standard_of_care_policy)
    # A scalar is broadcast to every row. Assigning a Series would instead be
    # aligned on the index, whose labels repeat across episodes.
    the_run['run'] = run_index
    # save the batch df in a table.
    sdf = spark.createDataFrame(the_run)
    sdf.write.saveAsTable(f"bogovirus_beta.xtst_{run_index}_of_{NO_BATCHES}", mode='overwrite')
    return None # the_run

def run_stats(rdf):
    """Print the mean dose per patient from the patient episodes.

    `rdf` is expected to be a pandas DataFrame with `patient_id` and `drug`
    columns; the dose given each day is stored in `drug`.
    """
    # Select the dose column explicitly: the positional argument of
    # GroupBy.mean() is the numeric_only flag, not a column name.
    print(rdf.groupby('patient_id')['drug'].mean())
    
        


In [0]:

# Run every batch one after another; each call saves its own table.
for batch_index in range(NO_BATCHES):
    wrapped_patient(batch_index)


In [0]:
# Display the row count of each batch table. show() prints the result itself
# and returns None, so it is not wrapped in print().
for i in range(NO_BATCHES):
    spark.sql(f'select count(*) from bogovirus_beta.xtst_{i}_of_{NO_BATCHES}').show()

In [0]:

# Create an RDD holding just the NO_BATCHES serial numbers and run
# wrapped_patient once per element.
# Note - no side effects of the UDF appear -- e.g. print statements are not rendered in the notebook
# NOTE(review): wrapped_patient uses the `spark` session to save its table;
# presumably that is not usable inside foreach on the executors -- confirm.
numbers=spark.sparkContext.parallelize(range(NO_BATCHES))
numbers.foreach(wrapped_patient)

In [0]:
import matplotlib.pyplot as plt

# Per-cohort means of every column, ordered by mean dose, so the line is
# drawn from the lowest to the highest dose.
xavg = df.groupby('cohort').mean().sort_values('drug')
# xmax = df.groupby('cohort').max()
# xmin = df.groupby('cohort').min()

# Mean reward against mean dose; the max/min envelopes are left disabled.
#plt.plot(xmax['drug'], xmax['reward'], color = 'black')
plt.plot(xavg['drug'], xavg['reward'], color='red')
# plt.plot(xmin['drug'], xmin['reward'], color = 'blue')