In [15]:
import numpy as np
import sympy as sp
from sympy.matrices import Matrix

def symbolic_dynamics_pendulum():
    """Symbolic dynamics for pendulum
    """
    m, g, L, theta, theta_dot, u, dt = sp.symbols('m g L theta theta_dot u dt')

    # inputs and states
    inputs = Matrix([u])
    states = Matrix([theta, theta_dot])

    # dynamics for a pendulum of mass m and center of mass L
    f = Matrix([theta_dot, m*g*L*sp.sin(theta)/(m*L*L)])

    # discretize the system using euler integration
    f_discrete = states + f*dt

    # take the jacobian wrt states and inputs
    A_discrete = f_discrete.jacobian(states) # df/dx
    B_discrete = f_discrete.jacobian(inputs) # df/du

    # define parameters
    parameters = Matrix([m,g,L])

    # create lambdas
    f_discrete_func = sp.lambdify((states, inputs, dt, parameters), f_discrete)
    A_discrete_func = sp.lambdify((states, inputs, dt, parameters), A_discrete)
    B_discrete_func = sp.lambdify((states, inputs, dt, parameters), B_discrete)

    return f_discrete_func, A_discrete_func, B_discrete_func



(<function _lambdifygenerated(_Dummy_23, _Dummy_24, dt, _Dummy_25)>,
 <function _lambdifygenerated(_Dummy_26, _Dummy_27, dt, _Dummy_28)>,
 <function _lambdifygenerated(_Dummy_29, _Dummy_30, dt, _Dummy_31)>)

In [16]:
import numpy as np

