In [27]:
import numpy as np

# Recycling Robot Example

This notebook is inspired by the "*Recycling Robot*" example from [Sutton and Barto, Reinforcement Learning](http://incompleteideas.net/book/the-book-2nd.html).

Suppose we have a robot going anywhere in an office searching for empty cans to trash them into a bin. The robot has three states $\mathcal{S} = \{\texttt{high},\texttt{low},\texttt{broken}\}$ indicating its rechargerable battery status. In each state, the robot can decide among three actions $\mathcal{S} = \{\texttt{search},\texttt{wait},\texttt{recharge}\}$, indicating respectively searching for empty cans in the office, remain stationary and wait, or head back to its charging base and recharge. Depending on the current state $s \in \mathcal{S}$, the robot can choose among varying sets of actions. In our case,
$$\mathcal{A}(\texttt{high}) = \{\texttt{search},\texttt{wait}\}$$
$$\mathcal{A}(\texttt{low}) = \{\texttt{search},\texttt{wait},\texttt{recharge}\}$$
$$\mathcal{A}(\texttt{broken}) = \emptyset$$

The rewards are zero most of the time, but become positive when the robot secures an empty can, or large and negative if the battery runs all the way down. Here is a diagram of the dynamics of this environment:

<img src="img/recycling-robot.svg" />

Below is how this can be implemented with our package.

In [28]:
r_search = 3
r_wait = 1
r_depleted = -3
r_broken = 0
alpha = 0.4
beta = 0.6
gamma = 0.3

## `relearn` Implementation

In order to have a complete Markov Decision Process (MDP) to test we first need to create its building blocks, namely the Environment and the Agent. In turn, the Environment needs States, Actions, Rewards and Transitions to run (see the picture above to get an idea of the Environment's elements), while the Agent needs a Policy. Let's define these objects, and then we will put everything together.

In [29]:
from relearn.agent import *
from relearn.environment import *
from relearn.mdp import *

### Setting up the Environment: Actions, States, Rewards, and Transitions

As just said, we need to declare a few objects for instantiating an Environment. The `Action`, `State` and `Rewards` classes are quite simple, and can be inherited by more complex structures if needed. To instantiate them, we need unique names to assign to each action and state and unique values to assign to rewards. Additionally, the end and start states can be set. If either none or a combination is set, the default behaviour is to assign the first inserted stated the 'start state' status, while the 'end state' status remains unassigned.

In [30]:
actions = [Action(name) for name in ["search", "wait", "recharge"]]
states = [State(name) for name in ["low", "high", "broken"]]
states[-1].end_state = True  # sets the state 'broken' as end state
states[1].start_state = True  # sets the state 'high' as start state
rewards = [Reward(value) for value in [r_search, r_wait, r_broken, r_depleted]]

Now that we have actions, states and rewards, we can also instantiate transitions using the `Transition` class. Each transition is made of 5 objects: 

- the starting state, 
- the action taken in the starting state, 
- the ending state, 
- the reward given by performing the selected action from the starting state and landing in the ending state, 
- and the probability of this transition to occur given the action taken in the starting state.

We can specify each of these objects by referring to names and values given to actions, states, and rewards, respectively. We have to pay attention here because if the names or the reward values do not coincide with the intended object, the Environment class won't accept them as valid.

In [31]:
transitions = [
    Transition(
        start_state_name="low",
        action_name="search",
        landing_state_name="low",
        reward_value=r_search,
        probability=beta,
    ),
    Transition("low", "search", "high", r_depleted, 1 - beta - gamma),
    Transition("low", "search", "broken", r_broken, gamma),
    Transition("low", "wait", "low", r_wait, 1 - gamma),
    Transition("low", "wait", "broken", r_broken, gamma),
    Transition("low", "recharge", "high", 0, 1 - gamma),
    Transition("low", "recharge", "broken", r_broken, gamma),
    Transition("high", "search", "low", r_search, 1 - alpha - gamma),
    Transition("high", "search", "high", r_search, alpha),
    Transition("high", "search", "broken", r_broken, gamma),
    Transition("high", "wait", "high", r_wait, 1 - gamma),
    Transition("high", "wait", "broken", r_broken, gamma),
]

The Environment object can be finally instantiated using actions, states, rewards and transitions.

In [32]:
environment = Environment(
    states=states, actions=actions, rewards=rewards, transitions=transitions
)

### Setting up the Agent and the Policy

The `Agent` class is quite trivial in the sense that its only scope is to make the policy run. Thus, its implementation simply consists in calling the `Agent` constructor and giving it a `Policy`. There is a convenient static method in the `Policy` class to set a random policy given the shape of its probability distribution, which should be `n_states x n_actions`.

In [33]:
random_policy = Policy.random_policy(
    n_states=len(environment.states), n_actions=len(environment.actions)
)
agent = Agent(policy=random_policy)

### Putting everything together in a MDP

In order to make everything work, the `MDP` class is designed to orchestrate and manage all the previous objects seemlessly, allowing for MDP iterations and trajectory retrieval. In order to go reverse an iteration, the trajectory object must be edited so that it does not contain traces of the n-last iteration(s).

In [34]:
mdp = MDP(agent=agent, environment=environment)

That's it! Now we have everything to run iterations and see interactions between the agent and the environment that we have just made.

In [35]:
mdp.iterate()
mdp.print_trajectory()

Reward: 0	State: high	Action: search
Reward: 3	State: low


In [36]:
print(f'We are in state {mdp.current_state.name}, and we have just received the reward {mdp.trajectory["rewards"][-1].value}')

We are in state low, and we have just received the reward 3


As you can see, the `trajectory` object is a dictionary containing three keys (states, actions and rewards), each of which has its own list containing the states, actions and rewards experienced at time $i$, where $i$ is the index in those lists.

In [37]:
print(mdp.trajectory)

{'rewards': [<relearn.environment.Reward object at 0x1081c9090>, <relearn.environment.Reward object at 0x1081cbac0>], 'states': [<relearn.environment.State object at 0x1081cbc10>, <relearn.environment.State object at 0x109033c10>], 'actions': [<relearn.environment.Action object at 0x1081c9900>]}


## Dynamic Programming: Iterative Policy Evaluation

One of the most important computations in RL is the estimation of the value function $v_\pi$, which gives the expected value of each state (refer to [Sutton and Barto, Reinforcement Learning](http://incompleteideas.net/book/the-book-2nd.html) for theory and pseudocode).

In [38]:
theta = 0.0001
discount = 0.01

# initialize V(s), for all s in S+, arbitrarily except that V(terminal)=0
value_function = np.random.rand(len(mdp.environment.states))

# set V(terminal)=0, if there is end state
terminal = [s.idx for s in mdp.environment.states if s.end_state]
if terminal:
    value_function[terminal] = 0

while True:
    delta = 0
    for state in mdp.environment.states:
        old_v = value_function[state.idx].copy()
        value_function[state.idx] = sum(
            mdp.agent.policy.pmf(action=action, state=state)
            * sum(
                mdp.environment.state_reward_proba(
                    next_state=next_state,
                    reward=reward,
                    state=state,
                    action=action,
                )
                * (reward.value + discount * value_function[next_state.idx])
                for next_state in mdp.environment.states
                for reward in mdp.environment.rewards
            )
            for action in state.actions
        )
        delta = max([delta, np.abs(old_v - value_function[state.idx])])
    print(f"Difference between old value function and new value function: {delta}")
    if delta < theta: # this process stops if the accuracy threshold theta is met
        break

Difference between old value function and new value function: 0.6982845496808725
Difference between old value function and new value function: 0.0035326165312502544
Difference between old value function and new value function: 1.8970495003878263e-05


## New Greedy Policy computation with Dynamic Programming

Let's see how we can implement a RL algorithm, namely the computation of a new greedy policy starting from a random one (refer to [Sutton and Barto, Reinforcement Learning](http://incompleteideas.net/book/the-book-2nd.html) for theory and pseudocode).

We need to specify the state from which we want our policy to improve.

In [39]:
state = mdp.current_state
discount = 0.01

# compute the expected rewards of selecting each action and then follow
# the policy
new_expected_rewards = np.array(
    [
        sum(
            mdp.environment.state_reward_proba(
                next_state=next_state,
                reward=reward,
                state=state,
                action=action,
            )
            * (reward.value + discount * value_function[next_state.idx])
            for next_state in mdp.environment.states for reward in mdp.environment.rewards
        )
        for action in state.actions
    ]
)

# assign to each action new probabilities with respect to those actions
# with highest expected sum of rewards. Making sure everything is in (0,1)
# excluding extremes
probas = new_expected_rewards + np.abs(np.min(new_expected_rewards)) + 0.000001
probas = np.float16(probas) / np.sum(probas)
new_probas = np.zeros(len(mdp.environment.actions))
new_probas[[action.idx for action in state.actions]] = probas
print(new_probas)

[0.67773438 0.31860352 0.0033493 ]
