<a href="https://colab.research.google.com/github/samegbor19/ME206/blob/main/OCRL_106B_Student.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ME/EECS/BioE C106B/206B Homework 6


## Optimal Control and Reinforcement Learning
In this notebook, you'll find the coding section for homework 6, Optimal Control and Reinforcement Learning. To get started, **make a copy** of this notebook and save it in your drive.

If you haven't used a colab notebook environment before, notebooks are just like normal Python files! Each section of the code is split up into a small block called a code cell, which you can run by pressing shift+enter or the play button on the left side of each cell. Run the following code cell to import the libraries we'll be using in this assignment.

In [1]:
#import librarires
import numpy as np
from matplotlib import pyplot as plt

## Problem 1: The Linear Quadratic Regulator
In this question, we'll implement a finite horizon linear quadratic regulator (LQR) controller to stabilize a discrete time system about the origin.
Let's review the formulation of the LQR problem. We have a linear discrete time system:
$$
x_{k+1} = Ax_k + Bu_k
$$
And would like to find the optimal sequence of control inputs $u_0, ..., u_{N-1}$ that minimize the cost function:
$$
J =  x_N^T Q_f x_N + \sum_{k = 0}^{N-1}(x_k^T Q x_k + u_k^T R u_k)
$$
We can show that the optimal inputs $u_i$ in this sequence may be calculated through the expression:
$$
u_i = -(R + B^TP_{i+1}B)^{-1}B^TP_{i+1}A x_i
$$
Where $P_i$ is a special matrix that comes from a recursive equation known as a Riccati equation:
$$
P_i = A^TP_{i+1}A + Q - A^TP_{i+1}B(R + B^TP_{i+1}B)^{-1}B^TP_{i+1}A
$$
To solve for the control input, we must calculate the $P_i$ matrix by iterating backwards in time from $P_N = Q_f$, the matrix in the terminal cost. Let's implement the different pieces of this controller!

#### Riccati Equation
First, let's write a function that implements the recursive Riccati equation from above. It should take in the matrices from the cost function, system dynamics, and $P_{i+1}$, and return $P_i$.

This won't be the only function we'll need, however! Our first function only gives us a way to calculate $P_i$ from $P_{i+1}$. We'll need a second function that computed $P_i$ from the final condition $P_N = Q_f$. This function should perform the following process:

1.   Start with $P_N = Q_f$
2.   Step $P_N$ backwards in time using the recursive function to get $P_{N-1}$
3. Continue stepping the $P$ matrix backwards until you have arrived at the correct index, $i$
4. Return $P_i$



In [2]:
def eval_recursion_step(A, B, Q, R, Pip1):
  """
  Function to recursively evaluate Pi from Pi+1 in the riccati equation.
  Inputs:
    A (NumPy Array): The A matrix from the system dynamics
    B (NumPy Array): The B matrix from the system dynamics
    Q (NumPy Array): The Q weight matrix in the LQR cost function
    R (NumPy Array): The R weight matrix in the LQR cost function
    Pip1 (NumPy Array): Pi+1, the P matrix one step ahead in time
  Returns:
    Pi (NumPy Array): The matrix Pi, as calculated from the recursive Riccati equation
  """
  #Return the recursive Pi value
  Pi = ...
  return Pi

def eval_p_i(A, B, Q, R, PN, N, i):
  """
  Function to recursively evaluate Pi from PN in the riccati equation.
  Inputs:
    A (NumPy Array): The A matrix from the system dynamics
    B (NumPy Array): The B matrix from the system dynamics
    Q (NumPy Array): The Q weight matrix in the LQR cost function
    R (NumPy Array): The R weight matrix in the LQR cost function
    PN (NumPy Array): P(N) = Qf, the P matrix in the terminal cost
    N (int): The horizon, N, in the cost function 
    i (int): The index of Pi we'd like to calculate (zero-indexed)
  Returns:
    Pi (NumPy Array): The matrix Pi, as calculated from iterating the Riccati equation

  Hint:
    To find the number of times we need to iterate, see how close i is to the 
    total number of steps, N. Try using a loop to iterate Pi backwards in time.
  """
  #initialize the value of Pi
  Pi = ...
  return Pi

