# Signal Detectors — December 2025

Testing `signalflow-ta` signal detectors on real Binance 1m OHLCV data (December 2025).  
Analysis metrics via `signalflow.analytic.signals`.

In [None]:
from datetime import datetime
from pathlib import Path

import asyncio
import polars as pl
import pandas as pd

from signalflow.data.raw_store import DuckDbRawStore
from signalflow.data.source import BinanceSpotLoader
from signalflow.core import RawData, RawDataView, Signals
from signalflow.analytic.signals import (
    SignalProfileMetric,
    SignalDistributionMetric,
    SignalPairPrice,
)
from signalflow.ta.signals import (
    BollingerBandDetector1,
    RsiAnomalyDetector1,
    AdxRegimeDetector1,
    StochasticDetector1,
    MfiDetector1,
    DivergenceDetector1,
    DivergenceDetector2,
    AroonCrossDetector1,
    CciAnomalyDetector1,
)

## 1. Download Data from Binance

Download 1m spot OHLCV candles from Binance REST API into a local DuckDB store.  
`BinanceSpotLoader.download()` handles pagination, rate limits, gap detection, and deduplication automatically.

In [None]:
# Local DuckDB file used as the candle store by this cell and the load cell.
DB_PATH = Path("market.duckdb")
# Spot pairs to download and analyze.
PAIRS = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "BNBUSDT", "XRPUSDT"]
# Analysis window: later cells keep only signals with DEC_START <= timestamp <= DEC_END.
# NOTE(review): despite the DEC_ prefix, this window currently spans
# 2025-11-01 .. 2026-02-12 and is identical to the download range below,
# so it leaves no warmup or look-ahead margin — confirm this is intended.
DEC_START = datetime(2025, 11, 1)
DEC_END = datetime(2026, 2, 12)

# Download/load range: 2025-11-01 .. 2026-02-12.
# Presumably meant to be the analysis window plus a warmup/look-ahead buffer;
# at the moment both ranges coincide (see note above).
START = datetime(2025, 11, 1)
END = datetime(2026, 2, 12)

# --- Download from Binance into DuckDB ---
store = DuckDbRawStore(db_path=DB_PATH, data_type="spot", timeframe="1m")
loader = BinanceSpotLoader(store=store, timeframe="1m")

# Top-level await: valid in a notebook/IPython cell, not in a plain Python script.
await loader.download(pairs=PAIRS, start=START, end=END, fill_gaps=True)
# Note: loader.download() closes the store connection internally

print(f"Download complete. Store: {DB_PATH}")

## 2. Load Data

In [None]:
# --- Load 1m data from store ---
# A fresh store handle is opened here; per the note in the download cell,
# loader.download() closes the handle it was given.
store = DuckDbRawStore(db_path=DB_PATH, data_type="spot", timeframe="1m")
try:
    raw_data = store.to_raw_data(pairs=PAIRS, start=START, end=END)
finally:
    # Always release the store, even when the read raises, so the
    # connection is not left open for the rest of the session.
    store.close()

# View wrapper handed to the detectors; `spot` is the raw 1m frame itself.
view = RawDataView(raw=raw_data)
spot = raw_data["spot"]

print(f"1m bars: {spot.height:,} ({spot.height // len(PAIRS):,} per pair)")
print(f"Range : {spot['timestamp'].min()} -> {spot['timestamp'].max()}")

## 3. Configure Detectors

Parameters are scaled for 1m data: periods are ×60 relative to their 1h equivalents, and thresholds are raised to filter noise.

In [None]:
# --- Detector registry ---
# Maps a display name to a configured detector instance. Uncomment an entry to
# include it in the run; the commented-out configurations are kept as presets.
detectors = {
    # "BollingerBand": BollingerBandDetector1(period=1200, std_dev=2.5, direction="both"),
    # "RSI Anomaly": RsiAnomalyDetector1(
    #     rsi_period=840, zscore_window=6000, threshold=2.5, direction="both"
    # ),
    # "ADX Regime": AdxRegimeDetector1(
    #     adx_period=840, adx_threshold=30, direction="both"
    # ),
    # "Stochastic": StochasticDetector1(
    #     stoch_period=840, stoch_smooth_k=180, stoch_smooth_d=180, direction="both"
    # ),
    # "MFI": MfiDetector1(mfi_period=840, direction="both"),
    "Divergence": DivergenceDetector2(
        rsi_period=14,
        lookback=50,
        offset=60,
        direction="both",
    ),
    # "Aroon Cross": AroonCrossDetector1(period=1500, direction="both"),
    # "CCI Anomaly": CciAnomalyDetector1(
    #     cci_period=1200, zscore_window=6000, threshold=2.0, direction="both"
    # ),
}

