In [1]:
import numpy as np
from numpy import float64 as realN

from DaTaReachControl import Interval

from controlN import *
from control import *

import matplotlib.pyplot as plt
%matplotlib widget
%config InlineBackend.figure_format = 'svg'

In [2]:
# Transformation from Interval type to Tuple(array type)
def n2i(x_lb, x_ub):
    if isinstance(x_lb, int) or isinstance(x_lb, float):
        return Interval(float(x_lb), float(x_ub))
    res = np.full(x_lb.shape, Interval(0),dtype=Interval)
    if len(x_lb.shape) == 1:
        for i in range(res.shape[0]):
            res[i] = Interval(x_lb[i], x_ub[i])
    elif len(x_lb.shape) == 2:
        for i in range(res.shape[0]):
            for j in range(res.shape[1]):
                res[i,j] = Interval(x_lb[i,j], x_ub[i,j])
    else:
        for i in range(res.shape[0]):
            for j in range(res.shape[1]):
                for k in range(res.shape[2]):
                    res[i,j,k] = Interval(x_lb[i,j,k], x_ub[i,j,k])
    return res

def i2n(intVal):
    if isinstance(intVal, Interval):
        return intVal.lb, intVal.ub
    res_lb = np.empty(intVal.shape, dtype=realN)
    res_ub = np.empty(intVal.shape, dtype=realN)
    if len(intVal.shape) == 1:
        for i in range(res_lb.shape[0]):
            res_lb[i], res_ub[i] = intVal[i].lb, intVal[i].ub
    elif len(intVal.shape) == 2:
        for i in range(res_lb.shape[0]):
            for j in range(res_lb.shape[1]):
                res_lb[i,j], res_ub[i,j] = intVal[i,j].lb, intVal[i,j].ub
    else:
        for i in range(res_lb.shape[0]):
            for j in range(res_lb.shape[1]):
                for k in range(res_lb.shape[2]):
                    res_lb[i,j,k], res_ub[i,j,k] = intVal[i,j,k].lb, intVal[i,j,k].ub
    return res_lb, res_ub

def gen_int(shape=None, minVal=-10, widthMax=10):
    if shape is None:
        lb = widthMax * np.random.random() + minVal
        ub = lb + widthMax * np.random.random()
        return Interval(float(lb), float(ub))
    else:
        lb = widthMax * np.random.random(shape) + minVal
        ub = lb + widthMax * np.random.random(shape)
        return n2i(lb,ub)

In [3]:
# Define the Unicycle system dynamics and the initial trajectory is generated
# randomnly from the data

# Define a seed for repeatability
np.random.seed(1605) # 801, 994, 3338
# val = int(np.random.uniform(0,4000))
# print (val)
# np.random.seed(val)

# Sampling time
sampling_time = 0.1

###### Define the one step dynamcis of the unicycle ############
def one_step_dyn(current_state, current_input):
    """
    Expects current_state and current_input as 2D matrices where each row is a
    unique time stamp

    Returns a 2D numpy vector of the same number of rows

    current state has 3 dimensions --- position (x, y) and heading (theta)
    current input has 2 dimensions --- velocity (v) and turning rate (w)
    sampling_time has been defined above
    """
    if current_state.ndim == 2 and current_input.ndim == 2:
        if current_state.shape[0] != current_input.shape[0]:
            raise ValueError('Expected current state and input to have the '
                'same number of rows.')
        x, y, theta = current_state.T
        v, w = current_input.T
        nearly_one = np.ones((current_state.shape[0],))
        nearly_zero = np.zeros((current_state.shape[0],))
    elif current_state.ndim == 1 and current_input.ndim == 1:
        x, y, theta = current_state[:]
        v, w = current_input[:]
        nearly_one = 1
        nearly_zero = 0
    else:
        print(current_state, current_input)
        raise ValueError('state and input must be numpy matrices 1D or 2D')

    delta_v = v * sampling_time
    delta_w = w * sampling_time

    if current_state.ndim == 2:
        # Vector delta_w
        nearly_one[abs(delta_w) > 1e-3] = np.sin(delta_w[abs(delta_w) > 1e-3])                                           / delta_w[abs(delta_w) > 1e-3]
        nearly_zero[abs(delta_w) > 1e-3] = (np.cos(delta_w[abs(delta_w) > 1e-3])
                                            - 1) / delta_w[abs(delta_w) > 1e-3]
    elif abs(delta_w) > 1e-3:
        # Scalar delta_w
        nearly_one = np.sin(delta_w) / delta_w
        nearly_zero = (np.cos(delta_w) - 1) / delta_w

    next_state_mat = np.vstack((x + delta_v * (np.cos(theta) * nearly_one
                                               + np.sin(theta) * nearly_zero),
                                y + delta_v * (np.sin(theta) * nearly_one
                                               - np.cos(theta) * nearly_zero),
                                theta + delta_w)).T
    current_state_der = np.vstack((v * np.cos(theta), v * np.sin(theta),
                                    w)).T
    return next_state_mat, current_state_der


