In [21]:
# %pip install gymnasium

Collecting gymnasium
  Obtaining dependency information for gymnasium from https://files.pythonhosted.org/packages/a8/4d/3cbfd81ed84db450dbe73a89afcd8bc405273918415649ac6683356afe92/gymnasium-0.29.1-py3-none-any.whl.metadata
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Obtaining dependency information for farama-notifications>=0.0.1 from https://files.pythonhosted.org/packages/05/2c/ffc08c54c05cdce6fbed2aeebc46348dbe180c6d2c541c7af7ba0aa5f5f8/Farama_Notifications-0.0.4-py3-none-any.whl.metadata
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
   ---------------------------------------- 0.0/953.9 kB ? eta -:--:--
   -- ------------------------------------- 61.4/953.9 kB 1.7 MB/s eta 0:00:01
   ------------ --------------------------- 297.0/953.9 kB 3.7 MB/s eta 0:00:01
   --------------------------- ------------ 665.6/953.9 kB

In [5]:
import numpy as np 
import matplotlib.pyplot as plt 
import pandas as pd 
import gymnasium as gym
from gymnasium import spaces

## Environment

In [58]:
class ChargingEnv(gym.Env):
    """Battery-charging environment with a fixed charging window per episode.

    An episode consists of ``num_intervals`` 15-minute time slots. In each slot
    the agent selects one of the rates in ``CHARGING_RATES_KW``; the cost of a
    slot is ``alpha[t] * exp(rate)``. After the last slot an energy demand is
    drawn from a normal distribution (``mu``, ``sigma``). If the state of charge
    (SoC) cannot cover it, a penalty is subtracted from the reward and the next
    episode starts with an empty battery; otherwise the surplus is carried over.

    NOTE(review): ``reset`` returns only the observation and ``step`` returns a
    4-tuple ``(observation, reward, done, info)``. Current Gymnasium expects
    ``(obs, info)`` and a 5-tuple; the shapes are kept because callers unpack them.
    """

    # Charging power in kW for each discrete action (list index == action id).
    CHARGING_RATES_KW = (0, 7, 15, 22)
    # Duration of one time slot in hours (15 minutes).
    INTERVAL_HOURS = 0.25

    def __init__(self, render_mode=None, battery_capacity = 50, alpha = [1,1,1,1,1,1,1,1]):
        """Create the environment and draw the SoC for the very first episode.

        Args:
            render_mode: Stored on the instance; no rendering is implemented.
            battery_capacity: Upper bound of the SoC.
            alpha: Price coefficient per time slot; indexed by the time step,
                so it needs at least ``num_intervals`` entries.
        """
        super().__init__()

        self.render_mode = render_mode
        self.battery_capacity = battery_capacity  # The capacity of the battery
        # Copy so that instances never share (or mutate) the default list.
        self.alpha = list(alpha)  # The price coefficients per interval
        self.num_intervals = 8  # 15-minute intervals in 2 hours
        self.mu = 30  # expected energy demand
        self.sigma = 5  # sd of energy demand

        # Action space: one action per charging rate (none, low, medium, high)
        self.action_space = spaces.Discrete(len(self.CHARGING_RATES_KW))

        # Observation space: SoC (continuous), Time Step (discrete)
        self.observation_space = spaces.Tuple((
            spaces.Box(low=0, high=self.battery_capacity, dtype=np.float32),
            spaces.Discrete(self.num_intervals)
        ))

        # Initialize SoC for the very first episode: a full battery minus one
        # working day of demand, with the demand clipped to [0, capacity].
        initial_demand = np.random.normal(self.mu, self.sigma)
        initial_demand = np.clip(initial_demand, 0, self.battery_capacity)
        self.residual_soc = self.battery_capacity - initial_demand
        self.soc = self.residual_soc

        self.time_step = 0

        # Metrics
        self.action_frequency = np.zeros(self.action_space.n)
        self.actions_per_time_slot = np.zeros((self.num_intervals, self.action_space.n))
        self.total_recharge_cost = 0.0
        self.energy_demand = 0.0
        self.residual_energy = 0.0
        self.energy_added = 0.0  # kept for backward compatibility; not updated anywhere
        # Must exist before the first reset(): step() and report_metrics() read it.
        self.cumu_energy_added = 0.0
        self.missing_energy = 0.0
        self.penalty_count = 0
        self.reward = 0

    def reset(self):
        """Start a new day and return the initial observation.

        The SoC starts from the residual SoC left by the previous episode;
        the time step and all per-episode metrics are cleared.
        """
        self.soc = self.residual_soc  # Start with the residual SoC from the previous day
        self.time_step = 0

        # Reset metrics for next episode
        self.action_frequency.fill(0)
        self.actions_per_time_slot.fill(0)
        self.total_recharge_cost = 0.0
        self.energy_demand = 0.0
        self.residual_energy = 0.0
        self.cumu_energy_added = 0.0
        self.missing_energy = 0.0
        self.penalty_count = 0
        self.reward = 0

        return self._get_observation()

    def step(self, action):
        """Charge for one time slot.

        Args:
            action: Index into ``CHARGING_RATES_KW``.

        Returns:
            ``(observation, reward, done, info)``; ``info`` is always empty.

        NOTE(review): the returned reward is the running total for the episode
        (``self.reward``), not the reward of this single step. Confirm that
        consumers expect a cumulative value before summing it again.
        """
        assert self.action_space.contains(action), f"{action} is an invalid action"

        charging_rate = self.CHARGING_RATES_KW[action]

        # Energy for one slot, capped so the SoC never exceeds the capacity.
        headroom = self.battery_capacity - self.soc
        energy_added = min(charging_rate * self.INTERVAL_HOURS, headroom)
        self.cumu_energy_added += energy_added
        self.soc = self.soc + energy_added

        cost = self._calculate_cost(charging_rate)
        self.reward -= cost

        done = self.time_step + 1 >= self.num_intervals  # Check if charging window is over
        if done:
            self.reward = self._calculate_end_of_day_reward()

        # Update metrics
        self.action_frequency[action] += 1
        self.actions_per_time_slot[self.time_step, action] += 1
        self.total_recharge_cost += cost  # cost based on charging rate not actual energy charged

        self.time_step += 1

        observation = self._get_observation()
        return observation, self.reward, done, {}

    def _get_observation(self):
        """Return ``(SoC as a 1-element float32 array, current time step)``.

        NOTE(review): after the final step the time step equals
        ``num_intervals``, which is outside ``Discrete(num_intervals)``.
        """
        return (np.array([self.soc], dtype=np.float32), self.time_step)

    def _calculate_cost(self, charging_rate):
        """Return the cost of charging at ``charging_rate`` in the current slot.

        The cost grows exponentially with the rate and is scaled by the price
        coefficient of the current time step.
        """
        alpha_t = self.alpha[self.time_step]  # coefficient for time based price
        return alpha_t * np.exp(charging_rate)

    def _calculate_end_of_day_reward(self):
        """Draw the day's energy demand, settle it against the SoC, return the reward.

        Updates ``energy_demand``, ``missing_energy``, ``residual_soc`` and, on a
        shortfall, ``penalty_count`` and ``reward``.
        """
        # Floor at 0, like the initial demand: a negative draw would otherwise
        # push the residual SoC above the battery capacity.
        self.energy_demand = max(0.0, np.random.normal(self.mu, self.sigma))
        if self.soc < self.energy_demand:
            self.missing_energy = self.energy_demand - self.soc
            # Penalty equals the cost of charging at the highest rate in every
            # slot (with a price coefficient of 1), scaled by the shortfall and
            # capped at a shortfall of 1.
            full_charge_cost = np.exp(max(self.CHARGING_RATES_KW)) * self.num_intervals
            penalty = full_charge_cost * min(1, self.missing_energy)
            self.reward -= penalty
            self.residual_soc = 0  # Not enough charge, set residual SoC to 0
            self.penalty_count += 1
        else:
            self.missing_energy = 0
            self.residual_soc = self.soc - self.energy_demand  # Update residual SoC for the next day

        return self.reward

    def report_metrics(self):
        """Return the metrics of the current episode as a dict.

        ``residual_energy`` reports ``residual_soc``, i.e. the SoC that the next
        episode will start from. The arrays are the live metric buffers, not copies.
        """
        metrics = {
            "action_frequency": self.action_frequency,
            "actions_per_time_slot": self.actions_per_time_slot,
            "energy_demand": self.energy_demand,
            "residual_energy": self.residual_soc,
            "energy_added": self.cumu_energy_added,
            "missing_energy": self.missing_energy,
            "total_recharge_cost": self.total_recharge_cost,
            "penalty_count": self.penalty_count,
            "reward": self.reward
        }
        return metrics

## Basic Example


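ep_2_residual_energy = ep_1_residual_energy + ep_2_energy_added - ep_2_energy_demand  
Logic: episode 1 ends with the energy that is left after charging and serving that day's demand. Episode 2 starts from this residual, adds the energy charged during episode 2, and subtracts the episode 2 demand. If the demand exceeds the available charge, the residual is set to 0.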

In [68]:
# Environment with the default settings (battery capacity 50, all price coefficients 1).
env = ChargingEnv()

# Baseline policy that does not look at the state
def fixed_policy(state):
    """Return action 2 (the medium charging rate) regardless of ``state``."""
    medium_rate_action = 2
    return medium_rate_action

num_episodes = 10
for episode in range(num_episodes):
    # Each reset starts a new day from the charge left over by the previous one.
    state = env.reset()
    done = False
    while True:  # one pass per time slot; the env reports done after the last slot
        action = fixed_policy(state)
        state, reward, done, info = env.step(action)
        if done:
            break
    # Episode summary, printed once the charging window has closed.
    metrics = env.report_metrics()
    print(f"Episode {episode + 1}: {metrics}")


Episode 1: {'action_frequency': array([0., 0., 8., 0.]), 'actions_per_time_slot': array([[0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.]]), 'energy_demand': 27.833224196732015, 'residual_energy': 15.791470074846256, 'energy_added': 30.0, 'missing_energy': 0, 'total_recharge_cost': 26152138.979776885, 'penalty_count': 0, 'reward': -26152138.979776885}
Episode 2: {'action_frequency': array([0., 0., 8., 0.]), 'actions_per_time_slot': array([[0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.],
       [0., 0., 1., 0.]]), 'energy_demand': 25.163856600020388, 'residual_energy': 20.62761347482587, 'energy_added': 30.0, 'missing_energy': 0, 'total_recharge_cost': 26152138.979776885, 'penalty_count': 0, 'reward': -26152138.979776885}
Episode