# Import libraries

In [2]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# Stocastic Differential Equations

## Asset Price

In [78]:
class AssetPrice:
    """
    This is a class for Asset Price ODE. 
    It contains methods for finding exact solution, plotting and exat mean hitting time.
    Arguments:
    a: Lower bound
    b: Upper bound
    mu: mu
    sigma: sigma
    """
    def __init__(self, a, b, mu, sigma, exact_time):
        self.a = a
        self.b = b
        self.mu = mu
        self.sigma = sigma
        self.exact_time = exact_time
        
    def f(self,x):
        """
        Function of f(x) in asset price stochastic differential equation (deterministic part)
        """
        return self.mu * x
    
    def g(self,x):
        """
        Function of g(x) in asset price stochastic differential equation (stochastic part)
        """
        return self.sigma * x
    
    def dg(self,x):
        """
        Function of g'(x) in asset price stochastic differential equation (stochastic part)
        """
        return self.sigma
  
    def MHT_Xt_exact(self,X0,dt,npaths, adaptive= False):
        """
        Methods that return mean hitting time of X(t)
        
        Arguments:
        X0: Initial value
        dt: timestep
        
        Return: List containing Mean, STD, Confidence interval Left, Confidence interval Right
        """
        
        sigma = self.sigma; mu = self.mu
        exit_time = np.zeros(npaths)
        # TODO: Add threshold for situation when the loop goes forever
        for i in tqdm(range(npaths)):
            X = X0
            t = 0
            while X > self.a and X < self.b:
                dW = np.sqrt(dt) * np.random.randn()
                X = X*np.exp( (mu - 0.5*(sigma**2))*dt + sigma*dW)
                t = t + dt
            
            exit_time[i] = t - 0.5*dt
            
        tmean = np.mean(exit_time)
        tstd = np.std(exit_time)

        cileft = tmean - 1.96*tstd/np.sqrt(npaths)
        ciright = tmean + 1.96*tstd/np.sqrt(npaths)
        
        return tmean, tstd, cileft, ciright
    
    def Xt_path(self,X0,duration,dt):
        """
        Methods that return paths of X(t)

        Arguments:
        X0: Initial value
        duration: The duration for running process
        dt: timestep

        Return: Array of a path
        """
        X_EM = [] + [X0]
        X_Mils = [] + [X0]


        tsteps_EM_AT = []
        X_EM_AT = [] + [X0]

        tsteps_EM_Mils = []
        X_EM_Mils = [] + [X0]


        timestamps1 = [] + [0]
        timestamps2 = [] + [0]
        timestamps3 = [] + [0]
        AdaptiveT = AdaptiveTimestep()

        time_elapsed = 0
        while time_elapsed < duration:
            dW = np.sqrt(dt)*np.random.randn() # increments
            X_EM = X_EM + [X_EM[-1] + dt*self.f(X_EM[-1]) + self.g(X_EM[-1])*dW]
            X_Mils = X_Mils + [X_Mils[-1] + dt*self.f(X_Mils[-1]) + self.g(X_Mils[-1])*dW + 0.5 * self.g(X_Mils[-1])*self.sigma*(dW**2 - dt)]

            time_elapsed += dt
            timestamps1 = timestamps1 + [time_elapsed]

        time_elapsed = 0
        while time_elapsed < duration:
            dt_new_EM = AdaptiveT.kev_EM(LB=self.a, UB=self.b, x=X_EM_AT[-1], f=self.f, g=self.g, dt=dt)
            dW = np.sqrt(dt_new_EM)*np.random.randn()
            X_EM_AT = X_EM_AT + [X_EM_AT[-1] + dt_new_EM*self.f(X_EM_AT[-1]) + self.g(X_EM_AT[-1])*dW]
            tsteps_EM_AT = tsteps_EM_AT + [dt_new_EM]
            time_elapsed += dt_new_EM
            timestamps2 = timestamps2 + [time_elapsed]

