Skip to content

Commit

Permalink
major updates to support power outage env
Browse files Browse the repository at this point in the history
  • Loading branch information
kingsleynweye committed Aug 28, 2023
1 parent 55edb32 commit 77bb7fb
Showing 1 changed file with 73 additions and 77 deletions.
150 changes: 73 additions & 77 deletions citylearn/energy_model.py
Expand Up @@ -47,16 +47,16 @@ class ElectricDevice(Device):
Parameters
----------
nominal_power : float
Electric device nominal power >= 0. If == 0, set to 0.00001 to avoid `ZeroDivisionError`.
nominal_power : float, default: 0.0
Electric device nominal power >= 0.
Other Parameters
----------------
**kwargs : Any
Other keyword arguments used to initialize super class.
"""

def __init__(self, nominal_power: float, **kwargs: Any):
def __init__(self, nominal_power: float = None, **kwargs: Any):
super().__init__(**kwargs)
self.nominal_power = nominal_power

Expand All @@ -67,8 +67,8 @@ def nominal_power(self) -> float:
return self.__nominal_power

@property
def electricity_consumption(self) -> List[float]:
r"""Electricity consumption time series."""
def electricity_consumption(self) -> np.ndarray:
r"""Electricity consumption time series [kWh]."""

return self.__electricity_consumption

Expand All @@ -80,48 +80,45 @@ def available_nominal_power(self) -> float:

@nominal_power.setter
def nominal_power(self, nominal_power: float):
if nominal_power is None or nominal_power == 0:
self.__nominal_power = ZERO_DIVISION_CAPACITY
else:
assert nominal_power >= 0, 'nominal_power must be >= 0.'
self.__nominal_power = nominal_power
nominal_power = 0.0 if nominal_power is None else nominal_power
assert nominal_power >= 0, 'nominal_power must be >= 0.'
self.__nominal_power = nominal_power

def get_metadata(self) -> Mapping[str, Any]:
return {
**super().get_metadata(),
'nominal_power': self.nominal_power,
}

def update_electricity_consumption(self, electricity_consumption: float):
def update_electricity_consumption(self, electricity_consumption: float, enforce_polarity: bool = None):
r"""Updates `electricity_consumption` at current `time_step`.
Parameters
----------
electricity_consumption : float
value to add to current `time_step` `electricity_consumption`. Must be >= 0.
electricity_consumption: float
Value to add to current `time_step` `electricity_consumption`. Must be >= 0.
enforce_polarity: bool, default: True
Whether to allow only positive `electricity_consumption` values. Some electric
devices like :py:class:`citylearn.energy_model.Battery` may be bi-directional and
allow electricity discharge thus, cause negative electricity consumption.
"""

assert electricity_consumption >= 0, 'electricity_consumption must be >= 0.'
enforce_polarity = True if enforce_polarity is None else enforce_polarity
assert not enforce_polarity or electricity_consumption >= 0.0, 'electricity_consumption must be >= 0.'
self.__electricity_consumption[self.time_step] += electricity_consumption

def next_time_step(self):
r"""Advance to next `time_step` and set `electricity_consumption` at new `time_step` to 0.0."""

super().next_time_step()
self.__electricity_consumption.append(0.0)

def reset(self):
r"""Reset `ElectricDevice` to initial state and set `electricity_consumption` at `time_step` 0 to = 0.0."""

super().reset()
self.__electricity_consumption = [0.0]
self.__electricity_consumption = np.zeros(self.episode_tracker.episode_time_steps, dtype='float32')

class HeatPump(ElectricDevice):
r"""Base heat pump class.
Parameters
----------
nominal_power: float
nominal_power: float, default: 0.0
Maximum amount of electric power that the heat pump can consume from the power grid (given by the nominal power of the compressor).
efficiency : float, default: 0.2
Technical efficiency.
Expand All @@ -136,7 +133,7 @@ class HeatPump(ElectricDevice):
Other keyword arguments used to initialize super class.
"""

def __init__(self, nominal_power: float, efficiency: float = None, target_heating_temperature: float = None, target_cooling_temperature: float = None, **kwargs: Any):
def __init__(self, nominal_power: float = None, efficiency: float = None, target_heating_temperature: float = None, target_cooling_temperature: float = None, **kwargs: Any):
super().__init__(nominal_power = nominal_power, efficiency = efficiency, **kwargs)
self.target_heating_temperature = target_heating_temperature
self.target_cooling_temperature = target_cooling_temperature
Expand Down Expand Up @@ -244,7 +241,7 @@ def get_max_output_power(self, outdoor_dry_bulb_temperature: Union[float, Iterab
if max_electric_power is None:
return self.available_nominal_power*cop
else:
return np.minimum(max_electric_power, self.available_nominal_power)*cop
return np.min([max_electric_power, self.available_nominal_power], axis=0)*cop

def get_input_power(self, output_power: Union[float, Iterable[float]], outdoor_dry_bulb_temperature: Union[float, Iterable[float]], heating: bool) -> Union[float, Iterable[float]]:
r"""Return input power.
Expand Down Expand Up @@ -312,7 +309,7 @@ class ElectricHeater(ElectricDevice):
Parameters
----------
nominal_power : float
nominal_power : float, default: 0.0
Maximum amount of electric power that the electric heater can consume from the power grid.
efficiency : float, default: 0.9
Technical efficiency.
Expand All @@ -323,7 +320,7 @@ class ElectricHeater(ElectricDevice):
Other keyword arguments used to initialize super class.
"""

def __init__(self, nominal_power: float, efficiency: float = None, **kwargs: Any):
def __init__(self, nominal_power: float = None, efficiency: float = None, **kwargs: Any):
super().__init__(nominal_power = nominal_power, efficiency = efficiency, **kwargs)

@ElectricDevice.efficiency.setter
Expand Down Expand Up @@ -354,7 +351,7 @@ def get_max_output_power(self, max_electric_power: Union[float, Iterable[float]]
if max_electric_power is None:
return self.available_nominal_power*self.efficiency
else:
return np.min(max_electric_power, self.available_nominal_power)*self.efficiency
return np.min([max_electric_power, self.available_nominal_power], axis=0)*self.efficiency

def get_input_power(self, output_power: Union[float, Iterable[float]]) -> Union[float, Iterable[float]]:
r"""Return input power.
Expand Down Expand Up @@ -403,7 +400,7 @@ class PV(ElectricDevice):
Parameters
----------
nominal_power : float
nominal_power : float, default: 0.0
PV array output power in [kW]. Must be >= 0.
Other Parameters
Expand All @@ -412,10 +409,9 @@ class PV(ElectricDevice):
Other keyword arguments used to initialize super class.
"""

def __init__(self, nominal_power: float, **kwargs: Any):
def __init__(self, nominal_power: float = None, **kwargs: Any):
super().__init__(nominal_power=nominal_power, **kwargs)


def get_generation(self, inverter_ac_power_per_kw: Union[float, Iterable[float]]) -> Union[float, Iterable[float]]:
r"""Get solar generation output.
Expand Down Expand Up @@ -462,8 +458,8 @@ class StorageDevice(Device):
Parameters
----------
capacity : float
Maximum amount of energy the storage device can store in [kWh]. Must be >= 0 and if == 0 or None, set to 0.00001 to avoid `ZeroDivisionError`.
capacity : float, default: 0.0
Maximum amount of energy the storage device can store in [kWh]. Must be >= 0.
efficiency : float, default: 0.9
Technical efficiency.
loss_coefficient : float, default: 0.006
Expand All @@ -477,7 +473,7 @@ class StorageDevice(Device):
Other keyword arguments used to initialize super class.
"""

def __init__(self, capacity: float, efficiency: float = None, loss_coefficient: float = None, initial_soc: float = None, **kwargs: Any):
def __init__(self, capacity: float = None, efficiency: float = None, loss_coefficient: float = None, initial_soc: float = None, **kwargs: Any):
self.capacity = capacity
self.loss_coefficient = loss_coefficient
self.initial_soc = initial_soc
Expand All @@ -502,7 +498,7 @@ def initial_soc(self) -> float:
return self.__initial_soc

@property
def soc(self) -> List[float]:
def soc(self) -> np.ndarray:
r"""State of charge time series between [0, 1] in [:math:`\frac{\textrm{capacity}_{\textrm{charged}}}{\textrm{capacity}}`]."""

return self.__soc
Expand All @@ -511,10 +507,10 @@ def soc(self) -> List[float]:
def energy_init(self) -> float:
r"""Latest energy level after accounting for standby hourly lossses in [kWh]."""

return self.__soc[-1]*self.capacity*(1 - self.loss_coefficient)
return self.__soc[self.time_step - 1]*self.capacity*(1 - self.loss_coefficient)

@property
def energy_balance(self) -> List[float]:
def energy_balance(self) -> np.ndarray:
r"""Charged/discharged energy time series in [kWh]."""

return self.__energy_balance
Expand All @@ -527,11 +523,9 @@ def round_trip_efficiency(self) -> float:

@capacity.setter
def capacity(self, capacity: float):
if capacity is None or capacity == 0:
self.__capacity = ZERO_DIVISION_CAPACITY
else:
assert capacity >= 0, 'capacity must be >= 0.'
self.__capacity = capacity
capacity = 0.0 if capacity is None else capacity
assert capacity >= 0, 'capacity must be >= 0.'
self.__capacity = capacity

@loss_coefficient.setter
def loss_coefficient(self, loss_coefficient: float):
Expand Down Expand Up @@ -573,26 +567,32 @@ def charge(self, energy: float):
"""

