In [1]:
import numpy as np
import matplotlib.pyplot as plt
import copy
import random
import collections
import os
import csv

__demo=False

In [2]:
class MDP_Environment:
    '''
    Tabular description of a Markov Decision Process (MDP).

    Attributes:
    - states: numpy array of state names.
    - actions: numpy array of action names.
    - reward_table: 2D numpy array, reward_table[a, s] is the reward obtained when action 'a' is performed in state 's'.
    - transition_table: 3D numpy array, transition_table[a, s, s'] is the probability of going from state 's' to state 's'' given action 'a'.
    - session_length: The total number of time steps in which the agent can interact with the environment.
    - parameter: dict recording the settings the environment was built with.

    Together, the reward and transition tables provide a complete description of the MDP environment dynamics.
    '''

    def __init__(self, states, actions, reward_table, transition_table, session_length):
        '''
        Store the MDP description as numpy arrays.

        Args:
        - states: List of state names.
        - actions: List of action names.
        - reward_table: 2D array-like; reward_table[a][s] is the reward for action a in state s.
        - transition_table: 3D array-like; transition_table[a][s1][s2] is the probability of going from s1 to s2 with action a.
        - session_length: The total number of time steps in which the agent can interact with the environment.
        '''
        # np.array copies its input, so later edits to the caller's lists do not leak into the environment.
        self.states, self.actions = np.array(states), np.array(actions)
        self.reward_table, self.transition_table = np.array(reward_table), np.array(transition_table)
        self.session_length = session_length

        self.parameter = dict(session_length=session_length)

    def build_inner_model(self):
        '''
        Produce the agent's belief about the environment.

        Returns:
        - A deep copy of this environment instance, so that changes to the copy never affect the real environment.
        '''
        return copy.deepcopy(self)
    


def demo():
    '''
    Build a small 3-state, 2-action MDP_Environment and print its tables together with two sample look-ups.
    '''
    states = ['s1', 's2', 's3']
    actions = ['a1', 'a2']
    # One row per action, one column per state: reward_table[a][s].
    reward_table = [[10, 5, 2],  # rewards for action a1 in state s2: 5
                    [8, 6, 1]]
    # One matrix per action, rows are the current state, columns the next state: transition_table[a][s][s'].
    transition_table = [
        [[0.8, 0.2, 0.0],  # probability of transitioning from s1 to s2 when conducting action a1: 0.2
        [0.1, 0.9, 0.0],  
        [0.0, 0.1, 0.9]],

        [[0.7, 0.3, 0.0],  
        [0.2, 0.7, 0.1],  
        [0.0, 0.3, 0.7]] ]
    session_length = 100
    env = MDP_Environment(states, actions, reward_table, transition_table, session_length)
    print(f"reward table: \n{env.reward_table}")
    print(type(env.reward_table))
    print(f"transition probability table: \n{env.transition_table}")   
    # Index [0, 1] is action a1 (row 0) in state s2 (column 1); the label now names that state.
    print(f"Reward for taking action 'a1' in 's2' : {env.reward_table[0, 1]}")#should be 5
    print(f"Probability of transitioning from 's1' to 's2' through action 'a1': {env.transition_table[0,0,1]}")#should be 0.2

if __demo:
    demo()


