# Imports

In [1]:
from collections import OrderedDict

In [2]:
import multiprocessing as mp

In [3]:
import numpy as np
import pandas as pd

from scipy.integrate import solve_ivp

# Model Imports

In [4]:
from models import Model_1
from models import Model_2
from models import Model_3
from models import Model_4

# Auxiliary Functions

In [5]:
from cybernetics.controls.piecewise_constant_controller import PiecewiseConstantController as PWC_Controller
from cybernetics.controls.piecewise_linear_controller import PiecewiseLinearController as PWL_Controller

In [6]:
def create_PWC(time_grid, control_settings):
    """Build a piecewise-constant controller on ``time_grid``.

    ``control_settings`` is forwarded unchanged to the controller constructor;
    the controller's ``default_value`` is fixed at 0.0.
    """
    pwc_controller = PWC_Controller(time_grid, control_settings, default_value=0.0)
    return pwc_controller

def create_PWL(time_grid, control_settings):
    """Build a piecewise-linear controller on ``time_grid``.

    ``control_settings`` is forwarded unchanged to the controller constructor;
    the controller's ``default_value`` is fixed at 0.0.
    """
    pwl_controller = PWL_Controller(time_grid, control_settings, default_value=0.0)
    return pwl_controller

In [7]:
def create_task(model_id, controller_type, n_dim, bound, initial_state):
    """Assemble an optimization task for one model / controller combination.

    Parameters
    ----------
    model_id : int
        Selects the model class: 1, 2, 3, 4 map to ``Model_1`` .. ``Model_4``.
    controller_type : str
        ``"PWC"`` (built with ``create_PWC``) or ``"PWL"`` (built with
        ``create_PWL``).
    n_dim : int
        Number of intervals of the control time grid (the grid has
        ``n_dim + 1`` nodes between the model's ``t0`` and ``t1``).
    bound : number
        Every control setting is searched within ``(-bound, bound)``.
    initial_state : sequence
        Passed to the model's ``calc_solution``; its components are also
        written to the ``x{i}_initial`` columns of the result table.

    Returns
    -------
    tuple
        ``(f, search_area, make_dataframe)``:

        * ``f(control_settings)`` returns ``m.I(sol) + m.I_term(sol)`` for the
          model driven by the controller built from ``control_settings``.
        * ``search_area`` is a list of ``(-bound, bound)`` pairs, one per
          control setting (``n_dim`` for "PWC", ``n_dim + 1`` for "PWL").
        * ``make_dataframe(control_settings)`` re-solves the model on a grid
          ten times finer and returns the trajectory as a ``DataFrame``.

    Raises
    ------
    ValueError
        If ``model_id`` or ``controller_type`` is not supported.
    """
    # Pick the model class directly; the classes are callable, so no lambda
    # wrapper is needed.
    if model_id == 1:
        model = Model_1
    elif model_id == 2:
        model = Model_2
    elif model_id == 3:
        model = Model_3
    elif model_id == 4:
        model = Model_4
    else:
        raise ValueError(f"Unsupported model: {model_id}")

    # Validate the controller type before any model is instantiated.
    if controller_type == "PWC":
        make_controller = create_PWC
        n_settings = n_dim
    elif controller_type == "PWL":
        make_controller = create_PWL
        # The piecewise-linear controller gets one extra setting.
        n_settings = n_dim + 1
    else:
        raise ValueError(f"Unsupported controller type: {controller_type}")

    # A model built without a control is used only to read the time horizon.
    horizon_model = model(None)
    time_grid = np.linspace(horizon_model.t0, horizon_model.t1, n_dim + 1)
    accurate_time_grid = np.linspace(horizon_model.t0, horizon_model.t1, 10 * n_dim + 1)
    search_area = [(-bound, bound) for _ in range(n_settings)]

    def controller(control_settings):
        """Build the selected controller type on the coarse time grid."""
        return make_controller(time_grid, control_settings)

    def f(control_settings):
        """Objective: ``I(sol) + I_term(sol)`` for the given control settings."""
        m = model(controller(control_settings))
        sol = m.calc_solution(initial_state)
        return m.I(sol) + m.I_term(sol)

    def make_dataframe(control_settings):
        """Tabulate the trajectory produced by ``control_settings`` on the fine grid."""
        m = model(controller(control_settings))
        sol = m.calc_solution(initial_state, accurate_time_grid)
        # The last control value is dropped; zip() below stops at the shortest
        # input, so the table has as many rows as there are remaining controls.
        controls = m.get_controls(sol)[:-1]
        # The last row of sol.y is excluded from the state columns
        # (presumably an auxiliary/cost component - TODO confirm against the models).
        states = sol.y[:-1, :].T

        rows = []
        for t, y, u in zip(sol.t, states, controls):
            row = OrderedDict(t=t)
            for i, value in enumerate(initial_state):
                row[f"x{i + 1}_initial"] = value
            for i, value in enumerate(y):
                row[f"x{i + 1}"] = value
            row["u"] = u
            rows.append(row)

        return pd.DataFrame(rows)

    return f, search_area, make_dataframe


# Optimization Tool Parameters

In [8]:
from osol.extremum.algorithms.statistical_anti_gradient_random_search import StatisticalAntiGradientRandomSearch as SAG_RS

In [9]:
# Settings passed to the SAG_RS constructor in calc_1 as its
# `radius` and `number_of_samples` arguments.
sag_rs_radius = 1.0
sag_rs_number_of_samples = 17

# Model Example №1

In [10]:
# One single-component initial state per grid value:
# 51 evenly spaced points covering [-25, 25].
initials_1 = [[start_value] for start_value in np.linspace(-25.0, 25.0, num=51)]

In [11]:
def calc_1(initial_value, n_dim=10, bound=100, working_time=60 * 30):
    """Optimize the control of model 1 for one initial state and tabulate the result.

    Parameters
    ----------
    initial_value : sequence
        Initial state forwarded to ``create_task`` as ``initial_state``.
    n_dim : int, optional
        Number of control-grid intervals passed to ``create_task`` (default 10).
    bound : number, optional
        Search bound passed to ``create_task`` (default 100).
    working_time : number, optional
        Value passed to the optimizer as ``max_seconds``
        (default ``60 * 30``, i.e. 30 minutes).

    Returns
    -------
    pandas.DataFrame
        The table produced by ``make_dataframe`` for the control settings
        returned by the optimizer.

    Notes
    -----
    The extra parameters all have defaults equal to the previously hard-coded
    values, so ``pool.map(calc_1, initials)`` behaves as before.
    """
    f, search_area, make_dataframe = create_task(
        model_id=1,
        controller_type="PWL",
        n_dim=n_dim,
        bound=bound,
        initial_state=initial_value)

    tool = SAG_RS(radius=sag_rs_radius, number_of_samples=sag_rs_number_of_samples)
    # The optimizer is initialized with an all-zero vector, one entry per control setting.
    tool.initialize(x=np.array([0] * len(search_area)), f=f)

    optimal_control = tool.optimize_max_runtime(
        f,
        search_area,
        max_seconds=working_time)

    return make_dataframe(optimal_control)

In [None]:
# Run one optimization per initial state in parallel worker processes.
# The context manager tears the pool down even when a worker raises, so no
# worker processes are left behind after a failed run.
with mp.Pool(mp.cpu_count()) as pool:
    dataframes_1 = pool.map(calc_1, initials_1)

In [None]:
# Stack the per-initial-state tables row-wise and write them to a single CSV
# (the row index is not written).
dataframe_1 = pd.concat(dataframes_1, axis="index")
dataframe_1.to_csv("results_1.csv", index=False)