# Define the initial satte and the axis limits
initial_state = np.array([-2, -2.5, np.pi/2])

# Planning goals
target_position = np.zeros(2)

# Terminate the sequential control if cost is below this threshold
cost_thresh = 0.5 * (0.5 ** 2)

# Number of data in initial trajectory
n_data_max = 20

# max number of iteration
max_iteration = 70 - n_data_max

################### Input bounds  #################################
v_max = 4
w_max = 0.5 * (2*np.pi)
v_min = -v_max
w_min = -w_max
input_lb = np.array([v_min, w_min])
input_ub = np.array([v_max, w_max])

# Generate input sequence
v_seq = -1 *(np.random.rand(n_data_max,1) - 0) * v_max       # Go only backwards
w_seq = 2 * (np.random.rand(n_data_max,1) - 0.5) * w_max
nSep = int(n_data_max /2)
sepIndexes = np.random.choice([ i for i in range(n_data_max)], nSep, replace=False)
# The trajectory should try system response in each control direction
w_seq[0,0] = 0.0 #
v_seq[0,0] = 0.0
for i in range(1,nSep):
  v_or_theta = np.random.randint(0,2)
  if v_or_theta == 0: # pick v
    w_seq[sepIndexes[i],0] = 0
  else: # pick theta
    v_seq[sepIndexes[i],0] = 0
rand_init_input_vec = np.hstack((v_seq,w_seq))
# print (rand_init_input_vec)
###################################################################

# Generate the random trajectory corresponding to random input sequence
rand_init_traj_vec = np.zeros((n_data_max + 1, initial_state.size))
rand_init_traj_der_vec = np.zeros((n_data_max, initial_state.size))
rand_init_traj_vec[0, :] = initial_state
for indx_data in range(n_data_max):
    # Get the next state based on the current state and current input
    rand_init_traj_vec[indx_data+1,:], rand_init_traj_der_vec[indx_data,:]=\
            one_step_dyn(rand_init_traj_vec[indx_data, :],
                                rand_init_input_vec[indx_data, :])
#######################################################################

# This might need to be adjust depending on the seed
xlim_tup = [-3,1.5]
ylim_tup = [-4,1.1]

def compute_cost(next_state_mat):
    """
    next_state_mat is a Nx4 dimensional vector

    Returns a numpy 1D matrix
    :param next_state_mat:
    :return:
    """
    if next_state_mat.ndim != 2:
        raise ValueError('next_state_mat must be a 2D numpy matrix')
    delta_x_y = next_state_mat[:, :2] - np.tile(target_position[:2],
                                                (next_state_mat.shape[0], 1))
    cost = 0.5 * (np.linalg.norm(delta_x_y, axis=1)) ** 2
    return cost

# Generate the cost of the initial trajectory
rand_init_cost_val_vec = compute_cost(rand_init_traj_vec[:-1,:])

