In [None]:
%matplotlib inline

from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np

from numpy import exp, sqrt, log
from scipy.linalg import expm

In [None]:
class SmallNoise():
    '''
        Small noise toy example.

        One-dimensional, driftless diffusion with constant diffusion
        coefficient B = sqrt(epsilon) * I, initial value X_0 = 0.1 and
        terminal cost g(x) = alpha / 2 * (1 - |x| / sqrt(alpha))^2.

        compute_reference_solution() has to be called before v_true() or
        u_true(), since those look up the tables it stores on the instance.
    '''
    def __init__(self, name='Double well', d=1, T=1, epsilon=1.0, alpha=1.0):
        '''
            name:    label of the problem
            d:       state dimension (only d = 1 is supported by the reference solver)
            T:       time horizon
            epsilon: noise level; the diffusion coefficient is sqrt(epsilon) * I
            alpha:   scaling parameter of the terminal cost g
        '''
        self.name = name
        self.d = d
        self.T = T
        self.epsilon = epsilon
        self.alpha = alpha
        self.B = np.sqrt(epsilon) * np.eye(self.d)
        self.X_0 = 0.1 * np.ones(self.d)

        if self.d != 1:
            print('Only implemented for d = 1.')

    def b(self, x):
        '''Drift: identically zero, returned with the shape of x.'''
        return np.zeros(x.shape)

    def sigma(self, x):
        '''Diffusion coefficient: the constant matrix B, independent of x.'''
        return self.B

    def f(self, x, t):
        '''Running cost: zero for each of the x.shape[0] entries along the first axis.'''
        # plain numpy array; the former `.to(device)` was a torch leftover that cannot work here
        return np.zeros(x.shape[0])

    def h(self, t, x, y, z):
        '''Return 0.5 * |z|^2, summed over axis 1 of z; t, x and y are unused.'''
        # numpy spells the reduction axis `axis`, not `dim`
        return 0.5 * np.sum(z**2, axis=1)

    def g(self, x):
        '''Terminal cost alpha / 2 * (1 - |x| / sqrt(alpha))^2, with size-1 axes squeezed out.'''
        return 0.5 * self.alpha * ((1 - np.abs(x) / sqrt(self.alpha))**2).squeeze()


    def compute_reference_solution(self, delta_t=0.01, xb=5, nx=1000):
        '''
            Tabulate a finite-difference reference solution on [-xb, xb].

            psi is stepped backwards from psi[N] = exp(-g / epsilon) with the
            implicit Euler scheme (I - delta_t * L) psi[n] = psi[n + 1], where
            L is a tridiagonal discretisation of the generator.

            delta_t: time step
            xb:      half-width of the spatial grid
            nx:      number of spatial grid points

            Stores on the instance: xb, nx, dx, delta_t, N (number of time
            steps), psi of shape (N + 1, nx) and u of shape (N + 1, nx - 1),
            where u = B^2 * (forward difference of log(psi)) / dx.

            Raises NotImplementedError if d != 1.
        '''
        # function-scope import keeps the class self-contained; the notebook's
        # import cell only pulls `expm` from scipy.linalg
        from scipy.linalg import lu_factor, lu_solve

        if self.d != 1:
            raise NotImplementedError('Only implemented for d = 1.')

        self.xb = xb # range of x, [-xb, xb]
        self.nx = nx # number of grid points
        # NOTE(review): np.linspace below has spacing 2 * xb / (nx - 1), whereas dx
        # divides by nx; kept as is because v_true / u_true index with this dx - confirm.
        self.dx = 2.0 * self.xb / self.nx
        self.delta_t = delta_t
        self.N = int(np.ceil(self.T / self.delta_t))

        f = 0.0
        # scalar diffusion coefficient, so matrix entries are assigned plain
        # floats rather than 1x1 arrays
        sigma = float(self.B[0, 0])

        x_val = np.linspace(-self.xb, self.xb, self.nx)
        drift = self.b(x_val)

        # discretization of the generator
        off_diag = sigma**2 / 2 / self.dx**2
        L = np.zeros([nx, nx])
        for i in range(self.nx):
            # Every row, including both boundary rows, uses the same stencil, so
            # a symmetric problem gives a symmetric solution.
            if i > 0:
                L[i, i - 1] = off_diag - drift[i] / self.dx
            L[i, i] = -2 * off_diag + drift[i] / self.dx - f
            if i < self.nx - 1:
                L[i, i + 1] = off_diag

        self.psi = np.zeros([self.N + 1, self.nx])
        self.psi[self.N, :] = exp(-self.g(x_val) / self.epsilon)

        # The system matrix does not change between time steps: factorise it
        # once and reuse the factors for every backward step.
        factors = lu_factor(np.eye(self.nx) - delta_t * L)
        for n in range(self.N - 1, -1, -1):
            self.psi[n, :] = lu_solve(factors, self.psi[n + 1, :])

        # forward difference of log(psi) along the spatial axis, for all times at once
        self.u = sigma**2 * np.diff(log(self.psi), axis=1) / self.dx

    def _time_index(self, t):
        '''Row of the psi / u tables for time t: ceil(t / delta_t), clamped to [0, N].'''
        # The small offset absorbs round-off in t / delta_t (e.g. n * delta_t / delta_t
        # landing just above n), which would otherwise select the next row.
        n = int(np.ceil(t / self.delta_t - 1e-9))
        return min(max(n, 0), self.N)

    def v_true(self, x, t):
        '''Return -log(psi) at the grid cell containing x and the table row for time t.'''
        i = np.floor((x + self.xb) / self.dx).astype(int)
        # clamp with the instance's own grid size, not a notebook global
        i = np.maximum(0, np.minimum(i, self.nx - 1))
        n = self._time_index(t)
        return np.array(- log(self.psi[n, i]))

    def v_0_true(self, x, t):
        '''Closed form alpha * (1 - |x| / sqrt(alpha))^2 / (2 * (T + 1 - t)); presumably the small-noise limit of v_true - confirm.'''
        return self.alpha * (1 - np.abs(x) / sqrt(self.alpha))**2 / (2 * (self.T + 1 - t))

    def u_true(self, x, t):
        '''Return the tabulated control u at the grid cell containing x and the table row for time t.'''
        i = np.floor((np.clip(x, -self.xb, self.xb - 2 * self.dx) + self.xb) / self.dx).astype(int)
        # u has nx - 1 columns, so the last valid column is nx - 2
        i = np.maximum(0, np.minimum(i, self.nx - 2))
        n = self._time_index(t)
        return np.array(self.u[n, i])

    def u_0_true(self, x, t):
        '''Closed form sqrt(alpha) * (sign(x) - x / sqrt(alpha)) / (T + 1 - t); presumably the small-noise limit of u_true - confirm.'''
        return sqrt(self.alpha) * (np.sign(x) - x / sqrt(self.alpha)) / (self.T + 1 - t)

