In [None]:
import warnings
# Install a global "ignore" filter so no Python warning is shown in the cells that follow.
warnings.filterwarnings('ignore')

### Run in Colab
<a href="https://colab.research.google.com/github/racousin/rl_introduction/blob/master/notebooks/1_Environment_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Run and restart runtime
!pip install gymnasium[box2d,atari,accept-rom-license]
!git clone https://github.com/racousin/rl_introduction.git > /dev/null 2>&1

In [None]:
from time import sleep
from time import time,sleep

import gymnasium
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
# from rl_introduction.render_colab import gym_render
sns.set_style("darkgrid")

# 1_Environment_and_Agent

### Introduction to Reinforcement Learning (RL)

In RL, we study the interaction between an **agent** and an **environment**. The agent takes actions to achieve a goal, guided by rewards from the environment. Our aim is to develop agents that can learn optimal behaviors through these interactions.



### Creating an Environment

An environment in RL defines the space in which the agent operates. It returns a new state and a reward for each action taken by the agent.

In [None]:
class Env:
    """Toy environment with two states (0 and 1) drawn uniformly at random.

    An action is rewarded with +1 when its parity matches the current state
    and with -1 otherwise. The next state is redrawn independently of the
    action, and the episode never terminates (`done` stays False).
    """

    def __init__(self):
        # A fresh environment is simply a freshly reset one.
        self.reset()

    def step(self, action):
        """Score `action` against the current state, then draw the next state.

        Returns the tuple (next_state, reward, done, info) where `info` is an
        empty dict.
        """
        reward = 1 if action % 2 == self.state else -1
        self.state = np.random.randint(2)
        return self.state, reward, self.done, {}

    def reset(self):
        """Draw a new initial state, clear the `done` flag and return the state."""
        self.state = np.random.randint(2)
        self.done = False
        return self.state

### Building an Agent
Agents in RL decide which actions to take in an environment. A simple agent might act randomly or follow a predetermined policy.



In [None]:
class Agent:
    """Baseline agent that ignores the state and picks action 0 or 1 uniformly at random."""

    def __init__(self, env):
        # Keep a reference to the environment so that subclasses can query it
        # (for example to sample from its action space).
        self.env = env

    def act(self, state):
        """Return a random action in {0, 1}; `state` is not used."""
        return np.random.randint(2)

### Running an Experiment

To evaluate our agent's performance, we generate trajectories of state-action-reward sequences and compute the total reward.


In [None]:
def run_experiment(env, agent, nb_steps):
    """Roll out `agent` in `env` for `nb_steps` steps and return the trajectory.

    The result is the flat list [s0, a0, r0, s1, a1, r1, s2, ...]: the initial
    state followed by one (action, reward, next state) triple per step.
    """
    current_state = env.reset()
    trajectory = [current_state]
    for _ in range(nb_steps):
        chosen_action = agent.act(current_state)
        # The done flag and info dict returned by the environment are not used here.
        current_state, step_reward, _done, _info = env.step(chosen_action)
        trajectory.extend((chosen_action, step_reward, current_state))
    return trajectory

## Understanding the Environment and Agent

**Question 1:** What is the **state space** in the provided `Env` class?


**Question 2:** What is the **action space** in the provided `Env`/`Agent` class?


**Question 3:** What is the **Transition model** in the provided `Env` class?


**Question 4:** What is the **Policy** in the provided `Agent` class?


**Question 5:** What is the **Reward Function** in the provided `Env` class?


**Question 6:** What object does **run_experiment** return?


**Exercise 1:** Instantiate the classes `Agent` and `Env`, then call `run_experiment` for **100 steps**.



**Exercise 2:** Compute the **cumulative reward** and the **discounted cumulative reward**, also known as the return. You can return more information from `run_experiment` to help.


**Question 7:** In this `MDP`, what is the **Expected Return** when following the random policy of the `Agent`?


**Question 8:** What would be the **best policy** function for the `Env` environment?


**Exercise 3:** Implement the best policy function and use it to run the best agent. Compare its performance to the random agent.



### Corrections

**Question 1:** What is the **state space** in the provided `Env` class?

**States:** $S = \{0,1\}$

**Question 2:** What is the **action space** in the provided `Env`/`Agent` class?

**Actions:** $A = \{0,1\}$ or $\mathbb{N}$ (the environment only uses the parity of the action)

