# 1. Discretize the state space

In this section we define the discretization parameters and generate the discretized state space from them.

In [None]:
import numpy as np
import warnings
from saved_models.LSTM import LSTM
from System import Plant
from Estimator import Estimator
from tqdm import tqdm
warnings.filterwarnings('ignore')
np.random.seed(0)

# Set to True to skip building the action table and only simulate with a saved one
use_saved_model = True
model_name = 'saved_models/Action_table_square.pt'
# NOTE: when using Action_table_dt_1/5/10.pt, change c_resolution to 0.2 and p_resolution to 0.5.
# For dt=5 or dt=10, also change ctrl_freq to 12 or 6 respectively.

# Discretization resolution for the states and the input
c_resolution = 1    # unit: kWh
p_resolution = 1    # unit: kW
p_resolation = p_resolution  # backward-compatible alias for the original misspelled name
l_resolution = 0.5  # unit: kW
u_resolution = 1

# Time step and dynamic programming time horizon
simu_freq = 60      # simulation frequency, unit: 1/hour
ctrl_freq = 60      # control frequency, unit: 1/hour
dt = 1 / simu_freq  # unit: hour
T = 1440            # number of steps in the horizon

# Properties of the random variable
n_sample = 50  # number of Monte Carlo samples used to discretize the randomness
std = 0.1      # passed as `scale` (standard deviation) when sampling the random variable

# Discretized grids. The small offset added to each `stop` keeps the upper
# bound inside the half-open interval that np.arange generates.
c_dis = np.arange(start=0, stop=7.01, step=c_resolution)
p_dis = np.arange(start=0, stop=15.01, step=p_resolution)
l_dis = np.arange(start=0, stop=10.01, step=l_resolution)
u_dis = np.arange(start=-5, stop=4.1, step=u_resolution)

# A state is one (p, c, l) triple, so the state space is the product of the grids
n = c_dis.shape[0] * p_dis.shape[0] * l_dis.shape[0]
print("Size of state space: {}".format(n))
print("Size of action space: {}".format(u_dis.shape[0]))

# 2. Generate transition matrix

## 2.1 Functions
In this section we define some helper functions used in the following calculations.

In [None]:
def close_idx(list, target):
    """Locate the entry of an array that lies nearest to a target value.

    Args:
        list (np.array): values to search
        target (float): value to match

    Returns:
        int: index of the entry with the smallest absolute distance to the
            target (the first such index when several entries tie)
    """
    distances = np.abs(list - target)
    nearest = np.argmin(distances)
    return nearest

def get_idx(p_idx, c_idx, l_idx):
    """Encode the grid indices of (p, c, l) into a single flat state index.

    The layout is row-major with p as the slowest axis and l as the fastest,
    using the sizes of the module-level grids ``c_dis`` and ``l_dis``.

    Args:
        p_idx (int): index of p in p_dis
        c_idx (int): index of c in c_dis
        l_idx (int): index of l in l_dis

    Returns:
        int: index of state (p, c, l) in state space
    """
    n_c = c_dis.shape[0]
    n_l = l_dis.shape[0]
    p_offset = p_idx * n_c * n_l
    c_offset = c_idx * n_l
    return int(p_offset + c_offset + l_idx)
    
def get_pcl(idx):
    """Decode a flat state index back into the grid values (p, c, l).

    Args:
        idx (int): state index

    Returns:
        list: [p, c, l] values read from p_dis, c_dis and l_dis
    """
    n_l = l_dis.shape[0]
    states_per_p = c_dis.shape[0] * n_l
    # Peel off the slowest axis (p) first, then split the remainder into c and l
    p_idx, remainder = divmod(idx, states_per_p)
    c_idx, l_idx = divmod(remainder, n_l)
    return [p_dis[p_idx], c_dis[c_idx], l_dis[l_idx]]

def idx_prob(dis, p):
    """Split a continuous value between its two neighbouring grid points.

    The value is expressed as a convex combination of the grid points directly
    below and above it; the returned weights are the linear-interpolation
    coefficients and sum to 1.
    e.g. with dis = [1,2,3,4] and p=1.2, this function returns left_idx=0,
    right_idx=1, left_prob=0.8, right_prob=0.2.

    Values at or beyond either end of the grid are clamped to the boundary
    point, which then receives weight 1 (its inner neighbour gets weight 0).

    Args:
        dis (np.array): discretized space, sorted in ascending order with at
            least two entries
        p (float): undiscretized value

    Returns:
        int, int, float, float: [left_idx, right_idx, left_prob, right_prob]

    Raises:
        ValueError: if p is NaN
    """
    # NaN compares unequal to itself; reject it explicitly, because no branch
    # below could classify it.
    if p != p:
        raise ValueError("idx_prob: p must not be NaN")

    last = dis.shape[0] - 1
    if p >= dis[-1]:
        # Clamp to the upper boundary; the neighbour is listed with zero weight
        return [last, last - 1, 1, 0]
    if p <= dis[0]:
        # Clamp to the lower boundary. Comparing against dis[0] rather than a
        # literal 0 keeps this correct for grids that do not start at zero.
        return [0, 1, 1, 0]

    # Largest index whose grid value is <= p; dis[0] < p < dis[-1] is guaranteed
    # here, so both `left` and `left + 1` are valid indices.
    left = int(np.searchsorted(dis, p, side='right')) - 1
    right = left + 1
    width = dis[right] - dis[left]
    right_prob = (p - dis[left]) / width
    left_prob = (dis[right] - p) / width
    return [left, right, left_prob, right_prob]

