In [None]:
!pip install wrds yfinance tqdm pandas numpy backtrader pyfolio quantstats


In [None]:
#FUNCTIONS NEEDED
# Function to load distinct tickers from the SQLite database
def load_distinct_tickers_from_db():
    """Return the distinct tickers found in ``merged_data`` for the study window.

    Reads the SQLite file at ``DB_PATH`` and keeps rows whose ``date`` lies in
    the inclusive range ``START_DATE``..``END_DATE`` (module-level constants).

    Returns:
    pandas.DataFrame: a single ``ticker`` column, one row per distinct ticker.
    The query has no ORDER BY, so row order is whatever SQLite yields.
    """
    query = """
        SELECT DISTINCT ticker
        FROM merged_data
        WHERE date BETWEEN ? AND ?
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # try/finally so the connection is released even if the query fails
        return pd.read_sql(query, conn, params=(START_DATE, END_DATE))
    finally:
        conn.close()

def _render_figure(draw, xlabel, ylabel, filename, rotate_xticks=False):
    """Draw one 10x6 figure, label it, save it to *filename*, show it, close it."""
    fig = plt.figure(figsize=(10, 6))
    draw()
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    if rotate_xticks:
        plt.xticks(rotation=45)
    plt.grid()
    plt.savefig(filename)
    plt.show()
    # Release the figure so repeated calls do not pile up open figures
    plt.close(fig)


def analyze_portfolio_returns(portfolio_returns, strategy_name):
    """
    Analyze and visualize portfolio returns with key performance metrics.

    Four PNG files are written to the current working directory
    (``cumulative_returns_*``, ``daily_returns_*``, ``drawdown_*`` and
    ``histogram_returns_*``, suffixed with ``strategy_name``) and each figure
    is also shown.

    Parameters:
    portfolio_returns (OrderedDict): Dictionary of datetime to periodic returns
    strategy_name (str): Name of the strategy, used in axis labels and file names

    Returns:
    dict: ``{'metrics': {...}}`` with 'Total Return', 'Annualized Return',
    'Sharpe Ratio', 'Sortino Ratio' and 'Max Drawdown'
    """
    # Convert the mapping to a date-indexed Series
    returns_series = pd.Series(portfolio_returns)
    returns_series.index = pd.to_datetime(returns_series.index)

    # Compounded cumulative return
    cumulative_returns = (1 + returns_series).cumprod() - 1

    # Annualized return, assuming 252 observations per year
    trading_days = len(returns_series)
    years = trading_days / 252
    total_return = cumulative_returns.iloc[-1]
    annualized_return = (1 + total_return) ** (1 / years) - 1

    # Sharpe ratio with a risk-free rate of 0; a zero-volatility series
    # reports 0 (same convention as the Sortino ratio below) instead of inf/NaN
    mean_return = returns_series.mean()
    std_return = returns_series.std()
    sharpe_ratio = mean_return / std_return * np.sqrt(252) if std_return != 0 else 0

    # Sortino ratio: deviation measured on the negative returns only
    downside_returns = returns_series[returns_series < 0]
    downside_std = downside_returns.std() if len(downside_returns) > 0 else 0
    sortino_ratio = mean_return / downside_std * np.sqrt(252) if downside_std != 0 else 0

    # Drawdown as the fractional decline of wealth from its running peak.
    # (Subtracting cumulative returns, as before, gave the decline in units of
    # initial capital rather than a percentage of the peak.)
    wealth = 1 + cumulative_returns
    drawdown = wealth / wealth.cummax() - 1
    max_drawdown = drawdown.min()

    # Visualizations
    date_label = f'Date ({strategy_name})'
    _render_figure(cumulative_returns.plot, date_label, 'Cumulative Return',
                   f"cumulative_returns_{strategy_name}.png")
    _render_figure(lambda: returns_series.plot(kind='bar'), date_label, 'Daily Return',
                   f"daily_returns_{strategy_name}.png", rotate_xticks=True)
    _render_figure(drawdown.plot, date_label, 'Drawdown',
                   f"drawdown_{strategy_name}.png")
    _render_figure(lambda: returns_series.hist(bins=30), f'Return ({strategy_name})', 'Frequency',
                   f"histogram_returns_{strategy_name}.png")

    metrics = {
        'Total Return': total_return,
        'Annualized Return': annualized_return,
        'Sharpe Ratio': sharpe_ratio,
        'Sortino Ratio': sortino_ratio,
        'Max Drawdown': max_drawdown
    }

    return {
        'metrics': metrics
    }

def load_merged_data_from_db(tickers_list, chunk_size=100000):
    """Load ``merged_data`` rows for the given tickers within the study window.

    Parameters:
    tickers_list (iterable): tickers to keep (bound as SQL parameters)
    chunk_size (int): number of rows fetched from the cursor per round trip

    Returns:
    pandas.DataFrame: all matching rows; an empty, column-less DataFrame when
    no tickers are given or no rows match.
    """
    tickers_tuple = tuple(tickers_list)

    # Nothing to query: return before opening a connection (an empty IN ()
    # clause would be a SQL error, and the connection would otherwise leak)
    if not tickers_tuple:
        return pd.DataFrame()

    # One "?" per ticker; the values themselves are always bound, never inlined.
    # NOTE(review): very long ticker lists can exceed SQLite's bound-variable limit.
    placeholders = ','.join('?' * len(tickers_tuple))
    query = f"""
        SELECT *
        FROM merged_data
        WHERE date BETWEEN ? AND ?
        AND ticker IN ({placeholders})
    """
    params = (START_DATE, END_DATE) + tickers_tuple

    chunks = []
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        columns = [desc[0] for desc in cursor.description]
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            chunks.append(pd.DataFrame(rows, columns=columns))
    finally:
        # Always release the connection, including on query errors
        conn.close()

    if not chunks:
        return pd.DataFrame()
    # Single concat at the end: re-concatenating inside the loop copies every
    # previously loaded row on each iteration
    return pd.concat(chunks, ignore_index=True)

def load_equities_data_from_db(tickers_list, chunk_size=100000):
    """Load every ``equities_data`` row within the study window.

    NOTE(review): as in the original, ``tickers_list`` is only used to
    short-circuit when it is empty; the query does not filter by ticker.
    Confirm whether a ticker filter was intended before adding one.

    Parameters:
    tickers_list (iterable): if empty, no query is run
    chunk_size (int): number of rows fetched from the cursor per round trip

    Returns:
    pandas.DataFrame: all rows in the date window; an empty, column-less
    DataFrame when ``tickers_list`` is empty or no rows match.
    """
    # Return before opening a connection so nothing leaks on the empty path
    if not tuple(tickers_list):
        return pd.DataFrame()

    query = """
        SELECT *
        FROM equities_data
        WHERE date BETWEEN ? AND ?
    """
    params = (START_DATE, END_DATE)

    chunks = []
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        columns = [desc[0] for desc in cursor.description]
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            chunks.append(pd.DataFrame(rows, columns=columns))
    finally:
        # Always release the connection, including on query errors
        conn.close()

    if not chunks:
        return pd.DataFrame()
    # Concatenate once; concatenating inside the loop is quadratic in rows
    return pd.concat(chunks, ignore_index=True)

def load_options_data_from_db(tickers_list, chunk_size=100000):
    """Load ``volume_threshold_options_data`` rows for the given tickers.

    Rows are restricted to the inclusive ``START_DATE``..``END_DATE`` window
    and to ``ticker IN tickers_list``. The ticker placeholders were previously
    built but never inserted into the query, so every ticker was returned;
    the filter now matches the later definition of this function in the file.

    Parameters:
    tickers_list (iterable): tickers to keep (bound as SQL parameters)
    chunk_size (int): number of rows fetched from the cursor per round trip

    Returns:
    pandas.DataFrame: all matching rows; an empty, column-less DataFrame when
    no tickers are given or no rows match.
    """
    tickers_tuple = tuple(tickers_list)

    # Return before opening a connection so nothing leaks on the empty path
    if not tickers_tuple:
        return pd.DataFrame()

    placeholders = ','.join('?' * len(tickers_tuple))
    query = f"""
        SELECT *
        FROM volume_threshold_options_data
        WHERE date BETWEEN ? AND ?
        AND ticker IN ({placeholders})
    """
    params = (START_DATE, END_DATE) + tickers_tuple

    chunks = []
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        columns = [desc[0] for desc in cursor.description]
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            chunks.append(pd.DataFrame(rows, columns=columns))
    finally:
        # Always release the connection, including on query errors
        conn.close()

    if not chunks:
        return pd.DataFrame()
    # Concatenate once; concatenating inside the loop is quadratic in rows
    return pd.concat(chunks, ignore_index=True)

def gen_returns_frame(view, days=(1,)):
    """Add forward mid-price return columns to *view* (in place) and return it.

    For each ``day`` in *days* a column ``'{day}_fut_return'`` is added holding
    ``mid[t + day] / mid[t] - 1`` where ``mid = (bid + ask) / 2``. The last
    ``day`` rows have no future price and are NaN.

    The previous ``pct_change(-day)`` produced ``mid[t] / mid[t + day] - 1``,
    i.e. the change measured backwards from the future price, which has the
    opposite sign to a forward return.

    Parameters:
    view (pandas.DataFrame): rows in chronological order with 'bid' and 'ask'
    days (iterable of int): look-ahead horizons, in rows

    Returns:
    pandas.DataFrame: the same object as *view*, with the new columns
    """
    mid = (view['bid'] + view['ask']) / 2
    for day in days:
        view[f'{day}_fut_return'] = mid.shift(-day) / mid - 1
    return view

def gen_norm_option_stats(view):
    """Return the mean implied volatility per date, with missing means as 0.

    The column is selected before aggregating: the previous
    ``.mean('impl_volatility')`` passed the column name into ``mean``'s first
    positional parameter (``numeric_only``) and averaged every column.

    Parameters:
    view (pandas.DataFrame): rows with 'date' and 'impl_volatility' columns

    Returns:
    pandas.Series: named 'impl_volatility', indexed by date
    """
    return view.groupby('date')['impl_volatility'].mean().fillna(0)

def merge_put_call(calls, puts, greek, value):
    """Build put/(put+call) ratios for volume and open interest per date.

    *calls* and *puts* are joined on 'date' with a left join (dates present
    only in *puts* are dropped); missing put figures are treated as 0. A date
    where both sides are 0 yields NaN for that ratio.

    Parameters:
    calls, puts (pandas.DataFrame): keyed by 'date', with 'volume' and
        'open_interest' columns
    greek, value: used only to name the two output columns

    Returns:
    pandas.DataFrame: columns ``put_call_ratio_{greek}_{value}`` and
    ``open_interest_ratio_{greek}_{value}``
    """
    wanted = ['volume', 'open_interest']
    label = f'{greek}_{value}'
    volume_ratio_col = f'put_call_ratio_{label}'
    oi_ratio_col = f'open_interest_ratio_{label}'

    merged = pd.merge(
        calls[wanted],
        puts[wanted],
        how='left',
        left_on='date',
        right_on='date',
        suffixes=('_call', '_put'),
    ).fillna(0)

    total_volume = merged['volume_call'] + merged['volume_put']
    total_open_interest = merged['open_interest_call'] + merged['open_interest_put']
    merged[volume_ratio_col] = merged['volume_put'] / total_volume
    merged[oi_ratio_col] = merged['open_interest_put'] / total_open_interest

    return merged[[volume_ratio_col, oi_ratio_col]]

### Signals of interest
def _put_call_signal(view, greek, value, call_filter, put_filter):
    """Sum volume/open interest per date for the filtered calls and puts, then ratio them."""
    # Only these two columns are consumed by merge_put_call; selecting them
    # before summing avoids aggregating unrelated (e.g. string or datetime)
    # columns, which a bare .sum() would attempt.
    cols = ['volume', 'open_interest']
    calls = view[(view['cp_flag'] == 'C') & call_filter].groupby('date')[cols].sum()
    puts = view[(view['cp_flag'] == 'P') & put_filter].groupby('date')[cols].sum()
    return merge_put_call(calls, puts, greek, value)


def gen_put_call_ratio(view):
    """Build put/call volume and open-interest ratio signals per date.

    Six signal pairs are produced, one per (greek, threshold) bucket:
    delta 0.15 / 0.25, gamma 0.2 / 0.1 and theta 25 / 15. Missing values in
    the combined frame are filled with 0.

    Parameters:
    view (pandas.DataFrame): option rows with 'date', 'cp_flag' ('C'/'P'),
        'delta', 'gamma', 'theta', 'volume' and 'open_interest'

    Returns:
    pandas.DataFrame: indexed by date, two columns per bucket
    """
    delta, gamma, theta = view['delta'], view['gamma'], view['theta']

    # NOTE(review): calls use delta < threshold while puts use
    # delta < -threshold, so the put side keeps the larger-magnitude deltas.
    # Kept as in the original; confirm this asymmetry is intended.
    signals = [
        _put_call_signal(view, 'delta', 0.15, delta < 0.15, delta < -0.15),
        _put_call_signal(view, 'delta', 0.25, delta < 0.25, delta < -0.25),
        _put_call_signal(view, 'gamma', 0.2, gamma > 0.20, gamma > 0.20),
        _put_call_signal(view, 'gamma', 0.1, gamma > 0.10, gamma > 0.10),
        _put_call_signal(view, 'theta', 25, theta < -25, theta < -25),
        _put_call_signal(view, 'theta', 15, theta < -15, theta < -15),
    ]

    return pd.concat(signals, axis=1).fillna(0)

# Convert to OHLC format for backtrader
def convert_to_ohlc_format(df):
    """Collapse quote rows into one OHLC-style bar per (date, ticker).

    Side effect: ``df['date']`` is converted to datetime in place.

    Open is the first bid, High the max ask, Low the min bid, Close and
    Adj_Close the last 'prc', Volume the summed 'vol'.

    Returns:
    pandas.DataFrame: indexed by date, with a 'Ticker' column followed by
    the bar columns
    """
    df['date'] = pd.to_datetime(df['date'])

    bar_spec = {
        'Open': ('bid', 'first'),
        'High': ('ask', 'max'),
        'Low': ('bid', 'min'),
        'Close': ('prc', 'last'),  # 'prc' supplies the closing price
        'Adj_Close': ('prc', 'last'),
        'Volume': ('vol', 'sum'),
    }
    bars = df.groupby(['date', 'ticker']).agg(**bar_spec).reset_index()
    bars = bars.set_index('date')
    return bars.rename(columns={'ticker': 'Ticker'})

def calculate_rsi(prices, period=14):
    """Return the RSI of *prices* using simple rolling means over *period* rows.

    Gains and losses are averaged with an unweighted rolling mean. The first
    ``period - 1`` values are NaN; a window with no losses evaluates to 100.
    """
    change = prices.diff()
    # Non-qualifying moves (and the leading NaN) count as 0 in each leg
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)

    average_gain = ups.rolling(window=period).mean()
    average_loss = downs.rolling(window=period).mean()

    relative_strength = average_gain / average_loss
    return 100 - (100 / (1 + relative_strength))

def prepare_features(df):
    """
    Create technical features and a next-day direction label for each stock.

    Rows are processed per 'permno' in their existing order (assumes each
    stock's rows are already in chronological order — TODO confirm at the
    caller). Any row with a NaN in any column is dropped, which removes the
    warm-up rows of the rolling windows.

    The label 'target' is 1 when the next row's return is positive, else 0.
    Rows whose next-day return is unknown (notably the last row of each stock)
    are dropped rather than being labelled 0, which is what ``NaN > 0``
    silently produced before.

    Parameters:
    df (pandas.DataFrame): rows with 'permno', 'prc' and 'vol' columns

    Returns:
    pandas.DataFrame: concatenated per-stock frames with a fresh integer
    index; an empty DataFrame when *df* has no rows
    """
    processed_data = []

    # groupby(sort=False) visits stocks in order of first appearance, in one
    # pass, instead of re-scanning the whole frame once per permno
    for _, group in df.groupby('permno', sort=False):
        stock_data = group.copy()
        prices = stock_data['prc']
        volume = stock_data['vol']

        # Technical features
        stock_data['returns'] = prices.pct_change()
        stock_data['volume_ma5'] = volume.rolling(5).mean()
        stock_data['volume_ma20'] = volume.rolling(20).mean()
        stock_data['price_ma5'] = prices.rolling(5).mean()
        stock_data['price_ma20'] = prices.rolling(20).mean()
        stock_data['price_ma50'] = prices.rolling(50).mean()

        # Momentum indicators
        stock_data['rsi'] = calculate_rsi(prices)
        stock_data['momentum'] = prices.pct_change(5)

        # Volatility
        stock_data['volatility'] = stock_data['returns'].rolling(20).std()

        # Target variable (next day return > 0); NaN where the next return is
        # unknown so that dropna() removes those rows
        next_return = stock_data['returns'].shift(-1)
        stock_data['target'] = (next_return > 0).astype(float).where(next_return.notna())

        processed_data.append(stock_data.dropna())

    if not processed_data:
        # pd.concat([]) raises; an empty input simply yields an empty result
        return pd.DataFrame()

    result_df = pd.concat(processed_data, ignore_index=True).dropna()
    # Restore the integer 0/1 label dtype now that no NaN labels remain
    result_df['target'] = result_df['target'].astype(int)
    return result_df

# Convert to OHLC format for backtrader
def convert_to_ohlc_format(df):
    """Collapse quote rows into one OHLC-style bar per (date, permno).

    Side effect: ``df['date']`` is converted to datetime in place.

    Open is the first bid, High the max ask, Low the min bid, Close and
    Adj_Close the last 'prc', Volume the summed 'vol'.

    Returns:
    pandas.DataFrame: indexed by date, with a 'permno' column followed by
    the bar columns
    """
    df['date'] = pd.to_datetime(df['date'])

    bar_spec = {
        'Open': ('bid', 'first'),
        'High': ('ask', 'max'),
        'Low': ('bid', 'min'),
        'Close': ('prc', 'last'),  # 'prc' supplies the closing price
        'Adj_Close': ('prc', 'last'),
        'Volume': ('vol', 'sum'),
    }
    bars = df.groupby(['date', 'permno']).agg(**bar_spec).reset_index()
    return bars.set_index('date')

In [None]:
# DATA NEEDED
import matplotlib.pyplot as plt
import sqlite3
import pandas as pd
import logging
import backtrader as bt
import quantstats as qs
from datetime import datetime,timedelta
import numpy as np
import os,sys
import warnings
warnings.filterwarnings("ignore")
import gc
from google.colab import drive
drive.mount('/content/drive')
%matplotlib inline

# Add the folder containing your strategy file to the Python path
sys.path.append('/content/drive/My Drive/AFP/Code/Download_This_Folder')  # Update this path to your folder
from bt_strategy import (
    evaluate_strategies,
    buy_and_hold_strategy,
    close_vs_sma_strategy,
    mean_reversion_strategy,
    prev_peak_strategy,
    random_strategy,
    prev_peak_nodrop_strategy,
    close_vs_sma_nodrop_strategy,
    mean_reversion_nodrop_strategy,
    anti_drop_strategy
)

# Show every column when a DataFrame is displayed (no column truncation)
pd.set_option('display.max_columns', None)

# Configure the root logger to emit INFO-level records and above
logging.basicConfig(level=logging.INFO)

# Reporting folder, created on the spot if it is missing.
# Presumably the destination for QuantStats reports (it is not used in the
# visible part of this file; verify against later cells).
REPORTS_FOLDER = "QuantStats_Reports"
os.makedirs(REPORTS_FOLDER, exist_ok=True)  # Create the folder if it doesn't exist

# Constants
# DB_PATH: SQLite file opened by the load_*_from_db helpers.
# START_DATE / END_DATE: bounds bound into their `date BETWEEN ? AND ?`
# filters (BETWEEN is inclusive on both ends).
DB_PATH = r"/content/drive/MyDrive/AFP/Code/Download_This_Folder/1_financial_data_long.db"
START_DATE = '2020-01-01'
END_DATE = '2023-12-31'

# Function to load distinct tickers from the SQLite database
def load_distinct_tickers_from_db():
    """Return the distinct tickers found in ``merged_data`` for the study window.

    Reads the SQLite file at ``DB_PATH`` and keeps rows whose ``date`` lies in
    the inclusive range ``START_DATE``..``END_DATE``.

    Returns:
    pandas.DataFrame: a single ``ticker`` column, one row per distinct ticker.
    The query has no ORDER BY, so row order is whatever SQLite yields.
    """
    query = """
        SELECT DISTINCT ticker
        FROM merged_data
        WHERE date BETWEEN ? AND ?
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # try/finally so the connection is released even if the query fails
        return pd.read_sql(query, conn, params=(START_DATE, END_DATE))
    finally:
        conn.close()

