In [None]:
from datetime import date
import random
import time
import yfinance as yf
import pandas as pd

import seaborn as sns

import matplotlib.pyplot as plt
import matplotlib.colors as mcolors

from numpy.fft import fft, ifft, fftshift
import numpy as np
from numpy import log, sqrt, exp


from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_absolute_percentage_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.mixture import GaussianMixture


from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.diagnostic import acorr_ljungbox

import scipy.stats as stats
from scipy.stats import probplot, laplace, norm, t, poisson
from scipy.linalg import solve_banded
from scipy.optimize import minimize, differential_evolution
from scipy.integrate import quad
from scipy.special import roots_laguerre
from scipy.interpolate import interp1d
from scipy.sparse import diags, kron, identity, csr_matrix
from scipy.sparse.linalg import spsolve

import statsmodels.api as sm
from statsmodels.nonparametric.kde import KDEUnivariate
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.arima_process import ArmaProcess

#import pymc as pm
#import arviz as az

from tensorflow import keras
#from tensorflow.keras.utils import plot_model

#import pyswarms as ps

######################################
#from pmdarima import auto_arima
#from diptest import diptest

In [None]:
class Stochastic_prices_MC:
    def __init__(self, df, nfuture, last_price, npath, rfr, risk_neutral = True):
        self.nfuture = nfuture
        self.df = df
        self.rfr = rfr
        self.risk_neutral = risk_neutral
        self.npath = npath
        self.last_price = last_price

    def plot_simulated_price_paths(self, simulated_prices, num_paths_to_plot=50):
        """Draw the first few simulated paths on a new matplotlib figure.

        Parameters
        ----------
        simulated_prices
            2-D array with one simulated path per row.
        num_paths_to_plot
            Upper bound on the number of rows drawn; capped at the number of
            rows available.
        """
        line_style = {"linewidth": 1, "alpha": 0.6}

        plt.figure(figsize=(14, 7))

        # Never ask for more rows than the array actually holds
        rows_to_draw = min(num_paths_to_plot, simulated_prices.shape[0])
        for row in range(rows_to_draw):
            plt.plot(simulated_prices[row], **line_style)

        plt.title(
            f"Simulated Future Price Paths ({self.nfuture} Days Ahead)",
            fontsize=16,
            fontweight='bold',
        )
        plt.xlabel("Time Steps (Days)", fontsize=12)
        plt.ylabel("Price", fontsize=12)
        plt.grid(True, linestyle=':', linewidth=0.8)
        plt.tight_layout()
        plt.show()

    def simulate_future_prices_Heston(self, mu_sample, sigma_sample, rho, kappa, sigma_sigma, inst_vol):
        # Initialize a 2D array for all simulations (each row is one simulation)
        simulated_prices = np.zeros((self.npath, self.nfuture + 1))
        volatility = np.zeros((self.npath, self.nfuture + 1))

        simulated_prices[:, 0] = self.last_price  # Set initial price for all paths
        volatility[:, 0] = inst_vol

        dt = 1 / 252  # Daily time step

        for ii in range(self.npath):
            for t in range(1, self.nfuture + 1):
                 ##Euler-Maruyama Discretization
                 z1 = np.random.normal(0, 1, 1)
                 z2 = np.random.normal(0, 1, 1)
                 w1 = z1
                 w2 = rho * z1 + np.sqrt(1 - rho**2) * z2

                 if self.risk_neutral:
                      volatility[ii, t] = np.maximum(volatility[ii, t-1] + kappa * (sigma_sample - volatility[ii, t-1]) * dt + sigma_sigma * np.sqrt(volatility[ii, t-1] * dt) * w2, 0)
                      simulated_prices[ii, t] = simulated_prices[ii, t - 1] * np.exp((self.rfr - 0.5 * volatility[ii, t-1]) * dt + np.sqrt( volatility[ii, t-1] * dt) * w1)
                 else:
                      volatility[ii, t] = np.maximum(volatility[ii, t-1] + kappa * (sigma_sample - volatility[ii, t-1]) * dt + sigma_sigma * np.sqrt(volatility[ii, t-1] * dt) * w2, 0)
                      simulated_prices[ii, t] = simulated_prices[ii, t - 1] * np.exp((mu_sample - 0.5 * volatility[ii, t-1]) * dt + np.sqrt( volatility[ii, t-1] * dt) * w1)

        return simulated_prices, volatility

    def simulate_future_prices_Merton(self, mu_sample, sigma_sample, mu_j_sample, sigma_j_sample, lamda_j_sample):
        # Initialize a 2D array for all simulations (each row is one simulation)
        simulated_prices = np.zeros((self.npath, self.nfuture + 1))

        simulated_prices[:, 0] = self.last_price  # Set initial price for all paths

        dt = 1 / 252  # Daily time step

        for ii in range(self.npath):
                for t in range(1, self.nfuture + 1):
                    z = np.random.normal(0, 1, 1)
                    NN = np.random.poisson(lam=int(lamda_j_sample * dt), size=1)[0]

                    #analytical
                    logJ_NN=np.random.normal(mu_j_sample, sigma_j_sample, NN)
                    j_first_term = -lamda_j_sample * (np.exp(mu_j_sample + 0.5 * sigma_j_sample**2) -1)
                    j_second_term = np.sum(logJ_NN)

                    #euler maruyama
                    J = np.prod(np.random.lognormal(mu_j_sample, sigma_j_sample, NN)) if NN > 0 else 1.0


                    if self.risk_neutral:
                        #Analytical
                        #simulated_prices[ii, t] = simulated_prices[ii, t - 1] * np.exp((self.rfr - j_first_term - 0.5 * sigma_sample**2) * dt + sigma_sample * np.sqrt(dt) * z + j_second_term)

                        #Euler-Maruyama Discretization
                        simulated_prices[ii, t] = simulated_prices[ii, t - 1] * (1 + (self.rfr - j_first_term) * dt + sigma_sample * np.sqrt(dt) * z + (J-1))
                    else:
                        #Analytical
                        #simulated_prices[ii, t] = simulated_prices[ii, t - 1] * np.exp((mu_sample - 0.5 * sigma_sample**2) * dt + sigma_sample * np.sqrt(dt) * z + j_second_term)

                        #Euler-Maruyama Discretization
                        simulated_prices[ii, t] = simulated_prices[ii, t - 1] * (1 + mu_sample * dt + sigma_sample * np.sqrt(dt) * z + J-1)

        return simulated_prices

    def simulate_future_prices_BSM(self, mu_sample, sigma_sample):
        # Initialize a 2D array for all simulations (each row is one simulation)
        simulated_prices = np.zeros((self.npath, self.nfuture + 1))

        simulated_prices[:, 0] = self.last_price  # Set initial price for all paths

        dt = 1 / 252  # Daily time step

        for ii in range(self.npath):
                for t in range(1, self.nfuture + 1):
                    z = np.random.normal(0, 1, 1)

                    if self.risk_neutral:
                        #Analytical
                        #simulated_prices[ii, t] = simulated_prices[ii, t - 1] * np.exp((self.rfr - 0.5 * sigma_sample**2) * dt + sigma_sample * np.sqrt(dt) * z)

                        #Euler-Maruyama Discretization
                        simulated_prices[ii, t] = simulated_prices[ii, t - 1] * (1 + self.rfr * dt + sigma_sample * np.sqrt(dt) * z)

                    else:
                        #Analytical
                        #simulated_prices[ii, t] = simulated_prices[ii, t - 1] * np.exp((mu_sample - 0.5 * sigma_sample**2) * dt + sigma_sample * np.sqrt(dt) * z)

                        #Euler-Maruyama Discretization
                        simulated_prices[ii, t] = simulated_prices[ii, t - 1] * (1 + mu_sample * dt + sigma_sample * np.sqrt(dt) * z)

        return simulated_prices

    def simulate_future_prices_btree(self, mu_sample, sigma_sample):
        # Initialize a 2D array for all simulations (each row is one simulation)
        simulated_prices = np.zeros((self.npath, self.nfuture + 1))

        simulated_prices[:, 0] = self.last_price  # Set initial price for all paths

        dt = 1 / 252  # Daily time step

        u = np.exp(sigma_sample * np.sqrt(dt))
        d = 1 / u

        for ii in range(self.npath):
            for t in range(1, self.nfuture + 1):
                # Compute risk-neutral or real-world probability via Cox-Rox-Rubinstein probability
                if self.risk_neutral:
                    p = (np.exp(self.rfr * dt) - d) / (u - d)
                else:
                    p = (np.exp(mu_sample * dt) - d) / (u - d)

                # Simulate a Bernoulli trial
                move = np.random.rand() < p  # True = up, False = down

                # Update the simulated price path
                simulated_prices[ii, t] = simulated_prices[ii, t - 1] * (u if move else d)

        return simulated_prices

    def estimate_continuation_value(self, S_t, Y, method='regression'):

        if method == 'regression':
            # Fit continuation value using regression --> Longstaff-Schwartz
            A = np.vstack([np.ones_like(S_t), S_t, S_t**2]).T
            coeffs = np.linalg.lstsq(A, Y, rcond=None)[0]
            return A @ coeffs

        elif method == 'random_forest':
            X = S_t.reshape(-1, 1)
            model = RandomForestRegressor(n_estimators=100, max_depth=5, random_state=42)
            model.fit(X, Y)
            return model.predict(X)

        elif method == 'nested_mc': #underprocess ---> needs to incorporate well
            continuation_values = np.zeros_like(S_t)
            for i, s in enumerate(S_t):
                inner_paths = simulate_paths(s, n_steps=len(Y), n_paths=n_inner, dt=dt)  # User-defined simulation
                payoff = np.maximum(inner_paths[:, -1] - strike, 0)  # Assuming a call option
                continuation_values[i] = np.mean(payoff) * np.exp(-rfr * len(Y) * dt)
            return continuation_values
        else:
            raise ValueError("Unknown method: choose 'regression' or 'nested_mc'")

    def price_vanilla_options(self, simulated_prices, strike, option_type, exercise_type, exercise_days):
        '''Pricing vanilla option using the Longstaff-Schwartz method.'''

        maturity_years = self.nfuture / 252  # Total time to maturity in years
        dt = 1 / 252
        discount = lambda t: np.exp(-self.rfr * dt * t)

        # Calculate the payoff matrix
        if option_type == 'call':
            payoffs = np.maximum(simulated_prices - strike, 0)
        else:
            payoffs = np.maximum(strike - simulated_prices, 0)

        option_price = payoffs.copy()

        if exercise_type == 'european':
             exercise_binary = np.zeros(self.nfuture + 1)
        elif exercise_type == 'american':
            exercise_binary = np.ones(self.nfuture + 1)
        elif exercise_type == 'bermudan':
            exercise_binary = np.zeros(self.nfuture + 1)
            for i in range(self.nfuture+1):
                if i in exercise_days:
                    exercise_binary[i] = 1

        # Initialize cashflows and exercise times
        cashflows = payoffs[:, -1].copy()
        cashflow_times = np.full_like(cashflows, self.nfuture, dtype=float)  # Time of cashflow (default: maturity)

        # Work backwards
        for t in range(self.nfuture - 1, -1, -1):
            in_the_money = payoffs[:, t] > 0
            if not np.any(in_the_money):
                continue

            if exercise_binary[t] == 1:
                S_t = simulated_prices[in_the_money, t]
                Y = cashflows[in_the_money] * discount(cashflow_times[in_the_money] - t)

                continuation_values = self.estimate_continuation_value(S_t, Y, method='regression')

                immediate_exercise = payoffs[in_the_money, t] > continuation_values
                exercise_indices = np.where(in_the_money)[0]

                # Update cashflows and times where early exercise occurs
                cashflows[exercise_indices[immediate_exercise]] = payoffs[exercise_indices[immediate_exercise], t]
                cashflow_times[exercise_indices[immediate_exercise]] = t

                option_price[:, t] = cashflows

        # Final pricing
        present_values = cashflows * discount(cashflow_times)
        cal_price = np.mean(present_values)

        return cal_price, option_price

    def price_asian_options(self, simulated_prices, strike, option_type, exercise_type, exercise_days):
        maturity_years = self.nfuture / 252  # Total time to maturity in years
        dt = 1 / 252
        discount = lambda t: np.exp(-self.rfr * dt * t)

        #airthmetic averaging
        #cumulative_sum = np.cumsum(simulated_prices, axis=1)
        #time_steps = np.arange(1, simulated_prices.shape[1] + 1)
        #Saverages = cumulative_sum / time_steps

        #geometric averaging
        log_prices = np.log(simulated_prices)
        log_prices = np.nan_to_num(log_prices, nan=-1e10)
        cumulative_logsum = np.cumsum(log_prices, axis=1)
        time_steps = np.arange(1, simulated_prices.shape[1] + 1)
        Saverages = np.exp(cumulative_logsum / time_steps)

        # Calculate the payoff matrix
        if option_type == 'call':
            payoffs = np.maximum(Saverages - strike, 0)
        else:
            payoffs = np.maximum(strike - Saverages, 0)

        option_price = payoffs.copy()

        if exercise_type == 'european':
            exercise_binary = np.zeros(self.nfuture + 1)
        elif exercise_type == 'american':
            exercise_binary = np.ones(self.nfuture + 1)
        elif exercise_type == 'bermudan':
            exercise_binary = np.zeros(self.nfuture + 1)
            for i in range(self.nfuture+1):
                if i in exercise_days:
                    exercise_binary[i] = 1

        # Initialize cashflows and exercise times
        cashflows = payoffs[:, -1].copy()
        cashflow_times = np.full_like(cashflows, self.nfuture, dtype=float)  # Time of cashflow (default: maturity)

        # Work backwards
        for t in range(self.nfuture - 1, -1, -1):
            in_the_money = payoffs[:, t] > 0
            if not np.any(in_the_money):
                continue

            if exercise_binary[t] == 1:
                S_t = simulated_prices[in_the_money, t]
                Y = cashflows[in_the_money] * discount(cashflow_times[in_the_money] - t)

                continuation_values = self.estimate_continuation_value(S_t, Y, method='regression')

                immediate_exercise = payoffs[in_the_money, t] > continuation_values
                exercise_indices = np.where(in_the_money)[0]

                # Update cashflows and times where early exercise occurs
                cashflows[exercise_indices[immediate_exercise]] = payoffs[exercise_indices[immediate_exercise], t]
                cashflow_times[exercise_indices[immediate_exercise]] = t

                option_price[:, t] = cashflows

        # Final pricing
        present_values = cashflows * discount(cashflow_times)
        cal_price = np.mean(present_values)

        return cal_price, option_price

    def compute_greeks(self, option_price, simulated_prices, strike, option_type, exercise_type, exercise_days, model_type, sigma_prior, args):
        method_name = "simulate_future_prices_" + model_type

        dS = 0.01 * self.last_price  # Perturbation in asset price
        dSigma = 0.01 * sigma_prior  # Perturbation in volatility
        dR = 0.0001  # Small change in risk-free rate
        dTday = 1       # One day for theta

        # ---------- DELTA ----------
        last_price_up = self.last_price + dS
        delta_class_up = Stochastic_prices_MC(self.df, self.nfuture, last_price_up ,self.npath, self.rfr, self.risk_neutral)
        simulate_method = getattr(delta_class_up , method_name)
        if model_type == 'Heston':
           delta_sim_prices_up, _ = simulate_method(*args)
        else:
            delta_sim_prices_up = simulate_method(*args)
        delta_price_up, *_ = delta_class_up.price_vanilla_options(delta_sim_prices_up, strike, option_type, exercise_type, exercise_days)

        last_price_down = self.last_price - dS
        delta_class_down = Stochastic_prices_MC(self.df, self.nfuture, last_price_down ,self.npath, self.rfr, self.risk_neutral)
        simulate_method = getattr(delta_class_down , method_name)
        if model_type == 'Heston':
           delta_sim_prices_down, _ = simulate_method(*args)
        else:
            delta_sim_prices_down = simulate_method(*args)
        delta_price_down, *_ = delta_class_down.price_vanilla_options(delta_sim_prices_down, strike, option_type, exercise_type, exercise_days)

        delta = (delta_price_up - delta_price_down) / (2 * dS)

        # ---------- GAMMA ----------
        last_price = self.last_price
        delta_class = Stochastic_prices_MC(self.df, self.nfuture, last_price ,self.npath, self.rfr, self.risk_neutral)
        simulate_method = getattr(delta_class , method_name)
        if model_type == 'Heston':
           delta_sim_prices, _ = simulate_method(*args)
        else:
            delta_sim_prices = simulate_method(*args)
        delta_price, *_ = delta_class.price_vanilla_options(delta_sim_prices, strike, option_type, exercise_type, exercise_days)

        gamma = (delta_price_up - 2 * delta_price + delta_price_down) / (dS ** 2)

        # ---------- VEGA ----------
        # Perturb volatility
        vega_class = Stochastic_prices_MC(self.df, self.nfuture, self.last_price, self.npath, self.rfr, self.risk_neutral)
        simulate_method = getattr(vega_class , method_name)

        sigma_up = sigma_prior + dSigma
        arg_list = list(args)
        arg_list[1] = sigma_up
        args_mod = tuple(arg_list)
        if model_type == 'Heston':
            vega_sim_price_up, _ = simulate_method(*args_mod)
        else:
            vega_sim_price_up = simulate_method(*args_mod)
        vega_price_up, *_ = vega_class.price_vanilla_options(vega_sim_price_up, strike, option_type, exercise_type, exercise_days)

        sigma_down = sigma_prior - dSigma
        arg_list = list(args)
        arg_list[1] = sigma_down
        args_mod = tuple(arg_list)
        if model_type == 'Heston':
            vega_sim_price_down, _ = simulate_method(*args_mod)
        else:
            vega_sim_price_down = simulate_method(*args_mod)
        vega_price_down, *_ = vega_class.price_vanilla_options(vega_sim_price_down, strike, option_type, exercise_type, exercise_days)

        vega = (vega_price_up - vega_price_down) / (2 * dSigma)

        # ---------- RHO ----------
        rfr_up = self.rfr + dR
        rho_class_up = Stochastic_prices_MC(self.df, self.nfuture, self.last_price ,self.npath, rfr_up, self.risk_neutral)
        simulate_method = getattr(rho_class_up , method_name)
        if model_type == 'Heston':
            rho_sim_prices_up, _ = simulate_method(*args)
        else:
            rho_sim_prices_up = simulate_method(*args)
        rho_price_up, *_ = rho_class_up.price_vanilla_options(rho_sim_prices_up, strike, option_type, exercise_type, exercise_days)

        rfr_down = self.rfr - dR
        rho_class_down = Stochastic_prices_MC(self.df, self.nfuture, self.last_price ,self.npath, rfr_down, self.risk_neutral)
        simulate_method = getattr(rho_class_down , method_name)
        if model_type == 'Heston':
            rho_sim_prices_down, _ = simulate_method(*args)
        else:
            rho_sim_prices_down = simulate_method(*args)
        rho_price_down, *_ = delta_class_down.price_vanilla_options(rho_sim_prices_down, strike, option_type, exercise_type, exercise_days)

        rho = (rho_price_up - rho_price_down) / (2 * dR)

        # ---------- THETA ----------
        # Approximate by shifting time to maturity (nfuture - 1)
        theta_class = Stochastic_prices_MC(self.df, self.nfuture, self.last_price ,self.npath, self.rfr, self.risk_neutral)
        simulate_method = getattr(theta_class , method_name)
        if model_type == 'Heston':
            theta_sim_prices, _ = simulate_method(*args)
        else:
            theta_sim_prices = simulate_method(*args)
        theta_price, *_ = theta_class.price_vanilla_options(theta_sim_prices, strike, option_type, exercise_type, exercise_days)

        nfuture_down = max(1, self.nfuture - dTday)
        theta_class_down = Stochastic_prices_MC(self.df, nfuture_down, self.last_price ,self.npath, self.rfr, self.risk_neutral)
        simulate_method = getattr(theta_class_down , method_name)
        if model_type == 'Heston':
            theta_sim_prices_down, _ = simulate_method(*args)
        else:
            theta_sim_prices_down = simulate_method(*args)
        theta_price_down, *_ = theta_class_down.price_vanilla_options(theta_sim_prices_down, strike, option_type, exercise_type, exercise_days[:nfuture_down])

        theta = (theta_price - theta_price_down) / (252 / dTday)

        return delta, gamma, theta, vega, rho

