In [1]:
# Mount Google Drive at /content/drive; later cells read a font file from it
# and write model checkpoints and logs to it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# 安裝gymnasium套件

In [2]:
# Install Gymnasium, then swig, then the Box2D extra that provides CarRacing.
# NOTE(review): swig is presumably required to build box2d-py - confirm.
# NOTE(review): versions are not pinned; the captured output shows gymnasium 1.0.0.
!pip install gymnasium
!pip install swig
!pip install gymnasium[box2d]

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/958.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m50.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Collecting swig
  Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m49.2 M

# import 必要套件

In [3]:
import random
import time,math
import numpy as np
import gymnasium as gym
import gymnasium.wrappers as gym_wrap
import matplotlib.pyplot as plt
import matplotlib.animation as animation #輸出動畫影片
import matplotlib.font_manager as plt_font
twfont1=plt_font.FontProperties(fname="/content/drive/MyDrive/解密AI黑盒子分享/字型/kaiu.ttf")
from IPython import display
from tqdm import tqdm

In [4]:
import torch
import torch.nn.functional as F
import collections
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [5]:
class ImageEnv(gym.Wrapper):
  """Frame-skipping / frame-stacking wrapper for a 2-D (grayscale) image env.

  Every raw frame is cropped to rows 0-83 and columns 6-89 and divided by
  255.0. The observation handed to the agent is the last `stack_frames`
  processed frames stacked along axis 0, i.e. [stack_frames, 84, 84].

  Args:
    env: environment to wrap; its observations must be 2-D arrays of at
      least 84 x 90 pixels.
    stack_frames: number of frames in the stack, and also the number of raw
      environment steps executed per `step` call (must be >= 1).
    delay_op: number of steps taken with action 0 right after `reset`.
      NOTE(review): presumably action 0 is the no-op and this skips the
      intro animation of CarRacing - confirm.
  """
  def __init__(self,env,stack_frames=4,delay_op=50):
    super(ImageEnv, self).__init__(env)
    if stack_frames < 1:
      raise ValueError("stack_frames must be >= 1")
    self.delay_op = delay_op
    self.stack_frames = stack_frames
    self.stacked_state = None

  @staticmethod
  def _preprocess(frame):
    """Crop a raw frame to 84x84 and scale it by 1/255."""
    return frame[:84, 6:90]/255.0

  def reset(self,*,seed=None,options=None):
    """Reset the wrapped env, run the warm-up steps and build the first stack.

    `seed` and `options` are forwarded to the wrapped environment so the
    wrapper can be used through the regular Gymnasium `reset` interface.

    Returns:
      (stacked_state, info) where stacked_state is the last warm-up frame
      (or the reset frame when `delay_op` is 0) repeated `stack_frames` times.
    """
    s, info = self.env.reset(seed=seed,options=options)
    for _ in range(self.delay_op):
      s, r, terminated, truncated, info = self.env.step(0)
    # Build the stack once, after the warm-up loop, so it also exists when
    # delay_op == 0 and is not rebuilt on every warm-up step.
    self.stacked_state = np.tile(self._preprocess(s), (self.stack_frames,1,1))  # [stack_frames, 84, 84]
    return self.stacked_state, info

  def step(self, action):
    """Repeat `action` for up to `stack_frames` raw steps.

    Rewards are summed over the repeated steps. A raw reward of exactly -100
    is treated as termination. The loop stops early when the episode ends.

    Returns:
      (stacked_state, summed_reward, terminated, truncated, info)
    """
    reward = 0
    for _ in range(self.stack_frames):
      s, r, terminated, truncated, info = self.env.step(action)
      if r==-100:
        terminated=True
      reward += r
      # Push the new frame before checking for the end of the episode so the
      # returned observation always contains the most recent frame.
      frame = self._preprocess(s)[np.newaxis]
      self.stacked_state = np.concatenate((self.stacked_state[1:], frame), axis=0)
      if terminated or truncated:
        break
    return self.stacked_state, reward, terminated, truncated, info

# 建立SumTree類別