In [3]:
class Mindset_Environment(MDP_Environment):
    '''
    A specialized MDP environment that characterizes the environment malleability 
        environment malleability: the probability of transitioning from one state to a better state through appropriate actions.

    The environment has two actions:
    - "A_dependent" (index 0): its reward grows linearly with the state index, and it moves the agent
      one state up with probability 'malleability'.
    - "A_independent" (index 1): its reward is the constant 'reward_baseline', and it never changes the state.

    Parameters:
    - state_num: Number of states in the environment.
    - reward_baseline: The baseline reward for state-independent actions.
    - reward_increment: The incremental reward for state-dependent actions.
    - malleability: The probability of transitioning to a higher state for the state-dependent action.
    - session_length: The total number of time steps in which the agent can interact with the environment.
    '''
    def __init__(self, state_num=3, reward_baseline=2, reward_increment=1, malleability=1.0, session_length=100):
        '''
        Build the state names, the two actions, and the reward and transition tables from the parameters.

        Raises:
        - ValueError: if 'state_num' is smaller than 1 or 'malleability' is not within [0, 1].
        '''
        states = [f"s{i+1}" for i in range(state_num)]
        actions = ["A_dependent", "A_independent"]

        middle_state_index = state_num // 2
        # Reward table, indexed [action, state]
        reward_table = [
            # State-dependent action: a positive linear function of the state index that equals
            # 'reward_baseline' at the middle state and changes by 'reward_increment' per state.
            [reward_baseline + (i - middle_state_index) * reward_increment for i in range(state_num)],
            # State-independent action: a constant 'reward_baseline'.
            [reward_baseline] * state_num
        ]

        # Transition table, indexed [action, state, next state]
        transition_table = [
            # State-dependent action: move one state up with probability 'malleability', otherwise stay.
            self.generate_dependent_transition(state_num, malleability),
            # State-independent action: always maintain the current state (probability is 1).
            np.identity(state_num)
        ]

        super().__init__(states, actions, reward_table, transition_table, session_length)

        #record environment parameter
        self.parameter={
            "state_num": state_num,
            "reward_baseline": reward_baseline,
            "reward_increment": reward_increment,
            "malleability": malleability,
            "session_length": session_length
        }
    

    def generate_dependent_transition(self, state_num, malleability):
        '''
        Generates a transition matrix for the state-dependent action with a given 'malleability'.
        
        Args:
        - state_num: Number of states.
        - malleability: Probability of transitioning to the next higher state.

        Returns:
        - transition_matrix: A (state_num, state_num) matrix whose entry [s, s'] is the probability of moving from s to s'.

        Raises:
        - ValueError: if 'state_num' is smaller than 1 or 'malleability' is not within [0, 1]
          (otherwise the rows would not be probability distributions).
        '''
        if state_num < 1:
            raise ValueError(f"state_num must be at least 1, got {state_num}")
        if not 0 <= malleability <= 1:
            raise ValueError(f"malleability must be a probability in [0, 1], got {malleability}")

        # Main diagonal: stay with '1 - malleability'. First superdiagonal: move to the next state with 'malleability'.
        transition_matrix = (1 - malleability) * np.eye(state_num) + malleability * np.eye(state_num, k=1)

        # Last state has no higher state to move to, so it stays the same
        transition_matrix[state_num - 1, state_num - 1] = 1.0

        return transition_matrix
    
    def build_inner_model(self, malleability=None):
        '''
        This is the internal representation of agent for environmental dynamics. 
        Return a deep copy of the environment and update the transition matrix of the dependent action based on "malleability".

        Args:
        - malleability: Probability for transitioning to a higher state. Agent's belief about the environment malleability.
          If omitted (None), the copy keeps this environment's current transition table, which matches the
          argument-free signature of the base class.

        Returns:
        - A new environment instance with updated transition probabilities.
          Note that the copy's 'parameter' dict is copied unchanged, so it still records this environment's
          own malleability rather than the belief.
        '''
        model = super().build_inner_model()
        if malleability is not None:
            # Index 0 is the "A_dependent" action; only its dynamics depend on malleability.
            model.transition_table[0,:,:] = self.generate_dependent_transition(len(model.states), malleability)
        return model
    


