- LANGUAGE: Python 3
- VERSION: v1.0
           (v1.0 - 2022-04-01 - Initial version)
- AUTHORS: 
          Reda Chhaibi - reda.chhaibi@math.univ-toulouse.fr
- DESCRIPTION:  TODO.


# 0. Get and set the state of the pseudo-random number generator

In [None]:
# Get state and save it
state0 = np.random.get_state()

import pickle
pickle.dump( state0, open('state0', 'wb') )

In [None]:
# Load state from disk and set it
import pickle
new_state = pickle.load( open('state0', 'rb') )
np.random.set_state( new_state )

# 1. Hidden and observed processes

Let us start by sampling
- the Markov jump process ${\bf x}$
- a Brownian motion $B$
- the observed process ${\bf y}$ following the "signal plus noise" model

In [None]:
import numpy as np            # Matrix computation library (as a more general mathematical purpose)
import numpy.random as random # Pseudo random numbers generation sub library

def sampleBM(t,stepCount):
    """ Samples a Brownian motion up to time t, with stepCount steps"""
    h=t/float(stepCount)
    W=[0]
    W=W+random.normal(0.,np.sqrt(h),stepCount)
    return np.cumsum(W)

#random.seed(3) #change to find good realization. Comment to select "random" realizations.

final_time=10. # final time
step_count=int(1e6)  # Number of steps

param_p=0.4
param_lambda=1.5
param_gamma =10000
param_x0=0
jump_rates=np.array( [param_p*param_lambda, (1-param_p)*param_lambda] )

print("Sampling Brownian motion...")
sampled_B  = sampleBM(final_time,step_count)
dt = final_time/step_count
time_grid = np.linspace(0, final_time, step_count)

print("Sampling the Markov jump process...")
jump_times = []
sampled_x  = sampled_B*0
t = 0
x = param_x0
while t<final_time:
    jump_times.append( [t, x] )
    current_rate = jump_rates[x]
    clock = np.random.exponential( scale = 1.0/current_rate )
    #
    indices_to_update = np.where( (time_grid >= t) & (time_grid < (t+clock)) )
    sampled_x[indices_to_update] = x
    #
    t += clock
    x = 1-x
# end while

print("Sampling the observation process...")
sampled_y = np.zeros_like( sampled_B )
dB = sampled_B[1:]-sampled_B[:-1]
sampled_y[1:] = sampled_x[0:-1]*dt + dB/10
sampled_y = np.cumsum( sampled_y )

import matplotlib.pyplot as plt
fig, axs = plt.subplots(2, figsize=(10, 10), sharex=True, sharey=False, gridspec_kw={'hspace': 0})
axs[0].plot(time_grid, sampled_x, color='r')
axs[0].set_ylabel("Hidden process $x$", fontsize=20)
axs[0].set_ylim( [-0.25,1.25] )
axs[1].plot(time_grid, sampled_y)
axs[1].set_ylabel("Observation $y^\gamma$", fontsize=20)
axs[1].set_ylim( [-1,5] )
plt.xlabel("Time", fontsize=20)
plt.savefig("pics/process_xy.png")
# Hide x labels and tick labels for all but bottom plot.
for ax in axs:
    ax.label_outer()


# 2. Optimal filter

The Wonham-Shiryaev filter 
$$ \pi_t^\gamma := \mathbb{P}\left( {\bf x}_t = 1 \ | \ \left( {\bf y}_s \right)_{s\leq t} \right) \ $$
satisfies the SDE
$$ d \pi_t = - \lambda \left( \pi_t - p \right)dt
           + \sqrt{\gamma} \pi_t \left( 1-\pi_t \right) dW_t
$$
where $W$ is the innovation.

In [None]:
import math

#--- Parameters and functions defining the SDE---#
eds_lambda = param_lambda
eds_p      = param_p
gamma      = param_gamma
D          = math.sqrt(gamma)

def eds_b(x):
    """ Returns drift. """
    return -eds_lambda*(x-eds_p)

def eds_b_prime(x):
    """ Returns drift's derivative. Useful for Milstein scheme."""
    return -eds_lambda

def eds_sigma(x):
    """ Returns volatility. """
    return x*(1.-x)

def eds_sigma_prime(x):
    """ Returns volatility's derivative. Useful for Runge Kutta"""
    return 1.-2.*x

def constrains(x):
    """ Returns True if x belongs to the theoretical domain of the process. """
    return x>0 and x<1

def compute_step_Euler(x,h,dW):
    b_x     = eds_b(x)
    sigma_x = eds_sigma(x) 
    return x + b_x*h + D*sigma_x*dW

def compute_step_Milstein(x,h,dW):
    b_x     = eds_b(x)
    sigma_x = eds_sigma(x)
    x_euler = x + b_x*h + D*sigma_x*dW
    b_prime_x = eds_b_prime(x)
    return x_euler + 1./2.*b_x*b_prime_x*(dW**2-h)

