In [4]:
from dataclasses import dataclass, field

import numpy as np
import pandas as pd
from scipy.stats import norm


def bsm_price(S, K, T, r, sigma, option_type="call"):
    """
    Calculate the Black-Scholes-Merton option price.

    Parameters:
    - S: Current stock price
    - K: Strike price
    - T: Time to expiration in years
    - r: Risk-free interest rate
    - sigma: Volatility of the underlying asset
    - option_type: The type of option ("call" or "put")

    Returns:
    - Option price
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)

    if option_type == "call":
        option_price = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    else:  # put
        option_price = K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
    return option_price


@dataclass
class Option:
    strike: float
    expiration_date: datetime
    option_type: Literal["call", "put"]
    exercised: bool = field(default=False, init=False)


class Portfolio:
    def __init__(self):
        self.options = []
        self.cash = 0
        self.stock_positions = 0

    def add_option(self, option):
        self.options.append(option)

    def remove_option(self, option):
        self.options.remove(option)

    def exercise_option(self, option, stock_price):
        if option.option_type == "call":
            self.stock_positions -= 1
        self.cash += option.strike - stock_price
        option.exercised = True

    def update_portfolio(self, stock_price):
        for option in self.options[:]:
            if option.expiration_date <= pd.Timestamp.now() and not option.exercised:
                # Decide to exercise or let expire based on strategy, simplified here
                if (option.option_type == "call" and stock_price > option.strike) or (
                    option.option_type == "put" and stock_price < option.strike
                ):
                    self.exercise_option(option, stock_price)
                self.remove_option(option)


def backtest(
    start_date, end_date, initial_cash, strategy_function, option_chain, stock_prices
):
    portfolio = Portfolio()
    portfolio.cash = initial_cash

    current_date = start_date
    while current_date <= end_date:
        # Update portfolio for any expired or exercised options
        if stock_prices.get(current_date) is not None:
            portfolio.update_portfolio(stock_prices[current_date])

        # Generate orders from the strategy
        orders = strategy_function(
            portfolio, option_chain[current_date], stock_prices[current_date]
        )

        # Process orders
        for order in orders:
            if order["action"] == "buy":
                portfolio.cash -= order["option"].premium
                portfolio.add_option(order["option"])
            elif order["action"] == "sell":
                # This is simplified; would need to check if portfolio contains the option, etc.
                portfolio.cash += order["option"].premium
                portfolio.remove_option(order["option"])

        current_date += pd.Timedelta(days=1)

    return portfolio


def generate_option_chain(S, r, sigma, strike_range, dte_range, current_date):
    """
    Generate an option chain using the BSM model.

    Parameters:
    - S: Current stock price
    - r: Risk-free interest rate
    - sigma: Volatility of the underlying asset
    - strike_range: A tuple of (min_strike, max_strike)
    - dte_range: A tuple of (min_days_to_expire, max_days_to_expire)
    - current_date: The current date for calculating T (time to expiration)

    Returns:
    - A dictionary with expiration dates as keys and lists of Option objects as values
    """
    option_chain = {}
    strike_prices = np.arange(
        strike_range[0], strike_range[1], 5
    )  # Adjust step for finer granularity
    expiration_dates = pd.date_range(current_date, periods=dte_range[1], freq="D")[
        :: dte_range[0]
    ]

    for expiration_date in expiration_dates:
        T = (expiration_date - current_date).days / 365.0
        options_list = []
        for strike in strike_prices:
            call_price = bsm_price(S, strike, T, r, sigma, "call")
            put_price = bsm_price(S, strike, T, r, sigma, "put")
            options_list.append(Option(strike, call_price, expiration_date, "call"))
            options_list.append(Option(strike, put_price, expiration_date, "put"))
        option_chain[expiration_date] = options_list

    return option_chain


# Define a sample strategy function
def sample_strategy(portfolio, option_chain, stock_price):
    orders = []
    # Example: Buy an option if certain conditions are met, this is a placeholder
    if stock_price < 100:
        for option in option_chain:
            if option.strike < 105 and not option.exercised:
                orders.append({"action": "buy", "option": option})
                break
    return orders


# Example usage
# Assume option_chain and stock_prices are dictionaries with dates as keys
# option_chain = {pd.Timestamp('2023-01-01'): [Option(100, 10, pd.Timestamp('2023-03-01'))], ...}
# stock_prices = {pd.Timestamp('2023-01-01'): 100, ...}
# portfolio = backtest(pd.Timestamp('2023-01-01'), pd.Timestamp('2023-12-31'), 10000, sample_strategy, option_chain, stock_prices)

# 假設的股票價格、無風險利率和波動率
S = 100  # 當前股票價格
r = 0.02  # 無風險利率
sigma = 0.20  # 波動率

# 使用者定義的行使價範圍和到期日範圍
strike_range = (90, 110)  # 行使價範圍
dte_range = (30, 90)  # 到期日範圍，每30天一個到期日，最遠到90天

# 當前日期
current_date = pd.Timestamp("2023-01-01")

# 生成選擇權鏈
option_chain = generate_option_chain(S, r, sigma, strike_range, dte_range, current_date)
option_chain

  d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
  d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))


{Timestamp('2023-01-01 00:00:00'): [<__main__.Option at 0xffff50fdefe0>,
  <__main__.Option at 0xffff50fdf160>,
  <__main__.Option at 0xffff50fdf0d0>,
  <__main__.Option at 0xffff50fdf100>,
  <__main__.Option at 0xffff50fdf040>,
  <__main__.Option at 0xffff50fdc040>,
  <__main__.Option at 0xffff50fdc100>,
  <__main__.Option at 0xffff50fdc130>],
 Timestamp('2023-01-31 00:00:00'): [<__main__.Option at 0xffff50fdc1c0>,
  <__main__.Option at 0xffff50fdc070>,
  <__main__.Option at 0xffff50fdc280>,
  <__main__.Option at 0xffff50fddcf0>,
  <__main__.Option at 0xffff50fde140>,
  <__main__.Option at 0xffff50fde350>,
  <__main__.Option at 0xffff50fde3e0>,
  <__main__.Option at 0xffff3a38cbe0>],
 Timestamp('2023-03-02 00:00:00'): [<__main__.Option at 0xffff3a38cc40>,
  <__main__.Option at 0xffff3a38cca0>,
  <__main__.Option at 0xffff3a38cd00>,
  <__main__.Option at 0xffff3a38cd60>,
  <__main__.Option at 0xffff3a38cdc0>,
  <__main__.Option at 0xffff3a38ce20>,
  <__main__.Option at 0xffff3a38ce80>,

In [2]:
import pandas as pd
from datetime import timedelta

# Define a simplified Option class and Portfolio class from the previous example here

# Mock stock prices for three months
date_range = pd.date_range(start="2023-01-01", end="2023-03-31")
stock_prices = {
    date: 100 + i % 10 for i, date in enumerate(date_range)
}  # Simplified stock price fluctuation

# Mock option chain: one call option available at the start of each month
option_chain = {
    pd.Timestamp("2023-01-01"): [
        Option(
            strike=105,
            premium=2,
            expiration_date=pd.Timestamp("2023-01-31"),
            option_type="call",
        )
    ],
    pd.Timestamp("2023-02-01"): [
        Option(
            strike=105,
            premium=2,
            expiration_date=pd.Timestamp("2023-02-28"),
            option_type="call",
        )
    ],
    pd.Timestamp("2023-03-01"): [
        Option(
            strike=105,
            premium=2,
            expiration_date=pd.Timestamp("2023-03-31"),
            option_type="call",
        )
    ],
}

# Adjust the option chain dictionary to have an entry for each day (simplification for the example)
for date in date_range:
    if date not in option_chain:
        option_chain[date] = option_chain.get(date.replace(day=1), [])


def simple_call_buy_strategy(portfolio, option_chain, stock_price):
    orders = []
    # Buy a call option at the start of the month if the stock price is below $105
    if stock_price < 105 and len(option_chain) > 0:
        # Assuming only one option available for simplicity
        option = option_chain[0]
        orders.append({"action": "buy", "option": option})
    return orders


# Running the backtest from 2023-01-01 to 2023-03-31 with an initial cash of $10000
portfolio = backtest(
    pd.Timestamp("2023-01-01"),
    pd.Timestamp("2023-03-31"),
    10000,
    simple_call_buy_strategy,
    option_chain,
    stock_prices,
)

# Print the final portfolio status
print(f"Final Cash: {portfolio.cash}")
print(f"Stock Positions: {portfolio.stock_positions}")
print("Options Held:")
for option in portfolio.options:
    print(
        f"Strike: {option.strike}, Premium: {option.premium}, Expiration: {option.expiration_date}, Exercised: {option.exercised}"
    )

Final Cash: 9910
Stock Positions: 0
Options Held:


In [2]:
import numpy as np
import pandas as pd
from scipy.stats import norm


def bsm_call(S, K, T, r, sigma):
    """
    Black-Scholes-Merton model to calculate European call option price.
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    call = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    return call


