This code snippet will test an already trained DQ-DTC agent on the validation profile:

<img src="Figures/Validation_Profile.png" width="600">

In [None]:
import numpy as np
from pathlib import Path
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, LeakyReLU, ELU
from tensorflow.keras.optimizers import Adam
from rl.agents.dqn import DQNAgent
from rl.policy import LinearAnnealedPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory, EpisodeParameterMemory
from CustomKerasRL2Callbacks_torqueCtrl import StoreEpisodeLogger
from gym.wrappers import FlattenObservation
from gym.core import Wrapper
from gym.spaces import Box, Tuple
import sys, os
import h5py
sys.path.append(os.path.abspath(os.path.join('..')))
import gym_electric_motor as gem
from gym_electric_motor.reward_functions import WeightedSumOfErrors
from gym_electric_motor.physical_systems import ConstantSpeedLoad, ExternalSpeedLoad
from gym_electric_motor.reference_generators import WienerProcessReferenceGenerator, ConstReferenceGenerator, \
    MultipleReferenceGenerator, StepReferenceGenerator

In [None]:
def test_profile_speed(t):
    """
    This function defines the speed profile of the validation episode.
    """
    lim = 12000 * 2 * np.pi / 60

    niveau0 = 00
    niveau1 = 0.15 * lim
    niveau2 = 0.5 * lim

    if t <= 0.05:
        omega = niveau0
    elif t <= 0.20:
        omega = (t - 0.05) * (niveau1 - niveau0) / 0.15 + niveau0
    elif t <= 1.3:
        omega = niveau1
    elif t <= 1.45:
        omega = (t - 1.3) * -2 * niveau1 / 0.15 + niveau1
    elif t <= 2.55:
        omega = - niveau1
    elif t <= 2.7:
        omega = (t - 2.55) * (niveau1 + niveau2) / 0.15 - niveau1
    elif t <= 3.8:
        omega = niveau2
    elif t <= 3.95:
        omega = (t - 3.8) * -2 * niveau2 / 0.15 + niveau2
    elif t <= 5.05:
        omega = - niveau2
    elif t <= 5.2:
        omega = (t - 5.05) * (niveau0 + niveau2) / 0.15 - niveau2
    else:
        omega = niveau0

    return omega

In [None]:
class TransformObservationWrapper(Wrapper):
    """
    The following environment considers the dead time in the real-world motor control systems.
    The real-world system changes its state, while the agent calculates the next action based on a previously measured
    observation. Therefore, for the agent it seems as if the applied action affects the state one step delayed.
    (with a dead time of one time-step)

    For complete observability of the system at each time-step we append the last played action of the agent to the
    observation, because this action will be the one that is active in the next step.

    The reduced observation handed to the agent is
    [w_me, i_d, i_q, u_d, u_q, 0.1 * cos(eps), 0.1 * sin(eps), 2 * |i_dq| - 1],
    where u_d / u_q are obtained by transforming the selected switching state into dq coordinates.
    """

    # scaling factor applied to cos(eps) and sin(eps) inside the observation
    _ANGLE_SCALE = 0.1

    def __init__(self, environment):
        """
        Wrap ``environment`` and declare the reduced observation space.

        ``environment.observation_space`` is expected to be a pair of
        (state space, reference space); the reference space is passed through unchanged.
        """
        super().__init__(environment)
        state_space = environment.observation_space[0]

        # reduced observation space
        # [w_me, i_d, i_q, u_d, u_q, cos(eps), sin(eps), current magnitude] (all normalized)
        low = np.concatenate(([state_space.low[0]],
                              state_space.low[5:7],
                              state_space.low[10:12],
                              [-1, -1],
                              [-1]))
        high = np.concatenate(([state_space.high[0]],
                               state_space.high[5:7],
                               state_space.high[10:12],
                               [+1, +1],
                               [+1]))
        self.observation_space = Tuple((Box(low, high), environment.observation_space[1]))

        # maps every entry of the converter's subaction table from {0, 1} to {-1, +1}
        self.subactions = -np.power(-1, self.env.physical_system._converter._subactions)

        # gamma = 0 is assumed for calculating the return G in the test case
        self.gamma = 0
        self.test = True

    def _build_observation(self, state, action):
        """
        Build the reduced observation vector from the full ``state`` and the index of the selected ``action``.
        """
        eps = state[12] * np.pi
        angles = [self._ANGLE_SCALE * np.cos(eps), self._ANGLE_SCALE * np.sin(eps)]

        # voltage requested by the selected switching state, expressed in dq coordinates
        u_abc = self.subactions[action]
        u_dq = self.env.physical_system.abc_to_dq_space(u_abc, epsilon_el=eps)

        i_d = state[5]
        i_q = state[6]
        current_total = np.sqrt(i_d ** 2 + i_q ** 2)

        return np.concatenate(([state[0]],
                               state[5:7],
                               u_dq,
                               angles,
                               [2 * current_total - 1]))

    def step(self, action):
        """
        Apply ``action`` to the wrapped environment and return the reduced observation.

        Returns:
            ((observable_state, ref), rew, term, info), where ``rew``, ``term`` and ``info`` are passed through
            unchanged from the wrapped environment (no custom reward is defined, this wrapper is for testing only).
        """
        (state, ref), rew, term, info = self.env.step(action)

        # keep the full, untransformed observation available as an attribute
        self._obs_logger = np.concatenate((state, ref))

        observable_state = self._build_observation(state, action)

        return (observable_state, ref), rew, term, info

    def reset(self, **kwargs):
        """
        Reset the wrapped environment and return the initial reduced observation.

        ``kwargs`` are accepted for interface compatibility only; they are not forwarded to the wrapped
        environment. The action with index 0 is used as the "last played action" of the first observation.
        """
        state, ref = self.env.reset()

        # keep the full, untransformed observation available as an attribute
        self._obs_logger = np.concatenate((state, ref))

        observable_state = self._build_observation(state, 0)

        return (observable_state, ref)

