# Control strategies for Fokker-Planck equation

The Fokker-Planck equation studied is 
$$
\rho_t = \nabla \cdot (\nu \nabla \rho + \rho \nabla V), x \in \Omega, t \ge 0,
$$
where $J(x,t) = \nu \nabla \rho(x,t) + \rho(x,t) \nabla V(x,t)$ is the probability flux and 
$$
V(x,t) = G(x) + \alpha(x) u(t),
$$
where $G$ is the potential function, $\alpha$ is the control shape function and $u$ is the time-dependent control function.
Moreover, the probability flux is zero in the boundary, that is
$$
(\nu \nabla \rho(x,t) + \rho(x,t) \nabla G(x)) \cdot \vec{n} = 0, \nabla \alpha \cdot \vec{n} = 0, x \in \partial \Omega, t \ge 0.
$$

In [1]:
import numpy as np

from scipy.integrate import quad, solve_ivp, ode, solve_bvp
from scipy.stats import norm, truncnorm, beta
from scipy.linalg import solve_banded, solve_continuous_are
from scipy.special import legendre
from scipy.interpolate import lagrange, interp1d
from scipy.optimize import newton

import matplotlib.pyplot as plt
from tqdm import tqdm

In [2]:
class ControlFPequation:
    
    def __init__(self, b_func, c_func, parameters) -> None:
        
        self.b = b_func
        self.c = c_func
        
        self.v = parameters['v']
        self.alpha_Q = parameters['alpha_Q']
        self.alpha_Omega = parameters['alpha_Omega']
        self.beta = parameters['beta']
        self.gamma = parameters['gamma']
        self.T = parameters['T']
        self.p0 = parameters['p_0']
        self.pQ = parameters['p_Q']
        self.pOmega = parameters['p_Omega']
        
        self.Lambda = None
        self.B = None
        self.C = None
        self.D = None
        self.E = None
        
    def _calculate_Lambda(self, n_f):
        Lambda = self.v*np.array([[np.sqrt((i+0.5)*(j+0.5))*min(i,j)*(min(i,j)+1)*((i+j)%2==0) for j in range(n_f+1)] 
                                   for i in range(n_f+1)])
        return Lambda
    
    def _calculate_C(self, n_f):
        C = np.zeros((n_f+1, n_f+1))
        for i in range(n_f+1):
            for j in range(n_f+1):
                C[i,j] = quad(func=lambda x: self.c(x)*self.legendre[j](x)*self.legendre_diff[i](x), a=-1, b=1)[0]
        return C
    
    def _calculate_B(self, n_f, lim_inf, lim_sup):
        B = np.zeros((n_f+1, n_f+1))
        for i in range(n_f+1):
            for j in range(n_f+1):
                integrate = np.polyint(self.legendre[j]*self.legendre_diff[i])
                B[i,j] = integrate(lim_sup) - integrate(lim_inf)
        return B
    
    def _calculate_D(self, n_f):
        cl = self.c(-1)
        cu = self.c(1) 
        D = np.zeros((n_f-1, n_f+1))
        for k in range(n_f-1):
            M = np.array([
                [np.sqrt(k+1.5)*(self.v*(k+1)*(k+2) - 2*cl), np.sqrt(k+2.5)*(-self.v*(k+2)*(k+3) + 2*cl)],
                [np.sqrt(k+1.5)*(self.v*(k+1)*(k+2) + 2*cu), np.sqrt(k+2.5)*(self.v*(k+2)*(k+3) + 2*cu)],
            ])
            m = np.array([np.sqrt(k+0.5)*(self.v*k*(k+1) - 2*cl), np.sqrt(k+0.5)*(-self.v*k*(k+1) - 2*cu)])
            D[k,k] = 1
            D[k,[k+1,k+2]] = np.linalg.solve(M, m)
        return D
        
    def _calculate_E(self, n_f):
        E = np.zeros((n_f-1, n_f+1))
        for k in range(n_f-1):
            M = np.array([
                [-np.sqrt(k+1.5)*(k+1)*(k+2), np.sqrt(k+2.5)*(k+2)*(k+3)],
                [np.sqrt(k+1.5)*(k+1)*(k+2), np.sqrt(k+2.5)*(k+2)*(k+3)],
            ])
            m = np.array([-np.sqrt(k+0.5)*k*(k+1), -np.sqrt(k+0.5)*k*(k+1)])
            E[k,k] = 1
            E[k,[k+1,k+2]] = np.linalg.solve(M, m)
        return E
    
    def _find_function_coefficients(self, n_f, func):
        coef = np.zeros(n_f+1)
        for i in range(n_f+1):
            coef[i] = quad(func=lambda x: func(x)*self.legendre[i](x), a=-1, b=1)[0]
        return coef