def calculate_call_prices(stock_prices, K, T_years, r, sigma):
    """
    Calculate call prices for a given set of parameters.

    Parameters:
    - stock_prices: list of stock prices
    - K: strike price
    - T_years: time to expiry in years
    - r: risk-free interest rate
    - sigma: volatility of the stock

    Returns:
    - DataFrame containing the call prices.
    """
    days = np.arange(len(stock_prices))
    call_prices = [
        bsm_call(S, K, T_years - day / 365, r, sigma)
        for day, S in zip(days, stock_prices)
    ]
    return pd.DataFrame({"day": days, "call_price": call_prices})


def calculate_metrics(call_prices_df):
    """
    Calculate various performance metrics based on call prices.

    Parameters:
    - call_prices_df: DataFrame with call prices.

    Returns:
    - A dictionary containing various performance metrics.
    """
    call_prices = call_prices_df["call_price"]
    returns = np.diff(call_prices) / call_prices[:-1]

    total_return = (call_prices.iloc[-1] - call_prices.iloc[0]) / call_prices.iloc[0]
    volatility = np.std(returns, ddof=1) * np.sqrt(252)
    # downside_risk = np.sqrt(np.mean([min(0, r) ** 2 for r in returns]) * 252)
    sharpe_ratio = np.mean(returns) / np.std(returns, ddof=1) * np.sqrt(252)

    cumulative_returns = np.cumprod(1 + returns) - 1
    max_drawdown = np.max(
        np.maximum.accumulate(cumulative_returns) - cumulative_returns
    )
    max_drawup = np.max(cumulative_returns - np.minimum.accumulate(cumulative_returns))

    return {
        "total_return": total_return,
        "volatility": volatility,
        # "downside_risk": downside_risk,
        "sharpe_ratio": sharpe_ratio,
        "max_drawdown": max_drawdown,
        "max_drawup": max_drawup,
    }


