<a href="https://colab.research.google.com/github/skywalker0803r/mxnet_course/blob/master/mxnet_ReplayBuffer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
#!pip install mxnet
import gym
import numpy as np
from mxnet import ndarray as nd
import pandas as pd

class ReplayBuffer(object):
    """Fixed-capacity ring buffer of transitions for experience replay.

    Every stored transition is a sequence laid out as
    ``[state, action, reward, done, next_state]``. Once the buffer holds
    ``replay_buffer_size`` transitions, each further ``push`` overwrites the
    oldest slot.
    """

    def __init__(self, replay_buffer_size):
        """Create an empty buffer.

        Args:
            replay_buffer_size: Maximum number of transitions kept at once.
        """
        self.replay_buffer_size = replay_buffer_size
        self.memory = []
        # Index of the slot the next push() writes to.
        self.position = 0

    @property
    def memory_len(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def push(self, Transition):
        """Store one transition, overwriting the oldest one when full.

        Args:
            Transition: Sequence ``[state, action, reward, done, next_state]``.
        """
        # Grow the list until capacity is reached; after that only overwrite.
        if len(self.memory) < self.replay_buffer_size:
            self.memory.append(None)
        self.memory[self.position] = Transition
        # Wrap around so the write cursor cycles through the slots.
        self.position = (self.position + 1) % self.replay_buffer_size

    @staticmethod
    def _collate(transitions):
        """Stack transitions into five float32 NumPy arrays, one per field.

        Fields are gathered one at a time instead of through
        ``np.array(transitions)``: a transition mixes arrays and scalars, and
        recent NumPy releases refuse to build an array from such ragged rows.
        """
        states = np.stack(
            [np.asarray(t[0], dtype=np.float32) for t in transitions])
        actions = np.asarray([t[1] for t in transitions], dtype=np.float32)
        rewards = np.asarray([t[2] for t in transitions], dtype=np.float32)
        dones = np.asarray([t[3] for t in transitions], dtype=np.float32)
        # The next state is read from the last position of each transition.
        next_states = np.stack(
            [np.asarray(t[-1], dtype=np.float32) for t in transitions])
        return states, actions, rewards, dones, next_states

    def sample(self, batch_size, prior_prob=None):
        """Draw a batch of distinct transitions as MXNet NDArrays.

        Args:
            batch_size: Number of transitions to draw without replacement.
            prior_prob: Optional per-slot sampling probabilities, one entry
                per stored transition; ``None`` samples uniformly.

        Returns:
            Tuple ``(state, action, reward, done, next_state)`` of NDArrays.
            ``state`` and ``next_state`` have one row per sampled transition;
            the other three are one-dimensional with ``batch_size`` entries.

        Raises:
            ValueError: Raised by ``np.random.choice`` when ``batch_size``
                exceeds the number of stored transitions or ``prior_prob``
                is not a valid distribution over them.
        """
        batch_index = np.random.choice(
            len(self.memory), size=batch_size, replace=False, p=prior_prob)
        arrays = self._collate([self.memory[i] for i in batch_index])
        return tuple(nd.array(field) for field in arrays)

# 初始化一個記憶儲存體

In [2]:
# Create a buffer with room for 1000 transitions and show that it starts empty:
# first the stored count, then the raw slot list.
m = ReplayBuffer(replay_buffer_size=1000)
print(m.memory_len, m.memory, sep="\n")

0
[]


# push 記憶

In [0]:
# Fill the buffer with transitions from one CartPole episode played with
# randomly sampled actions, capped at 100 steps.
# NOTE(review): written against the classic gym API, where reset() returns only
# the observation and step() returns a 4-tuple -- confirm the installed version.
env = gym.make("CartPole-v1")

for episode in range(1):
    state = env.reset()
    for time_step in range(100):
        action = env.action_space.sample()
        next_state, reward, done, info = env.step(action)
        # Transition layout expected by ReplayBuffer:
        # [state, action, reward, done, next_state].
        m.push([state, action, reward, done, next_state])

        # Stop at the end of the episode; otherwise step forward.
        if done:
            break
        state = next_state

# 查看記憶體

In [4]:
# Inspect the buffer after the rollout: the stored count on one line, then the
# last two slots of the underlying list.
print(m.memory_len, m.memory[-2:], sep="\n")

13
[[array([ 0.11161143,  0.54956362, -0.18164743, -1.14501466]), 1, 1.0, False, array([ 0.1226027 ,  0.74653233, -0.20454772, -1.4887179 ])], [array([ 0.1226027 ,  0.74653233, -0.20454772, -1.4887179 ]), 0, 1.0, True, array([ 0.13753335,  0.55440358, -0.23432208, -1.2662513 ])]]


# 定義抽取樣本數 和 抽取概率

In [0]:
# Number of transitions to draw from the buffer in one sampled batch.
batch_size = 10

In [0]:
# Sampling weights that grow exponentially with the buffer slot index,
# normalized to sum to 1 (a softmax over the indices 0 .. memory_len - 1).
# Every exponent is shifted by the largest index before calling exp: the
# normalized ratios are unchanged, but exp(i) on its own overflows float64 to
# inf once i exceeds ~709, which would put nan entries into p for a buffer
# filled that far.
p = np.exp(np.arange(m.memory_len) - (m.memory_len - 1.0))
p = p / np.sum(p)

In [7]:
# Display the weights as a single row, rounded to 3 decimals, with a
# background gradient applied along that row.
(
    pd.DataFrame(p)
    .round(3)
    .T
    .style
    .background_gradient(axis=1)
)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12
0,0,0,0,0,0,0.001,0.002,0.004,0.012,0.031,0.086,0.233,0.632


# 測試

In [0]:
# Draw batch_size distinct transitions, weighting buffer slots by p; sample()
# returns (state, action, reward, done, next_state) batches in that order.
b_s ,b_a ,b_r ,b_d ,b_s_ = m.sample(batch_size,p)

In [9]:
# Print each sampled batch under its field label, in the order sample()
# returned them.
for name, i in zip(
    ['state', 'action', 'reward', 'done', 'next_state'],
    [b_s, b_a, b_r, b_d, b_s_],
):
    print(name, i)

state 
[[ 0.1226027   0.7465323  -0.20454772 -1.4887179 ]
 [ 0.11161143  0.54956365 -0.18164742 -1.1450146 ]
 [ 0.1045593   0.35260624 -0.16554403 -0.8051696 ]
 [ 0.0936484   0.5455455  -0.14455989 -1.049207  ]
 [ 0.08667069  0.3488854  -0.13018899 -0.71854573]
 [ 0.07582344  0.5423623  -0.11069886 -0.97450566]
 [ 0.05810308  0.53999233 -0.07925398 -0.91950226]
 [ 0.06890292  0.3460259  -0.09764402 -0.6527422 ]
 [ 0.04341597  0.73435533 -0.05536737 -1.1943303 ]
 [ 0.03264108  0.53874433 -0.03756579 -0.8900791 ]]
<NDArray 10x4 @cpu(0)>
action 
[0. 1. 1. 0. 1. 0. 0. 1. 0. 1.]
<NDArray 10 @cpu(0)>
reward 
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
<NDArray 10 @cpu(0)>
done 
[1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
<NDArray 10 @cpu(0)>
next_state 
[[ 0.13753335  0.5544036  -0.23432207 -1.2662513 ]
 [ 0.1226027   0.7465323  -0.20454772 -1.4887179 ]
 [ 0.11161143  0.54956365 -0.18164742 -1.1450146 ]
 [ 0.1045593   0.35260624 -0.16554403 -0.8051696 ]
 [ 0.0936484   0.5455455  -0.14455989 -1.049207  ]
 [ 0.086670