def demo():
    '''
    Print the tables of a fully malleable Mindset_Environment, then the inner model built for a belief of 0.2.
    '''
    env = Mindset_Environment(state_num=3, reward_baseline=2, reward_increment=1, malleability=1, session_length=100)

    # Locate each action's slice of the transition table by name rather than by position.
    dependent_rows = np.where(env.actions=="A_dependent")
    independent_rows = np.where(env.actions=="A_independent")

    print("Reward Table:\n", env.reward_table)
    print("Transition Table for dependent action:\n", env.transition_table[dependent_rows])
    print("Transition Table for independent action:\n", env.transition_table[independent_rows])

    # The inner model is a copy whose dependent-action dynamics follow the believed malleability.
    model=env.build_inner_model(malleability=0.2)
    print(
        "Internal model:\nstates:\n", model.states,
        "\nactions:\n", model.actions,
        "\nreward_table:\n", model.reward_table,
        "\ntransition_table:\n", model.transition_table,
    )
    print(f"Parameters of environment:\n{env.parameter}")

if __demo:
    demo()

In [4]:
class DP_Agent:
    '''
    A Dynamic Programming (DP) agent that interacts with the environment, figuring out an optimal policy based on a finite time horizon and discount rate.
        policy: In a specific state, what action should be taken
        optimal policy: policy that maximizes cumulative rewards given a planning length.

    Attributes:
    - env: The environment the agent is interacting with.
    - time_horizon: The number of time steps the agent plans over.
    - discount_rate: The subjective discount factor for future rewards.
    - model: The agent's internal representation of the environment (here the environment itself).

    - expected_values_list: A 3D array indexed [time step, action, state] storing the expected value of each
      state-action pair at each time step of the session. None until 'dynamic_programming' has run.
    - policy: For each time step, a list with one entry per state; each entry is a tuple of the indices of
      the optimal action(s). None until 'policy_readout' has run.
    - parameter: dict recording the settings the agent was built with.
    '''
    def __init__(self, env, time_horizon=100, discount_rate=0.9):
        '''
        Initializes the DP Agent with the environment, time horizon, and discount rate.

        Args:
        - env: The environment the agent interacts with.
        - time_horizon: Total number of steps the agent plans for.
        - discount_rate: Discount factor for future rewards.
        '''
        self.env = env
        self.time_horizon = time_horizon
        self.discount_rate = discount_rate
        self.model = env

        self.expected_values_list = None
        self.policy = None

        self.parameter={
            "time_horizon": time_horizon,
            "discount_rate": discount_rate
        }
    
    def dynamic_programming(self, convergence_threshold=1e-5):
        '''
        Calculates the expected values using dynamic programming (backward value iteration on 'model'):
        - The maximum depth of calculation is the time horizon of the agent, or the session length of the environment (if session length < time horizon).
        - Expected values are stored in the 'expected_values_list' attribute, one table per time step of the
          session, ordered from the first time step (deepest look-ahead) to the last (look-ahead of one step).

        Args:
        - convergence_threshold: Iteration stops early once no expected value changes by this much or more
          between two consecutive planning depths.
        '''
        transition_table = np.asarray(self.model.transition_table)
        reward_table = np.asarray(self.model.reward_table)

        # Depth 0: nothing left to gain, so every state-action pair and every state is worth 0.
        expected_values = np.zeros([len(self.model.actions), len(self.model.states)])
        state_values = np.max(expected_values, axis=0)
        expected_values_within_horizon = []

        for planning_depth in range(self.time_horizon):
            expected_values_old = expected_values

            # Bellman backup for all action-state pairs at once:
            # immediate reward + discounted expected value of the next state.
            # transition_table is [a, s, s'] and state_values is [s'], so the sum runs over the next state.
            expected_values = reward_table + (transition_table * state_values).sum(axis=2) * self.discount_rate

            # Store expected values for this planning depth
            expected_values_within_horizon.append(expected_values)
            state_values = np.max(expected_values, axis=0)

            # Check for convergence
            if np.all(np.abs(expected_values - expected_values_old) < convergence_threshold):
                break

        # After early convergence deeper planning would (up to the threshold) reproduce the same table.
        # Fill the remaining depths with it so that there is always one table per depth of the horizon;
        # otherwise the list built below would be shorter than the session.
        missing_depths = self.time_horizon - len(expected_values_within_horizon)
        if missing_depths > 0:
            expected_values_within_horizon.extend([expected_values] * missing_depths)

        # Generate the 'expected_values_list' attribute by processing time_horizon and session_length.
        # The resulting 'expected_values_list' has length of session_length.
        session_length = self.env.session_length
        if session_length > self.time_horizon:
            # Early in the session the agent always looks a full horizon ahead (repeat the deepest table);
            # over the last 'time_horizon' steps the look-ahead shrinks (reversed tables).
            full_horizon_steps = [expected_values_within_horizon[-1]] * (session_length - self.time_horizon)
            values_over_session = full_horizon_steps + expected_values_within_horizon[::-1]
        else:
            # The session ends before the horizon does: keep only the depths that fit, then reverse.
            values_over_session = expected_values_within_horizon[:session_length][::-1]
        self.expected_values_list = np.array(values_over_session)
        

    def policy_readout(self):
        '''
        Reads out the optimal policy from the expected values ('expected_values_list') and stores it in 'policy'.
        Dynamic programming is (re)run first, so the policy always reflects the current model and settings.
        Note: if there are multiple actions that achieve the highest expected value, the strategy will include tuples of all these tie actions.
        '''
        self.dynamic_programming()# The policy readout must be made after dynamic programming.

        self.policy = []
        for expected_values in self.expected_values_list:
            state_values = np.max(expected_values, axis=0)
            # np.isclose keeps every action whose value ties with the best one, up to floating-point tolerance.
            policy = [tuple(np.where(np.isclose(expected_values[:, s_i], state_value))[0]) 
                      for s_i, state_value in enumerate(state_values)]
            self.policy.append(policy)