# Load the distinct tickers for the study window.
# NOTE(review): nothing here sorts them — the query has no ORDER BY and no
# sort is applied below — so the order is whatever SQLite returns.
tickers = load_distinct_tickers_from_db()

# The loader returns a DataFrame; pull its 'ticker' column out as a plain list
tickers_list_full = tickers['ticker'].tolist()

# BETTER RAM HERE
def load_options_data_from_db(tickers_list, chunk_size=100000):
    """Load ``volume_threshold_options_data`` rows for the given tickers.

    Rows are restricted to the inclusive ``START_DATE``..``END_DATE`` window
    and to ``ticker IN tickers_list``.

    Parameters:
    tickers_list (iterable): tickers to keep (bound as SQL parameters)
    chunk_size (int): number of rows fetched from the cursor per round trip

    Returns:
    pandas.DataFrame: all matching rows; an empty, column-less DataFrame when
    no tickers are given or no rows match.
    """
    tickers_tuple = tuple(tickers_list)

    # Return before opening a connection: an empty IN () clause would be a SQL
    # error, and the connection would otherwise never be closed on this path
    if not tickers_tuple:
        return pd.DataFrame()

    # One "?" per ticker; the values themselves are always bound, never inlined.
    # NOTE(review): very long ticker lists can exceed SQLite's bound-variable limit.
    placeholders = ','.join('?' * len(tickers_tuple))
    query = f"""
        SELECT *
        FROM volume_threshold_options_data
        WHERE date BETWEEN ? AND ?
        AND ticker IN ({placeholders})
    """
    params = (START_DATE, END_DATE) + tickers_tuple

    chunks = []
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        columns = [desc[0] for desc in cursor.description]
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            chunks.append(pd.DataFrame(rows, columns=columns))
    finally:
        # Always release the connection, including on query errors
        conn.close()

    if not chunks:
        return pd.DataFrame()
    # Concatenate once: growing the frame inside the loop re-copies every
    # previously loaded row on each iteration (more time and peak memory)
    return pd.concat(chunks, ignore_index=True)

