In [None]:
### --- Implementation of Penalty method with Projected Gradient Descent ---

# Applied to the model with domain, start/end point, velocity and acceleration constraint.
# Velocity and acceleration constraint are penalized.
# Domain constraint is tackled by Projected Gradient Descent.
# Start/end point is hard implemented.

In [None]:
import numpy as np
from numpy import linalg as LA
import plotly.graph_objects as go
import time

In [None]:
import PDEFEMCode.interface as pde_IO

# Elliptic PDE solution
#elliptic_pde_solution = pde_IO.pickle_load('Pickles/elliptic_multi_gaussian.pkl')

# Parabolic PDE solution
parabolic_pde_solution = pde_IO.pickle_load('../PDEFEMCode/Pickles/par_moving_bump_fast.pkl')

In [None]:
### Define model setting including constraints, emission field and initial trajectory
params = parabolic_pde_solution['params']
# Time
T = params.time_disc.T_fin

# Define number of time steps
N = params.time_disc.Nt + 1



# Define domain bounds
l = params.rect_mesh.P0
u = params.rect_mesh.P1

# Define start/end point of trajectory
y_s = [0.8, 0.1]
y_e = [0.8, 0.1]


dt = T/(N-1)
# Define initial trajectory
t = np.linspace(0,1,N)[np.newaxis,:]
opti_initial = (1-t)*np.asarray(y_s)[:,np.newaxis] + \
               t*np.asarray(y_e)[:,np.newaxis]
opti_initial[:,1] = y_s
opti_initial[:,-2] = y_e
y_init = opti_initial.T
print("Initial trajectory:\n")
print(y_init)
        
# Define emission field
#f = elliptic_pde_solution['f']
#df = elliptic_pde_solution['grad_f']
#f_pointwise = lambda coords: elliptic_pde_solution['f'](np.array([coords[1][0], coords[1][1]]))
f = parabolic_pde_solution['f']
df = parabolic_pde_solution['grad_f']
f_pointwise = lambda coords: parabolic_pde_solution['f'](np.array([coords[1][0], coords[1][1]]), coords[0])

# Define velocity constraint
C1 = 0.5

# Define acceleration constraint
C2 = 0.75

In [None]:
# Compute objective function value
def objective_fval(y):
    fval = 0
    fvals = f(y)
    for i in range(1, N):
        fval += fvals[i]
    fval = -dt*fval    
    return fval

# Compute objective gradient gradient
def objective_fgrad(y):
    fgrad = df(y)
    fgrad[0] = np.zeros(2)
    fgrad[1] = np.zeros(2)
    fgrad[-2] = np.zeros(2)
    fgrad[-1] = np.zeros(2)
    fgrad = -dt*fgrad
    return fgrad

In [None]:
# Compute velocity
def velocity(y, t):
    v = (1/dt)*np.sqrt((y[t+1][0]-y[t][0])**2+(y[t+1][1]-y[t][1])**2)
    return v

# Compute velocity constraint value
def velocity_cval(y, t):
    cval = velocity(y, t)**2-C1**2
    return cval

# Compute velocity constraint gradient
def velocity_cgrad(y, t):
    cgrad = np.zeros((N, 2))
    cgrad[t] = [-2*(y[t+1][0]-y[t][0]), -2*(y[t+1][1]-y[t][1])]
    cgrad[t+1] = [2*(y[t+1][0]-y[t][0]), 2*(y[t+1][1]-y[t][1])]
    cgrad = (1/dt)**2*cgrad
    return cgrad

In [None]:
# Compute acceleration
def acceleration(y, t):
    a = (1/dt)**2*np.sqrt((y[t+1][0]-2*y[t][0]+y[t-1][0])**2+(y[t+1][1]-2*y[t][1]+y[t-1][1])**2)
    return a

# Compute acceleration constraint value
def acceleration_cval(y, t):
    cval = acceleration(y, t)**2-C2**2
    return cval