# The initial State Of Charge (SOC) is the previous SOC minus the energy losses
energy = min(self.energy_init + energy*self.round_trip_efficiency, self.capacity) if energy >= 0 else max(0, self.energy_init + energy/self.round_trip_efficiency)
soc = energy/self.capacity
self.__soc.append(soc)
self.__energy_balance.append(self.set_energy_balance())
energy = self.energy_init + energy
energy = min(energy*self.round_trip_efficiency, self.capacity) if energy >= 0 else max(0.0, energy/self.round_trip_efficiency)
self.__energy_balance[self.time_step] = self.set_energy_balance(energy)
self.__soc[self.time_step] = energy/max(self.capacity, ZERO_DIVISION_CAPACITY)

def set_energy_balance(self, energy: float) -> float:
r"""Calculate energy balance.
def set_energy_balance(self) -> float:
r"""Calculate energy balance
Parameters
----------
energy: float
Energy equivalent of state-of-charge in [kWh].
Returns
-------
energy: float
Charged/discharged energy since last time step in [kWh]
The energy balance is a derived quantity and is the product or quotient of the difference between consecutive SOCs and `round_trip_efficiency`
for discharge or charge events respectively thus, thus accounts for energy losses to environment during charging and discharge.
for discharge or charge events respectively thus, thus accounts for energy losses to environment during charging and discharge. It is the
actual energy charged/discharged irrespective of what is determined in the step function after taking into account storage design limits
e.g. maximum power input/output, capacity.
"""

# actual energy charged/discharged irrespective of what is determined in the step function after
# taking into account storage design limits e.g. maximum power input/output, capacity
previous_soc = self.initial_soc if self.time_step == 0 else self.soc[-2]
current_soc = self.soc[-1]
previous_energy = previous_soc*self.capacity
current_energy = current_soc*self.capacity
energy_balance = current_energy - previous_energy*(1.0 - self.loss_coefficient)
energy_balance = energy_balance/self.round_trip_efficiency if energy_balance >= 0 else energy_balance*self.round_trip_efficiency
energy -= self.energy_init
energy_balance = energy/self.round_trip_efficiency if energy >= 0 else energy*self.round_trip_efficiency

return energy_balance

Expand Down Expand Up @@ -620,16 +620,17 @@ def reset(self):
r"""Reset `StorageDevice` to initial state."""

super().reset()
self.__soc = [self.initial_soc]
self.__energy_balance = [0.0]
self.__soc = np.zeros(self.episode_tracker.episode_time_steps, dtype='float32')
self.__soc[0] = self.initial_soc
self.__energy_balance = np.zeros(self.episode_tracker.episode_time_steps, dtype='float32')

class StorageTank(StorageDevice):
r"""Base thermal energy storage class.
Parameters
----------
capacity : float
Maximum amount of energy the storage device can store in [kWh]. Must be >= 0 and if == 0 or None, set to 0.00001 to avoid `ZeroDivisionError`.
capacity : float, default: 0.0
Maximum amount of energy the storage device can store in [kWh]. Must be >= 0.
max_output_power : float, optional
Maximum amount of power that the storage unit can output [kW].
max_input_power : float, optional
Expand All @@ -641,7 +642,7 @@ class StorageTank(StorageDevice):
Other keyword arguments used to initialize super class.
"""

def __init__(self, capacity: float, max_output_power: float = None, max_input_power: float = None, **kwargs: Any):
def __init__(self, capacity: float = None, max_output_power: float = None, max_input_power: float = None, **kwargs: Any):
super().__init__(capacity = capacity, **kwargs)
self.max_output_power = max_output_power
self.max_input_power = max_input_power
Expand Down Expand Up @@ -694,8 +695,8 @@ class Battery(StorageDevice, ElectricDevice):
Parameters
----------
capacity : float
Maximum amount of energy the storage device can store in [kWh]. Must be >= 0 and if == 0 or None, set to 0.00001 to avoid `ZeroDivisionError`.
capacity : float, default: 0.0
Maximum amount of energy the storage device can store in [kWh]. Must be >= 0.
nominal_power: float
Maximum amount of electric power that the battery can use to charge or discharge.
capacity_loss_coefficient : float, default: 0.00001
Expand All @@ -713,7 +714,7 @@ class Battery(StorageDevice, ElectricDevice):
Other keyword arguments used to initialize super classes.
"""