#### LQR Controller
Now, let's implement a function that returns the control input $u_i$ from the LQR solution discussed above. Recall, the input is:
$$
u_i = -(R + B^TP_{i+1}B)^{-1}B^TP_{i+1}A x_i
$$

In [None]:
def eval_lqr_input(A, B, Q, R, Pip1, xi):
  """
  Function to evaluate the LQR control input, ui.
  Inputs:
    A (NumPy Array): The A matrix from the system dynamics
    B (NumPy Array): The B matrix from the system dynamics
    Q (NumPy Array): The Q weight matrix in the LQR cost function
    R (NumPy Array): The R weight matrix in the LQR cost function
    Pip1 (NumPy Array): Pi+1, the P matrix one step ahead in time
    xi (NumPy Array): The state vector of the system at time step i
  Returns:
    ui (NumPy Array): The LQR input to the system at time step i
  """
  #evaluate and return the control input ui
  ui = ...
  return ui

#### LQR Simulation
Let's simulate the LQR controller using a simple system. Run the code cell below to test out your LQR controller. If your contoller works, you should see the trajectories of the system converging smoothly to the origin. Attach these plots to your solution!

In [None]:
# Define system matrices
A = np.array([[0, 1, 0], [0, 0, 1], [1, -2, 4]])
B = np.array([[0, 0, 1]]).T

# Define initial condition
x0 = np.array([[1, 1, 1]]).T

# Define LQR weight matrices (positive definite or positive semidefinite)
Qf = np.eye(3)
P = np.eye(3)
R = np.eye(1)
Q = np.eye(3)

# Define LQR horizon
N = 20

#initialize the state at the initial condition
xi = x0

#define history vector
xHist = x0

#simulate the system over the control horizon
for i in range(N):
  #find Pi+1 evaluating from P(N) = Qf
  Pip1 = eval_p_i(A, B, Q, R, Qf, N, i+1)

  #get the input from the LQR controller
  ui = eval_lqr_input(A, B, Q, R, Pip1, xi)

  #apply the input to the system
  xi = A @ xi + B @ ui

  #update the history matrix
  xHist = np.hstack((xHist, xi))

#extract the results and plot
x1Hist = xHist[0, :].tolist()  
x2Hist = xHist[1, :].tolist()
x3Hist = xHist[2, :].tolist()

plt.plot(x1Hist)
plt.plot(x2Hist)
plt.plot(x3Hist)
plt.title("Stabilization Under LQR Control")
plt.xlabel("Time Step")
plt.ylabel("State Vector Values")
plt.legend(['x1', 'x2', 'x3'])
plt.show()

## Problem 2: Gradient Descent in Q-Learning
In practce, Q-learning depends on an algorithm called *gradient descent* to optimally fit a Q function based on data. What is gradient descent?

Gradient descent is an algorithm that helps us numerically find the minimum of a function and the argument that minimizes the function. It's widely used in machine learning, for instance, to optimize the weights in neural networks.

Let's discuss the steps of the algorithm. Suppose we have a function $f(x)$ that we'd like to minimize with respect to $x$.
1. Start with an initial guess, $x_0$, of the value of $x$ that minimizes $f(x)$.
2. Compute the gradient of $f(x)$ with respect to $x$ at the value of the guess, $\nabla_x f(x_0)$ (where $\nabla_x$ denotes the gradient operator with respect to $x$).
3. Update the guess $x_0$ through the update rule $x_1 = x_0 - \alpha ∇_xf(x_0)$. $\alpha$ is a value known as the *learning rate*, and controls how quickly and precisely the algorithm converges.
4. Continue iterating the value of $x_i$ using the update rule until the algorithm converges to the minimum of $f(x)$.

