# Refactor EV

In [1]:
import itertools
from tqdm import tqdm
import pandas as pd
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
from collections import OrderedDict

import pprint


In [2]:
def safe_div(x, y):
    '''
    Safe division, return 0 if y is 0.

    Parameters
    ----------
    x: float
        numerator.
    y: float
        denominator.
    '''
    if y == 0:
        return 0
    else:
        return x/y

In [3]:
class DictAttr():

    def __init__(self, attr):
        """
        Base class for attribtues stored in OrderedDict

        Parameters
        ----------
        attr: OrderedDict
            Data attribute dictionary
        """
        for key, val in attr.items():
            setattr(self, key, val)
        self._dict = self.as_dict()

    def as_dict(self) -> OrderedDict:
        """
        Return the config fields and values in an ``OrderedDict``.
        """
        out = []
        for key, val in self.__dict__.items():
            if not key.startswith('_'):
                out.append((key, val))
        return OrderedDict(out)

    def __repr__(self):
        return pprint.pformat(self._dict)


In [27]:
class EVData():
    """EV data class"""

    def __init__(self, N, col, idx) -> None:
        self.v = -1 * np.ones((N, len(col)))
        self._col = col
        self._idx = idx

    def __repr__(self):
        info = f'EVData: {self.v.shape[0]} EVs, {self.v.shape[1]} columns'
        return info

    def as_df(self):
        """
        Return data as pandas DataFrame

        Parameters
        ----------
        col: list
            Column names.
        """
        df = pd.DataFrame(self.v, columns=self._col)
        df['idx'] = self._idx
        return df[['idx'] + self._col]


class MCS():
    """Monte-Carlo simulation class"""

    def __init__(self, config) -> None:
        """
        Parameters
        ----------
        """
        self.config = DictAttr(config)
    
    def __repr__(self) -> str:
        info = f'MCS: {self.config.t} to {self.config.tf}'
        return pprint.pformat(info)

    def run(self) -> None:
        """
        Run Monte-Carlo simulation
        """
        pass


class EVStation():
    """
    EV Station class, holds EV data, control EV status, and collecte EV info.
    """

    def __init__(self, config, ud_param, nd_param,
                 t=0,
                 name='EVS') -> None:
        """
        Parameters
        ----------
        config: DictAttr
            EV station configuration.
        ud_param: Dict of Dict
            Uniform distribution parameters.
        nd_param: Dict of Dict
            Normal distribution parameters.

        config
        ------
        N: number of EVs
        Ns: number of SOC intervals
        step: simulation step size
        seed: random seed

        nd_param
        --------
        soci: initial SOC
        socd: demanded SOC
        ts1: start charging time 1
        ts2: start charging time 2
        tf1: finish charging time 1
        tf2: finish charging time 2
        tt: tolerance of increased charging time

        ud_param
        --------
        Pc: charging power
        Pd: discharging power
        nc: charging efficiency
        nd: discharging efficiency
        Q: battery capacity
        """
        self.name = name
        self.config = DictAttr(config)
        MCSconfig = {'t':t, 'tf':t}
        self.MCS = MCS(config=MCSconfig)
        idx = [f'{self.name}_EV{i}' for i in range(self.config.N)]
        dcols = {'s': ['soci', 'socd', 'Pc', 'Pd', 'nc', 'nd',
                       'Q', 'ts', 'tf', 'tt', 'soc0', 'na0'],
                 'd': ['u', 'soc', 'c', 'sx', 'na', 'dP',
                       'agc', 'lc', 'mod'],
                 }
        self.sdata = EVData(N=self.config.N, col=dcols['s'], idx=idx)  # static data
        self.ddata = EVData(N=self.config.N, col=dcols['d'], idx=idx)  # dynamic data

        # --- 1. uniform distribution parameters ---
        ud_cols = ['Pc', 'Pd', 'nc', 'nd', 'Q']
        np.random.seed(self.config.seed)
        for col in ud_cols:
            lb = ud_param[col]['lb']
            ub = ud_param[col]['ub']
            self.sdata.v[:, dcols['s'].index(col)] = np.random.uniform(
                low=ud_param[col]['lb'],
                high=ud_param[col]['ub'],
                size=self.config.N)
        # NOTE: assumtpion: nc = nd
        self.sdata.v[:, dcols['s'].index('nc')] = self.sdata.v[:, dcols['s'].index('nd')]

        # --- 2. normal distribution parameters ---
        # --- 2.1 ---
        nd_cols = ['soci', 'socd', 'tt']
        for col in nd_cols:
            a = (nd_param[col]['lb'] - nd_param[col]['mu']) / nd_param[col]['var']
            b = (nd_param[col]['ub'] - nd_param[col]['mu']) / nd_param[col]['var']
            distribution = stats.truncnorm(a, b, loc=nd_param[col]['mu'], scale=nd_param[col]['var'])
            self.sdata.v[:, dcols['s'].index(col)] = distribution.rvs(self.config.N,
                                                                      random_state=self.config.seed)

        # --- 2.2 time parameters ---
        nd_cols = ['ts1', 'ts2', 'tf1', 'tf2']
        tparam = pd.DataFrame()
        for col in nd_cols:
            a = (nd_param[col]['lb'] - nd_param[col]['mu']) / nd_param[col]['var']
            b = (nd_param[col]['ub'] - nd_param[col]['mu']) / nd_param[col]['var']
            distribution = stats.truncnorm(a, b, loc=nd_param[col]['mu'], scale=nd_param[col]['var'])
            tparam[col] = distribution.rvs(self.config.N, random_state=self.config.seed)

        r1 = 0.5  # ratio of ts1 to ts2
        tp1 = tparam[['ts1', 'tf1']].sample(n=int(self.config.N*r1), random_state=self.config.seed)
        tp2 = tparam[['ts2', 'tf2']].sample(n=int(self.config.N*(1-r1)), random_state=self.config.seed)
        tp = pd.concat([tp1, tp2], axis=0).reset_index(drop=True).fillna(0)
        tp['ts'] = tp['ts1'] + tp['ts2']
        tp['tf'] = tp['tf1'] + tp['tf2']

        check = tp['ts'] > tp.tf
        row_idx = tp[check].index
        mid = tp['tf'].iloc[row_idx].values
        tp['tf'].iloc[row_idx] = tp['ts'].iloc[row_idx]
        tp['ts'].iloc[row_idx] = mid
        self.sdata.v[:, dcols['s'].index('ts')] = tp['ts']
        self.sdata.v[:, dcols['s'].index('tf')] = tp['tf']

        # --- 3. online status ---
        u_check1 = self.sdata.v[:, dcols['s'].index('ts')] < self.MCS.config.t
        u_check2 = self.sdata.v[:, dcols['s'].index('tf')] > self.MCS.config.t
        u_check = u_check1 & u_check2
        self.ddata.v[:, dcols['d'].index('u')] = np.array(u_check)

    def rctrl(self):
        """Response to control signal"""
        pass