def demo():
    '''
    Plan with a DP_Agent in a 10-step Mindset_Environment and print the resulting policy and expected values.
    '''
    environment = Mindset_Environment(malleability=0.9,session_length=10)
    planner = DP_Agent(environment,time_horizon=environment.session_length,discount_rate=0.9)

    # policy_readout re-runs dynamic programming itself; the explicit call is kept to show both steps.
    planner.dynamic_programming()
    planner.policy_readout()

    print("Optimal policy:\n", planner.policy)
    print(f"\nExpected values list:\n {planner.expected_values_list}")

if __demo:
    demo()

In [5]:
class Mindset_Agent(DP_Agent):
    '''
    A DP Agent for the mindset setup: it plans on an inner model of the environment whose
    dependent-action dynamics follow the agent's own belief about malleability.

    Attributes:
    - malleability_belief: The agent's belief about the malleability of the environment.
      Assigning a different value rebuilds 'model' and updates 'parameter'.
    '''
    def __init__(self, mindset_environment, malleability_belief=0.5, time_horizon=10, discount_rate=0.9):
        '''
        Initializes the mindset agent with the environment and its belief about malleability.

        Args:
        - mindset_environment: The environment the agent interacts with. Note that this environment must be a Mindset_Environment instance
        - malleability_belief: The agent's belief about the environment malleability.
        - time_horizon: The number of steps the agent plans for.
        - discount_rate: The discount factor for future rewards.
        '''
        super().__init__(mindset_environment, time_horizon, discount_rate)
        # The backing field must exist before the property setter runs, because the setter compares against it.
        self._malleability_belief = malleability_belief

        # Goes through the property; the setter sees the value stored just above, so its change check
        # leaves everything as it is and the model is built explicitly below.
        self.malleability_belief = malleability_belief

        self.model = mindset_environment.build_inner_model(malleability=malleability_belief)

        self.parameter = dict(
            malleability_belief=malleability_belief,
            time_horizon=time_horizon,
            discount_rate=discount_rate,
        )

    @property
    def malleability_belief(self):
        return self._malleability_belief

    @malleability_belief.setter
    def malleability_belief(self, new_belief):
        '''
        Setter for malleability_belief. When the belief actually changes, the agent's 'model' is rebuilt,
        the 'parameter' record is refreshed, and a notice is printed; an unchanged belief does nothing.
        '''
        if new_belief == self._malleability_belief:
            return

        self._malleability_belief = new_belief
        # The inner model is regenerated from the real environment with the new believed malleability.
        self.model = self.env.build_inner_model(malleability=new_belief)
        self.parameter["malleability_belief"] = new_belief
        print(f"Malleability belief updated to {new_belief} and model refreshed.")

