In [22]:
from scipy.optimize import minimize
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as anim
import random
from IPython.display import HTML

## Reference Generator

In [65]:
def generate_ref(x_c, x_g, h, dt = 0.05, nominal_speed = 15, bound = (0, 20)):
    """ generates reference speed and predicted positions
    x_c   - current position
    x_g   - goal position
    h     - future horizon
    dt    - time step
    kp    - propotional gain
    bound - the maximum absolute speed 
    """
    us = []
    xs = []
    
    for i in range(h):
        diff = x_g - x_c
        v = nominal_speed * diff/(np.linalg.norm(diff))
        
        for i in range(2):
            if abs(v[i]) < bound[0]:
                v[i] = (v[i]/abs(v[i])) * bound[0]
            if abs(v[i]) > bound[1]:
                v[i] = (v[i]/abs(v[i])) * bound[1]
                                
        x_c = x_c + dt * v
        us.extend(v)
        xs.extend(x_c)
        
    return us + xs

## Environment Setup

In [143]:
# future horizon
hor = 7

# initial position of robot
x_i = np.array([45, 10])

# goal position of robot
x_goal = np.array([45, 75])

# obstacle data
x_o = np.array([[55, 50], [35, 50]])
u_o = np.array([[0, 0], [0, 0]])
num_obs = x_o.shape[0]

# trust values
trust = np.array([0.2, 0.8])

# parameters for the trust part
gamma = 0.03     # lower limit of the cbf weight parameter
delta = 0.08    # trust sensitivity - higher value -> higher effect of trust
sensitivity = 2

# assertion to keep the cbf weight parameter positive for all values of trust
# assert(gamma * delta <= 1.53)

# convert trust to weight
# w = ((1/gamma) + delta * (np.exp((-1 * trust)) - 1)) ** (-1)
w = gamma + delta * trust**sensitivity
print(w)


# parameters
R = 4           # safety margin
dt = 0.05       # time step
bound = 20      # speed bound

# random color generator for plots
r = lambda: random.randint(0, 255)

[0.0332 0.0812]


## Setting up the control problem

In [144]:
def objective(u, u_ref):
    '''Objective is just a reference input tracker
    '''
    return np.sum((u[: 2 * hor] - u_ref[: 2 * hor]) ** 2)

def cbf(u, i, p, x):
    """the CBF constraint (inequality)
    """
    if p != 0:
        x = u[2 * (hor + p - 1): 2 * (hor + p)]
    
    x_1 = u[2 * (hor + p): 2 * (hor + p + 1)]
    
    x_obs = x_o[i, :]
    u_obs = u_o[i, :]
    x_obs = x_obs + u_obs * p * dt

    return np.sum((x_1 - x_obs) ** 2) - R*R + (w[i] - 1) * (np.sum((x - x_obs) ** 2) - R*R)

def model_constr(u, p, x, dt):
    """the model constraint (equality)
    """
    # x_k
    if p != 0:
        x = u[2 * (hor + p - 1): 2 * (hor + p)]
    
    # x_k+1
    x_1 = u[2 * (hor + p): 2 * (hor + p + 1)]
    
    # u_k
    u_c = u[2 * p: 2 * (p + 1)]
    
    # x_k+1 = x_k + u_k * dt 
    return x_1 - x - np.multiply(dt, u_c)

In [145]:
# setting up initial solutions and logging
x = x_i
X = [x]
V = []
X_o = [x_o]

# bounds for the decision variables
bounds = 2 * hor * [
    (-bound, bound),
] + 2 * hor * [(None, None)]


iters = 500
for k in range(iters):
    cstr = []
    
    # generate reference for the next optimization step
    u_ref = generate_ref(x, x_goal, hor, dt = dt, bound = (0, bound))
    
    for i in range(num_obs):
        for p in range(hor):
            cstr.append({
                "type": "ineq",
                "fun": cbf,
                "args": (i, p, x, )
            }) 

    for i in range(hor):
        cstr.append({
            "type": "eq",
            "fun": model_constr,
            "args": (i, x, dt, )
        })
        
    u = minimize(objective, u_ref, method = "SLSQP", args = (u_ref, ), bounds = bounds, constraints = cstr)
    u = np.array(u.x)
    x = u[2 * hor: 2 * (hor + 1)]
    X.append(x)
    V.append(u[: 2])
    
    x_o = x_o + u_o * dt
    X_o.append(x_o)
    
    if (k%20 == 19): print("Completed {} iters".format(k + 1))
    
    # check termination condition - less than a specific distance
    if np.linalg.norm(x - x_goal) < 0.5:
        print("Goal reached with acceptable tolerance")
        break
    