########## Objective funcions ########################################
def true_objective(current_state, current_input):
    """
    Compute true objective function
    """
    next_state, _ = one_step_dyn(current_state, current_input)
    return compute_cost(next_state)

def DaTa(next_state, current_control):
    """
    Defining the cost function for the Taylor method approach
    """
    return 0.5 *((next_state[0]-target_position[0])*(next_state[0]-target_position[0])+\
            (next_state[1]-target_position[1])*(next_state[1]-target_position[1]))

########## Utilities functions #######################################
def exit_condition(current_state, current_input, next_state):
    """
    Exit condition
    """
    return compute_cost(next_state) <= cost_thresh

cost_markersize = 5
traj_markersize = 10
target_markersize=30
initstate_markersize =30

# File prefix name for logging
log_file="unicycle_dyn"
log_extension_file=".png"
# Save plot in a tex file
save_plot_tex = True

def draw_initial_plot(xlim_tup, ylim_tup, target_position, cost_thresh,
                      initial_state, rand_init_traj_vec):
    """
    Plot the environment, distance contours, initial starting point, initial
    data, and the target set
    """
    # Draw the plot
    fig = plt.figure()
    ax = fig.gca()
    ax.set_aspect('equal')                                 # Equal x and y axis
    plt.xlabel(r'$\mathrm{p_x}$')
    plt.ylabel(r'$\mathrm{p_y}$')
    ax.set_xlim(xlim_tup)
    ax.set_xticks(np.round(np.arange(xlim_tup[0], xlim_tup[1] + 1, 2)))
    ax.set_ylim(ylim_tup)
    plt.grid()

    draw_theta = np.linspace(0, 2 * np.pi, 100)         # For plotting circles
    zorder_init = 1e4                                   # Zorder for plotting
    skip_marker = 1

    # Draw contour plots
    for r_temp in np.arange(1, 20, 1):
        plt.plot(target_position[0] + r_temp * np.cos(draw_theta),
                 target_position[1] + r_temp * np.sin(draw_theta),
                 color='k', linewidth=1)

    # Draw the initial state
    plt.scatter(initial_state[0], initial_state[1], initstate_markersize,
            marker='d', color='y', label=r'$\mathrm{Initial\ state}$',
            zorder=zorder_init + 2)

    # Draw initial trajectory
    plt.scatter(rand_init_traj_vec[1::skip_marker, 0],
                rand_init_traj_vec[1::skip_marker, 1], traj_markersize,
                marker='s', color='b', zorder=zorder_init,
                label=r'$\mathrm{Initial\ data}$')

    plt.scatter(target_position[0], target_position[1], target_markersize,
                marker = '*', color='r', zorder=11,
                label=r'$\mathrm{Target\ state}$')

    # Draw the acceptable closeness to the target
    dist_thresh = np.sqrt(cost_thresh * 2)
    plt.plot(target_position[0] + dist_thresh * np.cos(draw_theta),
             target_position[1] + dist_thresh * np.sin(draw_theta),
             color='k', linestyle=':', linewidth=1, zorder=zorder_init,
             label=r'$\mathrm{Target\ set}$')

    # Interpolate the data points --- approximation
    plt.plot(rand_init_traj_vec[1:, 0], rand_init_traj_vec[1:, 1], color='b')
    ax.legend(loc='center left', ncol=1, labelspacing=0.25, framealpha=1,
              bbox_to_anchor=(1.,0.5))
    plt.tight_layout()
    return ax

def runtime_plot(p_state, p_input, n_state, mt, mc, ms, ml,first=False):
    ax = plt.gca()
    if first:
        ax.plot([p_state[0, 0], n_state[0, 0]],
            [p_state[0, 1], n_state[0, 1]], linestyle='-', marker=mt,
            markerSize=ms, color=mc, label=ml)
        ax.legend(loc='center left', ncol=1, labelspacing=0.25, framealpha=1,
              bbox_to_anchor=(1.,0.5))
    else:
       ax.plot([p_state[0, 0], n_state[0, 0]],
            [p_state[0, 1], n_state[0, 1]], linestyle='-', marker=mt,
            markerSize=ms, color=mc)
    plt.draw()