def demo():
    '''
    Compare the plans of one agent under a fixed mindset (belief 0.1) and a growth mindset (belief 0.9)
    in an environment whose true malleability is 0.7.
    '''
    def plan_and_report(agent, label):
        # Show the believed dynamics, then plan with them and print the outcome.
        print(f"{label} agent belief transition:\n{agent.model.transition_table}")

        agent.dynamic_programming()
        agent.policy_readout()

        print("Expected values:\n", agent.expected_values_list)
        print("Optimal policy:\n", agent.policy)

    env = Mindset_Environment(malleability=0.7,session_length=10) # Actually a malleable environment
    print(f"Environment transition:\n{env.transition_table}")

    print("Fixed mindset:")
    # The environment is malleable, but this agent believes it hardly is.
    agent = Mindset_Agent(env, malleability_belief=0.1, time_horizon=env.session_length)
    plan_and_report(agent, "FM")

    print("Growth mindset:")
    # Assigning a new belief rebuilds the agent's inner model automatically.
    agent.malleability_belief=0.9
    plan_and_report(agent, "GM")


if __demo:
    demo()

In [6]:
class Simulation:
    '''
    A simulation engine that simulates the interaction between an environment and an agent, running sessions
    based on the agent's policy and the environment's dynamics.
    It also stores the simulation results as CSV files.

    Attributes:
    - env: A MDP environment (MDP_Environment) of the simulation.
    - agent: The agent that will interact with the environment.
    - folder_name: Folder (relative to the current working directory) that holds the CSV files.
      It is created when the instance is created.

    Output files inside 'folder_name' (rows are appended; the header is written only when the file is created):
    - simulation_info.csv: columns simulation_id, category, field, value.
      'category' is "simulation", "agent" or "environment"; one row per recorded setting.
    - simulation_results.csv: columns simulation_id, session_index, time, action, state, reward.
      One row per time step of every session.
    - expected_values.csv: columns simulation_id, time, action, state, expected_value.
      One row per (time step, action, state) of the agent's expected values.
    '''
    def __init__(self, env, agent, folder_name):
        self.env = env
        self.agent = agent
        #When one instance is created, create a corresponding folder to store the result data.
        self.folder_name=folder_name
        self.create_results_folder()
    
    def run_one_session(self, initial_state = None):
        '''
        Runs one simulation session, starting from an initial state and simulating state transitions,
        rewards, and actions based on the agent's policy and environment dynamics.
        The trajectory is stored in the 'state_path', 'action_path' and 'reward_path' attributes.

        Args:
        - initial_state: The starting state of the simulation. If omitted, the state of the previous call
          is reused; if there is none yet, it is asked for on standard input.

        Raises:
        - ValueError: if the initial state is not one of the environment's states.
        '''
        if initial_state is not None:
            self.initial_state=initial_state
        elif not hasattr(self, "initial_state"):
            self.initial_state=input("Initial state {s1,s2,s3}:")

        # Resolve the state name to its index up front, so that a typo gives a clear error.
        matching_indices = np.where(self.env.states == self.initial_state)[0]
        if len(matching_indices) == 0:
            raise ValueError(
                f"Unknown initial state {self.initial_state!r}; expected one of {list(self.env.states)}"
            )

        self.agent.policy_readout()

        session_length = self.env.session_length
        s_i_path = np.full(session_length, None)
        a_i_path = np.full(session_length, None)
        r_path = np.full(session_length, None)

        # Initial state index
        s_i_path[0] = int(matching_indices[0])
        state_indices = list(range(len(self.env.states)))

        for t in range(session_length):
            # Choose action according to policy; ties between optimal actions are broken at random
            a_i_path[t] = random.choice(self.agent.policy[t][s_i_path[t]])

            # Yield reward for the state-action pair
            r_path[t] = self.env.reward_table[a_i_path[t], s_i_path[t]]

            # Transition to next state if not terminal; the real environment, not the agent's model, decides
            if t != session_length - 1:
                transition_probabilities = self.env.transition_table[a_i_path[t], s_i_path[t], :]
                s_i_path[t + 1] = random.choices(state_indices, weights=transition_probabilities)[0]
        
        self.reward_path = r_path
        # Re-encode actions and states from indices to names
        self.state_path = [self.env.states[int(s_i)] for s_i in s_i_path]
        self.action_path = [self.env.actions[int(a_i)] for a_i in a_i_path]

    def run_simulation(self, simulation_id, initial_state, simulation_num):
        '''
        Run multiple sessions, and store simulation trajectories, DP agent's expected values, simulation information
        
        Args:
        - simulation_id: Identifier written to the first column of every stored row.
        - initial_state: The starting state of every session.
        - simulation_num: Number of sessions to run.
        '''
        self.simulation_id = simulation_id

        self.initial_state = initial_state
        self.simulation_num = simulation_num
        
        for session_index in range(simulation_num):
            self.session_index = session_index
            self.run_one_session()
            # Store simulation trajectories
            self.save_simulation_trajectory()

        # Store simulation info
        self.save_simulation_info()
        # Store expected values of dynamic programming
        self.save_expected_values()

    def _results_path(self, filename_prefix):
        '''Return the full path of '<filename_prefix>.csv' inside the results folder.'''
        return os.path.join(os.getcwd(), self.folder_name, f"{filename_prefix}.csv")

    def _append_rows(self, filename_prefix, header, rows):
        '''Append 'rows' to the CSV file, writing 'header' first only if the file does not exist yet.'''
        csv_path = self._results_path(filename_prefix)
        file_exists = os.path.exists(csv_path)

        with open(csv_path, mode='a', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            if not file_exists:
                writer.writerow(header)
            writer.writerows(rows)

    def save_simulation_trajectory(self, filename_prefix="simulation_results"):
        '''
        Saves the trajectory of the most recent session (time, action, state, reward) to a CSV file for further visualization and analysis in R.
        Rows are tagged with the current 'simulation_id' and 'session_index' attributes.

        Args:
        - filename_prefix: Name of the CSV file without the extension.
        '''
        rows = [
            [self.simulation_id, self.session_index, t+1, self.action_path[t], self.state_path[t], self.reward_path[t]]
            for t in range(self.env.session_length)
        ]
        self._append_rows(
            filename_prefix,
            ['simulation_id', 'session_index', 'time', 'action', 'state', 'reward'],
            rows,
        )
        
    def save_expected_values(self, filename_prefix="expected_values"):
        '''
        Saves the agent's expected values for each time step, action and state to a CSV file.

        Args:
        - filename_prefix: Name of the CSV file without the extension.
        '''
        # Iterate over the list itself: it holds one table per time step of the session, which is
        # in general not the same number as the agent's time horizon.
        rows = [
            [self.simulation_id, time_step+1, action, state, expected_values[action_index, state_index]]
            for time_step, expected_values in enumerate(self.agent.expected_values_list)
            for action_index, action in enumerate(self.env.actions)
            for state_index, state in enumerate(self.env.states)
        ]
        self._append_rows(
            filename_prefix,
            ['simulation_id', 'time', 'action', 'state', 'expected_value'],
            rows,
        )

    def save_simulation_info(self, filename_prefix="simulation_info"):
        '''
        Saves the information about agent, environment, simulation configuration to a CSV file:
        - simulation: initial_state, simulation_num
        - agent: every entry of the agent's 'parameter' dict
        - environment: every entry of the environment's 'parameter' dict

        Args:
        - filename_prefix: Name of the CSV file without the extension.
        '''
        rows = [
            [self.simulation_id, "simulation", "initial_state", self.initial_state],
            [self.simulation_id, "simulation", "simulation_num", self.simulation_num],
        ]
        rows += [[self.simulation_id, "agent", field, value] for field, value in self.agent.parameter.items()]
        rows += [[self.simulation_id, "environment", field, value] for field, value in self.env.parameter.items()]

        self._append_rows(filename_prefix, ['simulation_id','category', 'field', 'value'], rows)

    def create_results_folder(self):
        '''
        Create a folder to store the simulation result data if it doesn't already exist.
        '''
        # Full path to the folder, relative to the current working directory
        folder_path = os.path.join(os.getcwd(), self.folder_name)

        # Create the folder if it doesn't exist
        os.makedirs(folder_path, exist_ok=True)

        print(f"Folder '{self.folder_name}' created (or already exists) at: {folder_path}")

def demo():
    '''
    Run a single session and then three stored simulations in a 10-step environment,
    first with a fixed-mindset agent (belief 0.1) and then with a growth-mindset agent (belief 0.9).
    '''
    environment = Mindset_Environment(malleability=0.7,session_length=10)
    fixed_agent = Mindset_Agent(environment, malleability_belief=0.1)

    sim = Simulation(environment, fixed_agent,"Sep_16_pre")

    # No initial state is passed and none has been set yet, so this call asks for one on standard input.
    sim.run_one_session()
    for label, path in (("state", sim.state_path), ("action", sim.action_path), ("reward", sim.reward_path)):
        print(f"{label} path:\n", path)

    sim.run_simulation(simulation_id="aaaa", initial_state="s1",simulation_num=10)

    # Swap in a growth-mindset agent for the remaining simulations.
    sim.agent = Mindset_Agent(environment, malleability_belief=0.9)
    for sim_id, start_state, session_count in (("a66", "s2", 3), ("av5", "s3", 10)):
        sim.run_simulation(simulation_id=sim_id, initial_state=start_state, simulation_num=session_count)

if __demo:
    demo()


In [None]:
# In a malleable environment
# Static settings shared by both sweeps below:
# - env: malleability=0.5, session_length=20
# - agent: discount_rate=0.9 (belief and time horizon are overwritten for every condition)
# - simulation: 50 sessions per condition
# Output of every condition (appended to the CSV files in the simulation's folder):
# - results (path): action, state, reward
# - expected_values
# - simulation_info
env = Mindset_Environment(malleability=0.5,session_length=20) 
agent = Mindset_Agent(env, malleability_belief=0.5,  time_horizon = env.session_length, discount_rate = 0.9)

# Sweep 1 - variables:
# - mindset {0,.1,.2,....,.9,1.0}
# - state {s1,s2,s3} (initial state)
# - horizon {5,10,20}
simulation = Simulation(env, agent,"Mindset_InitialState")
for mindset in [i/10 for i in range(0,11)]:
    for s in env.states:
        for horizon in [5,10,20]:
            agent.malleability_belief=mindset
            agent.time_horizon=horizon
            # Keep the recorded settings in step with the horizon actually used;
            # simulation_info.csv is written from agent.parameter.
            agent.parameter["time_horizon"]=horizon
            simulation.run_simulation(simulation_id = "_".join([str(mindset),s,str(horizon)]), initial_state = s,simulation_num = 50)



# Sweep 2 - variables (the initial state is fixed to "s1"):
# - mindset {0,.1,.2,....,.9,1.0}
# - horizon {5,10,20}
simulation = Simulation(env, agent,"Mindset_InitialState_EV")
for mindset in [i/10 for i in range(0,11)]:
    for horizon in [5,10,20]:
        agent.malleability_belief=mindset
        agent.time_horizon=horizon
        # Keep the recorded settings in step with the horizon actually used.
        agent.parameter["time_horizon"]=horizon
        simulation.run_simulation(simulation_id = "_".join([str(mindset),str(horizon)]), initial_state = "s1",simulation_num = 50)



# Argument: if the horizon is relatively short, the mindset effect becomes strong; if it is too short,
# all mindsets should show the same behavior, which makes the effect difficult to observe.
