<a href="https://colab.research.google.com/github/tluxxx/CandleStickStudies/blob/main/01_candlestick_studies_predictive_power_updated_02_fixed.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Candlestick Studies

## Topics
* Analyzing S&P 500 data from 2000 to 2025 for candlestick patterns using TA-Lib, generating bullish and bearish pattern signals
* Introducing a simple EMA-based trend filter (and studying the impact)
* Counting bullish/bearish signals and identifying the most active patterns
* For each detected pattern (event/signal), compiling key information:
    * date of occurrence,
    * mode (bullish, bearish),
    * market trend (no filter, up-trend, down-trend),
    * and cumulative returns for the 20 days following the signal
* Analyzing the distribution of cumulative returns
* Rigorous statistical testing for:
    * Mean cumulative returns after a pattern signal vs. mean cumulative returns after random entries
    * Win rate after a pattern signal vs. win rate after random entries

## Release Data
* Rev: 1.0
* Author: Tobi Lux
* Updated: 2026-02-27



## 0. Preparation

In [None]:
print("⏳ Downloading TA-Lib 0.6.4 from GitHub...")
!wget -q https://github.com/TA-Lib/ta-lib/releases/download/v0.6.4/ta-lib-0.6.4-src.tar.gz
print("📦 Extracting...")
!tar -xzf ta-lib-0.6.4-src.tar.gz > /dev/null 2>&1
print("🔧 Building TA-Lib C library (this takes 2-3 minutes)...")
!cd ta-lib-0.6.4 && ./configure --prefix=/usr > /dev/null 2>&1 && make > /dev/null 2>&1 && make install > /dev/null 2>&1
print("🐍 Installing Python wrapper...")
!pip install -q TA-Lib
print("✅ Done! TA-Lib 0.6.4 is ready to use!")

⏳ Downloading TA-Lib 0.6.4 from GitHub...
📦 Extracting...
🔧 Building TA-Lib C library (this takes 2-3 minutes)...
🐍 Installing Python wrapper...
✅ Done! TA-Lib 0.6.4 is ready to use!


In [None]:
import numpy as np
import pandas as pd
from typing import List, Any
from tqdm.notebook import tqdm
from numba import njit

import yfinance as yf
import talib
from scipy import stats

import plotly.graph_objects as go
from plotly.subplots import make_subplots

## 1. Price Data Download and Trend Filter Calculation
S&P 500 data are downloaded for the period from 2000 to 2025.

As a trend filter we use a simple EMA(M), with M = 50 in the code below. The filter is defined as:
* +1 if Close > EMA(M)  → uptrend
* -1 if Close ≤ EMA(M)  → downtrend

Finally we calculate:
* a series of trading days (all days in prices, no trend-filter applied)
* a series of trading days in an up-trend (trend-filter == 1)
* a series of trading days in a downtrend (trend-filter == -1)

We will use these series of dates later for filtering.
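
As a minimal sketch of the filter rule above, the following toy example applies it to a short synthetic price series. The prices and the span `M = 3` are made up for illustration; the notebook itself works on S&P 500 closes.

```python
import numpy as np
import pandas as pd

# Synthetic close prices (illustrative only, not S&P 500 data)
close = pd.Series(
    [100.0, 101.0, 102.0, 101.0, 99.0, 98.0, 100.0, 103.0],
    index=pd.date_range("2024-01-01", periods=8, freq="B"),
    name="Close",
)

M = 3  # hypothetical EMA span chosen for this toy series
ema = close.ewm(span=M, adjust=False).mean()

# +1 where Close > EMA(M), -1 where Close <= EMA(M)
trend = pd.Series(np.where(close > ema, 1, -1), index=close.index, name="trend")

# Date pools for later filtering
up_days = trend.index[trend == 1]
down_days = trend.index[trend == -1]

print(trend.tolist())
```

Note that the first day is always classified as a downtrend here, because with `adjust=False` the EMA starts at the first close and the comparison `Close > EMA` is strict.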

In [None]:
# settings
ticker = '^GSPC'
start_date, end_date = '2000-01-01', '2025-12-31'
N_FORWARD = 20

# Download S&P 500 daily OHLC, calculating daily returns
prices = (yf.download(ticker, start=start_date, end=end_date, progress=False, auto_adjust=True)
            [['Open','High','Low','Close']]
            .droplevel(1, axis=1))
rets = prices['Close'].pct_change().dropna()

# EMA trend filter (+1 / -1)
ema   = prices['Close'].ewm(span=50, adjust=False).mean()
trend = (prices['Close'] > ema).map({True: 1, False: -1}).reindex(rets.index).rename('trend')

# Date pools (exclude last N_FORWARD rows)
valid = rets.index[:-N_FORWARD]
dates_no_trend_filter   = valid                         # all dates
dates_up_trend_filter   = valid[trend[valid] ==  1]     # dates in up-trend
dates_down_trend_filter = valid[trend[valid] == -1]     # dates in down-trend

## 2. Detection of Candlestick Patterns
* Identifying all candlestick patterns available in TA-Lib
* Applying all available TA-Lib candlestick patterns to prices, generating a DataFrame `pattern_df` with all signals (signal = event = detection of a pattern)
* Counting bearish and bullish signals (as per TA-Lib convention)
* Counting all signals
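
The counting step can be illustrated without TA-Lib on a small hand-made signal matrix. This is only a sketch: the column names are real TA-Lib function names, but the values are invented, assuming the usual TA-Lib convention of positive values for bullish signals, negative values for bearish signals, and 0 for no pattern.

```python
import pandas as pd

# Toy signal matrix: +100 bullish, -100 bearish, 0 no pattern (values are made up)
toy_signals = pd.DataFrame(
    {
        "CDLDOJI":      [0, 100, 0, 100, 0],
        "CDLENGULFING": [100, 0, -100, 0, -100],
        "CDLHAMMER":    [0, 0, 0, 100, 0],
    }
)

# Count bullish, bearish and total signals per pattern, most active pattern first
toy_summary = pd.DataFrame(
    {
        "bull": (toy_signals > 0).sum(),
        "bear": (toy_signals < 0).sum(),
        "all":  (toy_signals != 0).sum(),
    }
).sort_values("all", ascending=False)

print(toy_summary)
```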


In [None]:
# extracting Candlestick pattern signals (CDL* in TA-Lib)
cdl_patterns = talib.get_function_groups()['Pattern Recognition']

pattern_df = pd.DataFrame(
    {pat: getattr(talib, pat)(prices.Open, prices.High, prices.Low, prices.Close)
     for pat in cdl_patterns},
    index=prices.index
)

# Signal counts per pattern
summary = pd.DataFrame({
    'bull_sigs_talib': (pattern_df > 0).sum(),
    'bear_sigs_talib': (pattern_df < 0).sum(),
    'all_sigs_talib' : (pattern_df != 0).sum()
}).sort_values('all_sigs_talib', ascending=False)

* Counting the number of detected signals per pattern
* Plotting bar charts showing signal occurrences

In [None]:
# Prepare data for plotting from summary
plot_df = summary.reset_index()
plot_df = plot_df.rename(columns={'index': 'Pattern'})

# Create stacked bar chart
fig = go.Figure(
    data=[
        go.Bar(x=plot_df['Pattern'], y=plot_df['bull_sigs_talib'],
            name='Bullish Signals', marker_color='green'),
        go.Bar(x=plot_df['Pattern'], y=plot_df['bear_sigs_talib'],
            name='Bearish Signals', marker_color='red')
        ]
)
# fine-tune layout
fig.update_layout(
    barmode='relative',
    title_text=f'Number of Bullish/Bearish Candlestick Signals (ticker {ticker}, {start_date} to {end_date})',
    xaxis_title='Candlestick Pattern', yaxis_title='Number of Signals',
    title_font_size=16,  # font and margin settings below make sure that all candlestick patterns are displayed
    xaxis=dict(tickfont=dict(size=8),tickangle=-45, automargin=True),
    yaxis=dict(tickfont=dict(size=10)),
    legend=dict(font=dict(size=12)),
    width=1000, height=400,
)

fig.show()

Now we identify the most active patterns (measured by number of occurrences).

In [None]:
# Keep patterns with at least N total signals
N = 50

top_patterns_talib = summary[summary['all_sigs_talib'] >= N].sort_values('all_sigs_talib', ascending=False)
top_pattern_list   = top_patterns_talib.index.tolist()

print(f'Frequent patterns (>= {N} signals): {len(top_patterns_talib)}')
display(top_patterns_talib)

Frequent patterns (>= 50 signals): 27


Unnamed: 0,bull_sigs_talib,bear_sigs_talib,all_sigs_talib
CDLBELTHOLD,859,721,1580
CDLLONGLINE,907,608,1515
CDLCLOSINGMARUBOZU,811,526,1337
CDLSPINNINGTOP,598,625,1223
CDLDOJI,819,0,819
CDLLONGLEGGEDDOJI,819,0,819
CDLENGULFING,402,408,810
CDLHIKKAKE,354,385,739
CDLMARUBOZU,471,266,737
CDLHIGHWAVE,352,384,736


In [None]:
# list of top-active-patterns
top_patterns_talib.index

Index(['CDLBELTHOLD', 'CDLLONGLINE', 'CDLCLOSINGMARUBOZU', 'CDLSPINNINGTOP',
       'CDLDOJI', 'CDLLONGLEGGEDDOJI', 'CDLENGULFING', 'CDLHIKKAKE',
       'CDLMARUBOZU', 'CDLHIGHWAVE', 'CDLSHORTLINE', 'CDLHARAMI',
       'CDLRICKSHAWMAN', 'CDLHAMMER', 'CDL3OUTSIDE', 'CDLHARAMICROSS',
       'CDLHANGINGMAN', 'CDLDRAGONFLYDOJI', 'CDLTAKURI', 'CDLDOJISTAR',
       'CDLSHOOTINGSTAR', 'CDLADVANCEBLOCK', 'CDL3INSIDE', 'CDLGRAVESTONEDOJI',
       'CDLSEPARATINGLINES', 'CDLMATCHINGLOW', 'CDLXSIDEGAP3METHODS'],
      dtype='object')

## 3. Preparing Cumulative Return Data for Analysis and Random Entry Benchmark

We now compile a DataFrame `analytics` that stores the following information for each detected pattern (= event or signal):
* Date of occurrence
* Pattern name
* Mode (bullish, bearish)
* Market trend (uptrend, downtrend), defined as Close > EMA(M) or Close ≤ EMA(M)
* Cumulative returns for the 20 days following the signal
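
The forward-return lookup and compounding behind the last item can be sketched with plain NumPy. The returns, event positions, and the 3-day horizon below are invented for illustration; the notebook uses a 20-day horizon.

```python
import numpy as np

# Made-up daily returns at row positions 0..7
rets = np.array([0.01, -0.02, 0.03, 0.00, 0.01, -0.01, 0.02, 0.01])
n_forward = 3                      # hypothetical horizon (the notebook uses 20)
event_pos = np.array([0, 2, 4])    # row positions of three hypothetical events

# Broadcasting: (events, 1) + (days,) gives an (events, days) index matrix
offsets = np.arange(1, n_forward + 1)
forward = rets[event_pos[:, None] + offsets]

# Prepend a zero return for day 0, then compound along each row:
# row i is the growth of one unit invested at the close of event i
day0 = np.zeros((len(event_pos), 1))
cum = np.cumprod(1 + np.hstack([day0, forward]), axis=1)

print(cum.round(4))
```

Each event must sit at least `n_forward` rows before the end of the return series, otherwise the index matrix would run past the last row.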





In [None]:
# Event (= occurrence of a pattern = signal) identification
events = (
    pattern_df[top_pattern_list]                        # top pattern only
    .stack()                                            # rearranging to stack (row, column, value)
    .reset_index()                                      # resetting index
    .set_axis(['date', 'pattern', 'signal'], axis=1)    # renaming columns (unlike df.columns = ..., set_axis is chainable)
)

# Keep signals only, convert to 1 (bullish) or -1 (bearish)
events = events[events['signal'] != 0].copy()
events['mode']   = np.sign(events['signal']).astype(int)

# Add trend filter (bullish/bearish trend)
events['trend'] = trend.loc[events['date']].to_numpy()

# Map each date to its row index in rets, drop events too close to end of data
row_pos           = pd.Series(np.arange(len(rets)), index=rets.index)
events['row_pos'] = row_pos.loc[events['date']].to_numpy()
events            = events[events['row_pos'] <= len(rets) - N_FORWARD - 1]

# Collect forward returns for N_FORWARD days after each event
offsets         = np.arange(1, N_FORWARD + 1)
forward_returns = rets.values[events['row_pos'].to_numpy()[:, None] + offsets]

# Build analytics dataframe combining event info and forward returns
day_cols  = [f'Day_{i:02d}' for i in range(1, N_FORWARD + 1)]
analytics = pd.concat([
    events[['date', 'pattern', 'mode', 'trend']].reset_index(drop=True),
    pd.DataFrame(forward_returns, columns=day_cols)
], axis=1)

# Prepend Day_00 = 0.0 as anchor (cumret starts at 1), then compute cumulative returns
analytics.insert(analytics.columns.get_loc('trend') + 1, 'Day_00', 0.0)
day_cols            = [f'Day_{i:02d}' for i in range(N_FORWARD + 1)]
analytics[day_cols] = (1 + analytics[day_cols]).cumprod(axis=1)

Finally, we draw 500 random entry dates per condition (without replacement) and calculate the cumulative returns
for the 20 days following each entry. Random entries are drawn under three conditions:
* No trend filter — entries sampled freely across the full period
* Uptrend filter — entries sampled only when Close > EMA(M)
* Downtrend filter — entries sampled only when Close ≤ EMA(M)
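
A small sketch of the three sampling pools, assuming a made-up trend series of ten business days and a 2-day forward window. The names `pools`, `samples`, and `n_draw` are hypothetical and used only in this example.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)

dates = pd.date_range("2024-01-01", periods=10, freq="B")
trend = pd.Series([1, 1, -1, -1, 1, 1, 1, -1, 1, -1], index=dates)
n_forward = 2  # hypothetical horizon

# Exclude the last n_forward dates so that every entry has a full forward window
valid = dates[:-n_forward]
pools = {
    "no_filter": valid,
    "up":        valid[(trend[valid] == 1).to_numpy()],
    "down":      valid[(trend[valid] == -1).to_numpy()],
}

# Draw up to n_draw distinct dates from each pool
n_draw = 3
samples = {
    name: rng.choice(pool.to_numpy(), size=min(n_draw, len(pool)), replace=False)
    for name, pool in pools.items()
}

for name, drawn in samples.items():
    print(name, np.sort(drawn))
```

Sampling with `replace=False` guarantees that no entry date appears twice within one benchmark, and `min(n_draw, len(pool))` keeps the draw valid when a pool is smaller than the requested sample size.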

In [None]:
def compute_random_series(
    valid_dates  : np.ndarray,
    pattern_name : str,
    rng          : np.random.Generator,
    ) -> tuple[pd.DataFrame, pd.DataFrame]:

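    '''Randomly sample N entry dates and compute cumulative forward returns.
    Relies on the notebook-level names N, rets, row_pos, offsets and day_cols (Day_01 … Day_20), which must be set before the call.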
    Args:
        valid_dates:  Pool of eligible entry dates (pre-filtered for trend/lookahead).
        pattern_name: Label assigned to this random benchmark series.
        rng:          Random number generator (shared across calls to preserve state).
    Returns:
        analytics:   Per-event DataFrame with cumulative returns (Day_00 … Day_N).
        summary_row: Single-row DataFrame with mean cumulative return per day.
    '''
    # Random selection of N entry dates from a pre-filtered set of dates
    random_dates = rng.choice(valid_dates, size=min(N, len(valid_dates)), replace=False)

    # Forward returns matrix (events x days)
    rows = row_pos.loc[random_dates].to_numpy()
    forward_returns = rets.values[rows[:, None] + offsets]

    # Build per-event analytics with cumulative returns
    cum_cols  = ['Day_00'] + day_cols
    analytics = pd.concat([
        pd.DataFrame({'date': random_dates, 'pattern': pattern_name}),
        pd.DataFrame(forward_returns, columns=day_cols)
    ], axis=1)
    analytics.insert(analytics.columns.get_loc('pattern') + 1, 'Day_00', 0.0)
    analytics[cum_cols] = (1 + analytics[cum_cols]).cumprod(axis=1)

    # Summary: mean cumulative return per day
    summary_row = pd.DataFrame({
        'pattern'    : [pattern_name],
        'occurrences': [len(analytics)],
        'Day_00'     : [1.0],
        **analytics[day_cols].mean().to_dict()
    })

    return analytics, summary_row


In [None]:
# Config
SEED     = 42
N        = 500
rng      = np.random.default_rng(SEED)
row_pos  = pd.Series(np.arange(len(rets)), index=rets.index)
offsets  = np.arange(1, N_FORWARD + 1)
day_cols = [f'Day_{i:02d}' for i in range(1, N_FORWARD + 1)]

# Generate 3 random benchmarks (no trend filter / up-trend-filter / down trend-filter)
analytics_rnd_no_trend, summary_no_trend   = compute_random_series(dates_no_trend_filter,   'RANDOM_no_trend_filter',   rng)
analytics_rnd_up, summary_up   = compute_random_series(dates_up_trend_filter,   'RANDOM_up_trend_filter',   rng)
analytics_rnd_down, summary_down = compute_random_series(dates_down_trend_filter, 'RANDOM_down_trend_filter', rng)

# --- Combine details per-event in analytics df ---
analytics_rnd_all = (
    pd.concat([analytics_rnd_no_trend, analytics_rnd_up, analytics_rnd_down], ignore_index=True)
    .sort_values(['pattern', 'date'])
    .reset_index(drop=True)
)

# --- Combine summaries in summary df---
random_summary = (
    pd.concat([summary_no_trend, summary_up, summary_down], ignore_index=True)
    [['pattern', 'occurrences', 'Day_00'] + day_cols]
)

In [None]:
analytics_rnd_down.head(3)

Unnamed: 0,date,pattern,Day_00,Day_01,Day_02,Day_03,Day_04,Day_05,Day_06,Day_07,...,Day_11,Day_12,Day_13,Day_14,Day_15,Day_16,Day_17,Day_18,Day_19,Day_20
0,2015-07-10,RANDOM_down_trend_filter,1.0,1.011066,1.015568,1.014822,1.022956,1.024087,1.024877,1.020509,...,0.995676,1.008008,1.015386,1.015414,1.013108,1.010315,1.008042,1.011182,1.003342,1.000457
1,2002-12-20,RANDOM_down_trend_filter,1.0,1.001809,0.996327,0.99319,0.977271,0.981725,0.982205,1.014814,...,1.015819,1.035512,1.035512,1.034049,1.040078,1.025074,1.021032,1.006721,0.990913,0.980575
2,2007-11-08,RANDOM_down_trend_filter,1.0,0.985713,0.975867,1.004258,0.997159,0.983984,0.98913,0.97186,...,0.954196,0.968443,0.996101,0.996576,1.004319,0.998407,0.991877,1.006943,1.022085,1.020268


In [None]:
random_summary

Unnamed: 0,pattern,occurrences,Day_00,Day_01,Day_02,Day_03,Day_04,Day_05,Day_06,Day_07,...,Day_11,Day_12,Day_13,Day_14,Day_15,Day_16,Day_17,Day_18,Day_19,Day_20
0,RANDOM_no_trend_filter,500,1.0,1.000286,1.000767,1.000838,1.001016,1.001315,1.002689,1.002699,...,1.004332,1.004397,1.005393,1.005218,1.005938,1.005142,1.005497,1.005866,1.005565,1.006003
1,RANDOM_up_trend_filter,500,1.0,1.000794,1.000963,1.001091,1.001214,1.001663,1.00198,1.0024,...,1.003799,1.003786,1.003874,1.004436,1.004807,1.005356,1.005513,1.004967,1.004938,1.005246
2,RANDOM_down_trend_filter,500,1.0,1.00023,1.002261,1.001789,1.00309,1.003022,1.002789,1.003435,...,1.004588,1.005329,1.005456,1.005857,1.005502,1.005441,1.004851,1.005987,1.006328,1.00692


## 4. Cumulative Returns over Time after Pattern Detection

### 4.1. Helper Functions

In [None]:
def summary_cr(
        analytics   : pd.DataFrame,
        mode_value  : int,
        trend_value : int,
        day_cols    : List[str],
) -> pd.DataFrame:

    '''Compute mean cumulative returns and occurrence counts per pattern.
    Args:
        analytics:   Per-event DataFrame (output of main analytics cell).
        mode_value:  Signal direction: 1 (bullish) or -1 (bearish).
        trend_value: Trend filter: 1 (up-trend), -1 (down-trend), 0 (no filter).
        day_cols:    Cumulative return columns to aggregate (Day_00 … Day_N).
    Returns:
        DataFrame with [pattern, mode, trend, occurrences] + day_cols,
        sorted descending by the last day column.
    '''
    # Filter by mode, apply trend filter only if trend_value != 0
    df = analytics[analytics['mode'] == mode_value]
    if trend_value != 0:
        df = df[df['trend'] == trend_value]

    # Aggregate mean cumrets and occurrence counts per pattern
    summary = (
        df.groupby('pattern')[day_cols].mean()
        .merge(df.groupby('pattern').size().rename('occurrences'), left_index=True, right_index=True)
        .reset_index()
    )
    summary['mode']  = mode_value
    summary['trend'] = trend_value

    return (summary[['pattern', 'mode', 'trend', 'occurrences'] + day_cols]
            .sort_values(day_cols[-1], ascending=False)
            .reset_index(drop=True))


In [None]:
def plot_trajectories(
        traject          : pd.DataFrame,
        random_benchmark : pd.DataFrame,
        cum_cols         : List[str],
        title_suffix     : str  = "",
        style_map        : dict = None,
) -> go.Figure:

    ''' Plot averaged cumulative return trajectories with random benchmarks.
    Args:
        traject:          Output of summary_cr(), mean cumrets per pattern/day.
        random_benchmark: random_summary DataFrame (3 benchmark rows).
        cum_cols:         Day columns to plot (Day_00 … Day_20).
        title_suffix:     Appended to the plot title.
        style_map:        Maps raw benchmark pattern names to display label and line color.
                          Format: {"RANDOM_key": {"label": "...", "color": "..."}}.
    Returns:
        plotly go.Figure
    '''
    fig = go.Figure()

    # Signal trajectories
    for _, row in traject.iterrows():
        fig.add_trace(go.Scatter(
            x=cum_cols, y=row[cum_cols],
            mode="lines", name=row["pattern"], line=dict(width=1)
        ))

    # Random benchmark trajectories
    for _, row in random_benchmark.iterrows():
        raw_name = row["pattern"]
        style    = style_map.get(raw_name, {}) if style_map else {}
        fig.add_trace(go.Scatter(
            x=cum_cols, y=row[cum_cols],
            mode="lines",
            name=style.get("label", raw_name),
            line=dict(width=3, color=style.get("color", "black"), dash="dashdot")
        ))

    fig.update_layout(
        title=f"Cumulative Return Trajectories {title_suffix}",
        template=PLOT_TEMPLATE, height=PLOT_HEIGHT, width=PLOT_WIDTH,
    )
    fig.update_xaxes(title="Holding Period", tickangle=-90, showgrid=True)
    fig.update_yaxes(title="Cumulative Return", showgrid=True)

    return fig



### 4.2. Trajectories of Cumulative Returns
We plot the evolution of the average cumulative return over the 20 days following a pattern
signal, broken down by pattern, mode (bullish/bearish), and trend filter
(no filter, uptrend, downtrend).

In [None]:
# principal settings

RANDOM_BENCHMARK_STYLE = {
    "RANDOM_no_trend_filter"  : {"regime":  0, "label": "Random entries (no trend filter)", "color": "yellow"},
    "RANDOM_up_trend_filter"  : {"regime":  1, "label": "Random entries (in up-trends)",    "color": "red"   },
    "RANDOM_down_trend_filter": {"regime": -1, "label": "Random entries (in down-trends)",  "color": "green" },
}

# Derived lookup: regime integer -> pattern key
REGIME_TO_RANDOM = {v["regime"]: k for k, v in RANDOM_BENCHMARK_STYLE.items()}

combinations = [
    {'mode': 1, 'trend': 0, 'description': 'pure bullish signals, no filter'},
    {'mode':-1, 'trend': 0, 'description': 'pure bearish signals, no filter'},
    {'mode': 1, 'trend': 1, 'description': 'bullish signals in up-trends'},
    {'mode': 1, 'trend':-1, 'description': 'bullish signals in down-trends'},
    {'mode':-1, 'trend':-1, 'description': 'bearish signals in down-trends'},
    {'mode':-1, 'trend': 1, 'description': 'bearish signals in up-trends'},
]

PLOT_TEMPLATE = "plotly_dark"
PLOT_HEIGHT = 450
PLOT_WIDTH = 800

TOP_N    = 10
all_days = ['Day_00'] + [f'Day_{i:02d}' for i in range(1, N_FORWARD + 1)]


In [None]:
# plotting

for comb in combinations:
    # Use distinct names so the `trend` Series from section 1 is not overwritten
    mode_value, trend_value, desc = comb["mode"], comb["trend"], comb["description"]

    traject = summary_cr(analytics, mode_value=mode_value, trend_value=trend_value, day_cols=all_days)
    traject = traject.tail(TOP_N) if mode_value == -1 else traject.head(TOP_N)

    plot_trajectories(
        traject          = traject,
        random_benchmark = random_summary,
        cum_cols         = all_days,
        title_suffix     = f"— {desc}",
        style_map        = RANDOM_BENCHMARK_STYLE,
    ).show()


In [None]:
# selection of days for further analytics
selected_days = [1, 5, 10, 15, 20]
selected_day_cols = [f"Day_{i:02d}" for i in selected_days]

In [None]:
# just for demonstration: option to print results for selected days
mode_value  = 1     # bullish
trend_value = 0     # no trend filter (1 = up-trend, -1 = down-trend)
summary_selected = summary_cr(analytics, mode_value=mode_value, trend_value=trend_value, day_cols=selected_day_cols).head(TOP_N)  # use tail for bearish
summary_selected

Unnamed: 0,pattern,mode,trend,occurrences,Day_01,Day_05,Day_10,Day_15,Day_20
0,CDLGRAVESTONEDOJI,1,0,76,1.000131,1.00439,1.00247,1.003492,1.008929
1,CDLMATCHINGLOW,1,0,77,0.998359,1.000108,0.999688,1.00236,1.008445
2,CDL3INSIDE,1,0,41,1.001106,1.003834,1.003414,1.001618,1.007482
3,CDLSPINNINGTOP,1,0,595,0.999965,1.001485,1.003025,1.006388,1.007101
4,CDLHARAMICROSS,1,0,66,0.999334,1.000815,1.002334,1.001427,1.006935
5,CDLHIKKAKE,1,0,354,1.000515,1.001239,1.004226,1.006416,1.006282
6,CDLHIGHWAVE,1,0,350,1.000108,1.001487,1.002965,1.00525,1.005878
7,CDLHAMMER,1,0,194,1.000693,1.001931,1.002353,1.004818,1.005009
8,CDLDOJI,1,0,812,1.000411,1.00119,1.001328,1.003058,1.004716
9,CDLLONGLEGGEDDOJI,1,0,812,1.000411,1.00119,1.001328,1.003058,1.004716


## 5. Distribution of Cumulative Returns after Pattern Detection

### 5.1. Helper Functions

In [None]:
def get_cumrets_for_pattern(
        analytics    : pd.DataFrame,
        pattern_name : str,
        mode_value   : int,
        trend_value  : int,
        day_col      : str,
) -> pd.Series:

    '''Extract cumulative returns for a given pattern, mode and trend filter.
    Args:
        analytics:    Per-event DataFrame with pattern, mode, trend and day columns.
        pattern_name: Candlestick pattern name (e.g. 'CDLSPINNINGTOP').
        mode_value:   Signal direction: 1 (bullish) or -1 (bearish).
        trend_value:  Trend filter: 1 (up-trend), -1 (down-trend), 0 (no filter).
        day_col:      Cumulative return column to extract (e.g. 'Day_20').
    Returns:
        pd.Series of cumulative returns for the selected pattern/mode/trend.
    '''
    mask = (analytics['pattern'] == pattern_name) & (analytics['mode'] == mode_value)
    if trend_value != 0:
        mask &= (analytics['trend'] == trend_value)

    return analytics.loc[mask, day_col].dropna()


def get_random_cumrets(
        analytics_rnd_all : pd.DataFrame,
        trend_value       : int,
        day_col           : str,
) -> pd.Series:

    '''Extract cumulative returns from the random benchmark matching the trend regime.
    Args:
        analytics_rnd_all: Concatenated random analytics DataFrame.
        trend_value:       Regime: 0 (no filter), 1 (up-trend), -1 (down-trend).
        day_col:           Cumulative return column to extract (e.g. 'Day_20').
    Returns:
        pd.Series of cumulative returns for the matching random benchmark.
    '''
    if trend_value not in REGIME_TO_RANDOM:
        raise ValueError(f"trend_value must be 0, 1, or -1, got {trend_value}")

    return analytics_rnd_all.loc[
        analytics_rnd_all["pattern"] == REGIME_TO_RANDOM[trend_value], day_col
    ].dropna()

### 5.2. Visualisation of the Distribution

In [None]:
# --- Parameters ---
pattern_name  = 'CDLSPINNINGTOP'
mode_value    = 1
trend_value   = 1      # 1=up-trend, -1=down-trend, 0=no filter
selected_days = [5, 10, 15, 20]
day_cols_sel  = [f'Day_{d:02d}' for d in selected_days]

# --- Benchmark style ---
rnd_key   = REGIME_TO_RANDOM[trend_value]
rnd_label = RANDOM_BENCHMARK_STYLE[rnd_key]["label"]
rnd_color = RANDOM_BENCHMARK_STYLE[rnd_key]["color"]

# --- Build 2x2 subplots ---
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=[f"Day {d}" for d in selected_days],
    shared_xaxes=False, shared_yaxes=False,
    vertical_spacing=0.12, horizontal_spacing=0.08
)

positions = [(1,1), (1,2), (2,1), (2,2)]

for (row, col), (d, day_col) in zip(positions, zip(selected_days, day_cols_sel)):
    cumrets        = get_cumrets_for_pattern(analytics, pattern_name, mode_value, trend_value, day_col)
    cumrets_random = get_random_cumrets(analytics_rnd_all, trend_value, day_col)

    show_rnd_legend = (row == 1 and col == 1)   # random label only once

    fig.add_trace(go.Histogram(
        x=cumrets,
        name=f"{pattern_name} | Day {d}",          # unique label per subplot
        histnorm='probability density', opacity=0.8,
        showlegend=True                             # always show pattern label
    ), row=row, col=col)

    fig.add_trace(go.Histogram(
        x=cumrets_random,
        name=rnd_label,
        histnorm='probability density', opacity=0.5,
        marker_color=rnd_color,
        showlegend=show_rnd_legend                  # random label only once
    ), row=row, col=col)

fig.update_layout(
    barmode='overlay',
    title=f"Cumulative Return Distribution — {pattern_name} | Trend={trend_value}",
    template=PLOT_TEMPLATE,
    height=PLOT_HEIGHT * 1.5,
    width=PLOT_WIDTH  * 1.5,
)

# x-axis label only on bottom row (row 2)
for i in range(1, 5):
    r = 1 if i <= 2 else 2
    c = (i - 1) % 2 + 1
    if r == 2:
        fig.update_xaxes(title_text="Cumulative Return", row=r, col=c)
    fig.update_yaxes(title_text="Density", row=r, col=c)

fig.show()

## 6. Statistical Tests

### 6.1. Test for Normality of Cumulative Returns

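We test whether the cumulative returns at selected holding periods follow a normal distribution,
using the Shapiro-Wilk test for the pattern signals and the Jarque-Bera test for the random
benchmarks. The outcome guides the choice between parametric and non-parametric tests in the
subsequent significance testing.

**Result:** At α = 0.05, normality is rejected for most of the pattern/mode/trend combinations and for every random benchmark in the output below, so we proceed with non-parametric tests.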
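
As a hedged illustration of how the two SciPy normality tests are called, the sketch below runs them on a synthetic, strongly right-skewed sample. The exponential sample only stands in for non-normal data and is not derived from the notebook's returns.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)

# Strongly right-skewed synthetic sample, standing in for non-normal returns
skewed = rng.exponential(scale=1.0, size=500)

# Both tests share the null hypothesis "the sample is drawn from a normal distribution"
_, p_shapiro = stats.shapiro(skewed)
_, p_jb = stats.jarque_bera(skewed)

alpha = 0.05
verdicts = {
    "shapiro":     "Reject H0" if p_shapiro <= alpha else "Fail to reject H0",
    "jarque_bera": "Reject H0" if p_jb <= alpha else "Fail to reject H0",
}
print(verdicts)
```

A verdict of "Fail to reject H0" does not demonstrate normality; it only means the sample gives no sufficient evidence against it, which happens more easily for small samples.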

In [None]:
# hypothesis
H0 = "The cumulative returns at the selected day follow a normal distribution."
print(f"H0: {H0}\n")

# --- Parameters ---
selected_days = [5, 10, 15, 20]
day_cols_sel  = [f'Day_{d:02d}' for d in selected_days]
ALPHA         = 0.05
N_min         = 30 # minimum number of TA-Lib signals per pattern and mode required for testing

# --- Full universe screen across selected days ---
results = []

for day_col in day_cols_sel:
    for comb in combinations:
        mode_value  = comb["mode"]
        trend_value = comb["trend"]

        sig_col  = 'bull_sigs_talib' if mode_value == 1 else 'bear_sigs_talib'
        patterns = top_patterns_talib[top_patterns_talib[sig_col] > N_min].index

        for pattern_name in patterns:
            cumrets = get_cumrets_for_pattern(analytics, pattern_name, mode_value, trend_value, day_col)
            if len(cumrets) < 3:
                continue
            _, p_value = stats.shapiro(cumrets)
            results.append({
                'day'    : day_col,
                'pattern': pattern_name,
                'mode'   : mode_value,
                'trend'  : trend_value,
                'n'      : len(cumrets),
                'p_value': round(p_value, 4),
                'verdict': 'Fail to reject H0' if p_value > ALPHA else 'Reject H0',
            })

normality_results = (
    pd.DataFrame(results)
    .sort_values(['day', 'mode', 'trend', 'p_value'], ascending=[True, True, True, False])
    .reset_index(drop=True)
)

# --- Summary: rejection counts per day and combination ---
summary_normality = (
    normality_results
    .groupby(['day', 'mode', 'trend', 'verdict'])
    .size()
    .unstack(fill_value=0)
    .rename_axis(None, axis=1)
    .reset_index()
)
for col in ['Reject H0', 'Fail to reject H0']:
    if col not in summary_normality.columns:
        summary_normality[col] = 0

summary_normality['total']      = summary_normality['Reject H0'] + summary_normality['Fail to reject H0']
summary_normality['pct_reject'] = (summary_normality['Reject H0'] / summary_normality['total'] * 100).round(1)

display(summary_normality)

H0: The cumulative returns at the selected day follow a normal distribution.



Unnamed: 0,day,mode,trend,Fail to reject H0,Reject H0,total,pct_reject
0,Day_05,-1,-1,10,9,19,47.4
1,Day_05,-1,0,0,19,19,100.0
2,Day_05,-1,1,4,15,19,78.9
3,Day_05,1,-1,9,14,23,60.9
4,Day_05,1,0,1,22,23,95.7
5,Day_05,1,1,7,16,23,69.6
6,Day_10,-1,-1,8,11,19,57.9
7,Day_10,-1,0,3,16,19,84.2
8,Day_10,-1,1,4,15,19,78.9
9,Day_10,1,-1,6,17,23,73.9


In [None]:
# --- Patterns failing to reject H0 on ALL selected days ---
fail_to_reject = normality_results[normality_results['verdict'] == 'Fail to reject H0']

# Keep only patterns that fail to reject H0 on every selected day (in at least one mode/trend combination per day)
pattern_day_counts   = fail_to_reject.groupby('pattern')['day'].nunique()
patterns_all_days    = pattern_day_counts[pattern_day_counts == len(selected_days)].index

print(f"H0: {H0}\n")
print(f"Patterns failing to reject H0 on all {len(selected_days)} selected days: {len(patterns_all_days)}\n")

for pattern in patterns_all_days:
    subset = (
        fail_to_reject[fail_to_reject['pattern'] == pattern]
        [['day', 'mode', 'trend', 'n', 'p_value', 'verdict']]
        .sort_values(['day', 'mode', 'trend'])
        .reset_index(drop=True)
    )
    print(f"Pattern: {pattern}")
    # display(subset)
    #print()

H0: The cumulative returns at the selected day follow a normal distribution.

Patterns failing to reject H0 on all 4 selected days: 5

Pattern: CDL3INSIDE
Pattern: CDLDOJISTAR
Pattern: CDLHARAMICROSS
Pattern: CDLSEPARATINGLINES
Pattern: CDLXSIDEGAP3METHODS


In [None]:
# --- Normality test (Jarque-Bera) for random benchmark cumrets across selected days ---
results_random = []

for day_col in day_cols_sel:
    for trend_value, rnd_key in REGIME_TO_RANDOM.items():
        cumrets = get_random_cumrets(analytics_rnd_all, trend_value, day_col)
        if len(cumrets) < 3:
            continue
        stat, p_value = stats.jarque_bera(cumrets)
        results_random.append({
            'day'    : day_col,
            'pattern': rnd_key,
            'trend'  : trend_value,
            'n'      : len(cumrets),
            'stat'   : round(stat, 4),
            'p_value': p_value,
            'verdict': 'Fail to reject H0' if p_value > ALPHA else 'Reject H0',
        })

normality_results_random = (
    pd.DataFrame(results_random)
    .sort_values(['day', 'trend'])
    .reset_index(drop=True)
)

print(f"H0: {H0}\n")
display(normality_results_random.style.format({'p_value': '{:.2e}', 'stat': '{:.4f}'}))

H0: The cumulative returns at the selected day follow a normal distribution.



Unnamed: 0,day,pattern,trend,n,stat,p_value,verdict
0,Day_05,RANDOM_down_trend_filter,-1,500,169.814,1.33e-37,Reject H0
1,Day_05,RANDOM_no_trend_filter,0,500,1439.8372,0.0,Reject H0
2,Day_05,RANDOM_up_trend_filter,1,500,350.1561,9.22e-77,Reject H0
3,Day_10,RANDOM_down_trend_filter,-1,500,348.3113,2.32e-76,Reject H0
4,Day_10,RANDOM_no_trend_filter,0,500,184.6548,7.99e-41,Reject H0
5,Day_10,RANDOM_up_trend_filter,1,500,323.1122,6.87e-71,Reject H0
6,Day_15,RANDOM_down_trend_filter,-1,500,151.3353,1.37e-33,Reject H0
7,Day_15,RANDOM_no_trend_filter,0,500,305.1525,5.46e-67,Reject H0
8,Day_15,RANDOM_up_trend_filter,1,500,388.3412,4.71e-85,Reject H0
9,Day_20,RANDOM_down_trend_filter,-1,500,158.3943,4.03e-35,Reject H0


### 6.2. Tests for Equal Means and Equal Win Rates

#### 6.2.1. Helper Functions

The helper functions support the following tasks:
* Checking whether a sufficient number of occurrences exists for meaningful statistical testing
* Calculating the win rate of a pattern depending on signal mode (bullish signals require cumrets > 1, bearish signals require cumrets < 1)
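
The win-rate rule in the second item can be sketched on a handful of invented cumulative returns. The numbers are made up; only the comparison against 1 follows the definition above.

```python
import numpy as np

# Made-up cumulative returns at the end of the holding period
cumrets = np.array([1.02, 0.97, 1.00, 1.05, 0.99])

bull_wins = cumrets > 1   # a bullish signal wins if the price rose
bear_wins = cumrets < 1   # a bearish signal wins if the price fell

# The win rate is the mean of the boolean win indicators
bull_wr = bull_wins.mean()
bear_wr = bear_wins.mean()

print(f"bullish win rate: {bull_wr:.2f}, bearish win rate: {bear_wr:.2f}")
```

Because both comparisons are strict, an unchanged price (cumulative return of exactly 1) counts as a loss for bullish and bearish signals alike.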

In [None]:
def data_quality(n: int) -> str:
    if n > 100:   return 'good'
    elif n >= 30: return 'acceptable'
    else:         return 'indication only'

def win_condition(cumrets: np.ndarray, mode_value: int) -> np.ndarray:
    '''Define what counts as a win: cumrets > 1 for a bullish signal, cumrets < 1 for a bearish signal.
    Args:
        cumrets: cumulative returns
        mode_value: 1 (bullish) or -1 (bearish)
    Returns:
        np.ndarray of booleans
    '''
    return cumrets > 1 if mode_value == 1 else cumrets < 1

def wr_direction(wr_diff: float) -> str:
    '''Label a win rate difference: 'over' if wr_diff > 0, otherwise 'under'.'''
    return 'over' if wr_diff > 0 else 'under'

We test the following hypotheses:
* **H0_µ**: The mean cumulative return after a pattern signal equals the mean cumulative return after a random entry (µ_pattern = µ_random)
* **H0_wr**: The win rate after a pattern signal equals the win rate after a random entry (wr_pattern = wr_random)

For each pattern we compare its cumulative returns — filtered by mode (bullish/bearish) and
trend (no filter, uptrend, downtrend) — against the cumulative returns of a random entry
series filtered by the same trend condition.

The tested series have the following properties:
* Non-normally distributed
* Independent
* Unequal sample sizes

Under these conditions a permutation test is a robust and suitable choice, since it requires neither normality nor equal sample sizes.

**H0_µ** is tested using a **mean-centered permutation test**. Mean-centering imposes the
null hypothesis (µ_pattern = µ_random) before pooling, ensuring the permuted null distribution
correctly reflects "no difference in means".
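
In symbols, the mean-centered test described above can be written as follows. The notation is introduced here for illustration only: $x_1, \dots, x_n$ are the pattern returns, $y_1, \dots, y_m$ the random-entry returns, and $B$ the number of permutations (`n_perm` in the code).

$$
\tilde{x}_i = x_i - \bar{x}, \qquad \tilde{y}_j = y_j - \bar{y}, \qquad T_{\text{obs}} = \bar{x} - \bar{y}
$$

$$
T^{(b)} = \frac{1}{n}\sum_{k=1}^{n} z^{(b)}_k \;-\; \frac{1}{m}\sum_{k=n+1}^{n+m} z^{(b)}_k, \qquad
p = \frac{1}{B}\sum_{b=1}^{B} \mathbf{1}\left\{ \left|T^{(b)}\right| \ge \left|T_{\text{obs}}\right| \right\}
$$

Here $z^{(b)}$ denotes the $b$-th random permutation of the pooled centered values $(\tilde{x}_1, \dots, \tilde{x}_n, \tilde{y}_1, \dots, \tilde{y}_m)$. The p-value is two-sided: it is the share of permuted mean differences that are at least as large in absolute value as the observed one.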

**H0_wr** is tested using a **standard (non-centered) permutation test**. The win/loss
indicators are binary series (0/1); centering would destroy the binary structure and
invalidate the null distribution. Pooling the raw 0/1 values and reshuffling correctly
reflects "no difference in win rates".
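To make the effect of centering concrete, the following sketch uses small invented samples (`pattern_rets`, `random_rets`, and `wins` are hypothetical names and values). It shows that centering forces both group means to zero, which is what imposes H0_µ on the pool, and that the same operation turns a 0/1 series into values that are no longer binary.

```python
import numpy as np

# invented continuous samples with different means and sizes
pattern_rets = np.array([1.03, 0.98, 1.01, 1.04])
random_rets = np.array([1.00, 0.99, 1.02, 0.97, 1.01, 1.01])

# centering removes each group's own mean before pooling
centered_pattern = pattern_rets - pattern_rets.mean()
centered_random = random_rets - random_rets.mean()
pool = np.concatenate([centered_pattern, centered_random])

# both groups now have (numerically) zero mean, so the pool satisfies H0_µ
print(np.isclose(centered_pattern.mean(), 0.0), np.isclose(centered_random.mean(), 0.0))

# invented binary win/loss series
wins = np.array([1.0, 0.0, 1.0, 1.0])
centered_wins = wins - wins.mean()

# after centering the values are no longer 0 or 1
print(np.unique(wins), np.unique(centered_wins))
```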


In [None]:
def permutation_test_centered(
    sample: np.ndarray,
    reference: np.ndarray,
    n_perm: int = 10_000,
    rng: np.random.Generator = None,
    ) -> tuple[float, float]:

    ''' Mean-centered permutation test for continuous values (tests µ1 = µ2).
    Args:
        sample: sample 1
        reference: sample 2 (reference)
        n_perm: number of permutations
        rng: random number generator (a default generator is created if None)
    Returns:
        T_obs: observed difference of the means (sample - reference)
        p_value: two-sided p-value
    '''
    if rng is None:
        rng = np.random.default_rng()   # rng.permutation below would fail on None

    n = len(sample)
    obs = sample.mean() - reference.mean()   # observation (T_obs)

    # center both samples and form the urn (= pool)
    pool = np.concatenate([sample - sample.mean(), reference - reference.mean()])

    # Monte Carlo simulation: permute the pool n_perm times, split at n, take the difference of the means
    perms = np.array([
        (perm := rng.permutation(pool))[:n].mean() - perm[n:].mean()
        for _ in range(n_perm)
    ])

    # share of permutations at least as extreme as the observation (True = 1, False = 0)
    return obs, np.mean(np.abs(perms) >= np.abs(obs))

def permutation_test_standard(
    sample: np.ndarray,
    reference: np.ndarray,
    n_perm: int = 10_000,
    rng: np.random.Generator = None,
    ) -> tuple[float, float]:

    ''' Standard permutation test: pool and reshuffle without mean-centering (tests p1 = p2).
        Suitable for binary series (such as win indicators); preserves the binary 0/1 structure.
    Args:
        sample: sample 1
        reference: sample 2 (reference)
        n_perm: number of permutations
        rng: random number generator (a default generator is created if None)
    Returns:
        T_obs: observed win rate difference (sample - reference)
        p_value: two-sided p-value
    '''
    if rng is None:
        rng = np.random.default_rng()   # rng.permutation below would fail on None

    n   = len(sample)
    obs = sample.mean() - reference.mean()

    # pool raw 0/1 values (no centering)
    pool = np.concatenate([sample, reference])

    # Monte Carlo simulation
    perms = np.array([
        (perm := rng.permutation(pool))[:n].mean() - perm[n:].mean()
        for _ in range(n_perm)
    ])

    return obs, np.mean(np.abs(perms) >= np.abs(obs))


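To speed up execution, both tests are also implemented with explicit loops and compiled
with Numba's JIT. Each permutation is an in-place Fisher-Yates shuffle whose random indices
come from a simple 64-bit linear congruential generator (LCG) instead of NumPy's generator.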

In [None]:
# permutations using numba functionality for acceleration
@njit
def _permutation_test_centered_numba(a, b, n_perm, seed):
    """Mean-centered permutation test for continuous data (tests µ1 = µ2)."""
    na = len(a)
    nb = len(b)
    n  = na + nb

    # center both samples (impose H0: µ1 = µ2)
    mean_a = a.mean()
    mean_b = b.mean()

    combined = np.empty(n)
    for i in range(na):
        combined[i] = a[i] - mean_a
    for i in range(nb):
        combined[na + i] = b[i] - mean_b

    T_obs     = mean_a - mean_b
    abs_T_obs = T_obs if T_obs >= 0.0 else -T_obs  # abs(T_obs)

    rng_state = seed
    count = 0

    for _ in range(n_perm):
        for i in range(n - 1, 0, -1):
            rng_state = (rng_state * 6364136223846793005 + 1442695040888963407) & 0xFFFFFFFFFFFFFFFF
            j = rng_state % (i + 1)
            combined[i], combined[j] = combined[j], combined[i]

        s = 0.0
        for k in range(na):
            s += combined[k]
        diff     = s / na - (combined[na:].sum()) / nb
        abs_diff = diff if diff >= 0.0 else -diff

        if abs_diff >= abs_T_obs:
            count += 1

    return T_obs, count / n_perm


def permutation_test_centered_fast(a, b, n_perm=10_000, rng=None):
    """Drop-in replacement for permutation_test_centered() (continuous cumrets)."""
    seed = int(rng.integers(0, 2**31)) if rng is not None else 42
    return _permutation_test_centered_numba(
        a.astype(np.float64),
        b.astype(np.float64),
        n_perm,
        seed
    )

@njit
def _permutation_test_standard_numba(a, b, n_perm, seed):
    """Standard (non-centered) permutation test for binary win/loss data (tests p1 = p2)."""
    na = len(a)
    nb = len(b)
    n  = na + nb

    # no centering — pool raw 0/1 values
    combined = np.empty(n)
    for i in range(na):
        combined[i] = a[i]
    for i in range(nb):
        combined[na + i] = b[i]

    T_obs     = a.mean() - b.mean()
    abs_T_obs = T_obs if T_obs >= 0.0 else -T_obs  # abs(T_obs)

    rng_state = seed
    count = 0

    for _ in range(n_perm):
        for i in range(n - 1, 0, -1):
            rng_state = (rng_state * 6364136223846793005 + 1442695040888963407) & 0xFFFFFFFFFFFFFFFF
            j = rng_state % (i + 1)
            combined[i], combined[j] = combined[j], combined[i]

        s = 0.0
        for k in range(na):
            s += combined[k]
        diff     = s / na - (combined[na:].sum()) / nb
        abs_diff = diff if diff >= 0.0 else -diff

        if abs_diff >= abs_T_obs:
            count += 1

    return T_obs, count / n_perm

def permutation_test_standard_fast(a, b, n_perm=10_000, rng=None):
    """Permutation test for binary win-rate comparison."""
    seed = int(rng.integers(0, 2**31)) if rng is not None else 42
    return _permutation_test_standard_numba(
        a.astype(np.float64),
        b.astype(np.float64),
        n_perm,
        seed
    )
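The inner loop shared by both Numba kernels is a Fisher-Yates shuffle driven by a 64-bit LCG. The plain-Python sketch below isolates that step so it can be inspected without compilation. `lcg_next` and `lcg_shuffle` are illustrative names that do not appear in the notebook; the multiplier, increment, and mask are copied from the kernels above.

```python
MULT = 6364136223846793005
INC = 1442695040888963407
MASK = 0xFFFFFFFFFFFFFFFF   # keep the state within 64 bits

def lcg_next(state: int) -> int:
    """Advance the linear congruential generator by one step."""
    return (state * MULT + INC) & MASK

def lcg_shuffle(values: list, seed: int) -> tuple[list, int]:
    """Return a shuffled copy of values and the final generator state."""
    out = list(values)
    state = seed
    # Fisher-Yates: walk from the last index down, swapping with a random earlier index
    for i in range(len(out) - 1, 0, -1):
        state = lcg_next(state)
        j = state % (i + 1)          # index in [0, i]
        out[i], out[j] = out[j], out[i]
    return out, state

shuffled, state = lcg_shuffle(list(range(10)), seed=42)
print(shuffled)
```

Whenever `i + 1` is a power of two, `state % (i + 1)` depends only on the low-order bits of the state, which are the weakest bits of a power-of-two-modulus LCG. Comparing the fast results against the NumPy-based versions is a simple sanity check.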

For all combinations of patterns, holding periods, mode (bullish/bearish), and trend filter
(no filter, uptrend, downtrend) we perform the following steps:
* Extracting the relevant cumulative returns for the pattern and the matching random series (same trend condition)
* Testing H0_µ with two separate tests: a mean-centered permutation test and a Welch t-test
* Testing H0_wr with a standard (non-centered) permutation test
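For reference, the Welch statistic used alongside the permutation test can be written out directly. The sketch below computes the t statistic and the Welch-Satterthwaite degrees of freedom with NumPy only. `welch_t` is a hypothetical helper and both samples are synthetic; in the notebook itself the p-value comes from `stats.ttest_ind(..., equal_var=False)`.

```python
import numpy as np

def welch_t(x: np.ndarray, y: np.ndarray) -> tuple[float, float]:
    """Welch t statistic and Welch-Satterthwaite degrees of freedom."""
    nx, ny = len(x), len(y)
    # squared standard errors of the two means (unbiased variances)
    vx = x.var(ddof=1) / nx
    vy = y.var(ddof=1) / ny
    t = (x.mean() - y.mean()) / np.sqrt(vx + vy)
    df = (vx + vy) ** 2 / (vx ** 2 / (nx - 1) + vy ** 2 / (ny - 1))
    return float(t), float(df)

# synthetic stand-ins for a small pattern sample and a large random benchmark
rng_demo = np.random.default_rng(0)
pattern_demo = rng_demo.normal(1.002, 0.02, size=90)
random_demo = rng_demo.normal(1.000, 0.03, size=5000)

t_stat, dof = welch_t(pattern_demo, random_demo)
print(t_stat, dof)
```

Because the variances are not pooled, the degrees of freedom lie between the smaller sample's `n - 1` and `nx + ny - 2`, which reflects how little the large random benchmark can compensate for a small pattern sample.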

In [None]:
# H0 definitions
H0_perm    = "The mean cumulative return after a pattern signal equals the mean cumulative return after a random entry (µ_pattern = µ_random)."
H0_winrate = "The win rate of pattern signals equals the win rate of random entries (p_pattern = p_random)."
print(f"H0 mean:     {H0_perm}")
print(f"H0 win rate: {H0_winrate}\n")

# parameters
ALPHA = 0.05
N_PERM        = 10_000
selected_days = [5, 10, 15, 20]
day_cols_sel  = [f'Day_{d:02d}' for d in selected_days]

# Analysing all combinations in all selected days
results_all = []

for day_col, d in zip(day_cols_sel, selected_days):
    for comb in combinations:
        mode_value  = comb["mode"]
        trend_value = comb["trend"]
        desc        = comb["description"]

        # for current day select the relevant random series and get cumrets and winrate
        cumrets_random = get_random_cumrets(analytics_rnd_all, trend_value, day_col).to_numpy()
        wins_random    = win_condition(cumrets_random, mode_value).astype(float)
        wr_random      = wins_random.mean()

        # statistical tests
        for pattern_name in tqdm(top_patterns_talib.index, desc=f"Day {d} | {desc}", unit="pattern"):
            # for current day select the relevant cumrets of patterns (by mode and trend) from top-patterns
            cumrets_pattern = get_cumrets_for_pattern(
                analytics, pattern_name, mode_value, trend_value, day_col
            ).to_numpy()
            n_pat = len(cumrets_pattern)
            if n_pat < 10:
                continue

            # Mean permutation test
            T_obs, p_perm = permutation_test_centered_fast(cumrets_pattern, cumrets_random, n_perm=N_PERM, rng=rng) # numba compiled version
            # T_obs, p_perm = permutation_test_centered(cumrets_pattern, cumrets_random, n_perm=N_PERM, rng=rng)        # vectorized version

            # Welch t-test
            _, p_welch = stats.ttest_ind(cumrets_pattern, cumrets_random, equal_var=False)

            # Win-rate permutation test
            wins_pattern = win_condition(cumrets_pattern, mode_value).astype(float)
            wr_pattern   = wins_pattern.mean()
            wr_obs, p_winrate = permutation_test_standard_fast(wins_pattern, wins_random, n_perm=N_PERM, rng=rng)  # numba-compiled version
            # wr_obs, p_winrate = permutation_test_standard(wins_pattern, wins_random, n_perm=N_PERM, rng=rng)         # vectorized version
            results_all.append({
                'day'            : d,
                'pattern'        : pattern_name,
                'mode'           : mode_value,
                'trend'          : trend_value,
                'n_pattern'      : n_pat,
                'n_random'       : len(cumrets_random),
                'data_quality'   : data_quality(n_pat),
                'T_obs'          : round(T_obs,      6),
                'p_perm'         : round(p_perm,     4),
                'verdict_perm'   : 'Reject H0' if p_perm    < ALPHA else 'Fail to reject H0',
                'p_welch'        : round(p_welch,    4),
                'verdict_welch'  : 'Reject H0' if p_welch   < ALPHA else 'Fail to reject H0',
                'wr_pattern'     : round(wr_pattern, 4),
                'wr_random'      : round(wr_random,  4),
                'wr_diff'        : round(wr_obs,     4),
                'wr_direction'   : wr_direction(wr_obs),
                'p_winrate'      : round(p_winrate,  4),
                'verdict_winrate': 'Reject H0' if p_winrate < ALPHA else 'Fail to reject H0',
            })

# Build results DataFrame
perm_results = (
    pd.DataFrame(results_all)
    .sort_values(['day', 'mode', 'trend', 'p_perm'])
    .reset_index(drop=True)
)

H0 mean:     The mean cumulative return after a pattern signal equals the mean cumulative return after a random entry (µ_pattern = µ_random).
H0 win rate: The win rate of pattern signals equals the win rate of random entries (p_pattern = p_random).



From the results we extract, for each holding period, all patterns where at least one test
rejects H0_µ — i.e. the mean cumulative return after the pattern signal is statistically
significantly different from the random benchmark.

However, rejection of H0_µ alone does not indicate whether the deviation is favorable or
unfavorable. A significant difference can be positive or negative. We therefore also examine
T_obs (the observed mean difference):

* **Rejection of H0_µ AND T_obs > 0** → the pattern signal produces statistically significantly
higher cumulative returns than random entries — a genuine edge
* **Rejection of H0_µ AND T_obs < 0** → the pattern signal produces statistically significantly
lower cumulative returns than random entries — a contrarian signal
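The two-step reading (first significance, then the sign of T_obs) can be sketched as a small helper for a single test. `classify_mean_result` is a hypothetical name; the example inputs are p_perm and T_obs values copied from the result tables in this section. The labels are deliberately neutral, because whether a higher return is favorable also depends on the signal mode.

```python
def classify_mean_result(p_value: float, t_obs: float, alpha: float = 0.05) -> str:
    """Combine the test verdict with the sign of the observed mean difference."""
    if p_value >= alpha:
        return 'no significant difference'
    return 'higher than random' if t_obs > 0 else 'lower than random'

# (p_perm, T_obs) pairs taken from the Day 5 and Day 10 tables
examples = {
    'CDLCLOSINGMARUBOZU, day 5': (0.005, 0.004709),
    'CDL3OUTSIDE, day 10': (0.0038, -0.009595),
    'CDL3OUTSIDE, day 5': (0.0545, -0.004742),
}

for label, (p_value, t_obs) in examples.items():
    print(label, '->', classify_mean_result(p_value, t_obs))
```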




In [None]:
# check for patterns where equal means hypothesis is rejected
cols = [
    'day', 'pattern', 'mode', 'trend', 'n_pattern', 'data_quality',
    'T_obs', 'p_perm', 'verdict_perm', 'p_welch', 'verdict_welch']

mask_quality = perm_results['data_quality'] != 'indication only'
mask_mean    = (
    (perm_results['verdict_perm']  == 'Reject H0') |
    (perm_results['verdict_welch'] == 'Reject H0')
)
perm_rejected = (
    perm_results[mask_quality & mask_mean]
    .sort_values(['day', 'mode', 'trend', 'p_perm'])
    .reset_index(drop=True)
)

# Output section
print(f"H0 mean: {H0_perm}\n")
print(f"Patterns where at least one test rejects H0 (excl. indication only): "
      f"{len(perm_rejected)} / {len(perm_results)} tested\n")

for d in selected_days:
    subset = perm_rejected[perm_rejected['day'] == d].reset_index(drop=True)
    print(f"\n── Day {d} ── {len(subset)} rejection(s)")
    display(subset[cols])

H0 mean: The mean cumulative return after a pattern signal equals the mean cumulative return after a random entry (µ_pattern = µ_random).

Patterns where at least one test rejects H0 (excl. indication only): 20 / 504 tested


── Day 5 ── 5 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,p_perm,verdict_perm,p_welch,verdict_welch
0,5,CDLCLOSINGMARUBOZU,-1,0,525,good,0.004709,0.005,Reject H0,0.006,Reject H0
1,5,CDLMARUBOZU,-1,1,90,acceptable,0.005136,0.0213,Reject H0,0.0287,Reject H0
2,5,CDLCLOSINGMARUBOZU,1,-1,212,good,-0.005982,0.0464,Reject H0,0.0629,Fail to reject H0
3,5,CDL3OUTSIDE,1,0,101,good,-0.004742,0.0545,Fail to reject H0,0.0344,Reject H0
4,5,CDLENGULFING,1,1,278,good,-0.002666,0.0467,Reject H0,0.0376,Reject H0



── Day 10 ── 8 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,p_perm,verdict_perm,p_welch,verdict_welch
0,10,CDLMARUBOZU,-1,1,90,acceptable,0.006422,0.0362,Reject H0,0.0346,Reject H0
1,10,CDL3OUTSIDE,1,0,101,good,-0.009595,0.0038,Reject H0,0.0047,Reject H0
2,10,CDLENGULFING,1,0,400,good,-0.005488,0.0068,Reject H0,0.0071,Reject H0
3,10,CDLMARUBOZU,1,0,469,good,-0.005374,0.009,Reject H0,0.0085,Reject H0
4,10,CDLLONGLINE,1,0,904,good,-0.003813,0.0252,Reject H0,0.0238,Reject H0
5,10,CDLENGULFING,1,1,278,good,-0.005306,0.0057,Reject H0,0.0049,Reject H0
6,10,CDL3OUTSIDE,1,1,82,acceptable,-0.008248,0.0091,Reject H0,0.0102,Reject H0
7,10,CDLMARUBOZU,1,1,343,good,-0.003946,0.025,Reject H0,0.0284,Reject H0



── Day 15 ── 5 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,p_perm,verdict_perm,p_welch,verdict_welch
0,15,CDLMARUBOZU,-1,1,90,acceptable,0.007772,0.0272,Reject H0,0.0468,Reject H0
1,15,CDLENGULFING,1,0,400,good,-0.005915,0.0179,Reject H0,0.0185,Reject H0
2,15,CDLLONGLINE,1,0,904,good,-0.004282,0.0383,Reject H0,0.0401,Reject H0
3,15,CDLSHORTLINE,1,0,473,good,-0.004723,0.0425,Reject H0,0.0441,Reject H0
4,15,CDLENGULFING,1,1,278,good,-0.004468,0.0401,Reject H0,0.0428,Reject H0



── Day 20 ── 2 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,p_perm,verdict_perm,p_welch,verdict_welch
0,20,CDLCLOSINGMARUBOZU,-1,0,525,good,0.006588,0.037,Reject H0,0.0362,Reject H0
1,20,CDLCLOSINGMARUBOZU,-1,1,194,good,0.006625,0.0414,Reject H0,0.0425,Reject H0


The following compact view lists all combinations of holding period, pattern, mode, and trend
filter where H0_µ is rejected and the mean cumulative return after a pattern signal is higher
than after random entries (T_obs > 0).

While some patterns do show a positive effect, the magnitude is modest. All six rows are
bearish signals (mode = -1), so the higher return runs against the bearish reading of these
patterns. Putting the numbers in context of occurrence counts: the strongest observed
outperformance (0.78%) belongs to CDLMARUBOZU, but with only 90 occurrences over 25 years this
is a weak statistical basis. The remaining rows show differences of roughly 0.5% to 0.7%,
small effects that warrant cautious interpretation.
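The arithmetic behind these magnitudes can be reproduced in a few lines. The sketch below converts T_obs to percent and relates the occurrence counts to the study period; the tuples are copied from the compact results table in this section, and `YEARS = 25` is taken from the "25 years" stated in the text, which is an approximation of the exact sample length.

```python
# (day, pattern, n_pattern, T_obs) for the rows with rejected H0_µ and T_obs > 0
rows = [
    (5, 'CDLCLOSINGMARUBOZU', 525, 0.004709),
    (5, 'CDLMARUBOZU', 90, 0.005136),
    (10, 'CDLMARUBOZU', 90, 0.006422),
    (15, 'CDLMARUBOZU', 90, 0.007772),
    (20, 'CDLCLOSINGMARUBOZU', 525, 0.006588),
    (20, 'CDLCLOSINGMARUBOZU', 194, 0.006625),
]
YEARS = 25  # assumption: approximate length of the study period

# outperformance in percent and average number of signals per year
summary = [
    (day, pattern, round(100 * t_obs, 2), round(n / YEARS, 1))
    for day, pattern, n, t_obs in rows
]

for day, pattern, pct, per_year in summary:
    print(f"Day {day:2d} {pattern:20s} {pct:5.2f}%  {per_year:5.1f} signals/year")
```

At 90 occurrences, the strongest row corresponds to fewer than four signals per year on average.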

In [None]:
# compact view of rows of interest (H0_µ rejected and T_obs > 0)
mask_t_obs = perm_results['T_obs'] > 0
mask_mean_reject = (perm_results['verdict_perm'] == 'Reject H0') | (perm_results['verdict_welch'] == 'Reject H0')
mask_data_quality = (perm_results['data_quality'] == 'good') | (perm_results['data_quality'] == 'acceptable')

final_mask = mask_t_obs & mask_mean_reject & mask_data_quality
cols_to_display = ['day', 'pattern', 'mode', 'trend', 'n_pattern', 'data_quality', 'T_obs', 'p_perm', 'verdict_perm', 'p_welch', 'verdict_welch']
perm_results[final_mask][cols_to_display]

Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,p_perm,verdict_perm,p_welch,verdict_welch
18,5,CDLCLOSINGMARUBOZU,-1,0,525,good,0.004709,0.005,Reject H0,0.006,Reject H0
37,5,CDLMARUBOZU,-1,1,90,acceptable,0.005136,0.0213,Reject H0,0.0287,Reject H0
163,10,CDLMARUBOZU,-1,1,90,acceptable,0.006422,0.0362,Reject H0,0.0346,Reject H0
289,15,CDLMARUBOZU,-1,1,90,acceptable,0.007772,0.0272,Reject H0,0.0468,Reject H0
396,20,CDLCLOSINGMARUBOZU,-1,0,525,good,0.006588,0.037,Reject H0,0.0362,Reject H0
415,20,CDLCLOSINGMARUBOZU,-1,1,194,good,0.006625,0.0414,Reject H0,0.0425,Reject H0


In [None]:
# check for patterns where win-rate is significantly different from win-rate of random entries
cols = [
    'day', 'pattern', 'mode', 'trend', 'n_pattern', 'data_quality','T_obs',
    'wr_pattern', 'wr_random', 'wr_diff', 'wr_direction', 'verdict_winrate',
]
mask_quality = perm_results['data_quality'] != 'indication only'
mask_winrate = perm_results['verdict_winrate'] == 'Reject H0'

perm_rejected_wr = (
    perm_results[mask_quality & mask_winrate]
    .sort_values(['day', 'mode', 'trend', 'p_winrate'])
    .reset_index(drop=True)
)

print(f"H0 win rate: {H0_winrate}\n")
print(f"Patterns where test rejects H0 (excl. indication only): "
      f"{len(perm_rejected_wr)} / {len(perm_results)} tested\n")

for d in selected_days:
    subset = perm_rejected_wr[perm_rejected_wr['day'] == d].reset_index(drop=True)
    print(f"\n── Day {d} ── {len(subset)} rejection(s)")
    display(subset[cols])

H0 win rate: The win rate of pattern signals equals the win rate of random entries (p_pattern = p_random).

Patterns where test rejects H0 (excl. indication only): 4 / 504 tested


── Day 5 ── 2 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,wr_pattern,wr_random,wr_diff,wr_direction,verdict_winrate
0,5,CDLENGULFING,1,0,400,good,-0.001532,0.485,0.566,-0.081,under,Reject H0
1,5,CDLENGULFING,1,1,278,good,-0.002666,0.482,0.576,-0.094,under,Reject H0



── Day 10 ── 0 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,wr_pattern,wr_random,wr_diff,wr_direction,verdict_winrate



── Day 15 ── 0 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,wr_pattern,wr_random,wr_diff,wr_direction,verdict_winrate



── Day 20 ── 2 rejection(s)


Unnamed: 0,day,pattern,mode,trend,n_pattern,data_quality,T_obs,wr_pattern,wr_random,wr_diff,wr_direction,verdict_winrate
0,20,CDLSHOOTINGSTAR,-1,0,98,acceptable,0.009268,0.2041,0.384,-0.1799,under,Reject H0
1,20,CDLSHOOTINGSTAR,-1,1,81,acceptable,0.007483,0.2222,0.356,-0.1338,under,Reject H0


We now filter for a compact overview of all combinations of holding period, pattern, mode,
and trend filter where the win rate after a pattern signal is statistically significantly
different from the win rate of the corresponding random series. Unlike the per-day tables
above, this view does not exclude rows with data quality 'indication only', so it lists six
rows rather than four.

A clear pattern emerges: in every case where H0_wr is rejected, the win rate of the pattern
signal is **lower** than the win rate of the random benchmark, consistent with the contrarian
behavior already observed in the mean cumulative return analysis.

In [None]:
perm_results[perm_results.verdict_winrate == 'Reject H0'][['day','pattern', 'mode', 'trend','wr_pattern', 'wr_random', 'wr_diff', 'wr_direction', 'p_winrate', 'verdict_winrate']]

Unnamed: 0,day,pattern,mode,trend,wr_pattern,wr_random,wr_diff,wr_direction,p_winrate,verdict_winrate
6,5,CDLSHOOTINGSTAR,-1,-1,0.1765,0.438,-0.2615,under,0.0427,Reject H0
84,5,CDLENGULFING,1,0,0.485,0.566,-0.081,under,0.0171,Reject H0
102,5,CDLENGULFING,1,1,0.482,0.576,-0.094,under,0.0136,Reject H0
379,20,CDLSHOOTINGSTAR,-1,-1,0.1176,0.402,-0.2844,under,0.021,Reject H0
397,20,CDLSHOOTINGSTAR,-1,0,0.2041,0.384,-0.1799,under,0.0007,Reject H0
417,20,CDLSHOOTINGSTAR,-1,1,0.2222,0.356,-0.1338,under,0.0209,Reject H0
