#### Consider the linear SDE:

$$ \mathrm{d}X(t) = \mu X(t) \mathrm{d}t + \sigma X(t) \mathrm{d}W(t), X(0) = X_0$$ 

#### where $\mu, \sigma$ are real constants.
#### The exact solution to this SDE is

$$ X(t) = X(0) \exp \left ( (\mu - \frac{1}{2} \sigma^2) t + \sigma W(t) \right )$$

In [None]:
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(100)

In [None]:
mu = 1
sigma = 0.1
X_0 = 1
T = 0.1
N = 2**10

dt = float(T) / N
t = np.linspace(0, T, N+1)


In [None]:
def EM_solver(R, dt, dW, N, X_ref):
    Dt = R * dt
    L = N // R
    X_approx = np.zeros(L + 1)
    X_approx[0] = X_ref[0]

    for j in range(1, L+1):
        W_acc = np.sum(dW[0][range(R*(j-1), R*j)])
        X_approx[j] = X_approx[j-1] + mu * X_approx[j-1] * Dt + sigma * X_approx[j-1] * W_acc

    X_L = X_approx[-1]
    X_T = X_ref[-1]
    
    err = np.abs(X_L - X_T)   
    # err = np.abs(X_approx - X_ref[::R])   
    # print("Error: {:.4e}".format(err))
    
    return err, X_approx, X_L, X_T
    

In [None]:
def Milstein_solver(R, dt, dW, N, X_ref):
    Dt = R * dt
    L = N // R
    X_approx = np.zeros(L + 1)
    X_approx[0] = X_ref[0]

    for j in range(1, L+1):
        W_acc = np.sum(dW[0][range(R*(j-1), R*j)])
        X_approx[j] = X_approx[j-1] + mu * X_approx[j-1] * Dt + sigma * X_approx[j-1] * W_acc + 0.5 * sigma**2 * X_approx[j-1] * (W_acc**2 - Dt)

    X_L = X_approx[-1]
    X_T = X_ref[-1]
    
    err = np.abs(X_L - X_T)   
    # err = np.abs(X_approx - X_ref[::R])   
    # print("Error: {:.4e}".format(err))
    
    return err, X_approx, X_L, X_T
    

In [None]:
def RK_solver(R, dt, dW, N, X_ref):
    Dt = R * dt
    L = N // R
    X_approx = np.zeros(L + 1)
    X_approx[0] = X_ref[0]

    for j in range(1, L+1):
        W_acc = np.sum(dW[0][range(R*(j-1), R*j)])
        X_hat = X_approx[j-1] + sigma * X_approx[j-1] * Dt ** 0.5
        X_approx[j] = X_approx[j-1] + mu * X_approx[j-1] * Dt + sigma * X_approx[j-1] * W_acc + 0.5 / Dt ** 0.5 * sigma * (X_hat - X_approx[j-1]) * (W_acc**2 - Dt)

    X_L = X_approx[-1]
    X_T = X_ref[-1]
    
    err = np.abs(X_L - X_T)   
    # err = np.abs(X_approx - X_ref[::R])   
    # print("Error: {:.4e}".format(err))
    
    return err, X_approx, X_L, X_T

In [None]:
def order(R, X_0 = X_0, dt = dt, N = N, solver = EM_solver):

    dW = np.sqrt(dt) * np.random.randn(1, N)
    W = np.cumsum(dW)

    X_ref = X_0 * np.exp((mu - 0.5*sigma**2)*t[1:] + sigma*W)
    X_ref = np.insert(X_ref, obj = 0, values = X_0)

    err, X_approx, X_L, X_T = solver(R, dt, dW, N, X_ref)

    return err, X_approx, X_L, X_T


In [None]:
R_list = [2**0, 2**1, 2**2, 2**3, 2**4]
MC = 5 * 10**3

In [None]:
# Euler-Maruyama scheme

Error = np.zeros((MC, len(R_list)))

XL = np.zeros((MC, len(R_list)))
XT = np.zeros((MC, len(R_list)))
for i in range(MC):
    for j in range(len(R_list)):
        err, _, X_L, X_T = order(R = R_list[j], X_0 = X_0, dt = dt, N = N, solver = EM_solver)
        Error[i, j] = err
        XL[i, j] = X_L
        XT[i, j] = X_T

    
Strong_Error_list = np.mean(Error, axis = 0)

XL_list = np.mean(XL, axis = 0)
XT_list = np.mean(XT, axis = 0)

Weak_Error_list = np.abs(XL_list - XT_list)

In [None]:
# strong convergence
dt_list = [dt*2**i for i in range(len(R_list))]
sqrt_dt_list = [dt**0.5 for dt in dt_list]
plt.loglog(dt_list, Strong_Error_list, "k", label = "order of Euler-Maruyama scheme (strong)")
plt.loglog(dt_list, sqrt_dt_list, "-.", label = "reference line: slope = 0.5")
plt.xlabel(r"$\Delta t$")
plt.ylabel(r"$E[|x(T) - \hat{X}_L|]$")
plt.legend()
plt.savefig("./Euler-Maruyama_strong_convergence.pdf")
plt.grid()