In [6]:
class SumTree:
  """Array-backed binary sum tree used for proportional sampling.

  The tree has `2*capacity-1` nodes stored in heap order: nodes
  `0 .. capacity-2` are internal and hold the sum of their two children,
  nodes `capacity-1 .. 2*capacity-2` are leaves holding one priority each.
  Leaf `capacity-1+k` is associated with `data[k]`.

  Args:
    capacity: number of leaves / stored items (must be >= 1).
  """
  def __init__(self,capacity):
    if capacity < 1:
      raise ValueError("capacity must be >= 1")
    self.capacity=capacity
    self.tree=np.zeros(2*capacity-1)
    self.data=np.zeros(capacity,dtype=object)
    self.size=0   # number of filled slots (<= capacity)
    self.ptr=0    # next data slot to (over)write

  def add(self,priority,data):
    """Store `data` with `priority` in the next slot, overwriting the oldest when full."""
    tree_idx=self.ptr+self.capacity-1
    self.data[self.ptr]=data
    self.update(tree_idx,priority)
    self.ptr=(self.ptr+1) % self.capacity
    self.size=min(self.size+1,self.capacity)

  def update(self,tree_idx,priority):
    """Set the priority of leaf `tree_idx` and refresh all sums above it."""
    self.tree[tree_idx]=priority
    while tree_idx != 0:
      tree_idx=(tree_idx-1)//2
      left=2*tree_idx+1
      # Recompute the parent from its children instead of adding a delta so
      # rounding errors cannot accumulate over many updates.
      self.tree[tree_idx]=self.tree[left]+self.tree[left+1]

  def sample(self,value):
    """Find the leaf whose cumulative-priority interval contains `value`.

    Args:
      value: a number in [0, total_priority()].

    Returns:
      (tree_idx, priority, data) of the selected leaf.
    """
    tree_idx=0
    while tree_idx<self.capacity-1:
      left=2*tree_idx+1
      right=left+1
      # Never descend into an empty right subtree: if `value` overshoots the
      # left sum (e.g. by rounding) that would return an unfilled slot.
      if value<=self.tree[left] or self.tree[right]<=0:
        tree_idx=left
      else:
        value-=self.tree[left]
        tree_idx=right
    data_idx=tree_idx-self.capacity+1
    return tree_idx,self.tree[tree_idx],self.data[data_idx]

  def total_priority(self):
    """Return the sum of all leaf priorities (the root value)."""
    return self.tree[0]

# 建立Replay Buffer類別