def compute_step_RungeKutta(x,h,dW):
    b_x     = eds_b(x)
    sigma_x = eds_sigma(x) 
    sigma_prime_x = eds_sigma_prime(x)
    x_euler = x + b_x*h + D*sigma_x*dW
    return x_euler + 0.5*D*D*sigma_x*sigma_prime_x*(dW**2-h)

def compute_step(x,h,dW):
    return compute_step_RungeKutta(x,h,dW)
    
#--- Numerical integrations and stochastics---#
import numpy as np            # Matrix computation library (as a more general mathematical purpose)
import numpy.random as random # Pseudo random numbers generation sub library

def sampleBM(t,stepCount):
    """ Samples a Brownian motion up to time t, with stepCount steps"""
    h=t/float(stepCount)
    W=[0]
    W=W+random.normal(0.,np.sqrt(h),stepCount)
    return np.cumsum(W)

def sampleBB_midpoint(h):
    """ Samples midpoint in a Brownian bridge, with length h"""
    return 0.5*random.normal()*np.sqrt(h)

def compute_step_with_constrains(x,h,dW):
    new_x = compute_step(x,h,dW)
    if constrains(new_x):
        return new_x
    else:
        #Faster
        #return x
        #print("Subdivision %f -> %f"%(x, new_x))
        dP = sampleBB_midpoint(h)
        intermediate_x = compute_step_with_constrains(x,0.5*h,0.5*dW+dP)
        #print("Intermediate %f -> %f -> %f"%(x, intermediate_x, new_x))
        return compute_step_with_constrains(intermediate_x,0.5*h,0.5*dW-dP)

def SDE_integration_process(x,t,B,hidden,callback):
    """ x condition initiale, h pas de temps, W realization d'un Wiener avec des points equidistant pour le temps t. La fonction renvoie les point X_kh.
        Le processus respecte les contraintes données par la fonction du même nom.
        """
    h = t/(len(B)-1)
    percent = 0
    X = [x]
    for k in range(len(W)-1):
        dB = B[k+1]-B[k]
        dW = dB + D*h*(hidden[k]-X[-1])
        x   = compute_step_with_constrains(x,h,dW)
        X.append(x)
        # Callback only when one percent is done
        # This minimizes costly communications between python kernel and notebook
        if callback is not None and percent<np.floor(100*k/len(W)):
            percent = np.ceil(100*k/len(W))
            callback(percent)
    return X

In [None]:
from ipywidgets import FloatProgress
from IPython.display import display

print("Solving SDE...")
X_0=0.5
X=[X_0]# Initialization of the path

f = FloatProgress(min=0, max=100)
display(f)
def callback(percent):
    f.value = percent

X = SDE_integration_process( 0.5, final_time, sampled_B, sampled_x, callback)

print("min(min(X),1-max(X))=%e"%min(min(X),1-max(X)))
#EOF

In [None]:
t = np.linspace(0, final_time, len(X))

import matplotlib.pyplot as plt
print("Plotting using matplotlib...")
fig, ax = plt.subplots(1, figsize=(10, 5), sharex=True, sharey=True, gridspec_kw={'hspace': 0})
ax.plot(t, X)
ax.set_ylabel("Optimal filter $\pi^\gamma$", fontsize=20)
plt.xlabel("Time", fontsize=20)
plt.savefig("pics/filter.png")
# Hide x labels and tick labels for all but bottom plot.
ax.label_outer()

# 3. Optimal smoothing of filter


In [None]:
dt = final_time/(len(X)-1)
theoretical_tresh = 2.0*np.log(param_gamma)/param_gamma

Cs = np.array([0.5, 1.0, 2.0, 4.0, 8.0])
fig, axes = plt.subplots(len(Cs), figsize=(10, len(Cs)*2), sharex=True, sharey=True, gridspec_kw={'hspace': 0})
for index in range(0, len(Cs)):
    C = Cs[index]
    treshold = C*0.5*theoretical_tresh
    smoothing_steps   = int(treshold/dt)
    print("Smoothing steps: ", smoothing_steps)
    # Init
    pi  = np.array(X)
    a_0 = param_lambda*param_p*(1-pi)/pi
    a_1 = param_lambda*param_p*pi/(1-pi)
    damping = a_0 + a_1
    smoothed_filter = pi
    # Smoothing loop
    for k in range(0,smoothing_steps):
        smoothed_filter = smoothed_filter - (smoothed_filter*damping - a_1 )*dt
        smoothed_filter = smoothed_filter[1:]
        damping = damping[:-1]
        a_1 = a_1[:-1]
    #
    t = time_grid[:len(smoothed_filter)]

    axes[index].plot(t, smoothed_filter)
    axes[index].set_ylabel(f"$C_\gamma={C}$", fontsize=15)
    plt.xlabel("Time", fontsize=20)
    # Hide x labels and tick labels for all but bottom plot.
    #ax.label_outer()
    print("")
#
plt.savefig("pics/filter_smoothed.png")