# Pull the option rows for the full ticker universe, then normalise them
options_data = load_options_data_from_db(tickers_list_full)

# Parse the quote date and the expiry date into datetimes
options_data['date'] = pd.to_datetime(options_data['date'])
options_data['exdate'] = pd.to_datetime(options_data['exdate'])

# 'strike_price' is used as loaded; the alternative
# options_data['strike_price'] = options_data['strike_price_actual'] stays disabled here.

# Upper-case the call/put flag so comparisons against 'C' / 'P' are reliable
options_data['cp_flag'] = options_data['cp_flag'].str.upper()

# The only column whose name actually changes is ticker -> underlying
# (the greeks and impl_volatility already carry the expected names)
options_data.rename(columns={'ticker': 'underlying'}, inplace=True)

# Order rows by quote date, then expiry, then strike, and renumber the index
options_data.sort_values(by=['date', 'exdate', 'strike_price'], inplace=True)
options_data.reset_index(drop=True, inplace=True)



class StrategyLeg:
    """Template for one option leg of a multi-leg strategy.

    Attributes:
    option_type: option kind, stored lower-cased
    strike_pct: target strike as a fraction of the underlying price
    days_to_expiry: calendar days from the trade date to the wanted expiry
    quantity: signed size; negative values are treated as sells by the backtester
    """

    def __init__(self, option_type, strike_pct, days_to_expiry, quantity):
        normalized_type = option_type.lower()
        self.option_type = normalized_type
        self.strike_pct, self.days_to_expiry, self.quantity = (
            strike_pct,  # Percentage of underlying price
            days_to_expiry,
            quantity,
        )

class GenericStrategy:
    """A named collection of strategy legs."""

    def __init__(self, name, legs):
        """Store the strategy *name* and its *legs* (a list of StrategyLeg objects)."""
        self.name, self.legs = name, legs

class DataLoader:
    """In-memory store of option quotes keyed by date and contract.

    Attributes:
    historical_data: {date: {Option: quote dict}} where the quote dict holds
        best_bid, best_offer, delta, gamma, vega and theta
    underlying_data: {date: underlying price proxy}
    all_options: set of every Option seen
    """

    def __init__(self):
        self.historical_data = {}
        self.underlying_data = {}
        self.all_options = set()

    def load_data(self, df):
        """Index every row of *df* into the three stores.

        The underlying price for a date is the row's 'Close' when that field
        exists, otherwise the row's 'best_offer'; later rows for the same date
        overwrite earlier ones.
        """
        quote_fields = ('best_bid', 'best_offer', 'delta', 'gamma', 'vega', 'theta')

        for _, record in df.iterrows():
            quote_date = pd.to_datetime(record['date'])
            contract = Option(
                underlying=record['underlying'],  # ticker symbol of the underlying
                strike=record['strike_price'],
                expiry=record['exdate'],
                option_type=record['cp_flag'],
            )

            # Create the per-date bucket on first use, then file the quote
            day_quotes = self.historical_data.setdefault(quote_date, {})
            day_quotes[contract] = {field: record[field] for field in quote_fields}

            # Underlying price proxy (falls back to the option's best offer)
            self.underlying_data[quote_date] = record.get('Close', record['best_offer'])

            self.all_options.add(contract)


class PositionManager:
    """Tracks option positions, the underlying hedge and cash for a backtest.

    Attributes:
    positions: {(underlying, strike, expiry, option_type): {'quantity', 'cost'}}
        where 'cost' is the signed net premium paid (negative = premium received)
    underlying_position: units of the underlying held as a hedge
    cash: running cash balance (starts at 0)
    transaction_log: list of dicts, one per executed option leg
    """

    def __init__(self):
        self.positions = {}
        self.underlying_position = 0
        self.cash = 0
        self.transaction_log = []

    def execute_trade(self, date, legs, trade_quantity=1):
        """Book every leg of a trade.

        Parameters:
        date: trade date, recorded in the transaction log
        legs: iterable of dicts with 'option', 'quantity' (signed) and 'price'
        trade_quantity: multiplier applied to each leg's quantity
        """
        for leg in legs:
            opt = leg['option']
            qty = leg['quantity'] * trade_quantity
            price = leg['price']

            key = (opt.underlying, opt.strike, opt.expiry, opt.option_type)
            # Signed cash flow: buying (qty > 0) costs premium, selling
            # (qty < 0) yields a negative cost, i.e. premium received.
            # Forcing this positive made sales reduce cash and broke the
            # `value - cost` mark-to-market for short positions.
            cost = qty * price

            position = self.positions.setdefault(key, {'quantity': 0, 'cost': 0})
            position['quantity'] += qty
            position['cost'] += cost
            self.cash -= cost

            # Log transaction
            self.transaction_log.append({
                'date': date,
                'action': 'SELL' if qty < 0 else 'BUY',
                'option': opt,
                'quantity': abs(qty),
                'price': price,
                'cost': cost
            })

    def delta_hedge(self, date, total_delta, underlying_price):
        """Offset *total_delta* with a position in the underlying.

        ``-total_delta`` is treated as a cash amount: it is deducted from cash
        and converted to units by dividing by *underlying_price*. *date* is
        accepted but not used.

        Returns:
        tuple: (required_hedge, hedge_cost), both equal to ``-total_delta``
        """
        required_hedge = -total_delta
        hedge_cost = required_hedge
        self.underlying_position += required_hedge / underlying_price
        self.cash -= hedge_cost
        return required_hedge, hedge_cost

