# 1. Test Random Environment with OpenAI Gym

In [1]:
__credits__ = ["Desmond N.A. Hammond"]
%matplotlib widget

from os import path
from typing import Optional
import numpy as np
import gym
from gym import spaces
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled
from numba import cuda, jit
from time import time, sleep
import numpy as np
from math import acos, pi
import matplotlib.pyplot as plt
from IPython.display import display
from ipywidgets import interactive, widgets, IntSlider, Video
from matplotlib import animation
from matplotlib import rc
from IPython.display import HTML, Javascript
from IPython.display import Video
from IPython.display import clear_output

# equivalent to rcParams['animation.html'] = 'html5'
# rc('animation', html='html5')

# Named dash patterns in matplotlib's (offset, (on, off, ...)) form.
# 'dotted' previously repeated the 'densely dotted' pattern (1, 1), which made
# the two entries indistinguishable; it now uses the medium spacing (1, 5).
linestyle_tuple = [
    ('loosely dotted',        (0, (1, 10))),
    ('dotted',                (0, (1, 5))),
    ('densely dotted',        (0, (1, 1))),
    ('long dash with offset', (5, (10, 3))),
    ('loosely dashed',        (0, (5, 10))),
    ('dashed',                (0, (5, 5))),
    ('densely dashed',        (0, (5, 1))),

    ('loosely dashdotted',    (0, (3, 10, 1, 10))),
    ('dashdotted',            (0, (3, 5, 1, 5))),
    ('densely dashdotted',    (0, (3, 1, 1, 1))),

    ('dashdotdotted',         (0, (3, 5, 1, 5, 1, 5))),
    ('loosely dashdotdotted', (0, (3, 10, 1, 10, 1, 10))),
    ('densely dashdotdotted', (0, (3, 1, 1, 1, 1, 1))),
]




