In [1]:
import polars as pl
import pandas as pd
import numpy as np
from numba import njit
from joblib import Parallel, delayed
import multiprocessing
import logging

@njit
def calculate_exit_percentages(close, ema, upper_band, lower_band):
    """
    Return the share (0-100) of band exits that happened through the upper band.

    A "setup" occurs at bar ``i`` when the previous close was above the
    previous EMA and the current close lies inside the band (both bounds
    inclusive). From a setup, the following bars are scanned for the first
    close strictly above the upper band or strictly below the lower band;
    that bar is counted as an upward or downward exit. Scanning for the next
    setup then resumes at the bar after the exit. If a setup never exits
    before the data ends, the whole scan stops.

    This function is decorated with numba's ``njit`` in this file, so the
    body sticks to scalar arithmetic and array indexing.

    Parameters:
    - close: NumPy array of closing prices.
    - ema: NumPy array of EMA values.
    - upper_band: NumPy array of upper band values.
    - lower_band: NumPy array of lower band values.

    Returns:
    - float: ``exits_above / (exits_above + exits_below) * 100``, or np.nan
      when no exit was counted at all.
    """
    n_bars = len(close)
    up_count = 0
    down_count = 0

    idx = 1  # bar 0 has no predecessor to test for an uptrend
    while idx < n_bars:
        prior_uptrend = close[idx - 1] > ema[idx - 1]
        inside_band = close[idx] >= lower_band[idx] and close[idx] <= upper_band[idx]

        if prior_uptrend and inside_band:
            # Walk forward until the close leaves the band on either side.
            exit_at = -1
            scan = idx + 1
            while scan < n_bars:
                price = close[scan]
                if price > upper_band[scan]:
                    up_count += 1
                    exit_at = scan
                    break
                if price < lower_band[scan]:
                    down_count += 1
                    exit_at = scan
                    break
                scan += 1

            if exit_at < 0:
                # The open setup never resolved; nothing further can be counted.
                break

            # Continue looking for setups from the exit bar onwards.
            idx = exit_at

        idx += 1

    total = up_count + down_count
    if total == 0:
        return np.nan  # no exits, so the ratio is undefined
    return (up_count / total) * 100

def compute_exit_percentage(
    df: pl.DataFrame,
    ema_period: int,
    atr_mult: float,
    atr_calc_period: int
) -> float:
    """
    Build EMA +/- ATR bands from OHLC data and return the upper-exit percentage.

    The frame is sorted, converted to pandas, enriched with 'ATR', 'EMA',
    'Upper_Band' and 'Lower_Band' columns, stripped of rows where any of those
    is NaN, and the resulting arrays are handed to calculate_exit_percentages.

    Parameters:
    - df (pl.DataFrame): Polars DataFrame with at least 'High', 'Low', 'Close'.
    - ema_period (int): Span passed to the exponential moving average of 'Close'.
    - atr_mult (float): Band half-width expressed in multiples of ATR.
    - atr_calc_period (int): Window of the rolling mean applied to True Range.

    Returns:
    - float: Percentage of exits above the upper band, or np.nan if none occurred.

    Raises:
    - ValueError: If any of 'High', 'Low', 'Close' is missing from df.
    """
    # Fail fast when an input needed below is absent.
    needed = {'High', 'Low', 'Close'}
    if not needed.issubset(df.columns):
        absent = needed - set(df.columns)
        raise ValueError(f"DataFrame is missing required columns: {absent}")

    # Order rows by 'Date' when available, otherwise by the first column.
    # NOTE(review): the fallback sorts by whatever the first column is; if that
    # is not a time-like column the row order will not be chronological - confirm.
    has_date = 'Date' in df.columns
    ordered = df.sort('Date') if has_date else df.sort(pl.col(df.columns[0]))

    # The indicator math below uses pandas rolling/ewm.
    frame = ordered.to_pandas()
    if has_date:
        # Re-sort after coercing to datetime so the order is chronological.
        frame['Date'] = pd.to_datetime(frame['Date'])
        frame = frame.sort_values('Date').reset_index(drop=True)

    highs = frame['High'].values
    lows = frame['Low'].values
    closes = frame['Close'].values

    # True Range: the largest of bar range and the two gaps to the prior close.
    prev_close = closes[:-1]
    bar_range = highs[1:] - lows[1:]
    gap_from_high = np.abs(highs[1:] - prev_close)
    gap_from_low = np.abs(lows[1:] - prev_close)
    true_range = np.maximum(np.maximum(bar_range, gap_from_high), gap_from_low)
    # The first bar has no prior close; pad with NaN to keep row alignment.
    true_range = np.concatenate(([np.nan], true_range))

    # ATR is a simple rolling mean of TR; EMA is recursive (adjust=False).
    frame['ATR'] = pd.Series(true_range).rolling(window=atr_calc_period, min_periods=1).mean()
    frame['EMA'] = frame['Close'].ewm(span=ema_period, adjust=False).mean()

    band_offset = atr_mult * frame['ATR']
    frame['Upper_Band'] = frame['EMA'] + band_offset
    frame['Lower_Band'] = frame['EMA'] - band_offset

    # Keep only rows where every derived indicator is defined.
    indicator_cols = ['EMA', 'ATR', 'Upper_Band', 'Lower_Band']
    usable = frame.dropna(subset=indicator_cols).reset_index(drop=True)

    return calculate_exit_percentages(
        usable['Close'].to_numpy(),
        usable['EMA'].to_numpy(),
        usable['Upper_Band'].to_numpy(),
        usable['Lower_Band'].to_numpy(),
    )

