<a href="https://colab.research.google.com/github/tvaditya/intro_ds_and_ml/blob/main/%5BRL5%5DHumanLevelControlThroughDeepReinforcementLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gym
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch as T

In [None]:
class LinearDeepQNetwork(nn.Module):
  """Two-layer fully connected network mapping a state to per-action values.

  The module also owns its Adam optimizer and MSE loss, and moves itself to
  the first CUDA device when one is available (CPU otherwise).

  Args:
    lr: learning rate handed to the Adam optimizer.
    n_actions: size of the output layer (one value per action).
    input_dims: iterable that is unpacked into the first ``nn.Linear``; the
      single-hidden-layer layout implies it holds one element (the number of
      input features).
  """

  def __init__(self, lr, n_actions, input_dims):
    super().__init__()

    hidden_units = 128
    self.fc1 = nn.Linear(*input_dims, hidden_units)
    self.fc2 = nn.Linear(hidden_units, n_actions)

    # The optimizer is created after the layers so it sees every parameter.
    self.optimizer = optim.Adam(self.parameters(), lr=lr)
    self.loss = nn.MSELoss()

    device_name = 'cuda:0' if T.cuda.is_available() else 'cpu'
    self.device = T.device(device_name)
    self.to(self.device)

  def forward(self, state):
    """Return the raw (un-activated) output of the final layer for ``state``."""
    hidden = F.relu(self.fc1(state))
    return self.fc2(hidden)

In [None]:
# Code the agent class

class Agent():
  """Epsilon-greedy Q-learning agent that learns from one transition at a time.

  Action values come from a ``LinearDeepQNetwork`` stored in ``self.Q``.
  There is no replay memory and no separate target network: ``learn`` performs
  a single optimizer step on the transition it is given.

  Args:
    input_dims: forwarded to ``LinearDeepQNetwork`` as its input dimensions.
    n_actions: number of discrete actions; actions are ``0 .. n_actions-1``.
    lr: learning rate forwarded to the network's optimizer.
    gamma: discount factor applied to the bootstrap value.
    epsilon: initial probability of picking a random action.
    eps_dec: amount subtracted from ``epsilon`` after every ``learn`` call.
    eps_min: lower bound for ``epsilon``.
  """

  def __init__(self, input_dims, n_actions, lr, gamma=0.99,
               epsilon=1.0, eps_dec=1e-5, eps_min=0.01):

    self.lr = lr
    self.input_dims = input_dims
    self.n_actions = n_actions
    self.gamma = gamma
    self.eps_dec = eps_dec
    self.eps_min = eps_min
    self.epsilon = epsilon
    self.action_space = list(range(self.n_actions))

    self.Q = LinearDeepQNetwork(self.lr, self.n_actions, self.input_dims)

  def choose_action(self, observation):
    """Return an action for ``observation`` using an epsilon-greedy policy.

    With probability ``epsilon`` a uniformly random action is drawn from
    ``self.action_space``; otherwise the arg-max of the network output is
    returned.
    """
    if np.random.random() > self.epsilon:
      state = T.tensor(observation, dtype=T.float).to(self.Q.device)
      # Pure inference: no autograd graph is needed to pick an action.
      with T.no_grad():
        actions = self.Q.forward(state)
      action = T.argmax(actions).item()
    else:
      action = np.random.choice(self.action_space)

    return action

  def decrement_epsilon(self):
    """Lower ``epsilon`` by ``eps_dec`` without ever going below ``eps_min``."""
    # Clamp after subtracting; testing ``epsilon > eps_min`` before the
    # subtraction lets the value undershoot the floor for one step.
    self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

  def learn(self, state, action, reward, state_, done=False):
    """Run one temporal-difference update on a single transition.

    Args:
      state: observation before the action.
      action: index of the action taken; used to index the network output, so
        the output is expected to be 1-D (one unbatched observation).
      reward: scalar reward received.
      state_: observation after the action.
      done: when true the transition is terminal and the target is just
        ``reward`` (no bootstrap term). Defaults to ``False`` so existing
        four-argument callers behave as before.
    """
    self.Q.optimizer.zero_grad()
    states = T.tensor(state, dtype=T.float).to(self.Q.device)
    actions = T.tensor(action).to(self.Q.device)
    rewards = T.tensor(reward, dtype=T.float).to(self.Q.device)
    states_ = T.tensor(state_, dtype=T.float).to(self.Q.device)

    q_pred = self.Q.forward(states)[actions]

    if done:
      q_target = rewards
    else:
      # The bootstrap value is a fixed regression target: keep it out of the
      # autograd graph so only the prediction for (state, action) is trained.
      with T.no_grad():
        q_next = self.Q.forward(states_).max()
      q_target = rewards + self.gamma * q_next

    loss = self.Q.loss(q_pred, q_target)
    loss.backward()
    self.Q.optimizer.step()
    self.decrement_epsilon()


