In [2]:
# 通过期权数据计算隐含波动率
# https://stackoverflow.com/questions/61289020/fast-implied-volatility-calculation-in-python
import numpy as np
from scipy.stats import norm
from py_vollib.black_scholes import black_scholes as bs


def black_scholes(S, K, T, r, sigma, option_type='call'):
    """
    计算Black-Scholes期权定价模型
    S: (n,)股票价格
    K: (n,)行权价格
    T: (n,)到期时间（以年为单位）
    r: (n,)无风险利率
    sigma: (n,)波动率
    option_type: 'call' 或 'put'
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)

    if option_type.lower() == 'call':
        price = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    elif option_type.lower() == 'put':
        price = K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
    else:
        raise ValueError("option_type must be 'call' or 'put'")

    return price


def bs_vega(S, K, T, r, sigma):
    """
    计算期权的Vega，表示波动率变化 1% 时，期权价格的绝对变化
    S: (n,)股票价格
    K: (n,)行权价格
    T: (n,)到期时间（以年为单位）
    r: (n,)无风险利率
    sigma: (n,)波动率
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    vega = S * norm.pdf(d1) * np.sqrt(T)  # dC/dσ
    return vega


def implied_volatility_newton(market_price, S, K, T, r, option_type='call', init_vol=0.2, tolerance=1e-5,
                              max_iteration=100):
    """
    根据期权价格反解出隐含波动率的牛顿迭代法
    market_price: (n,)期权市场价格
    S: (n,)股票价格
    K: (n,)行权价格
    T: (n,)到期时间（以年为单位）
    r: (n,)无风险利率
    option_type: 'call' 或 'put'
    init_vol: 初始波动率猜测
    tolerance: 收敛容忍度
    max_iteration: 最大迭代次数
    """
    if market_price <= 0 or S <= 0 or K <= 0 or T <= 0:
        raise ValueError("Prices and time must be positive")
    if option_type.lower() not in ['call', 'put']:
        raise ValueError("option_type must be 'call' or 'put'")

    sigma = init_vol  # 初始猜测的波动率
    min_sigma = 1e-6  # 最小波动率，防止除以零

    for i in range(max_iteration):
        try:
            # 检查 Vega 是否太小，避免除以零
            vega = bs_vega(S, K, T, r, sigma)
            if abs(vega) < 1e-10:
                raise ValueError("Vega is too small.")

            model_price = black_scholes(S, K, T, r, sigma, option_type.lower())
            diff = market_price - model_price
            if abs(diff) < tolerance:
                return sigma  # 收敛，返回隐含波动率
            sigma += diff / vega  # 牛顿-拉夫森更新 f(x) / f'(x)
            sigma = max(min_sigma, sigma)  # 限制波动率在合理范围内
        except (OverflowError, ZeroDivisionError):
            raise ValueError("Numerical instability occurred during calculation")
    raise ValueError(f"Implied volatility did not converge after {max_iteration} iterations")


# 测试
if __name__ == "__main__":
    S = 100  # 股票价格
    K = 100  # 行权价格
    T = 11  # 到期时间（11年）
    r = 0.01  # 无风险利率
    vol = 0.25
    market_value = black_scholes(S, K, T, r, vol, option_type='call')
    print(f"市场价格：{market_value:.2f}")

    iv = implied_volatility_newton(market_value, S, K, T, r, option_type='call')
    print(f"隐含波动率: {iv:.2%}")

    model_value = black_scholes(S, K, T, r, iv, option_type='call')
    pyvol_value = bs('c', S, K, T, r, iv)
    print(f"模型价格: {model_value:.2f}")
    print(f"py_vollib模型价格: {pyvol_value:.2f}")

市场价格：35.94
隐含波动率: 25.00%
模型价格: 35.94
py_vollib模型价格: 35.94


