In [None]:
from math import pi

import numpy as np
import numpy.linalg as npl

import scipy as sp
import scipy.linalg as spl
import scipy.sparse as sps
import scipy.sparse.linalg as sspl

import matplotlib.pyplot as plt

import holoviews as hv
import param
import panel as pn

hv.extension('bokeh')
import bokeh.plotting as bkplt

from tqdm import tqdm

In [None]:
kappa = 1
s = 1
r_max = 1
sigma = 0.5
theta = 4.5
N = 100
a = 0
b = 10

In [None]:
def get_A(dz, N):
    N = N - 2
    tmp = np.ones((N - 1))
    diag = -2 * np.ones(N)
    diag[0] /= 2
    diag[-1] /= 2
    return sps.diags((tmp, diag, tmp),(-1, 0, 1)) * sigma**2 / (dz**2)

def source_normal(Z, mu=5, sigma=sigma):    
    return (.5 / (2 * pi * sigma**2)**.5) * np.exp(-0.5 * ((Z - mu)**2) / sigma**2)

def source_uniform(Z, mid=5, half_dist=sigma):
    height = 1 / (2 * half_dist)
    U = np.where(np.abs(Z - mid) < half_dist, 0, height / 2)
    
    return U

source_types = {'normal': source_normal, 'uniform': source_uniform}

def get_r(Z, rmax, s, theta):
    return rmax - s * (Z - theta)**2

def get_rho(U, dz=None):
    if dz is None:
        dz = (b - a) / N
    
    return dz * ((U[..., 0] + U[..., -1]) / 2 + np.sum(U[..., 1:-1], axis=-1))
#     return dz*(U[0]/2 + U[-1]/2 + np.sum(U[1:-1]))

def solve_explicit(A, U, Z, rho, dz, dt, theta, s):
    r = get_r(Z, r_max, s, theta)
    tmp1 = sps.eye(A.shape[0])
    tmp2 = dt * A
    tmp3 = (dt * (r - kappa * rho))
    U_next = (tmp1 + tmp2).dot(U) + (U * tmp3)
    return U_next

def solve_implicit(A, U, Z, rho, dz, dt, theta, s):
    r = get_r(Z, r_max, s, theta)
    FU = F(U, r, rho)
    
    lhs = (np.eye(A.shape[0]) - dt * A)
    rhs = U + dt * FU
    return sspl.cg(lhs, rhs)[0]

def F(U, r, rho):
    return U * (r - kappa * rho)

def solve_splitting(A, U, Z, rho, dz, dt, theta, s):
    tempU1 = sspl.cg(np.eye(A.shape[0]) - dt / 2 * A, U)[0]
    
    r = get_r(Z, r_max, s, theta)
    
    tempU2 = tempU1 + dt * F(tempU1, r, rho)
    return sspl.cg(np.eye(A.shape[0]) - dt / 2 * A, tempU2)[0]

In [None]:
def mean(Z, U):
    return np.sum(U * Z, axis=-1) / np.sum(U, axis=-1)

def var(Z, U):
    mu = mean(Z, U)
    return np.sum(U * Z**2, axis=-1) / np.sum(U, axis=-1) - mu**2 

In [None]:
def get_noise(nbT, noise_type='normal', noise_std=.5):
    if  noise_type is 'normal':
        NOISE = np.random.normal(loc=0., scale=noise_std, size=nbT + 1)
    elif noise_type is 'uniform':
        NOISE = np.random.uniform(low=-noise_std, high=noise_std, size=nbT + 1)
    elif noise_type is 'discrete':
        noise_values = np.linspace(start=-noise_std, stop=noise_std, num=5)
        NOISE = np.random.choice(noise_values, size=nbT + 1)
    
    accumulate = True
    if accumulate:
        NOISE = np.cumsum(NOISE)
    
    return NOISE

def get_theta(nbT, **theta_args):
    THETA = np.ones(nbT + 1) * theta_args['theta_value']
    
    if theta_args['theta_moving']:
        MOVING = np.arange(nbT + 1) * theta_args['theta_speed']
        THETA += MOVING
    
    if theta_args['theta_noise']:
        noise_type = theta_args.get('theta_noise_type', 'normal')
        noise_std = theta_args.get('theta_noise_std', 0.5)
        
        NOISE = get_noise(nbT, noise_type, noise_std)
        THETA += NOISE
        
    return THETA

def get_s(nbT, **s_args):
    S = np.ones(nbT + 1) * s_args['s_value']
    
    if s_args['s_noise']:
        noise_type = s_args.get('s_noise_type', 'normal')
        noise_std = s_args.get('s_noise_std', 0.5)
        
        NOISE = get_noise(nbT, noise_type, noise_std)
        S += NOISE
        
    return np.clip(a=S, a_min=1e-3, a_max=None)

