<a href="https://colab.research.google.com/github/jimmy93029/Selected_topic_for_RL_sophomore_fall/blob/master/lab2_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 強化學習專論 lab2 Colab code

enviroment

In [None]:
%pip install "gym[atari, accept-rom-license]"

In [6]:
import torch
import torch.nn as nn
import numpy as np
import os
import time
from collections import deque
from torch.utils.tensorboard import SummaryWriter
from abc import ABC, abstractmethod

In [7]:
from collections import deque
import random

In [9]:
from torch.utils.tensorboard import SummaryWriter
import gym

In [None]:
import ale_py

print(gym.envs.registry.keys())

## **Tools**

### DQN 模型

In [11]:
class AtariNetDQN(nn.Module):
    def __init__(self, num_classes=9, init_weights=True):
        super(AtariNetDQN, self).__init__()
        self.cnn = nn.Sequential(nn.Conv2d(4, 32, kernel_size=8, stride=4),
                                        nn.ReLU(True),
                                        nn.Conv2d(32, 64, kernel_size=4, stride=2),
                                        nn.ReLU(True),
                                        nn.Conv2d(64, 64, kernel_size=3, stride=1),
                                        nn.ReLU(True)
                                        )
        self.classifier = nn.Sequential(nn.Linear(7*7*64, 512),
                                        nn.ReLU(True),
                                        nn.Linear(512, num_classes)
                                        )

        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        x = x.float() / 255.
        x = self.cnn(x)
        x = torch.flatten(x, start_dim=1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0.0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1.0)
                nn.init.constant_(m.bias, 0.0)
            elif isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                nn.init.constant_(m.bias, 0.0)


### reply buffer

In [12]:
class ReplayMemory(object):
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, *transition):
        """Saves a transition"""
        self.buffer.append(tuple(map(tuple, transition)))

    def sample(self, batch_size, device):
        """Sample a batch of transitions"""
        transitions = random.sample(self.buffer, batch_size)
        return (torch.tensor(np.asarray(x), dtype=torch.float, device=device) for x in zip(*transitions))

&emsp;

## **Agents**

### base agent

In [24]:
class DQNBaseAgent(ABC):
	def __init__(self, config):
		self.gpu = config["gpu"]
		self.device = torch.device("cuda" if self.gpu and torch.cuda.is_available() else "cpu")
		self.total_time_step = 0
		self.training_steps = int(config["training_steps"])
		self.batch_size = int(config["batch_size"])
		self.epsilon = 1.0
		self.eps_min = config["eps_min"]
		self.eps_decay = config["eps_decay"]
		self.eval_epsilon = config["eval_epsilon"]
		self.warmup_steps = config["warmup_steps"]
		self.eval_interval = config["eval_interval"]
		self.eval_episode = config["eval_episode"]
		self.gamma = config["gamma"]
		self.update_freq = config["update_freq"]
		self.update_target_freq = config["update_target_freq"]

		self.replay_buffer = ReplayMemory(int(config["replay_buffer_capacity"]))
		self.writer = SummaryWriter(config["logdir"])

	@abstractmethod
	def decide_agent_actions(self, observation, epsilon=0.0, action_space=None):
		### TODO ###
		# get action from behavior net, with epsilon-greedy selection

		return NotImplementedError

	def update(self):
		if self.total_time_step % self.update_freq == 0:
			self.update_behavior_network()
		if self.total_time_step % self.update_target_freq == 0:
			self.update_target_network()

	@abstractmethod
	def update_behavior_network(self):
		# sample a minibatch of transitions
		state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size, self.device)
		### TODO ###
		# calculate the loss and update the behavior network

	def update_target_network(self):
		self.target_net.load_state_dict(self.behavior_net.state_dict())

	def epsilon_decay(self):
		self.epsilon -= (1 - self.eps_min) / self.eps_decay
		self.epsilon = max(self.epsilon, self.eps_min)

	def train(self):
		episode_idx = 0
		while self.total_time_step <= self.training_steps:
			observation, info = self.env.reset()
			episode_reward = 0
			episode_len = 0
			episode_idx += 1
			while True:
				if self.total_time_step < self.warmup_steps:
					action = self.decide_agent_actions(observation, 1.0, self.env.action_space)
				else:
					action = self.decide_agent_actions(observation, self.epsilon, self.env.action_space)
					self.epsilon_decay()

				next_observation, reward, terminate, truncate, info = self.env.step(action)
				self.replay_buffer.append(observation, [action], [reward], next_observation, [int(terminate)])

				if self.total_time_step >= self.warmup_steps:
					self.update()

				episode_reward += reward
				episode_len += 1

				if terminate or truncate:
					self.writer.add_scalar('Train/Episode Reward', episode_reward, self.total_time_step)
					self.writer.add_scalar('Train/Episode Len', episode_len, self.total_time_step)
					print(f"[{self.total_time_step}/{self.training_steps}]  episode: {episode_idx}  \
					episode reward: {episode_reward}  episode len: {episode_len}  epsilon: {self.epsilon}")
					break

				observation = next_observation
				self.total_time_step += 1

			if episode_idx % self.eval_interval == 0:
				# save model checkpoint
				avg_score = self.evaluate()
				self.save(os.path.join(self.writer.log_dir, f"model_{self.total_time_step}_{int(avg_score)}.pth"))
				self.writer.add_scalar('Evaluate/Episode Reward', avg_score, self.total_time_step)

	def evaluate(self):
		print("==============================================")
		print("Evaluating...")
		all_rewards = []
		for i in range(self.eval_episode):
			observation, info = self.test_env.reset()
			total_reward = 0
			while True:
				self.test_env.render()
				action = self.decide_agent_actions(observation, self.eval_epsilon, self.test_env.action_space)
				next_observation, reward, terminate, truncate, info = self.test_env.step(action)
				total_reward += reward
				if terminate or truncate:
					print(f"episode {i+1} reward: {total_reward}")
					all_rewards.append(total_reward)
					break

				observation = next_observation


		avg = sum(all_rewards) / self.eval_episode
		print(f"average score: {avg}")
		print("==============================================")
		return avg

	# save model
	def save(self, save_path):
		torch.save(self.behavior_net.state_dict(), save_path)

	# load model
	def load(self, load_path):
		self.behavior_net.load_state_dict(torch.load(load_path))

	# load model weights and evaluate
	def load_and_evaluate(self, load_path):
		self.load(load_path)
		self.evaluate()