def calculate_covered_call_return(stock_prices, call_prices, K):
    """
    Calculate the return of a covered call strategy.

    Parameters:
    - stock_prices: list of stock prices
    - call_prices: DataFrame with call prices
    - K: strike price of the call option

    Returns:
    - A float, the total return of the covered call strategy.
    """
    # Assume we sell the call option at the beginning and hold until expiry
    initial_call_price = call_prices["call_price"].iloc[0]

    # Final stock price
    final_stock_price = stock_prices[-1]

    # If stock price at expiry is higher than the strike price,
    # the call is exercised and we sell at the strike price.
    # Otherwise, we sell at the market price.
    final_return = initial_call_price + min(final_stock_price, K) - stock_prices[0]

    # Calculate the total return as a percentage of the initial stock price
    total_return = final_return / stock_prices[0]

    return total_return


# Simulation parameters
stock_prices = [950, 870, 830, 800, 840, 880, 840, 900, 940, 950]
# stock_prices = [900, 870, 830, 800, 840, 880, 840, 900, 940, 950]
days = list(range(0, len(stock_prices)))
r = 0.01  # Risk-free interest rate
sigma = 0.3  # Volatility

strike_range = [700, 800, 850, 900, 950, 1000, 1100]
expiry_range = [30, 60, 90, 120, 240, 360]  # Expiry dates range in days