# Iterating a dict yields its keys, i.e. the detector display names.
detector_names = ", ".join(detectors)
print(f"Configured {len(detectors)} detectors: {detector_names}")

## 4. Run Detectors

In [None]:
def deduplicate_series(df: pl.DataFrame) -> pl.DataFrame:
    """Collapse runs of identical consecutive signal types within each pair.

    Rows are ordered by pair and timestamp; a row is kept only when it is the
    first row of its pair or its ``signal_type`` differs from the preceding
    row of the same pair.

    Args:
        df: Frame with ``pair``, ``timestamp`` and ``signal_type`` columns.

    Returns:
        The sorted frame reduced to the first row of every run.
    """
    current = pl.col("signal_type")
    previous = pl.col("_prev_type")

    # Sort first so shift(1) inside each pair refers to the previous row in time.
    ordered = df.sort("pair", "timestamp")
    tagged = ordered.with_columns(current.shift(1).over("pair").alias("_prev_type"))

    # A null predecessor marks the first row of a pair.
    starts_new_run = previous.is_null() | (current != previous)
    return tagged.filter(starts_new_run).drop("_prev_type")


def filter_signals(signals: Signals, signal_type: str) -> Signals:
    """Select one signal type inside the analysis window and add a numeric ``signal`` column.

    Steps, in order: keep rows with ``DEC_START <= timestamp <= DEC_END``,
    collapse consecutive duplicates with ``deduplicate_series`` (across all
    signal types), keep only rows of ``signal_type``, then add ``signal``
    (1 for "rise", -1 for "fall", 0 otherwise).

    Args:
        signals: Detector output; its ``value`` frame is filtered.
        signal_type: Value of the ``signal_type`` column to keep.

    Returns:
        A new ``Signals`` wrapping the filtered frame.
    """
    kind = pl.col("signal_type")
    # is_between is inclusive on both ends by default.
    in_window = pl.col("timestamp").is_between(DEC_START, DEC_END)
    direction = (
        pl.when(kind == "rise").then(1)
        .when(kind == "fall").then(-1)
        .otherwise(0)
        .alias("signal")
    )

    windowed = signals.value.filter(in_window)
    # Deduplicate before selecting the type so runs are judged against all types.
    selected = deduplicate_series(windowed).filter(kind == signal_type)
    return Signals(selected.with_columns(direction))


# Run every configured detector once and keep its unfiltered output by name.
is_rise = pl.col("signal_type") == "rise"
is_fall = pl.col("signal_type") == "fall"

raw_results = {}
for name, detector in detectors.items():
    try:
        detected = detector.run(view)
        # Stored before the stats below, so a failure while counting still
        # leaves the detector's output available to later cells.
        raw_results[name] = detected

        before = detected.value
        after = deduplicate_series(before)
        rise_before = before.filter(is_rise).height
        fall_before = before.filter(is_fall).height
        rise_after = after.filter(is_rise).height
        fall_after = after.filter(is_fall).height
        print(
            f"  {name:20s} | rise: {rise_before:6d} -> {rise_after:5d} | "
            f"fall: {fall_before:6d} -> {fall_after:5d}"
        )
    except Exception as err:
        # Best-effort: report the failure and continue with the next detector.
        print(f"  {name:20s} | ERROR: {err}")

print(f"\n{len(raw_results)}/{len(detectors)} detectors completed")

## 5. Summary Table

In [None]:
# One record per (detector, pair) with raw signal counts inside the analysis
# window; no deduplication is applied here.
rows = []
for name, signals in raw_results.items():
    # is_between is inclusive on both ends by default.
    windowed = signals.value.filter(
        pl.col("timestamp").is_between(DEC_START, DEC_END)
    )
    frames = ((pair, windowed.filter(pl.col("pair") == pair)) for pair in PAIRS)
    rows.extend(
        {
            "Detector": name,
            "Pair": pair,
            "Rise": frame.filter(pl.col("signal_type") == "rise").height,
            "Fall": frame.filter(pl.col("signal_type") == "fall").height,
            "Total": frame.height,
        }
        for pair, frame in frames
    )

