https://www.youtube.com/watch?v=wc-FxNENg9U

In [1]:
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import import_ipynb
import stage

importing Jupyter notebook from stage.ipynb


In [2]:
class DeepQNetwork(nn.Module):
    """Three-layer fully connected network mapping a state to per-action Q-values.

    The module owns its optimizer (Adam) and loss (MSE) so that callers can
    train it through ``net.optimizer`` / ``net.loss`` directly, and it moves
    itself to ``net.device`` on construction.

    Args:
        lr: Learning rate handed to the Adam optimizer.
        input_dims: Iterable of input sizes; it is unpacked into the first
            ``nn.Linear`` (so a 1-tuple such as ``(8,)`` is expected).
        fc1_dims: Width of the first hidden layer.
        fc2_dims: Width of the second hidden layer.
        n_actions: Number of output units (one Q-value per action).
    """

    def __init__(self, lr, input_dims, fc1_dims, fc2_dims, n_actions):
        super().__init__()
        self.input_dims = input_dims
        self.fc1_dims = fc1_dims
        self.fc2_dims = fc2_dims
        self.n_actions = n_actions
        self.fc1 = nn.Linear(*self.input_dims, self.fc1_dims)  # Input layer
        self.fc2 = nn.Linear(self.fc1_dims, self.fc2_dims)  # Hidden layer
        self.fc3 = nn.Linear(self.fc2_dims, self.n_actions)  # Output layer
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.device = self._select_device(T.cuda.is_available())
        self.to(self.device)

    @staticmethod
    def _select_device(cuda_available):
        """Return the torch device to run on.

        "gpu" is not a device type torch recognises; the CUDA backend is
        addressed as "cuda".
        """
        return T.device("cuda" if cuda_available else "cpu")

    def forward(self, state):
        """Return the raw (un-activated) Q-value for every action.

        Args:
            state: Tensor whose last dimension matches ``input_dims``.

        Returns:
            Tensor with ``n_actions`` values along the last dimension.
        """
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        action = self.fc3(x)

        return action

