In [None]:
# %pip install vectorbt
# libta-lib
#url = 'https://anaconda.org/conda-forge/libta-lib/0.4.0/download/linux-64/libta-lib-0.4.0-h166bdaf_1.tar.bz2'
#!curl -L $url | tar xj -C /usr/lib/x86_64-linux-gnu/ lib --strip-components=1

# ta-lib Python package for Python 3.10
#!wget https://anaconda.org/conda-forge/ta-lib/0.4.19/download/linux-64/ta-lib-0.4.19-py310hde88566_4.tar.bz2

# Extracting the package
#!tar -xjf ta-lib-0.4.19-py310hde88566_4.tar.bz2

# Moving talib to site-packages
#!mv ./lib/python3.10/site-packages/talib /usr/local/lib/python3.10/dist-packages/

In [None]:
from dask.distributed import Client, LocalCluster
import json
import logging
import os
import glob
from datetime import timedelta
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd
import dask
import vectorbt as vbt
import matplotlib.pyplot as plt
import talib
from dask.diagnostics import ProgressBar

### Configurations

In [None]:
# Register the Dask diagnostics progress bar for Dask operations
ProgressBar().register()

# Directory used for Dask spill-to-disk and as the workers' local scratch space
temp_dir = "/tmp/dask-spill"
os.makedirs(temp_dir, exist_ok=True)

# Apply the memory-management configuration BEFORE the cluster is created.
# Workers pick up their configuration when they start, so calling
# dask.config.set() after LocalCluster(...) leaves the running workers on the
# default thresholds.
dask.config.set({
    "temporary-directory": temp_dir,
    "distributed.worker.memory.target": 0.6,      # fraction of memory_limit
    "distributed.worker.memory.spill": 0.7,
    "distributed.worker.memory.pause": 0.85,
    "distributed.worker.memory.terminate": 0.95,
})

# Setting up LocalCluster with 8 workers for better parallelism
cluster = LocalCluster(
    n_workers=8,                     # More workers for 8 vCPUs
    threads_per_worker=2,            # 2 threads per worker
    memory_limit="6GB",              # per worker, ~48GB total
    processes=True,
    local_directory=temp_dir
)
client = Client(cluster)
print("Dask Dashboard:", client.dashboard_link)

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:44787
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43113'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34003'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33495'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:37811'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38131'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:37249'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40893'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33195'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:40481 name: 2
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:40481
INFO:distributed.core:Starting

Dask Dashboard: http://127.0.0.1:8787/status


<dask.config.set at 0x7a59abe82110>

In [None]:
# Logging.
# force=True replaces any handler already attached to the root logger. Without
# it, basicConfig() is a silent no-op whenever a root handler exists (commonly
# the case in notebook hosts), and the level/format below would never apply.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)

# Directory and file constants
BASE_DIR = "/content/backtesting_final"
TICKER_JSON = "nasdaq100_ticker_dataset.json"
EARLIEST_DATES_CSV = "earliest_dates.csv"
LOG_CSV = "regime_counts.csv"
PLOT_DIR = "plots"
OUTPUT_DIR = "output"
REGIMES_DIR = os.path.join(BASE_DIR, "_regimes")

# Loading NASDAQ-100 tickers: the JSON file is a list of records, each with a
# 'Ticker' key; duplicates are removed and the result is sorted.
print(f"Loading tickers from {TICKER_JSON}")
try:
    with open(TICKER_JSON, 'r', encoding='utf-8') as f:
        ticker_data = json.load(f)
    tickers = sorted({item['Ticker'] for item in ticker_data})
except Exception as e:
    logging.error(f"Failed to load tickers from {TICKER_JSON}: {str(e)}")
    raise

# Loading earliest dates for each ticker
earliest_dates_df = pd.read_csv(EARLIEST_DATES_CSV)

# Schemas
# Layout of the per-ticker minute-level Parquet files read by the notebook.
input_schema = pa.schema([
    ('ticker', pa.string()),
    ('timestamp', pa.timestamp('ns')),
    ('open', pa.float64()),
    ('high', pa.float64()),
    ('low', pa.float64()),
    ('close', pa.float64()),
    ('volume', pa.float32()),
    ('prev_session_high', pa.float32()),
    ('prev_session_low', pa.float32()),
    ('estimated_bid_ask_spread', pa.float32()),
    ('estimated_obd', pa.float32()),
    ('50_day_sma', pa.float32()),
    ('news_impact', pa.int8())
])
for field in input_schema:
    print(f"  {field.name}: {field.type}")

# Layout of the per-day regime records.
output_schema = pa.schema([
    ('ticker', pa.string()),
    ('date', pa.timestamp('ns')),
    ('volatility_regime', pa.string()),
    ('trend_regime', pa.string()),
    ('liquidity_regime', pa.string()),
    ('news_impact', pa.int8())
])
for field in output_schema:
    print(f"  {field.name}: {field.type}")

### Preprocessing files based on schema

In [None]:
def preprocess_parquet_file(input_file, output_file):
    """Clean one Parquet file and write the result to ``output_file``.

    The file is read with ``input_schema``, rows with a duplicated
    ``timestamp`` are collapsed (last occurrence kept), and rows whose prices
    are inconsistent (high < low, or close outside [low, high]) are dropped.
    The cleaned table is written snappy-compressed with the same schema.

    Args:
        input_file: Path of the Parquet file to read.
        output_file: Path of the Parquet file to write; its parent directory
            is created when missing.

    Returns:
        True when the file was written, False when any step failed (the error
        is logged rather than raised so one bad file does not stop a batch).
    """
    try:
        table = pq.read_table(input_file, schema=input_schema)
        df = table.to_pandas()
        df = df.drop_duplicates(subset=['timestamp'], keep='last')

        # Validating price consistency. The mask is built once and reused for
        # both the warning count and the filter.
        invalid_mask = (
            (df['high'] < df['low'])
            | (df['close'] > df['high'])
            | (df['close'] < df['low'])
        )
        invalid_count = int(invalid_mask.sum())
        if invalid_count:
            logging.warning(f"Invalid prices detected in {input_file}: {invalid_count} rows")
            df = df[~invalid_mask]

        # The filtered frame no longer has a contiguous index; drop it
        # explicitly so only the schema columns are serialized.
        table = pa.Table.from_pandas(df, schema=input_schema, preserve_index=False)

        # os.makedirs('') raises, so only create a directory when the output
        # path actually has one (a bare filename targets the working directory).
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with pq.ParquetWriter(output_file, input_schema, compression='snappy', use_dictionary=False) as writer:
            writer.write_table(table)
        print(f"Preprocessed {input_file} to {output_file}")
        return True
    except Exception as e:
        logging.error(f"Error preprocessing {input_file}: {str(e)}")
        return False

