# RL Notebook

In [9]:
# pip install pygame
# pip install gym

Collecting pygame
  Downloading pygame-2.2.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.7 MB)
[K     |████████████████████████████████| 13.7 MB 5.6 MB/s eta 0:00:01
[?25hInstalling collected packages: pygame
Successfully installed pygame-2.2.0
Note: you may need to restart the kernel to use updated packages.


In [10]:
import gym
from gym import spaces
import pygame
import numpy as np

# XY. Create custom environment

In [17]:
class TestEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode=None, size=5):
        self.size = size  # The size of the square grid
        self.window_size = 512  # The size of the PyGame window

        # Observations are dictionaries with the agent's and the target's location.
        # Each location is encoded as an element of {0, ..., `size`}^2, i.e. MultiDiscrete([size, size]).
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
            }
        )

        # We have 4 actions, corresponding to "right", "up", "left", "down"
        self.action_space = spaces.Discrete(4)

        """
        The following dictionary maps abstract actions from `self.action_space` to 
        the direction we will walk in if that action is taken.
        I.e. 0 corresponds to "right", 1 to "up" etc.
        """
        self._action_to_direction = {
            0: np.array([1, 0]),
            1: np.array([0, 1]),
            2: np.array([-1, 0]),
            3: np.array([0, -1]),
        }

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None
    
    def _get_obs(self):
        return {"agent": self._agent_location, "target": self._target_location}
    
    def _get_info(self):
        return {"distance": np.linalg.norm(self._agent_location - self._target_location, ord=1)}

    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        # Choose the agent's location uniformly at random
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # We will sample the target's location randomly until it does not coincide with the agent's location
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(
                0, self.size, size=2, dtype=int
            )

        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, info
    
    def step(self, action):
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        # We use `np.clip` to make sure we don't leave the grid
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )
        # An episode is done iff the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        reward = 1 if terminated else 0  # Binary sparse rewards
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, reward, terminated, False, info
    
    def render(self):
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode((self.window_size, self.window_size))
        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pix_square_size = (
            self.window_size / self.size
        )  # The size of a single grid square in pixels

        # First we draw the target
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )
        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )

        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3,
            )
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,
            )

        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )
        
        def close(self):
            if self.window is not None:
                pygame.display.quit()
                pygame.quit()


In [18]:
from gym.envs.registration import register

register(
    id='gym_examples/TestEnv-v0',
    entry_point='gym_examples.envs:TestEnv',
    max_episode_steps=300,
)

In [19]:
env = gym.make('gym_examples/TestEnv-v0')

ModuleNotFoundError: No module named 'gym_examples'

# XYZ Trying to create an environment
based on the video: https://www.youtube.com/watch?v=bD6V3rcr_54

In [21]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

In [86]:
class NewEnv(Env):
    
    def __init__(self):
        # actions
        self.action_space = Discrete(3)
        # state
        self.state = 38 + random.randint(-3,3)
        # observation
        self.observation_space = Box(low = np.array([0]), high = np.array([100]))
        # episodes
        self.episodes_length = 60
        
    def step(self, action):
        reward = 0
        self.state += action - 1
        self.episodes_length -= 1
        if self.state >=37 and self.state <= 39:
            reward += 1
        else:
            reward -= 1
        # check if the time is up 
        if self.episodes_length <= 0:
            done = True
        else:
            done = False
        # Apply temperature noise
        self.state += random.randint(-1,1)
        
        # placeholder for info
        info = dict()
        
        return self.state, reward, done, info
        
    def render(self):
        # implement visualization
        pass
    def reset(self):
        # reset
        self.state = 38 + random.randint(-3,3)
        self.episodes_length = 60
        return self.state

In [87]:
env = NewEnv()

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


In [88]:
env.observation_space.sample()

array([0.01424143], dtype=float32)

In [89]:
env.step(env.action_space.sample())

(37, 1, False, {})

In [90]:
episodes = 10
for episode in range(episodes):
    state = env.reset()
    done = False
    score = 0
    
    while not done:
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score += reward
    print('Episode: {} | State: {} | Action: {} | Score: {}'.format(episode, n_state, action, score))

Episode: 0 | State: 37 | Action: 1 | Score: 0
Episode: 1 | State: 33 | Action: 1 | Score: -22
Episode: 2 | State: 28 | Action: 0 | Score: 12
Episode: 3 | State: 37 | Action: 2 | Score: -14
Episode: 4 | State: 45 | Action: 1 | Score: -24
Episode: 5 | State: 49 | Action: 1 | Score: -46
Episode: 6 | State: 44 | Action: 1 | Score: -4
Episode: 7 | State: 42 | Action: 2 | Score: -48
Episode: 8 | State: 29 | Action: 1 | Score: 0
Episode: 9 | State: 54 | Action: 2 | Score: -60


## RL on "shower environment"

In [77]:
from stable_baselines3 import PPO, A2C, SAC, TD3, DQN

In [76]:
# pip install stable_baselines3

In [71]:
# 4000 training timesteps
budget_pendulum = 4000

In [91]:
ppo_model = PPO("MlpPolicy", env, verbose=0).learn(budget_pendulum)

In [92]:
ppo_model.learn(total_timesteps=1000)

<stable_baselines3.ppo.ppo.PPO at 0x7f21d5f3dbb0>

In [93]:
obs = env.reset()

In [94]:
while True:
    action, _states = ppo_model.predict(obs)
    obs, rewards, dones, info = env.step(action)
                                    

ValueError: Error: Unexpected observation shape () for Box environment, please use (1,) or (n_env, 1) for the observation shape.

In [96]:
env.step(action)

(37, 1, False, {})

In [103]:
import gym
env = gym.make("LunarLander-v2", render_mode="human")
env.action_space.seed(42)

observation, info = env.reset(seed=42)

for _ in range(1000):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    if terminated or truncated:
        observation, info = env.reset()

env.close()

ModuleNotFoundError: No module named 'Box2D'