In [2]:
import numpy as np
import random
from imp_env import ImpEnv

## Description of the IMP environment

In this simple tutorial, we are going to create an IMP environment for an engineering system that deteriorates over time.

The damage condition is quantified through four discrete bins: [no_damage, minor_damage, major_damage, failure]. The exact damage is not known and the condition is described by $p$, which is equal to the probability of being in any of the four damage bins . 

At the initial step, all components damage probabilities are set up as:

$p_0=[0.9, 0.08, 0.02, 0]$

The system consists of $n$ components, each of them deteriorating according to the following transition model:

$$
T_0 = \begin{bmatrix}
0.9 & 0.05 & 0.03 & 0.02\\
0 & 0.9 & 0.06 & 0.04\\
0 & 0 & 0.94 & 0.06\\
0 & 0 & 0 & 1\\
\end{bmatrix}
$$

The failure probability of a component is equal to the probability of being in the last damage bin.

$p_{f_{comp}} = p(last_{bin})$

The failure event is defined as a series systems and the system failure probability can be formulated as:

$p_{f_{sys}} = 1 - \prod_{n_{comp}}[(1-p_{f_{comp}})]$

If an inspection is conducted, an indication from two possible outcomes (damage, no_damage) is collected. The inspection model is defined conditional on the damage condition:

$p(i_{\text{no_damage}}|\text{damage_size})=[0.1, 0.3, 0.6, 0.9]$

$p(i_{damage}|\text{damage_size})=[0.9, 0.7, 0.4, 0.1]$

An agents can either:
- Do-nothing (action=0): the component damage transitions according to the deterioration model described
- Inspect (action=1): the component damage transitions according to the deterioration model described, but the inspection outcome is taken into account in order to update the damage probabilities
- Repair (action=2): the component damaga probabilities reset to the initial damage probabilities

In the reward model, the action:
- Do-nothing (action=0) does not cost anything
- Inspect (action=1) costs 1 unit (reward = -1)
- Repair (action=2): costs 10 units (reward = -10)

Additionally, the system risk is penalised at every time step:
- Consequences of a system failure: c_f = 100 (reward = -100)
$Risk = p_{{f}_{sys}} \cdot c_f$

The objective is the maxisimisation of the expected sum of discounted rewards.

## Steps we will follow to create an IMP environment

To implement a new environment, we can go through the following steps:

1. Define initial damage probabilities, transition, and inspection models. Alternatively, you can define these models as explained in the tutorial imp_env/pomdp_models/generate_transitions.ipynb. Once they are stored, you can then directly jump to step 2.
2. Create a new class from the interface ImpEnv => `class NewIMPenv(ImpEnv):`
3. Set up the initialisation of the class including, for instance, the number of components, discount factor, and any other necessary variables => `def __init__(self, config=None):`
4. Implement the reset method. This function will reset the episode to the intial stage => `def reset():`
5. Implement the step method. This function models the dynamics of the environment at each time step: given the agents' actions, the damage condition probabilities transitions, and a reward is collected => `def step(self, action: dict):`
6. Implement additionally required methods. We can define more methods if they are necessary for modelling the dynamics of the environment. In this example, we will need: (i) a transition function, (ii) an immediate_reward function to compute the rewards at each time step, and (iii) a pf_sys function that estimates the system failure probability given the components failure probabilities. 

## 1) Defining transition and inspection models

### Initial damage probabilities

In [3]:
initial_damage_proba = np.array([0.9, 0.08, 0.02, 0])

### Transition model

In [4]:
transition_model = np.zeros((3, 4, 4))

# Do-nothing action
transition_model[0,0,0] = 0.9
transition_model[0,0,1] = 0.05
transition_model[0,0,2] = 0.03
transition_model[0,0,3] = 0.02

transition_model[0,1,1] = 0.9
transition_model[0,1,2] = 0.06
transition_model[0,1,3] = 0.04

transition_model[0,2,2] = 0.94
transition_model[0,2,3] = 0.06

transition_model[0,3,3] = 1

transition_model[1] = transition_model[0]

transition_model[2] = np.tile(initial_damage_proba, (4,1))

In [5]:
## Transition example ##
trans_damage = initial_damage_proba.dot(transition_model[2])
trans_damage

