# random_walk_1000_poly&fourier_value_function.py
import numpy as np
import matplotlib
# matplotlib.use('Agg')
import matplotlib.pyplot as plt

# Value prediction (MC and TD(0)) on the 1000-state random walk:
# there are 1000 non-terminal positions, the walk starts at the center (state 500),
# and the far left and far right are terminal.
# Each action moves to one of the 100 positions to the left or to the right,
# chosen uniformly at random; if the move would land beyond an edge, that probability
# collapses onto terminating at the boundary.
# Reaching the far right gives reward +1, the far left gives reward -1, everything else 0.

# ENV --
LENGTH = 1000
# ACTION --
LEFT = 0
RIGHT = 1
ACTIONS = [LEFT, RIGHT]
ACTION = ["LEFT", "RIGHT"]
# STATE --
START = 500
END = 1001  # right terminal state (LENGTH + 1)
RANGES = 100
class ENVIRONMENT:
    def __init__(self):
        self.player = None  # player corresponds to the current STATE
        self.num_steps = 0
        self.CreateEnv()
        self.DrawEnv()

    def CreateEnv(self, initial_grid=None):
        self.grid = ["o"] * 1000

    def DrawEnv(self):
        # print(self.grid)
        pass

    def step(self, action):
        # sample a random step size of 1..RANGES in the chosen direction
        step = np.random.randint(1, RANGES + 1)
        if action == LEFT:
            step = -step
        self.player = self.player + step
        # clip to [0, LENGTH + 1]; overshooting an edge lands on the terminal state
        self.player = max(min(self.player, LENGTH + 1), 0)
        self.num_steps = self.num_steps + 1
        # rewards, game on
        reward = 0
        done = False
        if self.player == 0:
            reward = -1
            done = True
        elif self.player == END:
            reward = 1
            done = True
        return self.player, reward, done

    def reset(self):
        self.player = START
        self.num_steps = 0
        return self.player

    def RenderEnv(self, action=None, render=False):
        if render:
            self.grid = ["o"] * (LENGTH + 2)
            self.grid[self.player] = "*"
            print(self.grid)
def action_policy(epsilon=0.5):
    # random behaviour policy: LEFT with probability 1 - epsilon, RIGHT with probability epsilon
    if np.random.binomial(1, epsilon) == LEFT:
        return LEFT
    else:
        return RIGHT
def compute_true_value_DP(episodes=1000):
    # iterative policy evaluation (DP) under the random policy
    true_value = np.zeros(LENGTH + 2)
    for i in range(episodes):
        old_value = np.copy(true_value)
        # sweep over every non-terminal state and recompute its value
        for state in range(1, LENGTH + 1):
            # expected update: sum over both actions and all step sizes
            true_value[state] = 0
            for action in ACTIONS:
                for step in range(1, RANGES + 1):
                    if action == LEFT:
                        step = -step
                    next_state = state + step
                    next_state = max(min(next_state, LENGTH + 1), 0)
                    reward = 0
                    if next_state == 0:
                        reward = -1
                    elif next_state == LENGTH + 1:
                        reward = 1
                    gamma = 1
                    true_value[state] += 1.0 / (2 * RANGES) * (reward + gamma * true_value[next_state])
        error = np.sum(np.abs(true_value - old_value))
        if i % 50 == 0:
            print(error)
        if error < 1e-2:
            break
    # terminal states keep value 0
    true_value[0] = true_value[-1] = 0
    return true_value
# Inspect the DP value estimates for a given number of sweeps, used to compare the
# approximate value functions below against the true values.
def test_true_value():
    plt.figure(1)
    values = compute_true_value_DP()
    plt.plot(values, label='DP true values')
    plt.xlabel('state')
    plt.ylabel('estimated value')
    plt.legend()
    plt.show()
# Polynomial basis as the approximation features
class POLY_ValueFunction:
    def __init__(self, num_of_param):
        # W, the weight parameters; each function also has one extra constant term (the bias in machine learning)
        self.weights = np.zeros(num_of_param + 1)
        # one basis function per weight
        self.bases = []
        for i in range(0, num_of_param + 1):
            # the state is a single number s, so one loop builds the polynomial basis;
            # with two state variables a double loop would be needed, with three a triple loop, and so on.
            # since the dimension rarely exceeds 6, nested loops are still affordable
            self.bases.append(lambda s, i=i: pow(s, i))

    # get the value of @state
    def value(self, state):
        # map the state space into [0, 1]
        state /= float(LENGTH)
        # get the feature vector
        feature = np.asarray([func(state) for func in self.bases])
        return np.dot(self.weights, feature)

    # update parameters
    def update(self, delta_W, state):
        # map the state space into [0, 1]
        state /= float(LENGTH)
        # for a linear approximator the gradient is just the feature vector
        derivative_value = np.asarray([func(state) for func in self.bases])
        self.weights += delta_W * derivative_value
# Fourier basis as the approximation features
class FOURIER_ValueFunction:
    def __init__(self, num_of_param):
        # W, the weight parameters; each function also has one extra constant term (the bias in machine learning)
        self.weights = np.zeros(num_of_param + 1)
        # one basis function per weight
        self.bases = []
        # with k state variables and order n, the number of basis functions is (n + 1)^k
        for i in range(0, num_of_param + 1):
            # the state is a single number s, so one loop builds the basis;
            # higher-dimensional states would need one nested loop per dimension
            self.bases.append(lambda s, i=i: np.cos(i * np.pi * s))

    # get the value of @state
    def value(self, state):
        # map the state space into [0, 1]
        state /= float(LENGTH)
        # get the feature vector
        feature = np.asarray([func(state) for func in self.bases])
        return np.dot(self.weights, feature)

    # update parameters
    def update(self, delta_W, state):
        # map the state space into [0, 1]
        state /= float(LENGTH)
        # for a linear approximator the gradient is just the feature vector
        derivative_value = np.asarray([func(state) for func in self.bases])
        self.weights += delta_W * derivative_value
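
# A minimal sketch, not part of the original script, for inspecting the feature vectors the two
# bases produce for a given state; `inspect_features` is a hypothetical helper added here only
# for illustration and is not called below.
def inspect_features(state=500, order=5):
    poly = POLY_ValueFunction(order)
    fourier = FOURIER_ValueFunction(order)
    s = state / float(LENGTH)  # same [0, 1] mapping used by value() and update()
    poly_features = np.asarray([func(s) for func in poly.bases])        # [1, s, s^2, ..., s^order]
    fourier_features = np.asarray([func(s) for func in fourier.bases])  # [1, cos(pi*s), ..., cos(order*pi*s)]
    print('polynomial features:', poly_features)
    print('fourier features:   ', fourier_features)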
## n-step semi-gradient TD algorithm (TD(0) when n = 1)
def semi_gradient_temporal_difference(env, episodes_, epsilon=0.5, learning_rate=2e-4, gamma=1, n=1):
    value_function = POLY_ValueFunction(5)  # polynomial basis with highest power 5
    episode = episodes_
    for _ in range(episode):
        state = env.reset()
        states = [state]
        REWARDS = [0]
        done = False
        # track the time
        time = 0
        # the length of this episode
        T = float('inf')
        while True:
            # go to the next time step
            time += 1
            if time < T:
                # choose an action randomly
                action = action_policy()
                next_state, reward, done = env.step(action)
                # store the new state and new reward
                states.append(next_state)
                REWARDS.append(reward)
                if done:
                    T = time
            # get the time of the state to update
            update_time = time - n
            if update_time >= 0:
                returns = 0.0
                # accumulate the rewards of the next n steps (or up to termination)
                for t in range(update_time + 1, min(T, update_time + n) + 1):
                    returns += REWARDS[t]
                # bootstrap: add the estimated value of the state n steps ahead
                if update_time + n <= T:
                    returns += value_function.value(states[update_time + n])
                state_to_update = states[update_time]
                # update the value function for non-terminal states only
                if state_to_update not in [0, LENGTH + 1]:
                    delta = learning_rate * (returns - value_function.value(state_to_update))
                    value_function.update(delta, state_to_update)
            if update_time == T - 1:
                break
    return value_function
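
# The top-of-file comment mentions MC as well as TD(0), and FOURIER_ValueFunction is defined but
# never used below. This is a minimal sketch of gradient Monte Carlo prediction paired with the
# Fourier basis, assuming the same environment interface; `gradient_monte_carlo` and the step
# size 2e-5 are illustrative additions, not part of the original script, and it is not called here.
def gradient_monte_carlo(env, episodes_, value_function=None, learning_rate=2e-5):
    if value_function is None:
        value_function = FOURIER_ValueFunction(5)
    for _ in range(episodes_):
        state = env.reset()
        trajectory = [state]
        reward = 0
        done = False
        while not done:
            state, reward, done = env.step(action_policy())
            trajectory.append(state)
        # with gamma = 1 and a non-zero reward only at termination, the return of every
        # visited state equals the final reward
        for s in trajectory[:-1]:
            delta = learning_rate * (reward - value_function.value(s))
            value_function.update(delta, s)
    return value_function
# Example usage (not executed here):
# fourier_values = gradient_monte_carlo(ENVIRONMENT(), episodes_=100000)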
# true_value = compute_true_value_DP()
def PIC():
    env = ENVIRONMENT()
    distribution = np.zeros(LENGTH + 2)  # (unused) placeholder for the state-visit distribution
    values_eval = semi_gradient_temporal_difference(env, episodes_=100000)
    state_values = [values_eval.value(i) for i in np.arange(1, LENGTH + 1)]
    plt.figure(1)
    plt.plot(state_values, label='Approximate TD value')
    # plt.plot(true_value, label='true value')
    plt.xlabel('State')
    plt.ylabel('Value')
    plt.legend()
    plt.show()

PIC()
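
# A follow-up sketch for comparing the learned approximation against the DP values (the
# commented-out `true_value` lines in PIC hint at this). `compare_with_true_value` is a
# hypothetical helper added only for illustration and is not called by default.
def compare_with_true_value(value_function):
    true_value = compute_true_value_DP()
    approx = np.asarray([value_function.value(s) for s in range(1, LENGTH + 1)])
    # root-mean-squared error over the non-terminal states 1..1000
    rmse = np.sqrt(np.mean((approx - true_value[1:LENGTH + 1]) ** 2))
    print('RMSE over non-terminal states:', rmse)
    plt.figure(2)
    plt.plot(approx, label='Approximate value')
    plt.plot(true_value[1:LENGTH + 1], label='DP value')
    plt.xlabel('State')
    plt.ylabel('Value')
    plt.legend()
    plt.show()
# Example usage (not executed here):
# compare_with_true_value(semi_gradient_temporal_difference(ENVIRONMENT(), episodes_=100000))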