def preprocess_ticker_files(ticker):
    """Preprocess every Parquet file found for ``ticker``.

    Source files are read from ``BASE_DIR/ticker=<ticker>`` and cleaned copies
    are written under ``BASE_DIR/ticker=<ticker>_preprocessed`` with the same
    base names.

    Args:
        ticker: Ticker symbol used to build the input and output directories.

    Returns:
        A ``(preprocessed_files, parquet_files)`` tuple: the output paths that
        were written successfully, and all source paths that were found.
    """
    ticker_path = os.path.join(BASE_DIR, f'ticker={ticker}')
    output_path = os.path.join(BASE_DIR, f'ticker={ticker}_preprocessed')
    # glob returns paths in arbitrary, filesystem-dependent order; sort so that
    # the file list (and anything built from it) is deterministic across runs.
    parquet_files = sorted(glob.glob(os.path.join(ticker_path, '*.parquet')))
    preprocessed_files = []
    for file in parquet_files:
        output_file = os.path.join(output_path, os.path.basename(file))
        if preprocess_parquet_file(file, output_file):
            preprocessed_files.append(output_file)
    return preprocessed_files, parquet_files

### Regime classification functions

In [None]:
def compute_volatility_regime(stoch_k, k_low, k_high):
    """Label the volatility regime from a stochastic %K reading.

    Args:
        stoch_k: Latest stochastic %K value.
        k_low: Lower threshold; readings strictly below it are 'Low'.
        k_high: Upper threshold; readings strictly above it are 'High'.

    Returns:
        'undefined' when any argument is missing, otherwise 'Low', 'High' or
        'Medium' (thresholds themselves count as 'Medium').
    """
    inputs = (stoch_k, k_low, k_high)
    if any(pd.isna(value) for value in inputs):
        logging.info(f"Volatility regime undefined: stoch_k={stoch_k}, k_low={k_low}, k_high={k_high}")
        return 'undefined'

    # The 'Low' test is evaluated first, exactly as before.
    if stoch_k < k_low:
        logging.info(f"Low volatility assigned: stoch_k={stoch_k} < k_low={k_low}")
        return 'Low'

    if stoch_k > k_high:
        logging.info(f"High volatility assigned: stoch_k={stoch_k} > k_high={k_high}")
        return 'High'

    logging.info(f"Medium volatility assigned: {k_low} <= stoch_k={stoch_k} <= k_high={k_high}")
    return 'Medium'

def compute_trend_regime(adx, plus_di, minus_di, high, low, bb_width, adx_low, adx_high, bb_width_low):
    """Label the trend regime from ADX, directional indicators and recent bars.

    Args:
        adx: Latest ADX value.
        plus_di: Latest +DI value.
        minus_di: Latest -DI value.
        high: Sequence of recent highs; only the last two entries are compared.
        low: Sequence of recent lows; only the last two entries are compared.
        bb_width: Latest Bollinger band width.
        adx_low: ADX threshold below which the market may be ranging.
        adx_high: ADX threshold above which a trend is considered strong.
        bb_width_low: Band-width threshold for the range check; may be NaN.

    Returns:
        'undefined' when a required input is missing or ``bb_width`` is zero,
        otherwise 'Uptrend', 'Downtrend' or 'Range'.
    """
    required = (adx, plus_di, minus_di, bb_width, adx_low, adx_high)
    if any(pd.isna(item) for item in required):
        logging.info(f"Trend regime undefined: adx={adx}, plus_di={plus_di}, minus_di={minus_di}, bb_width={bb_width}, adx_low={adx_low}, adx_high={adx_high}")
        return 'undefined'
    if bb_width == 0:
        logging.info("Trend regime undefined: bb_width is zero")
        return 'undefined'

    # Bar-to-bar comparisons need at least two observations; otherwise False.
    has_prior_high = len(high) > 1
    has_prior_low = len(low) > 1
    higher_highs = has_prior_high and high[-1] > high[-2]
    higher_lows = has_prior_low and low[-1] > low[-2]
    lower_highs = has_prior_high and high[-1] < high[-2]
    lower_lows = has_prior_low and low[-1] < low[-2]

    strong_trend = adx > adx_high

    if strong_trend and plus_di > minus_di and (higher_highs or higher_lows):
        logging.info(f"Uptrend assigned: adx={adx} > adx_high={adx_high}, plus_di={plus_di} > minus_di={minus_di}, higher_highs={higher_highs}, higher_lows={higher_lows}")
        return 'Uptrend'

    if strong_trend and minus_di > plus_di and (lower_highs or lower_lows):
        logging.info(f"Downtrend assigned: adx={adx} > adx_high={adx_high}, minus_di={minus_di} > plus_di={plus_di}, lower_highs={lower_highs}, lower_lows={lower_lows}")
        return 'Downtrend'

    # Both remaining outcomes are 'Range'; only the logged reason differs.
    if adx < adx_low and (pd.isna(bb_width_low) or bb_width < bb_width_low):
        logging.info(f"Range assigned: adx={adx} < adx_low={adx_low}, bb_width={bb_width} < bb_width_low={bb_width_low}")
        return 'Range'

    logging.info("Range assigned: no strong trend conditions met")
    return 'Range'

def compute_liquidity_regime(spread, obd, volume, spread_low, spread_high, obd_low, obd_high, volume_high):
    """Label the liquidity regime from spread, order-book depth and volume.

    Args:
        spread: Estimated bid/ask spread; divided by 100 before comparison.
        obd: Estimated order-book depth.
        volume: Traded volume.
        spread_low: Spread threshold (already divided by 100) for 'High'.
        spread_high: Spread threshold (already divided by 100) for 'Low'.
        obd_low: Depth threshold below which liquidity may be 'Low'.
        obd_high: Depth threshold above which liquidity may be 'High'.
        volume_high: Volume threshold above which liquidity may be 'High'.

    Returns:
        'High', 'Low' or 'Medium'. When spread/depth inputs are missing the
        result is 'High' only if volume exceeds ``volume_high``, else 'Medium'.
    """
    spread_depth_inputs = (spread, obd, spread_low, spread_high, obd_low, obd_high)
    if any(pd.isna(item) for item in spread_depth_inputs):
        # Fall back to a volume-only decision when spread/depth data is missing.
        if not pd.isna(volume) and volume > volume_high:
            logging.info(f"High liquidity assigned based on high volume: volume={volume} > volume_high={volume_high}")
            return 'High'
        logging.info("Using 'Medium' liquidity regime due to missing spread/obd data")
        return 'Medium'

    spread_pct = spread / 100
    logging.info(f"Liquidity check: spread_pct={spread_pct}, spread_low={spread_low}, spread_high={spread_high}, obd={obd}, obd_low={obd_low}, obd_high={obd_high}, volume={volume}, volume_high={volume_high}")

    if spread_pct < spread_low:
        if obd > obd_high or volume > volume_high:
            logging.info("High liquidity assigned: tight spread and high obd or volume")
            return 'High'

    if spread_pct > spread_high and obd < obd_low:
        logging.info("Low liquidity assigned: wide spread and low obd")
        return 'Low'

    logging.info("Liquidity regime set to 'Medium' as conditions for 'High' or 'Low' not met")
    return 'Medium'