def analyze_ticker(ticker, market_data, ema_period, atr_mult, atr_calc_period):
    """
    Fetch one ticker's data and compute its exit percentage, never raising.

    Any exception from loading or computing is printed and turned into a
    None result so that a batch run can continue with the remaining tickers.

    Parameters:
    - ticker (str): The ticker symbol.
    - market_data: Object exposing get_ticker_data(ticker).
    - ema_period (int): The period for EMA calculation.
    - atr_mult (float): ATR multiplier for bands.
    - atr_calc_period (int): ATR calculation period.

    Returns:
    - tuple: (ticker, percentage), where percentage is None if an error occurred.
    """
    percentage = None  # stays None unless the computation succeeds
    try:
        frame = market_data.get_ticker_data(ticker)
        percentage = compute_exit_percentage(frame, ema_period, atr_mult, atr_calc_period)
    except ValueError as err:
        # Raised, for example, when required columns are missing.
        print(f"Ticker {ticker}: {err}")
    except Exception as err:
        # Per-ticker boundary: report the failure and keep the batch going.
        print(f"Ticker {ticker}: Unexpected error: {err}")
    return (ticker, percentage)

def analyze_multiple_tickers(
    tickers: list,
    market_data,
    ema_period: int,
    atr_mult: float,
    atr_calc_period: int
) -> dict:
    """
    Run analyze_ticker over each ticker in order and collect the results.

    Parameters:
    - tickers (list): List of ticker symbols as strings.
    - market_data: The MarketData object.
    - ema_period (int): The period for EMA calculation.
    - atr_mult (float): ATR multiplier for bands.
    - atr_calc_period (int): ATR calculation period.

    Returns:
    - dict: Maps each ticker symbol to its exit percentage, or to None when
      analyze_ticker reported a failure for that ticker.
    """
    # analyze_ticker yields (ticker, value-or-None) pairs, which is exactly the
    # item shape dict() consumes; a repeated ticker keeps its last result.
    return dict(
        analyze_ticker(symbol, market_data, ema_period, atr_mult, atr_calc_period)
        for symbol in tickers
    )

