In [5]:
import numpy as np
import torch
from torch import nn
import gym
import random
from collections import deque
np.random.seed(42)
torch.manual_seed(42)
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

In [6]:
class MLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_sizes[0])
        self.fc2 = nn.Linear(hidden_sizes[0], hidden_sizes[1])
        self.output = nn.Linear(hidden_sizes[1], output_size)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        out = self.output(x)
        return out

In [14]:
class DeepQLearnFrozenLake():
    def __init__(self):
        self.env = gym.make("FrozenLake-v0")
        self.env.reset()
        num_of_actions = self.env.nA
        num_of_states = self.env.nS
        self.memory  = deque(maxlen=500)
        
        self.model = MLP(num_of_states, [13, 8], num_of_actions).to(device)
        self.target_model = MLP(num_of_states, [13, 8], num_of_actions).to(device)
        self.criterion = nn.MSELoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.0001)
        
    def explore(self):
        return self.env.action_space.sample()
    
    def exploit(self, state):
        return int(torch.argmax(self.model(self.encode(state))))

    def encode(self, state):
        result = torch.eye(16)[state]
        result = result.to(device)
        return result

    def experience(self, state, action, reward, new_state, done):
        self.memory.append([state, action, reward, new_state, done])

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def train_models(self, info, episode, gamma):
        train_state, train_action, train_reward, train_new_state, train_won_lost = info.T
        target = self.target_model(self.encode(train_state))
        done_indices = np.where(train_won_lost == 1)
        not_done_indices = np.where(train_won_lost == 0)
        done_actions = train_action[done_indices]
        not_done_actions = train_action[not_done_indices]
        target[done_indices, done_actions] = torch.from_numpy(train_reward[done_indices]).to(device)
        max_Q = torch.max(self.target_model(self.encode(train_new_state)), dim=1)[0]
        target[not_done_indices, not_done_actions] = torch.from_numpy(train_reward[not_done_indices]).to(device) + max_Q[not_done_indices] * gamma
        h = self.model(self.encode(train_state))
        self.optimizer.zero_grad()
        loss = self.criterion(h, target)
        loss.backward()
        self.optimizer.step()
        if episode % 10 == 0:
            self.update_target_model()

    def fit(self, epochs, max_eps, gamma, should_experience, batch_size=0):
        eps = max_eps
        rewards = []
        for epoch in range(epochs):
            if epoch % 300 == 299: 
                print("Training " + str((100 * epoch)//epochs) + "% completed...")
            state = self.env.reset()
            running_reward = 0
            for i in range(150):
                if np.random.random() > eps:
                    action = self.exploit(state)
                else:
                    action = self.explore()
                new_state, reward, won_lost, _ = self.env.step(action)
                running_reward += reward
                if should_experience:
                    self.experience(state, action, reward, new_state, won_lost)                    
                if won_lost:
                    eps = max(0.011, eps * 0.94)
                    break

                prev_state = state
                state = new_state
                if should_experience:
                    if len(self.memory) < batch_size:
                        continue
                    else:
                        info = random.sample(self.memory, batch_size)
                        info = np.array(info, dtype="float32")
                else:
                    info = np.array([[prev_state], [action], [reward], [new_state], [won_lost]], dtype="float32").T
                self.train_models(info, i, gamma)
                
            rewards.append(running_reward)
        print("mean reward is: {}".format(np.mean(rewards)))
        self.best_actions = []
        for state in range(self.env.nS):
            self.best_actions.append(int(torch.argmax(self.model(self.encode(state)))))
        
    def test(self, num_of_trials):
        print()
        print("Starting to test ...")
        env = self.env
        best_actions = self.best_actions
        env.reset()
        success_rate = 0
        for i in range(num_of_trials):
            state = env.reset()
            for j in range(200):
                new_state, reward, won_lost, _ = env.step(best_actions[state])
                if won_lost:
                    if reward != 0: 
                        success_rate += (1 / num_of_trials)
                    break
                state = new_state
        print(f"success rate is: {success_rate*100}%")

In [15]:
deepqlearn = DeepQLearnFrozenLake()
deepqlearn.fit(epochs=6000, max_eps=1, gamma=0.95, should_experience=False)
deepqlearn.test(num_of_trials=1000)

Training 4% completed...
Training 9% completed...
Training 14% completed...
Training 19% completed...
Training 24% completed...
Training 29% completed...
Training 34% completed...
Training 39% completed...
Training 44% completed...
Training 49% completed...
Training 54% completed...
Training 59% completed...
Training 64% completed...
Training 69% completed...
Training 74% completed...
Training 79% completed...
Training 84% completed...
Training 89% completed...
Training 94% completed...
Training 99% completed...
mean reward is: 0.022833333333333334

Starting to test ...
success rate is: 3.200000000000002%


In [16]:
deepqlearn = DeepQLearnFrozenLake()
deepqlearn.fit(epochs=2000, max_eps=1, gamma=0.99, should_experience=True, batch_size=256)
deepqlearn.test(num_of_trials=1000) 

Training 14% completed...
Training 29% completed...
Training 44% completed...
Training 59% completed...
Training 74% completed...
Training 89% completed...
mean reward is: 0.359

Starting to test ...
success rate is: 74.10000000000005%