In [3]:
# 计算多个期权的隐含波动率
num_stocks = 10000
S = np.random.randint(100, 200, num_stocks)
K = np.random.uniform(low=1.0, high=2.0, size=(num_stocks,)) * S
T = np.random.randint(low=1, high=4, size=(num_stocks,))
R = np.random.randint(low=0, high=35, size=(num_stocks,)) / 100.0
vols = np.random.randint(low=15, high=50, size=(num_stocks,)) / 100.0
model_values = black_scholes(S, K, T, R, vols, option_type='call')
vegas = bs_vega(S, K, T, R, vols)
print(model_values.shape, vegas.shape)

params = np.vstack((model_values, S, K, T, R)).T
print(params.shape)
implied_volatility_newton(*params[0])

(10000,) (10000,)
(10000, 5)


np.float64(0.3799999999999861)

In [4]:
# 牛顿迭代法无法张量并行
# 调用多线程 https://grok.com/chat/627f49fd-cb41-4470-8a9a-7222f4932fa9
import numpy as np
from scipy.stats import norm
from multiprocessing import Pool
from functools import partial
import os
from tqdm import tqdm

def black_scholes(S, K, T, r, sigma, option_type='call'):
    """
    计算Black-Scholes期权定价模型
    S: 股票价格
    K: 行权价格
    T: 到期时间（以年为单位）
    r: 无风险利率
    sigma: 波动率
    option_type: 'call' 或 'put'
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    if option_type == 'call':
        price = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    elif option_type == 'put':
        price = K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
    else:
        raise ValueError("option_type must be 'call' or 'put'")
    return price

def bs_vega(S, K, T, r, sigma):
    """
    计算期权的Vega
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    vega = S * norm.pdf(d1) * np.sqrt(T)
    return vega

def implied_volatility_single(args):
    """
    单样本隐含波动率计算
    """
    option_price, S, K, T, r, option_type, tolerance, max_iteration = args
    try:
        if option_price <= 0 or S <= 0 or K <= 0 or T <= 0:
            return np.nan
        sigma = 0.2
        min_sigma = 1e-6
        for i in range(max_iteration):
            vega = bs_vega(S, K, T, r, sigma)
            if abs(vega) < 1e-10:
                return np.nan
            price = black_scholes(S, K, T, r, sigma, option_type)
            diff = option_price - price
            if abs(diff) < tolerance:
                return sigma
            sigma += diff / vega
            sigma = max(min_sigma, sigma)
        return np.nan
    except (OverflowError, ZeroDivisionError):
        return np.nan

def implied_volatility_parallel(option_price, S, K, T, r, option_type='call',
                               tolerance=1e-5, max_iteration=100, num_processes=None):
    """
    使用 pool.imap_unordered 并行计算隐含波动率，带进度条
    Inputs:
        option_price, S, K, T, r: 形状为 (n,) 的 numpy 数组
        option_type: 'call' 或 'put'
        tolerance: 收敛容忍度
        max_iteration: 最大迭代次数
        num_processes: 进程数（默认 CPU 核心数）
    Returns:
        隐含波动率，形状为 (n,)
    """
    # 输入验证
    if not all(isinstance(x, np.ndarray) and x.shape == option_price.shape for x in [S, K, T, r]):
        raise ValueError("All inputs must be numpy arrays with the same shape")

    # 设置进程数
    num_processes = num_processes or os.cpu_count()

    # 准备输入参数（包含索引以便排序）
    n = len(option_price)
    inputs = [(option_price[i], S[i], K[i], T[i], r[i], option_type, tolerance, max_iteration, i)
              for i in range(n)]

    # 使用 pool.imap_unordered 并行计算，带进度条
    results = np.full(n, np.nan)  # 初始化结果数组
    with Pool(processes=num_processes) as pool:
        # 使用 tqdm 包装 imap_unordered
        for result, idx in tqdm(pool.imap_unordered(
                lambda x: (implied_volatility_single(x[:-1]), x[-1]), inputs),
                total=n, desc="Computing implied volatility"):
            results[idx] = result

    return results