**Question 3:** What is the **Transition model** in the provided `Env` class?

**Transition model:** $P_{ss'}^a = \mathbb{P} [S_{t+1} = s' \vert S_t = s, A_t = a]$

For all $a \in A$, and for all $s, s' \in S : P_{ss'}^a = 0.5$

**Question 4:** What is the **Policy** in the provided `Agent` class?

Policy $\pi$ is defined as follows:
$\pi(0) = 0$ with probability $0.5$, $\pi(0) = 1$ with probability $0.5$ 
$\pi(1) = 0$ with probability $0.5$, $\pi(1) = 1$ with probability $0.5$ 

**Question 5:** What is the **Reward Function** in the provided `Env` class?

Reward Function $R(s, a)$ is deterministic in this case:

$R(0, a) = 1$ for all $a \in 2\mathbb{N}$
$R(0, a) = -1$ for all $a \in 2\mathbb{N} + 1$
$R(1, a) = 1$ for all $a \in 2\mathbb{N} + 1$
$R(1, a) = -1$ for all $a \in 2\mathbb{N}$

**Question 6:** What object does **run_experiment** return?

It returns a trajectory $(s_0, a_0, r_0, s_1, a_1, r_1, \dots)$.

**Exercise 1:** Instantiate the class `Agent` and `Env` to `run_experiment` on **100 steps**.




In [None]:
# Instantiation
env = Env()
agent = Agent(env)
# As the last expression of the cell, the returned trajectory is shown as the cell output.
run_experiment(env, agent, nb_steps=100)


**Exercise 2:** Compute the **cumulative reward** and the **discounted cumulative reward**, also known as the return, for each step of the trajectory. Provide the **cumulative reward** and the **discounted cumulative reward** (discount factor 0.8) at step 42. You can return more information from `run_experiment` to help.

In [None]:
def run_experiment(env, agent, nb_steps):
    """Roll out `agent` in `env` for `nb_steps` steps.

    Returns a pair (trajectory, rewards):
    - trajectory is the flat list [s0, a0, r0, s1, a1, r1, s2, ...];
    - rewards is the list of the rewards alone, one per step.
    """
    current_state = env.reset()
    trajectory = [current_state]
    collected_rewards = []
    for _ in range(nb_steps):
        chosen_action = agent.act(current_state)
        # The done flag and info dict returned by the environment are not used here.
        current_state, step_reward, _done, _info = env.step(chosen_action)
        collected_rewards.append(step_reward)
        trajectory.extend((chosen_action, step_reward, current_state))
    return trajectory, collected_rewards

In [None]:
def compute_cumulative_reward(rewards, discout_factor=1):
    """Compute the (discounted) return from every step of a trajectory.

    Entry t of the result is
        G_t = rewards[t] + g * rewards[t+1] + g**2 * rewards[t+2] + ...
    where g is `discout_factor` (the parameter keeps its original spelling so
    existing keyword callers continue to work).

    Args:
        rewards: sequence of per-step rewards.
        discout_factor: discount factor g; 1 gives the plain cumulative reward.

    Returns:
        A list with the same length as `rewards`; empty if `rewards` is empty.
    """
    returns = [0] * len(rewards)
    running_return = 0
    # Walk backwards so each return reuses the next one: G_t = r_t + g * G_{t+1}.
    # This is linear in the trajectory length instead of re-summing every suffix.
    for index in range(len(rewards) - 1, -1, -1):
        running_return = rewards[index] + discout_factor * running_return
        returns[index] = running_return
    return returns

In [None]:
# Roll out a fresh 100-step trajectory; only the reward list is needed here.
_, rewards = run_experiment(env, agent, nb_steps=100)
# Returns from every step, first undiscounted and then with a discount factor of 0.8.
undiscounted_returns = compute_cumulative_reward(rewards)
discounted_returns = compute_cumulative_reward(rewards, 0.8)
print(undiscounted_returns[42], discounted_returns[42])




**Question 7:** In this `MDP`, what is the **Expected Return** when following the random policy of the `Agent`?


The expected return is 0.


**Question 8:** What would be the **best policy** function for the `Env` environment?

$\pi(0) = 0$ with probability $1$, $\pi(0) = 1$ with probability $0$