# Compute acceleration constraint gradient
def acceleration_cgrad(y, t):
    cgrad = np.zeros((N, 2))
    cgrad[t-1] = [2*(y[t+1][0]-2*y[t][0]+y[t-1][0]), 2*(y[t+1][1]-2*y[t][1]+y[t-1][1])]
    cgrad[t] = [-4*(y[t+1][0]-2*y[t][0]+y[t-1][0]), -4*(y[t+1][1]-2*y[t][1]+y[t-1][1])]
    cgrad[t+1] = [2*(y[t+1][0]-2*y[t][0]+y[t-1][0]), 2*(y[t+1][1]-2*y[t][1]+y[t-1][1])]
    cgrad = (1/dt)**4*cgrad
    return cgrad

In [None]:
# Compute penalty term value
def penalty_tval_v(y):
    penalty_term = 0
    for t in range(0, N-1):
        penalty_term += (1/(N-1))*(max(0, velocity_cval(y, t)))**2
    return penalty_term

def penalty_tval_a(y):
    penalty_term = 0
    for t in range(1, N-1):
        penalty_term += (1/(N-2))*(max(0, acceleration_cval(y, t)))**2
    return penalty_term

# Compute penalty function value
def penalty_fval_nvec(y, alpha1, alpha2):
    fct = objective_fval(y)+(alpha1/2)*penalty_tval_v(y)+(alpha2/2)*penalty_tval_a(y)
    return fct
# ... but for vector penalties
def penalty_fval_vec(y, alpha1_vec, alpha2_vec):
    fct = objective_fval(y)
    penalty_v = 0
    penalty_a = 0
    for i in range(0, N-1):
        penalty_v += alpha1_vec[i]*(1/(N-1))*(max(0, velocity_cval(y, i)))**2
    for j in range(1, N-1):
        penalty_a += alpha2_vec[j-1]*(1/(N-2))*(max(0, acceleration_cval(y, j)))**2
    return fct + penalty_v + penalty_a

def penalty_fval(y, alpha1, alpha2, vec = False):
    if vec:
        return penalty_fval_vec(y, alpha1, alpha2)
    else:
        return penalty_fval_nvec(y, alpha1, alpha2)

# Compute penalty function gradient
def penalty_fgrad_nvec(y, alpha1, alpha2):
    penalty_term_grad = np.zeros((N, 2))
    for t in range(0, N-1):
        penalty_term_grad += alpha1*(1/(N-1))*(max(0, velocity_cval(y, t)))*velocity_cgrad(y, t)
    for t in range(1, N-1):
        penalty_term_grad += alpha2*(1/(N-2))*(max(0, acceleration_cval(y, t)))*acceleration_cgrad(y, t)
    penalty_term_grad[0] = np.zeros(2)
    penalty_term_grad[1] = np.zeros(2)
    penalty_term_grad[-2] = np.zeros(2)
    penalty_term_grad[-1] = np.zeros(2)
    fct_grad = objective_fgrad(y)+penalty_term_grad    
    return fct_grad
# ... but for vector penalties
def penalty_fgrad_vec(y, alpha1_vec, alpha2_vec):
    penalty_term_grad = np.zeros((N, 2))
    for i in range(0, N-1):
        penalty_term_grad += alpha1_vec[i]*(1/(N-1))*(max(0, velocity_cval(y, i)))*velocity_cgrad(y, i)
    for j in range(1, N-1):
        penalty_term_grad += alpha2_vec[j-1]*(1/(N-2))*(max(0, acceleration_cval(y, j)))*acceleration_cgrad(y, j)
    penalty_term_grad[0] = np.zeros(2)
    penalty_term_grad[1] = np.zeros(2)
    penalty_term_grad[-2] = np.zeros(2)
    penalty_term_grad[-1] = np.zeros(2)
    fct_grad = objective_fgrad(y)+penalty_term_grad
    return fct_grad

def penalty_fgrad(y, alpha1, alpha2, vec = False):
    if vec:
        return penalty_fgrad_vec(y, alpha1, alpha2)
    else:
        return penalty_fgrad_nvec(y, alpha1, alpha2)

In [None]:
# Proposal for adaptive penalty parameters

def p(a0 = None,  k = None):
    # Returns the non adapted penalty parameters associated to a, at step k
    return a0*((k+1)**5)

def adp(err, eps = 1e-3, a0 = None, k = None):
    # Returns adapted penalty parameter, depending on the error err in a certain constraint, and the tolerance eps
    # up to which we're happy when the constraint is satisfied
    p_nad = p(a0 = a0, k=k)
    num = np.clip(err, eps, np.inf)
    return p_nad * (1 + np.log(num/eps))   # further increase the penalty parameter if err is much bigger than eps