# 示例使用
if __name__ == "__main__":
    n = 100
    option_price = np.random.uniform(5, 15, n)
    S = np.random.uniform(50, 150, n)
    K = np.random.uniform(50, 150, n)
    T = np.random.uniform(0.1, 2, n)
    r = np.random.uniform(0.01, 0.1, n)

    implied_vol = implied_volatility_parallel(option_price, S, K, T, r, option_type='call')
    print(implied_vol.shape)  # 输出: (100,)
    print(implied_vol)

Computing implied volatility:   0%|          | 0/100 [00:00<?, ?it/s]


AttributeError: Can't get local object 'implied_volatility_parallel.<locals>.<lambda>'

In [1]:
from typing import Union, Optional, List, Tuple, Dict, Any
from scipy.stats import norm
import numpy as np
from enum import Enum


# 定义期权类型枚举
class OptionType(Enum):
    CALL = 'call'
    PUT = 'put'


# 定义期权类型
class Option:
    def __init__(self,
                 stock_price: Union[float, np.ndarray],
                 strike_price: Union[float, np.ndarray],
                 time_until_maturity: Union[float, np.ndarray],
                 risk_free_rate: Union[float, np.ndarray],
                 option_type: Union[str, OptionType] = OptionType.CALL):
        """初始化期权对象
        args:
            stock_price: 股票价格
            strike_price: 行权价格
            time_until_maturity: 持有时长（以年为单位）
            risk_free_rate: 无风险利率
            option_type: 'call'/'put' 或 OptionType enum
        """
        self.S = np.asarray(stock_price, dtype=float)
        self.K = np.asarray(strike_price, dtype=float)
        self.T = np.asarray(time_until_maturity, dtype=float)
        self.r = np.asarray(risk_free_rate, dtype=float)
        # 处理期权类型
        if isinstance(option_type, OptionType):
            self.option_type = option_type
        elif isinstance(option_type, str):
            try:
                self.option_type = OptionType(option_type.lower())
            except ValueError:
                raise ValueError(f"不支持的期权类型: {option_type}，必须是'call'或'put'")
        else:
            raise TypeError("option_type必须是字符串或OptionType枚举")
        self._validate_inputs()

    def _validate_inputs(self):
        """验证输入参数的合法性"""
        if np.any(self.S <= 0):
            raise ValueError("标的资产价格必须为正")
        if np.any(self.K <= 0):
            raise ValueError("行权价格必须为正")
        if np.any(self.T <= 0):
            raise ValueError("到期时间必须为正")

    def __repr__(self) -> str:
        return (f"Option(type={self.option_type.value}, "
                f"S={self.S.tolist() if self.S.size <= 5 else '...'}, "
                f"K={self.K.tolist() if self.K.size <= 5 else '...'}, "
                f"T={self.T.tolist() if self.T.size <= 5 else '...'}, "
                f"r={self.r.tolist() if self.r.size <= 5 else '...'})")

    def intrinsic_value(self) -> float:
        """标的现价与行权价格的比较，内在价值是马上能赚的钱：
        标的有利于你行权 → 有内在价值
        标的不利于你行权 → 内在价值为0
        """
        if self.option_type == OptionType.CALL:
            return np.maximum(0, self.S - self.K)
        elif self.option_type == OptionType.PUT:
            return np.maximum(0, self.K - self.S)
        else:
            raise ValueError("Invalid option type")


# 测试
if __name__ == "__main__":
    S = 130  # 股票价格
    K = 110  # 行权价格
    T = 3  # 到期时间（11年）
    r = 0.02  # 无风险利率
    option = Option(S, K, T, r, option_type='call')
    print(option)
    print(option.intrinsic_value())

    num_stocks = 4
    S = np.random.randint(100, 200, num_stocks)
    K = np.random.uniform(low=1.0, high=2.0, size=(num_stocks,)) * S
    T = np.random.randint(low=1, high=4, size=(num_stocks,))
    r = np.random.randint(low=0, high=35, size=(num_stocks,)) / 100.0
    option = Option(S, K, T, r, option_type='call')
    print(option)
    print(option.intrinsic_value())

