In [45]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import control as ct
import lmfit
from pytz import timezone
import vaex
import pvlib

import sys
sys.path.append('../../../')

from utils.plot import config_matplotlib, figsize, fig_save_and_show
from utils.optimization import MyOptimizationProblem, convert_to_model_params, plot_optimization_params, plot_optimization_error, plot_compare
from utils.data import get_events, load_df as load_df_inner

config_matplotlib()
latex_img_path = '/home/joaoantoniocardoso/workspace_TCC/repo/tcc/imgs/'

# Data

In [None]:
events, tzinfo = get_events()
events

In [47]:
def read_oscilloscope_csv(file_name: str) -> tuple:
    import csv

    with open(file_name, 'r') as csv_file:
        c = csv.reader(csv_file)

        raw_x = []
        raw_y_CH1 = []
        raw_y_CH2 = []
        raw_y_CH3 = []
        raw_y_CH4 = []
        probe_atten = [0,0,0,0]
        vertical_units = ['','','','']
        vertical_offset = [0,0,0,0]
        vertical_scale = [0,0,0,0]

        in_header = True
        for row in c:
            if row:
                if in_header:
                    if row is []:
                        pass
                    elif 'Model' in row[0]:
                        model_number = str(row[1])
                    elif 'Firmware Version' in row[0]:
                        firmware_version = str(row[1])
                    elif 'Horizontal Unit' in row[0]:
                        horizontal_units = str(row[1])
                    elif 'Horizontal Scale' in row[0]:
                        horizontal_scale = float(row[1])
                    elif 'Sample Interval' in row[0]:
                        sample_interval = float(row[1])
                    elif 'Filter Frequency' in row[0]:
                        sample_interval = float(row[1])
                    elif 'Record Length' in row[0]:
                        record_length = float(row[1])
                    elif 'Probe Attenuation' in row[0]:
                        probe_atten[0] = float(row[1])
                        probe_atten[1] = float(row[2])
                        probe_atten[2] = float(row[3])
                    elif 'Vertical Units' in row[0]:
                        vertical_units[0] = str(row[1])
                        vertical_units[1] = str(row[2])
                        vertical_units[2] = str(row[3])
                    elif 'Vertical Offset' in row[0]:
                        vertical_offset[0] = float(row[1])
                        vertical_offset[1] = float(row[2])
                        vertical_offset[2] = float(row[3])
                    elif 'Vertical Scale' in row[0]:
                        vertical_scale[0] = float(row[1])
                        vertical_scale[1] = float(row[2])
                        vertical_scale[2] = float(row[3])
                    elif row[0] == 'Label':
                        pass
                    elif row[0] == 'TIME':
                        in_header = False
                else:
                    raw_x.append(float(row[0]))
                    raw_y_CH1.append(float(row[1]))
                    raw_y_CH2.append(float(row[2]))
                    raw_y_CH3.append(float(row[3]))
                    raw_y_CH4.append(float(row[4]))

        info_dict = {
            'Record Length': record_length,
            'Sample Interval': sample_interval,
            'Vertical Units': vertical_units,
            'Vertical Scale': vertical_scale,
            'Vertical Offset': vertical_offset,
            'Horizontal Units': horizontal_units,
            'Horizontal Scale': horizontal_scale,
            'Probe Atten': probe_atten,
            'Firmware Version': firmware_version,
        }

        ret = (
            np.array(raw_x),
            np.array(raw_y_CH1),
            np.array(raw_y_CH2),
            np.array(raw_y_CH3),
            np.array(raw_y_CH4),
            info_dict
        )

        return ret

