In [1]:
import numpy as np
# import matplotlib.pyplot as plt


import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

import gym
import minihack

In [2]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.cuda.is_available()

True

# Simple Neural Network (Policy)

In [3]:
class PolicyNetwork(nn.Module):
    def __init__(self, obs_space=4, hidden=16, act_space=2):
        ''' A simple Neural Network.
        :params obs_space: Observation space (Default=4)
        :params hidden: Hiddens size (Default=16)
        :params act_space: Action space (Default=2)
        '''
        super(PolicyNetwork, self).__init__()
        
        self.linear1 = nn.Linear(obs_space, hidden)
        self.dropout = nn.Dropout(p=0.6)
        self.linear2 = nn.Linear(hidden, act_space)

    def forward(self, x):

        x = self.linear1(x)
        x = self.dropout(x)
        x = F.relu(x)
        actions = self.linear2(x)

        act_probs = F.softmax(actions, dim=1)

        return act_probs

# REINFORCE Agent

In [23]:
class Agent():
    
    def __init__(self,obs_size, action_size, policy_model, optimizer):
        self.eps = np.finfo(np.float32).eps.item()
        self.obs_size = obs_size
        self.action_size = action_size
        self.policy_model = policy_model
        self.optim = optimizer
        
        self.states = []
        self.actions = []
        self.gradients = []
        self.rewards = []
        self.probs = []
        
        
    def __compute_returns(self,rewards, gamma):
        returns = []
        cumul_rets = 0
        
        for reward in reversed(rewards):
            cumul_rets = reward + cumul_rets*gamma
            returns.append(0, cumul_sum)
            
        returns = torch.tensor(G).to(DEVICE)
        returns = (returns - returns.mean()) / (G.std() + self.eps)
        return returns
        
    def choose_action(self,  state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(DEVICE)
        
        probs = self.policy_model(state)
        state = state.detach()
        
        dist = Categorical(probs)
        action = dist.sample()
        
        return action.item(), dist.log_prob(action)
    
    def save_trajectory(self, state, action, reward, prob):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.prob.append(prob)
        
    
    def train(self,):
        G = self.__compute_returns(self.rewards)
        
        policy_loss= []
        for ret,prob in zip(G, self.probs):
            policy_loss.append(-prob*ret)
        
        
        # Backpropagation
        self.optim.zero_grad()
        policy_loss = torch.cat(policy_loss).sum()
        policy_loss.backward()
        self.optim.step()
        
        # Reset
        self.states, self.probs, self.gradients, self.rewards = [],[],[],[]

In [36]:
def main(env, agent, seed, num_episodes, max_episode_len, gamma):
    
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
#     random.seed(seed)
    env.seed(seed)
    
    tot_scores = []
    
    for ep in range(num_episodes):
        
        state = env.reset()['glyphs']
        score = 0
        
        # Create episode
        for step in range(max_episode_len):
            act, prob = agent.choose_action(state)
            
            new_state['glyphs'], reward, done, _ = env.step(act)
            
            score +=reward
        
            agent.save_trajectory(state, act, reward, prob)
            
            if done:
                break
            state = new_state
        
        tot_scores.append(score)
        
        # For every step in episode
        agent.train()
        
        if ep % 100 == 0:
            print(f'Episode {ep}\tLast score: {score}\tAverage reward: {np.array(scores).mean()}')
        

# Training

In [37]:
# Initialise
env = gym.make("MiniHack-Quest-Hard-v0",observation_keys=("glyphs","pixel"))
env_obs_space = env.observation_space['glyphs'].shape[0]

policy_model = PolicyNetwork(obs_space=env_obs_space, hidden=128, act_space=env.action_space.n)
policy_model.to(DEVICE)
optimizer = optim.Adam(policy_model.parameters(), lr=1e-2)


agent = Agent(env_obs_space, env.action_space.n, policy_model, optimizer)

main(env, agent, seed=54, num_episodes=1500, max_episode_len=1000, gamma=0.99)

/home/thabo/anaconda3/envs/MINIHACK_ENV/lib/python3.8/site-packages/minihack/scripts/mh_patch_nhdat.sh: line 25: cd: too many arguments


RuntimeError: mat1 and mat2 shapes cannot be multiplied (21x79 and 21x128)