In [None]:
pip install ccxt pandas numpy

In [10]:
from __future__ import annotations
import time
import math
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional, Tuple

import ccxt
import pandas as pd
import numpy as np

# ---------- Utilities

def to_millis(dt: datetime) -> int:
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

def from_millis(ms: int) -> datetime:
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

def ensure_usdt_symbols(symbols: List[str]) -> List[str]:
    return [s for s in symbols if s.endswith("/USDT")]

def interval_to_ccxt_tf(tf: str) -> str:
    # pass-through for standard intervals like '1m','5m','15m','1h','4h','1d'
    return tf

# ---------- Technicals (no external TA libs)

def ema(series: pd.Series, span: int) -> pd.Series:
    return series.ewm(span=span, adjust=False).mean()

def rsi(series: pd.Series, period: int = 14) -> pd.Series:
    delta = series.diff()
    up = np.where(delta > 0, delta, 0.0)
    down = np.where(delta < 0, -delta, 0.0)
    roll_up = pd.Series(up, index=series.index).ewm(alpha=1/period, adjust=False).mean()
    roll_down = pd.Series(down, index=series.index).ewm(alpha=1/period, adjust=False).mean()
    rs = roll_up / (roll_down.replace(0, np.nan))
    rsi_val = 100 - (100 / (1 + rs))
    return rsi_val.fillna(0.0)

def macd(series: pd.Series, fast=12, slow=26, signal=9) -> Tuple[pd.Series, pd.Series, pd.Series]:
    ema_fast = ema(series, fast)
    ema_slow = ema(series, slow)
    macd_line = ema_fast - ema_slow
    signal_line = ema(macd_line, signal)
    hist = macd_line - signal_line
    return macd_line, signal_line, hist

def rolling_percentiles(series: pd.Series, window: int = 50,
                        percentiles: Tuple[float, float, float] = (0.2, 0.5, 0.8)
                       ) -> Tuple[pd.Series, pd.Series, pd.Series]:
    p_low, p_mid, p_high = percentiles
    def _roll_quantile(s, q):
        return s.rolling(window, min_periods=max(2, int(window/5))).quantile(q)
    return (_roll_quantile(series, p_low),
            _roll_quantile(series, p_mid),
            _roll_quantile(series, p_high))

# ---------- ORB helpers (session = UTC day by default)