In [2]:
class PMSMEnv(gym.Env):
    """Gym environment integrating a d-q axis motor model (a PMSM, going by the class name).

    State: ``id``, ``iq``, ``omega`` and ``theta``, each kept as a shape-(1,)
    float array. Action: the two voltages ``(vd, vq)``, clipped to
    ``+/-max_voltage``. Each call to :meth:`step` advances the state by ``dt``
    with one fourth-order Runge-Kutta step and logs 16 signals into
    ``dataset1`` (simulated values) and ``dataset2`` (references and load),
    which :meth:`step_draw` plots.

    NOTE(review): :meth:`step` takes the references and the load torque as
    extra positional arguments, so it does not follow the standard
    ``gym.Env.step(action)`` signature.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    # (key in ``parameters``, attribute name, default value)
    _PARAMETER_SPECS = (
        ("Rs", "Rs", 0.5),
        ("Ld", "Ld", 3.5e-3),
        ("Lq", "Lq", 5e-3),
        ("p", "p", 3.),
        ("psi_f", "psi_f", 0.33),
        ("Bm", "Bm", 0.0028),
        ("J", "J", 0.004),
        ("max_speed", "max_speed", 500.),
        ("max_torque", "max_torque", 10.),
        ("max_voltage", "max_voltage", 800.),
        ("max_current", "max_current", 500.),
        ("sample_time", "dt", 0.001),
        ("simulation_time", "simTime", 1.),
    )

    # Number of logged columns per step (matches the 4x4 subplot grid).
    _NUM_SIGNALS = 16

    def __init__(self, render_mode: Optional[str] = None, parameters: Optional[dict] = None):
        """Set up model parameters, spaces, the time grid and the initial state.

        Args:
            render_mode: ``None`` or one of ``metadata["render_modes"]``.
            parameters: optional overrides keyed by ``Rs``, ``Ld``, ``Lq``,
                ``p``, ``psi_f``, ``Bm``, ``J``, ``max_speed``, ``max_torque``,
                ``max_voltage``, ``max_current``, ``sample_time`` and
                ``simulation_time``. Missing keys use the defaults in
                ``_PARAMETER_SPECS``.

        Raises:
            ValueError: if ``simulation_time / sample_time`` yields no step.
        """
        parameters = {} if parameters is None else parameters
        for key, attribute, default in self._PARAMETER_SPECS:
            setattr(self, attribute, np.array(parameters.get(key, default)))

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        self.n = np.prod(2)
        min_action = np.array([-self.max_voltage, -self.max_voltage], dtype=np.float32)
        max_action = np.array([self.max_voltage, self.max_voltage], dtype=np.float32)

        min_observation = np.array([-self.max_current, -self.max_current, -self.max_speed, np.array(0.)], dtype=np.float32)
        max_observation = np.array([self.max_current, self.max_current, self.max_speed, 2*np.pi], dtype=np.float32)

        self.action_space = spaces.Box(low=min_action, high=max_action, shape=(2,), dtype=np.float32)
        # Box has no ``n``; it is attached here because code outside the class
        # reads ``env.action_space.n``.
        self.action_space.__dict__['n'] = 2
        self.observation_space = spaces.Box(low=min_observation, high=max_observation, shape=(4,), dtype=np.float32)
        self.t = t = np.linspace(0, self.simTime, int(self.simTime/self.dt))
        self.sim_length = len(t)
        if self.sim_length < 1:
            raise ValueError("simulation_time must span at least one sample_time step")

        print('t', len(t))
        # The figure is created by render(); step_draw() builds it on demand.
        self.fig = None
        # State variables
        self.state = None
        self.reset()

    @staticmethod
    def _first(value):
        """Return the first element of *value* (scalar or array-like) as a float."""
        return float(np.ravel(value)[0])

    def rungekutta4_step(self, diff_eqn, x0, dt):
        """Advance ``x0`` by ``dt`` with one classical fourth-order Runge-Kutta step.

        Args:
            diff_eqn: callable returning the state derivative for a state.
            x0: state at the start of the step.
            dt: step size.

        Returns:
            The state after the step.
        """
        k1 = diff_eqn(x0)
        k2 = diff_eqn(x0 + k1 * dt/2.)
        k3 = diff_eqn(x0 + k2 * dt/2.)
        k4 = diff_eqn(x0 + k3 * dt)
        return x0 + (dt/6.) * (k1 + 2*k2 + 2*k3 + k4)

    def step(self, u, id_ref, iq_ref, omega_ref, theta_ref, T_load, episode):
        """Apply the voltages ``u`` for one sample period and log the result.

        Args:
            u: two-element action ``(vd, vq)``; clipped to ``+/-max_voltage``.
            id_ref, iq_ref, omega_ref, theta_ref: references the errors are
                measured against.
            T_load: load torque; clipped to ``+/-max_torque``.
            episode: unused here; kept for call compatibility.

        Returns:
            ``(observation, reward, done, info)`` where ``reward`` is
            ``-|id_error| - |omega_error|`` (shape-(1,) array), ``done``
            becomes True on the step that fills the last log row, and
            ``info`` is an empty dict.
        """
        self.id_ref = np.array(id_ref)
        self.iq_ref = np.array(iq_ref)
        self.omega_ref = np.array(omega_ref)
        self.theta_ref = np.array(theta_ref)

        self.T_load = np.clip(T_load, -self.max_torque, self.max_torque)
        self.vd, self.vq = np.clip(u, -self.max_voltage, self.max_voltage).tolist()
        self.x_prev = np.array([self.id, self.iq, self.omega, self.theta])

        # Compute new states; each row of the result is one shape-(1,) state.
        x_next = self.rungekutta4_step(diff_eqn=self.Xdot, x0=self.x_prev, dt=self.dt)
        self.id, self.iq, self.omega, self.theta = x_next
        self.x_current = np.array([self.id, self.iq, self.omega, self.theta])
        self.id_dot, self.iq_dot, self.omega_dot, self.theta_dot = self.Xdot(self.x_current)
        self.lambda_d = self.Ld*self.id + self.psi_f
        self.lambda_q = self.Lq*self.iq
        self.Te = (3/2)*(self.p)*(self.lambda_d*self.iq - self.lambda_q*self.id)

        # Tracking errors (theta_error previously used omega by mistake).
        self.id_error = self.id - self.id_ref
        self.iq_error = self.iq - self.iq_ref
        self.omega_error = self.omega - self.omega_ref
        self.theta_error = self.theta - self.theta_ref

        # Reward penalises the d-axis current error and the speed error.
        self.reward = -np.abs(self.id_error) - np.abs(self.omega_error)

        # Advance the log row. The counter starts at -1 after reset(), so the
        # rows 0 .. sim_length-1 are each written exactly once per episode; it
        # is clamped so that extra calls after ``done`` rewrite the last row
        # instead of indexing past the end of the logs.
        self.step_count = min(self.step_count + 1, self.sim_length - 1)
        done = self.step_count == self.sim_length - 1

        first = self._first
        # Column order must match ``label1`` / ``label2`` defined in render().
        self.dataset1[self.step_count, :] = [
            first(self.id), first(self.iq), first(self.omega),
            first(np.mod(self.theta, 2*np.pi)),
            first(self.id_dot), first(self.iq_dot),
            first(self.omega_dot), first(self.theta_dot),
            first(self.id_error), first(self.iq_error), first(self.omega_error),
            first(self.vd), first(self.Te), first(self.lambda_d),
            first(self.reward), first(self.vq),
        ]
        self.dataset2[self.step_count, :] = [
            first(self.id_ref), first(self.iq_ref),
            first(self.omega_ref), first(self.theta_ref),
            np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan,
            first(self.T_load), first(self.lambda_q), np.nan, np.nan,
        ]

        # set placeholder for info
        info = {}
        return self.get_observations(), self.reward, done, info

    def get_observations(self):
        """Return ``[id, iq, omega, theta]`` as a float32 vector of length 4.

        Sensor measurement noise can be implemented here. Note that ``theta``
        is returned as integrated (it is only wrapped to ``[0, 2*pi)`` in the
        ``dataset1`` log), although the observation space declares
        ``[0, 2*pi]`` as its bounds.
        """
        return np.array([self.id[0], self.iq[0], self.omega[0], self.theta[0]], dtype=np.float32)

    def _get_info(self):
        """Placeholder for auxiliary info; currently returns None."""
        pass

    def Xdot(self, xdot):
        """Evaluate the state derivative of the model.

        Args:
            xdot: the state ``[id, iq, omega, theta]`` (despite the parameter
                name, this is the state, not its derivative); shape (4,) or
                (4, 1).

        Returns:
            Array ``[id_dot, iq_dot, omega_dot, theta_dot]`` with the same
            layout as the input. Uses the currently stored ``vd``, ``vq`` and
            ``T_load``.
        """
        id_, iq_, omega_, _theta = np.asarray(xdot, dtype=float)

        id_dot = self.vd/self.Ld - (self.Rs*id_)/self.Ld + (self.Lq*iq_*omega_*self.p)/self.Ld
        iq_dot = (self.vq/self.Lq - (self.Rs*iq_)/self.Lq - (omega_*self.p*self.psi_f)/self.Lq
                  - (self.Ld*id_*omega_*self.p)/self.Lq)
        omega_dot = -(self.T_load + self.Bm*omega_
                      - (3*self.p*(iq_*(self.psi_f + self.Ld*id_) - self.Lq*id_*iq_))/2)/self.J
        theta_dot = omega_

        return np.array([id_dot, iq_dot, omega_dot, theta_dot])

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode.

        Args:
            seed: forwarded to ``gym.Env.reset``.
            options: optional initial values under the keys ``id_init``,
                ``iq_init``, ``omega_init`` and ``theta_init`` (scalars or
                one-element arrays); missing keys default to zero.

        Returns:
            ``(observation, info)`` with an empty ``info`` dict.
        """
        super().reset(seed=seed)
        self.state = None
        self.step_count = -1
        n_rows = int(self.simTime/self.dt)
        self.dataset1 = np.full((n_rows, self._NUM_SIGNALS), np.nan)
        self.dataset2 = np.full((n_rows, self._NUM_SIGNALS), np.nan)
        self.ims = []

        # reset state variables; every state is normalised to a shape-(1,)
        # float array so the rest of the class can index it uniformly
        options = {} if options is None else options
        for name in ("id", "iq", "omega", "theta"):
            initial = options.get(f"{name}_init", 0.)
            setattr(self, name, np.atleast_1d(np.array(initial, dtype=float)))
            setattr(self, f"{name}_dot", np.array([0.]))

        return self.get_observations(), {}

    def render(self, options: Optional[dict] = None):
        """Create the 4x4 figure whose axes :meth:`step_draw` fills in.

        Args:
            options: optional dict; ``render_fps`` (default 60) controls how
                often :meth:`step_draw` captures a frame, one frame every
                ``int(sim_length / render_fps)`` steps.
        """
        options = {} if options is None else options
        self.render_fps = np.array(options.get("render_fps", 60))
        # create a figure and axes
        self.fig = plt.figure(figsize=(25,10))
        self.num_subplots = self._NUM_SIGNALS
        self.nrows = 4
        self.ncols = self.num_subplots // self.nrows
        self.ax = [None]*self.num_subplots
        self.line1 = [None]*self.num_subplots
        self.line2 = [None]*self.num_subplots
        self.label1 = ['id', 'iq', 'omega', 'theta', 'id_dot', 'iq_dot', 'omega_dot', 'theta_dot', 'id_error', 'iq_error', 'omega_error', 'vd', 'T_elec', 'lambda_d', 'reward', 'vq']
        self.label2 = ['id_ref', 'iq_ref', 'omega_ref', 'theta_ref', '_', '_', '_', '_', '_', '_', '_', '_', 'T_load', 'lambda_q', '_', '_']
        self.ims = []

        first_bottom_axis = self.num_subplots - 4
        for i in range(self.num_subplots):
            axis = self.fig.add_subplot(self.nrows, self.ncols, i+1, autoscale_on=True)
            axis.set_xlim((0, self.simTime))
            # Only the bottom row carries the x label; the rows above hide
            # their tick labels.
            if i >= first_bottom_axis:
                axis.set_xlabel('Time')
            else:
                axis.tick_params('x', labelbottom=False)
            axis.set_title(self.label1[i])
            axis.grid(True)
            axis.autoscale(enable=True)
            self.ax[i] = axis
        self.fig.tight_layout()
        # Close the figure in pyplot's registry; the Figure object is still
        # used by step_draw to record frames.
        plt.close(self.fig)

    def step_draw(self, done, episode):
        """Capture an animation frame and, when the episode ends, save the video.

        Args:
            done: flag returned by :meth:`step`; when True the collected
                frames are written to ``episode{episode}.mp4`` and the axes
                are cleared.
            episode: episode number used in the title and the file name.
        """
        if self.fig is None:
            # render() was never called: build the figure with its defaults.
            self.render()

        # At least one step per frame, so the modulo below never divides by zero.
        frame_interval = max(1, int(self.sim_length/self.render_fps))
        if self.step_count % frame_interval == 0:
            self.fig.suptitle(f'Episode:{episode}')
            for i in range(self.num_subplots): #plot data
                # ax.plot returns a list of 2D line objects
                self.line1[i], = self.ax[i].plot(self.t, self.dataset1[:,i], color='blue', lw=1, animated=True)
                self.line2[i], = self.ax[i].plot(self.t, self.dataset2[:,i],  linestyle='dotted', color='red', lw=1, animated=True)
            self.ims.append(tuple(self.line1 + self.line2))
        if done:
            animation.ArtistAnimation(self.fig, self.ims, interval=60, blit=True, repeat_delay=1000, repeat=True).save(f"episode{episode}.mp4")
            for ax, label in zip(self.ax, self.label1):
                ax.cla()
                ax.grid(True)
                ax.set_title(label)
        return

    def close(self):
        """Nothing to release."""
        pass