# Apply the strategy to each combination of strike price and expiry
covered_call_results = []

for K in strike_range:
    for T in expiry_range:
        T_years = T / 365
        call_prices_df = calculate_call_prices(stock_prices, K, T_years, r, sigma)
        total_return = calculate_covered_call_return(stock_prices, call_prices_df, K)
        covered_call_results.append([K, T, total_return])

# Create a DataFrame to display the results
covered_call_results_df = pd.DataFrame(
    covered_call_results,
    columns=[
        "Strike Price",
        "Expiry (Days)",
        "Total Return",
    ],
)

covered_call_results_df

Unnamed: 0,Strike Price,Expiry (Days),Total Return
0,700,30,0.000609
1,700,60,0.001404
2,700,90,0.002718
3,700,120,0.004547
4,700,240,0.014616
5,700,360,0.026056
6,800,30,0.001348
7,800,60,0.005236
8,800,90,0.0102
9,800,120,0.015416


In [None]:
請參考以下code，寫一個選擇權的 backtest engine，可以參考以下流程

for each day t:
    檢查 current porfolio，例如選擇權若已到期，需要處理行權、結算
    orders <- strategy(porfolio, option chain at t, stock at t)
    process orders, update portfolio

strategy function 可以是一個 input argument，但需要定義好這個 function 該長怎樣


```
def calculate_daily_returns(
    stock_prices, option_premium, K, option_exercised, exercise_day
):
    """
    Calculate daily portfolio returns for a covered call strategy.

    Parameters:
    - stock_prices: List of stock prices.
    - option_premium: Premium received from selling the call option.
    - K: Strike price of the call option.
    - option_exercised: Boolean indicating if the option was exercised.
    - exercise_day: Day on which the option was exercised.

    Returns:
    - List of daily returns.
    """
    daily_returns = []
    daily_values = []
    portfolio_value_previous = (
        stock_prices[0] + option_premium
    )  # Initial portfolio value includes option premium
    n_days = len(stock_prices)

    for i in range(1, n_days):
        # Calculate new portfolio value
        if option_exercised and i == exercise_day:
            # If option is exercised, stock is sold at strike price K
            portfolio_value = (
                K * n_days + option_premium
            )  # Assuming 1 stock for simplicity
        else:
            portfolio_value = stock_prices[i] + option_premium

        daily_values.append(portfolio_value)

        # Update previous portfolio value
        portfolio_value_previous = portfolio_value

    # Calculate daily return
    # Use daily_values and numpy or pandas to calculate

    return daily_returns
```

In [11]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# Assuming results_df is already populated with the data from the previous step

# Initialize subplot
fig = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=("Total Return", "Volatility", "Sharpe Ratio", "Max Drawdown"),
)


# Custom function to create hover text
def create_hover_text(row):
    return f"Strike: {row['Strike Price']}, Expiry: {row['Expiry (Days)']} days"


# Add traces with hoverinfo
metrics = ["Total Return", "Volatility", "Sharpe Ratio", "Max Drawdown"]
positions = [(1, 1), (1, 2), (2, 1), (2, 2)]

for metric, pos in zip(metrics, positions):
    fig.add_trace(
        go.Scatter(
            x=results_df["Strike Price"],
            y=results_df[metric],
            mode="markers",
            name=metric,
            text=results_df.apply(create_hover_text, axis=1),
            hoverinfo="text+y",
        ),
        row=pos[0],
        col=pos[1],
    )

# Update layout for readability
fig.update_layout(
    height=800,
    width=1200,
    title_text="Option Performance Metrics by Strike Price with Strike and Expiry Details",
    hovermode="closest",
)

fig.show()