Why does this algorithm work? In short, we know that the direction of steepest increase of a function is given by its gradient $\nabla_x f(x)$. If we move opposite to this direction of steepest increase, we should eventually converge to a minimum (subject to certain conditions).

Let's implement the gradient descent algorithm on a very simple example. Let's find the minimum of the scalar function $f(x) = x^2$ using gradient descent.

In [None]:
def param_update(xi, grad, alpha):
  """
  Function to implement the gradient descent parameter update step
  xi+1 <- xi - alpha * grad f(xi)
  Inputs:
    xi (float): guess for the minimizer x at step i
    grad (float): gradient of the function f, calculated at xi
    alpha (float): learning rate
  Returns:
    xi+1 (float): the next guess of xi based on the parameter update equation
  """
  return ...

def get_grad(x):
  """
  Function to calculate the gradient of the function f(x) = x^2 with respect to x
  Inputs:
    x (float): value to evaluate the gradient at
  Returns:
    grad f(x) (float): gradient of f with respect to x, evaluated at x
  """
  return ...

def find_minimizer(x0, N, alpha):
  """
  Function to perform N iterations of gradient descent to find the value of x
  that minimizes f(x).
  Inputs:
    x0 (float): initial guess for the minimum of the function
    N (int): Number of times to perform gradient descent update step
    alpha (float): learning rate
  Returns:
    xN (float): estimate of the minimum after applying gradient descent N times
  """
  return ...

#### Testing Gradient Descent
Now that you've implemented the required functions for gradient descent, test your implementation by running the code cell below. This cell provides a visualization of gradient descent for a set of three different learning rates. You should see each plot converge to the minimum of $f(x)$ at a different rate. Attach the plot generated by the code to your solution.


In [None]:
#define lower and upper bounds for plotting
lBound = -5
uBound = -lBound

#get quadratic values for plot
numPts = 100
f_x = lambda x: x**2
xList = np.linspace(lBound, uBound, numPts).tolist()
fVals = [f_x(x) for x in xList]

#pick a random initial guess uniformly
x0 = np.random.uniform(uBound-1, uBound)

#define learning rate and number of iterations
alpha1 = 0.75
alpha2 = 0.5
alpha3 = 0.25
N = 10

#define history arrays to keep track of iterated values
xHist1 = []
yHist1 = []
xHist2 = []
yHist2 = []
xHist3 = []
yHist3 = []

#call gradient descent iteratively to show its progression
for i in range(N):
  #note: we call find_minimizer on "i" just so we can visualize the algorithm
  xHist1.append(find_minimizer(x0, i, alpha1)) #append x guess to history
  yHist1.append(f_x(xHist1[-1])) #find the y value associated with the x guess
  
  #repeat for second and third learning rates
  xHist2.append(find_minimizer(x0, i, alpha2))
  yHist2.append(f_x(xHist2[-1]))

  xHist3.append(find_minimizer(x0, i, alpha3))
  yHist3.append(f_x(xHist3[-1]))

#plot results
plt.plot(xList, fVals) #plot the function
plt.plot(xHist1, yHist1) #plot the guess using the first learning rate
plt.plot(xHist2, yHist2)
plt.plot(xHist3, yHist3)
plt.xlabel("x")
plt.ylabel("y")
plt.legend(['f(x)', 'alpha 1', 'alpha 2', 'alpha 3'])
plt.title("Convergence of Gradient Descent")
plt.show()

## Problem 3: Reinforcement Learning For Legged Robotics
In this problem, we'll use deep reinforcement learning to teach a simple legged robot system to walk!

#### MuJoCo and Stable Baselines Setup
We'll need a few libraries to train our robot to walk. First, we'll install MuJoCo. MuJoCo is a popular physics simulator in the reinforcement learning community. It contains many useful benchmark models and integrates well with standard reinforcement learning libraries.

