/
AlarmReward.py
171 lines (139 loc) · 7.12 KB
/
AlarmReward.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
import numpy as np
from grid2op.Exceptions import Grid2OpException
from grid2op.Reward.BaseReward import BaseReward
from grid2op.dtypes import dt_float
class AlarmReward(BaseReward):
    """
    This reward is based on the "alarm feature", where the agent is asked to send information about
    potential issues on the grid.

    The reward is only informative on the last step of an episode. On every other step
    ``reward_no_game_over`` (0.0) is returned. When the episode is done, the reward is computed the
    following way:

    - if the environment has been successfully managed until the end of the chronics (no error),
      then ``reward_max`` (1.0) is returned
    - if the game over is not caused by the tripping of a powerline, or if no alarm has been
      successfully raised, then ``reward_min`` (-1.0) is returned
    - otherwise every successful alarm is scored and the best score is returned. An alarm raised
      outside of the time window ``ALARM_BEST_TIME`` +/- ``ALARM_WINDOW_SIZE`` (counted in steps before
      the game over) is worth ``reward_min``. Inside this window the score is a quadratic function of
      the distance to ``ALARM_BEST_TIME`` (1 at the best time, 0 at the borders of the window), divided
      by ``mult_for_right_zone`` unless the alarm designates a single zone that contains one of the
      powerlines disconnected first.

    Examples
    ---------
    You can use this reward in any environment that supports the alarm feature
    (:func:`AlarmReward.initialize` raises an exception otherwise) with:

    .. code-block:: python

        import grid2op
        from grid2op.Reward import AlarmReward

        # then you create your environment with it:
        NAME_OF_THE_ENVIRONMENT = "rte_case14_realistic"
        env = grid2op.make(NAME_OF_THE_ENVIRONMENT,reward_class=AlarmReward)
        # and do a step with a "do nothing" action
        obs = env.reset()
        obs, reward, done, info = env.step(env.action_space())
        # the reward is computed with the AlarmReward class

    """

    def __init__(self):
        BaseReward.__init__(self)
        # required if you want to design a custom reward taking into account the
        # alarm feature
        self.has_alarm_component = True
        self.is_alarm_used = False  # required to update it in __call__ !!

        self.total_time_steps = dt_float(0.0)
        self.reward_min = dt_float(-1.)
        self.reward_max = dt_float(1.)
        self.reward_no_game_over = dt_float(0.)

        # both are read from the parameters of the environment in `reset`
        self.window_size = None
        self.best_time = None
        # bonus factor granted when the alarm points to the right zone
        self.mult_for_right_zone = 2

    def initialize(self, env):
        """
        Check that the environment supports the alarm feature and read its alarm parameters.

        Raises
        ------
        Grid2OpException
            If ``env._has_attention_budget`` is falsy.
        """
        if not env._has_attention_budget:
            raise Grid2OpException("Impossible to use the \"AlarmReward\" with an environment for which this feature "
                                   "is disabled. Please make sure \"env._has_attention_budget\" is set to ``True`` or "
                                   "change the reward class with `grid2op.make(..., reward_class=AnyOtherReward)`")
        self.reset(env)

    def reset(self, env):
        """Reload the episode duration and the alarm parameters from `env` and clear the alarm flag."""
        self.total_time_steps = env.max_episode_duration()
        self.best_time = env.parameters.ALARM_BEST_TIME
        self.window_size = env.parameters.ALARM_WINDOW_SIZE
        # do not leak the flag computed at the end of a previous episode into the new one
        self.is_alarm_used = False

    def _tmp_score_time(self, step_alarm, step_game_over):
        """
        compute the "temporal" score.

        Should give a number between 0 and 1: 1 when the alarm has been raised exactly
        ``self.best_time`` steps before the game over, 0 at the borders of the window
        (``self.best_time`` +/- ``self.window_size``) and outside of it.
        """
        dist_to_game_over = step_game_over - step_alarm
        if dist_to_game_over > self.best_time + self.window_size:
            # alarm too soon
            return 0
        if dist_to_game_over < self.best_time - self.window_size:
            # alarm too late
            return 0

        if self.window_size == 0:
            # degenerate window: reaching this point means the alarm has been raised exactly at the
            # best time, and the scaling below would divide by 0
            return 1.

        # square function such that: it gives 1 if step_game_over - step_alarm equals self.best_time
        # and 0 if step_game_over - step_alarm = self.best_time + self.window_size or
        # if step_game_over - step_alarm = self.best_time - self.window_size
        dist_to_best = dist_to_game_over - self.best_time

        # set it to 0 for the extreme case
        polynom = (dist_to_best - self.window_size) * (dist_to_best + self.window_size)
        # scale it such that it is 1 for dist_to_best == 0 (ie step_game_over - step_alarm == self.best_time)
        return -polynom / self.window_size ** 2

    def _mult_for_zone(self, alarm, disc_lines, env):
        """compute the multiplicative factor that increases the score if the right zone is predicted"""
        res = 1.
        if np.sum(alarm) > 1:
            # if we have more than one zone in the alarm, we cannot discriminate, no bonus points
            return res

        # extract the lines that have been disconnected due to cascading failures
        lines_disconnected_first = np.where(disc_lines == 0)[0]

        # extract the zones they belong to
        zones_these_lines = set()
        zone_for_each_lines = env.alarms_lines_area
        for line_id in lines_disconnected_first:
            zones_these_lines.update(zone_for_each_lines[env.name_line[line_id]])

        # now retrieve the id of the zones in which a powerline has been disconnected
        list_zone_ids = np.where(np.isin(env.alarms_area_names, list(zones_these_lines)))[0]

        # and finally, award some extra points if one of the zone, containing one of the powerline disconnected
        # by protection is in the alarm
        if np.any(alarm[list_zone_ids]):
            res *= self.mult_for_right_zone
        return res

    def _points_for_alarm(self, step_alarm, alarm, step_game_over, disc_lines, env):
        """
        how much points are given for this specific alarm

        Returns the score of the alarm and whether or not the alarm is "used", ie raised in the
        right time window.
        """
        score_for_time = self._tmp_score_time(step_alarm, step_game_over)
        if score_for_time == 0:
            # alarm outside of the time window: worth nothing more than the minimum
            return self.reward_min, False

        # alarm is in the right time window: the full temporal score is only granted when the right
        # zone is predicted
        score = score_for_time
        score *= self._mult_for_zone(alarm, disc_lines, env) / self.mult_for_right_zone
        return score, True

    def __call__(self, action, env, has_error, is_done, is_illegal, is_ambiguous):
        """
        Compute the reward of the current step (see the documentation of the class for the rules).

        As a side effect, when the episode is done, ``self.is_alarm_used`` is updated: it is ``True``
        only if the alarm retained for the score has been raised in the right time window.
        """
        if not is_done:
            return self.reward_no_game_over

        # the flag describes this game over only: never report a stale value
        self.is_alarm_used = False

        if not has_error:
            # agent went until the end
            return self.reward_max

        disc_lines = env._disc_lines
        if np.all(disc_lines == -1):
            # game over is not caused by the tripping of a powerline
            return self.reward_min

        successful_alarms = env._attention_budget._all_successful_alarms
        if len(successful_alarms) == 0:
            # no alarm have been sent, so it's the minimum
            return self.reward_min

        step_game_over = env.nb_time_step

        # so now i can consider the alarms: keep the one with the best score
        best_score, is_alarm_used = self.reward_min, False
        for alarm in successful_alarms:
            # each element unpacks into (step_alarm, alarm), the two first arguments of
            # _points_for_alarm
            tmp_sc, tmp_is = self._points_for_alarm(*alarm,
                                                    step_game_over=step_game_over,
                                                    disc_lines=disc_lines,
                                                    env=env)
            if tmp_sc > best_score:
                best_score = tmp_sc
                is_alarm_used = tmp_is

        self.is_alarm_used = is_alarm_used
        return best_score