In [3]:
# Instantiate the environment with every supported parameter spelled out.
# Only the simulation horizon (5.) differs from the values PMSMEnv falls back
# to when a key is omitted.
env = PMSMEnv(
    parameters=dict(
        Rs=0.5,
        Ld=3.5e-3,
        Lq=5e-3,
        p=3.,
        psi_f=0.33,
        Bm=0.0028,
        J=0.004,
        max_speed=500.,
        max_torque=10.,
        max_voltage=800.,
        max_current=500.,
        sample_time=0.001,
        simulation_time=5.,
    ),
    render_mode='human',
)


t 5000


In [149]:
# Build the 4x4 figure that step_draw() fills in; 'render_fps' sets how often
# step_draw() captures a frame.
env.render(options= {'render_fps':30} )


In [150]:
# The sampled action is discarded; `action` is then set to a random integer
# in [0, 100).
env.action_space.sample()
action = np.random.randint(0,100)


In [153]:
# Draw (and discard) one sample from each space.
env.observation_space.sample(), env.action_space.sample()

episodes = 2

# The references stay fixed for every step of every episode.
id_ref, iq_ref = 0, 0
omega_ref, theta_ref = 2, 2

for episode in range(1, episodes + 1):
    # Every episode starts from the same state: iq at 3, everything else at 0.
    initial_conditions = dict(
        id_init=np.array([0.]),
        iq_init=np.array([3.]),
        omega_init=np.array([0.]),
        theta_init=np.array([0.]),
    )
    state = env.reset(options=initial_conditions)
    score = 0

    # One random second action component and one random load per episode.
    action1 = np.random.randint(0, 100)
    T_load = np.random.randint(0, 50)
    action = np.array([1, action1], dtype=np.float32)

    done = False
    while not done:
        n_state, reward, done, info = env.step(
            action, id_ref, iq_ref, omega_ref, theta_ref, T_load, episode
        )
        # Optional: call env.step_draw(done, episode) here to record frames.
        score += reward

    print('Episode:{} Score:{}'.format(episode, score))