In [None]:
# Two reference problems that differ only in the noise level; both are solved
# on the same space-time grid.
problem_1, problem_2 = (SmallNoise(epsilon=noise_level) for noise_level in (0.02, 0.2))

for reference_problem in (problem_1, problem_2):
    reference_problem.compute_reference_solution(delta_t=0.001, nx=3000)

In [None]:
# Compare the tabulated control u_true with the closed-form u_0_true at t = 0
# for both noise levels, one panel per problem.
x_val = np.linspace(-3, 3, 1000);

fig, ax = plt.subplots(1, 2, figsize=(10, 3))
panels = zip(ax, (problem_1, problem_2), (r'$\eta = 0.02$', r'$\eta = 0.2$'))
for panel, reference_problem, panel_title in panels:
    panel.set_title(panel_title)
    # identical curve labels and a legend in both panels
    panel.plot(x_val, reference_problem.u_true(x_val, 0.0), label=r'$u^*$')
    panel.plot(x_val, reference_problem.u_0_true(x_val, 0.0), label=r'$u^0$')
    panel.set_xlabel(r'$x$')
    panel.legend()

#fig.savefig('img/small_noise_diffusion_u_0_vs_optimal.pdf')

### Choose the sweep variable below (epsilon or T) by toggling the commented loop header

