In [None]:
# Laurent LEQUIEVRE
# Research Engineer, CNRS (France)
# Institut Pascal UMR6602
# laurent.lequievre@uca.fr

# Solution based on :
# https://www.kaggle.com/wuhao1542/pytorch-rl-0-frozenlake-q-network-learning

In [23]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch import optim
import numpy as np 

In [24]:
import gym
from gym.envs.registration import register

In [25]:
# Re-running the registration cell raises
#   Error: Cannot re-register id: FrozenLakeNotSlippery-v0
# so any previously registered spec with that id is removed first.

# Snapshot of the registry: we iterate over the copy while deleting from the
# live mapping.
env_dict = gym.envs.registration.registry.env_specs.copy()

# The loop variable is `env_id`, not `env`: `env` is the name this notebook
# uses for the environment object, and a for-loop at cell level would
# silently overwrite it with a string.
for env_id in env_dict:
    if 'FrozenLakeNotSlippery-v0' in env_id:
        print("Remove {} from registry".format(env_id))
        del gym.envs.registration.registry.env_specs[env_id]


Remove FrozenLakeNotSlippery-v0 from registry


In [26]:
# Register a 4x4 FrozenLake variant with slipping disabled, then instantiate it.
register(
    id="FrozenLakeNotSlippery-v0",
    entry_point="gym.envs.toy_text:FrozenLakeEnv",
    kwargs=dict(map_name="4x4", is_slippery=False),
)

env = gym.make("FrozenLakeNotSlippery-v0")

In [27]:
# Number of discrete states and number of discrete actions of the environment.
observation_space, action_space = env.observation_space.n, env.action_space.n

print("observation space = {}, action space = {}".format(observation_space, action_space))

observation space = 16, action space = 4


In [28]:
# Put the environment back in its initial state, then print the map.
env.reset()
env.render()


