# Create simulated trajectories

We are creating a very naive simulated dataset for the Crystal Island Prototype (CIP) that we can leverage to learn a narrative planner RL policy. This is the [document that describes the overall project](https://docs.google.com/document/d/1T22rFdxEdN0qtQTIy5CnKC49UUh04-85lV2CWi7Qy1w/edit#heading=h.dot2cxvm5eu0).


[See this doc for details about the implementation](https://docs.google.com/document/d/1iMvKCv6Zaa4iMVvvGpaSYO7PMA9eObtyAeGuw6mWh60/edit?usp=sharing)


In [1]:
import logging

import pandas as pd
import numpy as np
from copy import deepcopy

import torch

# These example probabilities only take effect when a feature is actually free
# to change (i.e. there is uncertainty). For example, s[4] can only be 1 if
# s[1] is 1, so when s[1] is 0 the value of s[4] stays 0 even if the
# probability of s[4] == 1 is 100%.
# Keys are indices into the state vector.

# probability that a feature is already 1 in the initial state
init_feature_prob = {
    1: 0.3,   # talked to kim
    2: 0.10,  # talked to patient
    3: 0.05,  # test result
    4: 0.05,  # worksheet submitted
    5: 0.05,  # mutation happened
}

# probability that a feature which is still 0 flips to 1 in the next state
next_feature_prob = {
    1: 0.9,
    2: 0.5,
    3: 0.2,
    4: 0.2,
    5: 0.2,
}

def check_state(state: np.ndarray, next_state: np.ndarray):
    """Return True when ``state -> next_state`` is a valid transition.

    A transition is rejected (returns False) when any of these hold:

    * in either state, feature 1 is 0 while feature 3, 4 or 5 is 1
      (those features depend on feature 1);
    * any entry of ``next_state`` is smaller than the matching entry of
      ``state`` (features never revert);
    * index 0 differs between the two states.

    Args:
        state: current state vector (at least 6 entries).
        next_state: candidate successor state vector of the same length.

    Returns:
        bool: True if the transition is valid, False otherwise.
    """
    # features 3, 4 and 5 may only be set once feature 1 is set
    for vec in (state, next_state):
        if vec[1] == 0 and (vec[3] == 1 or vec[4] == 1 or vec[5] == 1):
            return False

    # The comparison must be element-wise: `any(diff) < 0` compares a bool
    # with 0 and is therefore never true, so reverted features went unnoticed.
    if np.any((np.asarray(next_state) - np.asarray(state)) < 0):
        return False

    # index 0 must stay constant across a transition
    if state[0] != next_state[0]:
        return False

    return True

def gen_next_state(state: np.ndarray, next_feature_prob: dict=next_feature_prob):
    """Sample a successor of ``state`` without modifying ``state`` itself.

    Every feature that is still 0 may flip to 1 with the probability stored
    under its index in ``next_feature_prob``; features that are already 1 are
    left alone. Features 3, 4 and 5 are only sampled when feature 1 is 1
    (including when it was flipped during this very call).

    Args:
        state: current state vector.
        next_feature_prob: maps feature index (1..5) to its flip probability.

    Returns:
        A deep copy of ``state`` with the sampled updates applied.
    """
    successor = deepcopy(state)

    # Remember there can be multiple "student steps" in between state and the
    # next state: for example, a student can talk to kim and other patients,
    # test and get a positive result, and only then is an AES triggered.
    # That is why several features may flip within one transition.

    # independent features
    for idx in (1, 2):
        if successor[idx] == 0:
            successor[idx] = int(next_feature_prob[idx] >= np.random.uniform(0, 1))

    # features that require feature 1 to be set
    for idx in (3, 4, 5):
        if successor[1] == 1 and successor[idx] == 0:
            successor[idx] = int(next_feature_prob[idx] >= np.random.uniform(0, 1))

    return successor

def gen_init_state(aes: str, pretest: int, init_feature_prob: dict=init_feature_prob):
    """Sample an initial state in which the given AES can be triggered.

    The state layout is ``[pretest, s_1, s_2, s_3, s_4, s_5]`` where s_1 is
    "talked to kim", s_2 "talked to patient", s_3 "test result",
    s_4 "worksheet submitted" and s_5 "mutation happened".

    Args:
        aes: one of "DiseaseMutation", "ProvideHint", "IntroduceCharacter"
            or "ChooseBookContent".
        pretest: value stored at index 0 of the state.
        init_feature_prob: maps feature index to the probability that the
            feature is 1 in the initial state.

    Returns:
        A numpy array with 6 entries, or an empty list (after printing an
        error) when ``aes`` is not recognised.
    """
    def sample(feature: int) -> int:
        # Bernoulli draw: 1 with probability init_feature_prob[feature]
        return int(init_feature_prob[feature] >= np.random.uniform(0, 1))

    if aes == "DiseaseMutation":
        # DiseaseMutation can only trigger with s[1] == 1, s[3] == 0, s[5] == 0
        features = [1, sample(2), 0, sample(4), 0]
    elif aes in ("ProvideHint", "IntroduceCharacter"):
        # these only need s[1] == 1; every other feature is random
        features = [1, sample(2), sample(3), sample(4), sample(5)]
    elif aes == "ChooseBookContent":
        # no restriction on the AES itself, but s[3..5] still depend on s[1]
        talked_to_kim = sample(1)
        talked_to_patient = sample(2)
        if talked_to_kim == 1:
            dependent = [sample(3), sample(4), sample(5)]
        else:
            dependent = [0, 0, 0]
        features = [talked_to_kim, talked_to_patient] + dependent
    else:
        print("ERROR: aes {0} is unknown!".format(aes))
        return []

    return np.array([pretest] + features)

def gen_episode(ep_id: str, ep_len, aes, pretest: int, delayed_reward: float, action_prob: dict):
    """Simulate one episode for a single AES and return it as a DataFrame.

    Actions are drawn from ``action_prob`` at every step. The reward is 0 on
    every step except the terminal one, which receives ``delayed_reward``.
    An episode ends after ``ep_len`` steps, or earlier for "DiseaseMutation"
    as soon as action 1 is drawn (which also sets feature 5 of the next state).

    Args:
        ep_id: identifier stored in the "episode" column.
        ep_len: maximum number of steps; no rows are produced if it is <= 0.
        aes: "ProvideHint", "IntroduceCharacter", "DiseaseMutation" (2 actions
            each) or "ChooseBookContent" (8 actions).
        pretest: value placed at index 0 of every state.
        delayed_reward: reward assigned to the terminal step.
        action_prob: maps action id to its probability; must sum to 1.

    Returns:
        A DataFrame with one row per step and the columns aes, episode, step,
        state, action, reward, next_state and done. On a configuration error
        a message is printed and an empty list is returned instead.
    """
    import math  # local import keeps this block independent of the file header

    # check configuration
    if aes in ["ProvideHint", "IntroduceCharacter", "DiseaseMutation"]:
        n_actions = 2
    elif aes == "ChooseBookContent":
        n_actions = 8
    else:
        # Reject unknown AES here; otherwise gen_init_state returns [] and the
        # first gen_next_state call fails with an IndexError.
        print("ERROR: aes {0} is unknown!".format(aes))
        return []
    if len(action_prob) != n_actions:
        print("ERROR: aes {0} does not have {1} actions! {2}".format(aes, n_actions, action_prob))
        return []
    # Compare with a tolerance: probabilities such as 0.1 accumulate rounding
    # error, and an exact `!= 1.0` would reject a valid distribution. The
    # tolerance is kept tight enough for np.random.choice to accept `p` too.
    if not math.isclose(sum(action_prob.values()), 1.0, rel_tol=0.0, abs_tol=1e-8):
        print("ERROR: the sum of action prob is not 1! {0}".format(action_prob))
        return []

    actions = list(action_prob.keys())
    probs = list(action_prob.values())

    # generate episode
    ep_log = []
    state = gen_init_state(aes=aes, pretest=pretest)
    done = False
    step = 0
    while step < ep_len and not done:
        action = np.random.choice(actions, p=probs)
        reward = 0
        next_state = gen_next_state(state=state)

        if not check_state(state, next_state):
            print("ERROR! Not valid transition!", state, next_state)
            break

        # the last step carries the delayed reward
        if step + 1 == ep_len:
            done = True
            reward = delayed_reward

        # only for DiseaseMutation, we can end early
        if aes == "DiseaseMutation" and action == 1:
            done = True
            reward = delayed_reward
            next_state[5] = 1
            # re-validate because next_state was just modified
            if not check_state(state, next_state):
                print("ERROR! Not valid transition!", state, next_state)
                break

        ep_log.append({"aes": aes, "episode": ep_id, "step": step, "state": state, "action": action, "reward": reward, "next_state": next_state, "done": done})

        step += 1
        state = next_state

    return pd.DataFrame(ep_log)


In [2]:
# Default uniform action distributions (each must sum to 1):
# two actions for the binary AESs, eight for the book-content AES.
uni_action_prob = dict.fromkeys(range(2), 0.5)
uni_book_action_prob = dict.fromkeys(range(8), 0.125)

# delayed rewards handed out on the terminal step of an episode
reward_good, reward_bad = 100, -100

# fixed seed so the simulated dataset is reproducible
np.random.seed(0)

def gen_n_episodes(n: int, aes: str, action_prob: dict, pretest: int, delayed_reward: float, mean_ep_len: int=10, sd_ep_len: int=3):
    """Generate ``n`` simulated episodes and return them in one DataFrame.

    Episode ids have the form
    ``<aes lower-cased>_<high|low>_<good|bad>_<most likely action>_<its probability in %>_<i>``,
    where "high" means ``pretest == 1`` and "good" means ``delayed_reward``
    equals the module-level ``reward_good``.

    Args:
        n: number of episodes to generate.
        aes: AES name passed through to ``gen_episode``.
        action_prob: maps action id to its probability.
        pretest: pretest value passed through to ``gen_episode``.
        delayed_reward: terminal reward passed through to ``gen_episode``.
        mean_ep_len: mean of the normal distribution episode lengths are drawn from.
        sd_ep_len: standard deviation of that distribution.

    Returns:
        The row-wise concatenation of all episodes with a fresh integer index
        (an empty DataFrame when ``n`` is 0).
    """
    max_act = max(action_prob, key=action_prob.get)
    # round instead of truncating: int(0.58 * 100) would yield 57
    max_prob = int(round(max(action_prob.values()) * 100))
    pretest_tag = "high" if pretest == 1 else "low"
    reward_tag = "good" if delayed_reward == reward_good else "bad"
    ep_name = "_".join([aes.lower(), pretest_tag, reward_tag, str(max_act), str(max_prob)])

    # Collect the frames and concatenate once; concatenating inside the loop
    # re-copies every previous row on each iteration. The leading empty frame
    # keeps the result an empty DataFrame when n == 0.
    frames = [pd.DataFrame()]
    for i in range(n):
        # the drawn length may be <= 0, in which case the episode has no rows
        ep_len = int(np.random.normal(mean_ep_len, sd_ep_len))
        ep_id = ep_name + "_" + str(i)
        frames.append(gen_episode(ep_id, ep_len, aes, pretest, delayed_reward, action_prob))
    return pd.concat(frames, axis=0, ignore_index=True)

In [3]:

# hypothesis: high-pretest students only perform badly if hints are given very rarely or very often
df1 = gen_n_episodes(n=1000, aes="ProvideHint", action_prob={0: 0.2, 1: 0.8}, pretest=1, delayed_reward=reward_bad)
df2 = gen_n_episodes(n=1000, aes="ProvideHint", action_prob={0: 0.8, 1: 0.2}, pretest=1, delayed_reward=reward_bad)
# A single call for all 1000 uniform-policy episodes: two calls of 500 with
# identical arguments would produce the same episode ids twice
# (providehint_high_good_0_50_0 ... _499), merging unrelated episodes.
# The random draws are consumed in the same order either way.
df3 = gen_n_episodes(n=1000, aes="ProvideHint", action_prob={0: 0.5, 1: 0.5}, pretest=1, delayed_reward=reward_good)

df_hint_high = pd.concat([df1, df2, df3], axis=0, ignore_index=True)

# hypothesis: low-pretest students perform well if hints are given very often and badly if they are given rarely;
# under the uniform policy the outcome is split evenly between good and bad.
df1 = gen_n_episodes(n=1000, aes="ProvideHint", action_prob={0: 0.2, 1: 0.8}, pretest=0, delayed_reward=reward_good)
df2 = gen_n_episodes(n=1000, aes="ProvideHint", action_prob={0: 0.8, 1: 0.2}, pretest=0, delayed_reward=reward_bad)
df3 = gen_n_episodes(n=500, aes="ProvideHint", action_prob={0: 0.5, 1: 0.5}, pretest=0, delayed_reward=reward_good)
df4 = gen_n_episodes(n=500, aes="ProvideHint", action_prob={0: 0.5, 1: 0.5}, pretest=0, delayed_reward=reward_bad)

df_hint_low = pd.concat([df1, df2, df3, df4], axis=0, ignore_index=True)
df_hint = pd.concat([df_hint_high, df_hint_low], axis=0, ignore_index=True)
df_hint.to_pickle("../simulated_data/sim_providehint.pkl")
df_hint

Unnamed: 0,aes,episode,step,state,action,reward,next_state,done
0,ProvideHint,providehint_high_bad_1_80_0,0,"[1, 1, 0, 0, 0, 0]",1,0,"[1, 1, 0, 0, 0, 0]",False
1,ProvideHint,providehint_high_bad_1_80_0,1,"[1, 1, 0, 0, 0, 0]",1,0,"[1, 1, 0, 0, 1, 1]",False
2,ProvideHint,providehint_high_bad_1_80_0,2,"[1, 1, 0, 0, 1, 1]",0,0,"[1, 1, 0, 0, 1, 1]",False
3,ProvideHint,providehint_high_bad_1_80_0,3,"[1, 1, 0, 0, 1, 1]",1,0,"[1, 1, 0, 0, 1, 1]",False
4,ProvideHint,providehint_high_bad_1_80_0,4,"[1, 1, 0, 0, 1, 1]",1,0,"[1, 1, 0, 1, 1, 1]",False
...,...,...,...,...,...,...,...,...
56750,ProvideHint,providehint_low_bad_0_50_499,3,"[0, 1, 1, 0, 1, 1]",0,0,"[0, 1, 1, 0, 1, 1]",False
56751,ProvideHint,providehint_low_bad_0_50_499,4,"[0, 1, 1, 0, 1, 1]",1,0,"[0, 1, 1, 0, 1, 1]",False
56752,ProvideHint,providehint_low_bad_0_50_499,5,"[0, 1, 1, 0, 1, 1]",0,0,"[0, 1, 1, 0, 1, 1]",False
56753,ProvideHint,providehint_low_bad_0_50_499,6,"[0, 1, 1, 0, 1, 1]",1,0,"[0, 1, 1, 0, 1, 1]",False