In [None]:
import collections
import cv2

import numpy as np
import matplotlib.pyplot as plt

def plot_learning_curve(x, scores, epsilons, filename):
  """Save a figure overlaying epsilon and the running-average score.

  Two axes share the same subplot slot: the first draws ``epsilons`` against
  ``x`` as a line (left y-axis), the second draws the running average of
  ``scores`` as a scatter (right y-axis, frame and x-axis hidden).

  Args:
    x: x-coordinates shared by both series.
    scores: per-entry scores; each plotted point is the mean of the current
      entry and up to 100 entries before it.
    epsilons: epsilon values, one per entry of ``x``.
    filename: path handed to ``plt.savefig``.
  """
  figure = plt.figure()
  eps_axis = figure.add_subplot(111, label="1")
  score_axis = figure.add_subplot(111, label="2", frame_on=False)

  # Epsilon curve on the primary axes.
  eps_axis.plot(x, epsilons, color="C0")
  eps_axis.set_xlabel("Training Steps", color="C0")
  eps_axis.set_ylabel("Epsilon", color="C0")
  eps_axis.tick_params(axis='x', colors="C0")
  eps_axis.tick_params(axis='y', color="C0")

  # Trailing mean over the window [t-100, t], clipped at the start.
  running_avg = np.array(
      [np.mean(scores[max(0, t - 100):t + 1]) for t in range(len(scores))],
      dtype=float,
  )

  # Score curve on the overlaid axes, labelled on the right-hand side.
  score_axis.scatter(x, running_avg, color="C1")
  score_axis.xaxis.set_visible(False)
  score_axis.yaxis.tick_right()
  score_axis.set_ylabel('Score', color='C1')
  score_axis.yaxis.set_label_position('right')
  score_axis.tick_params(axis='y', color="C1")

  plt.savefig(filename)

class RepeatActionAndMaxFrame(gym.Wrapper):
  """Repeat each action several times and return the max of the last two frames.

  ``step`` applies the same action up to ``repeat`` times, sums the rewards,
  writes each observation into a two-slot buffer (alternating slots) and
  returns the element-wise maximum of the two slots.

  Args:
    env: environment to wrap; its ``observation_space.low`` provides the frame
      shape and dtype of the buffer.
    repeat: number of times each action is applied per ``step`` call.
  """

  def __init__(self, env=None, repeat=4):
    super(RepeatActionAndMaxFrame, self).__init__(env)
    self.repeat = repeat
    self.shape = env.observation_space.low.shape
    self.frame_buffer = self._empty_buffer()

  def _empty_buffer(self):
    """Return a zeroed (2, *frame_shape) buffer in the observation dtype."""
    # ``np.zeros_like((2, self.shape))`` treats the tuple as ragged array data
    # rather than a shape, which newer NumPy rejects with ValueError.
    return np.zeros((2,) + tuple(self.shape),
                    dtype=self.env.observation_space.low.dtype)

  def step(self, action):
    """Apply ``action`` up to ``repeat`` times; stop early when the episode ends.

    Returns:
      ``(max_frame, total_reward, done, info)`` where ``info`` comes from the
      last inner step taken.
    """
    t_reward = 0.0
    done = False
    info = {}  # stays defined even if ``repeat`` is not positive
    for i in range(self.repeat):
      obs, reward, done, info = self.env.step(action)
      t_reward += reward
      idx = i % 2
      self.frame_buffer[idx] = obs
      if done:
        break

    max_frame = np.maximum(self.frame_buffer[0], self.frame_buffer[1])
    return max_frame, t_reward, done, info

  def reset(self):
    """Reset the wrapped env, clear the buffer and return the first observation."""
    obs = self.env.reset()
    self.frame_buffer = self._empty_buffer()
    self.frame_buffer[0] = obs

    return obs

  