Option(type=call, S=130.0, K=110.0, T=3.0, r=0.02)
20.0
Option(type=call, S=[151.0, 106.0, 191.0, 129.0], K=[207.37281573555904, 206.58775129114235, 346.0970347973027, 254.36891979004793], T=[1.0, 3.0, 2.0, 1.0], r=[0.14, 0.11, 0.0, 0.27])
[0. 0. 0. 0.]


In [None]:
# https://grok.com/chat/06a5dd4a-c2eb-4b4e-9919-b22cbeb81b7c
class EuropeanOption(Option):
    """欧式期权类，包含期权的基本属性和方法"""


In [25]:
class BlackScholesModel:
    """Black-Scholes期权定价模型"""

    @staticmethod
    def option_price(option: Option, volatility: Union[float, np.ndarray]) -> np.ndarray:
        """计算期权价格"""
        sigma = np.asarray(volatility, dtype=float)
        sigma = np.maximum(sigma, 1e-8)  # 避免除以零
        d1 = (np.log(option.S / option.K) + (option.r + 0.5 * sigma ** 2) * option.T) / (sigma * np.sqrt(option.T))
        d2 = d1 - sigma * np.sqrt(option.T)
        if option.option_type == OptionType.CALL:
            price = option.S * norm.cdf(d1) - option.K * np.exp(-option.r * option.T) * norm.cdf(d2)
        elif option.option_type == OptionType.PUT:
            price = option.K * np.exp(-option.r * option.T) * norm.cdf(-d2) - option.S * norm.cdf(-d1)
        else:
            raise ValueError("Invalid option type")
        return price

    @staticmethod
    def vega(option: Option, volatility: Union[float, np.ndarray]) -> np.ndarray:
        """计算期权的Vega值（期权价格对波动率的敏感性 ν=dC/dσ）"""
        sigma = np.asarray(volatility, dtype=float)
        sigma = np.maximum(sigma, 1e-8)  # 避免除以零
        d1 = (np.log(option.S / option.K) + (option.r + 0.5 * sigma ** 2) * option.T) / (sigma * np.sqrt(option.T))
        vega = option.S * norm.pdf(d1) * np.sqrt(option.T)  # Vega公式对call和put相同
        return vega

    @staticmethod
    def delta(option: Option, volatility: Union[float, np.ndarray]) -> np.ndarray:
        """计算期权的Delta值（期权价格对标的价格的敏感性 Δ=dC/dS）"""
        sigma = np.asarray(volatility, dtype=float)
        sigma = np.maximum(sigma, 1e-8)  # 避免除以零
        d1 = (np.log(option.S / option.K) + (option.r + 0.5 * sigma ** 2) * option.T) / (sigma * np.sqrt(option.T))
        if option.option_type == OptionType.CALL:
            delta = norm.cdf(d1)
        elif option.option_type == OptionType.PUT:
            delta = norm.cdf(d1) - 1
        else:
            raise ValueError("Invalid option type")
        return delta

    @staticmethod
    def gamma(option: Option, volatility: Union[float, np.ndarray]) -> np.ndarray:
        """计算期权的Gamma值（Delta对标的价格的敏感性 Γ=dΔ/dS）"""
        sigma = np.asarray(volatility, dtype=float)
        sigma = np.maximum(sigma, 1e-8)  # 避免除以零
        d1 = (np.log(option.S / option.K) + (option.r + 0.5 * sigma ** 2) * option.T) / (sigma * np.sqrt(option.T))
        gamma = norm.pdf(d1) / (option.S * sigma * np.sqrt(option.T))  # Gamma公式对call和put相同
        return gamma