In [7]:
class ReplayBuffer:
  """Prioritized experience replay buffer backed by a SumTree.

  Transitions are stored once, in preallocated float32/int64 arrays; the
  SumTree only keeps the priority and the array slot of every transition.
  The number of stored transitions is `min(capacity, max_size)` so that the
  tree leaves and the array slots always wrap around together.

  Args:
    capacity: requested number of tree leaves.
    alpha: exponent applied to |TD error| when turning it into a priority
      (0 = uniform sampling, larger = stronger prioritization).
    max_size: requested number of array slots.
    num_steps: stored as-is for the caller; not used inside this class.
    eps: small constant added to |TD error| so no transition ever gets a
      zero priority (which would make it impossible to sample again).
  """
  def __init__(self,capacity,alpha=0.6,max_size=int(1e5),num_steps=1,eps=1e-5):
    size=min(int(capacity),int(max_size))
    self.s = np.zeros((size,4,84,84), dtype=np.float32)
    self.a = np.zeros((size,), dtype=np.int64)
    self.r = np.zeros((size, 1), dtype=np.float32)
    self.s_ = np.zeros((size,4,84,84), dtype=np.float32)
    self.done = np.zeros((size, 1), dtype=np.float32)
    self.ptr = 0
    self.size = 0
    self.max_size = size
    self.num_steps = num_steps
    self.alpha=alpha
    self.eps=eps
    self.tree=SumTree(size)
    self.capacity=size
    # Running maximum of all priorities handed to the tree; new transitions
    # receive it so each one is likely to be replayed at least once.
    self.max_priority=0.01

  def append(self,s,a,r,s_,done):
    """Store one transition with the current maximum priority."""
    slot=self.ptr
    self.s[slot] = s
    self.a[slot] = a
    self.r[slot] = r
    self.s_[slot] = s_
    self.done[slot] = done
    self.ptr = (self.ptr + 1) % self.max_size
    self.size = min(self.size+1,self.max_size)
    # The tree payload is just the array slot, not a copy of the transition.
    self.tree.add(self.max_priority,slot)

  def sample(self,batch_size,beta=0.4):
    """Draw a stratified, priority-proportional batch.

    Args:
      batch_size: number of transitions to draw.
      beta: importance-sampling exponent (1 fully corrects the sampling bias).

    Returns:
      (s, a, r, s_, done, indices, is_weights) where s and s_ have shape
      [B,4,84,84], a/r/done have shape [B], `indices` is the list of tree
      indices to pass to `update_priorities`, and `is_weights` are the
      importance-sampling weights normalised by their maximum.

    Raises:
      ValueError: if the buffer holds no transition with positive priority.
    """
    total=self.tree.total_priority()
    if self.size==0 or not total>0:
      raise ValueError("cannot sample from an empty replay buffer")
    indices=[]
    priorities=[]
    slots=[]
    segment=total/batch_size
    for i in range(batch_size):
      idx,priority,slot=self.tree.sample(np.random.uniform(segment*i,segment*(i+1)))
      # Rounding can occasionally select an unfilled (zero-priority) leaf;
      # redraw over the whole range instead of returning placeholder data.
      while priority<=0:
        idx,priority,slot=self.tree.sample(np.random.uniform(0,total))
      indices.append(idx)
      priorities.append(priority)
      slots.append(slot)
    slots=np.asarray(slots,dtype=np.int64)
    sampling_pro=np.asarray(priorities,dtype=np.float64)/total
    is_weights=np.power(self.size*sampling_pro,-beta)
    is_weights/=is_weights.max()
    return (self.s[slots],self.a[slots],self.r[slots,0],self.s_[slots],self.done[slots,0],indices,is_weights)

  def update_priorities(self,indices,priorities):
    """Set the priority of each sampled leaf to (|priority| + eps) ** alpha."""
    for idx,priority in zip(indices,priorities):
      scaled=(abs(float(priority))+self.eps)**self.alpha
      self.max_priority=max(self.max_priority,scaled)
      self.tree.update(idx,scaled)

# 搭建DQN神經網路的類別

In [8]:
class DQN(torch.nn.Module):
  """Convolutional Q-network for stacks of four 84x84 frames.

  Layer shapes for a batch of N inputs:
    conv1: [N,4,84,84]  -> [N,16,20,20]
    conv2: [N,16,20,20] -> [N,32,9,9]
    fc1:   32*9*9       -> 256
    fc2:   256          -> n_act

  Args:
    n_act: number of discrete actions (size of the output layer).
  """
  def __init__(self,n_act):
    super(DQN,self).__init__()
    self.conv1 = torch.nn.Conv2d(4, 16, kernel_size=8, stride=4)  #[N,4,84,84]->[N,16,20,20]
    self.conv2 = torch.nn.Conv2d(16, 32, kernel_size=4, stride=2)  #[N,16,20,20]->[N,32,9,9]
    self.fc1 = torch.nn.Linear(32 * 9 * 9, 256)
    self.fc2 = torch.nn.Linear(256, n_act)

  def forward(self,x):
    """Return Q-values of shape [N, n_act] (N is 1 for an unbatched input)."""
    x = F.relu(self.conv1(x))
    x = F.relu(self.conv2(x))
    x = x.reshape(-1, 32 * 9 * 9)
    # Without a non-linearity here fc1 and fc2 would collapse into a single
    # linear map, wasting the hidden layer.
    # NOTE: checkpoints trained with the earlier (activation-free) forward
    # pass still load, but will produce different Q-values.
    x = F.relu(self.fc1(x))
    return self.fc2(x)

# 設定是否載入模型參數，舊參數檔路徑，新參數檔路徑

In [9]:
# Checkpoint configuration.
# Load_File is the episode number of the checkpoint to resume from
# (0 = start from scratch); Old_File is the path of that checkpoint.
Load_File=0
folder="/content/drive/MyDrive/強化學習期末專題(小組)/PrioritizedDQN/model/"
Old_File=folder+f"model_PrioritizedDQN-{Load_File}.pt"