In [4]:
print(rand_init_input_vec)

[[ 0.          0.        ]
 [-0.19923851 -0.31024989]
 [ 0.          1.97601667]
 [-3.09921685  0.        ]
 [ 0.         -1.34734367]
 [-0.54810895  2.24120606]
 [-2.0463684   0.        ]
 [-1.45293717  1.51704646]
 [-3.47789329  0.        ]
 [-3.52722983 -3.09718958]
 [-3.54479197  2.01994236]
 [-2.80351684  2.31845043]
 [-3.0552482   1.40898402]
 [ 0.          0.5500807 ]
 [-1.79214526  2.73060321]
 [ 0.          0.87202811]
 [-0.2675446   0.        ]
 [-2.90371676  2.0299919 ]
 [-1.84936864  2.45134579]
 [-2.99617204  2.02864788]]


In [5]:
# Draw the initial environment and the initial random trajectory
xlim_tup = [-3,1.5]
ylim_tup = [-5.5,1.1]
ax = draw_initial_plot(xlim_tup, ylim_tup, target_position, cost_thresh,
                       initial_state, rand_init_traj_vec)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [6]:
from MyopicDataDrivenControlTrue import MyopicDataDrivenControlTrue
# Construct the initial data package for true one-step optimal control ########
training_data = {'trajectory': rand_init_traj_vec,
                 'input_seq': rand_init_input_vec,
                 'cost_val': rand_init_cost_val_vec}
true_ddc = MyopicDataDrivenControlTrue(training_data,
                                       input_lb,
                                       input_ub,
                                       true_objective,
                                       one_step_dyn=one_step_dyn,
                                       exit_condition=exit_condition)
res_true_ddc = true_ddc.solve(max_iteration,
                                runtime_info=runtime_plot, verbose=True)


0. Current state: [[-0.65 -4.99  3.11]] | Initial action guess: [-4.   -3.14]
Solver method: L-BFGS-B
Best action = [[-4.    3.14]] | Attained cost = 12.2599 | Time = 0.0053 s 

1. Current state: [[-0.25 -4.95  3.42]] | Initial action guess: [-4.   -3.14]
Solver method: L-BFGS-B
Best action = [[-4.    3.14]] | Attained cost = 11.4146 | Time = 0.0023 s 

2. Current state: [[ 0.11 -4.78  3.74]] | Initial action guess: [-4.   -3.14]
Solver method: L-BFGS-B
Best action = [[-4.    3.14]] | Attained cost = 10.2271 | Time = 0.0016 s 

3. Current state: [[ 0.4  -4.51  4.05]] | Initial action guess: [-4.   -3.14]
Solver method: L-BFGS-B
Best action = [[-4.    3.14]] | Attained cost = 8.8135 | Time = 0.0016 s 

4. Current state: [[ 0.59 -4.16  4.36]] | Initial action guess: [-4.   -3.14]
Solver method: L-BFGS-B
Best action = [[-4.    3.14]] | Attained cost = 7.3123 | Time = 0.0020 s 

5. Current state: [[ 0.67 -3.77  4.68]] | Initial action guess: [-4.   -3.14]
Solver method: L-BFGS-B
Best acti

In [8]:
from MyopicDataDrivenControlDaTaControl import MyopicDataDrivenControlDaTaControl
training_data_DaTa = {'trajectory': rand_init_traj_vec,
                 'trajectory_der': rand_init_traj_der_vec,
                 'input_seq': rand_init_input_vec,
                 'cost_grad': rand_init_cost_val_vec}
DaTa_ddc = MyopicDataDrivenControlDaTaControl(training_data_DaTa,
                            input_lb,
                            input_ub,
                            DaTa, delta_time=sampling_time,
                            one_step_dyn=one_step_dyn,
                            exit_condition=exit_condition,
                            state_to_context=None)
res_DaTa_ddc = DaTa_ddc.solve(max_iteration,
                        runtime_info=runtime_plot, verbose=True)