def __init__(self, capacity: float, nominal_power: float, capacity_loss_coefficient: float = None, power_efficiency_curve: List[List[float]] = None, capacity_power_curve: List[List[float]] = None, depth_of_discharge: float = None, **kwargs: Any):
def __init__(self, capacity: float = None, nominal_power: float = None, capacity_loss_coefficient: float = None, power_efficiency_curve: List[List[float]] = None, capacity_power_curve: List[List[float]] = None, depth_of_discharge: float = None, **kwargs: Any):
self._efficiency_history = []
self._capacity_history = []
self.depth_of_discharge = depth_of_discharge
Expand All @@ -728,12 +729,6 @@ def efficiency(self) -> float:
"""Current time step technical efficiency."""

return self.efficiency_history[-1]

@ElectricDevice.electricity_consumption.getter
def electricity_consumption(self) -> List[float]:
r"""Electricity consumption time series."""

return self.energy_balance

@property
def degraded_capacity(self) -> float:
Expand Down Expand Up @@ -840,19 +835,20 @@ def charge(self, energy: float):

if energy >= 0:
energy_wrt_degrade = self.degraded_capacity - self.energy_init
energy = min(self.get_max_input_power(), energy_wrt_degrade, energy)
energy = min(self.get_max_input_power(), self.available_nominal_power, energy_wrt_degrade, energy)