array([0.9 , 0.08, 0.02, 0.  ])

### Inspection model

In [6]:
inspection_model = np.zeros((3, 4, 2))

inspection_model[0, :, 0] = np.ones(4)
inspection_model[2, :, 0] = np.ones(4)

inspection_model[1, :, 1] = np.array([0.1, 0.3, 0.6, 0.9])
inspection_model[1, :, 0] = 1 - inspection_model[1, :, 1]

In [7]:
## Bayesian updating example ##
bayes_upd = trans_damage*inspection_model[1, :, 0] 
bayes_norm = bayes_upd / np.sum(bayes_upd)
bayes_norm

array([0.92677346, 0.06407323, 0.00915332, 0.        ])

## 2) Constructing your IMP environment class

In [8]:
class NewIMPenv(ImpEnv):
    
    ## 3) Set up the initialisation of the class ##
    def __init__(self, config=None):
        
        if config is None:
            config = {"n_comp": 2,
                      "discount_reward": 1}
            
        self.n_comp = config["n_comp"]
        self.discount_reward = config["discount_reward"]
        self.ep_length = 20  # Horizon length
        self.proba_size = 4  # Damage size bins
        self.n_obs_inspection = 2  # Total number of possible information received from inspection (crack detected or not)
        self.actions_per_agent = 3
        self.agent_list = ["agent_" + str(i) for i in range(self.n_comp)]
        
        self.initial_damage_proba = np.zeros((self.n_comp, self.proba_size))
        self.initial_damage_proba[:,:] = trans_damage 
        self.transition_model = transition_model
        self.inspection_model = inspection_model
        
        self.reset()
        
    ## 4) Implement the reset method ##
    def reset(self):
        # We need the following line to seed self.np_random
        # super().reset(seed=seed)

        # Choose the agent's belief
        self.time_step = 0
        self.damage_proba = self.initial_damage_proba
        self.observations = {}
        for i in range(self.n_comp):
            self.observations[self.agent_list[i]] = np.concatenate(
                (self.damage_proba[i], [self.time_step / self.ep_length]))
    
    ## 5) Implement the step method ##
    def step(self, action: dict):
        action_list = np.zeros(self.n_comp, dtype=int)
        
        for i in range(self.n_comp):
            action_list[i] = action[self.agent_list[i]]
        
        inspection, next_proba = self.transition(self.damage_proba, action_list)
        
        reward_ = self.immediate_reward(next_proba, action_list)
        
        reward = self.discount_reward ** self.time_step * reward_.item()  # Convert float64 to float

        rewards = {}
        for i in range(self.n_comp):
            rewards[self.agent_list[i]] = reward

        self.time_step += 1

        self.observations = {}
        for i in range(self.n_comp):
            self.observations[self.agent_list[i]] = np.concatenate(
                (next_proba[i], [self.time_step / self.ep_length]))

        self.damage_proba = next_proba

        # An episode is done if the agent has reached the target
        done = self.time_step >= self.ep_length

        # info = {"belief": self.beliefs}
        return self.observations, rewards, done, inspection
    
     ## 6) Implement additionally required methods ##
    def transition(self, damage_proba, action):
        damage_proba_prime = damage_proba
        inspection_outcome = np.zeros(self.n_comp, dtype=int)
        
        for i in range(self.n_comp):
            trans_damage = damage_proba[i].dot(self.transition_model[action[i]])
            
            if action[i] == 1:
                insp_nodetec_prob = np.sum(trans_damage*self.inspection_model[1, :, 0])
                
                if (1-insp_nodetec_prob) < 1e-5:
                    inspection_outcome[i] = 0
                else:
                    ins_dist = np.array([insp_nodetec_prob, 1-insp_nodetec_prob])
                    inspection_outcome[i] = np.random.choice(range(0, self.n_obs_inspection), size=None,
                                                     replace=True, p=ins_dist)
                
                trans_damage = trans_damage*self.inspection_model[1, :, inspection_outcome[i]] 
                trans_damage = trans_damage / np.sum(trans_damage)
                
            damage_proba_prime[i] = trans_damage
        return inspection_outcome, damage_proba_prime
    
    def immediate_reward(self, damage_proba, action):
        reward_system = 0
        for i in range(self.n_comp):
            if action[i] == 1:
                reward_system += -1
            elif action[i] == 2:
                reward_system += -10
        pf_sys = self.pf_sys(damage_proba)
        reward_system += pf_sys*(-100)
        return reward_system
    
    def pf_sys(self, damage_proba):
        pf = damage_proba[:, -1]
        rel_sys = 1
        for i in range(self.n_comp):
            rel_sys *= (1-pf[i])
        return 1-rel_sys
        