$\pi(1) = 0$ with probability $0$, $\pi(1) = 1$ with probability $1$

**Exercise 3:** Implement the best policy function and use it to run the best agent. Compare its performance to the random agent.


In [None]:

def best_policy(state):
    """Return the action whose parity matches `state`: 0 for state 0, 1 for anything else."""
    return 0 if state == 0 else 1
class Best_Agent:
    """Agent that always plays the action chosen by `best_policy` for the observed state."""

    def __init__(self, env):
        """Accept the environment for interface parity with the other agents; it is not stored."""

    def act(self, state):
        """Return `best_policy(state)`."""
        optimal_action = best_policy(state)
        return optimal_action
# Instantiation
env = Env()
my_random_agent = Agent(env)
my_best_agent = Best_Agent(env)

# Run both agents for 100 independent experiments of 100 steps each and keep
# the total reward collected in each experiment.
nb_experiment = 100
sum_random_agent_rewards = []
sum_best_agent_rewards = []
for exp in range(nb_experiment):
    _, random_agent_rewards = run_experiment(env, my_random_agent, nb_steps=100)
    _, best_agent_rewards = run_experiment(env, my_best_agent, nb_steps=100)
    sum_random_agent_rewards.append(sum(random_agent_rewards))
    sum_best_agent_rewards.append(sum(best_agent_rewards))

# Label each series and the axes so the figure can be read on its own.
plt.plot(sum_random_agent_rewards, 'o', label='random agent')
plt.plot(sum_best_agent_rewards, 'o', label='best agent')
plt.xlabel('experiment')
plt.ylabel('sum of rewards over 100 steps')
plt.title('Best agent vs Random agent / sum reward')
plt.legend();

# ENVIRONMENT FROM GYMNASIUM
https://gymnasium.farama.org/

## Discrete state action environment

### FrozenLake

<img src="https://github.com/racousin/rl_introduction/blob/master/notebooks/images/FrozenLake.png?raw=1">

In [None]:
# Create the environment and show its action / observation spaces.
env = gym.make('FrozenLake-v1')
print('description action space:', env.action_space)
print('description observation space:', env.observation_space)
print('run some random iteration:')
env.reset()
for _ in range(3):
    print()
    action = env.action_space.sample()
    print('action: ')
    print(action)
    # Gymnasium's step() returns five values; the episode is over when either
    # `terminated` or `truncated` is True.
    state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    print('state: ')
    print(state)
    print('reward: ')
    print(reward)
    if done:
        # Start a new episode instead of stepping an environment that has finished.
        env.reset()

In [None]:
# NOTE(review): the import of gym_render is commented out in the imports cell, so this call
# raises NameError unless that import is restored. Presumably it records/plays a random-agent
# episode under ./video — confirm against rl_introduction.render_colab.
gym_render(env_name='FrozenLake-v1', directory='./video', agent = 'random', slow_coeff=10)

## Discrete-action, continuous-state environment

### CartPole
A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over. A reward of +1 is provided for every timestep that the pole remains upright. The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.
<img src="https://github.com/racousin/rl_introduction/blob/master/notebooks/images/CartPole-v1.png?raw=1">
observations: position of cart, velocity of cart, angle of pole, rotation rate of pole

In [None]:
# Create the environment and show its action / observation spaces.
env = gym.make('CartPole-v0')
env.reset()
print('description action space:', env.action_space)
print('description observation space:', env.observation_space)
print('run some random iteration:')
for _ in range(3):
    print()
    action = env.action_space.sample()
    print('action: ')
    print(action)
    # Gymnasium's step() returns five values; the episode is over when either
    # `terminated` or `truncated` is True.
    state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    print('state: ')
    print(state)
    print('reward: ')
    print(reward)
    if done:
        # Start a new episode instead of stepping an environment that has finished.
        env.reset()

Most environments come with a renderer:

In [None]:
# NOTE(review): the import of gym_render is commented out in the imports cell, so this call
# raises NameError unless that import is restored. Presumably it records/plays a random-agent
# episode under ./video — confirm against rl_introduction.render_colab.
gym_render(env_name='CartPole-v0', directory='./video', agent = 'random', slow_coeff=10)

## Continuous action-space environment

### MountainCarContinuous
An underpowered car must climb a one-dimensional hill to reach a target. Unlike MountainCar v0, the action (engine force applied) is allowed to be a continuous value.