evs_config = {'N': 10000, 'Ns': 20, 'step': 1, 'seed': 2022}
nd_param = {'soci': {'mu': 0.3, 'var': 0.05, 'lb': 0.2, 'ub': 0.4},
            'socd': {'mu': 0.8, 'var': 0.03, 'lb': 0.7, 'ub': 0.9},
            'ts1': {'mu': -6.5, 'var': 3.4, 'lb': 0.0, 'ub': 5.5},
            'ts2': {'mu': 17.5, 'var': 3.4, 'lb': 5.5, 'ub': 24.0},
            'tf1': {'mu': 8.9, 'var': 3.4, 'lb': 0.0, 'ub': 20.9},
            'tf2': {'mu': 32.9, 'var': 3.4, 'lb': 20.9, 'ub': 24.0},
            'tt': {'mu': 0.5, 'var': 0.02, 'lb': 0, 'ub': 1}}
ud_param = {'Pc': {'lb': 5.0, 'ub': 7.0},
            'Pd': {'lb': 5.0, 'ub': 7.0},
            'nc': {'lb': 0.88, 'ub': 0.95},
            'nd': {'lb': 0.88, 'ub': 0.95},
            'Q': {'lb': 20.0, 'ub': 30.0}}

evs = EVStation(name='EVS1', t=18,
                config=evs_config,
                ud_param=ud_param,
                nd_param=nd_param)
evs.ddata.as_df()


Unnamed: 0,idx,u,soc,c,sx,na,dP,agc,lc,mod
0,EVS1_EV0,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
1,EVS1_EV1,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
2,EVS1_EV2,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
3,EVS1_EV3,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
4,EVS1_EV4,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
...,...,...,...,...,...,...,...,...,...,...
9995,EVS1_EV9995,1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
9996,EVS1_EV9996,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
9997,EVS1_EV9997,0.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0
9998,EVS1_EV9998,1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0


In [None]:
class EVCenter():
    """EV control center, based on state space model"""
    def __init__(self) -> None:
        super().__init__()

    def setup(self):
        """Setup State Space Model"""
        pass

    def efrc(self):
        """Estimate frequency regulation capacity"""
        pass

    def gctrl(self):
        """Generte control signal"""
        pass

In [None]:
# --- Initialization ---
evs1 = EVStation()  # EV Station
evs2 = EVStation()
evc = EVCenter()  # EV Center

# --- Dispatch ---
evc.efrc()  # EVS estimate FRC
evc.send()  # EVS send info to transmisstion

# --- DG Control ---
evc.receive()  # receive info from transmission
evc.gctrl()  # generate control signal
evc.send()  # send control signal to EVStation

evs1.receive()  # receive control signal
evs1.rctrl()  # response to control signal

evs2.receive()  # receive control signal
evs2.rctrl()  # response to control signal

# --- Run ---
# Run Distribution
fd.run()

# RUN DG
evs1.run()
evs2.run()
# Federate Power with Transmission

# RUN Dynamic
tc.run()