class Option:
    """Hashable identity of an option contract.

    Two options are equal when underlying, strike, expiry and option type all
    match, so instances can be used as dict keys and set members.
    """

    def __init__(self, underlying, strike, expiry, option_type):
        self.underlying = underlying
        self.strike = float(strike)
        self.expiry = pd.to_datetime(expiry)
        self.option_type = option_type.lower()  # stored lower-cased

    def __repr__(self):
        return f"{self.option_type} {self.strike} {self.expiry.strftime('%Y-%m-%d')}"

    def __hash__(self):
        return hash((self.underlying, self.strike, self.expiry, self.option_type))

    def __eq__(self, other):
        # Defer to Python's default handling for foreign types instead of
        # raising AttributeError on a missing attribute
        if not isinstance(other, Option):
            return NotImplemented
        return (self.underlying == other.underlying and
                self.strike == other.strike and
                self.expiry == other.expiry and
                self.option_type == other.option_type)

class GenericBacktester:
    """Runs a multi-leg option strategy over the quotes of a single underlying.

    Attributes:
    dl: DataLoader holding the quotes of the selected ticker
    pm: PositionManager tracking positions, hedge and cash
    results: one snapshot dict per processed date (filled by run_backtest)
    trade_blotter: option-leg and hedge trade records (filled by run_backtest)
    """

    def __init__(self, df, ticker='REGN'):  # Default to a specific ticker
        """Load the rows of *df* whose 'underlying' equals *ticker*."""
        self.dl = DataLoader()
        self.dl.load_data(df[df['underlying'] == ticker])  # Filter data for the specific ticker
        self.pm = PositionManager()
        self.results = []
        self.trade_blotter = []

    def find_closest_strike(self, date, target_strike, expiry, option_type):
        """Return the option quoted on *date* whose strike is nearest *target_strike*.

        Only options whose expiry and option_type match exactly are
        considered; returns None when there is no such option.
        """
        available = [opt for opt in self.dl.historical_data.get(date, {})
                     if opt.expiry == expiry and opt.option_type == option_type]
        if not available:
            return None
        strikes = np.array([opt.strike for opt in available])
        idx = np.argmin(np.abs(strikes - target_strike))
        return available[idx]

    def get_total_delta(self, date, underlying_price):
        """Return (sum of delta * quantity + underlying position) * underlying_price.

        Positions with zero quantity, or without a quote on *date*, are skipped.
        """
        total_delta = 0
        for key, pos in self.pm.positions.items():
            if pos['quantity'] == 0:
                continue
            # Rebuild the Option from the position key to look up its quote
            opt = Option(*key)
            if opt in self.dl.historical_data.get(date, {}):
                delta = self.dl.historical_data[date][opt]['delta']
                total_delta += delta * pos['quantity']
        return (total_delta + self.pm.underlying_position)*underlying_price

    def create_strategy_legs(self, date, strategy):
        """Resolve each template leg of *strategy* into a concrete option and price.

        For every leg the wanted expiry is ``date + days_to_expiry`` and the
        target strike is the underlying price times ``strike_pct``. Sells
        (negative quantity) are priced at best_bid, buys at best_offer.

        Returns:
        list of dicts with 'option', 'quantity' and 'price', or None as soon
        as one leg has no matching option.
        """
        legs = []
        S = self.dl.underlying_data[date]

        for leg in strategy.legs:
            # find_closest_strike requires this exact expiry date to be quoted
            expiry_date = date + timedelta(days=leg.days_to_expiry)
            target_strike = S * leg.strike_pct
            opt = self.find_closest_strike(date, target_strike, expiry_date, leg.option_type)

            if not opt:
                return None  # Skip if any leg is missing

            # Get execution price
            if leg.quantity < 0:  # Selling
                price = self.dl.historical_data[date][opt]['best_bid']
            else:  # Buying
                price = self.dl.historical_data[date][opt]['best_offer']

            legs.append({
                'option': opt,
                'quantity': leg.quantity,
                'price': price
            })

        return legs

    def calculate_risk_metrics(self, date):
        """Return position-weighted (delta, gamma, vega, theta) for *date*.

        Positions with zero quantity or no quote on *date* are skipped; the
        underlying hedge position is added to delta.
        """
        delta, gamma, vega, theta = 0, 0, 0, 0
        for key, pos in self.pm.positions.items():
            if pos['quantity'] == 0:
                continue
            opt = Option(*key)
            if opt in self.dl.historical_data.get(date, {}):
                data = self.dl.historical_data[date][opt]
                delta += data['delta'] * pos['quantity']
                # Greeks missing from the quote dict count as 0
                gamma += data.get('gamma', 0) * pos['quantity']
                vega += data.get('vega', 0) * pos['quantity']
                theta += data.get('theta', 0) * pos['quantity']
        delta += self.pm.underlying_position
        return delta, gamma, vega, theta

    def run_backtest(self, strategy, trade_quantity=1):
        """Trade *strategy* on every date where all of its legs can be resolved.

        On each such date (in ascending order) the legs are executed, the
        resulting delta is hedged, and a snapshot is appended to
        ``self.results``. Dates where the legs cannot be built are skipped
        with a printed message. Because a trade is placed on every usable
        date, positions accumulate across dates.

        Returns:
        pandas.DataFrame: one row per processed date, built from ``self.results``
        """
        for date in sorted(self.dl.historical_data.keys()):
            if date not in self.dl.underlying_data:
                print(f"Skipping {date} as underlying data is not available.")
                continue

            S = self.dl.underlying_data[date]

            # Create strategy legs for current date
            legs = self.create_strategy_legs(date, strategy)
            if not legs:
                print(f"Skipping {date} as no strategy legs could be created.")
                continue

            # Execute strategy
            self.pm.execute_trade(date, legs, trade_quantity)

            # Log trades to blotter (per-leg figures, before trade_quantity scaling)
            for leg in legs:
                trade_details = {
                    'date': date,
                    'action': 'SELL' if leg['quantity'] < 0 else 'BUY',
                    'option': leg['option'],
                    'quantity': abs(leg['quantity']),
                    'price': leg['price'],
                    'cost': leg['quantity'] * leg['price']
                }
                self.trade_blotter.append(trade_details)

            # Calculate and hedge delta
            total_delta = self.get_total_delta(date, S)
            hedge_qty, hedge_cost = self.pm.delta_hedge(date, total_delta, S)

            # Hedge records carry an 'underlying' key (holding the price S)
            # instead of an 'option' key; print_trade_blotter relies on that
            if hedge_qty != 0:
                self.trade_blotter.append({
                    'date': date,
                    'action': 'BUY' if hedge_qty > 0 else 'SELL',
                    'underlying': S,
                    'quantity': abs(hedge_qty),
                    'price': S,
                    'cost': hedge_cost
                })

            # Calculate MTM PnL and risk metrics
            # Each quoted position contributes (mark price * quantity) minus
            # the cost accumulated by PositionManager.execute_trade.
            # NOTE(review): shorts are marked at best_bid and longs at
            # best_offer, i.e. the entry side; a liquidation-value mark would
            # use the opposite sides — confirm which is intended.
            mtm_pnl = 0
            for key, pos in self.pm.positions.items():
                opt = Option(*key)
                if opt in self.dl.historical_data.get(date, {}):
                    px = (self.dl.historical_data[date][opt]['best_bid'] if pos['quantity'] < 0
                        else self.dl.historical_data[date][opt]['best_offer'])
                    mtm_pnl += (px * pos['quantity']) - pos['cost']

            delta, gamma, vega, theta = self.calculate_risk_metrics(date)

            # Append results snapshot
            self.results.append({
                'date': date,
                'underlying': S,
                'total_delta': total_delta,
                'hedge_qty': hedge_qty,
                'cash': self.pm.cash,
                'mtm_pnl': mtm_pnl,
                'hedge_cost': hedge_cost,
                'strategy': strategy.name,
                'delta': delta,
                'gamma': gamma,
                'vega': vega,
                'theta': theta
            })

        # Debugging: Print the results list to verify its contents
        # (prints every snapshot in full)
        print("Backtest Results: ", self.results)

        return pd.DataFrame(self.results)

    def plot_daily_pnl(self):
        """Plot the 'mtm_pnl' of every results snapshot against its date."""
        df_results = pd.DataFrame(self.results)
        plt.figure(figsize=(12, 6))
        plt.plot(df_results['date'], df_results['mtm_pnl'], marker='o')
        plt.title("Daily PnL Over Run Period")
        plt.xlabel("Date")
        plt.ylabel("MTM PnL")
        plt.grid()
        plt.show()

    def print_trade_blotter(self):
        """Print one line per blotter record (hedge trades and option legs)."""
        print("Trade Blotter:")
        for trade in self.trade_blotter:
            # The backslash continuation sits inside the f-string, so the next
            # line's leading spaces are part of the printed text
            if 'underlying' in trade:
                print(f"{trade['date'].strftime('%Y-%m-%d')}: {trade['action']} {trade['quantity']} \
                {trade['underlying']} @ {trade['price']:.2f} (Cost: {trade['cost']:.2f})")
            else:
                opt = trade['option']
                print(f"{trade['date'].strftime('%Y-%m-%d')}: {trade['action']} {trade['quantity']} \
                {opt} @ {trade['price']:.2f} (Cost: {trade['cost']:.2f})")