The target is on top of a hill on the right-hand side of the car. If the car reaches it or goes beyond, the episode terminates.

On the left-hand side, there is another hill. Climbing this hill can be used to gain potential energy and accelerate towards the target. On top of this second hill, the car cannot go further than a position equal to -1, as if there was a wall. Hitting this limit does not generate a penalty (it might in a more challenging version).
<img src="https://github.com/racousin/rl_introduction/blob/master/notebooks/images/MountainCarContinuous-v0.png?raw=1">

In [None]:
# Create the environment and show its action / observation spaces.
env = gym.make('MountainCarContinuous-v0')
env.reset()
print('description action space:', env.action_space)
print('description observation space:', env.observation_space)
print('run some random iteration:')
for _ in range(3):
    print()
    action = env.action_space.sample()
    print('action: ')
    print(action)
    # Gymnasium's step() returns five values; the episode is over when either
    # `terminated` or `truncated` is True.
    state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    print('state: ')
    print(state)
    print('reward: ')
    print(reward)
    if done:
        # Start a new episode instead of stepping an environment that has finished.
        env.reset()

In [None]:
# NOTE(review): the import of gym_render is commented out in the imports cell, so this call
# raises NameError unless that import is restored. Presumably it records/plays a random-agent
# episode (capped by max_step) under ./video — confirm against rl_introduction.render_colab.
gym_render(env_name='MountainCarContinuous-v0', directory='./video', agent = 'random', slow_coeff=1, max_step=50)

### LunarLanderContinuous
Landing pad is always at coordinates (0,0). Coordinates are the first two numbers in state vector. Reward for moving from the top of the screen to landing pad and zero speed is about 100..140 points. If lander moves away from landing pad it loses reward back. Episode finishes if the lander crashes or comes to rest, receiving additional -100 or +100 points. Each leg ground contact is +10. Firing main engine is -0.3 points each frame. Solved is 200 points. Landing outside landing pad is possible. Fuel is infinite, so an agent can learn to fly and then land on its first attempt. Action is two real values vector from -1 to +1. First controls main engine, -1..0 off, 0..+1 throttle from 50% to 100% power. Engine can't work with less than 50% power. Second value -1.0..-0.5 fire left engine, +0.5..+1.0 fire right engine, -0.5..0.5 off.
<img src="https://github.com/racousin/rl_introduction/blob/master/notebooks/images/LunarLanderContinuous-v2.png?raw=1">

In [None]:
# Create the environment and show its action / observation spaces.
# NOTE(review): whether the 'LunarLanderContinuous-v2' id is registered depends on the
# installed gymnasium version — confirm against the version actually installed.
env = gym.make('LunarLanderContinuous-v2')
env.reset()
print('description action space:', env.action_space)
print('description observation space:', env.observation_space)
print('run some random iteration:')
print()
for _ in range(3):
    action = env.action_space.sample()
    print('action: ')
    print(action)
    # Gymnasium's step() returns five values; the episode is over when either
    # `terminated` or `truncated` is True.
    state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    print('state: ')
    print(state)
    print('reward: ')
    print(reward)
    if done:
        # Start a new episode instead of stepping an environment that has finished.
        env.reset()

In [None]:
# NOTE(review): the import of gym_render is commented out in the imports cell, so this call
# raises NameError unless that import is restored. Presumably it records/plays a random-agent
# episode under ./video — confirm against rl_introduction.render_colab.
gym_render(env_name='LunarLanderContinuous-v2', directory='./video', agent = 'random', slow_coeff=1)

## High-dimensional state space environment

### Atari games


In [None]:
# Create the environment and show its action / observation spaces.
# NOTE(review): whether the 'Pong-v0' id is registered depends on the installed
# gymnasium / Atari packages — confirm against the versions actually installed.
env = gym.make('Pong-v0')
env.reset()
print('description action space:', env.action_space)
print('description observation space:', env.observation_space)
print('run some random iteration:')
print()
for _ in range(3):
    action = env.action_space.sample()
    print('action: ')
    print(action)
    # Gymnasium's step() returns five values; the episode is over when either
    # `terminated` or `truncated` is True.
    state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    print('state: ')
    print(state)
    print('reward: ')
    print(reward)
    if done:
        # Start a new episode instead of stepping an environment that has finished.
        env.reset()