X = np.array(X)   
V = np.array(V)

Completed 20 iters
Completed 40 iters
Completed 60 iters
Completed 80 iters
Completed 100 iters
Goal reached with acceptable tolerance


## Plotting Path

In [146]:
# plotting path
plt.figure(figsize = (6, 8))

# plot start and goal
plt.plot(x_i[0], x_i[1], "gx")
plt.plot(x_goal[0], x_goal[1], "gd")

# plot obstacles and safety margins
th = np.linspace(0, 2 * np.pi, 100)
for i in range(num_obs):
    plt.plot(x_o[i, 0], x_o[i, 1], "x", color = '#%02X%02X%02X' % (r(),r(),r()))

for i in range(num_obs):
    xunit = R * np.cos(np.array(th)) + x_o[i, 0]
    yunit = R * np.sin(np.array(th)) + x_o[i, 1]
    if i == 0: plt.plot(xunit, yunit, "r-")
    else: plt.plot(xunit, yunit, "r-", label='_nolegend_')

# plot path
plt.plot(X[:, 0], X[:, 1], "b-")

legends = ["Start", "Goal"]
legends += ["Obstacle with trust: {}".format(t) for t in trust]
legends += ["Safety margin", "Path"]
plt.legend(legends)

plt.xlim((10, 70))
plt.ylim((0, 80))
plt.xlabel("Global x")
plt.ylabel("Global y")

plt.show()

## Plotting Speed

In [100]:
# plotting speeds
fig, ax = plt.subplots(3, figsize = (8, 6))
fig.tight_layout(pad = 2)

# speed in X, Y, and magnitude
t = np.linspace(0, dt * (k + 1), (k + 1))
ax[0].plot(t, V[:, 0], "b-")
ax[1].plot(t, V[:, 1], "b-")
ax[2].plot(t, np.sqrt(V[:, 0] ** 2 + V[:, 1] ** 2), "b-")

ax[0].title.set_text('Velocity-X')
ax[1].title.set_text('Velocity-Y')
ax[2].title.set_text('Velocity Magnitude')

plt.xlabel("Time (s)")
fig.text(-0.02, 0.5, 'Speed (units/s)', va='center', rotation='vertical')

plt.show()

## Display animation to understand trajectory

In [20]:
# plot animation
%matplotlib tk
fig = plt.figure(figsize = (6, 8))
ax = fig.add_subplot(1, 1, 1)

colors = ['#%02X%02X%02X' % (r(),r(),r()) for a in range(num_obs)]

def update(i):
    ax.clear()
    
    # plot start and goal
    ax.plot(x_i[0], x_i[1], "gx")
    ax.plot(x_goal[0], x_goal[1], "gd")

    # plot obstacles and safety margins
    th = np.linspace(0, 2 * np.pi, 100)
    obs = X_o[i]
    for j in range(num_obs):
        ax.plot(obs[j, 0], obs[j, 1], "x", color = colors[j])

    for j in range(num_obs):
        xunit = R * np.cos(np.array(th)) + obs[j, 0]
        yunit = R * np.sin(np.array(th)) + obs[j, 1]
        if j == 0: ax.plot(xunit, yunit, "r-")
        else: ax.plot(xunit, yunit, "r-", label='_nolegend_')

    # plot path
    ax.plot(X[: i + 1, 0], X[: i + 1, 1], "b-")

    legends = ["Start", "Goal"]
    legends += ["Obstacle with trust: {}".format(t) for t in trust]
    legends += ["Safety margin", "Path"]
    plt.legend(legends)

    plt.xlim((10, 70))
    plt.ylim((0, 80))
    plt.xlabel("Global x")
    plt.ylabel("Global y")
    
a = anim.FuncAnimation(fig, update, frames = k + 2, repeat = False, interval = 2)

In [17]:
HTML(a.to_jshtml())

## Save animation if needed

In [18]:
Writer = anim.writers['ffmpeg']
writer = Writer(fps = 30, bitrate = 5000)
a.save('anims/12.mp4', writer = writer)