### Ticker processing: computing metrics and assigning regimes

In [None]:
def _undefined_regime_row(ticker, date):
    """Return the one-row fallback frame used when regimes cannot be computed."""
    return pd.DataFrame([{
        'ticker': ticker,
        'date': date,
        'volatility_regime': 'undefined',
        'trend_regime': 'undefined',
        'liquidity_regime': 'undefined',
        'news_impact': 0
    }])


def _static_thresholds():
    """Return the fixed fallback thresholds used when quantile data is unusable."""
    return {
        'k_low': 20, 'k_high': 80,
        'adx_low': 20, 'adx_high': 25,
        'bb_width_low': np.nan,
        'spread_low': 0.00002, 'spread_high': 0.0001,
        'obd_low': 10000.0, 'obd_high': 50000.0,
        'volume_high': 1e7,
    }


def _aggregate_to_daily(minute_pd):
    """Collapse minute-level rows into one row per (ticker, calendar day)."""
    # 'first'/'last' pick rows by position, so the rows must be in
    # chronological order; the source partitions are not guaranteed to be.
    ordered = minute_pd.sort_values('timestamp', kind='stable')
    ordered = ordered.assign(date=ordered['timestamp'].dt.floor('D'))
    return ordered.groupby(['ticker', 'date']).agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
        'estimated_bid_ask_spread': 'mean',
        'estimated_obd': 'mean',
        'news_impact': 'max'  # 1 if any minute has news, else 0
    }).reset_index()


def _fill_price_gaps(daily_pd, limit=5):
    """Forward- then back-fill short NaN runs in high/low/close, in place."""
    for col in ('high', 'low', 'close'):
        if daily_pd[col].isna().any():
            daily_pd[col] = daily_pd[col].ffill(limit=limit).bfill(limit=limit)


def _last_value(series):
    """Return the final element of ``series``, or NaN when it is empty."""
    return series.iloc[-1] if not series.empty else np.nan


def _run_talib(name, daily_pd, **params):
    """Run a TA-Lib indicator through vectorbt on daily high/low/close."""
    return vbt.IndicatorFactory.from_talib(name).run(
        high=daily_pd['high'],
        low=daily_pd['low'],
        close=daily_pd['close'],
        **params
    )


def _bollinger_width(close, timeperiod=14):
    """Return (upper - lower) / middle of the Bollinger bands of ``close``."""
    bands = vbt.IndicatorFactory.from_talib('BBANDS').run(close=close, timeperiod=timeperiod)
    return (bands.upperband - bands.lowerband) / bands.middleband


def _quantile_thresholds(daily_quantile_pd, ticker, date):
    """Derive regime thresholds from quantiles over the daily quantile window.

    Starts from the static thresholds and overrides each group that can be
    computed; an indicator that raises keeps its static values.
    """
    thresholds = _static_thresholds()

    # Stochastic %K thresholds
    try:
        slowk = _run_talib('STOCH', daily_quantile_pd, fastk_period=14).slowk
        if not slowk.empty:
            thresholds.update(k_low=slowk.quantile(0.15), k_high=slowk.quantile(0.75))
    except Exception as e:
        logging.error(f"Error computing STOCH (quantile) for {ticker} on {date.date()}: {str(e)}")

    # ADX thresholds
    try:
        adx_series = _run_talib('ADX', daily_quantile_pd, timeperiod=14).real
        if not adx_series.empty:
            thresholds.update(adx_low=adx_series.quantile(0.25), adx_high=adx_series.quantile(0.75))
    except Exception as e:
        logging.error(f"Error computing ADX (quantile) for {ticker} on {date.date()}: {str(e)}")

    # Bollinger Band Width threshold. The width is normalised by the middle
    # band of the SAME (quantile-window) indicator run.
    try:
        bb_width_full = _bollinger_width(daily_quantile_pd['close'])
        if not bb_width_full.empty:
            thresholds['bb_width_low'] = bb_width_full.quantile(0.05)
    except Exception as e:
        logging.error(f"Error computing BBANDS (quantile) for {ticker} on {date.date()}: {str(e)}")

    # Liquidity thresholds (spread is divided by 100 to match the scaling
    # applied in compute_liquidity_regime)
    obd_clipped = daily_quantile_pd['estimated_obd'].clip(lower=1000.0)
    spread_series = daily_quantile_pd['estimated_bid_ask_spread']
    thresholds.update(
        spread_low=spread_series.quantile(0.1) / 100,
        spread_high=spread_series.quantile(0.95) / 100,
        obd_low=obd_clipped.quantile(0.3),
        obd_high=obd_clipped.quantile(0.7),
        volume_high=daily_quantile_pd['volume'].quantile(0.7),
    )
    return thresholds