# 测试
if __name__ == "__main__":
    from py_vollib.black_scholes import black_scholes as bs
    from py_vollib.black_scholes.greeks.analytical import delta
    from py_vollib.black_scholes.greeks.analytical import gamma

    S = 120  # 股票价格
    K = 113  # 行权价格
    T = 7  # 到期时间（11年）
    r = 0.022  # 无风险利率
    option = Option(S, K, T, r, option_type='put')

    vol = 0.25
    pyvol_value = bs('p', S, K, T, r, vol)
    pyvol_delta = delta('p', S, K, T, r, vol)
    pyvol_gamma = gamma('p', S, K, T, r, vol)
    print(f"py_vollib模型价格: {pyvol_value:.2f}")
    print(f"py_vollib模型Delta: {pyvol_delta:.2f}")
    print(f"py_vollib模型Gamma: {pyvol_gamma:.2f}")

    model_value = BlackScholesModel.option_price(option, vol)
    model_delta = BlackScholesModel.delta(option, vol)
    model_gamma = BlackScholesModel.gamma(option, vol)
    print(f"模型价格: {model_value:.2f}")
    print(f"模型Delta: {model_delta:.2f}")
    print(f"模型Gamma: {model_gamma:.2f}")

py_vollib模型价格: 17.94
py_vollib模型Delta: -0.26
py_vollib模型Gamma: 0.00
模型价格: 17.94
模型Delta: -0.26
模型Gamma: 0.00


In [None]:
# 掩码解决收敛速度不一致


In [None]:
# https://claude.ai/chat/dfe1b834-14e8-48c0-be19-3063e81f3680
class ImpliedVolatilityCalculator:
    def __init__(self,
                 tolerance: float = 1e-6,
                 max_iteration: int = 100,
                 min_volatility: float = 1e-6,
                 max_volatility: float = 5.0):
        """初始化隐含波动率计算器
        args:
            tolerance: 收敛容忍度
            max_iteration: 最大迭代次数
            min_volatility: 最小波动率，防止除以零
            max_volatility: 最大波动率，防止数值不稳定
        """
        self.tolerance = tolerance
        self.max_iteration = max_iteration
        self.min_sigma = min_volatility
        self.max_sigma = max_volatility
        self.bs_model = BlackScholesModel()

    def newton_raphson(self,
                       option: Option,
                       market_price: float,
                       init_volatility: float = 0.2) -> float:
        """牛顿-拉夫森迭代法计算隐含波动率"""
        market_price = np.asarray(market_price, dtype=float)
        if np.any(market_price < 0):
            raise ValueError("市场价格不能为负")

        # 检查期权内在价值
        if np.any(market_price < option.intrinsic_value()):
            raise ValueError("市场价格小于期权内在价值")

        # 初始化波动率为初始猜测值
        sigma = np.full_like(market_price, init_volatility, dtype=float)
        sigma = np.maximum(sigma, 1e-8)  # 避免除以零
        iv = np.copy(sigma)

        # 创建掩码以跟踪哪些值尚未收敛
        mask = np.ones_like(market_price, dtype=bool)
        for i in range(self.max_iteration):
            price_of_sigma = self.bs_model.option_price(option, sigma[mask])
            diff = market_price[mask] - price_of_sigma
            converged = np.abs(diff) < self.tolerance
            iv[mask] = sigma[mask]  # 更新结果
            mask = ~converged  # 更新掩码, 只保留未收敛的值
            if not np.any(mask):
                break  # 所有值都已收敛，提前退出循环
            vega = self.bs_model.vega(option, sigma[mask])
            vega_mask = np.abs(vega) > 1e-10  # 防止vega太小导致数值不稳定
            if not np.any(vega_mask):
                break  # 如果所有vega都太小，直接返回当前结果

            # 牛顿-拉夫森更新 f(x) / f'(x)
            update = np.zeros_like(diff)
            update[vega_mask] = diff[vega_mask] / vega[vega_mask]  # 仅更新vega不太小的值
            update = np.clip(update, -0.5, 0.5)  # 限制单步更新的大小，防止震荡
            sigma[mask] = sigma[mask] + update
            sigma = np.clip(sigma, self.min_sigma, self.max_sigma)  # 限制波动率在合理范围内
            sigma[mask] = np.clip(sigma[mask], self.min_sigma, self.max_sigma)
        return iv


