# In [None]:
import numpy as np
import matplotlib.pyplot as plt
from math import erf, exp, log, sqrt
import csv


def N(x: float) -> float:
    """
    Evaluates the standard normal cumulative distribution function via the error function

    :param x: Point at which the CDF is evaluated
    :return: Probability that a standard normal variable is less than or equal to x
    """
    # Phi(x) = (1 + erf(x / sqrt(2))) / 2
    scaled = x / sqrt(2.0)
    return 0.5 * (1.0 + erf(scaled))

def bs_price_call(S0: float, K: float, r: float, sigma: float, T: float) -> float:
    """
    Calculates the analytical price of a European call option using the Black-Scholes formula.

    Degenerate inputs are handled without evaluating the formula (which would divide by zero):
    an expired option (T <= 0) is worth its intrinsic value, and with no volatility
    (sigma <= 0) the stock grows deterministically at the risk-free rate, so the price is
    the discounted forward payoff max(S0 - K * exp(-r * T), 0).

    :param S0: Initial stock price at t=0
    :param K: Strike price
    :param r: Risk free rate
    :param sigma: Volatility
    :param T: Time to maturity in years
    :return: Price of the European call option
    """
    # Expired (or nonsensical negative maturity): nothing left to discount
    if T <= 0:
        return float(max(S0 - K, 0.0))

    # Zero-volatility limit: S_T = S0 * exp(r * T) with certainty, so only the strike is discounted
    if sigma <= 0:
        return float(max(S0 - K * exp(-r * T), 0.0))

    vol = sigma * sqrt(T)
    d1 = (log(S0 / K) + (r + 0.5 * sigma**2) * T) / vol
    d2 = d1 - vol

    # Return price
    return S0 * N(d1) - K * exp(-r * T) * N(d2)


def payoff_call(S_t: np.ndarray, K: float):
    """
    Evaluates the European call payoff max(S - K, 0) element-wise

    :param S_t: Stock prices at which the payoff is evaluated
    :param K: Strike price
    :return: Payoff for each entry of S_t
    """
    intrinsic = S_t - K
    return np.maximum(intrinsic, 0.0)


def euler_step_to_maturity(S0: float, dW: np.ndarray, dt: float, r: float, sigma: float):
    """
    Advances every path from S0 through all time steps with the Euler scheme

    :param S0: Initial stock price at t=0
    :param dW: Brownian increments, indexed as (path, time step)
    :param dt: Size of one time step
    :param r: Risk free rate
    :param sigma: Volatility
    :return: A 1D array of simulated stock prices after the last step, floored at zero
    """
    prices = np.full(dW.shape[0], S0, dtype=float)
    for step in range(dW.shape[1]):
        shock = dW[:, step]
        drift_term = r * prices * dt
        diffusion_term = sigma * prices * shock
        prices = prices + (drift_term + diffusion_term)
    # A large negative increment can push the discretised price below zero
    return np.maximum(prices, 0.0)

def milstein_step_to_maturity(S0: float, dW: np.ndarray, dt: float, r: float, sigma: float):
    """
    Advances every path from S0 through all time steps with the Milstein scheme

    :param S0: Initial stock price at t=0
    :param dW: Brownian increments, indexed as (path, time step)
    :param dt: Size of one time step
    :param r: Risk free rate
    :param sigma: Volatility
    :return: A 1D array of simulated stock prices after the last step, floored at zero

    """
    half_sigma_sq = 0.5 * (sigma**2)
    prices = np.full(dW.shape[0], S0, dtype=float)
    for step in range(dW.shape[1]):
        shock = dW[:, step]
        drift_term = r * prices * dt
        diffusion_term = sigma * prices * shock
        # Milstein correction: 0.5 * sigma^2 * S * (dW^2 - dt)
        correction = half_sigma_sq * prices * (shock**2 - dt)
        prices = prices + (drift_term + diffusion_term + correction)
    return np.maximum(prices, 0.0)