Using license file /home/franck/gurobi.lic
Academic license - for non-commercial use only

0. Current state: [[-0.35 -4.97  3.31]]
Best action = [[4.   3.14]] | Time = 0.0173 s 

1. Current state: [[-0.73 -5.1   3.62]]
Best action = [[4. 0.]] | Time = 0.0106 s 

2. Current state: [[-1.08 -5.29  3.62]]
Best action = [[-4.  0.]] | Time = 0.0072 s 

3. Current state: [[-0.73 -5.1   3.62]]
Best action = [[-4.    3.14]] | Time = 0.0077 s 

4. Current state: [[-0.41 -4.86  3.94]]
Best action = [[-4.    3.14]] | Time = 0.0070 s 

5. Current state: [[-0.18 -4.54  4.25]]
Best action = [[-4.    3.14]] | Time = 0.0075 s 

6. Current state: [[-0.06 -4.16  4.57]]
Best action = [[-4.  0.]] | Time = 0.0073 s 

7. Current state: [[-1.24e-03 -3.76e+00  4.57e+00]]
Best action = [[-4.  0.]] | Time = 0.0074 s 

8. Current state: [[ 0.06 -3.37  4.57]]
Best action = [[-4.    3.14]] | Time = 0.0080 s 

9. Current state: [[ 0.05 -2.97  4.88]]
Best action = [[-4.    3.14]] | Time = 0.0071 s 

10. Current state

In [9]:
from MyopicDataDrivenControlDaTaN import MyopicDataDrivenControlDaTaN
training_data_DaTa = {'trajectory': rand_init_traj_vec,
                 'trajectory_der': rand_init_traj_der_vec,
                 'input_seq': rand_init_input_vec,
                 'cost_grad': rand_init_cost_val_vec}
Q = np.array([[1.0, 0, 0], [0, 1, 0], [0, 0, 0]], dtype=np.float64)
DaTa_ddn = MyopicDataDrivenControlDaTaN(training_data_DaTa,
                            input_lb,
                            input_ub,
                            delta_time=sampling_time, Q = Q,
                            one_step_dyn=one_step_dyn,
                            exit_condition=exit_condition,
                            state_to_context=None)
# res_DaTa_ddn = DaTa_ddn.solve(max_iteration,
#                         runtime_info=runtime_plot, verbose=True)

In [10]:
res_DaTa_ddn = DaTa_ddn.solve(max_iteration,
                        runtime_info=runtime_plot, verbose=True)


0. Current state: [[-0.35 -4.97  3.31]]
2
Best action = [[4.   3.14]] | Time = 13.0586 s 

1. Current state: [[-0.73 -5.1   3.62]]
2
Best action = [[-4.    3.14]] | Time = 2.9359 s 

2. Current state: [[-0.41 -4.86  3.94]]
2
Best action = [[-4.    3.14]] | Time = 0.0003 s 

3. Current state: [[-0.18 -4.54  4.25]]
2
Best action = [[-4.    3.14]] | Time = 0.0004 s 

4. Current state: [[-0.06 -4.16  4.57]]
2
Best action = [[-4.    3.14]] | Time = 0.0003 s 

5. Current state: [[-0.06 -3.76  4.88]]
2
Best action = [[-4.    3.14]] | Time = 0.0003 s 

6. Current state: [[-0.19 -3.38  5.19]]
2
Best action = [[-4.    3.14]] | Time = 0.0003 s 

7. Current state: [[-0.43 -3.06  5.51]]
2
Best action = [[-4.    3.14]] | Time = 0.0003 s 

8. Current state: [[-0.75 -2.83  5.82]]
2
Best action = [[-4.    3.14]] | Time = 0.0003 s 

9. Current state: [[-1.13 -2.71  6.14]]
7
Best action = [[2.14 3.14]] | Time = 0.0003 s 

10. Current state: [[-0.92 -2.71  6.45]]
2
Best action = [[4.   3.14]] | Time = 0.