def mark_orb(df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
    # Assumes UTC timestamps
    df = df.copy()
    df["date"] = df.index.tz_convert("UTC").date if hasattr(df.index, "tz") and df.index.tz is not None else pd.to_datetime(df.index, utc=True).date
    # first candle per day
    first_idx = df.groupby("date").head(1).index
    df["is_orb"] = df.index.isin(first_idx)
    # propagate ORB high/low/mid/L1/L2 for the day
    orb_high = df.loc[first_idx, "high"].rename("orb_high")
    orb_low  = df.loc[first_idx, "low"].rename("orb_low")
    per_day = pd.DataFrame({"orb_high": orb_high, "orb_low": orb_low})
    per_day["orb_mid"] = (per_day["orb_high"] + per_day["orb_low"]) / 2.0
    # L1/L2 levels (50% extensions from ORB high/low)
    # Bullish side: L1_bull = orb_high + 0.5*(orb_high - orb_low), L2_bull = L1_bull + 0.5*(L1_bull - orb_high)
    # Bearish side: L1_bear = orb_low - 0.5*(orb_high - orb_low), L2_bear = L1_bear - 0.5*(orb_low - L1_bear)
    rng = (per_day["orb_high"] - per_day["orb_low"])
    per_day["L1_bull"] = per_day["orb_high"] + 0.5 * rng
    per_day["L2_bull"] = per_day["L1_bull"] + 0.5 * (per_day["L1_bull"] - per_day["orb_high"])
    per_day["L1_bear"] = per_day["orb_low"] - 0.5 * rng
    per_day["L2_bear"] = per_day["L1_bear"] - 0.5 * (per_day["orb_low"] - per_day["L1_bear"])
    # attach by day
    per_day["date"] = per_day.index.tz_convert("UTC").date
    df = df.merge(per_day.reset_index()[["date","orb_high","orb_low","orb_mid","L1_bull","L2_bull","L1_bear","L2_bear"]],
                  on="date", how="left").set_index(df.index)
    df = df.drop(columns=["date"])
    return df

# ---------- Candlestick stats

def candle_stats(df: pd.DataFrame) -> pd.DataFrame:
    o = df["open"]; h = df["high"]; l = df["low"]; c = df["close"]
    body = (c - o).abs()
    rng = (h - l).replace(0, np.nan)
    upper_wick = (h - np.maximum(c, o))
    lower_wick = (np.minimum(c, o) - l)
    stats = pd.DataFrame(index=df.index)
    stats["candle_color"] = np.where(c > o, "bull", np.where(c < o, "bear", "doji"))
    stats["body"] = body
    stats["range"] = (h - l)
    stats["upper_wick"] = upper_wick
    stats["lower_wick"] = lower_wick
    stats["body_pct_of_range"] = (body / rng).fillna(0.0)
    stats["upper_wick_pct_of_range"] = (upper_wick / rng).fillna(0.0)
    stats["lower_wick_pct_of_range"] = (lower_wick / rng).fillna(0.0)
    # Simple flags
    stats["is_doji"] = stats["body_pct_of_range"] < 0.1
    stats["is_marubozu"] = (stats["upper_wick_pct_of_range"] < 0.05) & (stats["lower_wick_pct_of_range"] < 0.05)
    return stats

# ---------- Binance via ccxt (with raw klines to get extra fields)

class BinanceData:
    def __init__(self, rate_limit_ms: int = 1200):
        self.exchange = ccxt.binance({
            "enableRateLimit": True,
            "rateLimit": rate_limit_ms,
            "options": {"defaultType": "spot"},
        })
        # 🔑 make sure markets are loaded once
        self.exchange.load_markets()



    def _raw_klines(self, symbol: str, interval: str, start_ms: int, limit: int = 1000) -> List[List[Any]]:
        # ccxt symbol -> binance market id (e.g., 'BTC/USDT' -> 'BTCUSDT')
        market = self.exchange.market(symbol)
        params = {"symbol": market["id"], "interval": interval, "limit": limit, "startTime": start_ms}
        # Raw endpoint via ccxt's implicit methods
        data = self.exchange.publicGetKlines(params)
        return data  # each row per Binance docs

    def fetch_ohlcv_batched(self, symbol: str, timeframe: str, since_ms: int, until_ms: Optional[int] = None,
                         step_limit: int = 1000, sleep_ms: int = 0) -> pd.DataFrame:
        tf = interval_to_ccxt_tf(timeframe)
        all_rows = []
        next_ms = int(since_ms)
        if until_ms is None:
            until_ms = to_millis(datetime.now(timezone.utc))
        else:
            until_ms = int(until_ms)

        while True:
            rows = self._raw_klines(symbol, tf, next_ms, limit=step_limit)
            if not rows:
                break
            all_rows.extend(rows)

            # rows[-1][6] is the close time (ms). Ensure it's an int before adding 1.
            last_close = int(rows[-1][6])
            next_ms = last_close + 1

            if next_ms >= until_ms:
                break
            if sleep_ms > 0:
                time.sleep(sleep_ms / 1000.0)

        if not all_rows:
            return pd.DataFrame(columns=[
                "open","high","low","close","volume","quote_volume","n_trades","taker_buy_base_vol","taker_buy_quote_vol"
            ])
    
        # Map raw rows per Binance:
        # [0] open time ms
        # [1] open
        # [2] high
        # [3] low
        # [4] close
        # [5] volume (base)
        # [6] close time ms
        # [7] quote asset volume
        # [8] number of trades
        # [9] taker buy base asset volume
        # [10] taker buy quote asset volume
        # [11] ignore

        cols = ["open_time","open","high","low","close","volume","close_time","quote_volume","n_trades",
                "taker_buy_base_vol","taker_buy_quote_vol","_ignore"]
        df = pd.DataFrame(all_rows, columns=cols)

        # Cast numeric fields (some envs return strings)
        numeric_cols = ["open","high","low","close","volume","quote_volume","taker_buy_base_vol","taker_buy_quote_vol"]
        df[numeric_cols] = df[numeric_cols].astype(float)
        df["n_trades"] = df["n_trades"].astype(int)

        # Make sure times are ints → datetimes
        df["open_time"] = pd.to_datetime(df["open_time"].astype("int64"), unit="ms", utc=True)
        df["close_time"] = pd.to_datetime(df["close_time"].astype("int64"), unit="ms", utc=True)

        df = df.set_index("open_time").sort_index()
        return df.drop(columns=["_ignore"])


        


    # ---- Screening: top N USDT pairs by previous-day volume gain (day-over-day)
    def screen_top_by_prevday_volume_gain(self, usdt_only: bool = True, top_n: int = 10,
                                          min_price: float = 0.0, min_vol_quote: float = 0.0) -> List[str]:
        """
        For each symbol, compares daily volume of (D-1) vs (D-2) using daily klines to rank by % gain.
        Filters to */USDT if usdt_only.
        """
        markets = self.exchange.load_markets()
        symbols = [m for m in markets if markets[m]["active"]]
        if usdt_only:
            symbols = [s for s in symbols if s.endswith("/USDT")]
        # Basic sanity filter using current ticker (optional)
        tickers = self.exchange.fetch_tickers(symbols)
        filtered = []
        for s in symbols:
            t = tickers.get(s, {})
            last = t.get("last")
            quote_vol = t.get("quoteVolume")  # 24h
            if (last is None or (min_price and last < min_price) or
                (quote_vol is None or (min_vol_quote and quote_vol < min_vol_quote))):
                continue
            filtered.append(s)

        gains = []
        for s in filtered:
            try:
                df = self.fetch_ohlcv_batched(s, "1d", since_ms=to_millis(datetime.now(timezone.utc) - timedelta(days=10)))
                if len(df) < 3:
                    continue
                vol = df["quote_volume"].dropna()
                # D-1 vs D-2
                gain = (vol.iloc[-2] - vol.iloc[-3]) / max(vol.iloc[-3], 1e-9)
                gains.append((s, gain))
            except Exception:
                continue

        gains.sort(key=lambda x: x[1], reverse=True)
        return [s for s, _ in gains[:top_n]]

# ---------- Pipeline

class CryptoDataPipeline:
    def __init__(self,
                 timeframe: str = "5m",
                 start_date: str = "2024-01-01",
                 symbols: Optional[List[str]] = None,
                 screen_top_prevday_gain: Optional[int] = None,
                 rate_limit_ms: int = 1200):
        """
        timeframe: e.g., '1m','5m','15m','1h','4h','1d'
        start_date: ISO date in UTC
        symbols: explicit list of symbols like ['BTC/USDT','ETH/USDT']
        screen_top_prevday_gain: if provided, screen top-N USDT pairs by previous-day volume gain
        """
        self.timeframe = timeframe
        self.start_ms = to_millis(datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc))
        self.binance = BinanceData(rate_limit_ms=rate_limit_ms)

        if symbols is not None:
            self.symbols = ensure_usdt_symbols(symbols)
        elif screen_top_prevday_gain:
            self.symbols = self.binance.screen_top_by_prevday_volume_gain(usdt_only=True, top_n=screen_top_prevday_gain)
        else:
            raise ValueError("Provide either explicit symbols or screen_top_prevday_gain > 0")

        if not self.symbols:
            raise ValueError("No symbols selected after filtering.")

    def build_dataframe_for_symbol(self, symbol: str) -> pd.DataFrame:
        raw = self.binance.fetch_ohlcv_batched(symbol, self.timeframe, since_ms=self.start_ms, step_limit=1000, sleep_ms=0)
        if raw.empty:
            return raw

        df = raw.copy()
        # Basic OHLCV
        df = df[["open","high","low","close","volume","quote_volume","n_trades","taker_buy_base_vol","taker_buy_quote_vol","close_time"]]
        # Volume in USDT (for USDT pairs quote_volume = already in USDT; fall back to close*volume if missing)
        df["volume_usdt"] = df["quote_volume"].where(df["quote_volume"] > 0, df["close"] * df["volume"])
        # Day of week (Mon=0 ... Sun=6)
        df["dow"] = df.index.dayofweek

        # Technicals
        for span in (9, 20, 100, 200):
            df[f"ema_{span}"] = ema(df["close"], span)

        macd_line, macd_signal, macd_hist = macd(df["close"])
        df["macd_line"] = macd_line
        df["macd_signal"] = macd_signal
        df["macd_hist"] = macd_hist

        df["rsi_14"] = rsi(df["close"], 14)

        # ORB markings & levels
        df = mark_orb(df, self.timeframe)

        # Moving volume percentiles (High/Low/Median proxy) - configurable window
        p20, p50, p80 = rolling_percentiles(df["volume_usdt"], window=50, percentiles=(0.2, 0.5, 0.8))
        df["vol_p20"] = p20
        df["vol_p50"] = p50
        df["vol_p80"] = p80

        # Candlestick characteristics
        stats = candle_stats(df.rename(columns=str))  # ensure columns present
        df = pd.concat([df, stats], axis=1)

        # Bullish/Bearish "trades" proxy:
        # We don't get per-trade direction counts, but Binance klines include taker buy volume.
        # Use taker-buy share of total base volume as a proxy of bullish activity.
        with np.errstate(divide='ignore', invalid='ignore'):
            df["taker_buy_share"] = (df["taker_buy_base_vol"] / df["volume"]).replace([np.inf, -np.inf], np.nan).fillna(0.0)
        # If you want a ratio, map to bullish:bearish by volume share:
        df["bull_bear_vol_ratio"] = df["taker_buy_share"] / (1 - df["taker_buy_share"] + 1e-9)

        # Label
        df["symbol"] = symbol

        # Housekeeping
        # Keep a tidy column order (you can adjust as needed)
        preferred = [
            "symbol","open","high","low","close","volume","volume_usdt","quote_volume","n_trades",
            "taker_buy_base_vol","taker_buy_quote_vol","taker_buy_share","bull_bear_vol_ratio",
            "dow","ema_9","ema_20","ema_100","ema_200",
            "macd_line","macd_signal","macd_hist","rsi_14",
            "is_orb","orb_high","orb_low","orb_mid","L1_bull","L2_bull","L1_bear","L2_bear",
            "vol_p20","vol_p50","vol_p80",
            "candle_color","body","range","upper_wick","lower_wick",
            "body_pct_of_range","upper_wick_pct_of_range","lower_wick_pct_of_range",
            "is_doji","is_marubozu","close_time"
        ]
        # Some may be missing early in series; fill reindex with existing
        cols = [c for c in preferred if c in df.columns] + [c for c in df.columns if c not in preferred]
        return df[cols]

    def run(self) -> pd.DataFrame:
        frames = []
        for s in self.symbols:
            try:
                frames.append(self.build_dataframe_for_symbol(s))
            except Exception as e:
                print(f"[WARN] {s} failed: {e}")
        if not frames:
            return pd.DataFrame()
        out = pd.concat(frames).sort_index()
        # Optional: drop very-early NaNs from long EMAs
        return out

