In [11]:
import numpy as np

def tensor_product(*arrays):
    """
    Compute the tensor product of an arbitrary number of arrays.

    Args:
        *arrays: A variable number of NumPy arrays.

    Returns:
        A NumPy array representing the tensor product.
    """

    if not arrays:
        return np.array(1)  # Scalar identity for empty product

    result = arrays[0]
    for arr in arrays[1:]:
        result = np.kron(result, arr)
    return result

# Example usage:
a = np.array([1, 2])
b = np.array([3, 4])
c = np.array([5, 6])

tp = tensor_product(a, b, c)
print(tp)

[15 18 20 24 30 36 40 48]


In [12]:
from typing import List, TypeVar

T = TypeVar('T')

def shuffle_product(u: List[T], v: List[T]) -> List[List[T]]:
    """
    Compute the shuffle product of two lists.

    Args:
        u: The first list.
        v: The second list.

    Returns:
        A list of lists, representing all possible shuffles.
    """

    if not u:
        return [v]
    if not v:
        return [u]

    shuffles = []
    for w in shuffle_product(u[1:], v):
        shuffles.append([u[0]] + w)
    for w in shuffle_product(u, v[1:]):
        shuffles.append([v[0]] + w)
    return shuffles

# Example usage:
u = [1, 2]
v = [3, 4]

shuffles = shuffle_product(u, v)
for s in shuffles:
    print(s)

[1, 2, 3, 4]
[1, 3, 2, 4]
[1, 3, 4, 2]
[3, 1, 2, 4]
[3, 1, 4, 2]
[3, 4, 1, 2]


In [13]:
from scipy.integrate import quad, solve_ivp
from typing import Callable, List, Tuple, Dict
import logging

# Configure logging (you can adjust the level as needed)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def stratonovich_integral(func: Callable[[float], float], a: float, b: float, n: int) -> float:
    """
    Approximates the Stratonovich integral of a 1D function using the midpoint rule.

    Args:
        func: The function to integrate.
        a: The start of the integration interval.
        b: The end of the integration interval.
        n: The number of subintervals.

    Returns:
        The approximate value of the Stratonovich integral.
    """

    h = (b - a) / n
    integral = 0.0
    t = np.linspace(a + h / 2, b - h / 2, n)
    integral = np.sum(func(t)) * h
    return integral

def iterated_integral(path: np.ndarray, indices: List[int], a: float, b: float, n: int) -> float:
    """
    Approximates an iterated integral of a path using Stratonovich integration.

    Args:
        path: A 2D NumPy array representing the path, where path[0] is time and path[1] is the value.
        indices: A list of indices indicating which components of the path to integrate.
                 For 1D path, indices should be a list of 1s.
        a: The start of the integration interval.
        b: The end of the integration interval.
        n: The number of subintervals for each integration.

    Returns:
        The approximate value of the iterated integral.
    """

    if not indices:
        return 1.0  # Base case: empty integral is 1

    def inner_integral(t: float) -> float:
        current_indices = indices[:-1]
        return iterated_integral(path, current_indices, a, t, n)

    func_to_integrate = lambda t: inner_integral(t) * np.interp(t, path[:, 0], path[:, indices[-1]])
    return stratonovich_integral(func_to_integrate, a, b, n)

def path_signature(path: np.ndarray, order: int, a: float, b: float, n: int) -> List[float]:
    """
    Computes the signature of a 1D path up to a given order.

    Args:
        path: A 2D NumPy array representing the path, where path[0] is time and path[1] is the value.
        order: The maximum order of the signature to compute.
        a: The start of the path interval.
        b: The end of the path interval.
        n: The number of subintervals for the iterated integrals.

    Returns:
        A list representing the signature of the path.
    """

    signature = [1.0]  # 0th order term
    for k in range(1, order + 1):
        for index_tuple in generate_index_tuples(1, k):  # For 1D path, indices are always 1
            indices = list(index_tuple)
            signature.append(iterated_integral(path, indices, a, b, n))

    return signature