After the pre-calculation step, we can define the solving function.

In [85]:
def _solve_ode_equations(self, u_k, n_f, N_t, lim1=None, lim2=None):

    h_t = self.T/N_t
    self.t = np.linspace(0,self.T,N_t+1)

    if self.Lambda is None:
        self.legendre = [(k+0.5)**(0.5)*legendre(k) for k in range(n_f+1)]
        self.legendre_diff = [np.polyder(poly, 1) for poly in self.legendre]
        self.Lambda = self._calculate_Lambda(n_f)
        self.C = self._calculate_C(n_f)
        self.y0 = self._find_function_coefficients(n_f, self.p0)
        self.yQ = self._find_function_coefficients(n_f, self.pQ)
        self.y_omega = self._find_function_coefficients(n_f, self.pOmega)
        
    if self.B is None:
        self.B = self._calculate_B(n_f, lim1, lim2)
        
    u = interp1d(self.t, u_k)
                
    fun = lambda t, x: np.vstack([-(self.Lambda + self.C)@x[:n_f+1, :] - self.B@x[:n_f+1, :]@np.diag(u(t)),
                                  (self.Lambda + self.C.T)@x[n_f+1:, :] + self.B.T@x[n_f+1:, :]@np.diag(u(t)) -\
                                   self.alpha_Q*(x[:n_f+1,:].T - self.yQ).T])

    bc = lambda ya, yb: np.hstack([ya[:n_f+1] - self.y0, yb[n_f+1:] - self.alpha_Omega*(yb[:n_f+1] - self.y_omega)])
                
    sol = solve_bvp(fun=fun, bc=bc, x=self.t, y=np.zeros((2*(n_f+1), N_t+1)))
    rho_vec, p_vec = sol.y[:n_f+1, :], sol.y[n_f+1:, :]
    
    return rho_vec, p_vec

def _solve_ode_equations_v2(self, u_k, n_f, N_t, lim1=None, lim2=None):

    h_t = self.T/N_t

    if self.Lambda is None:
        self.legendre = [(k+0.5)**(0.5)*legendre(k) for k in range(n_f+1)]
        self.legendre_diff = [np.polyder(poly, 1) for poly in self.legendre]
        self.Lambda = self._calculate_Lambda(n_f)
        self.C = self._calculate_C(n_f)
        self.y0 = self._find_function_coefficients(n_f, self.p0)
        self.yQ = self._find_function_coefficients(n_f, self.pQ)
        self.y_omega = self._find_function_coefficients(n_f, self.pOmega)
        
    if self.B is None:
        self.B = self._calculate_B(n_f, lim1, lim2)
        
    u = interp1d(self.t, u_k, fill_value='extrapolate')
        
    # Solve for rho
    rho_vec = np.zeros((n_f+1, N_t+1))
    rho_vec[:,0] = self.y0
    sol = ode(lambda t,y: -(self.Lambda+self.C+u(t)*self.B)@y)
    sol.set_integrator('dopri5')
    sol.set_initial_value(self.y0, 0.0)
    for i in range(1, N_t+1):
        rho_vec[:,i] = sol.integrate(i*h_t)
        
    rho = interp1d(self.t, rho_vec, fill_value='extrapolate')
    
    # Solve for p
    p_vec = np.zeros((n_f+1, N_t+1))
    p_vec[:,0] = self.alpha_Omega*(rho_vec[:,-1] - self.y_omega)
    sol = ode(lambda t,y: -(self.Lambda+self.C.T+u(self.T-t)*self.B.T)@y + self.alpha_Q*(rho(self.T-t) - self.yQ))
    sol.set_integrator('dopri5')
    sol.set_initial_value(p_vec[:,0], 0.0)
    for i in range(1, N_t+1):
        p_vec[:,i] = sol.integrate(i*h_t)
    p_vec = np.flip(p_vec, axis=1) 
    
    return rho_vec, p_vec

