In [None]:
'''
Preface:

    Clean input data before using the functions in this class.  There are no data cleaners embedded in the functions below!
    
    Use to_arrays(self, *args, dtype=float) and option_type(self, opt_type, array_to_mimic) from OptionsMathHelpers (parent class) to vectorize

    The classic Black Scholes formula as implemented below assumes:
        1) European options only (not American)
        2) Based on fwd_value of underlying (no rates for growth of underlying in formulas below) 
        3) Discount option value outside of model; same for greeks (no rates for discounting of option value in formulas below)
        4) The time_to_expiry in formula below is based on years and should be until expiry (no time_to_pmt in formulas below)
        5) The term "value" is used rather than "price" to emphasize that formulas below can be used for rates, too

    Parameters:
        opt_type (string)      : acceptable names for 'call' or 'put' (see c_or_p function below)
        fwd_value (float)      : see note above, expected value of underlying at expiry
        strike (float)         : strike value expressed in same units as fwd_value
        vol (float)            : volatility expressed as a percentage of fwd_value (should be the same amount regardless of using forward or spot underlying price)
        time_to_expiry (float) : see note above, fraction of a year (use Daycount_Math prior to calling functions below)
        d0 (float)             : the amount that is directly in the middle of d1 and d2 - similar to z-score of u relative to k based on v * sqrt(t)
        d1 (float)             : the amount equal to the d1 term in the Black Scholes formula
        d2 (float)             : the amount equal to the d2 term in the Black Scholes formula
        opt_value (float)      : the value of the option in the same units as the units of the underlying

    Results:
        d0        : same as in parameters
        d1        : same as in parameters
        d2        : same as in parameters
        opt_value : same as in parameters
        delta     : change in option_value for a one unit change in underlying value (first order greek)
        vega      : change in option_value for a one percent absolute change in volatility (first order greek)
        theta     : change in option_value for a one day change in time_to_expiry (first order greek)
        rho       : excluded since no inputs for rates (first order greek)
        gamma     : change in delta for a one unit change in underlying value (second order greek)
        volga     : change in vega for a one percent absolute value change in volatility (second order greek)
        speed     : (third order greek)
        ultima    : (third order greek)
        vanna     : (cross greek)
        charm     : (cross greek)
        veta      : (cross greek)
        zoma      : (cross greek)
        color     : (cross greek)
'''   

In [1]:
import numpy as np 

from scipy.stats import norm
from scipy.optimize import brentq

from Options_Math_Helpers import *

