# Numerical Methods for Stochastic Differential Equations

This is a lab on Matthew's Chapter 7.

In [None]:
# import the relevant libraries.

import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d
import numpy as np
import random
from scipy.interpolate import interp1d

## Contents

- [Random Processes](#Random-Processes)
    - [Random Walk](#Random-Walk)
    - [Wiener Processes](#Wiener-Processes)
    - [Wiener Process Statistical Properties](#Wiener-Process-Statistical-Properties)
    - [Random Points on the Surface of an N Ball](#Random-Points-on-the-Surface-of-an-N-Ball)
    - [Generalized Wiener Process](#Generalized-Wiener-Process)
- [Numerical Methods](#Numerical-Methods)
    - [Euler-Maruyama](#Euler-Maruyama)

## Differentiation

Differentiation (1-dimensional gradient) of differentiable functions.

In [None]:
tt = np.linspace(-1, 1, 100)
xx1 = np.power(tt, 3)
xx2 = np.gradient(xx1, tt)
xx3 = 3*np.power(tt, 2)

plt.plot(tt, xx2, '--', label = '$D[t^3]$')
plt.plot(tt, xx3, '--', label = '$3t^2$')
plt.legend()
plt.show()

## Random Processes

Review of some random processes, e.g. random walk, Wiener process, etc.

### Random Walk

A random walk is a discrete stochastic process where the state $X_{n+1}$ is related to the state $X_n$ in that they differ by a distance $1$.

In [None]:
# random walk in one dimension.
def random_walk_one_dimension(n, x_init):
    
    # type testing.
    if not isinstance(n, int):
        raise TypeError('n must be int.')
        pass
    
    def random_walk_iter(x):
        return x + random.choice([-1, 1])
    
    solution = [None for _ in range(n)]
    for i in range(n):
        if i == 0:
            solution[0] = x_init
            pass
        else:
            solution[i] = random_walk_iter(solution[i-1])
            pass
        pass
    return solution

In [None]:
n = 100
plt.plot(range(n), random_walk_one_dimension(n, 0))
plt.title('Random Walk')
plt.show()

### Wiener Process

A Wiener Process is a random process $Y(t_{k+1}) = Y(t_k) + \sqrt{\delta t}J_k$.

In [None]:
# Wiener process in one dimension.
def wiener_process_one_dimension(tt, y_init):
    # tt is a linear array.
    
    # extract data from linear array.
    n = len(tt)
    t_min = tt[0]
    t_max = tt[n-1]
    dt = (t_max - t_min) / (n - 1)
    
    # construct solution set.
    solution = [None for _ in range(n)]
    
    # solve.
    for i in range(n):
        if i == 0:
            solution[0] = y_init
            pass
        else:
            solution[i] = solution[i-1] + np.sqrt(dt)*random.choice([-1, 1])
            pass
        pass
    return solution

In [None]:
tt = np.linspace(0, 1, 1000)

for i in range(10):
    plt.plot(tt, wiener_process_one_dimension(tt, 0))
plt.title('Wiener Process with 1000 Timesteps')
plt.show()

Interpolating a discrete Wiener process.

In [None]:
tt = np.linspace(0, 1, 11)
solution = wiener_process_one_dimension(tt, 0)

f1 = interp1d(tt, solution)
f2 = interp1d(tt, solution, kind = 'cubic')

new_tt = np.linspace(0, 1, 101)

plt.plot(
    tt, solution, 'o',
    new_tt, f1(new_tt), '-',
    new_tt, f2(new_tt), '--'
)

plt.legend(['data', 'linear', 'cubic'], loc = 'best')
plt.show()

In this case, it is more "accurate" to use the linear case, rather than cubic.

### Wiener Process Statistical Properties

A memory-efficient function for computing a discrete Wiener process at time $t$.

In [None]:
def wiener_process_yield(tt, x_init = 0):
    n = len(tt)
    t_min, t_max = tt[0], tt[n-1]
    dt = (t_max - t_min) / (n - 1)
    
    for i in range(n - 1):
        x_init += random.choice([-1, 1])*np.sqrt(dt)
        pass
    return x_init

In [None]:
n = 1000
tt = np.linspace(0, 1, 100)

sample = [wiener_process_yield(tt) for _ in range(n)]

In [None]:
count, bins, ignored = plt.hist(sample, 30, density = True)
plt.title('Frequency Histogram of $W(t)$')
plt.show()

Verify that Wiener process has mean $0$ and variance $1$.

In [None]:
print(np.mean(sample))
print(np.std(sample))

Construct a function $dW(t)$, which returns either $\sqrt{h}$ or $-\sqrt{h}$, where $h$ is the timestep.

In [None]:
def d_wiener_process_one_dimension(tt):
    n = len(tt)
    t_min, t_max = tt[0], tt[n-1]
    h = (t_max - t_min) / (n - 1)
    
    return [random.choice([-np.sqrt(h), np.sqrt(h)])
           for _ in range(n - 1)]

### Random Points on the Surface of an N Ball

Let $\mathbf{x}$ be an $n$-dimensional vector of normal deviates and $r = \vert\vert x\vert\vert$. Then the vector $\mathbf{x}/r$ is randomly chosen over the surface of an $n$-sphere. (Marsaglia)

In [None]:
def norm(x):
    return np.sqrt(x.dot(x))

def unit_sphere(n):
    x = np.array([random.normalvariate(0, 1) for _ in range(n)])
    r = norm(x)
    
    return x/r

In [None]:
# randomly selected points on a ball.

n = 1000
points = [unit_sphere(3) for _ in range(n)]

xx = [point[0] for point in points]
yy = [point[1] for point in points]
zz = [point[2] for point in points]

In [None]:
# plot them in three dimensions.

plt.figure(figsize = (10, 10))
ax = plt.axes(projection = '3d')
ax.scatter(xx, yy, zz)
plt.title('Random Points on Surface of Ball')
plt.show()

### Generalized Wiener Process

In [None]:
# wiener process in n dimensions.

def wiener_process(tt, x_init):
    
    # calculate dimension.
    dim = len(x_init)
    
    # extract data from tt.
    n = len(tt)
    t_min = tt[0]
    t_max = tt[n-1]
    dt = (t_max - t_min) / (n - 1)
    
    # construct solution set.
    solution = [None for _ in range(n)]
    
    # solve.
    for i in range(n):
        if i == 0:
            solution[0] = x_init
            pass
        else:
            solution[i] = solution[i-1] + np.sqrt(dt)*unit_sphere(dim)
            pass
        pass
    return solution

In [None]:
tt = np.linspace(0, 1, 1000)

# a two-dimensional Wiener process.
solution = wiener_process(tt, np.array([0, 0]))
xx = [s[0] for s in solution]
yy = [s[1] for s in solution]

plt.figure(figsize = (10, 10))
plt.plot(xx, yy)
plt.title('2D Wiener Process')
plt.show()

# a three-dimensional Wiener process
solution = wiener_process(tt, np.array([0, 0, 0]))
xx = [s[0] for s in solution]
yy = [s[1] for s in solution]
zz = [s[2] for s in solution]

plt.figure(figsize = (10, 10))
ax = plt.axes(projection = '3d')
ax.plot3D(xx, yy, zz)
plt.title('3D Wiener Process')
plt.show()

Generalized $d\mathbf{W}(t)$.

In [None]:
def d_wiener_process(tt, dimension = 1):
    n = len(tt)
    t_min, t_max = tt[0], tt[n-1]
    h = (t_max - t_min) / (n - 1)
    
    solution = [
        unit_sphere(dimension)*np.sqrt(h)
        for _ in range(n-1)
    ]
    
    return solution

In [None]:
# example.
# set up.
n = 10000

tt = np.linspace(0, 1, n)
dw = d_wiener_process(tt)
x_init = 0

# solution.
solution = [None for _ in range(n)]
for i in range(n):
    if i == 0:
        solution[0] = x_init
        pass
    else:
        solution[i] = solution[i-1] + dw[i-1]
        pass
    pass

plt.plot(tt, solution)
plt.title('Wiener Process')
plt.show()