options_data.tail()

In [None]:
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import logging
from pathlib import Path
import json

# Configure logging: timestamped records to both 'strategy.log' and the console.
# force=True replaces any handlers already attached to the root logger;
# without it basicConfig() silently does nothing once the root logger has been
# configured (this notebook already calls basicConfig in an earlier cell), so
# the file handler and format below would never take effect.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('strategy.log'),
        logging.StreamHandler()
    ],
    force=True,
)

@dataclass
class StrategyParameters:
    """Tunable settings of the options trading strategy."""
    delta_threshold: float
    position_size: int
    include_otm: bool
    trade_direction: str

    def to_string(self) -> str:
        """Render the parameters as a single token suitable for file names."""
        return (
            f"delta_{self.delta_threshold}"
            f"_size_{self.position_size}"
            f"_otm_{self.include_otm}"
            f"_dir_{self.trade_direction}"
        )

@dataclass
class Position:
    """Represents an options position"""
    # NOTE(review): the field name `type` shadows the builtin inside the class
    # body; it is part of the constructor signature, so it is left unchanged.
    type: str  # 'call' or 'put'
    strike: float  # strike of the option held
    quantity: int  # position size; presumably number of contracts — TODO confirm
    entry_price: float  # price at which the position was opened
    entry_date: datetime  # date on which the position was opened
    ticker: str  # ticker the position belongs to

@dataclass
class PortfolioMetrics:
    """Portfolio performance metrics"""
    # Plain value container; the formulas that fill these fields are not
    # visible in this part of the file.
    total_return: float
    sharpe_ratio: float
    max_drawdown: float
    sortino_ratio: float
    win_rate: float  # presumably the fraction of winning trades/periods — TODO confirm