def process_ticker_day(ticker, date, df, lookback=30, quantile_window=252, min_lookback=27):
    """Classify volatility, trend and liquidity regimes for one ticker and day.

    Only data strictly before ``date`` is used: indicators are computed on
    daily bars aggregated from the last ``lookback`` trading days, and the
    thresholds come from quantiles over the preceding ``quantile_window``
    calendar days (static fallbacks are used when that window is too short).

    Args:
        ticker: Ticker symbol written into the result.
        date: Day to classify; anything ``pd.Timestamp`` accepts.
        df: Dask DataFrame of minute-level rows with at least the columns
            ticker, timestamp, open, high, low, close, volume,
            estimated_bid_ask_spread, estimated_obd and news_impact.
        lookback: Maximum number of trading days in the indicator window.
        quantile_window: Length in calendar days of the threshold window.
        min_lookback: Minimum number of trading days required to classify.

    Returns:
        A one-row DataFrame with columns ticker, date, volatility_regime,
        trend_regime, liquidity_regime and news_impact. Every regime is
        'undefined' (and news_impact 0) when data is insufficient or invalid,
        or when an unexpected error occurs (the error is logged).
    """
    print(f"\nProcessing ticker {ticker} for date {date}")
    try:
        date = pd.Timestamp(date)
        end_date = date - timedelta(days=1)  # Last day of historical data
        quantile_start = date - timedelta(days=quantile_window)

        # Filtering minute-level data up to end_date (excluding date)
        row_day = df['timestamp'].dt.date
        history = df[row_day <= end_date.date()]
        trading_days = sorted(history['timestamp'].dt.date.unique().compute())

        # Dynamically setting lookback period: min `min_lookback`, max `lookback`
        if len(trading_days) < min_lookback:
            logging.warning(f"Insufficient trading days ({len(trading_days)}) for {ticker} on {date.date()}")
            print(f"Insufficient trading days: {len(trading_days)}")
            return _undefined_regime_row(ticker, date)
        effective_lookback = min(max(min_lookback, len(trading_days)), lookback)
        start_date = trading_days[-effective_lookback]

        # Filtering window for lookback period and for the quantile thresholds
        window = df[(row_day >= start_date) & (row_day <= end_date.date())]
        quantile_window_data = df[(row_day >= quantile_start.date()) & (row_day <= end_date.date())]

        # Materialise once and count the days in pandas instead of running a
        # second Dask computation for the count.
        window_pd = window.compute()
        window_days = window_pd['timestamp'].dt.date.nunique()

        if window_pd.empty or window_days < min_lookback:
            logging.warning(f"Insufficient data in lookback window for ticker {ticker} on {date.date()} (days: {window_days}).")
            return _undefined_regime_row(ticker, date)

        # Aggregating to daily data
        daily_pd = _aggregate_to_daily(window_pd)

        # Ensuring sufficient daily data
        if len(daily_pd) < min_lookback:
            logging.warning(f"Insufficient daily data ({len(daily_pd)} days) for {ticker} on {date.date()}")
            return _undefined_regime_row(ticker, date)

        # Clipping estimated_obd
        daily_pd['estimated_obd'] = daily_pd['estimated_obd'].clip(lower=1000.0)

        # Validating and cleaning daily data
        _fill_price_gaps(daily_pd)
        for col in ['high', 'low', 'close']:
            print(f"Daily {col} dtype: {daily_pd[col].dtype}, NaNs: {daily_pd[col].isna().sum()}")

        # Checking price consistency
        invalid_mask = (
            (daily_pd['high'] < daily_pd['low'])
            | (daily_pd['close'] > daily_pd['high'])
            | (daily_pd['close'] < daily_pd['low'])
        )
        invalid_count = int(invalid_mask.sum())
        if invalid_count:
            logging.warning(f"Invalid prices detected for {ticker} on {date.date()}: {invalid_count} rows")
            return _undefined_regime_row(ticker, date)

        if daily_pd[['high', 'low', 'close']].isna().any().any():
            logging.warning(f"Daily data for {ticker} on {date.date()} contains NaNs after cleaning.")
            return _undefined_regime_row(ticker, date)

        # Checking for zero standard deviation
        close_std = daily_pd['close'].std()
        if close_std == 0 or pd.isna(close_std):
            logging.warning(f"Zero or NaN standard deviation in daily close prices for {ticker} on {date.date()}")
            return _undefined_regime_row(ticker, date)

        # Computing Stochastic Oscillator %K on daily data
        try:
            stoch_k = _last_value(_run_talib('STOCH', daily_pd, fastk_period=14).slowk)
        except Exception as e:
            logging.error(f"Error computing STOCH for {ticker} on {date.date()}: {str(e)}")
            stoch_k = np.nan

        # Computing ADX on daily data
        try:
            adx_val = _last_value(_run_talib('ADX', daily_pd, timeperiod=14).real)
        except Exception as e:
            logging.error(f"Error computing ADX for {ticker} on {date.date()}: {str(e)}")
            adx_val = np.nan

        # Computing PLUS_DI on daily data
        try:
            plus_di_val = _last_value(_run_talib('PLUS_DI', daily_pd, timeperiod=14).real)
        except Exception as e:
            logging.error(f"Error computing PLUS_DI for {ticker} on {date.date()}: {str(e)}")
            plus_di_val = np.nan

        # Computing MINUS_DI on daily data
        try:
            minus_di_val = _last_value(_run_talib('MINUS_DI', daily_pd, timeperiod=14).real)
        except Exception as e:
            logging.error(f"Error computing MINUS_DI for {ticker} on {date.date()}: {str(e)}")
            minus_di_val = np.nan

        # Computing Bollinger Bands on daily data
        try:
            bb_width_val = _last_value(_bollinger_width(daily_pd['close']))
        except Exception as e:
            logging.error(f"Error computing BBANDS for {ticker} on {date.date()}: {str(e)}")
            bb_width_val = np.nan

        # Extracting liquidity metrics and news impact from the last available trading day
        last_day = daily_pd[daily_pd['date'] == end_date]
        if last_day.empty:
            # end_date may be a non-trading day; use the most recent day before it
            last_day = daily_pd[daily_pd['date'] <= end_date].tail(1)
        if last_day.empty:
            spread = obd = volume = np.nan
            news_impact = 0
        else:
            spread = last_day['estimated_bid_ask_spread'].iloc[-1]
            obd = last_day['estimated_obd'].iloc[-1]
            volume = last_day['volume'].iloc[-1]
            latest_news = last_day['news_impact'].iloc[-1]
            news_impact = 0 if pd.isna(latest_news) else int(latest_news)

        # Computing thresholds using quantile window. Sufficiency is judged on
        # the number of aggregated DAYS (matching the "days" wording of the
        # log message below), not on the number of minute-level rows.
        daily_quantile_pd = _aggregate_to_daily(quantile_window_data.compute())
        quantile_days = len(daily_quantile_pd)

        if quantile_days < quantile_window // 2:
            logging.info(f"Using static thresholds for {ticker} on {date.date()} due to insufficient quantile data ({quantile_days} days).")
            thresholds = _static_thresholds()
        elif daily_quantile_pd.empty:
            logging.info(f"Empty quantile window for ticker {ticker} on {date.date()}.")
            thresholds = _static_thresholds()
        else:
            # Validating and cleaning quantile window data
            _fill_price_gaps(daily_quantile_pd)
            if daily_quantile_pd[['high', 'low', 'close']].isna().any().any():
                logging.warning(f"Daily quantile data for {ticker} on {date.date()} contains NaNs after cleaning.")
                thresholds = _static_thresholds()
            else:
                thresholds = _quantile_thresholds(daily_quantile_pd, ticker, date)

        # Assigning regimes
        volatility_regime = compute_volatility_regime(
            stoch_k, thresholds['k_low'], thresholds['k_high']
        )
        trend_regime = compute_trend_regime(
            adx_val, plus_di_val, minus_di_val,
            daily_pd['high'].values[-2:], daily_pd['low'].values[-2:],
            bb_width_val, thresholds['adx_low'], thresholds['adx_high'], thresholds['bb_width_low']
        )
        liquidity_regime = compute_liquidity_regime(
            spread, obd, volume,
            thresholds['spread_low'], thresholds['spread_high'],
            thresholds['obd_low'], thresholds['obd_high'], thresholds['volume_high']
        )
        logging.info(f"Regimes assigned for {date.date()}: Volatility={volatility_regime}, Trend={trend_regime}, Liquidity={liquidity_regime}, News Impact={news_impact}")

        return pd.DataFrame([{
            'ticker': ticker,
            'date': date,
            'volatility_regime': volatility_regime,
            'trend_regime': trend_regime,
            'liquidity_regime': liquidity_regime,
            'news_impact': news_impact
        }])

    except Exception as e:
        # `date` is still the caller's raw value if the Timestamp conversion
        # itself failed, so do not assume it has a .date() method here.
        day_label = date.date() if hasattr(date, 'date') else date
        logging.error(f"Error processing ticker {ticker} on {day_label}: {str(e)}")
        return _undefined_regime_row(ticker, date)