In [None]:
class PreprocessFrame(gym.ObservationWrapper):
  """Convert RGB frames to resized grayscale frames scaled to [0, 1].

  Args:
    shape: target shape as a 3-sequence; it is stored reordered as
      ``(shape[2], shape[0], shape[1])``, i.e. the last entry becomes the
      leading axis. Presumably (height, width, channels) with one channel,
      since the frame is reduced to grayscale before the reshape.
    env: environment to wrap.
  """

  def __init__(self, shape, env=None):
    super(PreprocessFrame, self).__init__(env)
    self.shape = (shape[2], shape[0], shape[1])
    self.observation_space = gym.spaces.Box(low=0.0, high=1.0,
                                            shape=self.shape, dtype=np.float32)

  def observation(self, obs):
    """Return ``obs`` as a float32 array of shape ``self.shape`` in [0, 1]."""
    new_frame = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
    # cv2.resize expects dsize as (width, height), i.e. (columns, rows). Pass
    # the trailing two dims reversed so the result has shape self.shape[1:]
    # and the reshape below only adds the leading axis.
    rows, cols = self.shape[1:]
    resized_screen = cv2.resize(new_frame, (cols, rows),
                                interpolation=cv2.INTER_AREA)
    new_obs = np.array(resized_screen, dtype=np.uint8).reshape(self.shape)
    # Cast so the returned dtype matches the declared float32 space.
    new_obs = (new_obs / 255.0).astype(np.float32)

    return new_obs

In [None]:
class StackFrames(gym.ObservationWrapper):
  """Observation wrapper that returns the last ``repeat`` observations stacked.

  The observation space bounds are the wrapped env's bounds repeated
  ``repeat`` times along axis 0. Observations are kept in a bounded deque, so
  appending a new one drops the oldest.

  Args:
    env: environment to wrap.
    repeat: number of observations kept in the stack.
  """

  def __init__(self, env, repeat):
    super().__init__(env)
    base_space = env.observation_space
    stacked_low = base_space.low.repeat(repeat, axis=0)
    stacked_high = base_space.high.repeat(repeat, axis=0)
    self.observation_space = gym.spaces.Box(
        stacked_low,
        stacked_high,
        dtype=np.float32
    )
    self.stack = collections.deque(maxlen=repeat)

  def reset(self):
    """Reset the env and fill the whole stack with the first observation."""
    self.stack.clear()
    first_frame = self.env.reset()
    self.stack.extend(first_frame for _ in range(self.stack.maxlen))

    stacked_shape = self.observation_space.low.shape
    return np.array(self.stack).reshape(stacked_shape)

  def observation(self, observation):
    """Push ``observation`` onto the stack and return the stacked array."""
    self.stack.append(observation)

    stacked_shape = self.observation_space.low.shape
    return np.array(self.stack).reshape(stacked_shape)

In [None]:
# # initialize our main loop

# if __name__ == '__main__':
#   env = gym.make('CartPole-v1')
#   n_games = 10000
#   scores = []
#   eps_history = []

#   agent = Agent(input_dims=env.observation_space.shape,
#                 n_actions=env.action_space.n, lr=0.0001)
  
#   for i in range(n_games):
#     score = 0
#     done = False
#     obs = env.reset()

#     while not done:
#       action = agent.choose_action(obs)
#       obs_, reward, done, info = env.step(action)
#       score += reward
#       agent.learn(obs, action, reward, obs_)
#       obs = obs_
#     scores.append(score)
#     eps_history.append(agent.epsilon)

#     if i%100 == 0:
#       avg_score = np.mean(scores[-100:])
#       print('episode ', i, 'score %.1f avg score %.1f epsilon %.2f' % (score, avg_score, agent.epsilon))
  
#   filename = 'cartpole_naive_dqn.png'
#   x = [i+1 for i in range(n_games)]
#   plot_learning_curve(x, scores, eps_history, filename)

    