[41mS[0mFFF
FHFH
FFFH
HFFG


In [12]:
def uniform_linear_layer(linear_layer):
    """Re-initialise a linear layer in place.

    The weights are redrawn from the uniform distribution U[0, 1) and the
    bias, when the layer has one, is set to the constant -0.02.

    Args:
        linear_layer: a module exposing ``weight`` and ``bias`` the way
            ``nn.Linear`` does (``bias`` may be ``None``).

    Returns:
        None. The layer is modified in place.
    """
    # torch.nn.init runs under no_grad, so the parameters are updated without
    # going through the legacy `.data` attribute.
    nn.init.uniform_(linear_layer.weight)  # defaults: a=0.0, b=1.0
    # A layer created with bias=False has `bias is None`; nothing to fill then.
    if linear_layer.bias is not None:
        nn.init.constant_(linear_layer.bias, -0.02)

class Agent(nn.Module):
    """Small Q-network: discrete state index in, one Q-value per action out.

    The state is one-hot encoded, passed through a sigmoid hidden layer that
    is as wide as the state space, then projected onto the action space.
    """

    def __init__(self, observation_space_size, action_space_size):
        super().__init__()
        n_states = observation_space_size
        self.observation_space_size = n_states
        self.hidden_size = n_states
        self.l1 = nn.Linear(n_states, self.hidden_size)
        self.l2 = nn.Linear(self.hidden_size, action_space_size)
        # Both layers are created first and only then re-initialised, in this
        # order (l1 then l2).
        for layer in (self.l1, self.l2):
            uniform_linear_layer(layer)

    def forward(self, state):
        """Return the Q-values of a single state as a 1-D tensor (one entry per action)."""
        index = torch.tensor([int(state)], dtype=torch.long)
        one_hot = F.one_hot(index, num_classes=self.observation_space_size).float()
        hidden = torch.sigmoid(self.l1(one_hot))
        q_values = self.l2(hidden)  # shape: 1 x action_space_size
        return q_values.view(-1)    # flattened to action_space_size

In [13]:
def take_action(action, env):
    """Step `env` with `action` and replace the reward with a shaped one.

    The tile reached by the step is looked up in the environment's own map
    (``env.unwrapped.desc``, FrozenLake's grid of single-byte tiles indexed
    row-major by state), so the function works for any FrozenLake layout
    instead of relying on a hole list hard-coded for one map.

    Reward scheme:
        hole (b'H')  -> -1
        goal (b'G')  -> 1
        anything else -> -0.01 (small cost for every extra step)

    Args:
        action: action passed to ``env.step``.
        env: a FrozenLake environment (possibly wrapped).

    Returns:
        Tuple ``(new_state, reward, done, info)`` where ``reward`` is the
        shaped value and the other items come straight from ``env.step``.
    """
    # The environment's native reward is discarded on purpose.
    new_state, _, done, info = env.step(action)
    tile = env.unwrapped.desc.flat[new_state]
    if tile == b'H':
        reward = -1
    elif tile == b'G':
        reward = 1
    else:
        reward = -0.01
    return new_state, reward, done, info

class Trainer:
    """Online one-step Q-learning for an `Agent` Q-network.

    After every environment step the network is updated towards the
    temporal-difference target with a smooth-L1 loss and the Adam optimizer.

    Args:
        env: environment exposing ``reset()``, ``observation_space.n`` and
            ``action_space`` (``.n`` and ``.sample()``).
        gamma: discount factor applied to the bootstrapped next-state value.
        epsilon: probability of taking a random action instead of the greedy one.
        max_steps: upper bound on the number of steps in one episode.
        step_fn: optional callable ``(action, env) -> (next_state, reward,
            done, info)`` used to step the environment. When ``None``, the
            module-level ``take_action`` is used (looked up when ``train``
            is called).
    """

    def __init__(self, env, gamma=0.99, epsilon=0.1, max_steps=200, step_fn=None):
        self.agent = Agent(env.observation_space.n, env.action_space.n)
        self.optimizer = optim.Adam(params=self.agent.parameters())
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.max_steps = max_steps
        self.step_fn = step_fn

    def train(self, epoch):
        """Run `epoch` training episodes, printing one dot per episode."""
        # Resolved here rather than in __init__ so that a `take_action`
        # (re)defined after the Trainer was built is still the one used.
        step = self.step_fn if self.step_fn is not None else take_action

        for _episode in range(epoch):
            print('.', end='')
            current_state = self.env.reset()
            for _step in range(self.max_steps):
                # perform chosen action
                an_action = self.choose_action(current_state)
                next_state, reward, done, _info = step(an_action, self.env)

                # Q-value of the action actually taken (keeps the graph).
                q_pred = self.agent(current_state)[an_action]

                # TD target, computed without building a graph.
                with torch.no_grad():
                    if done:
                        # A terminal state has no future: the target is the
                        # reward alone. Bootstrapping here would feed the
                        # never-trained Q-values of terminal states into
                        # the target.
                        target_q = q_pred.new_tensor(float(reward))
                    else:
                        target_q = reward + self.gamma * torch.max(self.agent(next_state))

                loss = F.smooth_l1_loss(q_pred, target_q)

                # update model to optimize Q
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update state
                current_state = next_state
                if done:
                    break

        print("Train is done !")

    def choose_action(self, s):
        """Epsilon-greedy choice: random action with probability `epsilon`, else argmax Q(s, .)."""
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        with torch.no_grad():
            return torch.argmax(self.agent(s)).item()

In [14]:
# Build a Trainer around the 4x4 environment and run 2000 training episodes.
t = Trainer(env)
t.train(epoch=2000)

........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

In [15]:
# Test the best solution from an Agent (with a neural network)
# Initial state = 0, Final state = 15
goal_state = 15
max_test_steps = 100  # safety cap, far more than a shortest path on this map needs

current_state = env.reset()  # S is the initial state = 0
env.render()

# Follow the greedy policy until the episode ends. Stopping on `done` (and on
# the step cap) instead of only on "state == goal" keeps the cell from looping
# forever when the learned policy falls into a hole or walks in circles.
done = False
steps = 0
while not done and steps < max_test_steps:
    with torch.no_grad():
        an_action = torch.argmax(t.agent(current_state)).item()
    print("Take action {}".format(an_action))
    current_state, _, done, _ = env.step(an_action)
    env.render()
    steps += 1

if current_state != goal_state:
    print("Goal not reached: stopped in state {} after {} steps".format(current_state, steps))


[41mS[0mFFF
FHFH
FFFH
HFFG
Take action 1
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
Take action 1
  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG
Take action 2
  (Right)
SFFF
FHFH
F[41mF[0mFH
HFFG
Take action 2
  (Right)
SFFF
FHFH
FF[41mF[0mH
HFFG
Take action 1
  (Down)
SFFF
FHFH
FFFH
HF[41mF[0mG
Take action 2
  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m


In [16]:
# Re-running the registration cell raises
#   Error: Cannot re-register id: FrozenLakeNotSlippery-v1
# so any previously registered spec with that id is removed first.

# Snapshot of the registry: we iterate over the copy while deleting from the
# live mapping.
env_dict = gym.envs.registration.registry.env_specs.copy()

# The loop variable must not be called `env`: at cell level that would
# replace the 4x4 environment object stored in `env` with a registry key
# (a string), breaking any later use of `env`.
for env_id in env_dict:
    if 'FrozenLakeNotSlippery-v1' in env_id:
        print("Remove {} from registry".format(env_id))
        del gym.envs.registration.registry.env_specs[env_id]


Remove FrozenLakeNotSlippery-v1 from registry


In [17]:
# Register an 8x8 FrozenLake variant with slipping disabled, then instantiate it.
register(
    id="FrozenLakeNotSlippery-v1",
    entry_point="gym.envs.toy_text:FrozenLakeEnv",
    kwargs=dict(map_name="8x8", is_slippery=False),
)

env2 = gym.make("FrozenLakeNotSlippery-v1")

In [18]:
# Put the 8x8 environment back in its initial state, then print the map.
env2.reset()
env2.render()


[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG


In [19]:
# take_action for FrozenLake: the reward is derived from the environment's own
# map, so it fits the 8x8 layout used below (and any other FrozenLake map).
def take_action(action, env):
    """Step `env` with `action` and replace the reward with a shaped one.

    The tile reached by the step is looked up in the environment's own map
    (``env.unwrapped.desc``, FrozenLake's grid of single-byte tiles indexed
    row-major by state) rather than in a list of state numbers hard-coded
    for one particular layout.

    Reward scheme:
        hole (b'H')  -> -1
        goal (b'G')  -> 1
        anything else -> -0.01 (small cost for every extra step)

    Args:
        action: action passed to ``env.step``.
        env: a FrozenLake environment (possibly wrapped).

    Returns:
        Tuple ``(new_state, reward, done, info)`` where ``reward`` is the
        shaped value and the other items come straight from ``env.step``.
    """
    # The environment's native reward is discarded on purpose.
    new_state, _, done, info = env.step(action)
    tile = env.unwrapped.desc.flat[new_state]
    if tile == b'H':
        reward = -1
    elif tile == b'G':
        reward = 1
    else:
        reward = -0.01
    return new_state, reward, done, info

In [20]:
# Build a Trainer around the 8x8 environment and run 2000 training episodes.
t = Trainer(env2)
t.train(epoch=2000)

........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

In [21]:
# Test the best solution from an Agent (with a neural network)
# Initial state = 0, Final state = 63
goal_state = 63
max_test_steps = 100  # safety cap, far more than a shortest path on this map needs

current_state = env2.reset()  # S is the initial state = 0
env2.render()

# Follow the greedy policy until the episode ends. Stopping on `done` (and on
# the step cap) instead of only on "state == goal" keeps the cell from looping
# forever when the learned policy falls into a hole or walks in circles.
done = False
steps = 0
while not done and steps < max_test_steps:
    with torch.no_grad():
        an_action = torch.argmax(t.agent(current_state)).item()
    print("Take action {}".format(an_action))
    current_state, _, done, _ = env2.step(an_action)
    env2.render()
    steps += 1

if current_state != goal_state:
    print("Goal not reached: stopped in state {} after {} steps".format(current_state, steps))


[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 1
  (Down)
SFFFFFFF
[41mF[0mFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 1
  (Down)
SFFFFFFF
FFFFFFFF
[41mF[0mFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 2
  (Right)
SFFFFFFF
FFFFFFFF
F[41mF[0mFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 1
  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
F[41mF[0mFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 2
  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FF[41mF[0mFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 2
  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFF[41mF[0mFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 2
  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFF[41mF[0mHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 1
  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFH[41mF[0mFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG
Take action 1
  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHF[41mF[0m