#         time_elapsed = 0
#         while time_elapsed < duration:
#             dt_new_EM = AdaptiveT.kev_Mils(LB=self.a, UB=self.b, x=X_EM_Mils[-1], f=self.f, g=self.g, dt=dt)
#             dW = np.sqrt(dt_new_EM)*np.random.randn()
#             X_EM_Mils = X_EM_Mils + [X_EM_Mils[-1] + dt_new_EM*self.f(X_EM_Mils[-1]) + self.g(X_EM_Mils[-1])*dW]
#             tsteps_EM_AT = tsteps_EM_AT + [dt_new_EM]
#             time_elapsed += dt_new_EM
#             timestamps3 = timestamps3 + [time_elapsed]


        return X_EM, X_Mils, X_EM_AT, timestamps1, timestamps2

    def _plot(self,X0,duration,dt):
        """
        Method to plot a path

        Arguments:
        X0: intial value
        duration: The duration for running process
        dt: timestep
        """
        t = np.arange(0,duration,dt)

        xt_EM, xt_Milstein, xt_EM_A, timestamps1, timestamps2 = self.Xt_path(X0,duration,dt)

        plt.figure(figsize=(20,10))
    #         plt.plot(timestamps1, xt_exact, color='green', label='Exact')
    #         plt.plot(timestamps1, xt_EM, color='purple', label='EM')
    #         plt.plot(timestamps1, xt_Milstein, color='orange', label='Milstein')
        plt.plot(timestamps2, xt_EM_A,linestyle='dashed',color='purple', label='EM')
        plt.xlabel('Time',fontsize=18)
        plt.ylabel('Stock Price',fontsize=18)
        plt.hlines(self.b, xmin=0, xmax=max(timestamps1), color='red', label='$b$')
        plt.hlines(self.a,xmin=0, xmax=max(timestamps1), color='blue', label='$a$')
        plt.title('Stock Price SDE with dt={}'.format(dt),fontsize=18)
        plt.legend()
        plt.savefig('AdaptiveEM.png')


## Asset Price Interest Rate

In [79]:
class AssetPriceInterestRate:
    def __init__(self, lam, mu, sigma):
        self.lam = lam
        self.mu = mu
        self.sigma = sigma
        
    
    def f(self,x):
        return self.lam*(self.mu - x)
    
    def g(self,x):
        return self.sigma*np.sqrt(x)
    

## Opinion Polls

In [80]:
class OpinionPolls:
    def __init__(self, mu, sigma):
        self.mu = mu
        self.sigma = sigma
        
    def f(self,x):
        return -self.mu*(x/(1-x**2))
        
    def g(self,x):
        return self.sigma

## Population Dynamic 

In [81]:
class PopulationDynamic:
    def __init__(self, K, r, beta):
        self.K = K
        self.r = r
        self.beta = beta
        
    def f(self,x):
        return self.r*x*(self.K - x)
    
    def g(self,x):
        return self.beta*x

## Epidemic Model

In [82]:
class EpidemicModel:
    
    def __init__(self, p, B, beta, alpha, rho, C):
        self.p = p
        self.B = B
        self.beta = beta
        self.alpha = alpha
        self.rho = rho 
        self.C = C
    
    def f(self, x):
        return (self.p -1)*self.B*x + (self.beta*self.C - self.alpha)*(1 - x)*x
    
    def g(self, x):
        return self.p*self.C*(1-x)*x

## Political Opinion

In [83]:
class PoliticalOpinion:
    def __init__(self, r, G, eps):
        self.r = r
        self.G = G
        self.eps = eps
        
    def f(self,x):
        return self.r*(self.G-x)
    
    def g(self,x):
        return np.sqrt(self.eps*x*(1-x))
    

## Double Well Potential

In [84]:
class DoubleWellPotential:
    def __init__(self, sigma):
        self.sigma = sigma
        
    def _V(self,x):
        return (x**2)*(x-2)**2
    
    def _dV(self,x):
        return 4*x*(x-2)*(x-1)
    
    def f(self,x):
        return -self._dV(x)
    
    def g(self,x):
        return self.sigma

In [2]:
class SimpleSDE:
    def __init__(self, mu, sigma):
        self.mu = mu
        self.sigma = sigma
    
    def f(self, x):
        return self.mu
    
    def g(self, x):
        return self.sigma
    

# Methods

## EXPONENTIAL TIMESTEPPING WITH BOUNDARY TEST FOR SDEs 

### The exponential-Euler method with boundary test (1st Algorithm)

#### Assumptions: $f(X_t) = \mu$ and $\sigma(X_t)=\epsilon$

In [None]:
class ExponentialTimestepping:
    def __init__(self, rate):
        self.rate = rate
    
    def F(self, Xt, f, g):
        return f(Xt) / g(Xt)**2
        
    def N(self, Xt, g):
        return np.sqrt((2*self.rate / g(Xt)**2) + self.F(Xt)**2)
    
    def udpate(self, Xt, f, g):
        v = np.random.uniform()
        p = -log(v)
        u = np.random.uniform()
        sign = np.sign(0.5 * (1 + self.F(Xt)/self.N(Xt)) - u)
        return Xt+ (sign*p)/(self.N(Xt)-sign*self.F(Xt))
    
    def check_exit(self, lower_b=None, upper_b=None, Xt, f, g):
        if lower_b is None and upper_b is None:
            assert("Please provide a boundary value")

        w = np.random.uniform()
        Xnew = self.update(Xt, f, g)
        
        if lower_b:
            return Xnew < lower_b or w < np.exp(-2*self.N(b-max(Xt, Xnew))) 
        if upper_b:
            return Xnew > upper_b or w < np.exp(-2*self.N(min(Xt, Xnew)-b)) 
    
    