### DQN_Agent ( 作業重點 )

In [25]:
class AtariDQNAgent(DQNBaseAgent):
	def __init__(self, config):
		super(AtariDQNAgent, self).__init__(config)

		self.env = gym.make(config["env_id"])

		self.test_env = gym.make(config["env_id"])

		# initialize behavior network and target network
		self.behavior_net = AtariNetDQN(self.env.action_space.n)
		self.behavior_net.to(self.device)
		self.target_net = AtariNetDQN(self.env.action_space.n)
		self.target_net.to(self.device)
		self.target_net.load_state_dict(self.behavior_net.state_dict())

		# initialize optimizer
		self.lr = config["learning_rate"]
		self.optim = torch.optim.Adam(self.behavior_net.parameters(), lr=self.lr, eps=1.5e-4)

	def decide_agent_actions(self, observation, epsilon=0.0, action_space=None) -> int:
		if random.random() <= epsilon:
			return random.randrange(self.env.action_space.n)
		else:
			action = self.behavior_net(observation).argmax().cpu().item()
			return action

	def choose_batch_actions(self, net, states) -> torch.Tensor:
		if net == "behavior":
			_, actions = self.behavior_net(states).max(dim=1, keepdim=True)
			return actions
		elif net == "target":
			_, actions = self.target_net(states).max(dim=1, keepdim=True)
			return actions

	def Q_value(self, net, states, actions):
		# get Q values from actions and target network
		if net == "target":
			next_q_values = self.target_net.forward(states).gather(dim=1, index=actions)
			return next_q_values
		elif net == "behavior":
			q_values = self.behavior_net.forward(states).gather(dim=1, index=actions)
			return q_values

	# implement DDQN
	def update_behavior_network(self):
		# sample a minibatch of transitions
		state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size, self.device)

		next_actions = self.choose_batch_actions("behavior", next_state)
		Q_target = reward + self.gamma * self.Q_value("target", next_state, next_actions)
		Q_output = self.Q_value("behavior", state, action)

		criterion = nn.SmoothL1Loss()
		loss = criterion(Q_output, Q_target)

		self.writer.add_scalar('DQN/Loss', loss.item(), self.total_time_step)

		self.optim.zero_grad()
		loss.backward()
		self.optim.step()

&emsp;

## main

In [26]:
if __name__ == '__main__':
    # my hyperparameters, you can change it as you like
    config = {
		"gpu": True,
		"training_steps": 1e8,
		"gamma": 0.99,
		"batch_size": 32,
		"eps_min": 0.1,
		"warmup_steps": 20000,
		"eps_decay": 1000000,
		"eval_epsilon": 0.01,
		"replay_buffer_capacity": 100000,
		"logdir": 'log/DQN/',
		"update_freq": 4,
		"update_target_freq": 10000,
		"learning_rate": 0.0000625,
        "eval_interval": 100,
        "eval_episode": 5,
		"env_id": 'ALE/MsPacman-v5',
	}
    agent = AtariDQNAgent(config)
    agent.train()

ValueError: ignored