# constant torque reference; its value is drawn once, at random between -1 and 1, when this line is executed
torque_ref_generator = ConstReferenceGenerator(
    reference_state='torque',
    reference_value=np.random.uniform(-1, 1),
)

# motor parameters (BRUSA)
motor_parameter = {
    'p': 3,             # [p] = 1, nb of pole pairs
    'r_s': 17.932e-3,   # [r_s] = Ohm, stator resistance
    'l_d': 0.37e-3,     # [l_d] = H, d-axis inductance
    'l_q': 1.2e-3,      # [l_q] = H, q-axis inductance
    'psi_p': 65.65e-3,  # [psi_p] = Vs, magnetic flux of the permanent magnet
}

# supply voltage, reused below as the nominal voltage
u_sup = 350
nominal_values = {
    'omega': 12000 * 2 * np.pi / 60,
    'i': 240,
    'u': u_sup,
}

# the limits start from the nominal values, with a higher current limit and an added torque limit
limit_values = {**nominal_values, "i": 270, "torque": 200}

def test_agent(param_dict):
    """
    Perform one validation episode with predefined network weights.

    Although the weights were saved beforehand, a network of the corresponding form still has to be
    initialized so that the weights can be loaded into it.

    Args:
        param_dict: dictionary with the entries
            "subfolder_name": directory that contains "weights.hdf5" and receives the episode log,
            "layers": number of hidden dense layers,
            "neurons": number of neurons per hidden layer,
            "activation_fcn": name of the hidden-layer activation ("leaky_relu" and "elu" are built as
                separate parameterized layers, any other name is passed to the Dense layers directly),
            "activation_fcn_parameter": alpha of the "leaky_relu" / "elu" activation.

    Returns:
        None. The episode is recorded by the StoreEpisodeLogger callback.
    """

    # unpack the parameters
    subfolder_name = param_dict["subfolder_name"]

    layers = param_dict["layers"]
    neurons = param_dict["neurons"]
    activation_fcn = param_dict["activation_fcn"]
    activation_fcn_parameter = param_dict["activation_fcn_parameter"]

    # hide all GPUs from TensorFlow, the episode is run on the CPU
    tf.config.set_visible_devices([], 'GPU')

    subfolder = Path(subfolder_name)
    subfolder.mkdir(parents=True, exist_ok=True)

    # build the environment for the validation profile
    env = gem.make("Finite-TC-PMSM-v0",
                   motor=dict(
                       motor_parameter=motor_parameter,
                       limit_values=limit_values,
                       nominal_values=nominal_values
                   ),
                   supply=dict(u_nominal=u_sup),
                   load=ExternalSpeedLoad(speed_profile=test_profile_speed,  # here, the speed profile is implemented
                                          tau=50e-6),
                   tau=50e-6,
                   reward_function=WeightedSumOfErrors(reward_weights={'torque': 1},
                                                       gamma=0),
                   reference_generator=torque_ref_generator,
                   ode_solver='scipy.solve_ivp'
                   )

    # reset once before reading the system properties; the returned observation is not needed here
    env.reset()
    tau = env.physical_system.tau
    limits = env.physical_system.limits

    # wrap the environment so that the agent receives the reduced, flattened observation
    env = FlattenObservation(TransformObservationWrapper(env))

    # define the network, has to be the same as in the training profile
    # parameterized activations are added as separate layers behind a linear Dense layer
    if activation_fcn in ("leaky_relu", "elu"):
        dense_activation_fcn = 'linear'
    else:
        dense_activation_fcn = activation_fcn

    nb_actions = env.action_space.n
    window_length = 1
    model = Sequential()
    model.add(Flatten(input_shape=(window_length,) + env.observation_space.shape))
    for _ in range(layers):
        model.add(Dense(neurons, activation=dense_activation_fcn))
        if activation_fcn == 'leaky_relu':
            model.add(LeakyReLU(alpha=activation_fcn_parameter))
        elif activation_fcn == 'elu':
            model.add(ELU(alpha=activation_fcn_parameter))
    model.add(Dense(nb_actions,
                    activation='linear'
                    ))

    # memory will not be used in testing episodes, probably one could avoid initializing it
    memory = SequentialMemory(limit=0, window_length=window_length)

    policy = LinearAnnealedPolicy(EpsGreedyQPolicy(eps=0),
                                  attr='eps',
                                  value_max=0,
                                  value_min=0,
                                  value_test=0,  # epsilon used for testing episodes, 0 means deterministic operation
                                  nb_steps=0)

    # define the agent
    agent = DQNAgent(model=model,
                     nb_actions=nb_actions,
                     gamma=0,
                     batch_size=4,
                     memory=memory,
                     memory_interval=1,
                     policy=policy,
                     train_interval=1,
                     target_model_update=0,
                     enable_double_dqn=False)

    # compile the agent and load the weights that were learned during training
    # (learning rate 0: nothing is trained here; "learning_rate" replaces the deprecated "lr" alias)
    agent.compile(Adam(learning_rate=0), metrics=['mse'])
    agent.load_weights(filepath=str(subfolder / "weights.hdf5"))

    # define the callback for the testing routine
    logger = StoreEpisodeLogger(folder_name=subfolder_name,
                                file_name="DQ_DTC_validation_episode",
                                tau=tau, limits=limits, training=True,
                                lr_max=0, lr_min=0,
                                nb_steps_start=0,
                                nb_steps_reduction=0,
                                speed_generator=None,
                                create_eps_logs=True,
                                test=True)
    callbacks = [logger]

    # perform one testing episode, the length of the episode "nb_max_episode_steps" was adjusted to the
    # speed / torque profile
    agent.test(env,
               nb_episodes=1,
               action_repetition=1,
               verbose=0,
               visualize=False,
               nb_max_episode_steps=130000,
               callbacks=callbacks)


