/
ObservationSpace.py
130 lines (99 loc) · 5.48 KB
/
ObservationSpace.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
import copy
from grid2op.Observation.SerializableObservationSpace import SerializableObservationSpace
from grid2op.Reward import RewardHelper
from grid2op.Observation.CompleteObservation import CompleteObservation
from grid2op.Observation._ObsEnv import _ObsEnv
class ObservationSpace(SerializableObservationSpace):
    """
    Helper that provides useful functions to manipulate :class:`BaseObservation`.

    BaseObservation should only be built using this helper. It is absolutely not recommended to build an
    observation directly from its constructor.

    This class represents the same concept as the "Observation Space" in the OpenAI gym framework.

    Attributes
    ----------
    observationClass: ``type``
        Class used to build the observations. It defaults to :class:`CompleteObservation`

    parameters: :class:`grid2op.Parameters.Parameters`
        Deep copy of the environment parameters in which ``ENV_DC`` has been replaced by ``FORECAST_DC``
        (parameters meant for the powerflow of the forecast).

    rewardClass: ``type``
        Class used by the :class:`grid2op.Environment.Environment` to send information about its state to the
        :class:`grid2op.BaseAgent.BaseAgent`. You can change this class to differentiate between the reward
        output by :func:`BaseObservation.simulate` and the reward used to train the BaseAgent.

    action_helper_env: :class:`grid2op.Action.ActionSpace`
        Action space used to create actions during :func:`BaseObservation.simulate`

    reward_helper: :class:`grid2op.Reward.RewardHelper`
        Reward function used by the :func:`BaseObservation.simulate` function.

    backend_obs
        Copy of the backend of the environment (``env.backend.copy()``), handed to :attr:`obs_env`.

    obs_env: :class:`_ObsEnv`
        Instance of the environment used by this helper to provide forecasts of the grid state.

    _empty_obs: :class:`grid2op.Observation.BaseObservation`
        An empty observation with the proper dimensions, built once in the constructor.

    """

    def __init__(self,
                 gridobj,
                 env,
                 rewardClass=None,
                 observationClass=CompleteObservation):
        """
        Build the observation space from an existing environment.

        Parameters
        ----------
        gridobj
            Forwarded unchanged to :class:`SerializableObservationSpace`.

        env
            Environment this observation space is attached to. The following attributes of ``env`` are read
            here and must be valid: ``parameters``, ``backend``, ``helper_action_env``, ``helper_action_player``,
            ``helper_action_class``, ``other_rewards``, ``legalActClass``, ``_thermal_limit_a`` and, when
            ``rewardClass`` is ``None``, ``rewardClass``.

        rewardClass: ``type``
            Reward class used for the simulation. ``None`` (default) means "use ``env.rewardClass``".

        observationClass: ``type``
            Class used to build the observations.

        """
        SerializableObservationSpace.__init__(self, gridobj, observationClass=observationClass)
        # TODO DOCUMENTATION !!!

        # The copy kept on this object uses the forecast setting (FORECAST_DC) in place of the
        # environment one (ENV_DC); the parameters of `env` itself are left untouched.
        self.parameters = copy.deepcopy(env.parameters)
        self.parameters.ENV_DC = self.parameters.FORECAST_DC

        # fall back on the reward of the environment when none is specified
        self.rewardClass = env.rewardClass if rewardClass is None else rewardClass

        # helpers
        self.action_helper_env = env.helper_action_env
        self.reward_helper = RewardHelper(rewardClass=self.rewardClass)
        self.reward_helper.initialize(env)

        # only the classes of the extra rewards are forwarded to the simulation environment
        other_reward_classes = {name: helper.rewardClass for name, helper in env.other_rewards.items()}

        # TODO here: have another backend maybe
        self.backend_obs = env.backend.copy()

        obs_env_cls = _ObsEnv.init_grid(self.backend_obs)
        # NOTE(review): `env.parameters` is passed below, not the adjusted `self.parameters` copy made
        # above -- confirm this is intended (e.g. that _ObsEnv handles FORECAST_DC on its own).
        self.obs_env = obs_env_cls(
            backend_instanciated=self.backend_obs,
            obsClass=self.observationClass,
            parameters=env.parameters,
            reward_helper=self.reward_helper,
            action_helper=self.action_helper_env,
            thermal_limit_a=env._thermal_limit_a,
            legalActClass=env.legalActClass,
            donothing_act=env.helper_action_player(),
            other_rewards=other_reward_classes,
            completeActionClass=env.helper_action_env.actionClass,
            helper_action_class=env.helper_action_class,
            helper_action_env=env.helper_action_env,
        )

        # the extra rewards held by the simulation environment are initialized with the real environment
        for extra_reward in self.obs_env.other_rewards.values():
            extra_reward.initialize(env)

        self._empty_obs = self.observationClass(obs_env=self.obs_env,
                                                action_helper=self.action_helper_env)
        self._update_env_time = 0.

    def reset_space(self):
        """
        Call ``reset_space`` on :attr:`obs_env` and on the action class of :attr:`action_helper_env`.
        """
        self.obs_env.reset_space()
        self.action_helper_env.actionClass.reset_space()

    def __call__(self, env):
        """
        Build a new observation from the current state of ``env``.

        :attr:`obs_env` is first updated with ``env``, then a fresh instance of :attr:`observationClass` is
        created, updated with ``env`` and returned.

        Parameters
        ----------
        env
            The environment from which the observation is built.

        Returns
        -------
        res
            A new instance of :attr:`observationClass`, updated from ``env``.

        """
        self.obs_env.update_grid(env)

        observation = self.observationClass(obs_env=self.obs_env,
                                            action_helper=self.action_helper_env)

        # TODO how to make sure that whatever the number of time i call "simulate" i still get the same observations
        # TODO use self.obs_prng when updating actions
        observation.update(env=env)
        return observation

    def size_obs(self):
        """
        Size of the observation vector if it were flattened.

        Returns
        -------
        The ``n`` attribute of this object (not set in this class; presumably provided by the parent class).

        """
        return self.n