Episode:1 Score:[-29467.15624157]
Episode:2 Score:[-106612.26872597]


In [8]:
from IPython.display import Video
from ipywidgets import Output, GridspecLayout
from IPython import display
import os, re, glob
from pathlib import Path 
from IPython.display import HTML
from base64 import b64encode


# Collect the recordings in the working directory, ordered by the number that
# precedes ".mp4" (episode1.mp4, episode2.mp4, ..., episode10.mp4).
# The directory is named `video_dir` so it does not shadow `path` from `os`.
video_dir = Path('.')
_EPISODE_NUMBER = re.compile(r"([0-9]+)\.mp4$")


def _episode_order(filepath):
    """Sort key: numbered files first (by number), other .mp4 files after, by name."""
    match = _EPISODE_NUMBER.search(filepath)
    if match is None:
        return (1, 0, filepath)
    return (0, int(match.group(1)), filepath)


filepaths = sorted(glob.glob(os.path.join(video_dir, '*.mp4')), key=_episode_order)

# Embed every video as a base64 data URL so the HTML is self-contained.
video_tags = []
for filepath in filepaths:
    with open(filepath, 'rb') as video_file:
        data_url = "data:video/mp4;base64," + b64encode(video_file.read()).decode()
    video_tags.append("""
  <video width="400" controls muted autoplay>
        <source src="%s" type="video/mp4">
  </video>
  """ % data_url)
