In [2]:
import random
import os
import time

import numpy as np

- Implement a simple 2D-maze game in which a single player can move around in the four major directions.
- Initialize the maze as a 5x5 grid to which player is confirmed
- One the 25 grid cells in the "goal" that a player cell is called the "seeker" must reach
- Instead of hard-coding the solution, employ a reinforcement learning algorithm, so that the seeker learns to find the goal
- This is done by repeatedly running simulations of the maze, rewarding the seeker for finding the goal and keeping track of which decisions of the seeker worked and didn't
- As running simulations can be parallelized and our RL algorithm can also be trained in parallel, we use the Ray API to parallelize the whole process

In [3]:
class Discrete:
    def __init__(self, num_actions: int):
        self.n = num_actions

    def sample(self):
        return random.randint(0, self.n-1)

In [4]:
# 0=down, 1=left, 2=right, 3=up
action_space = Discrete(4)
print(action_space.sample())

1


In [5]:
from IPython.display import clear_output
class Environment:
    seeker, goal= (0,0), (4,4)
    info = {"seeker":seeker , "goal":goal}

    def __init__(self, *args, **kwargs):
        self.action_space = Discrete(4)
        # 25 positions in the grid, hence 25 states in the observation space
        self.observation_space = Discrete(5*5)

    def reset(self):
        self.seeker = (0,0)
        self.goal = (4,4)

        return self.get_observation()

    def get_observation(self):
        return 5 * self.seeker[0] + self.seeker[1]

    def get_reward(self):
        return 1 if self.seeker == self.goal else 0

    def is_done(self):
        return self.seeker == self.goal

    def step(self, action):
        if action == 0:
            # move down
            self.seeker = (min(self.seeker[0] + 1, 4), self.seeker[1])
        elif action == 1:
            # move left
            self.seeker = (self.seeker[0], max(self.seeker[1]-1, 0))
        elif action == 2:
            # move up
            self.seeker = (max(self.seeker[0] - 1, 0), self.seeker[1])
        elif action == 3:
            # move right
            self.seeker = (self.seeker[0], min(self.seeker[1]+1, 4))
        else:
            raise ValueError("invalid action")

        return self.get_observation(), self.get_reward(), self.is_done(), self.info

    def render(self, *args, **kwargs):
        os.system("cls" if os.name=="nt" else "clear")
        clear_output(wait=True)
        grid = [["| " for _ in range(5)] + ["|\n"] for _ in range(5)]
        grid[self.goal[0]][self.goal[1]] = "|G"
        grid[self.seeker[0]][self.seeker[1]] = "|S"
        print("".join(["".join(grid_row) for grid_row in grid]))

In [6]:
environment = Environment()

while not environment.is_done():
    random_action = environment.action_space.sample()
    environment.step(random_action)
    time.sleep(0.1)
    environment.render()

| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | |S|



In [7]:
class Policy:
    def __init__(self, env):
        self.state_action_table = [
            [0 for _ in range(env.action_space.n)] for _ in range(env.observation_space.n)
        ]
        self.action_space = env.action_space

    def get_action(self, state, explore=True, epsilon=0.1):
        if explore and random.uniform(0,1) < epsilon:
            return self.action_space.sample()
        return np.argmax(self.state_action_table[state])


In [8]:
class Simulation(object):
    def __init__(self, env):
        self.env = env

    def rollout(self, policy, render=False, explore=True, epsilon=0.1):
        experiences = []
        state = self.env.reset()
        done = False

        while not done:
            action = policy.get_action(state, explore, epsilon)
            next_state, reward, done, info = self.env.step(action)
            experiences.append([state, action, reward, next_state])
            state = next_state

            if render:
                time.sleep(0.05)
                self.env.render()

        return experiences

In [9]:
untrained_policy = Policy(environment)
sim = Simulation(environment)

exp = sim.rollout(untrained_policy, render=True, epsilon=1.0)
for row in untrained_policy.state_action_table:
    print(row)

| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | |S|

[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]


In [10]:
def update_policy(policy, experiences):
    alpha = 0.1
    gamma = 0.6
    for state, action, reward, next_state in experiences:
        next_max = np.max(policy.state_action_table[next_state])
        value = policy.state_action_table[state][action]
        new_value = (1 - alpha) * value + alpha * (reward + gamma * next_max)
        policy.state_action_table[state][action] = new_value

- Initialize policy and simulation
- Run the simulation many times (i.e. 10000)
- For each game, collect the experiences by running rollout
- Then update policy on collected experiences