In [None]:
np.random.seed(42)

epsilons = np.linspace(0.01, 0.2, 20)
Ts = np.linspace(0.5, 20, 10)

# Simulation settings shared by every sweep iteration.
delta_t = 0.001              # Euler-Maruyama step, also used for the reference solution
d = 1                        # state dimension
K = 5000000                  # number of Monte Carlo trajectories
verbose = True
sq_delta_t = np.sqrt(delta_t)

#delta_t = 0.001
#d = 1
#K = 100000


def summarize_estimator(samples):
    '''Mean, variance and relative error (std / mean) of a sample of estimator values.'''
    mean = np.mean(samples)
    variance = np.var(samples)
    return {'mean': mean, 'variance': variance, 'relative_error': np.sqrt(variance) / mean}


RE_u = []
RE_u_0 = []
L2 = []
#
for epsilon in epsilons:
#for T in Ts:

    #problem = SmallNoise(epsilon=0.005, T=T)
    #problem.compute_reference_solution(delta_t=0.01, nx=1000)

    problem = SmallNoise(epsilon=epsilon, T=1)
    # same step size as the simulation below, so the time indices line up
    problem.compute_reference_solution(delta_t=delta_t, nx=5000)

    # number of Euler steps = number of time steps of the reference solution
    N = problem.N

    #X_trail_u = np.zeros([K, N + 1, d])
    #X_trail_u_0 = np.zeros([K, N + 1, d])

    # uncontrolled process, process driven by u_true, process driven by u_0_true
    X = 0.1 * np.ones([d, K])
    X_u = 0.1 * np.ones([d, K])
    X_u_0 = 0.1 * np.ones([d, K])
    ito_int_u = np.zeros(K)
    riemann_int_u = np.zeros(K)
    ito_int_u_0 = np.zeros(K)
    riemann_int_u_0 = np.zeros(K)

    L2_error = np.zeros(K)

    # Exactly N steps of size delta_t reach the horizon T = N * delta_t;
    # one more step would evaluate g at T + delta_t.
    for n in range(N):
        t_n = n * delta_t
        xi = np.random.randn(d, K)
        # all three processes share the same noise increment
        dW = problem.B.dot(xi) * sq_delta_t

        X = X + dW

        ut = problem.u_true(X_u, t_n)
        X_u = X_u + ut * delta_t + dW
        ito_int_u += np.sum(ut / problem.B * xi, 0) * sq_delta_t
        riemann_int_u += np.sum((ut / problem.B)**2, 0) * delta_t

        u_0t = problem.u_0_true(X_u_0, t_n)
        X_u_0 = X_u_0 + u_0t * delta_t + dW
        ito_int_u_0 += np.sum(u_0t / problem.B * xi, 0) * sq_delta_t
        riemann_int_u_0 += np.sum((u_0t / problem.B)**2, 0) * delta_t

        # NOTE(review): ut and u_0t are evaluated along different trajectories
        # (X_u and X_u_0) - confirm that this is the intended L2 distance.
        L2_error += (ut[0, :] - u_0t[0, :])**2 * delta_t

        #X_trail_u[:, n, :] = X_u.T
        #X_trail_u_0[:, n, :] = X_u_0.T

    # weights that correct for simulating under the controlled dynamics
    girsanov_u = np.exp(- ito_int_u - 0.5 * riemann_int_u)
    girsanov_u_0 = np.exp(- ito_int_u_0 - 0.5 * riemann_int_u_0)

    stats = {}
    stats['naive'] = summarize_estimator(np.exp(- problem.g(X) / problem.epsilon))
    stats['IS'] = summarize_estimator(np.exp(- problem.g(X_u) / problem.epsilon) * girsanov_u)
    stats['IS u_0'] = summarize_estimator(np.exp(- problem.g(X_u_0) / problem.epsilon) * girsanov_u_0)

    RE_u.append(stats['IS']['relative_error'])
    RE_u_0.append(stats['IS u_0']['relative_error'])
    L2.append(np.mean(L2_error))

    # diagnostic plot: tabulated vs. closed-form value function and control at time t
    t = 0.1
    fig, ax = plt.subplots(1, 2, figsize=(10, 3.5))

    x_val = np.linspace(-3, 3, 100);

    ax[0].plot(x_val, problem.v_true(x_val, t), label=r'$v^*$')
    ax[0].plot(x_val, problem.v_0_true(x_val, t), label=r'$v^0$');
    ax[0].set_title(r'value function, $\eta = %.2e$, $t = %.1f$' % (problem.epsilon, t))
    ax[0].set_xlabel(r'$x$')
    ax[0].legend()
    ax[1].plot(x_val, problem.u_true(x_val, t), label=r'$u^*$');
    ax[1].plot(x_val, problem.u_0_true(x_val, t), label=r'$u^0$');
    ax[1].set_title(r'control, $\eta = %.2e$, $t = %.1f$' % (problem.epsilon, t))
    ax[1].set_xlabel(r'$x$')
    ax[1].legend();

    if verbose:
        print('epsilon: %.2e' % problem.epsilon)
        print('naive mean:         %.4e, naive variance: %.4e, naive RE: %.4e' % (stats['naive']['mean'], stats['naive']['variance'], stats['naive']['relative_error']))
        print('IS optimal mean:    %.4e, IS variance:    %.4e, IS RE:    %.4e' % (stats['IS']['mean'], stats['IS']['variance'], stats['IS']['relative_error']))
        print('IS suboptimal mean: %.4e, IS variance:    %.4e, IS RE:    %.4e' % (stats['IS u_0']['mean'], stats['IS u_0']['variance'], stats['IS u_0']['relative_error']))
        # .item() turns the size-1 array into a Python float before formatting
        print('true mean:  %.4e' % np.exp(-problem.v_true(0.1 * np.ones(1), 0)).item())