html_str = "".join(video_tags)

# A notebook only echoes the last expression of a cell, so both outputs are
# displayed explicitly. `display` is the IPython.display module here.
display.display(HTML(html_str))

if filepaths:
    display.display(Video(filepaths[0], embed=True, html_attributes="controls muted autoplay"))

In [9]:
# List the animation writers registered with matplotlib (step_draw saves .mp4
# files through one of them).
import matplotlib.animation as manimation; 
manimation.writers.list() 

['pillow', 'ffmpeg', 'ffmpeg_file', 'html']

To make plots appear in a pop-up window, choose one of the matplotlib backends: 'gtk', 'inline', 'osx', 'qt', 'qt4', 'tk', 'wx'

# 2. Create a Deep Learning Model with Keras

In [60]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam
from rl.agents import DDPGAgent, ContinuousDQNAgent, DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

In [73]:
# Sizes used for the networks: the observation length, the custom `n`
# attribute that PMSMEnv.__init__ attaches to its action space, and one
# sampled action.
n_states = env.observation_space.shape[0]
n_actions = env.action_space.n
actions = env.action_space.sample()

In [74]:
# Show the observation shape.
env.observation_space.shape

(4,)

In [76]:
# Inspect the sizes; the notebook only echoes the last expression.
# NOTE(review): `states_shape` is not assigned in any visible cell, presumably
# a leftover from an earlier session. Confirm, or replace with `n_states`.
n_actions
states_shape
actions.shape


(2,)

In [111]:
def build_model(n_states, n_actions):
    """Build a small fully connected network and print its summary.

    Args:
        n_states: length of the observation vector; the model input has shape
            ``(1, n_states)``.
        n_actions: number of units in the linear output layer.

    Returns:
        The uncompiled ``Sequential`` model (two hidden ReLU layers of 24
        units followed by the linear output layer).
    """
    model = Sequential()
    # Use the argument rather than a notebook global for the input width.
    model.add(Dense(24, activation='relu', input_shape=(1, n_states)))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(n_actions, activation='linear'))
    model.summary()
    return model

In [112]:
# Drop the current `model` binding.
# NOTE(review): raises NameError when no `model` exists in the session.
del model 

