In [10]:
import torch
from torch import nn
import gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import copy

In [12]:
class Agent():
  """Deep Q-learning agent with a replay buffer and a target network.

  The online network is a two-layer MLP (see `DQN`). Transitions are stored in
  a bounded deque and sampled uniformly for training. The target network is
  re-synchronised with the online network at the end of every episode.

  Note on `eps`: in this class `eps` is the probability of taking the *greedy*
  action (a random action is taken with probability `1 - eps`). It is increased
  by `EPS_STEP` after each episode while it is still below `EPS_CAP`.
  """

  # Per-episode increment of the greedy-action probability, and the threshold
  # above which it is no longer increased.
  EPS_STEP = 0.01
  EPS_CAP = 0.975

  def __init__(self, observations, actions, env, eps, disc, epochs, buffer_size, lr=0.01, batch_size=256):
    """Store the hyper-parameters and create an empty replay buffer.

    Args:
      observations: shape of one observation; unpacked into `nn.Linear`, so it
        is expected to be a 1-element tuple such as `(n_features,)`.
      actions: number of discrete actions.
      env: environment object exposing `reset()`, `step(action)` and `close()`.
      eps: initial probability of choosing the greedy action.
      disc: discount factor applied to the bootstrapped next-state value.
      epochs: number of episodes to train for.
      buffer_size: maximum number of transitions kept in the replay buffer.
      lr: Adam learning rate.
      batch_size: minibatch size; learning starts once the buffer holds at
        least this many transitions.
    """
    super(Agent, self).__init__()
    self.eps = eps
    self.epochs = epochs
    self.obs_shape = observations
    self.num_actions = actions
    self.env = env
    self.action_buffer = deque(maxlen=buffer_size)
    self.batch_size = batch_size
    self.lr = lr
    self.disc = disc
    # Populated by train() so the trained network can be inspected or saved.
    self.model = None

  def DQN(self):
    """Build a fresh Q-network: Linear(obs, 128) -> ReLU -> Linear(128, num_actions)."""
    model = torch.nn.Sequential(nn.Linear(*self.obs_shape, 128),
                                nn.ReLU(),
                                nn.Linear(128, self.num_actions),
                              )
    return model

  @staticmethod
  def _reset_env(env):
    """Reset `env` and return only the observation.

    Accepts both a bare observation and an `(observation, info_dict)` pair.
    """
    out = env.reset()
    if isinstance(out, tuple) and len(out) == 2 and isinstance(out[1], dict):
      return out[0]
    return out

  @staticmethod
  def _step_env(env, action):
    """Step `env` and return `(obs, reward, terminal, episode_over)`.

    Accepts both the 4-tuple `(obs, reward, done, info)` and the 5-tuple
    `(obs, reward, terminated, truncated, info)` step results. `terminal` is
    the flag used to cut bootstrapping; `episode_over` ends the episode loop.
    With the 4-tuple form both are equal to `done`.
    """
    out = env.step(action)
    if len(out) == 5:
      obs, reward, terminated, truncated, _ = out
      return obs, reward, bool(terminated), bool(terminated or truncated)
    obs, reward, done, _ = out
    return obs, reward, bool(done), bool(done)

  def train(self):
    """Run `self.epochs` episodes of DQN training.

    Returns:
      (scores, losses): `scores` holds the summed reward of each episode,
      `losses` holds the MSE loss of every gradient update, in order.

    Side effects: fills `self.action_buffer`, updates `self.eps`, stores the
    trained network on `self.model`, prints one line per episode and closes
    `self.env` when finished.
    """
    model = self.DQN()
    target_model = copy.deepcopy(model)
    self.model = model
    loss_fn = torch.nn.MSELoss()
    optim = torch.optim.Adam(model.parameters(), lr=self.lr)
    scores = []
    losses = []
    # Row indices for picking Q(s, a) out of the (batch, actions) output.
    batch_index = torch.arange(self.batch_size)

    for i in range(self.epochs):
      done = False
      score = 0
      state = torch.as_tensor(self._reset_env(self.env), dtype=torch.float32)
      while not done:
        # eps is the probability of acting greedily (see class docstring).
        if random.random() < self.eps:
          with torch.no_grad():  # action selection needs no gradient graph
            action = torch.argmax(model(state)).item()
        else:
          action = np.random.randint(self.num_actions)

        state_, reward, terminal, done = self._step_env(self.env, action)

        score += reward
        state_ = torch.as_tensor(state_, dtype=torch.float32)
        self.action_buffer.append((state, action, reward, state_, terminal))
        state = state_

        if len(self.action_buffer) >= self.batch_size:
          minibatch = random.sample(self.action_buffer, self.batch_size)
          states, actions, rewards, next_states, terminals = zip(*minibatch)

          q1 = model(torch.stack(states))
          with torch.no_grad():
            q2 = target_model(torch.stack(next_states))

          # Q-value of the action that was actually taken in each transition.
          X = q1[batch_index, torch.tensor(actions, dtype=torch.long)]
          # Explicit float32 so the target dtype always matches the prediction,
          # whatever numeric type the environment returns for rewards.
          not_terminal = 1.0 - torch.tensor(terminals, dtype=torch.float32)
          Y = torch.tensor(rewards, dtype=torch.float32) + self.disc * torch.max(q2, dim=1)[0] * not_terminal
          Y = Y.detach()

          loss = loss_fn(X, Y)  # (input, target) order
          optim.zero_grad()
          losses.append(loss.item())
          loss.backward()
          optim.step()

      scores.append(score)
      # Sync the target network once per episode.
      target_model.load_state_dict(model.state_dict())
      if self.eps < self.EPS_CAP:
        self.eps += self.EPS_STEP
      print("Epoch: ", i, "score: ", score," eps: ",self.eps, " action buffer:", len(self.action_buffer))
    self.env.close()
    return scores,losses




In [None]:
# Compute device is selected here but is not passed to Agent in this cell.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed the three RNGs the agent draws from (Python `random` for the greedy/random
# choice and replay sampling, NumPy for random actions, torch for weight init)
# so that Restart & Run All is repeatable.
# NOTE: the environment's own RNG is not seeded here; the call for that differs
# between gym versions (`env.seed(...)` vs `env.reset(seed=...)`).
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Available environments:
#     MountainCar-v0
#     CartPole-v1
#     Acrobot-v1

env = gym.make('MountainCar-v0')        #render_mode="human"
num_actions = env.action_space.n
obs_shape = env.observation_space.shape

# Hyper-parameters.
epss = 0.0            # initial probability of taking the greedy action
replay_sizze = 2048   # replay buffer capacity (transitions)
discc = 0.99          # discount factor
epochs=500            # number of training episodes

agent = Agent(obs_shape, num_actions, env, epss, discc, epochs, replay_sizze, lr=0.01, batch_size=252)
scores,losses = agent.train()

In [None]:
# Saving is left disabled: no `model` name is defined at notebook scope in the
# visible cells.
#torch.save(model, 'acrobot.pth')

# Two stacked panels: per-episode reward on top, per-update loss below.
# The x-axes use different units, so each panel is labelled explicitly.
fig, axs = plt.subplots(2)
ax_reward, ax_loss = axs

ax_reward.plot(scores)
ax_reward.set_title("reward")
ax_reward.set_xlabel("episode")
ax_reward.set_ylabel("total reward")

ax_loss.plot(losses)
ax_loss.set_title("loss")
ax_loss.set_xlabel("gradient update")
ax_loss.set_ylabel("MSE loss")

# Keep the lower panel's title from colliding with the upper panel's x-axis.
fig.tight_layout()
plt.show()