In [None]:
# Parameterize the agent.
# "subfolder_name" is the directory from which the network weights "weights.hdf5" are taken.

subfolder_name = "Exemplary_Weights"

# Read the dimensions of the stored weights to set the correct network geometry;
# only the activation function and its parameter have to be set manually.
with h5py.File(subfolder_name + "/weights.hdf5", "r") as f:
    keys = list(f.keys())
    dense = np.copy(f["dense"]["dense"]["kernel:0"])

# the second dimension of the first dense kernel is used as the number of neurons per hidden layer
nb_neurons = np.shape(dense)[1]

# every group whose name contains "dense" is one dense layer; one is subtracted so that
# only the hidden layers are counted (test_agent adds the output layer separately)
nb_layers = sum(1 for key in keys if "dense" in key) - 1

param_dict = {
    "subfolder_name": subfolder_name,
    "layers": nb_layers,
    "neurons": nb_neurons,
    "activation_fcn": "leaky_relu",
    "activation_fcn_parameter": 0.3425,
}

In [None]:
# Run the validation episode with the parameters collected in "param_dict".
# The run time depends on the CPU speed and might exceed 20 minutes.
# No progress bar is displayed while the episode is running, so please stay patient.
test_agent(param_dict)

In [None]:
from Plot_TimeDomain_torqueCtrl import plot_episode

# Plot the recorded validation episode.
# The function saves a pdf of the episode to the "Plots" folder,
# which will be created if it does not exist yet.
plot_episode(
    training_folder="Exemplary_Weights",
    episode_number=0,
    episode_type="DQ_DTC_validation_episode",
)