In [None]:
from torch import randint
from torch import nn, optim
import torch 
import gym
import numpy as np

from collections import deque
import random

from scores.score_logger import ScoreLogger

In [None]:
class Learners:
    """Names of the bootstrap rules understood by the DQN agent.

    The values are class attributes, so they can be read either from the
    class (`Learners.Q`) or from an instance (`Learners().Q`).
    """

    # Bootstrap from the largest next-state Q-value.
    Q = "Q"
    # Bootstrap from the Q-value of a sampled next action.
    SARSA = "SARSA"

# Gym environment id passed to gym.make() and to the score logger.
ENV_NAME = "CartPole-v1"
# Discount factor applied to the bootstrapped next-state value.
GAMMA = 0.95
# Maximum number of transitions kept in the replay memory.
MEMORY = 1000000
# Number of transitions sampled per experience-replay call.
BATCH_SIZE = 50
# Learning rate of the Adam optimizer.
LEARNING_RATE = 0.01
# Step size used when blending the old Q-value with the TD target.
STEP_SIZE = 1
# Epsilon-greedy exploration schedule: start at MAX, multiply by DECAY after
# every replay, never drop below MIN.
EXPLORATION_DECAY = 0.995
EXPLORATION_MAX = 1.0
EXPLORATION_MIN = 0.01