def rk_step_to_maturity(S0: float, dW: np.ndarray, dt: float, r: float, sigma: float):
    """
    Advances every path from S0 through all time steps with a predictor-corrector
    (Runge-Kutta style) scheme: a Milstein predictor followed by a step whose drift
    is the average of the drift at the current and predicted prices

    :param S0: Initial stock price at t=0
    :param dW: Brownian increments, indexed as (path, time step)
    :param dt: Size of one time step
    :param r: Risk free rate
    :param sigma: Volatility
    :return: A 1D array of simulated stock prices after the last step, floored at zero
    """
    half_sigma_sq = 0.5 * (sigma**2)
    prices = np.full(dW.shape[0], S0, dtype=float)
    for step in range(dW.shape[1]):
        shock = dW[:, step]
        drift_now = r * prices
        diffusion_term = sigma * prices * shock
        correction = half_sigma_sq * prices * (shock**2 - dt)

        # Predictor: one full Milstein step from the current price
        predicted = prices + drift_now * dt + diffusion_term + correction

        # Corrector: same stochastic terms, drift averaged over current and predicted price
        averaged_drift = 0.5 * (drift_now + r * predicted) * dt
        prices = prices + averaged_drift + diffusion_term + correction
    return np.maximum(prices, 0.0)


def weak_order_convergence(path_simulator, S0: float, K: float, r: float, sigma: float, T: float, exact_price: float, Npaths: int = 200000, seed=332093):
    """
    Performs a weak order convergence analysis for a given scheme, printing bias vs time step size

    For step counts 1, 2, 4, ..., 2048 the discounted call payoff is estimated by Monte Carlo
    and compared with the exact price. The weak order is the slope of log|bias| against log(dt),
    fitted only on points whose bias exceeds twice its standard error.

    :param path_simulator: SDE function to test, called as path_simulator(S0, dW, dt, r, sigma)
    :param S0: Initial stock price at t=0
    :param K: Strike price
    :param r: Risk free rate
    :param sigma: Volatility
    :param T: Time to maturity in years
    :param exact_price: Exact price
    :param Npaths: Number of paths to simulate at each resolution (integral floats such as 2e5 are accepted)
    :param seed: Random seed
    :return: Convergence results (list of dicts with keys 'dt', 'abs_bias', 'se') and estimated weak order
    """
    # NumPy requires an integer sample count; coerce so that values like 2e5 do not raise
    n_paths = int(Npaths)
    rng = np.random.default_rng(seed)
    discount = exp(-r * T)
    results = []

    # Use time step progression for loop
    steps_list = [2**i for i in range(12)]

    for n in steps_list:
        dt = T / n
        dW = rng.normal(0.0, sqrt(dt), size=(n_paths, n))
        S_approx = path_simulator(S0, dW, dt, r, sigma)
        P_approx = discount * payoff_call(S_approx, K)

        mean_approx = np.mean(P_approx)
        bias = mean_approx - exact_price
        se_mean = np.std(P_approx, ddof=1) / np.sqrt(n_paths)

        results.append({'dt': dt, 'abs_bias': abs(bias), 'se': se_mean})
        print(f"Steps: {n:3d}, dt: {dt:.6f}, bias: {bias:+.3e}, abs_bias: {abs(bias):.3e} ± {se_mean:.3e}")

    # Fit convergence rate using points where bias is statistically significant
    # (the comprehension variable is deliberately not called `r`, which is the rate parameter)
    dts = np.array([row['dt'] for row in results])
    abs_biases = np.array([row['abs_bias'] for row in results])
    se_biases = np.array([row['se'] for row in results])

    # Mask noisy points where the bias is indistinguishable from sampling error
    mask = abs_biases > 2.0 * se_biases
    if np.sum(mask) >= 2:
        log_dts = np.log(dts[mask])
        log_biases = np.log(abs_biases[mask])
        # Linear regression on the log-log data; the slope is the convergence rate
        alpha, _ = np.polyfit(log_dts, log_biases, 1)
        weak_order = alpha
        print(f"Weak order estimate: {weak_order:.3f}")
    else:
        # Fallback if fewer than two points are statistically significant
        weak_order = 1.0
        print("Using theoretical weak order = 1.0")

    return results, weak_order