def process_ticker(ticker, lookback=30, quantile_window=252):
    """Compute and persist the daily regimes for one ticker.

    The ticker's Parquet files are preprocessed (falling back to the original
    files when no preprocessed file was produced), loaded as a Dask DataFrame,
    and every trading day on or after the ticker's earliest date plus
    ``lookback`` calendar days is classified with ``process_ticker_day``. The
    combined result is written to
    ``REGIMES_DIR/ticker=<ticker>/daily_regimes.csv``.

    Args:
        ticker: Ticker symbol to process.
        lookback: Forwarded to ``process_ticker_day``; also offsets the first
            day that is classified.
        quantile_window: Forwarded to ``process_ticker_day``.

    Returns:
        The regimes DataFrame, or None when there are no input files, no days
        to classify, or an error occurred (the error is logged).
    """
    logging.info(f"\nStarting process_ticker for {ticker}")
    try:
        cleaned_paths, raw_paths = preprocess_ticker_files(ticker)
        files_to_process = cleaned_paths or raw_paths
        if not files_to_process:
            logging.warning(f"No Parquet files for {ticker}")
            return None

        # NOTE(review): this also switches the global Dask scheduler to
        # 'threads' for everything that runs afterwards - confirm intended.
        dask.config.set({
            'dataframe.shuffle.compression': None,
            'parquet.metadata': None,
            'scheduler': 'threads',
        })

        wanted_columns = [
            'ticker', 'timestamp', 'open', 'high', 'low', 'close', 'volume',
            'estimated_bid_ask_spread', 'estimated_obd', 'news_impact'
        ]
        df = dd.read_parquet(
            files_to_process,
            schema=input_schema,
            columns=wanted_columns,
            engine='pyarrow',
            dtype_backend='pyarrow'
        )
        df['ticker'] = df['ticker'].astype('string[pyarrow]')
        df['timestamp'] = df['timestamp'].astype('timestamp[ns][pyarrow]')

        # Earliest_date based on ticker (default used when the ticker is absent)
        earliest_dates = {}
        for record in earliest_dates_df.to_dict('records'):
            earliest_dates[record['Ticker']] = pd.Timestamp(record['Earliest_Date'])
        earliest_date = earliest_dates.get(ticker, pd.Timestamp('2020-05-04'))
        logging.info(f"Earliest date for {ticker}: {earliest_date}")

        # Keep only days far enough past the earliest date to have a lookback
        available_days = sorted(df['timestamp'].dt.date.unique().compute())
        first_eligible = earliest_date + timedelta(days=lookback)
        trading_days = []
        for day in available_days:
            day_ts = pd.Timestamp(day)
            if day_ts >= first_eligible:
                trading_days.append(day_ts)

        regimes = []
        for day_ts in trading_days:
            regimes.append(process_ticker_day(ticker, day_ts, df, lookback, quantile_window))
        if not regimes:
            logging.warning(f"No regimes computed for {ticker}")
            return None

        regimes_df = pd.concat(regimes, ignore_index=True)
        regimes_df['ticker'] = regimes_df['ticker'].astype(str)
        regimes_df['date'] = regimes_df['date'].astype('datetime64[ns]')
        regimes_df['news_impact'] = regimes_df['news_impact'].astype('int8')

        # Saving daily regimes to CSV
        output_path = os.path.join(REGIMES_DIR, f'ticker={ticker}', 'daily_regimes.csv')
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        regimes_df.to_csv(output_path, index=False)
        return regimes_df

    except Exception as e:
        logging.error(f"Error processing {ticker}: {str(e)}")
        return None

Loading tickers from nasdaq100_ticker_dataset.json
Input schema defined:
  ticker: string
  timestamp: timestamp[ns]
  open: double
  high: double
  low: double
  close: double
  volume: float
  prev_session_high: float
  prev_session_low: float
  estimated_bid_ask_spread: float
  estimated_obd: float
  50_day_sma: float
  news_impact: int8
Output schema defined:
  ticker: string
  date: timestamp[ns]
  volatility_regime: string
  trend_regime: string
  liquidity_regime: string
  news_impact: int8
Starting script execution

Starting main function

Processing ticker: AAPL

Starting process_ticker for AAPL
Preprocessed /content/backtesting_final/ticker=AAPL/part.7.parquet to /content/backtesting_final/ticker=AAPL_preprocessed/part.7.parquet
Preprocessed /content/backtesting_final/ticker=AAPL/part.8.parquet to /content/backtesting_final/ticker=AAPL_preprocessed/part.8.parquet
Preprocessed /content/backtesting_final/ticker=AAPL/part.1.parquet to /content/backtesting_final/ticker=AAPL_prepr




Processing ticker AAPL for date 2020-06-03 00:00:00
Predicting regime for 2020-06-03 using lookback from 2020-06-02 to earlier
Quantile window for thresholds: 2019-09-25 to 2020-06-02
Total trading days available up to 2020-06-02: 21
Insufficient trading days: 21

Processing ticker AAPL for date 2020-06-04 00:00:00
Predicting regime for 2020-06-04 using lookback from 2020-06-03 to earlier
Quantile window for thresholds: 2019-09-26 to 2020-06-03
Total trading days available up to 2020-06-03: 22
Insufficient trading days: 22

Processing ticker AAPL for date 2020-06-05 00:00:00
Predicting regime for 2020-06-05 using lookback from 2020-06-04 to earlier
Quantile window for thresholds: 2019-09-27 to 2020-06-04