Secondly, we'll install Stable Baselines 3 and gymnasium, two environments that will enable us to actually apply reinforcement learning. Stable Baselines 3 contains a set of strong implementations of reinformcent learning algorithms. We'll use an implementation of PPO, a deep RL algorithm, from stable baselines. This library makes it exceptionally easy to apply RL in practice! Gymnasium will provide us with an environment that interfaces between our deep RL policy and our MuJoCo model.

**Before running the cells below, click the "runtime" tab at the top of the page and then "change runtime type." Ensure "GPU" is selected under this option. This is required by MuJoCo to run correctly.** Once you've done this, run the cells below to install these libraries! Note that the code cells may take around a minute to run.

In [None]:
!pip install mujoco==2.3.3

from google.colab import files

import distutils.util
import subprocess
if subprocess.run('nvidia-smi').returncode:
  raise RuntimeError(
      'Cannot communicate with GPU. '
      'Make sure you are using a GPU Colab runtime. '
      'Go to the Runtime menu and select Choose runtime type.')
# Configure MuJoCo to use the EGL rendering backend (requires GPU)
print('Setting environment variable to use GPU rendering:')
%env MUJOCO_GL=egl

try:
  print('Checking that the installation succeeded:')
  import mujoco
  mujoco.MjModel.from_xml_string('<mujoco/>')
except Exception as e:
  raise e from RuntimeError(
      'Something went wrong during installation. Check the shell output above '
      'for more information.\n'
      'If using a hosted Colab runtime, make sure you enable GPU acceleration '
      'by going to the Runtime menu and selecting "Choose runtime type".')

print('Installation successful.')

In [None]:
!apt-get update && apt-get install ffmpeg freeglut3-dev xvfb  # For visualization
!pip install "stable-baselines3[extra]>=2.0.0a4"
!pip install glfw
!pip install gymnasium
!pip install gymnasium[box2d]

#### Defining the Training Environment
When setting up a reinforcement learning problem, the first thing we must do is define the training environment. We may do this with the library *gymnasium*. Gymnasium, or ``gym" environments are a standard type of environment in reinforcement learning.

To create an environment, we first import the gymnasium library. Then, to make an environment, we use the syntax:
```
import gymnasium as gym
env = gym.make('env-name')
```
Where 'env-name' is the name of the environment we wish to use. The gym library has a set of built-in environments we can use, but also supports custom environments.

Fill in the cell below by creating an environment with the name **'HalfCheetah-v4'**. This will create our legged robotics environment.



In [None]:
#import gym
import gymnasium as gym

#make the environment 'HalfCheetah-v4'
env = ...

#### Defining the Policy
Now that we've defined our environment, we can define our reinforcement learning *algorithm*. This will determine the numerical procedure our system will use to determine an optimal control policy for our robot.

The library *Stable Baselines 3* has a set of implementations of popular reinforcement learning libraries that integrate smoothly with Gym environments. We'll use an algorithm known as *Proximal Policy Optimization* (PPO). This is a popular RL algorithm that'll help us converge efficiently to an optimal policy. Stable baselines also has implementations of other popular algorithms such as deep Q-learning (DQN) and advantage actor critic (A2C).

To define a PPO algorithm for our environment, we use the following syntax:
```
model = PPO('policyType', env, verbose = 1)
```
Where 'policyType' is a string that refers to the type of neural network used in the algorithm, 'env' is our environment from above, and verbose specifies some printing options during the training procedure.

In the code cell below, create an instance of a PPO algorithm called policyModel. Your PPO should use a policy type called 'MlpPolicy' - this refers to a standard type of neural network called a multi-layer perceptron. You should pass in env from above and set verbose = 1.





In [None]:
#import some algorithms from stable baselines 3
from stable_baselines3 import SAC, PPO, DQN, A2C

#declare an initial observation (don't edit this line)
obs = env.reset()

#define your PPO algorithm using an 'MlpPolicy' - remember to pass this in as a string.
policyModel = ...

#### Training Your RL System
Now that we've defined our environment and algorithm, we're ready to begin the training process! This is the process by which we simulate our system, collect data, and iteratively improve our control policy.

