"""Implementation adapted from miyosuda/unreal:
https://github.com/miyosuda/unreal/tree/master/train"""
import math
from collections import deque
from typing import Any, List, Optional
import numpy as np
from ..util import GameRewards


class ExperienceFrame(object):
    # The state of the frame
    state: Any
    # The action taken at "state"
    action: int
    # The reward given for the action taken at "state"
    reward: float
    # Whether or not the action resulted in a terminal state
    terminal: bool
    # The expression grouping change that occurred transitioning to "state"
    grouping_change: float
    # The action taken that resulted in a transition to "state"
    last_action: int
    # The reward received for taking the action that transitioned to "state"
    last_reward: float
    def __init__(
        self,
        state: Any,
        reward: float,
        action: int,
        terminal: bool,
        grouping_change: float,
        last_action: int,
        last_reward: float,
    ):
        self.state = state
        # The action taken at "state"
        self.action = action
        # The reward received for that action, clipped to [-1, 1]
        self.reward = np.clip(reward, -1, 1)
        # Whether the episode terminated at "state"
        self.terminal = terminal
        self.grouping_change = grouping_change
        # The action whose result was the transition to "state"
        self.last_action = last_action
        # The reward received for that last action, clipped to [-1, 1]
        self.last_reward = np.clip(last_reward, -1, 1)

    def get_last_action_reward(self, action_size: int) -> np.ndarray:
        """Return the one-hot encoded last action concatenated with the last reward."""
        return ExperienceFrame.concat_action_and_reward(
            self.last_action, action_size, self.last_reward
        )

    def get_action_reward(self, action_size: int) -> np.ndarray:
        """Return the one-hot encoded action concatenated with the reward."""
        return ExperienceFrame.concat_action_and_reward(
            self.action, action_size, self.reward
        )

    @staticmethod
    def concat_action_and_reward(
        action: int, action_size: int, reward: float
    ) -> np.ndarray:
        """Return a vector of length action_size + 1: a one-hot encoding of
        the action followed by the scalar reward in the final slot."""
        action_reward = np.zeros([action_size + 1])
        action_reward[action] = 1.0
        action_reward[-1] = float(reward)
        return action_reward
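

# A quick illustrative check of the layout above (values are hypothetical,
# not from the original module): with action_size=3,
# ExperienceFrame.concat_action_and_reward(1, 3, 0.5) returns
# array([0., 1., 0., 0.5]) -- a one-hot action in the first three slots
# and the reward in the last.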


class Experience(object):
    def __init__(self, history_size: int, ready_at: Optional[int] = None):
        self._ready_at = history_size if ready_at is None else ready_at
        self._history_size = history_size
        self._frames: deque = deque(maxlen=history_size)
        # Frame indices of zero-reward frames
        self._zero_reward_indices: deque = deque()
        # Frame indices of non-zero-reward frames
        self._non_zero_reward_indices: deque = deque()
        self._top_frame_index = 0

    def add_frame(self, frame: ExperienceFrame):
        if frame.terminal and len(self._frames) > 0 and self._frames[-1].terminal:
            # Discard consecutive terminal frames
            return
        frame_index = self._top_frame_index + len(self._frames)
        was_full = self.is_full()
        self._frames.append(frame)
        # Only index frames with at least 3 predecessors, because reward
        # prediction replays 4 successive frames at a time.
        if frame_index >= 3:
            # UNREAL splits on zero vs. non-zero rewards, but we have a
            # per-timestep penalty, so treat anything at or below that
            # penalty as zero.
            if frame.reward <= GameRewards.TIMESTEP:
                self._zero_reward_indices.append(frame_index)
            else:
                self._non_zero_reward_indices.append(frame_index)
        if was_full:
            self._top_frame_index += 1
            cut_frame_index = self._top_frame_index + 3
            # Drop stored indices below cut_frame_index: the frames needed to
            # complete their 4-frame window have been evicted.
            if (
                len(self._zero_reward_indices) > 0
                and self._zero_reward_indices[0] < cut_frame_index
            ):
                self._zero_reward_indices.popleft()
            if (
                len(self._non_zero_reward_indices) > 0
                and self._non_zero_reward_indices[0] < cut_frame_index
            ):
                self._non_zero_reward_indices.popleft()
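
    # Worked example of the trimming above (hypothetical sizes): with
    # history_size=8 and the buffer full, appending frame index 8 evicts
    # frame 0, _top_frame_index becomes 1, and cut_frame_index becomes 4.
    # An index below 4 would need frames older than the buffer holds to
    # complete its 4-frame window, so it is dropped.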

    @property
    def frame_count(self) -> int:
        return len(self._frames)

    def is_full(self) -> bool:
        return len(self._frames) >= self._ready_at

    def sample_sequence(self, sequence_size: int) -> List[ExperienceFrame]:
        # Sample start_pos from [0, size - sequence_size - 2] so that if it
        # lands on a terminal frame we can shift forward by one without
        # running off the end of the buffer.
        curr_size = len(self._frames)
        start_pos = np.random.randint(0, curr_size - sequence_size - 1)
        if self._frames[start_pos].terminal:
            start_pos += 1
            # This assumes there are no successive terminal frames.
        sampled_frames = []
        for i in range(sequence_size):
            frame = self._frames[start_pos + i]
            sampled_frames.append(frame)
            if frame.terminal:
                break
        return sampled_frames
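
    # Illustrative note on the sampling bounds above (hypothetical sizes):
    # with 16 buffered frames and sequence_size=4, start_pos is drawn from
    # [0, 10], which leaves room to shift one frame forward if the sampled
    # start happens to be terminal.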

    def sample_rp_sequence(self) -> List[ExperienceFrame]:
        """Sample 4 successive frames for reward prediction, balancing
        zero-reward and non-zero-reward end frames 50/50 when possible."""
        from_zero = np.random.randint(2) == 0
        if len(self._zero_reward_indices) == 0:
            # The zero-reward container is empty
            from_zero = False
        elif len(self._non_zero_reward_indices) == 0:
            # The non-zero-reward container is empty
            from_zero = True

        if from_zero:
            index = np.random.randint(len(self._zero_reward_indices))
            end_frame_index = self._zero_reward_indices[index]
        else:
            index = np.random.randint(len(self._non_zero_reward_indices))
            end_frame_index = self._non_zero_reward_indices[index]

        start_frame_index = end_frame_index - 3
        raw_start_frame_index = start_frame_index - self._top_frame_index
        sampled_frames = []
        for i in range(4):
            frame = self._frames[raw_start_frame_index + i]
            sampled_frames.append(frame)
        return sampled_frames
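

# A minimal usage sketch, not part of the original module. It assumes
# GameRewards.TIMESTEP is a small negative per-step penalty, and the dict
# states below are stand-ins for real environment observations. Because of
# the relative import above, this only runs in a package context
# (e.g. via `python -m`).
if __name__ == "__main__":
    experience = Experience(history_size=16)
    last_action, last_reward = 0, 0.0
    for step in range(16):
        frame = ExperienceFrame(
            state={"step": step},  # hypothetical observation payload
            reward=1.0 if step % 5 == 4 else 0.0,  # occasional positive reward
            action=step % 2,
            terminal=False,
            grouping_change=0.0,
            last_action=last_action,
            last_reward=last_reward,
        )
        experience.add_frame(frame)
        last_action, last_reward = frame.action, frame.reward

    if experience.is_full():
        # 4 successive frames ending on a zero or non-zero reward frame
        rp_frames = experience.sample_rp_sequence()
        print("rp rewards:", [f.reward for f in rp_frames])
        # A random contiguous sequence for the other training tasks
        print("sequence length:", len(experience.sample_sequence(4)))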