def update_penalties(err_v, err_a, eps_v = 1e-3, eps_a = 1e-3, a_v0 = None, a_a0 = None, k = None):
    new_av = adp(err_v, eps = eps_v, a0 = a_v0, k = k)
    new_aa = adp(err_a, eps = eps_a, a0 = a_a0, k = k)
    return new_av, new_aa

In [None]:
# Helper functions and variables for KKT check

# Vector implementation of inequality constraint functions
def gv_val(y):
    vc = np.zeros(N-1)
    for t in range(0, N-1):
        vc[t] = velocity_cval(y, t)
    return vc

def ga_val(y):
    ac  = np.zeros(N-2)
    for t in range(1, N-1):
        ac[t-1] = acceleration_cval(y, t)
    return ac

def inorm(q):
    return np.max(np.abs(q))

def KKT_check(av, aa, y, last_y=None, last_Lv=None, last_La=None, last_D_L=None, verbose=True, init=False, vec=False):

    gv = gv_val(y)
    ga = ga_val(y)

    # The supposed Lagrange multipliers
    Lv = av*np.maximum(0, gv)
    La = aa*np.maximum(0, ga)

    # Gradient of Lagrangian
    D_L = penalty_fgrad(y, av, aa, vec).flatten()

    if init:
        return Lv, La, y, D_L

    # Normalized complementarity conditions
    ccv = np.dot(gv, Lv)/(N-1)
    cca = np.dot(ga, La)/(N-2)

    # Are these quantities 0 ?
    grad_check = inorm(D_L)
    ccv_check = inorm(ccv)
    cca_check = inorm(cca)

    # Checking whether what should converge is converging (4 quantities)
    conv_D_P = inorm(D_L-last_D_L)
    conv_Lv = inorm(Lv-last_Lv)
    conv_La = inorm(La-last_La)
    conv_y = inorm(y-last_y)

    if verbose:
        print("KKT check:", "\n   [gradient of penalty objective, Lagrange multipliers (v, a), y] are converging if these vanish:", [conv_D_P, conv_Lv, conv_La, conv_y], "\n   [gradient of Lagrangian, complementarity conditions (v, a)] should then also vanish:", [grad_check, ccv_check, cca_check])

    return Lv, La, y, D_L

In [None]:
# Project trajectory to domain
def project_domain(y):
    for t in range(0, N-1):
        if y[t][0]<l[0]:
            y[t][0] = l[0]
        if y[t][1]<l[1]:
            y[t][1] = l[1]
        if y[t][0]>u[0]:
            y[t][0] = u[0]
        if y[t][1]>u[1]:
            y[t][1] = u[1]
    return y

In [None]:
# Backtracking
def backtracking(x, grad, fval, alpha1, alpha2, beta=0.5, gamma=1e-2, vec=True):
    count_f = 0
    sigma = 1.0
    y = x-grad
    y = project_domain(y)
    fval_ = penalty_fval(y, alpha1, alpha2, vec=vec)
    count_f += 1
    grad_m = x-y
    norm_grad_m = LA.norm(grad_m)
    rhs = gamma*(norm_grad_m**2)
    while fval-fval_<sigma*rhs:
        sigma *= beta
        y = x-sigma*grad
        y = project_domain(y)
        fval_ = penalty_fval(y, alpha1, alpha2, vec=vec)
        count_f += 1
        grad_m = (1/sigma)*(x-y)
        norm_grad_m = LA.norm(grad_m)
        rhs = gamma*(norm_grad_m**2)
    return y, fval_, count_f

In [None]:
# Projected Gradient Descent method
def projected_grad_descent(init, alpha1, alpha2, tol=1e-5, maxiter=25000, vec=True):
    iter = 0
    count_f = 0
    count_grad = 0
    y = init
    fval = penalty_fval(y, alpha1, alpha2, vec=vec)
    count_f += 1
    grad = penalty_fgrad(y, alpha1, alpha2, vec=vec)
    count_grad += 1
    while iter<maxiter:
        iter +=1
        y_prev = y
        y, fval, counted_f = backtracking(y_prev, grad, fval, alpha1, alpha2, vec=vec)
        count_f += counted_f
        grad = penalty_fgrad(y, alpha1, alpha2, vec=vec)
        count_grad += 1
        norm_yy =inorm(y_prev-y)
        if norm_yy<tol:
            print("Number of iterations:", iter)
            break
    if iter>=maxiter:
        print("Maximum number of iterations in Projected Gradient Descent step reached.")
    return y, count_f, count_grad