[1;30;43mVýstupní stream byl oříznut na posledních 5000 řádků.[0m
Parameters: timeperiod=14
ADX output shape: (30,)
ADX last value: 42.49271869842114
ADX sample (last 5): [        nan         nan 46.53162337 43.70051347 42.4927187 ]
Number of NaNs in ADX: 27

Computing PLUS_DI (Daily)
Daily input shapes: high=(30,), low=(30,), close=(30,)
Parameters: timeperiod=14
PLUS_DI output shape: (30,)
PLUS_DI last value: 17.793323351548963
PLUS_DI sample (last 5): [21.3543372  24.22628609 22.58548136 21.23493432 17.79332335]
Number of NaNs in PLUS_DI: 14

Computing MINUS_DI (Daily)
Daily input shapes: high=(30,), low=(30,), close=(30,)
Parameters: timeperiod=14
MINUS_DI output shape: (30,)
MINUS_DI last value: 30.816594357588222
MINUS_DI sample (last 5): [23.77029778 22.19348882 23.67361892 24.38062173 30.81659436]
Number of NaNs in MINUS_DI: 14

Computing Bollinger Bands (Daily)
Daily input shape: close=(30,)
Parameters: timeperiod=14
Bollinger Bands output shapes: upperband=(30,), middleband




Processing ticker ABNB for date 2021-01-11 00:00:00
Predicting regime for 2021-01-11 using lookback from 2021-01-10 to earlier
Quantile window for thresholds: 2020-05-04 to 2021-01-10
Total trading days available up to 2021-01-10: 20
Insufficient trading days: 20

Processing ticker ABNB for date 2021-01-12 00:00:00
Predicting regime for 2021-01-12 using lookback from 2021-01-11 to earlier
Quantile window for thresholds: 2020-05-05 to 2021-01-11
Total trading days available up to 2021-01-11: 21
Insufficient trading days: 21

Processing ticker ABNB for date 2021-01-13 00:00:00
Predicting regime for 2021-01-13 using lookback from 2021-01-12 to earlier
Quantile window for thresholds: 2020-05-06 to 2021-01-12
Total trading days available up to 2021-01-12: 22
Insufficient trading days: 22

Processing ticker ABNB for date 2021-01-14 00:00:00
Predicting regime for 2021-01-14 using lookback from 2021-01-13 to earlier
Quantile window for thresholds: 2020-05-07 to 2021-01-13




[1;30;43mVýstupní stream byl oříznut na posledních 5000 řádků.[0m
      volume  estimated_bid_ask_spread  estimated_obd  news_impact  
0  3319342.0                  0.059789      9162709.0            0  
1  1786079.5                  0.057453    1937625.375            1  
2  4527821.5                  0.078931     54077208.0            0  
3  3672754.5                  0.074668     13919754.0            0  
4  3758935.0                  0.083575     11890780.0            0  

Validating and cleaning daily input data
Daily high dtype: double[pyarrow], NaNs: 0
Daily low dtype: double[pyarrow], NaNs: 0
Daily close dtype: double[pyarrow], NaNs: 0

Daily data for indicators:
                   date        high         low       close     volume  \
25  2025-01-03 00:00:00  136.360001  131.759995  135.699997  2982679.5   
26  2025-01-06 00:00:00  138.100006  134.580002  135.220001  3365463.5   
27  2025-01-07 00:00:00  136.764999  130.809601  131.339996  3555301.0   
28  2025-01-08 00:00:00



Preprocessed /content/backtesting_final/ticker=ADBE/part.3.parquet to /content/backtesting_final/ticker=ADBE_preprocessed/part.3.parquet
Preprocessed /content/backtesting_final/ticker=ADBE/part.5.parquet to /content/backtesting_final/ticker=ADBE_preprocessed/part.5.parquet

Processing ticker ADBE for date 2020-06-03 00:00:00
Predicting regime for 2020-06-03 using lookback from 2020-06-02 to earlier
Quantile window for thresholds: 2019-09-25 to 2020-06-02
Total trading days available up to 2020-06-02: 21
Insufficient trading days: 21

Processing ticker ADBE for date 2020-06-04 00:00:00
Predicting regime for 2020-06-04 using lookback from 2020-06-03 to earlier
Quantile window for thresholds: 2019-09-26 to 2020-06-03




Total trading days available up to 2020-06-03: 22
Insufficient trading days: 22

Processing ticker ADBE for date 2020-06-05 00:00:00
Predicting regime for 2020-06-05 using lookback from 2020-06-04 to earlier
Quantile window for thresholds: 2019-09-27 to 2020-06-04
Total trading days available up to 2020-06-04: 23
Insufficient trading days: 23

Processing ticker ADBE for date 2020-06-08 00:00:00
Predicting regime for 2020-06-08 using lookback from 2020-06-07 to earlier
Quantile window for thresholds: 2019-09-30 to 2020-06-07
Total trading days available up to 2020-06-07: 24
Insufficient trading days: 24

Processing ticker ADBE for date 2020-06-09 00:00:00
Predicting regime for 2020-06-09 using lookback from 2020-06-08 to earlier
Quantile window for thresholds: 2019-10-01 to 2020-06-08
Total trading days available up to 2020-06-08: 25
Insufficient trading days: 25

Processing ticker ADBE for date 2020-06-10 00:00:00
Predicting regime for 2020-06-10 using lookback from 2020-06-09 to earli



[1;30;43mVýstupní stream byl oříznut na posledních 5000 řádků.[0m
dtype: object
Regime DataFrame ticker type: <class 'str'>

Processing ticker ADBE for date 2025-01-13 00:00:00
Predicting regime for 2025-01-13 using lookback from 2025-01-12 to earlier
Quantile window for thresholds: 2024-05-06 to 2025-01-12
Total trading days available up to 2025-01-12: 1180
Lookback window: 2024-11-26 to 2025-01-12, effective lookback: 30 days
Window shape before compute: (<dask_expr.expr.Scalar: expr=(Filter(frame=Assign(frame=Assign(frame=ReadParquetFSSpec(42e43d5))), predicate=PropertyMap(frame=Assign(frame=Assign(frame=ReadParquetFSSpec(42e43d5)))['timestamp'], accessor='dt', attr='date') >= 2024-11-26 & PropertyMap(frame=Assign(frame=Assign(frame=ReadParquetFSSpec(42e43d5)))['timestamp'], accessor='dt', attr='date') <= 2025-01-12)).size() // 10, dtype=int64>, 10)
Quantile window shape before compute: (<dask_expr.expr.Scalar: expr=(Filter(frame=Assign(frame=Assign(frame=ReadParquetFSSpec(42e43d5



Total trading days available up to 2020-06-02: 21
Insufficient trading days: 21

Processing ticker ADI for date 2020-06-04 00:00:00
Predicting regime for 2020-06-04 using lookback from 2020-06-03 to earlier
Quantile window for thresholds: 2019-09-26 to 2020-06-03
Total trading days available up to 2020-06-03: 22
Insufficient trading days: 22

Processing ticker ADI for date 2020-06-05 00:00:00
Predicting regime for 2020-06-05 using lookback from 2020-06-04 to earlier
Quantile window for thresholds: 2019-09-27 to 2020-06-04
Total trading days available up to 2020-06-04: 23
Insufficient trading days: 23

Processing ticker ADI for date 2020-06-08 00:00:00
Predicting regime for 2020-06-08 using lookback from 2020-06-07 to earlier
Quantile window for thresholds: 2019-09-30 to 2020-06-07
Total trading days available up to 2020-06-07: 24
Insufficient trading days: 24

Processing ticker ADI for date 2020-06-09 00:00:00
Predicting regime for 2020-06-09 using lookback from 2020-06-08 to earlier
Q



[1;30;43mVýstupní stream byl oříznut na posledních 5000 řádků.[0m
            close      volume  estimated_bid_ask_spread  estimated_obd  \
54892  142.740005       458.0                    0.0001      4580000.0   
54893  142.740005  457.263885                    0.0001      4572639.0   
54894  142.740005  456.527771                    0.0001      4565277.5   
54895  142.740005  455.791656                    0.0001      4557916.5   
54896  142.740005  455.055542                    0.0001      4550555.5   

       news_impact  
54892            0  
54893            0  
54894            0  
54895            0  
54896            0  
Quantile window ticker type: <class 'str'>
Daily aggregated quantile window shape: (173, 10)
Daily aggregated quantile window sample:
  ticker                 date        open        high         low       close  \
0    ADI  2022-09-27 00:00:00  142.740005  143.929993  139.869995  141.809998   
1    ADI  2022-09-28 00:00:00  139.479996  145.649994  139.460007

### Regime distributions and summaries

In [None]:
def validate_regime_distribution(regimes_df, min_pct=0.05):
    """Count regime labels per date and log anomalies in their distribution.

    Args:
        regimes_df: DataFrame with a ``date`` column plus the columns
            ``volatility_regime``, ``trend_regime``, ``liquidity_regime`` and
            ``news_impact``.
        min_pct: Minimum share (0-1) of a date's non-missing labels that a
            regime value must reach; smaller shares are logged as a warning.
            Not applied to ``news_impact``.

    Returns:
        A DataFrame with one row per date. Each regime column holds a dict
        mapping label -> count for that date (missing labels are excluded
        from the dicts). An empty DataFrame is returned for empty input.
    """
    regime_types = ['volatility_regime', 'trend_regime', 'liquidity_regime', 'news_impact']

    if regimes_df.empty:
        logging.warning("Regime DataFrame is empty")
        return pd.DataFrame()

    regime_counts = regimes_df.groupby('date').agg({
        'volatility_regime': lambda x: x.value_counts().to_dict(),
        'trend_regime': lambda x: x.value_counts().to_dict(),
        'liquidity_regime': lambda x: x.value_counts().to_dict(),
        'news_impact': lambda x: x.value_counts().to_dict()
    }).reset_index()

    # value_counts() drops NaN by default, so missing labels never show up in
    # the dicts above. Count them separately so they can still be reported
    # without changing the shape of the returned frame.
    nan_counts = regimes_df[regime_types].isna().groupby(regimes_df['date']).sum()

    for regime_type in regime_types:
        nan_by_date = nan_counts[regime_type].to_dict()
        for date, counts in zip(regime_counts['date'], regime_counts[regime_type]):
            total = sum(counts.values())
            n_missing = int(nan_by_date.get(date, 0))
            if n_missing:
                # Share is relative to every row on that date, missing included.
                nan_pct = n_missing / (total + n_missing)
                logging.warning(f"{regime_type} has {n_missing} NaN assignments on {date} ({nan_pct:.2%})")
            if total == 0:
                logging.warning(f"No {regime_type} assignments for {date}")
                continue
            for regime, count in counts.items():
                # Shares of concrete labels stay relative to the non-missing total.
                pct = count / total
                if regime == 'undefined' and regime_type != 'news_impact':
                    logging.info(f"{regime_type} has {count} undefined assignments on {date} ({pct:.2%})")
                elif pct < min_pct and regime_type != 'news_impact':
                    logging.warning(f"Low representation of {regime_type}={regime} on {date}: {pct:.2%} (<{min_pct:.2%})")

    return regime_counts

def plot_regime_distribution(regime_counts):
    """Save one stacked-area chart per regime type showing label shares over time.

    Args:
        regime_counts: DataFrame with a ``date`` column and, for each regime
            type, a column of ``{label: count}`` dicts (the frame returned by
            ``validate_regime_distribution``).

    Writes ``<regime_type>_distribution.png`` into ``PLOT_DIR``. Logs a warning
    and returns without plotting when ``regime_counts`` is empty.
    """
    if regime_counts.empty:
        logging.warning("No regime counts to plot")
        return

    os.makedirs(PLOT_DIR, exist_ok=True)
    for regime_type in ['volatility_regime', 'trend_regime', 'liquidity_regime', 'news_impact']:
        # Expand each per-date dict into a row of proportions; labels absent on
        # a given date become 0.
        proportions = regime_counts[regime_type].apply(
            lambda x: pd.Series(x) / pd.Series(x).sum()
        ).fillna(0)
        # Plot against the actual dates rather than the positional row index,
        # so the x-axis matches its "Date" label.
        proportions.index = regime_counts['date']

        # Create the figure explicitly and hand its axes to pandas. Without
        # `ax=`, DataFrame.plot opens its own figure, which ignores the
        # requested figsize and leaves an empty figure open on every iteration.
        fig, ax = plt.subplots(figsize=(12, 8))
        proportions.plot(kind='area', stacked=True, ax=ax, title=f'{regime_type} Distribution')
        ax.set_xlabel('Date')
        ax.set_ylabel('Proportion')
        ax.legend(title=regime_type)
        fig.savefig(os.path.join(PLOT_DIR, f'{regime_type}_distribution.png'))
        plt.close(fig)

def print_regime_distribution_summary(regimes_df):
    """Print overall, per-year and per-ticker regime distributions.

    Also prints how many ticker-days carry an ``'undefined'`` or missing
    regime label, and which ticker has the most of each.

    Args:
        regimes_df: DataFrame with ``ticker``, a datetime-like ``date`` column,
            and the columns ``volatility_regime``, ``trend_regime``,
            ``liquidity_regime`` and ``news_impact``. The input is not
            modified; a copy is used internally.

    Returns:
        None. Output goes to stdout. For an empty frame a warning is logged
        and nothing is printed.
    """
    regime_cols = ['volatility_regime', 'trend_regime', 'liquidity_regime']
    all_cols = regime_cols + ['news_impact']

    # Guard against dividing by zero further down.
    if len(regimes_df) == 0:
        logging.warning("Regime DataFrame is empty")
        return

    regimes_df = regimes_df.copy()
    # Replace missing regime labels with the sentinel string 'NaN' so they show
    # up as their own category in value_counts().
    for col in regime_cols:
        regimes_df[col] = regimes_df[col].fillna('NaN')

    # After the fill above, .isna() on the regime columns is always False, so
    # missing labels must be detected through the sentinel. news_impact is not
    # filled and still uses .isna().
    missing = {col: regimes_df[col].eq('NaN') for col in regime_cols}
    missing['news_impact'] = regimes_df['news_impact'].isna()
    # 'undefined' is only a valid label for the three regime columns.
    undefined = {col: regimes_df[col].eq('undefined') for col in regime_cols}

    # Overall distribution.
    for regime_type in all_cols:
        print(f"\n{regime_type.replace('_regime', '').capitalize()}:")
        counts = regimes_df[regime_type].value_counts(normalize=True) * 100
        for regime, pct in counts.items():
            print(f"  {regime}: {pct:.2f}%")

    # Distribution by calendar year.
    regimes_df['year'] = regimes_df['date'].dt.year
    for regime_type in all_cols:
        print(f"\n{regime_type.replace('_regime', '').capitalize()}:")
        yearly_counts = regimes_df.groupby('year')[regime_type].value_counts(normalize=True).unstack().fillna(0) * 100
        print(yearly_counts.round(2))

    # Tickers with the highest and lowest share of each label.
    for regime_type in all_cols:
        print(f"\n{regime_type.replace('_regime', '').capitalize()}:")
        ticker_counts = regimes_df.groupby('ticker')[regime_type].value_counts(normalize=True).unstack().fillna(0) * 100
        for regime in ticker_counts.columns:
            max_ticker = ticker_counts[regime].idxmax()
            min_ticker = ticker_counts[regime].idxmin()
            print(f"  {regime}:")
            print(f"    Highest: {max_ticker} ({ticker_counts.loc[max_ticker, regime]:.2f}%)")
            print(f"    Lowest: {min_ticker} ({ticker_counts.loc[min_ticker, regime]:.2f}%)")

    # Ticker-days with at least one undefined / missing regime label.
    total_days = len(regimes_df)
    undefined_days = int(regimes_df[regime_cols].eq('undefined').any(axis=1).sum())
    nan_days = int(regimes_df[regime_cols].eq('NaN').any(axis=1).sum())
    print(f"Total ticker-days with at least one undefined regime: {undefined_days} ({undefined_days/total_days*100:.2f}%)")
    print(f"Total ticker-days with at least one NaN regime: {nan_days} ({nan_days/total_days*100:.2f}%)")
    for regime_type in all_cols:
        undef_pct = undefined[regime_type].mean() * 100 if regime_type in undefined else 0
        nan_pct = missing[regime_type].mean() * 100
        print(f"  {regime_type.replace('_regime', '').capitalize()}:")
        print(f"    Undefined: {undef_pct:.2f}%")
        print(f"    NaN: {nan_pct:.2f}%")

    # Ticker with the most undefined / missing labels per regime type.
    for regime_type in all_cols:
        print(f"\n{regime_type.replace('_regime', '').capitalize()}:")
        if regime_type in undefined:
            undef_ticker_counts = regimes_df[undefined[regime_type]].groupby('ticker').size()
        else:
            # Explicit dtype avoids the ambiguous default of a bare pd.Series().
            undef_ticker_counts = pd.Series(dtype='int64')
        nan_ticker_counts = regimes_df[missing[regime_type]].groupby('ticker').size()
        if not undef_ticker_counts.empty:
            top_undef_ticker = undef_ticker_counts.idxmax()
            print(f"  Undefined - Highest: {top_undef_ticker} ({undef_ticker_counts.max()} days)")
        else:
            print("  Undefined - None")
        if not nan_ticker_counts.empty:
            top_nan_ticker = nan_ticker_counts.idxmax()
            print(f"  NaN - Highest: {top_nan_ticker} ({nan_ticker_counts.max()} days)")
        else:
            print("  NaN - None")

### Main

In [None]:
def main():
    """Run regime computation for every ticker, then validate, plot, summarise and save.

    Each entry of ``tickers`` is passed to ``process_ticker``; non-None results
    are concatenated into one DataFrame. Per-date regime counts are written to
    ``LOG_CSV`` (when non-empty), distribution plots and a printed summary are
    produced, and the combined frame is saved as ``all_regimes.csv`` under
    ``OUTPUT_DIR``. Returns early, after logging an error, when no ticker
    produced a result.
    """
    collected = []

    for ticker in tickers:
        logging.info(f"Processing ticker: {ticker}")
        ticker_regimes = process_ticker(ticker)
        if ticker_regimes is None:
            logging.warning(f"Failed to process {ticker}")
            continue
        collected.append(ticker_regimes)
        logging.info(f"Successfully processed {ticker}")

    if len(collected) == 0:
        logging.error("No regimes computed for any ticker")
        return

    regimes_df = pd.concat(collected, ignore_index=True)

    # Report tickers that never made it into the combined frame.
    seen_tickers = set(regimes_df['ticker'].unique())
    missing_tickers = set(tickers) - seen_tickers
    if missing_tickers:
        logging.warning(f"Missing regime assignments for tickers: {missing_tickers}")

    regime_counts = validate_regime_distribution(regimes_df)
    if regime_counts.empty:
        logging.warning("No regime counts to save")
    else:
        regime_counts.to_csv(LOG_CSV, index=False)
        logging.info(f"Saved regime counts to {LOG_CSV}")

    plot_regime_distribution(regime_counts)
    print_regime_distribution_summary(regimes_df)

    # Persist the combined regimes for all tickers.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    aggregated_path = os.path.join(OUTPUT_DIR, 'all_regimes.csv')
    regimes_df.to_csv(aggregated_path, index=False)
    logging.info(f"Saving aggregated regimes to {OUTPUT_DIR}/all_regimes.csv")
    logging.info("Main executed successfully")

# Entry point: run the full pipeline defined in main(), bracketed by start/end
# markers on stdout. The guard keeps main() from running if this code is
# imported as a module instead of being executed directly.
if __name__ == "__main__":
    print("Starting script execution")
    main()
    print("Script execution completed")

In [None]:
# Shut down Dask once all work is finished: close the client first, then the
# cluster object it was created from.
client.close()
cluster.close()

INFO:distributed.scheduler:Remove client Client-c2ef4c20-2860-11f0-bd9c-0242c0a80a0a
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:36560; closing.
INFO:distributed.scheduler:Remove client Client-c2ef4c20-2860-11f0-bd9c-0242c0a80a0a
INFO:distributed.scheduler:Close client connection: Client-c2ef4c20-2860-11f0-bd9c-0242c0a80a0a
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1746306469.1812365') (0, 1, 2, 3, 4, 5, 6, 7)
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:42337'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:45425'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:33207'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.