In [None]:
class DQN:
    """Epsilon-greedy agent backed by a small fully-connected Q-network.

    Transitions are stored in a bounded replay memory and the network is
    trained one sampled transition at a time in `experience_replay`.
    Hyper-parameters come from the module-level constants (LEARNING_RATE,
    GAMMA, MEMORY, BATCH_SIZE, STEP_SIZE and the EXPLORATION_* values).
    """

    def __init__(self, observation_space, action_space, learner):
        """Build the network, optimizer and replay memory.

        Args:
            observation_space: space whose `shape[0]` is the network input size.
            action_space: space whose `n` is the number of network outputs and
                whose `sample()` supplies random exploratory actions.
            learner: "Q" or "SARSA"; selects the bootstrap rule in `get_q_next`.
        """
        self.model = nn.Sequential(
            nn.Linear(observation_space.shape[0], 16),
            nn.ReLU(),
            nn.Linear(16, 16),
            nn.ReLU(),
            nn.Linear(16, 16),
            nn.ReLU(),
            nn.Linear(16, action_space.n)
        )
        self.observation_space = observation_space
        self.action_space = action_space
        self.learner = learner

        self.optimizer = optim.Adam(self.model.parameters(), lr=LEARNING_RATE)
        self.loss_fn = nn.MSELoss()
        self.exploration_rate = EXPLORATION_MAX
        self.discount = GAMMA
        self.memory = deque(maxlen=MEMORY)

    @staticmethod
    def _to_tensor(state):
        """Convert an observation array to a float32 tensor the network accepts."""
        # from_numpy would keep float64 input as-is, which does not match the
        # float32 parameters of the Linear layers.
        return torch.as_tensor(state, dtype=torch.float32)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action epsilon-greedily.

        With probability `exploration_rate` a random action is sampled from the
        action space; otherwise the index of the largest predicted Q-value is
        returned.
        """
        if np.random.rand() < self.exploration_rate:
            return self.action_space.sample()
        # Pure inference: no autograd graph is needed to pick an action.
        with torch.no_grad():
            return self.model(self._to_tensor(state)).argmax().item()

    def get_q_next(self, next_state):
        """Return the discounted bootstrap value for `next_state`.

        Args:
            next_state: observation array; the "SARSA" branch indexes the
                network output as `[0][action]`, so a leading batch axis of
                size 1 is expected there.

        Returns:
            A 0-dim tensor, detached from the autograd graph, equal to
            `discount * Q(next_state, a')`, where `a'` is the greedy action for
            "Q" and a uniformly sampled action for "SARSA".

        Raises:
            ValueError: if `self.learner` is neither "Q" nor "SARSA".
        """
        # The bootstrap value is a fixed regression target, so it must not
        # carry gradients back into the network.
        with torch.no_grad():
            next_qs = self.model(self._to_tensor(next_state))

        if self.learner == "Q":
            next_q = next_qs.max()
        elif self.learner == "SARSA":
            # NOTE(review): the next action is drawn uniformly at random, not
            # from the epsilon-greedy policy the agent actually follows --
            # confirm this is the intended approximation of SARSA.
            next_q = next_qs[0][random.randint(0, next_qs[0].size(0) - 1)]
        else:
            raise ValueError("Unknown learner: " + repr(self.learner))

        # Both learners discount the next-state value.
        return self.discount * next_q

    def experience_replay(self):
        """Train on a random batch of remembered transitions, then decay epsilon.

        Does nothing until at least BATCH_SIZE transitions are stored. Each
        sampled transition gets its own optimizer step.
        """
        # Don't replay if we don't have enough memory
        if len(self.memory) < BATCH_SIZE:
            return

        batch = random.sample(self.memory, BATCH_SIZE)
        for state, action, reward, next_state, terminal in batch:
            predicted_q_values = self.model(self._to_tensor(state))

            # The regression target starts as a gradient-free copy of the
            # prediction, so only the taken action's entry adds to the loss.
            target_q_values = predicted_q_values.detach().clone()
            current_q = target_q_values[0][action]

            if terminal:
                # No future value after a terminal step: the target is the reward.
                q_update = torch.tensor(reward, dtype=torch.float32)
            else:
                # Bellman-style update: move the old estimate toward
                # reward + discounted next-state value by STEP_SIZE.
                td_target = reward + self.get_q_next(next_state)
                q_update = current_q + STEP_SIZE * (td_target - current_q)

            target_q_values[0][action] = q_update

            loss = self.loss_fn(predicted_q_values, target_q_values)

            # Gradients are reset every time because each transition is its
            # own training step.
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        # Decay the exploration rate, but never below the configured minimum
        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)

    

In [None]:
# Train the agent for a fixed number of episodes and log every episode's score.
SEED = 46
NUM_EPISODES = 100

# Seed every random source the run draws from (replay sampling, epsilon-greedy
# draws, network initialisation) so Restart & Run All is repeatable.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Create environment and a way to track the score
env = gym.make(ENV_NAME)
env.action_space.seed(SEED)  # exploratory actions come from action_space.sample()
score_logger = ScoreLogger(ENV_NAME)
learner = Learners().Q

# Seed the environment once; each episode below resets it again without a seed.
# NOTE(review): reset(return_info=True) and the 4-tuple from step() match the
# pre-0.26 gym API -- confirm the installed gym version.
state, info = env.reset(seed=SEED, return_info=True)

# Create the agent
DQN_AGENT = DQN(env.observation_space, env.action_space, learner)
observation_size = env.observation_space.shape[0]

try:
    # run is the 1-based episode number
    for run in range(1, NUM_EPISODES + 1):
        state = env.reset()
        state = np.reshape(state, [1, observation_size])
        step = 0
        while True:
            step += 1

            # Predict action then take action in environment
            action = DQN_AGENT.act(state)
            state_next, reward, terminal, info = env.step(action)

            # Flip the sign of the reward on the step that ends the episode
            reward = reward if not terminal else -reward
            state_next = np.reshape(state_next, [1, observation_size])

            # Store experience in memory
            DQN_AGENT.remember(state, action, reward, state_next, terminal)
            state = state_next

            if terminal:
                print("Run: " + str(run) + ", exploration: " + str(DQN_AGENT.exploration_rate) + ", score: " + str(step))
                score_logger.add_score(step, run)
                break

            # Experience replay - train model (skipped on the terminal step)
            DQN_AGENT.experience_replay()
finally:
    # Release the environment even if training is interrupted.
    env.close()
        


In [None]:
# Stand-alone network for experimentation: 4 inputs, three hidden layers of
# 16 units with ReLU activations, 2 outputs.
model = nn.Sequential(
    nn.Linear(in_features=4, out_features=16),
    nn.ReLU(),
    nn.Linear(in_features=16, out_features=16),
    nn.ReLU(),
    nn.Linear(in_features=16, out_features=16),
    nn.ReLU(),
    nn.Linear(in_features=16, out_features=2),
)

In [None]:
# Run the stand-alone network on the most recent `state` array.
state_tensor = torch.from_numpy(state)
model(state_tensor)

In [None]:
## Pick the network output of one uniformly random action for `state`
q_row = model(torch.from_numpy(state))[0]
q_row[random.randrange(q_row.size(0))]


In [None]:
# Draw a random action index: an integer from 0 to (number of network outputs - 1)
n_outputs = model(torch.from_numpy(state))[0].size(0)
random.randrange(n_outputs)