# ---------- Example usage
if __name__ == "__main__":
    """
    Choose ONE of:
    1) Explicit symbols:
       pipeline = CryptoDataPipeline(timeframe="5m", start_date="2024-06-01", symbols=["BTC/USDT","ETH/USDT"])
    2) Screen top 10 USDT pairs by previous-day volume gain:
       pipeline = CryptoDataPipeline(timeframe="5m", start_date="2024-06-01", screen_top_prevday_gain=10)
    """
    pipeline = CryptoDataPipeline(
        timeframe="5m",
        start_date="2024-06-01",
        symbols=["BTC/USDT","ETH/USDT"]
        # or: screen_top_prevday_gain=10
    )
    df = pipeline.run()
    print(df.tail(10))


                             symbol       open       high        low  \
open_time                                                              
2025-09-24 15:25:00+00:00  ETH/USDT    4195.84    4196.56    4192.66   
2025-09-24 15:25:00+00:00  BTC/USDT  113672.11  113712.05  113644.40   
2025-09-24 15:30:00+00:00  ETH/USDT    4192.67    4193.22    4188.27   
2025-09-24 15:30:00+00:00  BTC/USDT  113650.15  113650.16  113564.17   
2025-09-24 15:35:00+00:00  ETH/USDT    4189.20    4196.13    4188.99   
2025-09-24 15:35:00+00:00  BTC/USDT  113574.60  113686.98  113574.59   
2025-09-24 15:40:00+00:00  ETH/USDT    4192.64    4196.37    4191.08   
2025-09-24 15:40:00+00:00  BTC/USDT  113646.70  113708.15  113608.50   
2025-09-24 15:45:00+00:00  BTC/USDT  113708.14  113708.15  113654.93   
2025-09-24 15:45:00+00:00  ETH/USDT    4192.14    4192.44    4186.76   

                               close      volume   volume_usdt  quote_volume  \
open_time                                              