# 16th International Modelica & FMI Conference Workshop
## FMUGym: From Uncertainty-Aware Simulation to Learning-Based Control with FMI and Python

In this tutorial, we are utilize the [FMUGym toolbox](https://github.com/Fraunhofer-IIS/fmugym) for Monte Carlo simulation, classical controller parametrization and training of a residual reinforcement learning (RL) agent for control under uncertainties. We leverage and slightly modify a DC motor drive example of the [Credibility library](https://github.com/DLR-SR/Credibility).
![Modelica_view](DC_motor_workshop/Control_System_Overview.png)

### 0 Check all Imports
Check if all necessary python libraries are installed without conflicts. If Google Colab or similar providers are used, uncomment installation commands.

In [None]:
# !git clone https://github.com/wredsen/fmugym.git
# %cd fmugym
# !pip install -e .
# !pip install stable_baselines3

In [None]:
import os, sys
import time

import numpy as np

!pip install scipy
from scipy.optimize import differential_evolution, minimize
from scipy.interpolate import griddata

from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

from fmpy import read_model_description, extract

from fmugym import FMUGym, FMUGymConfig, VarSpace, State2Out, TargetValue

import gymnasium as gym
from stable_baselines3 import PPO

### 1 Implementation of abstract environment class
Implement the abstract FMUGym class and add utility functions supporting this tutorial

In [None]:
class FMUEnv(FMUGym):
    def __init__(self, config):
        self.init_states = {}
        super().__init__(config)
    
    # Use negative squared control error as reward
    def compute_reward(self, obs, info):
        reward = -obs[2]**2
        return reward
    
    ### Tutorial specific utility functions ###
    def get_random_vars(self):
        params = {key: self.init_states[value[0]] for key, value in self.random_vars_refs.items()}
        return params
    
    def set_PID_params(self, k, Ti):
        self.random_vars_refs["speedController.k"][1].low = np.array([k])
        self.random_vars_refs["speedController.k"][1].high = np.array([k])
        self.random_vars_refs["speedController.Ti"][1].low = np.array([Ti]) # in s
        self.random_vars_refs["speedController.Ti"][1].high = np.array([Ti]) # in s
    
    def set_disturbance_torque(self, tau_min, tau_max):
        self.random_vars_refs["torqueStep.stepTorque"][1].low = np.array([tau_min]) # in Nm
        self.random_vars_refs["torqueStep.stepTorque"][1].high = np.array([tau_max]) # in Nm
    
    def set_spring_damper_parameters(self, d_bounds, k_bounds):
        self.random_vars_refs["damper.d"][1].low = np.array([d_bounds[0]]) # in Nms/rad
        self.random_vars_refs["damper.d"][1].high = np.array([d_bounds[0]]) # in Nms/rad
        self.random_vars_refs["spring.k"][1].low = np.array([k_bounds[0]])
        self.random_vars_refs["spring.k"][1].high = np.array([k_bounds[1]])

### 2 Parametrization and instantiation of simulation environment
Parametrize model in-/outputs and define uncertainties. Spring and damper of load side with $\pm$5%, PI controller parameters and disturbance load torque constant for now.

In [None]:
# only relevant for final RL task
inputs = VarSpace("inputs")
inputs.add_var_box("in_torque", -10.0, 10.0) # in Nm

# measurements as model outputs
outputs = VarSpace("outputs")
outputs.add_var_box("out_control_err", -1e6, 1e6) # in rad/s
outputs.add_var_box("out_load_speed", -1e6, 1e6) # in rad/s
outputs.add_var_box("out_motor_speed", -1e6, 1e6) # in rad/s

random_vars = VarSpace("random_vars") 
# uncertrain dynamic parameters for domain randomization
random_vars.add_var_box("damper.d", 38.0, 42.0) # in Nms/rad
random_vars.add_var_box("spring.k", 0.95, 1.05)
# default control gains
random_vars.add_var_box("speedController.k", 100.0, 100.0)
random_vars.add_var_box("speedController.Ti", .1, .1) # in s

# no external torque step value 
random_vars.add_var_box("torqueStep.stepTorque", 5.0, 5.0) # in Nm

config = FMUGymConfig(fmu_path="examples/DC_motor_workshop/DC_motor_dymola.fmu", #os.path.abspath('DC_motor_workshop/DC_motor_dymola.fmu'),
                      start_time=0.0,
                      stop_time=1.0,
                      sim_step_size=0.005,
                      action_step_size=0.005,
                      inputs=inputs,
                      input_noise=None,
                      outputs=outputs,
                      output_noise=None,
                      random_vars=random_vars,
                     )                        

model_description = read_model_description(config.fmu_path)
print(model_description)

#### Instantiation

In [None]:
DCMotorEnv = FMUEnv(config)

### 4 Monte Carlo evaluation of uncertain dynamic parameters with classical controller
Only spring constant ($1.0 \pm 0.05$) and damper parameter (($40 \pm 2.0$) Nms/rad) are uncertain yet.
![Modelica_view_kd](DC_motor_workshop/Control_System_Overview1.png)

#### Build dictionaries of indices for system parameters and outputs

In [None]:
idx_params = {}
idx_outs = {}

for i in range(len(DCMotorEnv.get_random_vars())):
    param_i = list(DCMotorEnv.get_random_vars().keys())[i]
    idx_params[param_i] = i
    print("Parameter to randomize: ", param_i, "at index", i)
    
for i in range(len(DCMotorEnv.output_dict)):
    out_i = list(DCMotorEnv.output_dict.keys())[i]
    idx_outs[out_i] = i
    print("Output: ", out_i, "at index", i)

#### Monte Carlo simulation for N=500 episodes

In [None]:
num_episodes = 500
num_steps_per_episode = int((config.stop_time - config.start_time) / config.sim_step_size)
random_vars_collection = []
obs_collection = []

for iter in range(num_episodes):
    # resetting the environment and setting random initial and target values
    observation, info = DCMotorEnv.reset()
    obs_episode = [observation]

    # capturing trajectories here
    for k in range(num_steps_per_episode):
        action = 0
        observation, reward, terminated, truncated, info = DCMotorEnv.step(action)
        obs_episode.append(observation)
    random_vars_collection.append(list(DCMotorEnv.get_random_vars().values()))
    obs_collection.append(obs_episode)
    
random_vars_array = np.array(random_vars_collection)
obs_array = np.array(obs_collection)
squared_means = (obs_array**2).mean(axis=1)
mean_squared_errors = squared_means[:, idx_outs["out_control_err"]]

#### Plotting system parameter distribution, MSE and relation

In [None]:
print("Distribution of system paramters")
for k in range(len(DCMotorEnv.get_random_vars())):
    plt.figure(figsize=(6, 4))
    plt.hist(random_vars_array[:, k], bins=10, edgecolor='black', alpha=0.7)
    plt.xlabel("Value")
    plt.ylabel("Frequency")
    plt.title(f"Histogram for {list(DCMotorEnv.get_random_vars().keys())[k]}, mean: {np.mean(random_vars_array[:, k]):.4f}, std: {np.std(random_vars_array[:, k]):.4f}")
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.show()

In [None]:
print("Distribution of MSE")
plt.figure(figsize=(6, 4))
plt.hist(mean_squared_errors, bins=10, edgecolor='black', alpha=0.7)
plt.xlabel("Value in (rad/s)$^2$")
plt.ylabel("Frequency")
plt.title(f"Histogram for mean squared control error")
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.ticklabel_format(style='sci', axis='x', scilimits=(0,0))
plt.show()

In [None]:
# Axes for 3D Plot
x = random_vars_array[:, idx_params["damper.d"]]
y = random_vars_array[:, idx_params["spring.k"]]
z = mean_squared_errors

# Create grid for surface
grid_x, grid_y = np.mgrid[
    x.min():x.max():100j,  # 100 points in each direction
    y.min():y.max():100j
]

# Interpolate MSE values onto the grid
grid_z = griddata(
    points=(x, y),
    values=z,
    xi=(grid_x, grid_y),
    method='cubic'  # can be 'linear', 'cubic', 'nearest'
)


# Create figure
fig = plt.figure(figsize=(10, 7), constrained_layout=True)
ax = fig.add_subplot(111, projection='3d')


# Plot surface
surf = ax.plot_surface(
    grid_x, grid_y, grid_z,
    cmap='viridis',
    alpha=0.7,
    edgecolor='none'
)

# Also scatter plot for original points
ax.scatter(x, y, z, c=z, cmap='viridis', s=20, edgecolor='k')

# Labels
ax.set_xlabel('Damper constant d in Nms/rad')
ax.set_ylabel('Spring constant k')
ax.set_zlabel('MSE in (rad/s)$^2$', labelpad=13)
ax.tick_params(axis='z', pad=8)
ax.zaxis.label.set_rotation(90)  

plt.tight_layout()
plt.show()

#### Ensemble plots of output timeseries

In [None]:
import numpy as np
import matplotlib.pyplot as plt

times = np.arange(config.start_time, config.stop_time+config.sim_step_size, step=config.sim_step_size)

for comp in range(obs_array.shape[2]):
    data = obs_array[:, :, comp]          
    plt.figure()
    # Plot all episodes as faint lines (ensemble)
    plt.plot(times[:], data.T, alpha=0.2, linewidth=0.8)

    plt.title(f"Ensemble plot — Component {list(idx_outs.keys())[comp]}")
    plt.xlabel("Time in s")
    plt.ylabel("Value in rad/s")
    plt.tight_layout()
    plt.show()

### 5 Manual PID Tuning for nominal system parameters
Deterministic simulation, fix spring and damper values to nominal values and allow for manual PID tuning.

In [None]:
DCMotorEnv.set_spring_damper_parameters([40.0, 40.0], [1.0, 1.0]) # in in Nms/rad, 1
DCMotorEnv.reset()
print("Nominal system parameters:")
print(DCMotorEnv.get_random_vars())

In [None]:
def obs_MSE_per_episode(Kp, Ti):
    DCMotorEnv.set_PID_params(Kp, Ti)
    
    num_steps_per_episode = int((config.stop_time - config.start_time) / config.sim_step_size)
    observation, info = DCMotorEnv.reset()
    obs_episode = [observation]
    
    # capturing trajectories here
    for k in range(num_steps_per_episode):
        action = 0 # no residual RL agent used yet
        observation, reward, terminated, truncated, info = DCMotorEnv.step(action)
        obs_episode.append(observation)

    obs_episode = np.array(obs_episode)
    mean_squared_error = (obs_episode**2).mean(axis=0)[idx_outs["out_control_err"]]
    
    return obs_episode, mean_squared_error

In [None]:
### PID parameters
Kp = 100
Ti = 0.1 # in s
obs, mses = obs_MSE_per_episode(Kp, Ti)

print(f"MSE: {mses:.3e} (rad/s)²")

plt.figure()
plt.plot(times, obs[:, idx_outs["out_control_err"]])
plt.title("Control error")
plt.xlabel("Time in s")
plt.ylabel("Value in rad/s")
plt.tight_layout()
plt.show()

### 6 Optimize PID gains for uncertainty in system parameters and amplitude of torque disturbance
Uncertain spring constant ($1.0 \pm 0.05$) and damper parameter (($40 \pm 2.0$) Nms/rad) as well as disturbance load torque (($10.0 \pm 2.5$) Nm). Allow for automated PID tuning.
![Modelica_view_kp_dist](DC_motor_workshop/Control_System_Overview2.png)

In [None]:
DCMotorEnv.set_disturbance_torque(2.5, 7.5) # Nm, disturbance torque step now varies
DCMotorEnv.set_spring_damper_parameters([38.0, 42.0], [0.95, 1.05]) # original parameter range in in Nms/rad, 1

In [None]:
# only take MSE, no other observations
def MSE_per_episode(Kp, Ti):
    _, MSE = obs_MSE_per_episode(Kp, Ti)
    return MSE
    
# utilize classic optimization function from scipy
def optimize_PID_gains(mse_fn, bounds, x0=None, seed=41, maxiter=200, polish=True, **mse_kwargs):
    # Global search
    de_res = differential_evolution(
        lambda p: float(mse_fn(*p, **mse_kwargs)), 
        bounds=bounds,
        seed=seed,
        maxiter=maxiter,
        updating='deferred',
        workers=1,
        polish=False
    )

    best_x = de_res.x if x0 is None else np.asarray(x0, dtype=float)

    # Optional local polish
    lbfgs_res = None
    if polish:
        lbfgs_res = minimize(
            lambda p: float(mse_fn(*p, **mse_kwargs)),
            x0=best_x,
            method='L-BFGS-B',
            bounds=bounds
        )
        final_x, final_mse = lbfgs_res.x, float(lbfgs_res.fun)
        success, message = bool(lbfgs_res.success), str(lbfgs_res.message)
    else:
        final_x, final_mse = de_res.x, float(de_res.fun)
        success, message = bool(de_res.success), str(de_res.message)

    return {
        'x': np.asarray(final_x, dtype=float),
        'mse': float(final_mse),
        'success': success,
        'message': message,
        'global_result': de_res,
        'local_result': lbfgs_res
    }

In [None]:
timer_start_time = time.time()
bounds = [(50.0, 150), (0.01, 1.0)] # in 1, s
# if it takes >3 min, reduce maxiter parameter
res = optimize_PID_gains(MSE_per_episode, bounds, seed=0, maxiter=100)
print(res)
print("optimization duration: ", time.time() - timer_start_time)

#### Compare with previous parametrization

In [None]:
num_eps = 100

mse_sum = 0.0
for i in range(num_eps):
    mse_sum += MSE_per_episode(Kp, Ti) 
print(f"Mean squared error for Kp = {Kp:.1f} and Ti = {Ti:.3f} s: {mse_sum/(num_eps):.3e} (rad/s)²")

mse_sum = 0.0
for i in range(num_eps):
    mse_sum += MSE_per_episode(*res["x"])
print(f"Mean squared error for Kp = {res['x'][0]:.1f} and Ti = {res['x'][1]:.3f} s: {mse_sum/(num_eps):.3e} (rad/s)²")

### 7 Training a residual RL policy to improve disturbance rejection
Uncertain spring constant ($1.0 \pm 0.05$) and damper parameter (($40 \pm 2.0$) Nms/rad) as well as disturbance load torque (($10.0 \pm 2.5$) Nm). Use previously optimized PID parameters. Now also include residual RL agent to compensate for unceratainties and disturbances.
![Modelica_view_kp_dist_RL](DC_motor_workshop/Control_System_Overview3.png)

In [None]:
DCMotorEnv.set_PID_params(res['x'][0], res['x'][1])
DCMotorEnv.reset()

#### Instantiate an RL agent to be trained with the PPO algorithm

In [None]:
model = PPO(
    "MlpPolicy",
    DCMotorEnv,
    verbose=1,
    #tensorboard_log="/tmp/tensor_board"
)

In [None]:
# structure of NN
model.policy

#### Train the agent for < 10 min

In [None]:
timer_start_time = time.time()
model.learn(total_timesteps=200_000)
print("learn duration: ", time.time() - timer_start_time)

#### Compare performance with and without residual agent

In [None]:
# Infer trained RL agent multiple episodes
def MSE_per_episode_RL(N):
    squared_err_sum = 0.0
    num_steps = int((config.stop_time - config.start_time) / config.sim_step_size)
    for k in range(N):
        observation, info = DCMotorEnv.reset()
        for _ in range(num_steps):
            action, _state = model.predict(observation, deterministic=True) 
            observation, reward, terminated, truncated, info = DCMotorEnv.step(action)
            squared_err_sum += observation[idx_outs["out_control_err"]]**2
    return squared_err_sum/num_steps 

In [None]:
num_eps = 100

# 100 episodes without RL agent
DCMotorEnv.reset()
mse_sum = 0.0
for i in range(num_eps):
    mse_sum += MSE_per_episode(res['x'][0], res['x'][1])
print(f"Mean squared error without RL agent: {mse_sum/num_eps:.3e} (rad/s)²")

# 100 episodes with RL agent
mse_sum_agent = MSE_per_episode_RL(num_eps)
print(f"Mean squared error with RL agent: {mse_sum_agent/num_eps:.3e} (rad/s)²")