class ilqr:
    def __init__(self, init_state, target_state, initial_guess, dt, start_time, end_time, f_discrete, A, B, Q_k, R_k, Q_T, parameters, n_iterations):
        # states
        self.init_state_ = init_state
        self.target_state_ = target_state
        self.inputs_ = initial_guess
        self.n_states_ = np.shape(init_state)[0] # The dimensions of the state vector
        self.n_inputs_ = np.shape(initial_guess)[1] # The dimension of the control vector

        # timing
        self.dt_ = dt
        self.start_time_ = start_time
        self.end_time_ = end_time
        self.time_span_ = np.arange(start_time, end_time, dt).flatten()
        self.n_timesteps_ = np.shape(self.time_span_)[0]

        # dynamics
        self.f_ = f_discrete
        self.A_ = A
        self.B_ = B

        # weighting for loss function, i.e. L = x_T^T Q_T x_T + sum of (x_k^T Q_k x_k + u_k^T R_k u_k)
        self.Q_k_ = Q_k # Weight for state vector
        self.R_k_ = R_k # Weight for control vector
        self.Q_T_ = Q_T # Weight for terminal state
        self.parameters_ = parameters

        # max iterations to run
        self.n_iterations_ = n_iterations

        # costs
        self.expected_cost_reduction_ = 0
        self.expected_cost_reduction_grad_ = 0
        self.expected_cost_reduction_hess_ = 0

    def rollout(self):
        """rollout of the simulated system given an initial state
        """
        # we store states and inputs as:
        # state = [., x_1, x_2, ..., x_N]
        # input =    [u_0, u_1, ..., u_{N-1}]
        # the first value in state is understood as self.init_state_
        states = np.zeros((self.n_timesteps_+1, self.n_states_)) # including initial state, x_0 to x_N
        inputs = np.zeros((self.n_timesteps_, self.n_inputs_)) # u_0 to u_{N-1}
        current_state = self.init_state_

        for i in range(0, self.n_timesteps_): # 0 to N-1
            current_input = self.inputs_[i,:] # u_k
            next_state = self.f_(current_state, current_input, self.dt_, self.parameters_).flatten() # x_{k+1} = f(x_k, u_k) 
            # store both u_k and x_{k+1}
            states[i+1,:] = next_state
            inputs[i,:] = current_input
            # update current state
            current_state = next_state

        # store trajectories
        self.states_ = states
        self.inputs_ = inputs

        return states, inputs

    def compute_cost(self, states, inputs):
        """Computes the cost

        Args:
            states (ndarray): State trajectory
            inputs (ndarray): Input trajectory

        Returns:
            double: Total cost, i.e. terminal cost + running cost
        """
        # accumulate cost to go
        total_cost = 0
        for i in range(0, self.n_timesteps_):
            current_x = states[i,:]
            current_u = inputs[i,:].flatten()
            current_cost = current_u.T @ self.R_k_ @ current_u
            total_cost = total_cost + current_cost
        # add terminal cost
        terminal_diff = (states[-1,:] - self.target_state_).flatten()
        terminal_cost = terminal_diff.T @ self.Q_T_ @ terminal_diff
        total_cost = total_cost + terminal_cost

        return total_cost

    def backward_pass(self):
        """[summary]

        Returns:
            ndarray: feedforward gain, k
            ndarray: feedback gain, K
            double: expected cost reduction
        """
        # starting from the last state
        V_xx = self.Q_T_ # since V_N = x_T^T Q_T x_T, V_xx(N) = Q_T_
        terminal_diff = (self.states_[-1, :] - self.target_state_).flatten()
        V_x = self.Q_T_ @ terminal_diff # V_x(N)

        # initialize control modifications to be stored
        k_trj = np.zeros((self.n_timesteps_, self.n_inputs_)) # (8b)
        K_trj = np.zeros((self.n_timesteps_, self.n_inputs_, self.n_states_)) # (8b)

        # initialize cost reduction
        expected_cost_reduction = 0
        expected_cost_reduction_grad = 0
        expected_cost_reduction_hess = 0

        # looping backwards from N-1 using the value V_{N}
        for i in reversed(range(0, self.n_timesteps_)):
            # current variables
            current_x = self.states_[i,:]
            current_u = self.inputs_[i,:]

            # updates to partial derivatives of cost function
            l_xx = self.Q_k_
            l_uu = self.R_k_
            l_x = self.Q_k_ @ np.zeros(self.n_states_).flatten()
            l_u = self.R_k_ @ current_u.flatten()

            # get jacobian of discrete dynamics
            A_k = self.A_(current_x, current_u, self.dt_, self.parameters_) # V'_x
            B_k = self.B_(current_x, current_u, self.dt_, self.parameters_) # V'_u
    
            # all the Q vector/matrices
            Q_x = l_x + A_k.T @ V_x # (5a)
            Q_u = l_u + B_k.T @ V_x # (5b)
            Q_ux = B_k.T @ V_xx @ A_k # (5c)
            Q_uu = l_uu + B_k.T @ V_xx @ B_k # (5d)
            Q_xx = l_xx + A_k.T @ V_xx @ A_k # (5e)

            # compute and store gains
            Q_uu_inv = np.linalg.inv(Q_uu) # TODO: this can be singular, try using (9)
            k = -Q_uu_inv @ Q_u # (6)
            K = -Q_uu_inv @ Q_ux # (6)

            k_trj[i,:] = k 
            K_trj[i,:,:] = K 

            # update the expected reduction (11a)
            expected_cost_reduction_grad += (-Q_u.T @ k)
            expected_cost_reduction_hess += (0.5 * k.T @ (Q_uu) @ (k))
            expected_cost_reduction += (expected_cost_reduction_grad + expected_cost_reduction_hess)

            # update hessian and gradient of value function for the next iteration
            V_x = Q_x + K.T @ Q_uu @ k + K.T @ Q_u + Q_ux.T @ k # (11b)
            V_xx = Q_xx + K.T @ Q_uu @ K + K.T @ Q_ux + Q_ux.T @ K # (11c)

        # store values
        self.expected_cost_reduction_grad_ = expected_cost_reduction_grad
        self.expected_cost_reduction_hess_ = expected_cost_reduction_hess
        self.expected_cost_reduction_ = expected_cost_reduction

        # store gains
        self.k_feedforward_ = k_trj
        self.K_feedback_ = K_trj

        return (k_trj, K_trj, expected_cost_reduction)

    def forward_pass(self, learning_rate):
        """Forward pass of ilqr

        Args:
            learning_rate (double): learning rate (\alpha)
            
        Returns:
            ndarray: updated states, \hat x (8a,b,c)
            ndarray: updated inputs, \hat u (12)
        """
        # initialize before integration
        states = np.zeros((self.n_timesteps_ + 1, self.n_states_)) # to store updated trajectory
        inputs = np.zeros((self.n_timesteps_, self.n_inputs_)) # to store updated inputs
        current_state = self.init_state_

        # initialize and start integrating going forward for:
        # state = [., x_1, x_2, ..., x_N]
        # input =    [u_0, u_1, ..., u_{N-1}]
        states[1,:] = current_state # (8a), assume that index 0 contains x_0
        for i in range(0, self.n_timesteps_):
            # use current gains
            current_feedforward = learning_rate * self.k_feedforward_[i,:] # (12), \hat represents the updated variables
            current_feedback = self.K_feedback_[i,:,:] @ (current_state - self.states_[i,:]) # (12)
            current_input = self.inputs_[i,:] + current_feedforward + current_feedback
            # simulate
            next_state = self.f_(current_state, current_input, self.dt_, self.parameters_).flatten()
            # store states and inputs
            states[i+1,:] = next_state
            inputs[i,:] = current_input.flatten()
            
            # update states
            current_state = next_state

        return (states, inputs)

    def solve(self):
        # rollout with the initial guess
        [states, inputs] = self.rollout()

        # compute initial cost
        current_cost = self.compute_cost(states, inputs)

        # learning parameters
        learning_speed = 0.95 # this can be modified, 0.95 is very slow
        low_learning_rate = 0.05 # if learning rate drops to this value stop the optimization
        low_expected_reduction = 1e-3 # determines optimality
        armijo_threshold = 0.1 # determines if current line search solve is good (labelled as 'c' in (13))

        # start solving
        for i in range(0, self.n_iterations_):
            # backward pass
            (k_feedforward, K_feedback, expected_reduction) = self.backward_pass()
            # print statistics
            print("Iteration = {}, Cost = {}, Expected reduction = {}".format(i, current_cost, expected_reduction))
            if (abs(expected_reduction) < low_expected_reduction):
                # there is no further reduction, end the optimization
                print("Stopping optimization, accepting solution")
                break
            
            # start forward pass and line search with \alpha = 1
            learning_rate = 1
            armijo_flag = 0
            # execute linesearch until the armijo condition is met (for now just check if the cost decreased)
            # TODO: add real armijo condition
            while (learning_rate > 0.05 and armijo_flag == 0):
                # compute forward pass
                (new_states, new_inputs) = self.forward_pass(learning_rate)
                new_cost = self.compute_cost(new_states, new_inputs)

                # compute armijo condition
                cost


SyntaxError: invalid syntax (1222818014.py, line 177)

In [15]:
a = np.array([1,2,3,4])
print(a[0])

1