In [None]:
# weak convergence
dt_list = [dt*2**i for i in range(len(R_list))]
sqrt_dt_list = [dt**1 for dt in dt_list]
plt.loglog(dt_list, Weak_Error_list, "k", label = "order of Euler-Maruyama scheme (weak)")
plt.loglog(dt_list, dt_list, "-.", label = "reference line: slope = 1")
plt.xlabel(r"$\Delta t$")
plt.ylabel(r"$| E[\hat{x}_L] - E[x(T)] |$")
plt.legend()
plt.savefig("./Euler-Maruyama_weak_convergence.pdf")
plt.grid()

In [None]:
# Milstein scheme

Error = np.zeros((MC, len(R_list)))

XL = np.zeros((MC, len(R_list)))
XT = np.zeros((MC, len(R_list)))
for i in range(MC):
    for j in range(len(R_list)):
        err, _, X_L, X_T = order(R = R_list[j], X_0 = X_0, dt = dt, N = N, solver = Milstein_solver)
        Error[i, j] = err
        XL[i, j] = X_L
        XT[i, j] = X_T

    
Strong_Error_list = np.mean(Error, axis = 0)

XL_list = np.mean(XL, axis = 0)
XT_list = np.mean(XT, axis = 0)

Weak_Error_list = np.abs(XL_list - XT_list)


In [None]:
# strong convergence
dt_list = [dt*2**i for i in range(len(R_list))]
plt.loglog(dt_list, Strong_Error_list, "k", label = "order of Milstein scheme (strong)")
plt.loglog(dt_list, dt_list, "-.", label = "reference line: slope = 1.0")
plt.xlabel(r"$\Delta t$")
plt.ylabel(r"$E[|x(T) - \hat{X}_L|]$")
plt.legend()
plt.savefig("./Milstein_strong_convergence.pdf")
plt.grid()

In [None]:
# weak convergence
dt_list = [dt*2**i for i in range(len(R_list))]
sqrt_dt_list = [dt**1 for dt in dt_list]
plt.loglog(dt_list, Weak_Error_list, "k", label = "order of Milstein scheme (weak)")
plt.loglog(dt_list, dt_list, "-.", label = "reference line: slope = 1")
plt.xlabel(r"$\Delta t$")
plt.ylabel(r"$| E[\hat{x}_L] - E[x(T)] |$")
plt.legend()
plt.savefig("./Milstein_weak_convergence.pdf")
plt.grid()

In [None]:
# Runge-Kutta scheme

Error = np.zeros((MC, len(R_list)))

XL = np.zeros((MC, len(R_list)))
XT = np.zeros((MC, len(R_list)))
for i in range(MC):
    for j in range(len(R_list)):
        err, _, X_L, X_T = order(R = R_list[j], X_0 = X_0, dt = dt, N = N, solver = RK_solver)
        Error[i, j] = err
        XL[i, j] = X_L
        XT[i, j] = X_T

    
Strong_Error_list = np.mean(Error, axis = 0)

XL_list = np.mean(XL, axis = 0)
XT_list = np.mean(XT, axis = 0)

Weak_Error_list = np.abs(XL_list - XT_list)


In [None]:
# strong convergence
dt_list = [dt*2**i for i in range(len(R_list))]
plt.loglog(dt_list, Strong_Error_list, "k", label = "order of Runge-Kutta scheme (strong)")
plt.loglog(dt_list, dt_list, "-.", label = "reference line: slope = 1.0")
plt.xlabel(r"$\Delta t$")
plt.ylabel(r"$E[|x(T) - \hat{X}_L|]$")
plt.legend()
plt.savefig("./Runge-Kutta_strong_convergence.pdf")
plt.grid()

In [None]:
# weak convergence
dt_list = [dt*2**i for i in range(len(R_list))]
sqrt_dt_list = [dt**1 for dt in dt_list]
plt.loglog(dt_list, Weak_Error_list, "k", label = "order of Runge-Kutta scheme (weak)")
plt.loglog(dt_list, dt_list, "-.", label = "reference line: slope = 1")
plt.xlabel(r"$\Delta t$")
plt.ylabel(r"$| E[\hat{x}_L] - E[x(T)] |$")
plt.legend()
plt.savefig("./Runge-Kutta_weak_convergence.pdf")
plt.grid()

In [None]:
# # test on one path
# err, X_approx, X_L, X_T = order(R = 2**0, X_0 = X_0, dt = dt, N = N, seed_num = 100, solver = EM_solver)

# R = 64
# L = N // R

# np.random.seed(100)
# dW = np.sqrt(dt) * np.random.randn(1, N)
# W = np.cumsum(dW)
# X_ref = X_0 * np.exp((mu - 0.5*sigma**2)*t[1:] + sigma*W)
# X_ref = np.insert(X_ref, obj = 0, values = X_0)

# ax = plt.subplot(111)
# ax.plot(t, X_ref, "k")
# ax.plot(np.linspace(0, T, L+1), X_approx[::R], "r-x")
# ax.grid()
# ax.legend(("Ref","Approx by Euler-Maruyama"), loc = "best")
# plt.show()