In [3]:
class Agent():
    """Epsilon-greedy Q-learning agent with a ring-buffer replay memory.

    The agent holds a single ``DeepQNetwork`` (``Q_eval``); there is no
    separate target network, so bootstrap targets are computed from the same
    network with gradients disabled.

    Args:
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of picking a random action.
        lr: Learning rate forwarded to the network's optimizer.
        input_dims: Iterable of observation sizes; unpacked to shape the
            replay buffers and the network input.
        batch_size: Number of transitions sampled per learning step.
        n_actions: Size of the discrete action space.
        max_mem_size: Capacity of the replay memory.
        eps_end: Lower bound for ``epsilon``.
        eps_dec: Amount subtracted from ``epsilon`` after each learning step.
    """

    def __init__(self, gamma, epsilon, lr, input_dims, batch_size, n_actions,
                 max_mem_size=100_000, eps_end=0.01, eps_dec=5e-4):
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.lr = lr
        self.action_space = list(range(n_actions))
        self.mem_size = max_mem_size
        self.batch_size = batch_size
        self.mem_cntr = 0  # total transitions ever stored (not capped)

        self.Q_eval = DeepQNetwork(self.lr, n_actions=n_actions, input_dims=input_dims,
                                   fc1_dims=256, fc2_dims=256)
        self.state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)

        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        # The bare ``np.bool`` alias was removed from NumPy; ``np.bool_`` is
        # the scalar type that exists across versions.
        self.terminal_memory = np.zeros(self.mem_size, dtype=np.bool_)

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the replay memory.

        Once ``mem_size`` transitions have been stored, the oldest entries
        are overwritten.
        """
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.reward_memory[index] = reward
        self.action_memory[index] = action
        self.terminal_memory[index] = done

        self.mem_cntr += 1

    def choose_action(self, observation):
        """Pick an action epsilon-greedily for a single observation.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value.
        """
        if np.random.random() > self.epsilon:
            # Force float32 so the input dtype matches the network weights
            # regardless of whether the observation is a list or an ndarray.
            obs = np.asarray(observation, dtype=np.float32)[np.newaxis, ...]
            state = T.tensor(obs).to(self.Q_eval.device)
            with T.no_grad():  # inference only, no graph needed
                actions = self.Q_eval.forward(state)
            action = T.argmax(actions).item()
        else:
            action = np.random.choice(self.action_space)

        return action

    def learn(self):
        """Run one gradient step on a random minibatch from replay memory.

        Does nothing until at least ``batch_size`` transitions are stored.
        After the step, ``epsilon`` is reduced by ``eps_dec`` but never below
        ``eps_min``.
        """
        if self.mem_cntr < self.batch_size:
            return

        device = self.Q_eval.device

        # Only sample from the part of the buffer that has been written.
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)

        state_batch = T.tensor(self.state_memory[batch]).to(device)
        new_state_batch = T.tensor(self.new_state_memory[batch]).to(device)
        reward_batch = T.tensor(self.reward_memory[batch]).to(device)
        terminal_batch = T.tensor(self.terminal_memory[batch]).to(device)
        action_batch = T.tensor(self.action_memory[batch], dtype=T.long).to(device)
        batch_index = T.arange(self.batch_size, device=device)

        self.Q_eval.optimizer.zero_grad()

        # Q-value of the action that was actually taken in each sampled state.
        q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]

        # The bootstrap target is built from the NEXT state and treated as a
        # constant: no gradient flows through it, and terminal transitions
        # contribute no future value.
        with T.no_grad():
            q_next = self.Q_eval.forward(new_state_batch)
            q_next[terminal_batch] = 0.0
            q_target = reward_batch + self.gamma * T.max(q_next, dim=1)[0]

        loss = self.Q_eval.loss(q_eval, q_target)
        loss.backward()
        self.Q_eval.optimizer.step()

        if self.epsilon > self.eps_min:
            # Clamp so a coarse eps_dec cannot push epsilon under the floor.
            self.epsilon = max(self.eps_min, self.epsilon - self.eps_dec)
            
        

In [4]:
class DeepQLearn:
    """Drives one or more agents through a ``stage`` environment.

    Agents act in round-robin order: on step ``t`` the agent with index
    ``t % len(agents)`` observes, acts, and learns.

    Args:
        env: Environment object. This class calls ``reset``, ``isDone``,
            ``nextStep``, ``reward`` and ``render`` on it and reads its
            ``agents``, ``end_points`` and ``board`` attributes.
        agents: Sequence of learning agents, indexed in parallel with
            ``env.agents``.
    """

    # Annotations are strings so they are not evaluated when the class is
    # defined; ``agents`` is a sequence of agents, not a single one.
    def __init__(self, env: "stage.Environment", agents: "list[Agent]"):
        self.agents = agents
        self.env = env

    def train(self, cap: int, episodes: int):
        """Train the agents for a number of episodes.

        Args:
            cap: Step budget per episode; the episode is cut off once the
                step counter exceeds it.
            episodes: Number of episodes to run.

        Returns:
            ``(scores, eps_history)``: the per-episode total reward and the
            epsilon of the last acting agent at the end of each episode.

        Raises:
            ValueError: If no agents were supplied.
        """
        n_agents = len(self.agents)
        if n_agents == 0:
            raise ValueError("DeepQLearn needs at least one agent")

        env = self.env
        scores, eps_history = [], []

        for i in range(episodes):
            score = 0
            env.reset()
            step = 0
            # Bound up front so the episode summary below still works when
            # the environment is already done and the loop body never runs.
            agent = self.agents[0]
            while not env.isDone():
                agent_id = step % n_agents
                agent = self.agents[agent_id]
                # Each agent has its own view of the board, so the
                # observation is rebuilt for whoever is about to act.
                observation = self.state(agent_id)
                action = agent.choose_action(observation)
                env.nextStep(agent_id, action)
                reward = env.reward(agent_id)
                score += reward
                observation_ = self.state(agent_id)
                agent.store_transition(observation, action, reward, observation_, env.isDone())
                agent.learn()
                step += 1

                if step > cap:
                    break

            scores.append(score)
            eps_history.append(agent.epsilon)

            avg_score = np.mean(scores[-100:])
            print('episode', i, 'score %.2f' % score, 'averaeg score %.2f' % avg_score, "epsilon %.2f" % agent.epsilon)

        return scores, eps_history

    def state(self, agent_id):
        """Build the observation vector for one agent.

        The layout is: the agent's position, the end point, then one flag per
        orthogonal neighbour cell (up, left, right, down in board-index
        terms) that is 1.0 when the cell holds a wall ("x") and 0.0 otherwise.

        Args:
            agent_id: Index into ``env.agents``.

        Returns:
            A flat list of floats.
        """
        cur_state = []
        agent = self.env.agents[agent_id]
        # Append agent's position
        for pos in agent.getPos():
            cur_state.append(float(pos))
        # Append agent's end point
        # NOTE(review): always reads end_points[0]; if end points are stored
        # per agent this should probably be end_points[agent_id] -- confirm.
        for end in self.env.end_points[0]:
            cur_state.append(float(end))
        # Append walls
        # NOTE(review): no bounds check; assumes the agent is never on the
        # outer edge of the board -- confirm against the stage module.
        x = int(cur_state[0])
        y = int(cur_state[1])
        for i in range(-1, 2):
            for j in range(-1, 2):
                # Keep only the four orthogonal neighbours: exactly one of
                # the offsets is non-zero. The comparisons need their own
                # parentheses because ``^`` binds tighter than ``!=``.
                if not ((i != 0) ^ (j != 0)):
                    continue
                if self.env.board[x+i][y+j] == "x":
                    cur_state.append(1.0)
                else:
                    cur_state.append(0.0)

        return cur_state

    def play(self, cap):
        """Run one episode without learning and collect rendered frames.

        Actions come from ``Agent.choose_action``, so they are still subject
        to each agent's current epsilon. The environment is reset before and
        after the episode.

        Args:
            cap: Step budget; the episode is cut off once the step counter
                exceeds it.

        Returns:
            List of whatever ``env.render()`` returned after each step.

        Raises:
            ValueError: If no agents were supplied.
        """
        n_agents = len(self.agents)
        if n_agents == 0:
            raise ValueError("DeepQLearn needs at least one agent")

        env = self.env
        env.reset()
        frames = []
        step = 0
        while not env.isDone():
            agent_id = step % n_agents
            agent = self.agents[agent_id]
            step += 1
            observation = self.state(agent_id)
            action = agent.choose_action(observation)
            env.nextStep(agent_id, action)
            frames.append(env.render())
            if step > cap:
                break
        env.reset()

        return frames

    def show(self, cap = 2_000):
        """Play one episode and display its frames via ``stage.print_frames``."""
        frames = self.play(cap)
        stage.print_frames(frames)
                
        
        
        