def generate_index_tuples(dim: int, length: int) -> List[Tuple[int, ...]]:
    """
    Generates all tuples of indices for iterated integrals.  #helper function

    Args:
        dim: The dimension of the path.
        length: The length of the index tuples.

    Returns:
        A list of tuples.
    """
    if length == 0:
        return [()]
    if dim == 0:
        return []

    tuples = []
    for i in range(1, dim + 1):
        for sub_tuple in generate_index_tuples(dim, length - 1):
            tuples.append((i,) + sub_tuple)
    return tuples

# --- Error Control ---
def adaptive_signature(path: np.ndarray, order: int, a: float, b: float, tolerance: float) -> List[float]:
    """
    Computes the signature with adaptive error control.

    Args:
        path: The path.
        order: The maximum order.
        a: Start of interval.
        b: End of interval.
        tolerance: Desired error tolerance.

    Returns:
        The signature.
    """

    signature = [1.0]
    for k in range(1, order + 1):
        for index_tuple in generate_index_tuples(1, k):  #1D path
            indices = list(index_tuple)
            signature.append(adaptive_iterated_integral(path, indices, a, b, tolerance))
    return signature

def adaptive_iterated_integral(path: np.ndarray, indices: List[int], a: float, b: float, tolerance: float) -> float:
    """
    Adaptive iterated integral calculation (simplified for 1D).

    Args:
        path: The path.
        indices: Indices to integrate.
        a: Start of interval.
        b: End of interval.
        tolerance: Error tolerance.

    Returns:
        The integral value.
    """

    n = 10  # Initial number of intervals
    integral1 = iterated_integral(path, indices, a, b, n)
    n *= 2
    integral2 = iterated_integral(path, indices, a, b, n)

    error_estimate = abs(integral2 - integral1)
    if error_estimate < tolerance:
        return integral2
    else:
        mid = (a + b) / 2
        left_integral = adaptive_iterated_integral(path, indices, a, mid, tolerance / 2)
        right_integral = adaptive_iterated_integral(path, indices, mid, b, tolerance / 2)
        return left_integral + right_integral

# --- Optimization (Basic Example: Chunking for Path) ---
def batched_signature(path: np.ndarray, order: int, a: float, b: float, n: int, chunk_size: int) -> List[List[float]]:
    """
    Computes the signature in batches to optimize memory (simplified).

    Args:
        path: The path.
        order: Max order.
        a: Start.
        b: End.
        n: Subintervals.
        chunk_size: Size of path chunks.

    Returns:
        Batched signature.
    """

    num_chunks = int(np.ceil(len(path) / chunk_size))
    batched_results = []

    for i in range(num_chunks):
        path_chunk = path[i * chunk_size: (i + 1) * chunk_size]
        batched_results.append(path_signature(path_chunk, order, a, b, n))  # Recalculate signature for each chunk
    return batched_results

if __name__ == '__main__':
    # Example Usage (1D Path)
    path = np.array([
        [0.0, 0.0],
        [0.2, 0.5],
        [0.4, 1.0],
        [0.6, 0.8],
        [0.8, 0.2],
        [1.0, 0.0]
    ])
    a = 0.0
    b = 1.0
    order = 3
    n = 100

    logging.info("--- Basic Signature ---")
    signature = path_signature(path, order, a, b, n)
    print("Signature:", signature)

    logging.info("--- Adaptive Signature ---")
    adaptive_sig = adaptive_signature(path, order, a, b, 0.01)
    print("Adaptive Signature:", adaptive_sig)

    logging.info("--- Batched Signature ---")
    batched_sig = batched_signature(path, order, a, b, n, chunk_size=3)
    print("Batched Signature:", batched_sig)

2025-04-05 15:36:13,267 - INFO - --- Basic Signature ---
2025-04-05 15:36:13,390 - INFO - --- Adaptive Signature ---


Signature: [1.0, 0.5, 10.313963524500002, 10027.589089905876]