In [125]:
# Build the three networks that build_agent hands to ContinuousDQNAgent as
# V_model, mu_model and L_model.
# NOTE(review): all three take only the observation as input. The ValueError
# recorded later in this notebook shows an action tensor and an observation
# tensor being fed into a single-input model. TODO: confirm the architecture
# the agent expects for L_model.
V_model = build_model(n_states, 1)
mu_model = build_model(n_states, n_actions)
P_model = build_model(n_states, n_actions*n_states)

# critic_model = build_model(states_shape, n_actions)

Model: "sequential_22"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_66 (Dense)            (None, 1, 24)             120       
                                                                 
 dense_67 (Dense)            (None, 1, 24)             600       
                                                                 
 dense_68 (Dense)            (None, 1, 1)              25        
                                                                 
Total params: 745
Trainable params: 745
Non-trainable params: 0
_________________________________________________________________
Model: "sequential_23"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_69 (Dense)            (None, 1, 24)             120       
                                                                 
 dense_70 (Dense)            (No

# 3. Build Agent with Keras-RL

In [126]:
# Show the action sampled earlier.
actions

array([622.568   , -83.453636], dtype=float32)

In [127]:
# Inspect the constructor signature; the recorded output shows that the name
# resolves to NAFAgent.__init__.
ContinuousDQNAgent.__init__




<function rl.agents.dqn.NAFAgent.__init__(self, V_model, L_model, mu_model, random_process=None, covariance_mode='full', *args, **kwargs)>

In [129]:
# Inspect DQNAgent's constructor signature for comparison.
DQNAgent.__init__

<function rl.agents.dqn.DQNAgent.__init__(self, model, policy=None, test_policy=None, enable_double_dqn=False, enable_dueling_network=False, dueling_type='avg', *args, **kwargs)>

In [130]:
# Bundle the three networks into one tuple, in the order (V, P, mu).
model=(V_model, P_model, mu_model)

In [131]:
def build_agent(*model, n_actions=None):
    """Build a ContinuousDQNAgent from the value, L and mu networks.

    The networks may be passed either as three positional arguments,
    ``build_agent(V, L, mu, n_actions=...)``, or as one tuple/list,
    ``build_agent((V, L, mu), n_actions=...)``. The networks passed in are
    used; the function no longer reads notebook globals.

    Args:
        *model: the ``(V_model, L_model, mu_model)`` networks, in that order.
        n_actions: forwarded to the agent as ``nb_actions`` (keyword-only).

    Returns:
        The uncompiled agent.

    Raises:
        ValueError: if the call does not supply exactly three networks.
    """
    # Accept the single-tuple calling convention used elsewhere in the notebook.
    if len(model) == 1 and isinstance(model[0], (tuple, list)):
        model = tuple(model[0])
    if len(model) != 3:
        raise ValueError(
            "build_agent expects the V, L and mu models (got %d argument(s))" % len(model)
        )
    v_model, l_model, mu_model = model

    memory = SequentialMemory(limit=50000, window_length=1)
    dqn = ContinuousDQNAgent(V_model=v_model, L_model=l_model, mu_model=mu_model, random_process=None, covariance_mode='full',
                  memory=memory, nb_actions=n_actions, nb_steps_warmup=10, target_model_update=1e-2)
    return dqn

In [132]:
# Compare a random sample from the observation space with the environment's
# current observation.
env.observation_space.sample(), env.get_observations()

(array([171.19473 , -57.991627, 443.63757 ,   5.557103], dtype=float32),
 array([3.0203304e-01, 2.1085613e+01, 2.0347824e+00, 4.6245594e-04],
       dtype=float32))

In [133]:
# Assemble and compile the agent from the bundled networks.
# NOTE(review): the recorded run of this cell failed with the ValueError shown
# in its output (a single-input model received two input tensors).
dqn = build_agent(model, n_actions=n_actions)
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])
# dqn.fit(env, nb_steps=50000, visualize=True, verbose=1)
dqn.summary()

ValueError: Layer "sequential_24" expects 1 input(s), but it received 2 input tensors. Inputs received: [<tf.Tensor 'action_input_3:0' shape=(None, 2) dtype=float32>, <tf.Tensor 'observation_input_0_3:0' shape=(None, 1, 4) dtype=float32>]

