# Solutions 3

In [1]:
import numpy as np

class Call:
    
    def __init__(self, strike):
        self.strike = strike

    def __call__(self, spot):
        return np.maximum(spot - self.strike, 0.)
        
class FixedRateYieldCurve:
    def __init__(self, rate):
        self.rate = rate
        
    def discount(self, maturity):
        return np.exp(-self.rate * maturity)

In [2]:
def mc_path_dependent(option_state, process, yield_curve, n_paths):
    """Computes the fair value of a path-dependent option using Monte-Carlo simulations.
    
    Parameters
    ----------
    option_state : OptionState
        The option payoff state variable recorder
    process : object with method update()
        The underlying's stochastic process evolver
    yield_curve : object with method discount()
        The yield curve for discounting future cash flows
    n_paths : int
        The number of paths to simulate
    """

    # Helper function to process one path
    def do_one_path(times):
        # move over each time node
        for t in times: 
            # generate one standard normal variable
            std_norm = np.random.normal() 
            # simulate the value of the underlying at the next time
            St = process.update(t, std_norm) 
            # pass the value of the underlying at time t to the option_state
            # so it can use it when computing the payoff
            option_state.update(t, St) 
        
        return option_state.calculate_payoff()


    # query for the times needed in the simulation
    times = option_state.times 
    # initialize the running sum to zero
    running_sum = 0 
    for i in range(n_paths):
        # reset process to start a new simulation
        process.reset()        
        # reset option state to a start a new simulation
        option_state.reset()
        # simulate one path
        path_value = do_one_path(times)
        # update the running sum with the payoff at the end of the path
        running_sum += path_value
    
    return running_sum / n_paths * yield_curve.discount(option_state.expiry)

In [3]:
class Process:
    """
    A class used to simulate a stochastic process

    Inner classes
    -------------
    StateData : encapsulates the mutable state data (spot, time)

    Attributes
    ----------
    spot : float
        The starting value of the stochastic process
    state : Process.StateData
        The current value of the mutable state (spot and time)
    
    Methods
    -------
    reset()
        Reset the state of the object back to the initial state to start a new path
    evolve(spot, dt, norms)
        Simulate the next value of the spot for a time increment dt
    update(time, norms)
        Simulate the process to time using the standard normal variable norms
    """

    class StateData:
        """Mutable state of the process"""

        def __init__(self, spot, time):
            """
            Parameters
            ----------
            spot : float
                The current value of the stochastic process
            time : float
                The current time in the simulation
            """
            self.spot = spot
            self.time = time
        
        def __repr__(self):
            return "{}({!r}, {!r})".format(
            self.__class__.__name__,
            self.spot, self.time
        )
   
    def __init__(self, state):  
        """
        Parameters
        ----------
        state : Process.StateData
            The current value of the stochastic process
        """
        self.spot = state.spot
        # a simulation path starts at time 0 from value spot
        self.state = state
        
    def __repr__(self):
        return "{}({!r}, {!r})".format(
            self.__class__.__name__,
            self.spot, self.state
        )
    
    def reset(self):
        """Reset the state before starting a new path"""
        self.state = self.StateData(self.spot, 0)

    def evolve(self, spot, dt, norms):
        """
        Simulates the next value of the process for a time increment dt
        Should be implemented in subclasses to return the next spot value
        
        Parameters
        ----------
        spot : float
            The current value of the stochastic process
        dt: flost
            The time increment
        norms: float
            The standard normal variable to simulate a Brownian increment
        """
        raise NotImplementedError(
            "Classes derived from Process should implement evolve() method")

    def update(self, time, norms):        
        """
        Updates the process state to time `time`
        
        Parameters
        ----------
        time : float
            The time to evolve the process to
        norms: float
            The standard normal variable to simulate a Brownian increment
        """        
        # calculate time increment
        dt = time - self.state.time
        # simulate spot incrementally
        st = self.evolve(self.state.spot, dt, norms)
        # update state
        self.state = self.StateData(st, time)

        return st