def _calculate_gradient(self, N_t, u_k, rho_k, p_k):
    grad = self.T*(self.gamma * u_k + self.beta - np.array([p_k[:,i]@self.B@rho_k[:,i] for i in range(N_t+1)]))/N_t
    grad[[1,-1]] /= 2 
    return grad

ControlFPequation._solve_ode_equations = _solve_ode_equations
ControlFPequation._solve_ode_equations_v2 = _solve_ode_equations_v2
ControlFPequation._calculate_gradient = _calculate_gradient

Finally we perform the optimal control finding.

In [115]:
def gradient_descent(self, tol, k_max, u_old, u_new, n_f, N_t, lim1, lim2):
    
    k = 0
    rho_vec, p_vec = self._solve_ode_equations(u_old, n_f, N_t, lim1, lim2)
    gradient_old = self._calculate_gradient(N_t, u_old, rho_vec, p_vec)
    gradient_new = np.ones_like(gradient_old)
    while (max(abs(gradient_new)) > tol) and (k < k_max):
        rho_vec, p_vec = self._solve_ode_equations_v2(u_new, n_f, N_t, lim1, lim2)
        gradient_new = self._calculate_gradient(N_t, u_new, rho_vec, p_vec)
        alpha_k = np.dot(u_new - u_old, 
                         gradient_new - gradient_old)/np.dot(gradient_new - gradient_old, gradient_new - gradient_old)
        u_old, u_new = u_new, u_new - alpha_k * gradient_new
        gradient_old = gradient_new
        k += 1
        print("Done - Iteration {}, ||Grad F(u)||_max = {}".format(k, max(abs(gradient_new))))
        
    rho_vec, p_vec = self._solve_ode_equations_v2(u_new, n_f, N_t, lim1, lim2)
    return rho_vec, p_vec, u_new

def solve_problem(self, tol, k_max, u0, u1, n_f, N_t, lim1=-0.99, lim2=0.99):
    
    rho_vec, p_vec, control = self.gradient_descent(tol, k_max, u0, u1, n_f, N_t, lim1, lim2)
    
    cost = self.compute_cost(N_t, control, rho_vec)
    
    if self.D is None:
        self.D = self._calculate_D(n_f)
        self.E = self._calculate_E(n_f)
    
    rho_vec = np.linalg.solve(self.D.T[:-2,:], rho_vec[:-2,:])
    p_vec = np.linalg.solve(self.E.T[:-2,:], p_vec[:-2,:])
    
    return cost, rho_vec, p_vec, control
    
def discretise_space_time(self, rho_vec, p_vec, n_f, N_x):
    
    phi_matrix = np.zeros((n_f-1, N_x+1))
    p_matrix = np.zeros((n_f-1, N_x+1))
    x = np.arange(-1.0, 1.0+1.8/N_x, 2/N_x)
    for k in range(n_f-1):
        phi_matrix[k,:] = self.legendre[k](x) + self.D[k,k+1]*self.legendre[k+1](x) + self.D[k,k+2]*self.legendre[k+2](x)
        p_matrix[k,:] = self.legendre[k](x) + self.E[k,k+1]*self.legendre[k+1](x) + self.E[k,k+2]*self.legendre[k+2](x)
    rho = rho_vec.T @ phi_matrix
    p = p_vec.T @ p_matrix
    
    return rho, p