In [None]:
# Evaluate the agent for 100 episodes and print the mean episode reward.
scores = dqn.test(env, nb_episodes=100, visualize=False)
print(np.mean(scores.history['episode_reward']))

In [None]:
# Run 15 test episodes with visualisation enabled; the result is discarded.
_ = dqn.test(env, nb_episodes=15, visualize=True)

# 4. Reloading Agent from Memory

In [None]:
# Save the agent's weights, replacing any existing file of the same name.
dqn.save_weights('dqn_weights.h5f', overwrite=True)

In [None]:
# Remove the model, agent and environment bindings from the session.
del model
del dqn
del env

In [None]:
# Rebuild the environment, model and agent before loading the saved weights.
# NOTE(review): build_agent constructs a ContinuousDQNAgent from V/L/mu
# networks, which does not match a single model for the discrete CartPole
# task. TODO: confirm which agent this cell is meant to rebuild.
env = gym.make('CartPole-v0')
actions = env.action_space.n
states = env.observation_space.shape[0]
model = build_model(states, actions)
# n_actions is keyword-only in build_agent; passed positionally it would be
# swallowed by *model and nb_actions would stay None.
dqn = build_agent(model, n_actions=actions)
# `learning_rate` matches the spelling used when the agent was first compiled.
dqn.compile(Adam(learning_rate=1e-3), metrics=['mae'])

In [None]:
# Load the weights from 'dqn_weights.h5f' into the rebuilt agent.
dqn.load_weights('dqn_weights.h5f')

In [None]:
# Run 5 test episodes with visualisation enabled; the result is discarded.
_ = dqn.test(env, nb_episodes=5, visualize=True)

In [None]:

# Linear state-space model x_dot = A x + B u with a 4-element state and a
# single input (B has one column).
# NOTE(review): g, lp, mp, mk and mt are not defined in any visible cell;
# they must be assigned before this cell runs.

# state matrix
a = g/(lp*(4.0/3 - mp/(mp+mk)))
A = np.array([[0, 1, 0, 0],
              [0, 0, a, 0],
              [0, 0, 0, 1],
              [0, 0, a, 0]])

# input matrix
b = -1/(lp*(4.0/3 - mp/(mp+mk)))
B = np.array([[0], [1/mt], [0], [b]])

In [None]:

# Input weight R must be (n_inputs x n_inputs). B has a single column, so R
# is 1x1; a 2x2 R is rejected by the Riccati solver for this B.
R = np.eye(1, dtype=int)          # choose R (weight for input)
Q = 5*np.eye(4, dtype=int)        # choose Q (weight for state)

# get riccati solver
from scipy import linalg

# solve the continuous-time algebraic Riccati equation
P = linalg.solve_continuous_are(A, B, Q, R)

# optimal controller gain K = R^-1 B^T P, computed with a linear solve rather
# than an explicit inverse; K has shape (1, 4)
K = np.linalg.solve(R, np.dot(B.T, P))

In [None]:
def apply_state_controller(K, x):
    """Evaluate the state-feedback law u = -K x and pick the push direction.

    Args:
        K: feedback gain.
        x: current state.

    Returns:
        ``(action, u)`` where ``action`` is 1 when ``u > 0`` (move cart
        right) and 0 otherwise (move cart left), and ``u`` is the computed
        input, returned unchanged.
    """
    force_demand = -np.dot(K, x)
    direction = 1 if force_demand > 0 else 0
    return direction, force_demand

In [None]:

# Drive CartPole with the state-feedback gain K for up to 1000 steps.
# NOTE(review): this cell uses the older gym API (env.env.seed, reset()
# returning only the observation, step() returning a 4-tuple). Confirm
# against the installed gym version.

# get environment
env = gym.make('CartPole-v0')
env.env.seed(1)     # seed for reproducibility
obs = env.reset()

for i in range(1000):
    env.render()
    
    # get force direction (action) and force value (force)
    action, force = apply_state_controller(K, obs)
    
    # absolute value, since 'action' determines the sign, F_min = -10N, F_max = 10N
    abs_force = abs(float(np.clip(force, -10, 10)))
    
    # change magnitude of the applied force in CartPole
    env.env.force_mag = abs_force

    # apply action
    obs, reward, done, _ = env.step(action)
    if done:
        print(f'Terminated after {i+1} iterations.')
        break

env.close()