In [11]:
def train_policy(env, num_episodes=10000, weight=0.1, discount_factor=0.9):
    policy = Policy(env)
    sim = Simulation(env)
    for i in range(num_episodes):
        os.system("cls" if os.name=="nt" else "clear")
        clear_output(wait=True)
        print(f"Episode {i}")
        experiences = sim.rollout(policy)
        update_policy(policy, experiences)
    return policy

In [12]:
trained_policy = train_policy(environment)

Episode 9999


In [13]:
def evaluate_policy(env, policy, num_episodes=10):
    """Evaluate a trained policy through rollouts."""
    simulation = Simulation(env)
    steps = 0

    for i in range(num_episodes):
        os.system("cls" if os.name=="nt" else "clear")
        clear_output(wait=True)
        print(f"Episode {i}")
        experiences = simulation.rollout(policy, render=True, explore=False)
        steps += len(experiences)

    print(f"{steps / num_episodes} steps on average "
          f"for a total of {num_episodes} episodes.")


evaluate_policy(environment, trained_policy)

| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | |S|

8.0 steps on average for a total of 10 episodes.


In [14]:
trained_policy.state_action_table

[[0.027993599999999893,
  0.016796159999982237,
  0.016796159999999075,
  0.017538973931170574],
 [0.03963142369601317,
  0.016796159999895574,
  0.007438860085394064,
  0.0005441558191252189],
 [0.007097437421282746, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0.04665599999999983,
  0.0279935999999982,
  0.016796159999949184,
  0.046655983523067386],
 [0.07775999947179606,
  0.027993518560430433,
  0.007562327837657654,
  0.017680383604204555],
 [0.07867258836218466, 0, 0, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0.07775999999999973,
  0.046655999999933015,
  0.027993599999956088,
  0.07775999996343501],
 [0.12959999999999955,
  0.046624749716573044,
  0.03145598478993046,
  0.07792548994172488],
 [0.20848001895445598, 0.007619668707953201, 0, 0.009204991963268945],
 [0.19266739150389361, 0, 0, 0],
 [0, 0, 0, 0],
 [0.12959999999999955,
  0.0777599999996754,
  0.0466559999999485,
  0.12959999999943222],
 [0.21599999999999944,
  0.07086817931815799,
  0.06797146952334471,
  0.20580823617335486

In [15]:
os.environ["RAY_DISABLE_MEMORY_MONITOR"] = "1"
import ray

ray.init()
environment = Environment()
env_ref = ray.put(environment)

@ray.remote
def create_policy():
    env = ray.get(env_ref)
    return Policy(env)

@ray.remote
class SimulationActor(Simulation):
    def __init__(self):
        env = ray.get(env_ref)
        super().__init__(env)

@ray.remote
def update_policy_task(policy_ref, experiences_list):
    [update_policy(policy_ref, ray.get(xp)) for xp in experiences_list]
    return policy_ref

def train_policy_parallel(num_episodes=10000, num_simulations=4):
    policy = create_policy.remote()
    simulations = [SimulationActor.remote() for _ in range(num_simulations)]

    for _ in range(num_episodes):
        experiences = [sim.rollout.remote(policy) for sim in simulations]
        policy = update_policy_task.remote(policy, experiences)

    return ray.get(policy)

2022-08-12 16:39:40,350	INFO services.py:1470 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [16]:
parallel_policy = train_policy_parallel()
evaluate_policy(environment, parallel_policy)

| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | |S|

8.0 steps on average for a total of 10 episodes.


In [17]:
ray.shutdown()

In [18]:
parallel_policy.state_action_table

[[0.027993599999999893,
  0.016796159999999928,
  0.016796159999999928,
  0.027993599999999893],
 [0.04665599999999983,
  0.015917136942005056,
  0.026148098019024697,
  0.024515125524111975],
 [0.0644979254914315, 0, 0.0028434279334112592, 0],
 [0, 0, 0, 0],
 [0, 0, 0, 0],
 [0.04665599999999983,
  0.027993599999999893,
  0.016796159999999928,
  0.04665599999999983],
 [0.07775999999999973,
  0.027908408400389263,
  0.027867459631817547,
  0.07662763727417805],
 [0.1295108114618403, 0.004664233404552535, 0, 0.004675726524452857],
 [0.06010165876746837, 0, 0, 0],
 [0, 0, 0, 0],
 [0.07775999999999973,
  0.04665599999999983,
  0.027993599999999893,
  0.07775999999999973],
 [0.12959999999999955,
  0.04664856585172074,
  0.04663696415623727,
  0.12919285424593374],
 [0.21599998686151864,
  0.04428686974497553,
  0.01372677185309154,
  0.047439534429514],
 [0.35005772270159574, 0, 0.0, 0],
 [0.08792045660134448, 0, 0, 0],
 [0.12959999999999955,
  0.07775999999999973,
  0.04665599999999983,
  