# Training history used to check how the model improves; replaced by the
# saved history when resuming from a checkpoint.
Log={"TrainReward":[],"TestReward":[],"Loss":[]}
if Load_File>0:
  # allow_pickle=True unpickles arbitrary objects: only load logs written by this notebook.
  Log= np.load(folder+f"Log_PrioritizedDQN-{Load_File}.npy", allow_pickle=True).item()

In [10]:
# Build the environment from the inside out: discrete-action CarRacing-v3
# rendered as RGB arrays -> grayscale observations -> crop/skip/stack wrapper.
env = ImageEnv(
  gym_wrap.GrayscaleObservation(
    gym.make('CarRacing-v3',render_mode="rgb_array",domain_randomize=False, continuous=False)
  )
)

# 搭建智能體Agent的類別

In [11]:
class DQNAgent():
  """Double-DQN agent trained with prioritized experience replay.

  Uses the notebook-level `env`, `device`, `Load_File`, `Old_File`, `folder`
  and `Log` objects.

  Args:
    gamma: discount factor.
    eps_low: lower bound of the exploration rate.
    lr: Adam learning rate.
    beta_start: importance-sampling exponent at episode 0.
    beta_end: importance-sampling exponent approached at the last episode.
  """
  def __init__(self,gamma=0.9,eps_low=0.1,lr=0.001,beta_start=0.4,beta_end=1.0):
    self.env=env
    self.n_act=self.env.action_space.n
    self.PredictDQN= DQN(self.n_act)
    self.TargetDQN= DQN(self.n_act)
    if Load_File>0:
      # map_location lets a checkpoint saved on GPU be resumed on CPU and vice versa.
      state=torch.load(Old_File,map_location=device)
      self.PredictDQN.load_state_dict(state)
      self.TargetDQN.load_state_dict(state)
    else:
      # Start both networks from the same weights.
      self.TargetDQN.load_state_dict(self.PredictDQN.state_dict())
    self.PredictDQN.to(device)
    self.TargetDQN.to(device)
    self.LossFun=torch.nn.SmoothL1Loss()  # kept for compatibility; Learn uses a weighted squared error
    self.optimizer=torch.optim.Adam(self.PredictDQN.parameters(),lr=lr)
    self.gamma=gamma
    self.eps_low=eps_low
    self.rb=ReplayBuffer(capacity=1000000,alpha=0.6)
    self.beta_start=beta_start
    self.beta_end=beta_end
    # Defaults so SelectA / Learn can be used before Train has run.
    self.beta=beta_start
    self.EPS=1.0

  def PredictA(self,s):
    """Return the greedy action for a single stacked state."""
    with torch.no_grad():
      return torch.argmax(self.PredictDQN(torch.FloatTensor(s).to(device))).item()

  def SelectA(self,a):
    """Epsilon-greedy: return a random action with probability EPS, otherwise `a`."""
    return self.env.action_space.sample() if np.random.random()<self.EPS else a

  def Train(self,N_EPISODES):
    """Train from episode `Load_File` up to `N_EPISODES`.

    Every 10 episodes the agent is evaluated, and the model weights and the
    log are saved into `folder`.
    """
    for i in tqdm(range(Load_File,N_EPISODES)):
      # Anneal beta linearly and epsilon exponentially with the episode index.
      self.beta=self.beta_start+(self.beta_end-self.beta_start)*i/N_EPISODES
      self.EPS=self.eps_low+(1-self.eps_low)*math.exp(-i*5/(N_EPISODES))
      total_reward=0
      s,_=self.env.reset()
      while True:
        a=self.SelectA(self.PredictA(s))
        s_,r,done,stop,_=self.env.step(a)
        self.rb.append(s,a,r,s_,done)
        # NOTE(review): the schedule is keyed on the episode index, not on a
        # step counter; with num_steps=1 it learns on every step - confirm intent.
        if self.rb.size > 200 and i%self.rb.num_steps==0: self.Learn()
        s=s_
        total_reward+=r
        if done or stop:break
      # Sync the target network once per 20 episodes (not on every step of
      # such an episode, which would defeat the purpose of a target network).
      if i % 20==0:
        self.TargetDQN.load_state_dict(self.PredictDQN.state_dict())
      Log["TrainReward"].append(total_reward)
      if i % 10 == 9:
        test_reward=self.Test()
        print(f"\n訓練次數{i+1}，總回報{test_reward}")
        Log["TestReward"].append(test_reward)
        torch.save(self.PredictDQN.state_dict(), f"{folder}/model_PrioritizedDQN-{i+1}.pt")
        np.save(f"{folder}/Log_PrioritizedDQN-{i+1}.npy", Log)

  def Learn(self):
    """Run one Double-DQN update on a prioritized batch of 32 transitions."""
    batch_s,batch_a,batch_r,batch_s_,batch_done,indices,is_weights=self.rb.sample(32,beta=self.beta)
    batch_s=torch.as_tensor(batch_s,dtype=torch.float32,device=device)
    batch_a=torch.as_tensor(batch_a,dtype=torch.int64,device=device).reshape(-1,1)
    batch_r=torch.as_tensor(batch_r,dtype=torch.float32,device=device).reshape(-1)
    batch_s_=torch.as_tensor(batch_s_,dtype=torch.float32,device=device)
    batch_done=torch.as_tensor(batch_done,dtype=torch.float32,device=device).reshape(-1)
    is_weights=torch.as_tensor(is_weights,dtype=torch.float32,device=device)
    # Q(s,a) of the actions that were actually taken.
    predict_Q=self.PredictDQN(batch_s).gather(1,batch_a).squeeze(1)
    with torch.no_grad():
      # Double DQN: the online network picks the action, the target network scores it.
      next_act=self.PredictDQN(batch_s_).argmax(1,keepdim=True)
      next_Q=self.TargetDQN(batch_s_).gather(1,next_act).squeeze(1)
      target_Q=batch_r+(1-batch_done)*self.gamma*next_Q
    td_error=predict_Q-target_Q
    self.rb.update_priorities(indices,td_error.detach().abs().cpu().numpy())
    # Importance-sampling weights correct the bias of prioritized sampling.
    loss=(is_weights*td_error**2).mean()
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    Log["Loss"].append(loss.item())

  def Test(self,VIDEO=False):
    """Play one greedy episode and return its total reward.

    Args:
      VIDEO: when True, every frame is rendered and the episode is saved as
        'Car_racing.mp4'; when False nothing is rendered.
    """
    total_reward=0
    video=[]
    s,_=self.env.reset()
    while True:
      # Only render when a video is requested; frames are otherwise unused.
      if VIDEO:
        video.append(self.env.render())
      a=self.PredictA(s)
      s,r,done,stop,_=self.env.step(a)
      total_reward+=r
      if done or stop:break
    if VIDEO:
      patch = plt.imshow(video[0])  # image object that the animation updates
      plt.axis('off')  # hide the axes
      def animate(i):  # swap in frame i
        patch.set_data(video[i])
      # plt.gcf(): current figure; frames: number of frames; interval: ms between frames
      anim = animation.FuncAnimation(plt.gcf(),animate,frames=len(video),interval=200)
      anim.save('Car_racing.mp4')  # save as an mp4 file
    return total_reward