In [None]:
def solve(solver, dt, T, theta_args, s_args, source_args):
    dz = (b - a) / N
    nbT = int(T / dt)
    
    A = get_A(dz, N)
    
    Z = np.linspace(a, b, N)
    U = np.empty(N)
    
    # Initialisation of U_0
    source_type = source_args.get('source_type', 'normal')
    source = source_types[source_type]
        
    U[1:-1] = np.copy(source(Z[1:-1], source_args['source_mu'], source_args['source_std']))
    U[0] = U[1]
    U[-1] = U[-2]
    
    # Arrays that saves the state of the system through time
    Us = np.empty((nbT + 1, N))
    RHO = np.empty(nbT + 1)
    
    THETA = get_theta(nbT, **theta_args)
    S = get_s(nbT, **s_args)
    
    for i in tqdm(range(0, nbT + 1)):
        rho = get_rho(U, dz)
        
        Us[i] = np.copy(U)
        
        U[1:-1] = solver(A, U[1:-1],  Z[1:-1], rho, dz, dt, THETA[i], S[i])
        U[0] = U[1]
        U[-1] = U[-2]
        
    return Z, np.array(Us), THETA

In [None]:
def plot_2D(Us, dt, T, dynamic=True, log_plot=False, nb_images=50, THETA=None):
    Z = np.linspace(a, b, N)
    time_step = Us.shape[0] // nb_images
    
    if log_plot:
        range_plot = np.unique(np.logspace(start=0, stop=np.log10(Us.shape[0] - 1), num=nb_images, endpoint=True, dtype=int))
        range_plot = np.concatenate(([0], range_plot))
    else: 
        range_plot = np.linspace(start=0, stop=Us.shape[0] - 1, num=nb_images, endpoint=True, dtype=int)
    
    if THETA is not None and dynamic:
        curve_dict = {round(dt * i, ndigits=2): hv.Curve((Z, Us[i])) * hv.VLine(THETA[i]).opts(color='orange') for i in range_plot}
    else:
        curve_dict = {round(dt * i, ndigits=2): hv.Curve((Z, Us[i])) for i in range_plot}
    
    if dynamic:
#         curve_dict = {i: hv.Curve((Z, Us[i])).opts(title=f'Approximated solution at t = {dt * i:.1f}') for i in range(0, Us.shape[0], time_step)}
        plot = hv.HoloMap(curve_dict, kdims='Time')
    
    else:
        plot = hv.NdOverlay(curve_dict, kdims='Time').opts(legend_position='right')
        
    return hv.output(plot.opts(width=600, toolbar=None))
    
def plot_params(Z, Us, THETA):
    RHO = get_rho(Us)
    MEAN = mean(Z, Us)
    VAR = var(Z, Us)
    
    options = {'shared_axes': False, 'toolbar': None}
        
    curves = []
    
    curves += [hv.NdOverlay({theta_name: hv.Curve(VALUE, 'Iteration', 'Theta').opts(alpha=alpha) for VALUE, theta_name, alpha in zip(
        [MEAN, THETA], ['Current', 'Optimal'], [1., .5]
    )}, kdims='Thetas').opts(title='Evolution of theta (mean)')]
    
    curves += [hv.Curve(VALUE, 'Iteration', val_name.capitalize()).opts(title=f'Evolution of {val_name}') for VALUE, val_name in zip(
        [VAR, RHO], ['variance', 'population']
    )]
    
    
    return hv.output(hv.Layout(curves).opts(width=900, **options))

## Explicit

In [None]:
N = 100
a, b = [0, 10]
dt, T = 0.02, 20

source_args = {'source_type': 'normal', 'source_mu': 5, 'source_std': sigma}
theta_args = {'theta_value': 4.5, 'theta_moving': False, 'theta_speed': 0.001, 
              'theta_noise': True, 'theta_noise_type': 'normal', 'theta_noise_std': 0.05}
s_args = {'s_value': s, 's_noise': False, 's_noise_type': 'normal', 's_noise_std': 1}

Z, Us, THETA = solve(solve_implicit, dt, T, theta_args, s_args, source_args)

plot_2D(Us, dt, T, dynamic=True, log_plot=False, nb_images=50, THETA=THETA)
plot_params(Z, Us, THETA)

## Implicit

In [None]:
N = 100
a, b = [0, 10]
dt, T = 0.02, 30

source_args = {'source_type': 'normal', 'source_mu': 5, 'source_std': sigma}
theta_args = {'theta_value': 3, 'theta_moving': False, 'theta_speed': 0.001, 
              'theta_noise': True, 'theta_noise_type': 'uniform', 'theta_noise_std': 0.7}
s_args = {'s_value': s, 's_noise': False, 's_noise_type': 'normal', 's_noise_std': 0.2}

Z, Us, THETA = solve(solve_implicit, dt, T, theta_args, s_args, source_args)

plot_2D(Us, dt, T, dynamic=False, log_plot=True, nb_images=50, THETA=THETA)
plot_params(Z, Us, THETA)

## Splitting

In [None]:
N = 150
a, b = [-5, 10]
dt, T = 0.01, 20

source_args = {'source_type': 'normal', 'source_mu': 5, 'source_std': sigma}
theta_args = {'theta_value': 5, 'theta_moving': True, 'theta_speed': -0.003, 
              'theta_noise': True, 'theta_noise_type': 'normal', 'theta_noise_std': 0.5}
s_args = {'s_value': s, 's_noise': False, 's_noise_type': 'normal', 's_noise_std': 0.2}

Z, Us, THETA = solve(solve_implicit, dt, T, theta_args, s_args, source_args)

plot_2D(Us, dt, T, dynamic=True, log_plot=False, nb_images=50, THETA=THETA)
plot_params(Z, Us, THETA)