def get_convex_combination(p, c, l):
    """Express a continuous state as a weighted set of discretized states.

    Each of p, c and l is split between its two neighbouring grid points with
    ``idx_prob``; the eight corner combinations are then encoded with
    ``get_idx`` and weighted by the product of the per-axis weights.

    Args:
        p (float): value looked up on p_dis
        c (float): value looked up on c_dis
        l (float): value looked up on l_dis

    Returns:
        dict: dictionary of hit state and probability
    """

    def _neighbours(grid, value):
        # Store indices and weights in float arrays, as pairs (left, right)
        lo, hi, w_lo, w_hi = idx_prob(grid, value)
        return np.array([lo, hi], dtype=float), np.array([w_lo, w_hi], dtype=float)

    p_idx, p_prob = _neighbours(p_dis, p)
    c_idx, c_prob = _neighbours(c_dis, c)
    l_idx, l_prob = _neighbours(l_dis, l)

    hit_states = {}
    for p_i, p_w in zip(p_idx, p_prob):
        for c_i, c_w in zip(c_idx, c_prob):
            for l_i, l_w in zip(l_idx, l_prob):
                hit_states[get_idx(p_i, c_i, l_i)] = p_w * c_w * l_w

    return hit_states
    

## 2.2 Calculating transition matrix
Here we use `P_trans` to store the transition matrix for the discretized state space. `P_trans` is a dictionary whose keys are tuples *(state_index, u_index, hit_state_index)* and whose values are transition probabilities: the value is the probability that the system ends up in the state *hit_state_index* when the input with index *u_index* is applied to the system in the state *state_index*.

In [None]:
# Build the sparse transition table P_trans[(state_idx, u_idx, hit_state_idx)]
# by Monte Carlo sampling. Skipped when a saved action table is used.
if not use_saved_model:
    P_trans = {}
    plant = Plant(dt=dt)
    estimator = Estimator()
    sample_weight = 1 / n_sample  # every Monte Carlo sample carries equal mass

    for l_idx, l in enumerate(tqdm(l_dis)):
        # Draw the random samples once per load level; they are reused for
        # every (p, c, u) combination at this load level.
        l_hat = estimator.estimate(l)
        l_hats = np.random.normal(loc=l_hat, scale=std, size=n_sample)
        for p_idx, p in enumerate(p_dis):
            for c_idx, c in enumerate(c_dis):
                # The source state does not depend on the input or the sample
                state_idx = get_idx(p_idx, c_idx, l_idx)
                for u_idx, u in enumerate(u_dis):
                    for l_sample in l_hats:
                        # Restart the plant from the grid state and advance one step
                        plant.reset(c, p, l)
                        plant.step(u, l_sample)

                        # The resulting state generally falls between grid
                        # points, so its mass is spread over the neighbouring
                        # grid states (convex combination). An alternative is to
                        # snap to the single closest grid state via close_idx.
                        hit_states = get_convex_combination(plant.p, plant.battery.c, plant.l)
                        for hit_state_idx, hit_prob in hit_states.items():
                            key = (state_idx, u_idx, hit_state_idx)
                            P_trans[key] = P_trans.get(key, 0.0) + sample_weight * hit_prob

Ensure the transition matrix is valid (for a fixed state and action, the transition probabilities should sum to 1):

In [None]:
# Sanity check: add up the outgoing transition probabilities of one
# (state, action) pair and print the total.
if not use_saved_model:
    state_idx = 1000
    u_idx = 5

    # Use a dedicated name instead of rebinding `sum`, which would shadow the
    # builtin for all code that runs afterwards. Missing keys count as zero.
    total_prob = sum(P_trans.get((state_idx, u_idx, j), 0) for j in range(n))

    print(total_prob)

# 3. Dynamic programming

In [None]:
import torch