class OptionsStrategy:
    """Daily-rolling options backtest driven by a ``StrategyParameters`` set.

    On every date with usable quotes for a ticker, the positions opened on
    the previous usable date are closed at intrinsic value (against an
    estimated underlying price) and new positions are selected by delta.
    """

    def __init__(
        self,
        options_data: pd.DataFrame,
        params: StrategyParameters,
        initial_capital: float = 1_000_000,
        output_dir: str = "results"
    ):
        """Validate and clean the quotes, then index them by (underlying, date).

        Args:
            options_data: Option quotes; must contain the columns checked by
                ``validate_input_data``.
            params: Selection and sizing parameters.
            initial_capital: Starting capital of every ``run_strategy`` call.
            output_dir: Directory created (with parents) for result files.

        Raises:
            ValueError: If a required column is missing.
        """
        self.validate_input_data(options_data)
        self.data = self.preprocess_data(options_data)
        self.params = params
        self.initial_capital = initial_capital
        self.output_dir = Path(output_dir)
        # parents=True so nested output paths such as "runs/2024/results" work
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Kept for backward compatibility. run_strategy no longer stores its
        # working positions here (see run_strategy for the reason).
        self.positions: List[Position] = []
        self.daily_pnl: List[Dict] = []
        self.grouped_data = self.data.groupby(['underlying', 'date'])

    @staticmethod
    def validate_input_data(data: pd.DataFrame) -> None:
        """Check that every column the strategy reads is present.

        Raises:
            ValueError: Listing the missing column names.
        """
        required_columns = {
            'date', 'underlying', 'strike_price_actual', 'cp_flag',
            'delta', 'best_bid', 'best_offer'
        }
        missing_columns = required_columns - set(data.columns)
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

    def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of ``data``; the input frame is not modified.

        ``date`` is converted to datetimes, ``strike_price`` is added as a
        copy of ``strike_price_actual``, and rows are kept only when the bid
        is positive, the offer exceeds the bid, delta is present and the
        strike is positive.
        """
        df = data.copy()
        df['date'] = pd.to_datetime(df['date'])
        df['strike_price'] = df['strike_price_actual']

        # Remove invalid options data
        df = df[
            (df['best_bid'] > 0) &
            (df['best_offer'] > df['best_bid']) &
            (df['delta'].notna()) &
            (df['strike_price'] > 0)
        ]

        return df

    def get_market_data(self, date: datetime, ticker: str) -> Optional[Tuple[float, pd.DataFrame, pd.DataFrame]]:
        """Return the estimated underlying price and the day's calls and puts.

        Args:
            date: Quote date; any value accepted by ``pd.Timestamp``.
            ticker: Value of the ``underlying`` column to look up.

        Returns:
            ``(underlying_price, calls, puts)``, or ``None`` when there are no
            quotes for that ticker/date, when either side is empty, or when an
            error occurs (the error is logged).
        """
        try:
            # The group keys hold pandas Timestamps, while dates coming from
            # Series.unique() are numpy.datetime64 values that may hash
            # differently. Normalise first so the membership test cannot
            # silently miss.
            key = (ticker, pd.Timestamp(date))
            if key not in self.grouped_data.groups:
                return None

            daily_data = self.grouped_data.get_group(key)
            calls = daily_data[daily_data['cp_flag'] == 'C']
            puts = daily_data[daily_data['cp_flag'] == 'P']

            if calls.empty or puts.empty:
                return None

            # The underlying price is not in the data; approximate it as the
            # strike plus the mid quote of the call whose delta is nearest 0.5.
            atm_call = calls.iloc[(calls['delta'] - 0.5).abs().argmin()]
            underlying_price = atm_call['strike_price'] + (atm_call['best_bid'] + atm_call['best_offer']) / 2

            return underlying_price, calls, puts
        except Exception as e:
            # Best effort: one bad day must not abort the whole backtest
            logging.error(f"Error getting market data for {ticker} on {date}: {e}")
            return None

    def select_options(self, underlying_price: float, calls: pd.DataFrame, puts: pd.DataFrame) -> List[Position]:
        """Choose at most one call and one put to open.

        Calls need ``delta > delta_threshold`` and puts
        ``delta < -delta_threshold``. Unless ``include_otm`` is set, calls
        with a strike above and puts with a strike below ``underlying_price``
        are dropped. Among the remaining contracts the highest-delta call and
        the lowest-delta (most negative) put are taken.

        Returns:
            Zero, one or two new ``Position`` objects, priced at the best bid
            when selling and at the best offer otherwise.
        """
        new_positions = []

        # Filter options based on delta threshold
        eligible_calls = calls[calls['delta'] > self.params.delta_threshold]
        eligible_puts = puts[puts['delta'] < -self.params.delta_threshold]

        if not self.params.include_otm:
            eligible_calls = eligible_calls[eligible_calls['strike_price'] <= underlying_price]
            eligible_puts = eligible_puts[eligible_puts['strike_price'] >= underlying_price]

        # Pick the extreme-delta contract on each side (descending for calls,
        # ascending for puts). Liquidity is not considered.
        for options_df, opt_type in [(eligible_calls, 'call'), (eligible_puts, 'put')]:
            if options_df.empty:
                continue

            option = options_df.sort_values('delta', ascending=(opt_type == 'put')).iloc[0]
            price = option['best_bid'] if self.params.trade_direction == 'sell' else option['best_offer']

            new_positions.append(Position(
                type=opt_type,
                strike=option['strike_price'],
                quantity=self.params.position_size,
                entry_price=price,
                entry_date=option['date'],
                ticker=option['underlying']
            ))

        return new_positions

    def calculate_pnl(self, positions: List[Position], underlying_price: float) -> float:
        """Return the total P&L of ``positions`` valued at intrinsic value.

        Each position contributes ``(intrinsic value - entry price) *
        quantity``, with the sign flipped when ``trade_direction`` is
        ``'sell'``. No contract multiplier is applied.
        """
        total_pnl = 0
        multiplier = -1 if self.params.trade_direction == 'sell' else 1
        for pos in positions:
            if pos.type == 'call':
                value = max(0, underlying_price - pos.strike)
            else:
                value = max(0, pos.strike - underlying_price)

            total_pnl += (value - pos.entry_price) * pos.quantity * multiplier

        return total_pnl

    def run_strategy(self, ticker: str) -> pd.DataFrame:
        """Run the daily roll for one ticker and return its equity history.

        ``capital`` is tracked as account equity: it changes only by the P&L
        realised when positions are closed. The premium is already part of
        that P&L, so it is not booked a second time when a position is
        opened (doing both counted every premium twice).

        Open positions are held in a local list, not on ``self``, because one
        strategy instance is run for many tickers, possibly from several
        threads at once. Shared state let one ticker's positions be closed
        against another ticker's price.

        Returns:
            One row per processed date with the columns ``date``, ``ticker``,
            ``capital``, ``underlying_price``, ``positions`` and
            ``daily_pnl``. Empty when no date had usable quotes.
        """
        capital = self.initial_capital
        daily_results = []
        open_positions: List[Position] = []

        ticker_dates = self.data.loc[self.data['underlying'] == ticker, 'date']
        dates = sorted(pd.Timestamp(d) for d in ticker_dates.unique())

        for date in dates:
            market_data = self.get_market_data(date, ticker)
            if market_data is None:
                continue

            underlying_price, calls, puts = market_data
            previous_capital = capital

            # Close the positions carried over from the previous usable date
            if open_positions:
                capital += self.calculate_pnl(open_positions, underlying_price)

            # Open today's positions; they are carried at cost until closed
            open_positions = self.select_options(underlying_price, calls, puts)

            daily_results.append({
                'date': date,
                'ticker': ticker,
                'capital': capital,
                'underlying_price': underlying_price,
                'positions': len(open_positions),
                'daily_pnl': capital - previous_capital
            })

        return pd.DataFrame(daily_results)

def calculate_portfolio_metrics(returns_df: pd.DataFrame) -> PortfolioMetrics:
    """Compute summary performance statistics from a daily-return column.

    Args:
        returns_df: Frame with a ``daily_return`` column expressed in
            percent (1.5 means +1.5%). NaN entries are ignored.

    Returns:
        ``PortfolioMetrics`` with total return and max drawdown as fractions,
        Sharpe and Sortino ratios annualised with sqrt(252) against a 2%
        annual risk-free rate, and the share of positive days. All fields are
        0 when there is no usable return. A ratio is 0 when its denominator
        is undefined or zero.

    Raises:
        KeyError: If a non-empty frame has no ``daily_return`` column.
    """
    if returns_df.empty:
        return PortfolioMetrics(0, 0, 0, 0, 0)

    daily_returns = returns_df['daily_return'].dropna()
    if daily_returns.empty:
        # Every return was NaN: avoid dividing by len() == 0 below
        return PortfolioMetrics(0, 0, 0, 0, 0)

    growth_factors = daily_returns / 100 + 1
    total_return = growth_factors.prod() - 1

    # Returns are in percent, so the daily risk-free rate must be in percent
    # too (2% per year over 252 trading days).
    risk_free_daily_pct = 0.02 / 252 * 100
    mean_excess = (daily_returns - risk_free_daily_pct).mean()
    annualisation = np.sqrt(252)

    # std() is NaN for a single observation and 0 for constant returns.
    # "> 0" is False in both cases, so the ratio falls back to 0.
    volatility = daily_returns.std()
    sharpe_ratio = annualisation * mean_excess / volatility if volatility > 0 else 0.0

    downside_volatility = daily_returns[daily_returns < 0].std()
    sortino_ratio = annualisation * mean_excess / downside_volatility if downside_volatility > 0 else 0.0

    cumulative_returns = growth_factors.cumprod()
    rolling_max = cumulative_returns.expanding().max()
    drawdowns = (cumulative_returns - rolling_max) / rolling_max
    max_drawdown = drawdowns.min()

    win_rate = (daily_returns > 0).sum() / len(daily_returns)

    return PortfolioMetrics(
        total_return=float(total_return),
        sharpe_ratio=float(sharpe_ratio),
        max_drawdown=float(max_drawdown),
        sortino_ratio=float(sortino_ratio),
        win_rate=float(win_rate)
    )

def run_parameter_optimization(
    options_data: pd.DataFrame,
    tickers: List[str],
    parameter_grid: Dict,
    initial_capital: float = 1_000_000,
    output_dir: str = "results"
) -> pd.DataFrame:
    """Backtest every parameter combination and tabulate its metrics.

    Args:
        options_data: Option quotes passed to ``OptionsStrategy``.
        tickers: Underlyings to run for every parameter set.
        parameter_grid: Mapping with the keys ``delta_thresholds``,
            ``position_sizes``, ``include_otm_options`` and
            ``trade_directions``, each an iterable of candidate values.
        initial_capital: Starting capital of every per-ticker run.
        output_dir: Directory receiving one ``daily_returns_<params>.csv``
            per parameter set plus ``optimization_results.csv``.

    Returns:
        One row per parameter set that produced results, holding the
        parameters and the ``PortfolioMetrics`` fields. Empty when nothing
        produced results.

    Raises:
        KeyError: If ``parameter_grid`` lacks one of the expected keys.
    """
    output_path = Path(output_dir)
    # Create the directory up front so the summary CSV can be written even
    # when the grid is empty and no strategy is ever constructed.
    output_path.mkdir(parents=True, exist_ok=True)

    results = []

    # Create parameter combinations
    param_combinations = [
        StrategyParameters(delta, size, otm, direction)
        for delta in parameter_grid['delta_thresholds']
        for size in parameter_grid['position_sizes']
        for otm in parameter_grid['include_otm_options']
        for direction in parameter_grid['trade_directions']
    ]

    for params in param_combinations:
        logging.info(f"Running strategy with parameters: {params}")

        strategy = OptionsStrategy(options_data, params, initial_capital, output_dir)
        ticker_frames = []

        # Run strategy for each ticker in parallel
        with ThreadPoolExecutor() as executor:
            future_to_ticker = {
                executor.submit(strategy.run_strategy, ticker): ticker
                for ticker in tickers
            }

            for future in as_completed(future_to_ticker):
                ticker = future_to_ticker[future]
                try:
                    ticker_results = future.result()
                    if ticker_results.empty:
                        continue
                    # The first day's P&L is measured against the initial
                    # capital, so use it as the denominator rather than
                    # leaving a NaN that drops the day.
                    previous_capital = ticker_results['capital'].shift(1, fill_value=initial_capital)
                    ticker_results['daily_return'] = ticker_results['daily_pnl'] / previous_capital * 100
                    ticker_frames.append(ticker_results)
                except Exception as e:
                    # Best effort: a failing ticker must not abort the grid
                    logging.error(f"Error processing ticker {ticker}: {e}")

        if not ticker_frames:
            continue

        # Concatenate once (not once per ticker) and sort, so the CSV and the
        # path-dependent metrics do not depend on thread completion order.
        daily_returns_all = (
            pd.concat(ticker_frames, ignore_index=True)
            .sort_values(['date', 'ticker'], kind='stable')
            .reset_index(drop=True)
        )

        # Save daily returns for this parameter set
        output_file = output_path / f"daily_returns_{params.to_string()}.csv"
        daily_returns_all.to_csv(output_file, index=False)

        # NOTE(review): metrics are computed over pooled ticker-day rows, not
        # over a per-date portfolio return. Confirm that is the intent.
        metrics = calculate_portfolio_metrics(daily_returns_all)
        results.append({
            'delta_threshold': params.delta_threshold,
            'position_size': params.position_size,
            'include_otm': params.include_otm,
            'trade_direction': params.trade_direction,
            **vars(metrics)
        })

    results_df = pd.DataFrame(results)
    results_df.to_csv(output_path / "optimization_results.csv", index=False)
    return results_df

if __name__ == "__main__":
    # Example usage: grid-search the strategy over every underlying present
    # in `options_data`, which must already be defined by an earlier cell.
    # A stray "0., #" after the delta list made this dict literal a syntax
    # error; it has been removed.
    parameter_grid = {
        'delta_thresholds': [0.1, 0.3, 0.5, 0.7],
        'position_sizes': [1, 5, 10, 20, 50],
        'include_otm_options': [False, True],
        'trade_directions': ['buy', 'sell']
    }

    try:
        tickers = options_data.underlying.unique().tolist()
        results = run_parameter_optimization(
            options_data=options_data,
            tickers=tickers,
            parameter_grid=parameter_grid,
            initial_capital=1_000_000,
            output_dir="strategy_results"
        )

        if results.empty:
            # No parameter set produced any rows, so there is nothing to rank
            logging.warning("Optimization produced no results")
        else:
            # Print best performing parameters (highest Sharpe ratio first)
            best_params = results.sort_values('sharpe_ratio', ascending=False).iloc[0]
            logging.info(f"Best parameters found:\n{best_params.to_dict()}")

    except Exception as e:
        # Top-level boundary: log the full traceback instead of crashing the cell
        logging.error(f"Error in main execution: {e}", exc_info=True)

In [None]:
# # -*- coding: utf-8 -*-
# import pandas as pd
# import numpy as np
# import sqlite3
# import logging
# from datetime import datetime, timedelta
# from pathlib import Path
# from concurrent.futures import ThreadPoolExecutor, as_completed
# from dataclasses import dataclass
# from typing import List, Dict, Optional, Tuple
# import matplotlib.pyplot as plt
# import seaborn as sns

# # Configure logging
# logging.basicConfig(
#     level=logging.INFO,
#     format='%(asctime)s - %(levelname)s - %(message)s',
#     handlers=[
#         logging.FileHandler('strategy.log'),
#         logging.StreamHandler()
#     ]
# )

# # Constants
# START_DATE = '2020-01-01'
# END_DATE = '2023-12-31'
# REPORTS_FOLDER = "QuantStats_Reports"
# Path(REPORTS_FOLDER).mkdir(exist_ok=True)

# # Data Classes
# @dataclass
# class StrategyParameters:
#     """Parameters for the options trading strategy"""
#     delta_threshold: float
#     position_size: int
#     include_otm: bool
#     trade_direction: str

#     def to_string(self) -> str:
#         """Convert parameters to a string for file naming"""
#         return f"delta_{self.delta_threshold}_size_{self.position_size}_otm_{self.include_otm}_dir_{self.trade_direction}"

# @dataclass
# class Position:
#     """Represents an options position"""
#     type: str  # 'call' or 'put'
#     strike: float
#     quantity: int
#     entry_price: float
#     entry_date: datetime
#     ticker: str

# @dataclass
# class PortfolioMetrics:
#     """Portfolio performance metrics"""
#     total_return: float
#     sharpe_ratio: float
#     max_drawdown: float
#     sortino_ratio: float
#     win_rate: float

# # Helper Functions
# def load_distinct_tickers_from_db():
#     """Load distinct tickers from the database within the specified date range."""
#     conn = sqlite3.connect(DB_PATH)
#     query = """
#         SELECT DISTINCT ticker
#         FROM merged_data
#         WHERE date BETWEEN ? AND ?
#     """
#     distinct_tickers = pd.read_sql(query, conn, params=(START_DATE, END_DATE))
#     conn.close()
#     return distinct_tickers