## Test the newly created IMP environment

Define the number of components and discount factor of your choice:

In [9]:
config = {
    'n_comp' : 4,
    'discount_reward' : 0.97
}

Initialise an environment `imp_model` from the newly implemented class `NewIMPenv`:

In [10]:
imp_model = NewIMPenv(config)

Check the initial damage probabilities:

In [11]:
imp_model.initial_damage_proba

array([[0.9 , 0.08, 0.02, 0.  ],
       [0.9 , 0.08, 0.02, 0.  ],
       [0.9 , 0.08, 0.02, 0.  ],
       [0.9 , 0.08, 0.02, 0.  ]])

Assign actions (in this case, do nothing in all components):

In [12]:
actions_ = {}
for k in imp_model.agent_list:
    actions_[k] = 0
actions_

{'agent_0': 0, 'agent_1': 0, 'agent_2': 0, 'agent_3': 0}

Test the selected action:

In [13]:
imp_model.step(actions_)

({'agent_0': array([0.81  , 0.117 , 0.0506, 0.0224, 0.05  ]),
  'agent_1': array([0.81  , 0.117 , 0.0506, 0.0224, 0.05  ]),
  'agent_2': array([0.81  , 0.117 , 0.0506, 0.0224, 0.05  ]),
  'agent_3': array([0.81  , 0.117 , 0.0506, 0.0224, 0.05  ])},
 {'agent_0': -8.66341459329023,
  'agent_1': -8.66341459329023,
  'agent_2': -8.66341459329023,
  'agent_3': -8.66341459329023},
 False,
 array([0, 0, 0, 0]))

Simulate one episode:

In [14]:
imp_model.reset()
done = False
rewards_sum = 0

while not done:
    actions = {f"agent_{i}": random.randint(0,2) for i in range(imp_model.n_comp)}
    obs, rewards, done, insp_outcomes = imp_model.step(actions) 
    rewards_sum += rewards['agent_0']
    print('Step: ', imp_model.time_step, '| actions : ', actions, '| reward: ', rewards['agent_0'])
    
print('sum_of_rewards: ', rewards_sum)
    

Step:  1 | actions :  {'agent_0': 1, 'agent_1': 0, 'agent_2': 0, 'agent_3': 0} | reward:  -31.842371043246086
Step:  2 | actions :  {'agent_0': 0, 'agent_1': 0, 'agent_2': 1, 'agent_3': 1} | reward:  -31.07608269354173
Step:  3 | actions :  {'agent_0': 2, 'agent_1': 2, 'agent_2': 0, 'agent_3': 0} | reward:  -25.210612413432855
Step:  4 | actions :  {'agent_0': 0, 'agent_1': 2, 'agent_2': 1, 'agent_3': 0} | reward:  -18.185883121397307
Step:  5 | actions :  {'agent_0': 1, 'agent_1': 2, 'agent_2': 1, 'agent_3': 0} | reward:  -19.22923554734219
Step:  6 | actions :  {'agent_0': 1, 'agent_1': 2, 'agent_2': 2, 'agent_3': 2} | reward:  -26.93753217047434
Step:  7 | actions :  {'agent_0': 2, 'agent_1': 1, 'agent_2': 2, 'agent_3': 0} | reward:  -19.577144588065718
Step:  8 | actions :  {'agent_0': 2, 'agent_1': 2, 'agent_2': 1, 'agent_3': 1} | reward:  -18.462641655931147
Step:  9 | actions :  {'agent_0': 1, 'agent_1': 0, 'agent_2': 2, 'agent_3': 1} | reward:  -20.679345780675316
Step:  10 | a

Now you are ready to create your new environment!!