In [None]:
# NOTE(review): the import of gym_render is commented out in the imports cell, so this call
# raises NameError unless that import is restored. Presumably it records/plays a random-agent
# episode under ./video — confirm against rl_introduction.render_colab.
gym_render(env_name='Pong-v0', directory='./video', agent = 'random', slow_coeff=1)

# Evaluate a random agent in a Gymnasium environment

In [None]:
# Rebind `env` to a Pong environment; the random-agent evaluation in the next cells uses it.
env = gym.make('Pong-v0')

In [None]:
class RandomAgent(Agent):
    """Agent that samples its actions from the environment's own action space."""

    def __init__(self, env):
        super().__init__(env)
        # act() needs env.action_space, so keep the reference here rather than
        # relying on the base class to store it.
        self.env = env

    def act(self, state):
        """Return a random valid action for `self.env`; `state` is not used."""
        return self.env.action_space.sample()

In [None]:
# NOTE(review): the visible notebook calls run_experiment_episode without defining it;
# drop this definition if the helper is provided elsewhere.
def run_experiment_episode(env, agent, nb_episode):
    """Play `nb_episode` full episodes and return the total reward of each one.

    Written against the Gymnasium API: reset() returns (observation, info) and
    step() returns (observation, reward, terminated, truncated, info).

    Returns:
        A numpy array of length `nb_episode` holding one cumulative reward per episode.
    """
    episode_returns = np.zeros(nb_episode)
    for episode in range(nb_episode):
        state, _ = env.reset()
        done = False
        while not done:
            action = agent.act(state)
            state, reward, terminated, truncated, _ = env.step(action)
            episode_returns[episode] += reward
            # An episode ends either naturally (terminated) or by a time limit (truncated).
            done = terminated or truncated
    return episode_returns


rand_agent = RandomAgent(env)
rewards = run_experiment_episode(env, rand_agent, 20)
plt.plot(rewards)
plt.title('cumulative reward per episode - rand_agent')

### Build your CartPole heuristic agent

In [None]:
# Rebind `env` to a CartPole environment for the heuristic-agent exercise.
env = gym.make('CartPole-v0')

In [None]:
#TODO: Create a cartPole agent that is better than Random:
class MyCartPoleAgent(Agent):
    """Exercise skeleton for a hand-written CartPole policy; act() is left for the reader to complete."""
    def __init__(self, env):
        super().__init__(env)
    def act(self, state):
        #Complete
        # `action` is not defined yet: compute it from `state` before returning,
        # otherwise calling this method raises NameError.
        return action

In [None]:
#Done: Create a cartPole agent that is better than Random:
class MyCartPoleAgent(Agent):
    """Hand-tuned threshold policy for CartPole.

    The observation is indexed as described in the CartPole section of this
    notebook: state[0] cart position, state[1] cart velocity, state[2] pole
    angle, state[3] pole rotation rate.

    Args:
        env: environment, forwarded to the base `Agent`.
        params: seven thresholds compared against the observation in act().
            The default is a tuple so that instances never share a mutable default.
    """
    def __init__(self, env, params=(-0.9, -0.1, 0, 0, -0.9, 0, 0.9)):
        super().__init__(env)
        self.params = params
    def act(self, state):
        """Return action 0 when the first threshold test passes, otherwise action 1."""
        if state[0] > self.params[0] and state[2] > self.params[1] and state[3] <= self.params[2]:
            action = 0
        elif state[3] > self.params[4] and state[1] >= self.params[5] and state[1] <= self.params[6]:
            # This branch currently selects the same action as the fallback below, so
            # params[4], params[5] and params[6] do not change the outcome; params[3] is unused.
            action = 1
        else:
            action = 1
        return action
        

In [None]:
# Evaluate the random agent and then the hand-written agent on the same number of
# episodes, printing each total and drawing both reward curves on one figure.
nb_experience = 100
plt.figure(figsize=(20,20))
for label, agent_cls in (('random', RandomAgent), ('manual', MyCartPoleAgent)):
    rand_agent = agent_cls(env)
    rewards = run_experiment_episode(env, rand_agent, nb_experience)
    print(f'total reward {label} agent: {sum(rewards)}')
    plt.plot(rewards, label=f'{label} agent')
plt.title('cumulative reward per episode - my agent')
plt.legend()