# def load_options_data_from_db(tickers_list):
#     """Load options data from the database for the specified tickers."""
#     conn = sqlite3.connect(DB_PATH)
#     tickers_tuple = tuple(tickers_list)
#     query = """
#         SELECT *
#         FROM volume_threshold_options_data
#         WHERE date BETWEEN ? AND ?
#         AND ticker IN ({})
#     """.format(','.join(['?'] * len(tickers_tuple)))
#     options_data = pd.read_sql(query, conn, params=(START_DATE, END_DATE) + tickers_tuple)
#     conn.close()
#     return options_data

# # Strategy Class
# class OptionsStrategy:
#     def __init__(self, options_data, params, initial_capital, output_dir):
#         self.data = self.preprocess_data(options_data)
#         self.params = params
#         self.initial_capital = initial_capital
#         self.output_dir = Path(output_dir)
#         self.output_dir.mkdir(exist_ok=True)
#         self.positions = []
#         self.daily_pnl = []
#         self.grouped_data = self.data.groupby(['underlying', 'date'])

#     def preprocess_data(self, data):
#         """Preprocess and clean the input data."""
#         df = data.copy()
#         df['date'] = pd.to_datetime(df['date'])
#         df['strike_price'] = df['strike_price_actual']
#         df = df[(df['best_bid'] > 0) & (df['best_offer'] > df['best_bid']) & (df['delta'].notna())]
#         return df

#     def calculate_pnl(self, positions, underlying_price):
#         """Calculate P&L for current positions."""
#         total_pnl = 0
#         for pos in positions:
#             if pos.type == 'call':
#                 value = max(0, underlying_price - pos.strike)
#             else:
#                 value = max(0, pos.strike - underlying_price)

#             if self.params.trade_direction == 'sell':
#                 pnl = (pos.entry_price - value) * pos.quantity * 100
#             else:
#                 pnl = (value - pos.entry_price) * pos.quantity * 100

#             total_pnl += pnl
#         return total_pnl

#     def run_strategy(self, ticker):
#         """Run the strategy for a single ticker."""
#         capital = self.initial_capital
#         daily_results = []
#         dates = sorted(self.data[self.data['underlying'] == ticker]['date'].unique())

#         for date in dates:
#             market_data = self.get_market_data(date, ticker)
#             if not market_data:
#                 continue

#             underlying_price, calls, puts = market_data

#             # Close existing positions
#             if self.positions:
#                 pnl = self.calculate_pnl(self.positions, underlying_price)
#                 capital += pnl
#                 self.positions = []

#             # Open new positions
#             new_positions = self.select_options(underlying_price, calls, puts)
#             initial_cost = sum(pos.entry_price * pos.quantity * 100 for pos in new_positions)

#             if self.params.trade_direction == 'sell':
#                 capital += initial_cost
#             else:
#                 capital -= initial_cost

#             self.positions = new_positions

#             daily_results.append({
#                 'date': date,
#                 'ticker': ticker,
#                 'capital': capital,
#                 'underlying_price': underlying_price,
#                 'positions': len(self.positions),
#                 'daily_pnl': capital - (daily_results[-1]['capital'] if daily_results else self.initial_capital)
#             })

#         return pd.DataFrame(daily_results)

# def calculate_portfolio_metrics(returns_df: pd.DataFrame) -> PortfolioMetrics:
#     """Calculate portfolio performance metrics"""
#     if returns_df.empty:
#         return PortfolioMetrics(0, 0, 0, 0, 0)

#     daily_returns = returns_df['daily_return'].dropna()

#     total_return = (daily_returns / 100 + 1).prod() - 1
#     excess_returns = daily_returns - 0.02 / 252  # Assuming 2% risk-free rate

#     sharpe_ratio = np.sqrt(252) * excess_returns.mean() / daily_returns.std()

#     downside_returns = daily_returns[daily_returns < 0]
#     sortino_ratio = np.sqrt(252) * excess_returns.mean() / downside_returns.std()

#     cumulative_returns = (daily_returns / 100 + 1).cumprod()
#     rolling_max = cumulative_returns.expanding().max()
#     drawdowns = (cumulative_returns - rolling_max) / rolling_max
#     max_drawdown = drawdowns.min()

#     win_rate = len(daily_returns[daily_returns > 0]) / len(daily_returns)

#     return PortfolioMetrics(
#         total_return=total_return,
#         sharpe_ratio=sharpe_ratio,
#         max_drawdown=max_drawdown,
#         sortino_ratio=sortino_ratio,
#         win_rate=win_rate
#     )

# # Optimization Function
# def run_parameter_optimization(options_data, tickers, parameter_grid, initial_capital, output_dir):
#     """Run strategy optimization across parameter combinations."""
#     results = []
#     param_combinations = [
#         StrategyParameters(delta, size, otm, direction)
#         for delta in parameter_grid['delta_thresholds']
#         for size in parameter_grid['position_sizes']
#         for otm in parameter_grid['include_otm_options']
#         for direction in parameter_grid['trade_directions']
#     ]

#     for params in param_combinations:
#         logging.info(f"Running strategy with parameters: {params}")
#         strategy = OptionsStrategy(options_data, params, initial_capital, output_dir)
#         daily_returns_all = pd.DataFrame()

#         with ThreadPoolExecutor() as executor:
#             future_to_ticker = {
#                 executor.submit(strategy.run_strategy, ticker): ticker
#                 for ticker in tickers
#             }

#             for future in as_completed(future_to_ticker):
#                 ticker = future_to_ticker[future]
#                 try:
#                     ticker_results = future.result()
#                     if not ticker_results.empty:
#                         ticker_results['daily_return'] = ticker_results['daily_pnl'] / ticker_results['capital'].shift(1) * 100
#                         daily_returns_all = pd.concat([daily_returns_all, ticker_results], ignore_index=True)
#                 except Exception as e:
#                     logging.error(f"Error processing ticker {ticker}: {e}")

#         if not daily_returns_all.empty:
#             output_file = Path(output_dir) / f"daily_returns_{params.to_string()}.csv"
#             daily_returns_all.to_csv(output_file, index=False)

#             metrics = calculate_portfolio_metrics(daily_returns_all)
#             results.append({
#                 'delta_threshold': params.delta_threshold,
#                 'position_size': params.position_size,
#                 'include_otm': params.include_otm,
#                 'trade_direction': params.trade_direction,
#                 **metrics.__dict__
#             })

#     results_df = pd.DataFrame(results)
#     results_df.to_csv(Path(output_dir) / "optimization_results.csv", index=False)
#     return results_df

# # Main Function
# def main():
#     # Load data
#     tickers = load_distinct_tickers_from_db()
#     tickers_list = tickers['ticker'].unique().tolist()
#     options_data = load_options_data_from_db(tickers_list)

#     # Define parameter grid for optimization
#     parameter_grid = {
#         'delta_thresholds': [0.9, 0.5, 0.3],
#         'position_sizes': [50, 25, 10],
#         'include_otm_options': [False, True],
#         'trade_directions': ['buy', 'sell']
#     }

#     # Run optimization
#     optimization_results = run_parameter_optimization(
#         options_data=options_data,
#         tickers=tickers_list,
#         parameter_grid=parameter_grid,
#         initial_capital=1_000_000,
#         output_dir="optimization_results"
#     )

#     # Print best parameters
#     best_params = optimization_results.sort_values('sharpe_ratio', ascending=False).iloc[0]
#     logging.info(f"Best parameters found:\n{best_params.to_dict()}")

