import random

import environments as env


def _alpha(n):
    """
    The step-size function used to ensure convergence. It decreases as the
    number of times a state has been visited (n) grows, so that the utility
    of policy pi for a state s converges to its correct value.
    """
    return 50. / (49 + n)


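# Quick illustration of the schedule above (a sketch, not part of the original
# module): the rate starts near 1 and decays toward 0 as visits accumulate,
# which is what the convergence argument requires.
"""
print(_alpha(1))    # 1.0
print(_alpha(51))   # 0.5
print(_alpha(951))  # 0.05
"""

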
def _getEstimates(transs, utils, currState, R_plus=None, N_e=None, currActions=None):
    """
    Gets estimates according to the current transition table, utilities,
    current state and the actions that can be executed in the current state.

    transs keys: currState => action => newState => frequency

    For every possible action in currState:
      - get the frequencies of newState given (currState, action),
      - sum them to get n,
      - divide the frequencies by n to get probabilities,
      - calculate the estimate with the Bellman equation.

    Returns (rewardEstimate, action) pairs in a list.
    """
    estimates = []
    for ac in (currActions or transs.get(currState, {})):
        # We get N_s_a from the transition table.
        freq = transs.get(currState, {}).get(ac, {})
        # Total number of observed transitions for this (state, action) pair.
        n = sum(val for val in freq.values())
        probs = dict((key, float(val) / n) for key, val in freq.items())
        u = sum(p * utils.get(s, 0) for s, p in probs.items())
        # This is the exploration function f from page 842. If action ac has
        # not been executed in this state often enough (fewer than N_e times),
        # return the optimistic reward R_plus to encourage searching further
        # in that direction; otherwise do the normal estimation.
        if R_plus is not None and N_e is not None and n < N_e:
            estimates.append((R_plus, ac, ))
        else:
            estimates.append((u, ac, ))
    return estimates


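# Minimal usage sketch (illustrative values, not part of the original module):
# with a single state 'A' whose action 'right' led to 'B' three times and back
# to 'A' once, and utilities U(B)=1, U(A)=0, the estimate for 'right' is
# 0.75 * 1 + 0.25 * 0 = 0.75; with the optimistic parameters it is R_plus
# because n = 4 < N_e.
"""
transs = {'A': {'right': {'B': 3, 'A': 1}}}
utils = {'B': 1.0, 'A': 0.0}
print(_getEstimates(transs, utils, 'A'))                      # [(0.75, 'right')]
print(_getEstimates(transs, utils, 'A', R_plus=5, N_e=10))    # [(5, 'right')]
"""

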
def _policy_iteration(transs, utils, policy, rewards, R_plus=None, N_e=None, th=1):
    """
    A simplified policy-iteration loop: update the utilities of all known
    states from the learned model (transition frequencies and rewards), then
    improve the policy by switching to the action with the highest expected
    utility. Repeat until the policy stops changing.
    """
    changes = True
    while changes:
        # Utility update: U(s) = R(s) + th * max_a sum_s' P(s'|s,a) * U(s').
        for state in transs:
            if state not in rewards:
                continue
            bestEstimate = max(_getEstimates(transs, utils, state, R_plus, N_e))[0]
            utils[state] = rewards[state] + th * bestEstimate
        changes = False
        # Policy improvement: switch to an action whose estimate strictly
        # beats the estimate of the currently recommended action.
        for state in transs:
            estimates = []
            for ac in transs.get(state, {}):
                freq = transs.get(state, {}).get(ac, {})
                # Total number of observed transitions for this (state, action).
                n = sum(val for val in freq.values())
                probs = dict((key, float(val) / n) for key, val in freq.items())
                estimates.append((sum(p * utils.get(s, 0) for s, p in probs.items()), ac, ))
            if not estimates:
                continue
            maxEst, maxAct = max(estimates)
            polEst = dict((act, est, ) for est, act in estimates)[policy.get(state, maxAct)]
            if maxEst > polEst or policy.get(state, None) is None:
                policy[state] = maxAct
                changes = True


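# Usage sketch (illustrative, reuses the tiny hand-built model from the
# example above): utils and policy are filled in place; after the call,
# policy['A'] holds the recommended action for state 'A'.
"""
transs = {'A': {'right': {'B': 3, 'A': 1}}}
rewards = {'A': -0.04, 'B': 1.0}
utils, policy = {}, {}
_policy_iteration(transs, utils, policy, rewards, th=0.9)
print(policy)  # {'A': 'right'}
"""

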
def adp_random_exploration(env, transs={}, utils={}, freqs={}, policy={},
                           rewards={}, **kwargs):
    """
    Active ADP (adaptive dynamic programming) learning agent that follows the
    best policy found so far for the given environment env, but with a
    probability that decays over time takes a random exploratory action
    instead (a GLIE scheme).

    The experience dictionaries (transs, utils, freqs, policy, rewards) can be
    empty if the agent has no experience with the environment, but they can
    also carry values from previous trials.

    The algorithm returns the number of iterations needed to reach a terminal
    state, the sum of collected rewards and whether the last reward was
    non-negative. For reference see page 834.

    @param env: Environment
    @param transs: A transition table (N_s'_sa) with outcome frequencies given state-action pairs, initially empty.
    @param utils: Utilities table
    @param freqs: A table of frequencies (N_sa) for state-action pairs, initially empty.
    @param t: A parameter for choosing between the best action and a random action.
    @param tStep: A step by which the parameter t is incremented.
    @param tFac: A factor applied to t in the exploration probability 1/(tFac*(t+1)).
    @param minRnd: A lower bound on the exploration probability.
    @param alpha: Step-size function
    @param maxItr: Maximum number of iterations per trial
    """
    tStep = kwargs.get('tStep', 0.01)
    alpha = kwargs.get('alpha', _alpha)
    maxItr = kwargs.get('maxItr', 50)
    tFac = kwargs.get('tFac', 1.)
    # Optionally warm-start t from the number of iterations done in previous trials.
    t = kwargs.get('currItrs', 0) / 5 if kwargs.get('remember', False) else 0
    minRnd = kwargs.get('minRnd', 0.0)

    itr = 0
    isTerminal = False
    state = env.getStartingState()
    rewardSum = 0
    lastReward = False

    # Get possible actions with respect to the current state.
    actions = env.getActions(state)
    _policy_iteration(transs, utils, policy, rewards, th=alpha(itr))
    bestAction = policy.get(state, random.choice(actions))

    while not isTerminal:  # while a terminal state has not been reached
        if random.random() < max(minRnd, 1. / (tFac * (t + 1))) or bestAction is None:
            # Exploration: with probability max(minRnd, 1/(tFac*(t+1))), or
            # when there is no recommended action yet, choose a random action.
            bestAction = random.choice(actions)

        # Execute the chosen action (the policy's recommendation or a random
        # exploratory one).
        newState, reward, isTerminal = env.do(state, bestAction)
        lastReward = reward >= 0
        rewards[newState] = reward
        rewardSum += reward

        # Initialise the visit counter for a newly seen state.
        freqs.setdefault(newState, 0)
        freqs[newState] += 1

        # Update the transition table: the first setdefault returns the
        # dictionary of actions for the current state, the second one the
        # dictionary of outcome states for the chosen action.
        transs.setdefault(state, {}).setdefault(bestAction, {}).setdefault(newState, 0)
        transs[state][bestAction][newState] += 1

        actions = env.getActions(newState)
        for ac in actions:
            transs.setdefault(newState, {}).setdefault(ac, {})

        _policy_iteration(transs, utils, policy, rewards, th=alpha(itr))
        bestAction = policy.get(newState, random.choice(actions))
        # From the book: having obtained a utility function U that is optimal
        # for the learned model, the agent can extract an optimal action by
        # one-step look-ahead to maximize the expected utility; alternatively,
        # if it uses policy iteration, the optimal policy is already
        # available, so it should simply execute the action the optimal
        # policy recommends. Or should it?

        state = newState

        # A GLIE scheme must also eventually become greedy, so that the
        # agent's actions become optimal with respect to the learned (and
        # hence the true) model. That is why the parameter t is incremented.
        t, itr = t + tStep, itr + 1
        if itr >= maxItr:
            break
    return itr, rewardSum, lastReward


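# Minimal usage sketch (the ChainEnv class below is hypothetical and not part
# of this project; it only stands in for the environments defined in
# environments.py, implementing the three methods the algorithm actually
# calls: getStartingState, getActions and do).
"""
class ChainEnv:
    # Two-state chain: from 'start', 'go' reaches the terminal 'goal' with
    # reward 1, while 'stay' loops back with reward -0.1.
    def getStartingState(self):
        return 'start'

    def getActions(self, state):
        return ['stay', 'go']

    def do(self, state, action):
        if action == 'go':
            return 'goal', 1.0, True
        return 'start', -0.1, False


transs, utils, freqs, policy, rewards = {}, {}, {}, {}, {}
itr, rewardSum, win = adp_random_exploration(
    ChainEnv(), transs=transs, utils=utils, freqs=freqs,
    policy=policy, rewards=rewards, maxItr=20)
print(itr, rewardSum, win)
"""

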
def adp_optimistic_rewards(env, transs={}, utils={}, freqs={}, policy={},
                           rewards={}, **kwargs):
    """
    Active ADP (adaptive dynamic programming) learning agent that explores by
    using an optimistic reward estimate: state-action pairs that have been
    tried fewer than N_e times are assumed to yield the reward R_plus.

    @param env: Environment
    @param transs: A transition table (N_s'_sa) with outcome frequencies given state-action pairs, initially empty.
    @param utils: Utilities table
    @param freqs: A table of frequencies (N_sa) for state-action pairs, initially empty.
    @param R_plus: An optimistic estimate of the best possible reward obtainable in any state.
    @param N_e: How many times a state-action pair receives the optimistic reward before its true estimate is used.
    @param alpha: Step-size function
    @param maxItr: Maximum number of iterations per trial
    """
    R_plus = kwargs.get('R_plus', 5)
    N_e = kwargs.get('N_e', 12)
    alpha = kwargs.get('alpha', _alpha)
    maxItr = kwargs.get('maxItr', 10)

    itr = 0
    isTerminal = False
    state = env.getStartingState()
    rewardSum = 0
    lastReward = False

    # Get possible actions with respect to the current state.
    actions = env.getActions(state)
    _policy_iteration(transs, utils, policy, rewards, R_plus=R_plus, N_e=N_e, th=alpha(itr))
    bestAction = policy.get(state, random.choice(actions))

    while not isTerminal:  # while a terminal state has not been reached
        if bestAction is None:
            # If the policy does not recommend anything yet, choose a random
            # action; exploration itself is driven by the optimistic rewards.
            bestAction = random.choice(actions)

        # Execute the action recommended by the current policy.
        newState, reward, isTerminal = env.do(state, bestAction)
        rewards[newState] = reward
        rewardSum += reward
        lastReward = reward >= 0

        # Initialise the visit counter for a newly seen state.
        freqs.setdefault(newState, 0)
        freqs[newState] += 1

        # Update the transition table: the first setdefault returns the
        # dictionary of actions for the current state, the second one the
        # dictionary of outcome states for the chosen action.
        transs.setdefault(state, {}).setdefault(bestAction, {}).setdefault(newState, 0)
        transs[state][bestAction][newState] += 1

        # We need the actions available in the new state.
        actions = env.getActions(newState)
        for ac in actions:
            transs.setdefault(newState, {}).setdefault(ac, {})

        _policy_iteration(transs, utils, policy, rewards, R_plus=R_plus, N_e=N_e, th=alpha(itr))
        # rewardEstimate, bestAction = max(_getEstimatesOptimistic(transs, utils, state, R_plus, N_e, actions))
        bestAction = policy.get(newState, random.choice(actions))

        state = newState
        itr += 1
        if itr >= maxItr:
            break
    return itr, rewardSum, lastReward


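# Usage sketch (relies on the hypothetical ChainEnv from the sketch above;
# R_plus and N_e are the only knobs that differ from adp_random_exploration):
"""
transs, utils, freqs, policy, rewards = {}, {}, {}, {}, {}
itr, rewardSum, win = adp_optimistic_rewards(
    ChainEnv(), transs=transs, utils=utils, freqs=freqs,
    policy=policy, rewards=rewards, R_plus=2, N_e=3, maxItr=20)
print(itr, rewardSum, win)
"""

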
# Agent class.
class Agent:
    def __init__(self):
        self.clearExperience()

    def clearExperience(self):
        # Frequency table.
        self.nTable = {}
        # Transition table.
        self.transTable = {}
        # Utilities table.
        self.uTable = {}
        # Results.
        self.results = []
        # Policy table.
        self.policyTable = {}
        # Rewards table.
        self.rewardsTable = {}
        # History of trials.
        self.history = []

    def getPolicy(self):
        return self.policyTable

    def learn(self, env, alg=adp_random_exploration, numOfTrials=150, **kwargs):
        """
        Learn the best policy given the environment, the learning algorithm
        and the number of trials.

        @param env: Environment to learn in
        @param alg: Learning algorithm (adp_random_exploration or adp_optimistic_rewards)
        @param numOfTrials: Number of trials to run
        """
        itrs = 0
        self.clearExperience()
        for trial in range(numOfTrials):
            currItrs, reward, win = alg(env,
                                        transs=self.transTable,
                                        utils=self.uTable,
                                        freqs=self.nTable,
                                        currItrs=itrs,
                                        results=self.results,
                                        policy=self.policyTable,
                                        rewards=self.rewardsTable,
                                        **kwargs)
            itrs += currItrs
            result, energy = self.solve(env, self.getPolicy())
            self.history.append({
                'reward': reward,
                'steps': currItrs,
                'length': len(result),
                'energy': energy,
                'win': win,
            })
        return self.getPolicy()

    def solve(self, env, policy):
        # Solve the environment with respect to the given policy.
        actions, energy = [], 0
        # Set state to the starting state of the environment.
        state, prevState = env.getStartingState(), None
        isTerminalState = False
        while not isTerminalState:
            # The policy holds the best action for a given state.
            act = policy.get(state)
            if act is None:
                act = random.choice(env.getActions(state))
            # Execute the selected action in the current state.
            state, reward, isTerminalState = env.do(state, act)
            actions.append(act)
            energy += reward
            # Safeguard against policies that never reach a terminal state.
            if energy < -1000:
                break
        # Return the list of executed actions and the sum of rewards received
        # when entering each state along the way.
        return actions, energy


"""
# lets test it on simple4
a = Agent()
a.learn(env.simple4, alg=adp_optimistic_rewards, numOfTrials=15000, **{'maxItr': 15,
'R_plus': 7,
'N_e': 5,})
env.simple4.printPolicy(a.getPolicy())
# get solution and print it for this simple example
solution = a.solve(env.simple4, a.getPolicy())
print "Solution steps: " + str(solution)
# print solution steps
state = env.simple4.getStartingState()
for move in solution[0]:
env.simple4.printState(state)
state, reward, is_terminal = env.simple4.do(state, move)
env.simple4.printState(state)
"""