def analyze_multiple_tickers_parallel(
    tickers: list,
    market_data,
    ema_period: int,
    atr_mult: float,
    atr_calc_period: int,
    n_jobs: int = -1
) -> dict:
    """
    Dispatch analyze_ticker for every ticker through joblib and collect the results.

    Parameters:
    - tickers (list): List of ticker symbols as strings.
    - market_data: The MarketData object.
    - ema_period (int): The period for EMA calculation.
    - atr_mult (float): ATR multiplier for bands.
    - atr_calc_period (int): ATR calculation period.
    - n_jobs (int): Forwarded to joblib.Parallel. Default is -1 (all available cores).

    Returns:
    - dict: Maps each ticker symbol to its exit percentage, or to None when
      analyze_ticker reported a failure for that ticker.
    """
    runner = Parallel(n_jobs=n_jobs)
    # One deferred analyze_ticker call per symbol; joblib consumes this lazily.
    jobs = (
        delayed(analyze_ticker)(symbol, market_data, ema_period, atr_mult, atr_calc_period)
        for symbol in tickers
    )
    ticker_pct_pairs = runner(jobs)
    return dict(ticker_pct_pairs)

def _print_exit_results(heading, results):
    """
    Print a heading followed by one line per ticker in ``results``.

    Both None (analysis failed) and NaN (no exits were counted) are reported
    with the "no exit instances" message; formatting NaN as a percentage would
    print a meaningless "nan%".
    """
    print(heading)
    for ticker, pct in results.items():
        if pct is None or np.isnan(pct):
            print(f"{ticker}: No exit instances found or ATR data missing.")
        else:
            print(f"{ticker}: {pct:.2f}% of exits were above the upper band.")


def main():
    """
    Demo entry point: analyze one ticker, then a small list sequentially and in parallel.

    NOTE(review): MarketData is not imported or defined anywhere in this file,
    so calling main() as-is raises NameError. It must be brought into scope
    (import or definition) before this function can run.
    """
    # Specify the file paths (as per your initial setup)
    data_folder = r'G:\Projects\BackTesting1.0\Data\Bloomberg\Futures'
    # data_folder = r'/Users/utkarsh/Documents/Projects/BBG-Trading/RealTest/Data/Futures'
    tick_values_file = r'G:\Projects\BackTesting1.0\Data\Bloomberg\HelperFiles\fut_val_pt.parquet'
    fx_rates_file = r'G:\Projects\BackTesting1.0\Data\Bloomberg\HelperFiles\fxHist.parquet'

    # Create the MarketData object
    market_data = MarketData(
        data_folder,
        tick_values_file,
        fx_rates_file,
        instrument_type="Futures",
        log_level=logging.ERROR
    )

    # Shared indicator settings for every example below
    single_ticker = 'ES1 Index'
    ema_period = 20
    atr_mult = 0.5
    atr_calc_period = 20

    # Example: Analyzing a single ticker
    try:
        ub = market_data.get_ticker_data(single_ticker)
        percentage = compute_exit_percentage(
            df=ub,
            ema_period=ema_period,
            atr_mult=atr_mult,
            atr_calc_period=atr_calc_period
        )

        if np.isnan(percentage):
            print(f"{single_ticker}: No exit instances found to calculate the percentage.")
        else:
            print(f"{single_ticker}: Percentage of exits above the upper band: {percentage:.2f}%")
    except ValueError as e:
        print(f"Error: {e}")

    # Example: Analyzing multiple tickers sequentially
    tickers = ['ES1 Index', 'CL1 Index', 'GC1 Index']  # Add more tickers as needed
    results_sequential = analyze_multiple_tickers(
        tickers=tickers,
        market_data=market_data,
        ema_period=ema_period,
        atr_mult=atr_mult,
        atr_calc_period=atr_calc_period
    )
    _print_exit_results("\nSequential Analysis Results:", results_sequential)

    # Example: Analyzing multiple tickers in parallel
    results_parallel = analyze_multiple_tickers_parallel(
        tickers=tickers,
        market_data=market_data,
        ema_period=ema_period,
        atr_mult=atr_mult,
        atr_calc_period=atr_calc_period,
        n_jobs=-1  # Use all available CPU cores
    )
    _print_exit_results("\nParallel Analysis Results:", results_parallel)

# Run the demo only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


NameError: name 'MarketData' is not defined