In [2]:
# 查看股票期权数据
# https://www.kaggle.com/datasets/bendgame/options-market-trades
# https://www.kaggle.com/discussions/general/395148
import pandas as pd

trade_option1_file_path = '../data/optionsTradeData.csv'
trade_option2_file_path = '../data/optionsTradeData2.csv'
tsla_stock_option_file_path = '../data/tsla_2019_2022.csv'

tsla_raw = pd.read_csv(tsla_stock_option_file_path)
tsla_raw.head()

  tsla_raw = pd.read_csv(tsla_stock_option_file_path)


Unnamed: 0,[QUOTE_UNIXTIME],[QUOTE_READTIME],[QUOTE_DATE],[QUOTE_TIME_HOURS],[UNDERLYING_LAST],[EXPIRE_DATE],[EXPIRE_UNIX],[DTE],[C_DELTA],[C_GAMMA],...,[P_LAST],[P_DELTA],[P_GAMMA],[P_VEGA],[P_THETA],[P_RHO],[P_IV],[P_VOLUME],[STRIKE_DISTANCE],[STRIKE_DISTANCE_PCT]
0,1556740800,2019-05-01 16:00,2019-05-01,16.0,233.98,2019-05-03,1556913600,2.0,0.98465,0.00055,...,0.01,-0.00071,3e-05,0.00046,-0.00975,0.0,2.22548,147.0,104.0,0.444
1,1556740800,2019-05-01 16:00,2019-05-01,16.0,233.98,2019-05-03,1556913600,2.0,0.98371,0.00067,...,0.02,-0.00109,9e-05,0.00058,-0.0101,-1e-05,2.08349,12.0,99.0,0.423
2,1556740800,2019-05-01 16:00,2019-05-01,16.0,233.98,2019-05-03,1556913600,2.0,0.98458,0.00069,...,0.02,-0.00122,0.00012,0.00113,-0.01435,0.0,2.02359,15.0,94.0,0.402
3,1556740800,2019-05-01 16:00,2019-05-01,16.0,233.98,2019-05-03,1556913600,2.0,0.99187,0.00049,...,0.01,-0.00134,9e-05,0.00139,-0.01465,0.0,1.89504,0.0,89.0,0.38
4,1556740800,2019-05-01 16:00,2019-05-01,16.0,233.98,2019-05-03,1556913600,2.0,0.99341,0.00039,...,0.01,-0.00176,8e-05,0.00105,-0.01467,0.0,1.76812,91.0,84.0,0.359


In [3]:
pd.read_csv(tsla_stock_option_file_path, nrows=5).columns

Index(['[QUOTE_UNIXTIME]', ' [QUOTE_READTIME]', ' [QUOTE_DATE]',
       ' [QUOTE_TIME_HOURS]', ' [UNDERLYING_LAST]', ' [EXPIRE_DATE]',
       ' [EXPIRE_UNIX]', ' [DTE]', ' [C_DELTA]', ' [C_GAMMA]', ' [C_VEGA]',
       ' [C_THETA]', ' [C_RHO]', ' [C_IV]', ' [C_VOLUME]', ' [C_LAST]',
       ' [C_SIZE]', ' [C_BID]', ' [C_ASK]', ' [STRIKE]', ' [P_BID]',
       ' [P_ASK]', ' [P_SIZE]', ' [P_LAST]', ' [P_DELTA]', ' [P_GAMMA]',
       ' [P_VEGA]', ' [P_THETA]', ' [P_RHO]', ' [P_IV]', ' [P_VOLUME]',
       ' [STRIKE_DISTANCE]', ' [STRIKE_DISTANCE_PCT]'],
      dtype='object')