In [4]:
class BlackScholesProcess(Process):
    """A class used to simulate a Black-Scholes Geometric Brownian Motion process"""

    def __init__(self, spot, rate, vol):  
        """
        Parameters
        ----------
        spot : float

            The starting value of the stochastic process
        rate : float
            The risk-free rate for the risk-neutral drift
        vol : float
            The annualized instantaneous volatility
        """
        super().__init__(Process.StateData(spot, 0))
        self.rate = rate
        self.vol = vol
        
    def __repr__(self):
        return "{}({!r}, {!r}, {!r}, {!r})".format(
            self.__class__.__name__,
            self.spot, self.rate, self.vol, self.state
        )
    
    def evolve(self, spot, dt, norms):                
        st = spot * np.exp((self.rate - 0.5 * self.vol**2) * dt + self.vol*np.sqrt(dt)*norms)        
        return st

In [5]:
class OptionState: 

    class StateData:

        def __init__(self, updates, underlying, state):
            self.updates = updates
            self.underlying = underlying
            self.state = state

        def __repr__(self):
            return "{}({!r}, {!r}, {!r})".format(
                self.__class__.__name__,
                self.updates, self.underlying, self.state)        


    def __init__(self, spot, expiry, payoff, times, state_data=None):
        self.spot = spot
        self.expiry = expiry
        self.payoff = payoff
        self.times = times
        self.state_data = state_data if state_data else self.StateData([], [], None)

    def reset(self):
        self.state_data = self.StateData([], [], None)    
    
    def __repr__(self):
        return ("{}({!r}, {!r}, {!r}, {!r}, {!r})".format(
            self.__class__.__name__,
            self.spot, self.expiry, self.payoff, self.times, self.state_data))

    def has_expired(self):
        if self.state_data.updates:
            return self.expiry <= max(self.state_data.updates)
        else:
            return False
    
    def update(self, time, underlying):
        if time in self.times and  time not in self.state_data.updates:
            self.state_data.updates.append(time)
            self.state_data.underlying.append(underlying)        
            self.state_data.state = self.update_state()        
    
    def update_state(self):
        if self.has_expired():
            return self.calculate_state_variable()
        else:
            return None
        
    def calculate_state_variable(self):
        raise NotImplementedError(
            "Classes derived from OptionState should implement calculate_state_variable() method")
    
    def is_payoff_active(self):
        return True

    def calculate_payoff(self):
        if self.has_expired():
            if self.is_payoff_active():
                return self.payoff(self.state_data.state)
            else:
                return 0.
        else:
            raise RuntimeError(
                "calculate_payoff() should only be called once option is expired")
    

In [6]:
class BarrierState(OptionState):
    
    def __init__(self, barrier, *args, **kargs):
        self.barrier = barrier
        super().__init__(*args, **kargs)

    def is_payoff_active(self):
        raise NotImplementedError(
            "Classes derived from BarrierState should implement is_payoff_active() method")            

    def calculate_state_variable(self):
        return self.state_data.underlying[-1]

In [7]:
class UpAndOut(BarrierState):

    def is_payoff_active(self):
        return max(self.state_data.underlying) < self.barrier

class DownAndOut(BarrierState):

    def is_payoff_active(self):
        return min(self.state_data.underlying) > self.barrier    

class UpAndIn(BarrierState):

    def is_payoff_active(self):
        return max(self.state_data.underlying) > self.barrier    

class DownAndIn(BarrierState):

    def is_payoff_active(self):
        return min(self.state_data.underlying) < self.barrier    

In [8]:
T = 5.
S_0 = 100.
vol = 0.2
r = 0.02
K = 100
N = 10000

times = [1, 2, 3, 4, 5]
yield_curve = FixedRateYieldCurve(r)
rand_process = BlackScholesProcess(S_0, r, vol)
call_payoff = Call(K)

In [9]:
B = 120
uao = UpAndOut(B, S_0, T, call_payoff, times)
mc_path_dependent(uao, rand_process, yield_curve, N)

0.620177148925837

In [10]:
B = 80
dao = DownAndOut(B, S_0, T, call_payoff, times)
mc_path_dependent(dao, rand_process, yield_curve, N)

20.361326619773607

In [11]:
B = 120
uai = UpAndIn(B, S_0, T, call_payoff, times)
mc_path_dependent(uai, rand_process, yield_curve, N)

21.308234241428526

In [12]:
B = 80
dai = DownAndIn(B, S_0, T, call_payoff, times)
mc_path_dependent(dai, rand_process, yield_curve, N)

1.3857260789432175