else:
soc_limit_wrt_dod = 1.0 - self.depth_of_discharge
current_soc = self.soc[-1]
soc_difference = current_soc - soc_limit_wrt_dod
energy_limit_wrt_dod = max(soc_difference*self.capacity*self.efficiency, 0.0)*-1
soc_init = self.soc[self.time_step - 1]
soc_difference = soc_init - soc_limit_wrt_dod
energy_limit_wrt_dod = max(soc_difference*self.capacity*self.round_trip_efficiency, 0.0)*-1
energy = max(-self.get_max_output_power(), energy_limit_wrt_dod, energy)

self.efficiency = self.get_current_efficiency(energy)
super().charge(energy)
degraded_capacity = max(self.degraded_capacity - self.degrade(), 0.0)
self._capacity_history.append(degraded_capacity)
self.update_electricity_consumption(self.energy_balance[self.time_step], enforce_polarity=False)

def get_max_output_power(self) -> float:
r"""Get maximum output power while considering `capacity_power_curve` limitations if defined otherwise, returns `nominal_power`.
Expand All @@ -876,7 +872,7 @@ def get_max_input_power(self) -> float:

#The initial SOC is the previous SOC minus the energy losses
if self.capacity_power_curve is not None:
soc = self.energy_init/self.capacity
soc = self.energy_init/max(self.capacity, ZERO_DIVISION_CAPACITY)
# Calculating the maximum power rate at which the battery can be charged or discharged
idx = max(0, np.argmax(soc <= self.capacity_power_curve[0]) - 1)
max_output_power = self.nominal_power*(
Expand All @@ -900,7 +896,7 @@ def get_current_efficiency(self, energy: float) -> float:

if self.power_efficiency_curve is not None:
# Calculating the maximum power rate at which the battery can be charged or discharged
energy_normalized = np.abs(energy)/self.nominal_power
energy_normalized = np.abs(energy)/max(self.nominal_power, ZERO_DIVISION_CAPACITY)
idx = max(0, np.argmax(energy_normalized <= self.power_efficiency_curve[0]) - 1)
efficiency = self.power_efficiency_curve[1][idx]\
+ (energy_normalized - self.power_efficiency_curve[0][idx]
Expand All @@ -921,7 +917,7 @@ def degrade(self) -> float:
"""

# Calculating the degradation of the battery: new max. capacity of the battery after charge/discharge
capacity_degrade = self.capacity_loss_coefficient*self.capacity*np.abs(self.energy_balance[-1])/(2*self.degraded_capacity)
capacity_degrade = self.capacity_loss_coefficient*self.capacity*np.abs(self.energy_balance[self.time_step])/(2*max(self.degraded_capacity, ZERO_DIVISION_CAPACITY))
return capacity_degrade

def reset(self):
Expand Down

0 comments on commit 77bb7fb

Please sign in to comment.