In [None]:
# Penalty method
def penalty_method(init, alpha1=1e-2, alpha2=1e-4, tol=1e-5, maxstep=200, adaptive=True, eps_v=1e-3, eps_a=1e-3, subproblem_tol=1e-7, verbose=True):
    start_time = time.time()
    step = 0
    count_f = 0
    count_grad = 0
    y = init

    if adaptive:
        a1 = alpha1*np.ones(N-1)
        a2 = alpha2*np.ones(N-2)
        a_v0 = a1
        a_a0 = a2
    else:
        a1 = alpha1
        a2 = alpha2

    # Initial KKT computations
    last_Lv, last_La, last_y, last_D_L = KKT_check(a1, a2, y, init=True, vec=adaptive)
    count_grad += 1

    while step<maxstep:
        step += 1
        print("\n--- Penalty method step", step, "---")
        y, counted_f, counted_grad = projected_grad_descent(y, a1, a2, tol=subproblem_tol, vec=adaptive)
        count_f += counted_f
        count_grad += counted_grad
        fval = objective_fval(y)
        count_f += 1
        print("Objective function:", fval)
        penalty_term_v = penalty_tval_v(y)
        print("Penalty term (velocity):", penalty_term_v)
        penalty_term_a = penalty_tval_a(y)
        print("Penalty term (acceleration):", penalty_term_a)

        if verbose:
            # KKT check
            last_Lv, last_La, last_y, last_D_L =  KKT_check(a1, a2, y, last_y, last_Lv, last_La, last_D_L, verbose=True, init=False, vec=adaptive)
            count_grad += 1

        a1_old = a1
        a2_old = a2

        if penalty_term_v+penalty_term_a<tol:
            break

        # Update penalties
        if adaptive:
            a1, a2 = update_penalties(gv_val(y), ga_val(y), eps_v=eps_v, eps_a=eps_a, a_v0=a_v0, a_a0=a_a0, k=step)
        else:
            a1 = p(a1, step)
            a2 = p(a2, step)

    if step>=maxstep:
        print("\nMaximum number of steps in Penalty method reached.")

    print("\n***************************************************")
    print("\n--- Solution found: ---\n")
    print(y)
    print("\nObjective function value:", fval)

    if verbose:
        print("\n***************************************************")
        print("\nDone after {:.2f} sec.".format(time.time() - start_time))
        print("\nObjective function evaluations:", count_f)
        print("Objective gradient evaluations:", count_grad)
        D_L = penalty_fgrad(y, a1_old, a2_old, vec=adaptive).flatten()
        dual_infeasibility = inorm(D_L)
        print("\nDual infeasibility (i.e. infinity norm of the gradient of the Lagrangian):", dual_infeasibility)
        gv_violation = inorm(np.maximum(0, gv_val(y)))
        ga_violation = inorm(np.maximum(0, ga_val(y)))
        print("\nConstraint violation (velocity):", gv_violation)
        print("Constraint violation (acceleration):", ga_violation)
        # Check velocity
        print("\nCheck velocity for every timestamp (bound was {:}):".format(C1))
        for t in range(0, N-1):
            print(velocity(y, t))
        # Check acceleration
        print("\nCheck acceleration for every timestamp (bound was {:}):".format(C2))
        for t in range(1, N-1):
            print(acceleration(y, t))

    return y

In [None]:
# Optimize
y = penalty_method(y_init)

In [None]:
# Create data for visualization/animation
x1_scale = np.linspace(l[0], u[0], 100)
x2_scale = np.linspace(l[1], u[1], 100)

z = np.zeros((N, 100, 100))
for i in range(0, 100):
    for j in range(0, 100):
        for t in range(0, N):
            z[t][j][i] = f_pointwise([t, [x1_scale[i], x2_scale[j]]])

z_max = np.max(z)
z_min = np.min(z)