# if __name__ == "__main__":
#     main()

In [None]:
# import pandas as pd
# import numpy as np

# class PositionManager:
#     def __init__(self):
#         self.positions = {}  # Store positions by keys (underlying, strike, expiry, type)
#         self.cash = 0
#         self.transaction_log = []

#     def execute_trade(self, date, underlying, legs, trade_quantity=1):
#         for leg in legs:
#             opt = leg['option']
#             qty = leg['quantity'] * trade_quantity
#             price = leg['price'] * 100  # Ensure correct contract multiplier

#             key = (opt.underlying, opt.strike, opt.expiry, opt.option_type)
#             cost = qty * price * (-1 if qty < 0 else 1)

#             if key not in self.positions:
#                 self.positions[key] = {'quantity': 0, 'cost': 0}

#             self.positions[key]['quantity'] += qty
#             self.positions[key]['cost'] += cost
#             self.cash -= cost

#             self.transaction_log.append({
#                 'date': date,
#                 'underlying': underlying,
#                 'action': 'SELL' if qty < 0 else 'BUY',
#                 'option': (opt.underlying, opt.strike, opt.expiry, opt.option_type),  # Only store essential data
#                 'quantity': abs(qty),
#                 'price': price,
#                 'cost': cost
#             })

#     def calculate_pnl(self, underlying_prices):
#         mtm_pnl = 0
#         for key, pos in self.positions.items():
#             underlying_price = underlying_prices.get(key[0], 0)  # Fetch using underlying ticker

#             if key[3] == 'C':  # Call option
#                 value = max(0, underlying_price - key[1]) * 100
#             else:  # Put option
#                 value = max(0, key[1] - underlying_price) * 100

#             market_value = value * pos['quantity']
#             mtm_pnl += market_value - pos['cost']

#         return mtm_pnl

# class TradingStrategy:
#     def __init__(self, position_manager):
#         self.pm = position_manager

#     def execute_strategy(self, date, underlying, options_market, underlying_price):
#         strategy_legs = []
#         for opt_key, opt_data in options_market.items():
#             opt = Option(*opt_key)
#             if opt.option_type == 'C' and opt.strike < underlying_price:
#                 strategy_legs.append({'option': opt, 'quantity': 1, 'price': opt_data['best_offer']})
#             elif opt.option_type == 'P' and opt.strike > underlying_price:
#                 strategy_legs.append({'option': opt, 'quantity': -1, 'price': opt_data['best_bid']})

#         if strategy_legs:
#             self.pm.execute_trade(date, underlying, strategy_legs)

#     def calculate_total_pnl(self, underlying_prices):
#         return self.pm.calculate_pnl(underlying_prices)

# def backtest_strategy(options_data):
#     pm = PositionManager()
#     strategy = TradingStrategy(pm)

#     for date, daily_data in options_data.groupby('date'):
#         underlying_prices = daily_data.groupby('underlying')['strike_price_actual'].first().to_dict()

#         for underlying, sub_data in daily_data.groupby('underlying'):
#             options_market = {
#                 (row['underlying'], row['strike_price_actual'], row['exdate'], row['cp_flag']): {
#                     'best_bid': row['best_bid'], 'best_offer': row['best_offer']
#                 }
#                 for _, row in sub_data.iterrows()
#             }

#             strategy.execute_strategy(date, underlying, options_market, underlying_prices[underlying])

#     return strategy.pm.transaction_log, strategy.calculate_total_pnl(underlying_prices)

# if __name__ == "__main__":
#     data = options_data
#     trades, pnl = backtest_strategy(data)
#     print("Trades Executed:", trades)
#     print("Final PnL:", pnl)


In [None]:
# !pip install --upgrade yfinance==0.2.54

# import yfinance as yf
# import pandas as pd
# import numpy as np
# import matplotlib.pyplot as plt
# import seaborn as sns

# # Define the tickers for XBI and other indices
# TICKERS = {
#     "XBI": "XBI",
#     "S&P 500": "^GSPC",
#     "Nasdaq": "^IXIC",
#     "Dow Jones": "^DJI",
#     "Russell 2000": "^RUT",
#     "MSCI World": "URTH",
#     "Emerging Markets": "EEM",
#     "Technology": "XLK",
#     "Healthcare": "XLV",
#     "Energy": "XLE",
#     "Financials": "XLF",
#     "Materials": "XLB",
#     "Consumer Discretionary": "XLY",
#     "Consumer Staples": "XLP",
#     "Real Estate": "XLRE",
#     "Utilities": "XLU",
#     "Vanguard Total Stock Market": "VTI",
#     "Vanguard S&P 500": "VOO"
# }

# # Define the risk-free rate ticker (13-week Treasury Bill rate)
# RISK_FREE_TICKER = "^IRX"

# # Define the time period for analysis
# START_DATE = "2020-01-01"
# END_DATE = "2025-01-01"

# # Fetch the data for all tickers
# def fetch_data(tickers, risk_free_ticker, start_date, end_date):
#     tickers_list = list(tickers.values()) + [risk_free_ticker]
#     data = yf.download(tickers_list, start=start_date, end=end_date)["Close"]
#     return data

# # Calculate returns and risk-free rate
# def calculate_returns(data, risk_free_data):
#     returns = data.pct_change().dropna()
#     risk_free_returns = risk_free_data.pct_change().dropna()
#     risk_free_returns = risk_free_returns.reindex(returns.index, method="ffill")
#     return returns, risk_free_returns

# # Calculate annualized returns, volatility, and Sharpe ratio
# def calculate_metrics(returns, risk_free_returns, tickers):
#     results = {}
#     for ticker in tickers:
#         ticker_returns = returns[tickers[ticker]]

#         # Annualized return
#         annualized_return = ticker_returns.mean() * 252

#         # Annualized volatility
#         annualized_volatility = ticker_returns.std() * np.sqrt(252)

#         # Sharpe ratio (using changing risk-free rate)
#         excess_returns = ticker_returns - risk_free_returns
#         sharpe_ratio = (excess_returns.mean() / excess_returns.std()) * np.sqrt(252)

#         results[ticker] = {
#             "Annualized Return": annualized_return,
#             "Annualized Volatility": annualized_volatility,
#             "Sharpe Ratio": sharpe_ratio,
#         }

#     return pd.DataFrame(results).T

# # Plot Annualized Return
# def plot_annualized_return(results_df):
#     sns.set(style="whitegrid")
#     plt.figure(figsize=(10, 6))
#     sorted_returns = results_df['Annualized Return'].sort_values()
#     bars = sorted_returns.plot(kind='barh', color='green')
#     bars.set_title('Annualized Returns of Indices', fontsize=16)
#     bars.set_xlabel('Annualized Return', fontsize=12)
#     bars.set_ylabel('Index', fontsize=12)
#     # Highlight XBI
#     bars.patches[sorted_returns.index.get_loc("XBI")].set_facecolor('orange')
#     plt.tight_layout()
#     plt.savefig("annualized_return.png")
#     plt.show()

# # Plot Annualized Volatility
# def plot_annualized_volatility(results_df):
#     sns.set(style="whitegrid")
#     plt.figure(figsize=(10, 6))
#     sorted_volatility = results_df['Annualized Volatility'].sort_values()
#     bars = sorted_volatility.plot(kind='barh', color='red')
#     bars.set_title('Annualized Volatility of Indices', fontsize=16)
#     bars.set_xlabel('Annualized Volatility', fontsize=12)
#     bars.set_ylabel('Index', fontsize=12)
#     # Highlight XBI
#     bars.patches[sorted_volatility.index.get_loc("XBI")].set_facecolor('orange')
#     plt.tight_layout()
#     plt.savefig("annualized_volatility.png")
#     plt.show()

# # Plot Sharpe Ratio
# def plot_sharpe_ratio(results_df):
#     sns.set(style="whitegrid")
#     plt.figure(figsize=(10, 6))
#     sorted_sharpe = results_df['Sharpe Ratio'].sort_values()
#     bars = sorted_sharpe.plot(kind='barh', color='blue')
#     bars.set_title('Sharpe Ratio of Indices', fontsize=16)
#     bars.set_xlabel('Sharpe Ratio', fontsize=12)
#     bars.set_ylabel('Index', fontsize=12)
#     # Highlight XBI
#     bars.patches[sorted_sharpe.index.get_loc("XBI")].set_facecolor('orange')
#     plt.tight_layout()
#     plt.savefig("sharpe_ratio.png")
#     plt.show()

# # Main function to run the analysis
# def main():
#     # Fetch data
#     data = fetch_data(TICKERS, RISK_FREE_TICKER, START_DATE, END_DATE)
#     risk_free_data = data[RISK_FREE_TICKER]
#     data = data.drop(columns=[RISK_FREE_TICKER])

#     # Calculate returns and risk-free rate
#     returns, risk_free_returns = calculate_returns(data, risk_free_data)

#     # Calculate metrics
#     results_df = calculate_metrics(returns, risk_free_returns, TICKERS)

#     # Plot and save results
#     plot_annualized_return(results_df)
#     plot_annualized_volatility(results_df)
#     plot_sharpe_ratio(results_df)

# # Run the main analysis
# if __name__ == "__main__":
#     main()