summary = pd.DataFrame(rows)
# Detector x Pair matrix of total counts, with a "TOTAL" margin row and column.
pivot = pd.pivot_table(
    summary,
    index="Detector",
    columns="Pair",
    values="Total",
    aggfunc="sum",
    fill_value=0,
    margins=True,
    margins_name="TOTAL",
)
# Trailing bare expression: the notebook renders it as the cell output.
pivot.sort_values("TOTAL", ascending=False)

## 6. Signal Profile — Rise vs Fall

Post-signal price behavior over 48 bars (48 min on 1m data).  
Separate plots for **rise** (bullish) and **fall** (bearish) signals.

In [None]:
# Post-signal profile over the next 48 bars.
profile_metric = SignalProfileMetric(look_ahead=48)
rule = "=" * 60

for signal_type in ("rise", "fall"):
    heading = signal_type.upper()
    print(f"\n{rule}")
    print(f"  Signal type: {heading}")
    print(rule)

    for name, raw_signals in raw_results.items():
        signals = filter_signals(raw_signals, signal_type)
        if not signals.value.height:
            print(f"  {name}: no {signal_type} signals")
            continue

        metrics, ctx = profile_metric.compute(raw_data, signals)
        if metrics is None:
            print(f"  {name}: insufficient data for profile")
            continue

        stats = metrics["quant"]
        print(
            f"  {name:20s} | Signals: {stats['n_signals']:5d} | "
            f"Final mean: {stats['final_mean']:+.2f}% | "
            f"Avg max uplift: {stats['avg_max_uplift']:.2f}%"
        )

        title = f"{name} — {heading} — Post-Signal Profile"
        fig = profile_metric.plot(metrics, ctx, raw_data, signals)
        fig.update_layout(title_text=title)
        fig.show()

## 7. Signal-Price Overlay — Rise vs Fall (BTCUSDT)

In [None]:
# Signal/price overlay restricted to a single pair.
pair_metric = SignalPairPrice(pairs=["BTCUSDT"])

for signal_type in ("rise", "fall"):
    print(f"\n{'='*60}")
    print(f"  Signal type: {signal_type.upper()}")
    print(f"{'='*60}")

    for name, raw_signals in raw_results.items():
        signals = filter_signals(raw_signals, signal_type)
        if signals.value.height == 0:
            continue

        metrics, ctx = pair_metric.compute(raw_data, signals)
        # Same guard as the profile and distribution cells: skip plotting when
        # the metric reports no result instead of passing None to plot().
        if metrics is None:
            print(f"  {name}: no data for price overlay")
            continue

        # plot() returns several figures; each one gets the same title.
        figs = pair_metric.plot(metrics, ctx, raw_data, signals)
        for fig in figs:
            fig.update_layout(title_text=f"{name} — {signal_type.upper()} — BTCUSDT")
            fig.show()

## 8. Signal Distribution — Rise vs Fall

In [None]:
# Distribution of signals over time, computed per detector and signal type.
dist_metric = SignalDistributionMetric(rolling_window_minutes=24, ma_window_hours=72)
rule = "=" * 60

for signal_type in ("rise", "fall"):
    heading = signal_type.upper()
    print(f"\n{rule}")
    print(f"  Signal type: {heading}")
    print(rule)

    for name, raw_signals in raw_results.items():
        signals = filter_signals(raw_signals, signal_type)
        if not signals.value.height:
            # Nothing to analyze for this detector/type combination.
            continue

        metrics, ctx = dist_metric.compute(raw_data, signals)
        if metrics is None:
            print(f"  {name}: no signals for distribution")
            continue

        stats = metrics["quant"]
        print(
            f"  {name:20s} | Pairs: {stats['total_pairs']} | "
            f"Mean: {stats['mean_signals_per_pair']:.1f} | "
            f"Max rolling: {stats['max_rolling_signals']}"
        )

        title = f"{name} — {heading} — Signal Distribution"
        fig = dist_metric.plot(metrics, ctx, raw_data, signals)
        fig.update_layout(title_text=title)
        fig.show()