2025-04-05 15:36:14,352 - INFO - --- Batched Signature ---


Adaptive Signature: [1.0, 0.5000000000000001, 0.012723626502111833, 0.005641040930104442]
Batched Signature: [[1.0, 0.8, 24.459064345921874, 28022.60339039236], [1.0, 0.6, 17.589720768895006, 31767.279438135472]]


In [14]:
from itertools import product

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# --- Tensor Algebra Operations ---
def tensor_product(*arrays: np.ndarray) -> np.ndarray:
    """Computes the tensor product (Kronecker product)."""
    if not arrays:
        return np.array([1.0])  # Identity element
    result = arrays[0]
    for arr in arrays[1:]:
        result = np.kron(result, arr)
    return result


def shuffle_product(
    u: Dict[Tuple[int, ...], float], v: Dict[Tuple[int, ...], float], max_order: int
) -> Dict[Tuple[int, ...], float]:
    """Computes the shuffle product (truncated)."""

    result: Dict[Tuple[int, ...], float] = {}

    def merge(p: Tuple[int, ...], q: Tuple[int, ...]) -> List[Tuple[int, ...]]:
        if not p:
            return [q]
        if not q:
            return [p]
        return [(p[0],) + s for s in merge(p[1:], q)] + [(q[0],) + s for s in merge(p, q[1:])]

    for p_key, p_val in u.items():
        for q_key, q_val in v.items():
            merged_keys = merge(p_key, q_key)
            for m_key in merged_keys:
                if len(m_key) <= max_order:
                    result[m_key] = result.get(m_key, 0.0) + p_val * q_val
    return result


def projection(
    tensor: Dict[Tuple[int, ...], float], u: Tuple[int, ...], max_order: int
) -> Dict[Tuple[int, ...], float]:
    """Computes the projection operator."""

    result: Dict[Tuple[int, ...], float] = {}
    for key, val in tensor.items():
        if key[: len(u)] == u and len(key[len(u) :]) <= max_order:
            result[key[len(u) :]] = result.get(key[len(u) :], 0.0) + val
    return result


def tensor_to_vector(
    tensor: Dict[Tuple[int, ...], float], max_order: int, dim: int
) -> List[float]:
    """Converts a tensor to a vector (for ODE solver)."""

    vector: List[float] = []
    all_keys = generate_all_keys(dim, max_order)  # Get all possible keys
    for key in all_keys:  # Iterate through all possible keys
        vector.append(tensor.get(key, 0.0))  # Append value or 0.0 if key is absent
    return vector


def vector_to_tensor(
    vector: List[float], max_order: int, dim: int
) -> Dict[Tuple[int, ...], float]:
    """Converts a vector back to a tensor."""

    tensor: Dict[Tuple[int, ...], float] = {}
    all_keys = generate_all_keys(dim, max_order)
    if len(vector) != len(all_keys):
        raise ValueError(
            "Vector length does not match the expected tensor size."
            f"Expected {len(all_keys)}, got {len(vector)}"
        )
    for i, key in enumerate(all_keys):
        tensor[key] = vector[i]
    return tensor


def generate_all_keys(dim: int, max_order: int) -> List[Tuple[int, ...]]:
    """Generates all possible keys (indices) for the tensor."""

    all_keys: List[Tuple[int, ...]] = [()]  # Include the empty tuple
    for order in range(1, max_order + 1):
        for key in product(range(1, dim + 1), repeat=order):
            all_keys.append(key)
    return sorted(all_keys, key=lambda k: (len(k), k))  # Sort by length then lexicographically