def mlmc_adaptive(path_simulator, S0: float, K: float, r: float, sigma: float, T: float, M: int, eps: float, seed: int = 42221):
    """
    This function implements an adaptive MLMC algorithm that automatically determines the number of levels and samples needed to achieve a target accuracy

    Levels 0..2 are sampled first (2000 // M**l samples each, at least 100). Then, for at most
    15 iterations, a level is added while the finest-level mean exceeds eps / sqrt(2) (up to
    level 8); otherwise samples are topped up per level until the summed estimator variance
    falls below eps**2 or no level needs more samples.

    :param path_simulator: SDE function to test
    :param S0: Initial stock price at t=0
    :param K: Strike price
    :param r: Risk free rate
    :param sigma: Volatility
    :param T: Time to maturity in years
    :param M: Refinement factor (level l uses M**l time steps)
    :param eps: RMSE tolerance
    :param seed: Random seed
    :return: MLMC results: dict with keys 'price', 'se', 'L', 'Nl', 'means', 'vars', 'cost'

    """
    rng = np.random.default_rng(seed)
    L, L_max, N0 = 2, 8, 2000

    # Per-level statistics; sample counts are integers because they are used as array sizes
    means = np.zeros(L_max + 1)
    variances = np.zeros(L_max + 1)
    costs = np.zeros(L_max + 1)
    Nl = np.zeros(L_max + 1, dtype=int)

    # Initial sampling for the first few levels to get first estimates of stats
    for l in range(L + 1):
        Nl[l] = max(100, N0 // (M ** l))
        Y, cost_per_sample = mlmc_sample_level(path_simulator, S0, K, r, sigma, T, l, Nl[l], rng, M)
        means[l] = np.mean(Y)
        variances[l] = np.var(Y, ddof=1)
        costs[l] = cost_per_sample
        print(f"Level {l}: N={Nl[l]}, mean={means[l]:+.4e}, var={variances[l]:.2e}")

    # Iteratively add levels and samples until the MSE target is met
    for _ in range(15):
        # Check if we need more levels (bias check)
        if L < L_max and abs(means[L]) > eps / sqrt(2):
            L += 1
            Nl[L] = max(100, N0 // (M ** L))
            Y, cost_per_sample = mlmc_sample_level(path_simulator, S0, K, r, sigma, T, L, Nl[L], rng, M)
            means[L], variances[L], costs[L] = np.mean(Y), np.var(Y, ddof=1), cost_per_sample
            print(f"Added level {L}: N={Nl[L]}, mean={means[L]:+.4e}, var={variances[L]:.2e}")
            continue

        # Check if the desired accuracy (variance check) has been met
        total_var = sum(variances[l] / max(Nl[l], 1) for l in range(L + 1))
        if total_var < eps**2:
            print(f"Converged: total variance {total_var:.3e} < target {eps**2:.3e}")
            break

        # Calculate optimal sample allocation: N_l = sqrt(V_l / C_l) * sum_k sqrt(V_k C_k) / eps^2.
        # The sum over levels does not depend on l, so it is evaluated once.
        sum_sqrt_var_cost = sum(np.sqrt(variances[:L+1] * costs[:L+1]))
        N_opt = np.zeros(L + 1)
        for l in range(L + 1):
            if variances[l] > 0 and costs[l] > 0:
                N_opt[l] = max(100, int(np.sqrt(variances[l] / costs[l]) * sum_sqrt_var_cost / eps**2))

        # Generate additional samples to meet the new optimal allocation
        samples_added = 0
        for l in range(L + 1):
            extra = max(0, int(N_opt[l] - Nl[l]))
            if extra > 0:
                Y_extra, _ = mlmc_sample_level(path_simulator, S0, K, r, sigma, T, l, extra, rng, M)
                # Merge the new batch into the running mean and variance
                total_samples = Nl[l] + extra
                combined_mean = (Nl[l] * means[l] + np.sum(Y_extra)) / total_samples
                if Nl[l] > 1:
                    # A single new sample has no within-batch spread; it still shifts the
                    # pooled variance through the between-batch terms below.
                    ss_extra = (extra - 1) * np.var(Y_extra, ddof=1) if extra > 1 else 0.0
                    variances[l] = ((Nl[l] - 1) * variances[l] + ss_extra +
                                    Nl[l] * (means[l] - combined_mean)**2 +
                                    extra * (np.mean(Y_extra) - combined_mean)**2) / (total_samples - 1)
                means[l] = combined_mean
                Nl[l] += extra
                samples_added += extra
                print(f"Level {l}: added {extra} samples, new N={Nl[l]}, mean={means[l]:+.4e}")

        if samples_added == 0:
            print("No additional samples needed - convergence achieved")
            break

    # Summing over levels gives the price, its standard error and the total cost
    price = sum(means[:L+1])
    se = sqrt(sum(variances[l] / max(Nl[l], 1) for l in range(L + 1)))
    total_cost = sum(Nl[l] * costs[l] for l in range(L + 1))

    return {'price': price, 'se': se, 'L': L, 'Nl': Nl[:L+1], 'means': means[:L+1], 'vars': variances[:L+1], 'cost': total_cost}

def mlmc_non_adaptive(path_simulator, S0: float, K: float, r: float, sigma: float, T: float, L_max: int, M: int, eps: float, N0: int = 1000, seed: int = 42221):
    """
    Implements a non-adaptive MLMC algorithm with a fixed number of levels, allocating
    samples based on variance and cost estimates.

    A pilot run of N0 samples per level estimates the level variances and costs; these
    fix the sample counts, and a fresh set of samples is then drawn for the final estimate
    (the pilot samples are not reused).

    :param path_simulator: The SDE function to test (e.g., euler_step_to_maturity).
    :param S0: Initial stock price.
    :param K: Strike price.
    :param r: Risk-free rate.
    :param sigma: Volatility.
    :param T: Time to maturity.
    :param L_max: Maximum number of levels (L=0 to L_max).
    :param M: Refinement factor.
    :param eps: Target RMSE tolerance.
    :param N0: Initial number of samples for the estimation phase.
    :param seed: Random seed.
    :return: A dictionary of MLMC results with keys 'price', 'se', 'L', 'Nl', 'means', 'vars', 'cost'.
    """
    rng = np.random.default_rng(seed)
    # Initial sampling to estimate variances and costs
    means = np.zeros(L_max + 1)
    variances = np.zeros(L_max + 1)
    costs = np.zeros(L_max + 1)

    print("--- Initial Sampling to Estimate Variances and Costs ---")
    for l in range(L_max + 1):
        Y, cost_per_sample = mlmc_sample_level(path_simulator, S0, K, r, sigma, T, l, N0, rng, M)
        means[l] = np.mean(Y)
        variances[l] = np.var(Y, ddof=1)
        costs[l] = cost_per_sample
        print(f"Level {l}: Est. Var = {variances[l]:.3e}, Est. Cost = {costs[l]:.0f}")

    # Optimal sample allocation: N_l = sqrt(V_l / C_l) * sum_k sqrt(V_k * C_k) / eps^2.
    # Written directly (rather than multiplying and dividing by sum_k sqrt(V_k / C_k)) so that
    # a payoff with zero variance on every level yields the minimum allocation instead of 0/0.
    sqrt_var_div_cost = np.sqrt(variances / costs)
    sum_sqrt_var_cost = float(np.sum(np.sqrt(variances * costs)))
    Nl_opt = np.array(
        [max(100, int(ratio * sum_sqrt_var_cost / eps**2)) for ratio in sqrt_var_div_cost],
        dtype=int,
    )

    print("\n--- Optimal Sample Allocation ---")
    print(f"Optimal samples per level: {Nl_opt}")

    final_means = np.zeros(L_max + 1)
    final_variances = np.zeros(L_max + 1)
    total_cost = 0

    # Final run with the fixed allocation
    for l in range(L_max + 1):
        Y, cost_per_sample = mlmc_sample_level(path_simulator, S0, K, r, sigma, T, l, Nl_opt[l], rng, M)
        final_means[l] = np.mean(Y)
        final_variances[l] = np.var(Y, ddof=1)
        total_cost += Nl_opt[l] * cost_per_sample
        print(f"Level {l}: N={Nl_opt[l]}, Mean = {final_means[l]:+.4e}")

    # Final calculation of price and standard error
    price = sum(final_means)
    se = sqrt(sum(final_variances[l] / Nl_opt[l] for l in range(L_max + 1)))

    return {'price': price, 'se': se, 'L': L_max, 'Nl': Nl_opt, 'means': final_means, 'vars': final_variances, 'cost': total_cost}


def mlmc_sample_level(path_simulator, S0: float, K: float, r: float, sigma: float, T: float, l: int, N_l: int, rng: np.random.Generator, M: int):
    """
    Generates samples for a single MLMC level and returns the cost

    Level 0 returns the discounted call payoff on a single-step path. For l > 0 each sample
    is the discounted difference between the payoff on a fine path (M**l steps) and on a
    coarse path (M**(l-1) steps) driven by the same Brownian increments.

    :param path_simulator: SDE function to test, called as path_simulator(S0, dW, dt, r, sigma)
    :param S0: initial price
    :param K: strike price
    :param r: risk-free interest rate
    :param sigma: volatility
    :param T: time to maturity
    :param l: level
    :param N_l: number of samples to generate for this level
    :param rng: random number generator
    :param M: Refinement factor

    :returns: Tuple of generated samples and cost per sample (the number of fine time steps)

    """
    n_fine = M**l
    dt_f = T / n_fine
    cost = n_fine
    discount = exp(-r * T)

    # Generate a single set of random increments for the finest grid.
    dW_f = rng.normal(0.0, sqrt(dt_f), size=(N_l, n_fine))

    # The fine path is needed on every level, so simulate it once up front
    Sf = path_simulator(S0, dW_f, dt_f, r, sigma)
    fine_payoff = payoff_call(Sf, K)

    if l == 0:
        Y = discount * fine_payoff
    else:
        # Coarse increments are sums of M consecutive fine increments, which couples the two paths
        n_coarse = n_fine // M
        dW_c = dW_f.reshape(N_l, n_coarse, M).sum(axis=2)
        dt_c = T / n_coarse
        Sc = path_simulator(S0, dW_c, dt_c, r, sigma)
        Y = discount * (fine_payoff - payoff_call(Sc, K))
    return Y, cost

def plot_combined_weak_convergence(results_dict):
    """Plots the weak convergence results for all schemes on a single log-log plot

    One |bias|-versus-dt curve is drawn per scheme, together with O(dt) and O(dt^1.5)
    reference lines spanning the dt range of the first scheme. The x-axis is inverted
    and the figure is shown with plt.show().

    :param results_dict: Dictionary where keys are scheme names and values are lists of dicts
        containing convergence data (keys 'dt' and 'abs_bias')
    """
    plt.figure(figsize=(10, 8))
    for name, results_list in results_dict.items():
        dts = np.array([row['dt'] for row in results_list])
        abs_biases = np.array([row['abs_bias'] for row in results_list])
        plt.loglog(dts, abs_biases, 'o-', label=f'{name} Scheme', linewidth=2, markersize=8)

    # Use the first scheme's dt for reference line scaling
    first_key = list(results_dict.keys())[0]
    ref_dt = np.array([results_dict[first_key][0]['dt'], results_dict[first_key][-1]['dt']])

    # Raw strings: '\D' is not a valid escape sequence, so the mathtext labels must not be
    # written as ordinary string literals.
    plt.loglog(ref_dt, 0.1 * ref_dt, 'k--', label=r'Reference O($\Delta$t)', alpha=0.7)
    plt.loglog(ref_dt, 0.01 * ref_dt**1.5, 'k:', label=r'Reference O($\Delta$t$^{1.5}$)', alpha=0.7)

    plt.xlabel(r'Time Step Size ($\Delta$t)', fontsize=14)
    plt.ylabel('Absolute Bias |Error|', fontsize=14)
    plt.title('Weak Convergence Comparison', fontsize=16)
    plt.grid(True, which="both", ls="--", alpha=0.5)
    plt.legend(fontsize=12)
    plt.gca().invert_xaxis()
    plt.tight_layout()
    plt.show()




if __name__ == "__main__":
    # Model and contract parameters
    S0, K, r, sigma, T = 1, 1, 0.05, 0.20, 1.0
    M = 4
    exact_price = bs_price_call(S0, K, r, sigma, T)
    print(f"Black–Scholes call price: {exact_price:.6f}\n")

    # Discretisation schemes under comparison, keyed by display name
    schemes = dict(
        Euler=euler_step_to_maturity,
        Milstein=milstein_step_to_maturity,
        SRK=rk_step_to_maturity,
    )

    # Weak order study: one run per scheme, kept for the summary and the combined plot
    print("=== Weak Order Convergence Analysis ===")
    weak_results_all, weak_orders = {}, {}
    for name, simulator in schemes.items():
        print(f"\n--- Analysing {name} Scheme ---")
        results, order = weak_order_convergence(simulator, S0, K, r, sigma, T, exact_price, seed=21541254)
        weak_results_all[name], weak_orders[name] = results, order

        # Write the (dt, |bias|) pairs of this scheme as a space-delimited table
        conv_filename = f"{name.lower()}_weak_convergence.dat"
        with open(conv_filename, 'w', newline='') as f:
            writer = csv.writer(f, delimiter=' ')
            writer.writerow(['dt', 'abs_bias'])
            writer.writerows([res['dt'], res['abs_bias']] for res in results)
        print(f"Saved weak convergence data to '{conv_filename}'")

    print("\n=== Weak Order Summary ===")
    for name, order in weak_orders.items():
        print(f"{name} weak order: {order:.3f}")

    plot_combined_weak_convergence(weak_results_all)

    # MLMC study: both estimators are run for every scheme
    print("\n\n=== MLMC Analysis ===")
    mlmc_results_all = {}
    log_M = np.log(M)

    for name, simulator in schemes.items():
        print(f"\n--- Running MLMC for {name} Scheme ---")

        print("\n--- Non-Adaptive MLMC ---")
        non_adaptive_results = mlmc_non_adaptive(simulator, S0, K, r, sigma, T, L_max=8, M=M, eps=0.001)

        print("\n--- Adaptive MLMC ---")
        adaptive_results = mlmc_adaptive(simulator, S0, K, r, sigma, T, M=M, eps=0.001)

        # Keep both runs side by side under the scheme name
        mlmc_results_all[name] = dict(nonadaptive=non_adaptive_results, adaptive=adaptive_results)

        # Per-level diagnostics of the adaptive run: base-M logs of variance and |mean|
        diag_filename = f"{name.lower()}_mlmc_diagnostics.dat"
        level_vars = adaptive_results['vars']
        level_means = adaptive_results['means']
        level_counts = adaptive_results['Nl']
        with open(diag_filename, 'w', newline='') as f:
            writer = csv.writer(f, delimiter=' ')
            writer.writerow(['l', 'logM_var', 'logM_abs_mean', 'Nl'])
            writer.writerows(
                [
                    l_val,
                    # The 1e-12 offset keeps the logarithm finite when a statistic is exactly zero
                    np.log(level_vars[l_val] + 1e-12) / log_M,
                    np.log(abs(level_means[l_val]) + 1e-12) / log_M,
                    level_counts[l_val],
                ]
                for l_val in range(adaptive_results['L'] + 1)
            )
        print(f"Saved MLMC diagnostics data to '{diag_filename}'")

    # Side-by-side comparison of both estimators against the analytical price
    print("\n\n=== Final MLMC Results Summary ===")
    print(f"Exact price: {exact_price:.6f}")
    for name, results in mlmc_results_all.items():
        print(f"\n--- {name} Scheme ---")
        std_res, adt_res = results['nonadaptive'], results['adaptive']
        std_norm_err, adt_norm_err = (
            abs(res['price'] - exact_price) / exact_price for res in (std_res, adt_res)
        )

        print(f"Non Adaptive: Price={std_res['price']:.6f} ± {std_res['se']:.6f}, Cost={std_res['cost']:.0f}, NormErr={std_norm_err:.4f}")
        print(f"Adaptive: Price={adt_res['price']:.6f} ± {adt_res['se']:.6f}, Cost={adt_res['cost']:.0f}, NormErr={adt_norm_err:.4f}")