x1_coordinate = []
x2_coordinate = []
for t in range(0, N):
    x1_coordinate.append(y[t][0])
    x2_coordinate.append(y[t][1])

In [None]:
### Visualization of optimal trajectory

fig = go.Figure(data=go.Contour(x=x1_scale, y=x2_scale, z=z[-1], zmin=z_min, zmax=z_max, colorscale="haline",
                                colorbar=dict(title='emission field', titleside='right'), name="emission field"))
fig.add_trace(go.Scatter(x=np.array(x1_coordinate).flatten(), y=np.array(x2_coordinate).flatten(),
                         mode="lines+markers+text", line=go.scatter.Line(color="red"), name="opt trajectory"))
fig.add_annotation(x=y[0][0], y=y[0][1], text="optimal trajectory", yshift=-10, font=dict(color="red"),
                   showarrow=False)
fig.update_layout(width = 600, height = 570)
fig.show()

In [None]:
### Animation of the optimal path and the emission field

animation_dict = {"data": [], "layout": {}, "frames": []}   

updatemenus_dict = {"buttons": [{"args": [None, {"frame": {"duration": 700, "redraw": True}, "mode": "immediate",
                                          "fromcurrent": True, "transition": {"duration": 0}}],
                                 "label": "Play",
                                 "method": "animate"},
                                {"args": [[None], {"frame": {"duration": 0, "redraw": True}, "mode": "immediate",
                                          "transition": {"duration": 0}}],
                                 "label": "Pause",
                                 "method": "animate"}],
                    "direction": "right",
                    "pad": {"r": 10, "t": 47},
                    "showactive": False,
                    "type": "buttons",
                    "x": 0.2,
                    "xanchor": "right",
                    "y": 0,
                    "yanchor": "top"}

sliders_dict = {"active": 0,
                "yanchor": "top",
                "xanchor": "left",
                "currentvalue": {"prefix": "time step: ", "visible": True, "xanchor": "right"},
                "transition": {"duration": 0},
                "pad": {"b": 10, "t": 30},
                "len": 1,
                "x": 0.2,
                "y": 0,
                "steps": []}

init_emission_field = {"type": "heatmap",
                       "x": x1_scale,
                       "y": x2_scale, 
                       "z": z[0],
                       "zmin": z_min,
                       "zmax": z_max,
                       "colorscale": "haline",
                       "colorbar": {"title": "emission field", "titleside": "right"},
                       "name": "emission field"}
animation_dict["data"].append(init_emission_field)
start_point = {"type": "scatter",
               "x": [x1_coordinate[0]],
               "y": [x2_coordinate[0]],
               "mode": "lines+markers",
               "line": {"color": "red", "width": 2},
               "name": "opt trajectory"}
animation_dict["data"].append(start_point)

for t in range(0, N):
    frame = {"data": []}
    emission_field = {"type": "heatmap",
                      "x": x1_scale,
                      "y": x2_scale, 
                      "z": z[t],
                      "zmin": z_min,
                      "zmax": z_max,
                      "colorscale": "haline",
                      "colorbar": {"title": "emission field", "titleside": "right"},
                      "name": "emission field"}
    frame["data"].append(emission_field)
    trajectory = {"type": "scatter",
                  "x": x1_coordinate[0:t+1],
                  "y": x2_coordinate[0:t+1],
                  "mode": "lines+markers",
                  "line": {"color": "red", "width": 2},
                  "name": "opt trajectory"}
    frame["data"].append(trajectory)
    frame["name"] = f'time step {t}'
    animation_dict["frames"].append(frame)
    slider_step = {"args": [[f'time step {t}'], {"frame": {"duration": 700, "redraw": True}, "mode": "immediate",
                            "transition": {"duration": 0}}],
                   "label": t,
                   "method": "animate"}
    sliders_dict["steps"].append(slider_step)
    
animation_dict["layout"] = {"width": 520,
                            "height": 555,
                            "xaxis": {"range": [l[0], u[0]]}, 
                            "yaxis": {"range": [l[1], u[1]]}, 
                            "title": "Animation of the optimal path",
                            "hovermode": "closest", 
                            "updatemenus": [updatemenus_dict], 
                            "sliders": [sliders_dict]}

animation = go.Figure(animation_dict)
animation.show()

In [None]:
animation.write_html("moving_bump_fast_animation.html")