# 實體化智能體Agent，開始訓練智能體

In [None]:
# Create the agent with the hyper-parameters used for this run and train it
# for 16000 episodes.
Agent=DQNAgent(gamma=0.95,eps_low=0.05,lr=0.00025)
Agent.Train(N_EPISODES=16000)

  0%|          | 9/16000 [02:06<63:20:00, 14.26s/it]


訓練次數10，總回報-64.66802973977737


  0%|          | 20/16000 [05:09<85:00:12, 19.15s/it]


訓練次數20，總回報-94.99999999999899


  0%|          | 26/16000 [06:31<62:42:20, 14.13s/it]

In [None]:
# Plot the evaluation reward recorded during training.
plt.figure(figsize=(8,5))  # figure size
plt.title("TestReward vs episodes",fontsize=15)
# One evaluation is logged every 10 training episodes, so the x axis counts
# tens of episodes and the y axis is the evaluation reward.
plt.xlabel("episodes(x10)",fontsize=15)
plt.ylabel("TestReward",fontsize=15)
plt.plot(Log["TestReward"],"b-",label="Reward")
plt.legend()

## 測試智能體平均表現


## 生成智能體Agent測試影片

In [None]:
# Play one greedy episode and save it as 'Car_racing.mp4'.
Agent.Test(VIDEO=True)