### The exponential-V method with boundary test(2nd Algorithm)

#### Assumptions: $\sigma(x) = \epsilon$ but $f(x)$ is a function of $x$

In [1]:
class ExponentialVTimestepping:
    def __init__(self, rate, V):
        self.V = V
        self.rate = rate
        
    
    def nu(self, Xt, g):
        return np.sqrt(2*self.rate / g(Xt)**2)
    
    def update(self, Xt, f, g):
        nu = self.nu(Xt, g)
        v = np.random.uniform()
        p = -log(v)
        u = np.random.uniform()
        sign = np.sign(0.5*(1 + (nu**(-2)) * (g(Xt)**(-2))*f(Xt)) - u)
        
        return Xt + (nu**(-1))*sign*(p - (g(Xt)**(-2))*(self.V(Xt + (nu**(-1))*sign*p) - self.V(Xt)))
                                 
    def check_exit(self, lower_b=None, upper_b=None, Xt, f, g):
        if lower_b is None and upper_b is None:
            assert("Please provide a boundary value")

        w = np.random.uniform()
        Xnew = self.update(Xt, f, g)
        nu = self.nu(Xt, g)
        
        if lower_b:
            return Xnew < lower_b or w < np.exp(-2*nu(min(Xt, Xnew)-b)) 
        if upper_b:
            return Xnew > upper_b or w < np.exp(-2*nu(b - max(Xt, Xnew))) 
    
                                 

-----------------------------------------------------------

## Absorbing boundaries and optimal stopping in a stochastic differential equation

#### Assumtions $g(x)=\sigma$

In [None]:
class EulerMaryamaBoundaryCheck:
    def __init__(self):
        pass
    
    def P_hit(self, x0,xh,dt,xb,D, df):
    return np.exp(-df(xb)/(2*D*(np.exp(2*dt*df(xb))-1))*(xh-xb+(x0-xb)*np.exp(dt*df(xb))-f(xb)/df(xb))**2 + (xb - (x0 + dt*(f(x0)+f(xh))/2))**2/4*D*dt)
    
    def check_exit(self, lower_b=None, upper_b=None, Xt, f, df, g):
        if lower_b is None and upper_b is None:
            assert("Please provide a boundary value")
            
        Rn = np.random.randn(1)
        Xnew = Xt + dt*f(Xt) + np.sqrt(dt)*Rn*g(Xt)
        D = (g(Xt)**2)/2
        if Xn-a<5*dt or b-Xn<5*dt:
            prob = self.P_hit(Xn,Xn_1,dt,a,D, df)
            return prob>np.random.uniform(0,1) or prob>np.random.uniform(0,1)

_______________________________________

## Adaptive Timestep

In [1]:
class AdaptiveTimestep:
    def __init__(self):
        pass

________________________

# Expariments

### Simple SDE: $dX = \mu dt + \sigma dW, \quad X(0) = X_0$

In [3]:
# Parameters
mu = 0.5
sigma = 1
X0 = 0
upper_b = 5
dt = 0.01

# New instance of SimpleSDE class
simpleSDE = simpleSDE(mu = mu, sigma = sigma)

NameError: name 'simpleSDE' is not defined

#### Euler-Maruyama

#### Milstien

####  EXPONENTIAL TIMESTEPPING WITH BOUNDARY TEST FOR SDEs (The exponential-Euler method with boundary test) 

#### Absorbing boundaries and optimal stopping in a stochastic differential equation

#### Adaptive Timestep-Euler-Maruyama

________________

### Double Well Potential

#### Euler-Maruyama

#### Milstien

####  EXPONENTIAL TIMESTEPPING WITH BOUNDARY TEST FOR SDEs (The exponential-Euler method with boundary test) 

#### Absorbing boundaries and optimal stopping in a stochastic differential equation

#### Adaptive Timestep-Euler-Maruyama

#### Adaptive Timestep-Milstein

### Opinion Poll

#### Euler-Maruyama

#### Milstien

####  EXPONENTIAL TIMESTEPPING WITH BOUNDARY TEST FOR SDEs (The exponential-Euler method with boundary test) 

#### Absorbing boundaries and optimal stopping in a stochastic differential equation

#### Adaptive Timestep-Euler-Maruyama

#### Adaptive Timestep-Milstein