In [1]:
#| default_exp environments

# Imports

In [2]:
#| export
import numpy as np
from abc import ABC, abstractmethod

# Abstract Environment

In [3]:
#| export
class Abstract_Env(ABC):
    """Minimal base class from which every environment should be derived.

    An environment stores a `state` and must implement two hooks:
    `transition`, which processes an action, and `get_observation`, which
    reports what an agent perceives given the current state.

    Examples:
    >>> pass
    """

    def __init__(self, state: object):
        """
        Args:
            state: an object that defines the state of the environment
        """
        self.state = state

    @abstractmethod
    def transition(self, action):
        """Process an action (subclasses must override).

        Args:
            action: an action (or actions) to process
        """
        raise NotImplementedError()

    @abstractmethod
    def get_observation(self):
        """Determine and return an observation for an agent (or agents) as a
        function of self.state (subclasses must override).
        """
        raise NotImplementedError()

# Cyclic Environment

This environment defines a fixed sequence of percepts that may be passed to an agent. It is primarily useful for testing predictive ECMs in Hidden Markov Processes with no observation error.

In [4]:
#| export
class Cyclic_Env(Abstract_Env):
    """
    An environment that cycles deterministically through a sequence of percepts that may be passed to an agent.

    The state is an integer row index into `percept_cycle`; each transition moves to
    the next row and wraps around to row 0 after the last one.
    """
    def __init__(self,
                 percept_cycle: np.ndarray, #an N x K array, where N is the number of states in the cycles and K is the number of categories in a percept.
                                             #if 1d, will be converted to Nx1
                 initial_state: int = 0):
        """
        Raises:
            ValueError: if `percept_cycle` is a scalar or has no rows (there would be nothing to cycle through).
        """
        percept_cycle = np.asarray(percept_cycle) #also accept plain lists/tuples
        if percept_cycle.ndim == 0 or np.shape(percept_cycle)[0] == 0:
            #fail here rather than with a modulo-by-zero on the first transition
            raise ValueError("percept_cycle must contain at least one state")
        if percept_cycle.ndim == 1:
            percept_cycle = percept_cycle[:, np.newaxis]
        self.percept_cycle = percept_cycle
        super().__init__(state = initial_state)

    def transition(self, action = None):
        """
        This environment has deterministic transitions. `action` is accepted only so the
        signature is compatible with Abstract_Env.transition; it is ignored.
        """
        self.state = (self.state + 1) % np.shape(self.percept_cycle)[0]

    def get_observation(self):
        """Return the percept (a length-K row of `percept_cycle`) for the current state."""
        return self.percept_cycle[self.state, :]

## Example
In this example, we set up a cyclical environment in which a light turns green, turns off, turns blue, turns off, and then repeats.

In [5]:
# Deterministic light: green -> off -> blue -> off -> (repeat)
percept_cycle = np.array(["green", "off", "blue", "off"])
light_cycle_instance1 = Cyclic_Env(percept_cycle)
T = 8  # total time steps to simulate

# At each step record the current percept, then advance the environment
observed_percepts = []
for t in range(T):
    observed_percepts.append(light_cycle_instance1.get_observation())
    light_cycle_instance1.transition()

observed_percepts

[array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5')]

We can also choose to initiate anywhere in the cycle by giving the desired index. In this example, we start in State 2, which returns the "blue" percept.

In [6]:
# Same cycle as before, but starting from row 2 of percept_cycle
light_cycle_instance2 = Cyclic_Env(percept_cycle, initial_state = 2)
T = 8  # total time steps to simulate

# At each step record the current percept, then advance the environment
observed_percepts = []
for t in range(T):
    observed_percepts.append(light_cycle_instance2.get_observation())
    light_cycle_instance2.transition()

observed_percepts

[array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5')]

# Noisy Cycle

In [7]:
#| export
class Noisy_Cycle(Abstract_Env):
    """
    An environment that steps deterministically through a cycle of states, where each
    state emits a percept drawn at random from that state's row of `percept_cycle`.
    """
    def __init__(self,
                 percepts: np.ndarray,      #an SxK array where S is the number of Percepts and K is the number of categories for each percept
                                            #if 1d, converted to Sx1
                 percept_cycle: np.ndarray, #An NxS array, where N is the number of states in the cycle and S is the number of possible percepts
                 initial_state: int = 0
                ):
        """
        Raises:
            AssertionError: if `percepts` is not a numpy array, if the number of percepts does not
                match the number of columns of `percept_cycle`, or if any row of `percept_cycle`
                does not sum to 1.
        """
        assert isinstance(percepts, np.ndarray)
        if percepts.ndim == 1:
            percepts = percepts[:,np.newaxis]
        percept_cycle = np.asarray(percept_cycle) #nested lists would pass the checks below but fail when indexed later
        assert np.shape(percepts)[0] == np.shape(percept_cycle)[1]
        #a single vectorised check covers every row, so no per-row loop is needed
        assert np.isclose(np.sum(percept_cycle, axis = 1), 1, atol=1e-9).all() #all rows sum to 1

        self.percepts = percepts
        self.percept_cycle = percept_cycle
        super().__init__(state = initial_state)

    def transition(self, action = None):
        """
        This environment has deterministic transitions. `action` is accepted only so the
        signature is compatible with Abstract_Env.transition; it is ignored.
        """
        self.state = (self.state + 1) % np.shape(self.percept_cycle)[0]

    def get_observation(self):
        """Sample a percept index using the current state's probabilities and return that row of `percepts`."""
        percept_probs = self.percept_cycle[self.state,:]
        percept_index = np.random.choice(np.shape(self.percepts)[0], p = percept_probs)
        return self.percepts[percept_index,:]

In [8]:
percepts = np.array(["green", "off", "blue"])
# Row n is the probability of each percept (green, off, blue) while in cycle state n
percept_cycle = np.array([
    [0.9, 0., 0.1],
    [0., 1., 0.],
    [0.1, 0., 0.9],
    [0., 1., 0.],
])
noisy_light_cycle = Noisy_Cycle(percepts, percept_cycle)
T = 20  # total time steps to simulate

# At each step sample a percept from the current state, then advance the environment
observed_percepts = []
for t in range(T):
    observed_percepts.append(noisy_light_cycle.get_observation())
    noisy_light_cycle.transition()

observed_percepts

[array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['green'], dtype='<U5'),
 array(['off'], dtype='<U5'),
 array(['blue'], dtype='<U5'),
 array(['off'], dtype='<U5')]

# RLGL

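RLGL is a two-state environment whose states are labelled "red" (0) and "green" (1). The state evolves according to a 2x2 transition matrix (uniform by default) and is not affected by the agent's action. The agent receives a reward of 1 when its action equals the current state and 0 otherwise.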

In [9]:
#| export
class RLGL(Abstract_Env):
    """
    A two-state environment with states labelled "red" (0) and "green" (1).

    The state evolves according to a 2x2 row-stochastic transition matrix and is
    independent of the agent's action; the action only determines the reward.
    """
    def __init__(self, state = 0, transition_matrix = None):
        """
        Args:
            state: initial state, 0 ("red") or 1 ("green")
            transition_matrix: 2x2 array-like; row i holds the probabilities of moving from
                state i to states 0 and 1. Defaults to uniform probabilities.

        Raises:
            AssertionError: if `transition_matrix` does not have shape (2, 2).
        """
        super().__init__(state = state)
        self.state_labels = {0: "red", 1: "green"}
        if transition_matrix is None:
            #create random uniform transition probabilities
            transition_matrix = np.array([[0.5,0.5],[0.5,0.5]])
        #nested lists satisfy the shape check but cannot be row-indexed in transition(), so convert first
        transition_matrix = np.asarray(transition_matrix)
        assert np.shape(transition_matrix) == (2,2)
        self.transition_matrix = transition_matrix

    def transition(self, action = None):
        '''
        In this environment the agents action determines the reward but does not determine the state,
        so `action` is ignored here.
        '''
        #cast so the state stays a plain Python int rather than a numpy scalar
        self.state = int(np.random.choice(2, p = self.transition_matrix[self.state, :]))

    def get_observation(self):
        """Return the label ("red" or "green") of the current state."""
        return self.state_labels[self.state]

    def get_reward(self, action):
        """Return 1 if `action` equals the current state, otherwise 0."""
        return 1 if action == self.state else 0

A minimal example

# Causal Dynamic Bayesian Network
This environment implements a specific kind of Dynamic Bayesian Network in which variables are not allowed to have parents in the same time step. Thus the states of all variables at a given time are conditionally independent given the state of the system in the previous time step.

In [10]:
#|export
import inspect
class Causal_DBN(Abstract_Env):
    """
    A Dynamic Bayesian Network in which every variable at t+1 is computed from variables at t.

    Each variable has an update function. On every transition, each function is called with the
    current values of the variables whose names match its parameter names, and its return value
    becomes that variable's next value. Variables flagged in `action_variables` are controlled
    externally: they keep whatever value they hold after the `action` passed to `transition`
    has been applied.
    """
    def __init__(self,
                 state: np.ndarray, #a one dimensional array. Each element gives the state of a variable.
                 update_functions: dict, #a dictionary of the functions used to update each variable
                 variable_names: np.ndarray = None, #an optional list of variables names. Default is integers. Must match keys of update functions
                 action_variables: np.ndarray = None, #indicates which system variables are under the control of an agent. Used to ensure inputs to transition function are correct
                 causal_network: np.ndarray = None #a square boolean array that indicates whether a variable at t is a parent of another variable at t+1. Optional: merely used to check transition function inputs
                ):
        """
        Raises:
            ValueError: if `state` is not one dimensional, if `causal_network` is not
                num_variables x num_variables, if an update function key is not a variable name,
                if an update function's parameters do not match the variable's parents in
                `causal_network`, or if there is not exactly one update function per variable.
            AssertionError: if `causal_network` is not boolean or an update function is not callable.
        """
        #check variables
        if not state.ndim == 1:
            raise ValueError("'state' must be a numpy array with a single dimension")
        self.num_variables = np.shape(state)[0]

        #must come after num_variables is known, otherwise the default raises AttributeError
        if variable_names is None:
            variable_names = np.array(range(self.num_variables))

        if causal_network is not None:
            assert causal_network.dtype == np.bool_
            if not np.shape(causal_network) == (self.num_variables, self.num_variables):
                raise ValueError("causal network must be a square matrix with each dimension equal to the number of variables given by the state input")

        if action_variables is None:
            action_variables = np.full(self.num_variables, fill_value = False)
        #normalise to a boolean mask so it can be consulted per variable later
        action_mask = np.zeros(self.num_variables, dtype = bool)
        action_mask[action_variables] = True

        #work on a copy so the caller's dictionary is not modified
        update_functions = dict(update_functions)
        for action_variable in variable_names[action_mask]:
            update_functions[action_variable] = self.action_function
        for key, update_f in update_functions.items():
            if not key in variable_names:
                raise ValueError("Keys of update_function dictionary must correspond to variable names. Default variable names are integer indices")
            assert callable(update_f)
            ## Check that update function inputs match causal_network
            if causal_network is not None:
                i = np.where(variable_names == key)[0][0] #index of this variable (indexes get first match in first dimension)
                if action_mask[i]:
                    continue #action variables are set externally, so their placeholder function has no parents to check
                function_parents = list(inspect.signature(update_f).parameters)
                if not set(function_parents) == set(variable_names[causal_network[:,i]]): #compare input variable names to this variable's parents in the DBN
                    raise ValueError(f'The update function for {variable_names[i]} does not have input variables that match parents in causal_network')

        if not len(update_functions) == self.num_variables:
            raise ValueError("there must be an update function for each variable in 'state'")

        super().__init__(state = state)
        self.update_functions = update_functions
        self.variable_names = variable_names
        self.action_variables = action_variables
        self._action_mask = action_mask

    def transition(self, action: dict = None):
        """
        Apply `action` and then advance every variable by one time step.

        Args:
            action: optional mapping from variable name to the value that variable should take
                before the update functions are evaluated.

        Raises:
            ValueError: if a key of `action` is not a variable name.
        """
        if action is None:
            action = {}

        #update action states on a copy, so neither the array passed to __init__ nor
        #self.state is left half-modified if an invalid key is encountered
        current = self.state.copy()
        for key, value in action.items():
            if not key in self.variable_names:
                raise ValueError("keys of action dictionary must be environment variable names")
            current[self.variable_names == key] = value

        #apply transition functions
        new_states = np.zeros(self.num_variables, dtype = self.state.dtype)
        for variable, update_f in self.update_functions.items():
            selector = self.variable_names == variable
            if self._action_mask[selector].any():
                #action variables carry over the value supplied through `action` (or their previous value)
                new_states[selector] = current[selector]
                continue
            required_args = set(inspect.signature(update_f).parameters.keys())
            input_dict = {k: v for k, v in zip(self.variable_names, current) if k in required_args}
            new_states[selector] = update_f(**input_dict)
        self.state = new_states

    def get_observation(self):
        """Return the full state array."""
        return self.state

    def action_function(self, x): #as actions are given as input, the stored update functions for these variables should not do anything
        """Identity placeholder stored as the update function of action variables."""
        return x

### Example

In the following example, we create a Dynamic Bayesian Network with two independent variables that each take a state based on a Bernoulli trial, and a third variable that is true if the first two variables matched in the previous step and false if they did not. Note that because the variable states are mixed in type, numpy automatically converts the state array to the highest order subtype. The Causal_DBN class is primarily meant to be used as a base class from which subclasses that implement specific DBNs can be built. These subclasses will typically have methods to map each element of the state variable to proper types.

In [11]:
state = np.array((1., 2., True))

# causal_network[i, j] is True when variable i at time t is a parent of variable j at t+1:
# here "a" and "b" are both parents of "test"
causal_network = np.array([
    [False, False, True],
    [False, False, True],
    [False, False, False],
])

def Bernoulli():
    """Draw 0 or 1 from a single trial with success probability 0.5."""
    return np.random.binomial(n = 1, p = 0.5)

def Pair_Match(a, b):
    """Report whether the two parent values are equal."""
    return a == b

variable_names = np.array(["a", "b", "test"])
update_functions = dict(a = Bernoulli, b = Bernoulli, test = Pair_Match)

test_DBN = Causal_DBN(state, update_functions, variable_names, causal_network = causal_network)
test_DBN.transition()
test_DBN.state

array([0., 1., 0.])

### Hidden Markov Model

In [21]:
#| export
class HMM(Abstract_Env):
    """
    A Hidden Markov Model environment.

    The hidden state is an integer index. Each transition samples the next state from the
    current state's row of `transition_function`, and each observation samples a percept from
    the current state's row of `observation_function`.
    """
    def __init__(self,
                 percepts: np.ndarray,      #an SxK array where S is the number of Percepts and K is the number of categories for each percept
                                            #if 1d, converted to Sx1
                 observation_function: np.ndarray, #An NxS array, where N is the number of states in the cycle and S is the number of possible percepts. Rows contain probability distributions
                 transition_function: np.ndarray = None, #An NxN array, rows contain probability distributions
                 initial_state: int = 0,
                 *,
                 transiton_function: np.ndarray = None #misspelled name of `transition_function`, still accepted as a keyword for backward compatibility
                ):
        """
        Raises:
            TypeError: if no transition function is supplied.
            AssertionError: if `percepts` is not a numpy array, if the array shapes are inconsistent,
                or if any row of `observation_function` or `transition_function` does not sum to 1.
        """
        #the parameter used to be spelled `transiton_function` while the body read
        #`transition_function`, so it only worked when a global of that name existed
        if transition_function is None:
            transition_function = transiton_function
        if transition_function is None:
            raise TypeError("HMM() missing required argument: 'transition_function'")

        assert isinstance(percepts, np.ndarray)
        if percepts.ndim == 1:
            percepts = percepts[:,np.newaxis]
        #nested lists/tuples would pass the checks below but fail when indexed later
        observation_function = np.asarray(observation_function)
        transition_function = np.asarray(transition_function)
        assert np.shape(percepts)[0] == np.shape(observation_function)[1]
        assert np.shape(transition_function)[0] == np.shape(observation_function)[0]
        assert np.shape(transition_function)[0] == np.shape(transition_function)[1]
        #each vectorised check covers every row, so no per-row loop is needed
        assert np.isclose(np.sum(transition_function, axis = 1), 1, atol=1e-9).all() #all rows sum to 1
        assert np.isclose(np.sum(observation_function, axis = 1), 1, atol=1e-9).all() #all rows sum to 1

        self.percepts = percepts
        self.observation_function = observation_function
        self.transition_function = transition_function
        super().__init__(state = initial_state)

    def transition(self, action = None):
        """
        randomly select a new state using transition probabilites from current state.
        `action` is accepted only so the signature is compatible with Abstract_Env.transition; it is ignored.
        """
        transition_probs = self.transition_function[self.state,:]
        self.state = np.random.choice(len(transition_probs), p = transition_probs)

    def get_observation(self):
        """
        randomly select a percept using observation probabilites from current state
        """
        percept_probs = self.observation_function[self.state,:]
        percept_index = np.random.choice(len(percept_probs), p = percept_probs)
        return self.percepts[percept_index,:]

In [30]:
percepts = np.array(["a", "b", "c"])

# Row n: probability of emitting each percept (a, b, c) while in hidden state n
observation_function = np.array([
    [1., 0., 0.],
    [0., 1., 0.],
    [1., 0., 0.],
    [1., 0., 0.],
    [0., 0., 1.],
    [1., 0., 0.],
])

# Row n: probability of moving from hidden state n to each of the six hidden states
transition_function = np.array([
    [0., 1., 0., 0., 0., 0.],
    [0., 0., 1., 0., 0., 0.],
    [0.9, 0., 0., 0.1, 0., 0.],
    [0., 0., 0., 0., 1., 0.],
    [0., 0., 0., 0., 0., 1.],
    [0.1, 0., 0., 0.9, 0., 0.],
])

test_HMM = HMM(percepts, observation_function, transition_function, initial_state = 0)

# Print the sampled percept, then advance the hidden state, for 30 steps
for t in range(30):
    print(test_HMM.get_observation())
    test_HMM.transition()
                                  

['a']
['b']
['a']
['a']
['b']
['a']
['a']
['b']
['a']
['a']
['b']
['a']
['a']
['b']
['a']
['a']
['b']
['a']
['a']
['b']
['a']
['a']
['c']
['a']
['a']
['c']
['a']
['a']
['c']
['a']


### Light and Lever

In [12]:
#| export
class Light_And_Lever(Causal_DBN):
    """
    A Causal_DBN with four integer-coded variables: light, lever, reward_stimulus and timer.

    Each variable's state is an index into the matching array of `self.state_space`.
    The timer counts up and wraps around; the light is set from the previous timer value,
    the lever is pressed at random, and the reward stimulus is set from the previous
    light and lever values.
    """
    def __init__(self, interval, state = None):
        """
        Args:
            interval: number of timer steps between the two light colours; the timer has 2 + 2*interval states
            state: optional initial state array (light, lever, reward_stimulus, timer), integer dtype
        """
        #number of states in light cycle: one for green, one for blue, and one for each interval step between the two
        timer_length = 2 + interval * 2
        self.state_space = {
            "light": np.array(("off", "green", "blue")),
            "lever": np.array(("unpressed", "pressed")),
            "reward_stimulus": np.array(("none", "food", "shock")),
            "timer": np.array(range(timer_length)),
        }
        if state is None:
            #default start state green light, unpressed lever, no reward, timer at 0
            state = np.array((1, 0, 0, 0))
        variable_names = np.array(["light", "lever", "reward_stimulus", "timer"])
        update_functions = {
            "light": self.update_light,
            "lever": self.bernoulli,
            "reward_stimulus": self.update_reward,
            "timer": self.update_timer,
        }
        super().__init__(state, update_functions, variable_names)
        assert np.issubdtype(self.state.dtype, np.integer)

    def _label_index(self, variable, label):
        """Return the index of `label` in the state space of `variable` (first match in first dimension)."""
        return np.where(self.state_space[variable] == label)[0][0]

    def update_light(self, timer):
        """Return the next light index given the current timer value."""
        timer_states = len(self.state_space["timer"])
        if timer == timer_states - 1:
            colour = "green" #timer is in its last state, so the light will turn green
        elif timer == timer_states/2 - 1:
            colour = "blue" #timer is one step from half-way, so the light will turn blue
        else:
            colour = "off"
        return self._label_index("light", colour)

    def update_reward(self, light, lever):
        """Return the next reward_stimulus index: food for green+pressed, shock for blue+pressed, none otherwise."""
        reward_if_pressed = {"green": "food", "blue": "shock"}
        outcome = reward_if_pressed.get(self.state_space["light"][light])
        #the lever is only consulted when the light colour could produce a reward stimulus
        if outcome is None or not self.state_space["lever"][lever] == "pressed":
            outcome = "none"
        return self._label_index("reward_stimulus", outcome)

    def update_timer(self, timer):
        """Return the next timer value, wrapping to 0 after the last timer state."""
        return (timer + 1) % len(self.state_space["timer"])

    @staticmethod
    def bernoulli():
        """Return 0 or 1 from a single trial with probability 0.5 of returning 1."""
        return int(np.random.binomial(1, 0.5))

    def get_observation(self):
        """Return an array holding the light, lever, and reward_stimulus indices (the timer is omitted)."""
        return self.state[[0, 1, 2]]

### Example

In [13]:
# Build the environment with interval = 2, which gives the timer 2 + 2*2 = 6 states
test_env = Light_And_Lever(interval = 2)
# Each printed row is (light, lever, reward_stimulus) as indices into test_env.state_space
for t in range(10):
    print(test_env.get_observation())
    test_env.transition()

[1 0 0]
[0 0 0]
[0 0 0]
[2 1 0]
[0 0 2]
[0 1 0]
[1 0 0]
[0 1 0]
[0 0 0]
[2 0 0]


In [14]:
# light index 2 is "blue" and lever index 1 is "pressed", so the result is the index of "shock" (2)
test_env.update_reward(2,1)

2