To train our policy over N iterations, we use the syntax:
```
model.learn(total_timesteps = N)
```
Where model is the name of the algorithm instance we created in the previous step and N is some positive integer. Generally, the higher N is, and the more iterations we have, the better our policy will be! The basic idea behind this is that for larger N, we'll be able to collect more data and therefore get a better idea of how to control our system.

In the code cell below, call the .learn() function on the policyModel from the previous step. Set the value of N to be somewhere in the region of 700,000 - you should be able to get reasonable performance from the legged robot in this scenario for this value. The training process will take around 20-30 minutes to complete with this number of iterations.

As the training proceeds, a window will be printed specifying the current performance of the system. To gain an idea of how your system is performing, look at the data point titled 'ep_rew_mean' - this tells you the mean reward obtained by the system for a training episode - the higher this value is, the better! You should see this number increase greatly as the training proceeds.

With this in mind, in the code cell below, call the .learn() function on the policyModel from the previous step for N ~ 700,000 to train the model. Remember, this will take some time!

In [None]:
#call .learn(total_timesteps = N) on policyModel to train the system
...

#### Testing the Learned Control Policy
Now that we've finished training the model, we can test it out on the system and observe its performance! After we've trained the model, we retrieve a copy of the environment. We may use the syntax:
```
vec_env = policyModel.get_env()
```
After this, we can test out how our control policy performs by "stepping" the environment forward in time through our control policy. We can examine the behavior of our system under 1000 steps, for instance, using the syntax:
```
obs = vec_env.reset()
for i in range(1000):
    action, _state = policyModel.predict(obs, deterministic=True)
    obs, reward, done, info = vec_env.step(action)
```
Where policyModel.predict() returns the optimal action of our system given an observation of its current state.

Run the following code cell, which performs this simulation procedure and generates a video of the robot. Note that the extra functions in this code cell are simply required by nature of the Colab environment. In a standard Python file, the code above would be all we'd need to test our system.

If your training process was successful, you should see the half-cheetah robot moving to the right! It's alright if it performs the task in a way you weren't expecting - this is part of the challenge of RL!

As this cell renders a video, it will take around a minute to watch. Record the video and attach a link to your recording (either a YouTube or Google Drive link) to your solution.


In [None]:
import os
import base64
from pathlib import Path
from IPython import display as ipythondisplay
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv

# Set up display
os.system("Xvfb :1 -screen 0 1024x768x24 &")
os.environ['DISPLAY'] = ':1'

def show_videos(video_path="", prefix=""):
    """
    :param video_path: (str) Path to the folder containing videos
    :param prefix: (str) Filter the video, showing only the only starting with this prefix
    """
    html = []
    for mp4 in Path(video_path).glob("{}*.mp4".format(prefix)):
        video_b64 = base64.b64encode(mp4.read_bytes())
        html.append(
            """<video alt="{}" autoplay 
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{}" type="video/mp4" />
                </video>""".format(
                mp4, video_b64.decode("ascii")
            )
        )
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))

def record_video(env_id, model, video_length=500, prefix="", video_folder="videos/"):
    """
    :param env_id: (str)
    :param model: (RL model)
    :param video_length: (int)
    :param prefix: (str)
    :param video_folder: (str)
    """
    eval_env = DummyVecEnv([lambda: gym.make("HalfCheetah-v4", render_mode="rgb_array")])
    # Start the video at step=0 and record 500 steps
    eval_env = VecVideoRecorder(
        eval_env,
        video_folder=video_folder,
        record_video_trigger=lambda step: step == 0,
        video_length=video_length,
        name_prefix=prefix,
    )

    obs = eval_env.reset()
    for _ in range(video_length):
        action, _ = model.predict(obs)
        obs, _, _, _ = eval_env.step(action)

    # Close the video recorder
    eval_env.close()
  
#Run video recording!
record_video("HalfCheetah-v4", policyModel, video_length=500, prefix="policy-cheetah")
show_videos("videos", prefix="policy")