def compute_cost(self, N_t, control, rho_vec):
    cost = 0.5*self.gamma*control[0]**2 + self.beta*control[0] + 0.5*self.alpha_Q*np.sum((rho_vec[:,0] - self.yQ)**2)
    cost += 0.5*self.gamma*control[-1]**2 + self.beta*control[-1] + 0.5*self.alpha_Q*np.sum((rho_vec[:,-1] - self.yQ)**2)
    for i in range(1,N_t):
        cost += self.gamma*control[i]**2 + 2*self.beta*control[i] + self.alpha_Q*np.sum((rho_vec[:,i] - self.yQ)**2)
    cost *= self.T/N_t
    cost += self.alpha_Omega * np.sum((rho_vec[:,-1] - self.y_omega)**2)
    cost *= 0.5
    return cost
        
ControlFPequation.gradient_descent = gradient_descent
ControlFPequation.solve_problem = solve_problem
ControlFPequation.compute_cost = compute_cost
ControlFPequation.discretise_space_time = discretise_space_time

Let's try out.

In [124]:
N_t = 1000
N_x = 500

c_func = lambda x: 2*x
b_func = lambda x: 1*(x > -0.9)*(x < 0.9)
p_0 = lambda x: 0.5*beta(5,5).pdf(x/2+1/2)
p_Q = lambda x: 0.5*beta(7,7).pdf(x/2+1/2)
c = 1/quad(lambda x: np.exp(-x**2/parameters['v']), a=-1, b=1)[0]
p_infty = lambda x: c*np.exp(-x**2/parameters['v'])
interval = [-1.0, 1.0]
parameters = {'v': 0.1, 'T': 10.0, 'p_0': p_0, 'p_Q': p_Q, 'p_Omega': p_Q, 'interval': interval, 
              'gamma': 0.001, 'beta': 0.0, 'alpha_Q': 1.0, 'alpha_Omega': 1.0}
u0 = np.random.random(N_t+1)
u1 = np.random.random(N_t+1)
                     
FP_equation = ControlFPequation(b_func, c_func, parameters)
cost, rho_vec, p_vec, control = FP_equation.solve_problem(tol=1e-8, k_max=50, u0=u0, u1=u1, 
                                                          n_f=15, N_t=N_t, lim1=-0.9, lim2=0.9)
rho, p = FP_equation.discretise_space_time(rho_vec, p_vec, n_f=15, N_x=N_x)

Done - Iteration 1, ||Grad F(u)||_max = 0.016993444500582575
Done - Iteration 2, ||Grad F(u)||_max = 0.12384807831271523
Done - Iteration 3, ||Grad F(u)||_max = 0.009905587759630546
Done - Iteration 4, ||Grad F(u)||_max = 0.019103319015383816
Done - Iteration 5, ||Grad F(u)||_max = 0.005438272692091445
Done - Iteration 6, ||Grad F(u)||_max = 0.006185127382276683
Done - Iteration 7, ||Grad F(u)||_max = 0.003380373101870817
Done - Iteration 8, ||Grad F(u)||_max = 0.003125851601987393
Done - Iteration 9, ||Grad F(u)||_max = 0.0018422520460208347
Done - Iteration 10, ||Grad F(u)||_max = 0.0017623075478903677
Done - Iteration 11, ||Grad F(u)||_max = 0.0012269358454934612
Done - Iteration 12, ||Grad F(u)||_max = 0.0017380492151882563
Done - Iteration 13, ||Grad F(u)||_max = 0.001060391190452036
Done - Iteration 14, ||Grad F(u)||_max = 0.0010832113682601213
Done - Iteration 15, ||Grad F(u)||_max = 0.0010184777097385716
Done - Iteration 16, ||Grad F(u)||_max = 0.0007002914657266729
Done - Iter

KeyboardInterrupt: 

In [None]:
x = np.linspace(-1,1,501)
plt.plot(x, rho[0,:], label='Initial')
plt.plot(x, rho[-1,:], label='Final')
plt.plot(x, FP_equation.pOmega(x), label='Desired')
plt.plot(x, p_infty(x), label='Steady')
plt.legend()
plt.show()

In [None]:
plt.plot(FP_equation.t, control)