def read_oscilloscope_wfm_csv(file_name: str, samplerate) -> tuple:
    import numpy as np
    raw_y_CH1, raw_y_CH2, raw_y_CH3, raw_y_CH4, _ = np.genfromtxt(
        file_name, delimiter=';', dtype=float, filling_values=0.0,
        skip_header=1, unpack=True,
    )
    info_dict = None
    ret = (
        #np.arange(len(raw_y_CH1)),
        np.linspace(start=0, stop=len(raw_y_CH1)//samplerate, num=len(raw_y_CH1)),
        np.array(raw_y_CH1),
        np.array(raw_y_CH2),
        np.array(raw_y_CH3),
        np.array(raw_y_CH4),
        info_dict
    )
    return ret


In [48]:
def compute_df_soc(df, nominal_Q, inplace=False):
    """
    Compute the State of Charge (SOC) of a battery.

    Parameters:
        df (pd.DataFrame): DataFrame containing battery data.
        nominal_Q (float): Nominal capacity of the battery in Ampere-hours (Ah).
        inplace (bool, optional): If True, the SOC values are added to the input DataFrame `df`.
                                  Default is False.

    Returns:
        np.ndarray: Array containing the computed SOC values as a fraction of the nominal capacity.

    Notes:
        - The input DataFrame `df` is expected to contain a column named 'batt_Q' representing the battery charge.
        - The output SOC values are dimensionless fractions representing the battery's charge level relative to its nominal capacity.
        - If `inplace` is True, the computed SOC values are added as a new column 'batt_z' to the input DataFrame `df`.
    """
    SOC = df['batt_Q'].to_numpy() / nominal_Q

    if inplace:
        df['batt_z'] = SOC
    return SOC

def compute_df_capacity(df, nominal_Q, initial_SOC, inplace=False):
    """
    Compute the capacity of a battery based on cumulative integration of current over time.

    Parameters:
        df (pd.DataFrame): DataFrame containing battery data.
        nominal_Q (float): Nominal capacity of the battery in Ampere-hours (Ah).
        initial_SOC (float): Initial State of Charge (SOC) as a fraction of the nominal capacity.
        inplace (bool, optional): If True, the computed capacity values are added to the input DataFrame `df`.
                                  Default is False.

    Returns:
        np.ndarray: Array containing the computed capacity values in Ampere-hours (Ah).

    Notes:
        - The input DataFrame `df` is expected to contain columns 'batt_i' for battery current and 't' for time.
        - The output capacity values represent the remaining battery capacity after cumulative integration of current over time.
        - The input parameter `initial_SOC` is a dimensionless fraction representing the initial battery charge relative to its nominal capacity.
        - If `inplace` is True, the computed capacity values are added as a new column 'batt_Q' to the input DataFrame `df`.
    """
    from scipy.integrate import cumulative_trapezoid as cumtrapz

    time_hours = df['t'].to_numpy(dtype='float64')*1e-9 / 3600  # Converts time in seconds to time in hours
    Q = (nominal_Q * initial_SOC) - cumtrapz(df['batt_i'], time_hours, initial=0).astype('float64')  # units: Amper-hour

    if inplace:
        df['batt_Q'] = Q
    return Q

def load_df(filename, start, end, nominal_Q, initial_SOC, resample_rule='1s', rename_columns={}):
    df = vaex.from_csv(filename).to_pandas_df()
    df["timestamp"] = pd.DatetimeIndex(df["timestamp"]).tz_convert(None)
    df = df.set_index("timestamp", drop=True)

    display('original columns:', df.columns)
    display('renamed columns:', rename_columns)

    df = df.loc[
        (df.index >= start) & (df.index <= end),
        rename_columns.keys(),
    ].rename(columns=rename_columns).dropna().resample(resample_rule).mean().interpolate(method='time', limit_area='inside')

    # After resampling, create the equally-spaced 't' index, used for control simulations
    dT = df.index.diff().median().to_numpy().astype(np.float64) * 1e-9   # simulation time in seconds
    lenT = len(df.index)
    df['t'] = np.linspace(0, lenT * dT, lenT, endpoint=False)  # Recreate the time array because of numerical issues from the index datetime to float transformation

    compute_df_capacity(df, nominal_Q, initial_SOC, inplace=True)
    compute_df_soc(df, nominal_Q, inplace=True)

    return df


In [49]:
batt_name = 'D35'
cell_amps_hour = 48
series_cells = 3
parallel_cells = 1
cell_voltage = 12
nominal_Q = parallel_cells * cell_amps_hour  # Capacity, in Amper-hour
initial_SOC = 0.99
nominal_E = series_cells * parallel_cells * cell_amps_hour * cell_voltage

In [None]:
file = 'data/Waveform_2019-11-22_232730_576_-1.csv'

time, batt1_v, batt_i, batt2_v, batt3_v, _ = read_oscilloscope_csv(file)
batt_v = batt1_v + batt2_v + batt3_v

df = pd.DataFrame({
    't': time,
    'batt_i': batt_i,
    'batt_v': batt_v,
}).iloc[:600]

t = df['t'].to_numpy()
df['t'] = ((t - t[0])*1e9).astype('timedelta64[ns]')
df.set_index('t', inplace=True)
df = df.resample('10ms').mean().interpolate(method='time', limit_area='inside')
df.reset_index(inplace=True)

compute_df_capacity(df, nominal_Q, initial_SOC, inplace=True)
compute_df_soc(df, nominal_Q, inplace=True)

df.set_index('t', inplace=True)
df[['batt_i', 'batt_v', 'batt_z']].plot(subplots=True);

In [None]:
file = 'data/teste29-11.Wfm.csv'
samplerate = 10

time, batt1_v, batt2_v, batt3_v, batt_i, _ = read_oscilloscope_wfm_csv(file, samplerate)
batt_v = batt1_v + batt2_v + batt3_v

df = pd.DataFrame({
    't': time,
    'batt_i': batt_i,
    'batt_v': batt_v,
})

t = df['t'].to_numpy()
print(t)
df['t'] = ((t - t[0])*1e9).astype('timedelta64[ns]')
df.set_index('t', inplace=True)
df = df.resample('1s').mean().interpolate(method='time', limit_area='inside')
df.reset_index(inplace=True)

compute_df_capacity(df, nominal_Q, initial_SOC, inplace=True)
compute_df_soc(df, nominal_Q, inplace=True)

df.set_index('t', inplace=True)
df[['batt_i', 'batt_v', 'batt_z']].plot(subplots=True);

dT = df.index.diff().median().to_numpy().astype(np.float64) * 1e-9   # simulation time in seconds
lenT = len(df.index)
df['t'] = np.linspace(0, lenT * dT, lenT, endpoint=False)  # Recreate the time array because of numerical issues from the index datetime to float transformation

dfa = df.copy(deep=True)

# Battery Equivalent Circuit Model


In [None]:
# !pip install lcapy zfitpy pyspice
# %pip install https://github.com/mph-/lcapy/archive/master.zip
# Open `tllocalmgr`, then `install circuitikz`, then `exit`, then `sudo texhash`
import lcapy as lca

circuit_draw_params = dict(
    # help_lines=1,
    # debug=True,
    draw_nodes=True,
    label_nodes=False,
    label_ids=True,
    label_values=False,
    cpt_size=1,
    node_spacing=3,
    scale=1,
    dpi=150,
)

circuit = lca.Circuit("""
C0 R1.1 0 2 0; down, l={OCV(z(t))}
R1 .1 .2; right
C1 .1 .2; right
W R1.1 C1.1; up=0.5
W R1.2 C1.2; up=0.5
R0 R1.2 .2; right, i>={i(t)}
P1 R0.2 0_2; down, v={v(t)}
W 0 0_2; right
;""")
circuit.draw(**circuit_draw_params)
# circuit.draw(**circuit_draw_params, filename="/home/joaoantoniocardoso/workspace_TCC/repo/tcc/imgs/battery_model.png")


![](feZkCLr.png)

De acordo com [Mateo Basic](https://www.sciencedirect.com/science/article/pii/S2405896322003469), as seguintes equações representam o sistema da imagem acima, onde $z(t)$ é o estado de carga da bateria:

$$ \begin{aligned}
    \frac{dv_{C_{1}}(t) }{ dt } &=
        -\frac{ v_{C_{1}}(t) }{ R_{1} C_{1} }
        +\frac{ i(t) }{ C_{1} } \\
    \frac{ dv_{C_{0}}(t) }{ dt } &=
        \frac{ i(t) }{ C_{0} } \\
    v(t) &= 
        -v_{C_{0}}
        -v_{C_{1}}
        -R_{0} i(t)
\end{aligned} $$

O estado de carga ($z(t)$) pode ser definido como nas equações abaixo, onde $Q$ é a carga nominal da bateria, e $\eta$, a eficiência culombica:

$$ \begin{aligned}
    z(t) &= 
        z(t_{0}) 
        -\frac{1}{Q} \int_{t_{0}}^t \eta(t) i(t) dt \\
    \frac{dz(t)}{dt} &= 
        -\frac{\eta(t) i(t)}{Q}
\end{aligned} $$

Para que possam ser utilizadas os métodos de solução de sistemas no espaço de estados, podemos separar $VOC(z(t))$ em um modelo externo, considerando-o como uma entrada, deste modo, a função $VOC(z(t))$ pode ser um modelo caixa-preta.

In [None]:
import control as ct
from math import sqrt, pi

class Battery:
    @classmethod
    def _update(cls, t, x, u, params: dict):
        # Params
        batt_eta, batt_Q, batt_R_1, batt_C_1, batt_R_2, batt_C_2 = (
            params['batt_eta'],
            params['batt_Q'],
            params['batt_R_1'],
            params['batt_C_1'],
            params['batt_R_2'],
            params['batt_C_2'],
        )

        # Inputs
        batt_i = u[0]  # Battery current [A]

        # States
        # batt_z = x[0]  # battery State Of Charge [unitless]
        batt_v_C_1 = x[1]  # battery current flowing through the internal serie-parallel resistance [A]
        batt_v_C_2 = x[2]  # battery current flowing through the internal serie-parallel resistance [A]

        # System of differential equations
        d_batt_v_C_1 = -(batt_v_C_1 / (batt_R_1 * batt_C_1)) + (batt_i / batt_C_1)
        d_batt_v_C_2 = -(batt_v_C_2 / (batt_R_2 * batt_C_2)) + (batt_i / batt_C_2)
        d_batt_z = -batt_eta * batt_i / batt_Q

        return np.array([d_batt_z, d_batt_v_C_1, d_batt_v_C_2])

    @classmethod
    def _outputs(cls, t, x, u, params: dict):
        # Params
        batt_R_0, batt_k_V_OC_coeffs = (
            params['batt_R_0'],
            params['batt_k_V_OC_coeffs'],
        )

        # Inputs
        batt_i = u[0]  # Battery current [A]

        # States
        batt_z = x[0]  # battery State Of Charge [unitless]
        batt_v_C_1 = x[1]  # battery current flowing through the internal serie-parallel capacitor [A]
        batt_v_C_2 = x[2]  # battery current flowing through the internal serie-parallel capacitor [A]

        # Output equations
        batt_ocv = np.polynomial.Polynomial(batt_k_V_OC_coeffs)(batt_z)
        batt_v = batt_ocv - batt_v_C_1 - batt_v_C_2 - batt_R_0 * batt_i

        return np.array([batt_v, batt_z, batt_ocv])


    @classmethod
    def build(cls, params: dict):
        return ct.NonlinearIOSystem(
            cls._update,
            cls._outputs,
            name='battery',
            states=('batt_z', 'd_batt_v_C_1', 'd_batt_v_C_2'),
            inputs=('batt_i',),
            outputs=('batt_v', 'batt_z', 'batt_ocv'),
            params=params,
        )

Battery.build({})

In [None]:
plt.figure()

# From: https://imgv2-2-f.scribdassets.com/img/document/682244176/original/0a169c4659/1730632177?v=1
v = np.array([13.1, 12.9, 12.72, 12.55, 12.39, 12.22, 12.05, 11.87, 11.65, 11.3, 10.5]) * 3
z = np.array([1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0])
plt.plot(z, v)

deg = 4
batt_k_V_OC_coeffs = np.polynomial.Polynomial.fit(z, v, deg, [0, 1]).convert().coef
display(batt_k_V_OC_coeffs)

v = np.linspace(0, 1, 100)
y_ref2 = np.polynomial.Polynomial(batt_k_V_OC_coeffs)(v)
plt.plot(v, y_ref2)

battery_params = {
    'batt_eta': 1,
    'batt_Q': 48 * 3600,
    'batt_R_0': 0.05,
    'batt_R_1': 0.01,
    'batt_C_1': 50,
    'batt_R_2': 10,
    'batt_C_2': 5000,
    'batt_k_V_OC_coeffs': batt_k_V_OC_coeffs,
}

In [None]:
df = dfa.copy(deep=True)

# Time array
T = df['t'].to_numpy()

# Inputs
U = df[['batt_i']].to_numpy().T

# Model
battery = Battery.build(battery_params)

# Initial state
X0 = np.zeros(battery.nstates)
X0[battery.state_index['batt_z']] = 1

# Simulation
res = ct.input_output_response(battery, T=T, U=U, X0=X0, solve_ivp_method='Radau')
ydata = res.to_pandas().set_index('time')[res.output_labels]

ydata.plot(subplots=True, figsize=(10,7));

# Optimization

In [56]:
from sklearn.preprocessing import RobustScaler
from pymoo.optimize import minimize
from pymoo.algorithms.soo.nonconvex.nelder import NelderMead
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.algorithms.soo.nonconvex.isres import ISRES
from pymoo.algorithms.soo.nonconvex.es import ES
from pymoo.algorithms.soo.nonconvex.de import DE
from pymoo.algorithms.soo.nonconvex.g3pcx import G3PCX
from pymoo.algorithms.soo.nonconvex.pattern import PatternSearch
from pymoo.operators.sampling.lhs import LHS
from pymoo.termination.default import DefaultSingleObjectiveTermination
from pymoo.parallelization.starmap import StarmapParallelization
from multiprocessing.pool import ThreadPool
from multiprocessing import Pool

import warnings
warnings.filterwarnings("ignore")


def model_function(T, U, X0, **params):
    return ct.input_output_response(
        Battery.build(params=params),
        T=T,
        U=U,
        X0=X0,
        solve_ivp_method='Radau',
    ).to_pandas()


In [None]:
%%time
## BATTERY OPTIMIZATION

# Select the optimization data
data_cols = ['batt_v', 'batt_z']
input_cols = ['batt_i']

# Put simulation data into df
df_train = dfa.copy(deep=True)[data_cols + input_cols]

# Set model states initial conditions
X0_dict = {'batt_z': df_train['batt_z'][0]}

# Initialize Parameters
params_bounds = {
    'batt_R_0': dict(min=1e-6, max=1),
    'batt_R_1': dict(min=1e-6, max=1),
    'batt_C_1': dict(min=1, max=1e3),
    'batt_R_2': dict(min=1e-3, max=100),
    'batt_C_2': dict(min=10, max=1e6),
    'batt_eta': dict(min=0.1, max=1),
}
initial_coeffs = np.zeros(1)
# n_coeffs = len(battery_params['batt_k_V_OC_coeffs'])
n_coeffs = 5
for i in range(0, n_coeffs):
    params_bounds[f'batt_ocv_coeff_{i}'] = dict(min=-100, max=100)

# Initialize the thread pool and create the runner
n_processes = 32
pool = Pool(processes=n_processes)
runner = StarmapParallelization(pool.starmap)

# Initialize the problem
model = Battery
problem = MyOptimizationProblem(
    model=model,
    model_function=model_function,
    training_data=df_train,
    training_data_columns=data_cols,
    model_params=battery_params,
    opt_params_bounds=params_bounds,
    input_initial_state=input_initial_state,
    input_columns=input_cols,
    data_scaler=RobustScaler,
    elementwise_runner=runner,
)

algorithm = ISRES(n_offsprings=10*len(params_bounds), rule=1.0 / 7.0, gamma=0.85, alpha=0.2)
# algorithm = DE(
#     pop_size=10*len(params_bounds),
#     sampling=LHS(),
#     variant="DE/rand/1/bin",
#     F=0.2,
#     CR=0.9,
# )

# Specify termination criteria (optional)
termination = DefaultSingleObjectiveTermination(
    xtol=1e-8,
    cvtol=1e-6,
    ftol=1e-6,
    period=20,
    n_max_gen=1000,
    n_max_evals=100000
)

# Run the optimization
result = minimize(
    problem,
    algorithm,
    termination=termination,
    seed=42,
    verbose=True,
    save_history=True,
)
print('Threads:', result.exec_time)
pool.close()
pool.join()

# Print the results
best_params = convert_to_model_params({k: result.X[i] for (i, k) in enumerate(params_bounds.keys())})
original_params = {k: convert_to_model_params(solar_boat_params)[k] for k in best_params.keys()}
print("Original parameters were:")
display(original_params)
print("Best parameters found:")
display(best_params)
print("Objective value at solution:", result.F)

plot_optimization_params(result, params_bounds)
plot_optimization_error(result, params_bounds)

model_tmp = model.build({})
X0 = np.zeros(model_tmp.nstates)
for key, value in X0_dict.items():
    X0[model_tmp.state_index[key]] = value
best_fit_data = model_function(T, U, X0, **(solar_boat_params | best_params))

for col in data_cols:
    plot_compare(df, best_fit_data, col)

In [None]:
soc_test = np.linspace(0, 1, 101, endpoint=True)
plt.plot(soc_test, np.polynomial.Polynomial(original_params['batt_k_V_OC_coeffs'])(soc_test)/3, label='original')
plt.plot(soc_test, np.polynomial.Polynomial(best_params['batt_k_V_OC_coeffs'])(soc_test)/3, label='best1')
plt.legend()
plt.show()