In [None]:
# Keep the results of the most recent sweep under a sweep-specific name
# (presumably so that re-running the sweep cell with the other loop variable
# does not lose them). RE_u_0_T is the name the plotting cell below reads.
# NOTE(review): the sweep cell as written iterates over `epsilons`, yet the
# result is stored under the T-sweep name - confirm that the active line
# matches the sweep that was actually run.
#L2_eps = L2
RE_u_0_T = RE_u_0
#RE_u_0_eps = RE_u_0

In [None]:
# Summary figure with three panels; only the right-hand panel (relative error
# of the u_0 control against T) is drawn, the two epsilon panels are disabled.
end_eps = 15

fig, ax = plt.subplots(1, 3, figsize=(15, 3.5))

# epsilon-sweep panels (need L2_eps and RE_u_0_eps)
#ax[0].plot(epsilons[:end_eps], exp(np.array(L2_eps[:end_eps])));
#ax[0].set_title(r'Exponential of $L^2$ error');
#ax[0].set_xlabel(r'$\eta$');
#ax[1].plot(epsilons[:end_eps], RE_u_0_eps[:end_eps]);
#ax[1].set_title(r'Relative error depending on $\eta$')
#ax[1].set_xlabel(r'$\eta$');

# T-sweep panel: first eight horizons only
T_panel = ax[2]
T_panel.plot(Ts[:8], RE_u_0_T[:8]);
T_panel.set(title=r'Relative error depending on time $T$', xlabel=r'$T$');

#fig.savefig('img/small_noise_RE_eta_T_3.pdf')