# --- Riccati Equation Solver ---
def riccati_rhs(
    t: float,
    y: List[float],
    sigma_t: Dict[Tuple[int, ...], float],
    f_t: float,
    g_t: float,
    rho: float,
    max_order: int,
    dim: int,
) -> List[float]:
    """RHS of the Riccati equation."""

    psi_t = vector_to_tensor(y, max_order, dim)
    psi_t_1 = projection(psi_t, (1,), max_order)
    psi_t_2 = projection(psi_t, (2,), max_order)
    psi_t_22 = projection(psi_t, (2, 2), max_order)

    term1 = shuffle_product(psi_t_2, psi_t_2, max_order)
    term2 = shuffle_product(sigma_t, psi_t_2, max_order)
    sigma_t_2 = shuffle_product(sigma_t, sigma_t, max_order)

    rhs_tensor: Dict[Tuple[int, ...], float] = {}
    # FIXME: Implement the full Riccati equation from the paper
    # This is a placeholder!
    rhs_tensor = {
        k: (
            0.5 * term1.get(k, 0.0)
            + rho * f_t * term2.get(k, 0.0)
            + 0.5 * psi_t_22.get(k, 0.0)
            + psi_t_1.get(k, 0.0)
            + (0.5 * (f_t**2 - f_t) + g_t) * sigma_t_2.get(k, 0.0)
        )
        for k in set(term1) | set(term2) | set(psi_t_22) | set(psi_t_1) | set(sigma_t_2)
    }

    return tensor_to_vector(rhs_tensor, max_order, dim)


def solve_riccati_equation(
    sigma: Callable[[float], Dict[Tuple[int, ...], float]],
    f: Callable[[float], float],
    g: Callable[[float], float],
    rho: float,
    max_order: int,
    dim: int,
    T: float,
    initial_condition: Dict[Tuple[int, ...], float],
    adaptive: bool = True,
) -> Callable[[float], Dict[Tuple[int, ...], float]]:
    """Solves the Riccati equation."""

    def rhs_wrapper(t: float, y: List[float]) -> List[float]:
        return riccati_rhs(t, y, sigma(t), f(t), g(t), rho, max_order, dim)

    y0 = tensor_to_vector(initial_condition, max_order, dim)

    sol = solve_ivp(
        rhs_wrapper,
        (T, 0),
        y0,
        dense_output=True,
        method="RK45" if adaptive else "RK4",  # RK45 is adaptive
        rtol=1e-5,
        atol=1e-7,
    )  # Adjust tolerances as needed

    if sol.status != 0:
        logging.warning(f"ODE solver did not converge: {sol.message}")

    def solution_as_tensor(t: float) -> Dict[Tuple[int, ...], float]:
        return vector_to_tensor(sol.sol(t).tolist(), max_order, dim)

    return solution_as_tensor


# --- Example Usage ---
if __name__ == "__main__":
    max_order = 2
    dim = 2  # Time and W_t
    T = 1.0

    # Example: Time-dependent sigma (simplified)
    def sigma_t(t: float) -> Dict[Tuple[int, ...], float]:
        return {
            (): 0.1 + 0.01 * t,
            (1,): 0.2,
            (2,): 0.3,
            (1, 1): 0.05 * t,
            (1, 2): 0.02,
            (2, 1): 0.03,
            (2, 2): 0.01,
        }

    def f_t(t: float) -> float:
        return 1.0  # Constant for simplicity

    def g_t(t: float) -> float:
        return 0.0

    initial_condition: Dict[Tuple[int, ...], float] = {
        k: 0.0 for k in generate_all_keys(dim, max_order)
    }
    initial_condition[()] = 0.0

    solution = solve_riccati_equation(
        sigma_t,
        f_t,
        g_t,
        rho=0.5,
        max_order=max_order,
        dim=dim,
        T=T,
        initial_condition=initial_condition,
        adaptive=True,
    )

    times = np.linspace(0, T, 10)
    for t in times:
        print(f"Solution at t={t}:")
        print(solution(t))