In [3]:
class OptionsMathBlackScholes(OptionsMathHelpers):

    
    def __init__(self):
        # constructor 
        pass

    
    # -----------------------
    # Pricing 
    # -----------------------         
    def bs_d0(self, fwd_value=0, strike=0, vol=0, time_to_expiry=0, **kwargs): 
        vol_sqrt_time_to_expiry = vol * np.sqrt(time_to_expiry)  
        log_moneyness           = np.log(fwd_value / strike)        
        return log_moneyness / vol_sqrt_time_to_expiry

     
    def bs_d1(self, vol=0, time_to_expiry=0, d0=0, **kwargs):
         d1 = d0 + (vol * np.sqrt(time_to_expiry) / 2.0)
         return np.clip(d1, -20.0, 20.0)


    def bs_d2(self, vol=0, time_to_expiry=0, d0=0, **kwargs):
        d2 = d0 - (vol * np.sqrt(time_to_expiry) / 2.0)
        return np.clip(d2, -20.0, 20.0)

    
    def bs_option_value(self, opt_type=0, fwd_value=0, strike=0, time_to_expiry=0, d1=0, d2=0, **kwargs):
        scalar         = np.asarray(np.where(opt_type == 'CALL', 1, -1), dtype=float)
        fwd_value_term =  scalar * norm.cdf(d1 * scalar) * fwd_value 
        strike_term    = -scalar * norm.cdf(d2 * scalar) * strike 
        return fwd_value_term + strike_term         
        

    # -----------------------
    # Greeks (first order)
    # -----------------------
    def bs_delta(self, opt_type=0, d1=0, **kwargs):
        call_delta = norm.cdf(d1)
        put_delta  = call_delta - 1  # N(d1) − 1 = −N(−d1)
        return np.where(opt_type == 'CALL', call_delta, put_delta)

    
    def bs_vega(self, fwd_value=0, time_to_expiry=0, d1=0, **kwargs):
        return fwd_value * np.sqrt(time_to_expiry) * norm.pdf(d1) * 0.01

    
    def bs_theta(self, opt_type=0, fwd_value=0, strike=0, vol=0, time_to_expiry=0, d1=0, d2=0, diy=365.0, **kwargs):
        rate           = 0
        scalar         = np.asarray(np.where(opt_type == 'CALL', 1, -1), dtype=float)
        fwd_value_term = -1 * fwd_value * vol * norm.pdf(d1) / (2 * np.sqrt(time_to_expiry))
        strike_term    = -1 * rate * strike * np.exp(-rate * time_to_expiry) * norm.cdf(scalar * d2) * scalar         
        return (fwd_value_term + strike_term) / diy


    def bs_rho(self, opt_type=0, strike=0, time_to_expiry=0, d2=0, **kwargs):
        rate   = 0
        scalar = np.where(opt_type == 'CALL', 1.0, -1.0)
        return scalar * strike * time_to_expiry * np.exp(-rate * time_to_expiry) * norm.cdf(scalar * d2)
        

    # -----------------------
    # Greeks (second order)
    # -----------------------
    def bs_gamma(self, fwd_value=0, vol=0, time_to_expiry=0, d1=0, **kwargs):
        denominator = fwd_value * vol * np.sqrt(time_to_expiry)
        return norm.pdf(d1) / denominator  


    def bs_volga(self, vol=0, d1=0, d2=0, vega=0, **kwargs):
        numerator = vega * d1 * d2
        return numerator / vol

    
    # -----------------------
    # Greeks (third order)
    # -----------------------
    def bs_speed():
        pass
        #pending

    
    def bs_ultima():
        pass
        #pending        


    # -----------------------
    # Greeks (cross)
    # -----------------------
    def bs_vanna():
        pass
        #pending


    def bs_charm():
        pass
        #pending


    def bs_veta():
        pass
        #pending


    def bs_zomma():
        pass
        #pending


    def bs_color():
        pass
        #pending
        
    # -----------------------
    # Volatility backsolvers
    # ----------------------- 

    
    def bs_vol_solver_newton(self, opt_value=None, opt_type=None, fwd_value=None, strike=None, time_to_expiry=None, 
                                  initial_guess=0.5, tolerance=1e-10, max_iterations=100):
        """
        Vectorized Newton-Raphson implied volatility solver.
        Rows that fail to converge will return np.nan, but the rest continue.
        """
   
        # Initialize - assumption is that opt_value and strike will have to be array like to begin with
        if fwd_value.shape != opt_value.shape:
            fwd_value = np.full_like(opt_value, fwd_value, dtype=float)
        if time_to_expiry.shape != opt_value.shape:
            time_to_expiry =  np.full_like(opt_value, time_to_expiry, dtype=float)

        vol = np.full_like(opt_value, initial_guess, dtype=float)
        converged = np.zeros_like(opt_value, dtype=bool)

        valid = (
                    (opt_value > 0) &
                    (time_to_expiry > 0) &
                    (strike > 0)
                )

        vol[~valid] = np.nan
        converged[~valid] = True

        for i in range(max_iterations):

            # Skip already converged elements
            mask = valid & ~converged  

            if not np.any(mask):  # All done
                break

            # Protect against very small v by taking the max of the vol and 1e-8
            vol[mask] = np.maximum(vol[mask], 1e-8)  

            d0 = self.bs_d0(fwd_value=fwd_value[mask], strike=strike[mask], vol=vol[mask], time_to_expiry=time_to_expiry[mask])
            d1 = self.bs_d1(vol=vol[mask], time_to_expiry=time_to_expiry[mask], d0=d0)

            d2 = self.bs_d2(vol=vol[mask], time_to_expiry=time_to_expiry[mask], d0=d0)
            opt_value_est =  self.bs_option_value(opt_type=opt_type[mask], fwd_value=fwd_value[mask],
                                                    strike=strike[mask], time_to_expiry=time_to_expiry[mask], d1=d1, d2=d2)

            diff = opt_value_est - opt_value[mask]

            vega = self.bs_vega(fwd_value=fwd_value[mask], time_to_expiry=time_to_expiry[mask], d1=d1) 

            # Newton-Raphson step with small factor to improve stability
            safe_step = np.where(np.abs(vega) > 1e-8, 0.01 * diff / vega, 0.0)
            vol[mask] = vol[mask] - safe_step

            # Update convergence mask
            converged[mask] = np.abs(diff) < tolerance

        # Assign np.nan to non-converged elements
        vol[~converged] = np.nan

        return vol.astype(float)



    def bs_vol_solver_brentq(self, opt_value=None, opt_type=None, fwd_value=None, strike=None, time_to_expiry=None, rate=0.0):
        """
        Solve for implied volatility given a market price.
        """
        
        fwd_value, strike, time_to_expiry, rate, opt_value = self.to_arrays(fwd_value, strike, time_to_expiry, rate, opt_value)  
        opt_type = self.option_type(opt_type, u)

        lo_vol_bound = np.full(opt_value.shape, 1e-6)
        hi_vol_bound = np.full(opt_value.shape, 10.0)     
        
        # Define the function whose root we want to find
        def objective(v):
            d0 = self.bs_d0_formula(fwd_value=fwd_value, strike=strike, vol=vol, time_to_expiry=time_to_expiry, rate=rate)
            d1 = self.bs_d1_formula(vol=vol, time_to_expiry=time_to_expiry, d0=d0)
            d2 = self.bs_d2_formula(vol=vol, time_to_expiry=time_to_expiry, d0=d0)
            return self.bs_option_value_formula(opt_type=opt_type, fwd_value=fwd_value, strike=strike, vol=vol, time_to_expiry=time_to_expiry, rate=rate) - opt_value
    
        # Use brentq to find sigma between lo_vol_bound and hi_vol_bound
        try:
            return brentq(objective, lo_vol_bound, hi_vol_bound)
        except ValueError:
            return np.nan  # If no solution in the range