# Backward dynamic programming over the horizon T. Skipped when a saved action
# table is used.
if not use_saved_model:
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Sparse tensor indexed as [state, action, next_state]
    P_trans_tensor = torch.sparse_coo_tensor(indices=list(zip(*P_trans.keys())), values=list(P_trans.values()), size=(n, u_dis.shape[0], n), device=device, dtype=torch.float)
    # Terminal cost of each state: exp of its p component
    J = torch.exp(torch.tensor([get_pcl(i)[0] for i in range(n)], device=device).float())
    # Actions[t][i] holds the index (into u_dis) of the minimizing action for state i at step t
    Actions = torch.zeros(size=(T, n), device=device)

    # Iterate down to and including t = 0 so that every row of Actions is
    # filled; stopping at t = 1 would leave Actions[0] at its zero
    # initialization, i.e. always pointing at u_dis[0].
    for t in tqdm(range(T - 1, -1, -1)):
        J_update = torch.zeros_like(J, device=device)
        for i in range(n):
            # Expected cost-to-go of every action from state i, then keep the best
            J_update[i], Actions[t][i] = torch.min(P_trans_tensor[i] @ J, dim=0)
        J = J_update


Save Actions table:

In [None]:
#torch.save(Actions, 'saved_models/Action_table_exp.pt')

# 4. Testing with real data

In [None]:
import torch

# Load a previously computed action table for simulation-only runs.
if use_saved_model:
    device = 'cpu'
    # SECURITY: torch.load unpickles the file; only load tables from a trusted source.
    with open(model_name, 'rb') as f:
        # map_location lets a table that was saved from a CUDA device be loaded
        # on a machine without a GPU.
        Actions = torch.load(f, map_location=device)
    Actions = Actions.to(device=device)

In [None]:
import pandas as pd
from matplotlib import rc


# Read the raw measurements and index them by timestamp
df = pd.read_csv('data/data.csv')

# Merge the separate date and time columns into one timestamp column.
# NOTE(review): the format uses ';' between hour and minute - confirm this matches the CSV.
timestamps = pd.to_datetime(df['Date'] + ' ' + df['Time'], format='%d-%m-%Y %H;%M:%S')
df['Datetime'] = timestamps
df = df.drop(columns=['Date', 'Time']).set_index('Datetime')

# Preview the first five rows
print(df.head(5))

In [None]:
import matplotlib.pyplot as plt
import datetime

simu_time = T * dt  # hour
simu_step = T       # number of simulation steps
cur_time = df.index[0]  # first timestamp of the data set, e.g. 2007-10-01 00:00:00

# Traces recorded after every simulation step, plotted below
peaks, loads, times, batteries = [], [], [], []

saved_money = 0

env = Plant(dt=1/simu_freq)
p, c, l = env.reset(init_battery=0, init_peak=0, init_load=0)
for day in range(1):
    l_peak = 0  # largest real load seen so far during this day
    for i in range(simu_step):
        hit_state_list = get_convex_combination(p, c, l)
        # The control input is only refreshed on control steps and held in between
        if i % int(simu_freq / ctrl_freq) == 0:
            table_row = Actions[int(i/(simu_freq/ctrl_freq))%T]
            # Blend the tabulated actions of the neighbouring grid states,
            # weighted by their convex-combination coefficients
            u = 0
            for state_idx, state_prob in hit_state_list.items():
                action_idx = int(table_row[int(state_idx)].item())
                u += u_dis[action_idx] * state_prob

        # print("peak: {}, c: {}, load: {}".format(p, c, l))
        # print("action: {}".format(u_dis[action_idx]))
        # print("current time:", cur_time)
        # print("=============================")

        real_load = df.loc[cur_time, 'Load']
        l_peak = max(l_peak, real_load)

        p, c, l = env.step(u, real_load)
        cur_time += datetime.timedelta(hours=1/simu_freq)
        times.append(cur_time)
        peaks.append(p)
        loads.append(l)
        batteries.append(c)
    # Difference between the raw load peak and the plant's peak, scaled by 1.5
    # (presumably a price per kW - TODO confirm)
    saved_money += (l_peak - env.p)*1.5
    env.p = 0

# Global matplotlib styling
font = {
    'family': 'serif',
    'serif': 'Times New Roman',
    'weight': 'normal',
    'size': '20',
}
rc('font', **font)
rc('mathtext', default='regular')

# Power traces on the left axis, battery trace on a second y-axis
fig, ax = plt.subplots(figsize=(24, 8))
ln1 = ax.plot(times, peaks, label='Peaks')
ln2 = ax.plot(times, loads, label='Real Load')

ax2 = ax.twinx()
ln3 = ax2.plot(times, batteries, '--', color="red", linewidth=2, label='Battery')

# Collect the lines of both axes so that they share a single legend
lns = ln1 + ln2 + ln3
labs = [line.get_label() for line in lns]
ax.legend(lns, labs, loc=0)

ax.grid()
ax.set_xlabel("Time")
ax.set_ylabel(r"Power ($kW$)")
ax2.set_ylabel(r"Battery capacity ($kWh$)")
fig.show()
fig.savefig("fig/one_day.png")

print("total saved money:", saved_money)