Solution at t=0.0:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.1111111111111111:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.2222222222222222:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.3333333333333333:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.4444444444444444:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.5555555555555556:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.6666666666666666:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.7777777777777777:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1): 0.0, (1, 2): 0.0, (2, 1): 0.0, (2, 2): 0.0}
Solution at t=0.8888888888888888:
{(): 0.0, (1,): 0.0, (2,): 0.0, (1, 1

In [15]:
from scipy.special import genlaguerre
import cmath
from numba import njit, prange
from numpy.polynomial import laguerre

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def gauss_laguerre_quadrature(n: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    Computes the Gauss-Laguerre quadrature nodes and weights.

    Args:
        n: The number of nodes.

    Returns:
        A tuple containing the nodes and weights.
    """

    nodes, weights = laguerre.laggauss(n)  # Corrected function name
    return nodes, weights


def european_call_price(
    S_t: float,
    K: float,
    T: float,
    t: float,
    char_func: Callable[[float], complex],
    sigma_bs: float,
    quadrature_nodes: int,
) -> float:
    """
    Calculates the European call option price using Fourier inversion with Gauss-Laguerre quadrature and control variate.

    Args:
        S_t: Current stock price.
        K: Strike price.
        T: Maturity time.
        t: Current time.
        char_func: The characteristic function of the log-price.
        sigma_bs: Black-Scholes volatility for the control variate.
        quadrature_nodes: Number of nodes for Gauss-Laguerre quadrature.

    Returns:
        The calculated call option price.
    """

    nodes, weights = gauss_laguerre_quadrature(quadrature_nodes)
    integrand_real = np.zeros_like(nodes, dtype=complex)

    u = nodes - 0.5j
    integrand_real = np.exp(1j * u * np.log(K)) * char_func(u) / (1j * u + u**2)
    integral_value = np.sum(weights * integrand_real).real

    # Black-Scholes control variate
    d1 = (np.log(S_t / K) + (sigma_bs**2 / 2) * (T - t)) / (sigma_bs * np.sqrt(T - t))
    d2 = d1 - sigma_bs * np.sqrt(T - t)
    call_bs = S_t * 0.5 * (1 + np.math.erf(d1 / np.sqrt(2))) - K * 0.5 * (1 + np.math.erf(d2 / np.sqrt(2)))

    return call_bs - (K / np.pi) * integral_value


# --- Example Usage ---
if __name__ == "__main__":
    # Parameters
    S_0 = 100.0
    K = 110.0
    T = 1.0
    t = 0.0
    sigma_bs = 0.2
    quadrature_nodes = 64

    # Example Characteristic Function
    def example_char_func(u: float) -> complex:
        return np.exp(1j * u * np.log(S_0) - 0.5 * u**2 * (T - t) * sigma_bs**2)

    call_price = european_call_price(S_0, K, T, t, example_char_func, sigma_bs, quadrature_nodes)
    print(f"European Call Price: {call_price}")

European Call Price: -273.1989139494502


  call_bs = S_t * 0.5 * (1 + np.math.erf(d1 / np.sqrt(2))) - K * 0.5 * (1 + np.math.erf(d2 / np.sqrt(2)))


In [16]:
from scipy.optimize import differential_evolution


# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def calibrate_signature_model(
    market_prices: np.ndarray,
    strikes: np.ndarray,
    maturities: np.ndarray,
    price_func: Callable[[float, float, float, float, List[float]], float],  # Model pricing function
    initial_guess: List[float],
    bounds: List[tuple],
    popsize: int = 15,
    maxiter: int = 1000,
    tol: float = 1e-5,
) -> dict:
    """
    Calibrates the signature volatility model to market option prices.

    Args:
        market_prices: Array of market option prices.
        strikes: Array of strike prices corresponding to market prices.
        maturities: Array of maturities corresponding to market prices.
        price_func: Function that calculates the option price from the model.
                    Its arguments should be: S_0, K, T, t, model_params
        initial_guess: Initial guess for the model parameters.
        bounds: Bounds for the model parameters (list of (min, max) tuples).
        popsize: Population size for differential evolution.
        maxiter: Maximum iterations for differential evolution.
        tol: Tolerance for convergence.

    Returns:
        A dictionary containing the calibration results:
        - 'optimal_params': The calibrated model parameters.
        - 'min_error': The minimum error achieved.
        - 'success': Boolean indicating if the optimization was successful.
        - 'message': Optimization result message.
    """

    def objective_function(model_params: List[float]) -> float:
        """Calculates the sum of squared errors between model and market prices."""
        total_error = 0.0
        for i in range(len(market_prices)):
            model_price = price_func(100.0, strikes[i], maturities[i], 0.0, model_params)  # Assuming S_0=100 and t=0
            total_error += (model_price - market_prices[i]) ** 2
        return total_error

    result = differential_evolution(
        objective_function,
        bounds,
        popsize=popsize,
        maxiter=maxiter,
        tol=tol,
        disp=True,
    )

    return {
        "optimal_params": result.x,
        "min_error": result.fun,
        "success": result.success,
        "message": result.message,
    }


# --- Example Usage ---
if __name__ == "__main__":
    # Example Market Data (Replace with your actual market data)
    market_prices = np.array([10.50, 5.20, 1.80, 15.10, 8.90, 3.50])
    strikes = np.array([90.0, 100.0, 110.0, 95.0, 105.0, 115.0])
    maturities = np.array([0.25, 0.25, 0.25, 0.5, 0.5, 0.5])

    # Example Pricing Function (Replace with your actual model pricing function)
    def example_model_price(S_0: float, K: float, T: float, t: float, model_params: List[float]) -> float:
        """
        Placeholder pricing function. Replace with your actual signature model pricing.
        For example, a simplified Black-Scholes for demonstration.
        """
        sigma = model_params[0]  # Example: volatility as the parameter
        d1 = (np.log(S_0 / K) + (sigma**2 / 2) * (T - t)) / (sigma * np.sqrt(T - t))
        d2 = d1 - sigma * np.sqrt(T - t)
        return S_0 * 0.5 * (1 + np.math.erf(d1 / np.sqrt(2))) - K * 0.5 * (1 + np.math.erf(d2 / np.sqrt(2)))

    # Initial Guess and Bounds (Adjust based on your model parameters)
    initial_guess = [0.2]  # Example: initial guess for volatility
    bounds = [(0.01, 0.5)]  # Example: bounds for volatility

    # Calibrate
    calibration_results = calibrate_signature_model(
        market_prices,
        strikes,
        maturities,
        example_model_price,
        initial_guess,
        bounds,
        popsize=20,
        maxiter=200,
        tol=1e-6
    )

    # Print Results
    logging.info("Calibration Results:")
    logging.info(f"Optimal Parameters: {calibration_results['optimal_params']}")
    logging.info(f"Minimum Error: {calibration_results['min_error']}")
    logging.info(f"Success: {calibration_results['success']}")
    logging.info(f"Message: {calibration_results['message']}")

differential_evolution step 1: f(x)= 21.2358

  return S_0 * 0.5 * (1 + np.math.erf(d1 / np.sqrt(2))) - K * 0.5 * (1 + np.math.erf(d2 / np.sqrt(2)))



differential_evolution step 2: f(x)= 21.1908
differential_evolution step 3: f(x)= 21.1907
differential_evolution step 4: f(x)= 21.1861
differential_evolution step 5: f(x)= 21.1854
differential_evolution step 6: f(x)= 21.1854
differential_evolution step 7: f(x)= 21.1854
differential_evolution step 8: f(x)= 21.1854
differential_evolution step 9: f(x)= 21.1854


2025-04-05 15:36:15,122 - INFO - Calibration Results:
2025-04-05 15:36:15,129 - INFO - Optimal Parameters: [0.34221086]
2025-04-05 15:36:15,131 - INFO - Minimum Error: 21.185395023128326
2025-04-05 15:36:15,136 - INFO - Success: True
2025-04-05 15:36:15,139 - INFO - Message: Optimization terminated successfully.


differential_evolution step 10: f(x)= 21.1854
Polishing solution with 'L-BFGS-B'
