In [123]:
import pandas_market_calendars as mcal
import pandas as pd
import random 
import numpy as np

# Build the NYSE exchange calendar.
nyse = mcal.get_calendar("NYSE")

# Trading schedule spanning 1990-01-01 through 2030-12-31.
schedule = nyse.schedule(start_date="1990-01-01", end_date="2030-12-31")

# One entry per trading session, with the timezone stripped.
# NOTE(review): date_range at "1D" may yield session-close timestamps rather than
# midnight dates — confirm whether downstream lookups expect normalized dates.
nyse_dates = mcal.date_range(schedule, frequency="1D").tz_localize(None)


In [132]:
import warnings, logging
import matplotlib.pyplot as plt
# Suppress every FutureWarning, plus the UserWarning whose message matches
# "Glyph .* missing from font".
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", message="Glyph .* missing from font", category=UserWarning)
def no_path_warning_format(message, category, filename, lineno, line=None):
    """Format a warning as ``<CategoryName>: <message>`` plus a newline.

    ``filename``, ``lineno`` and ``line`` are accepted to satisfy the
    ``warnings.formatwarning`` signature but are deliberately left out of the text.
    """
    return "{}: {}\n".format(category.__name__, message)

# Replace the default warning formatter with the path-free one defined above.
warnings.formatwarning = no_path_warning_format
# Raise the root logger threshold to ERROR.
logging.getLogger().setLevel(logging.ERROR)
# Use SimHei for sans-serif text (presumably so CJK labels render — confirm the font is installed).
plt.rcParams['font.sans-serif'] = ['SimHei']  
# Turn off matplotlib's Unicode minus sign for negative numbers.
plt.rcParams['axes.unicode_minus'] = False     

### Strategy Logger

In [135]:
import os
import threading
import queue
import pandas as pd
from datetime import datetime


# ============================================================
#  Async Writer Thread
# ============================================================

class AsyncWriterThread(threading.Thread):
    """
    Background thread that persists log records to CSV.

    Producers push dict records (or the string ``"__FLUSH__"``) onto
    ``log_queue``; this thread buffers them and appends them to
    ``<base_dir>/<UTC date>/async_logs.csv`` when the buffer reaches
    ``flush_interval`` records, when a flush marker arrives, or when the queue
    has been idle for one second.

    Parameters
    ----------
    log_queue : queue.Queue
        Queue shared with the producer.
    flush_interval : int
        Number of buffered records that triggers a write.
    base_dir : str
        Directory under which the per-day folders are created.
    """

    # Private marker pushed by stop(); compared by identity so it can never
    # collide with a real record.
    _STOP = object()

    def __init__(self, log_queue, flush_interval, base_dir):
        super().__init__(daemon=True)
        self.log_queue = log_queue
        self.flush_interval = flush_interval
        self.base_dir = base_dir
        self.running = True

        self.buffer = []         # records waiting to be written
        self.buffer_count = 0    # number of buffered records, drives the flush trigger

    def write_to_disk(self, logs):
        """Append ``logs`` (a list of dict records) to today's CSV; no-op when empty."""
        if not logs:
            return

        # Local import: the file only imports ``datetime`` from the datetime module.
        from datetime import timezone

        df = pd.DataFrame(logs)

        # One folder per UTC day. datetime.utcnow() is deprecated, so an aware
        # "now" is used instead; the resulting date string is the same.
        dstr = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        day_dir = os.path.join(self.base_dir, dstr)
        os.makedirs(day_dir, exist_ok=True)

        fname = os.path.join(day_dir, "async_logs.csv")

        # Append mode; the header is only written when the file is created.
        # NOTE(review): batches whose records have different keys are appended to
        # the same file and will not line up with the first header — consider
        # one file per category.
        df.to_csv(fname, mode='a', header=not os.path.exists(fname), index=False)

    def _flush_buffer(self):
        """Write the buffered records and reset the buffer."""
        pending, self.buffer = self.buffer, []
        self.buffer_count = 0
        self.write_to_disk(pending)

    @staticmethod
    def _is_flush_marker(item):
        # isinstance first so dict / array-like records are never compared to a string.
        return isinstance(item, str) and item == "__FLUSH__"

    def _drain_queue(self):
        """Move everything currently queued into the buffer without blocking."""
        while True:
            try:
                item = self.log_queue.get_nowait()
            except queue.Empty:
                return
            if item is self._STOP or self._is_flush_marker(item):
                continue
            self.buffer.append(item)
            self.buffer_count += 1

    def run(self):
        """Consume the queue until stop() is requested, then flush what is left."""
        while True:
            try:
                log = self.log_queue.get(timeout=1)
            except queue.Empty:
                # Idle: leave if a stop was requested, otherwise flush leftovers.
                if not self.running:
                    break
                if self.buffer_count > 0:
                    self._flush_buffer()
                continue

            if log is self._STOP:
                break

            if self._is_flush_marker(log):
                self._flush_buffer()
                continue

            self.buffer.append(log)
            self.buffer_count += 1

            if self.buffer_count >= self.flush_interval:
                self._flush_buffer()

        # Final flush happens on this thread, so the buffer is never touched
        # by two threads at once.
        self._flush_buffer()

    def stop(self):
        """
        Stop the writer and guarantee that every record queued so far is on disk.

        When the thread is running, a stop marker is queued behind the pending
        records and the caller blocks until the thread has written them and
        exited. When the thread was never started (or has already finished),
        the queue is drained and flushed synchronously instead.
        """
        self.running = False
        if self.is_alive() and threading.current_thread() is not self:
            self.log_queue.put(self._STOP)
            self.join()
        else:
            self._drain_queue()
            self._flush_buffer()


# ============================================================
#  StrategyLogger Advanced
# ============================================================

class StrategyLogger:
    """
    Strategy event logger with an optional asynchronous CSV writer.

    In async mode every record is tagged with its category and pushed onto a
    queue consumed by an ``AsyncWriterThread`` (buffered writes, one folder per
    day). In sync mode records are only kept in the in-memory lists below and
    nothing is written to disk.

    Parameters
    ----------
    strategy_name : str
        Used to build the log directory ``<log_dir>/strategy_<strategy_name>``.
    log_dir : str
        Root directory for log output; created if missing.
    async_mode : bool
        Whether to start the background writer thread.
    flush_interval : int
        Buffered-record count handed to the writer thread as its flush trigger.
    """

    def __init__(
        self,
        strategy_name="strategy",
        log_dir="./log",
        async_mode=True,
        flush_interval=5000
    ):
        self.strategy_name = strategy_name
        self.log_dir = os.path.join(log_dir, f"strategy_{strategy_name}")
        os.makedirs(self.log_dir, exist_ok=True)

        self.async_mode = async_mode
        self.flush_interval = flush_interval

        # Producer side of the queue; the writer thread is the only consumer.
        self.log_queue = queue.Queue()

        # Always defined so callers can test it without hasattr().
        self.writer_thread = None
        if self.async_mode:
            self.writer_thread = AsyncWriterThread(
                log_queue=self.log_queue,
                flush_interval=self.flush_interval,
                base_dir=self.log_dir
            )
            self.writer_thread.start()

        # In-memory logs (never written to file from here; for quick inspection).
        self.signal_logs = []
        self.universe_logs = []
        self.portfolio_logs = []
        self.error_logs = []
        self.info_logs = []
        self.feature_logs = {}
        self.raw_signal_logs = {}
        self.filtered_signal_logs = {}

    # ============================================================
    # Unified log entry point
    # ============================================================

    def _push_log(self, log_dict, category="generic"):
        """
        Route one record.

        Async mode: a copy of ``log_dict`` tagged with ``category`` is queued
        for the writer thread (the caller's dict is left untouched).
        Sync mode: the record is appended to the in-memory list for its
        category; records of any other category are dropped.
        """
        if self.async_mode:
            entry = dict(log_dict)
            entry["category"] = category
            self.log_queue.put(entry)
            return

        memory_sinks = {
            "signal": self.signal_logs,
            "portfolio": self.portfolio_logs,
            "universe": self.universe_logs,
            "error": self.error_logs,
            "info": self.info_logs,
        }
        sink = memory_sinks.get(category)
        if sink is not None:
            sink.append(log_dict)

    # ============================================================
    # Logging methods
    # ============================================================
    def log_signal(self, date, symbol, signal, action, old_weight, new_weight, close_only=False, cooldown_left=0):
        """Record one weight change for ``symbol`` on ``date``."""
        self._push_log({
            "date": pd.Timestamp(date),
            "symbol": symbol,
            "signal": signal,
            "action": action,
            "old_weight": old_weight,
            "new_weight": new_weight,
            "close_only": close_only,
            "cooldown_left": cooldown_left
        }, category="signal")

    def log_portfolio(self, date, portfolio_dict):
        """Record a portfolio snapshot; ``portfolio_dict`` keys become columns."""
        self._push_log({
            "date": pd.Timestamp(date),
            **portfolio_dict
        }, category="portfolio")

    def log_universe(self, date, symbol, in_universe, close_only=False, has_position=False):
        """Record a universe membership event; the flags are stored as 0/1 ints."""
        self._push_log({
            "date": pd.Timestamp(date),
            "symbol": symbol,
            "in_universe": int(in_universe),
            "close_only": int(close_only),
            "has_position": int(has_position)
        }, category="universe")

    def log_error(self, msg):
        """Record a free-text error message."""
        self._push_log({"error": msg}, category="error")

    def log_info(self, msg):
        """Record a free-text informational message.

        Other components in this notebook call ``logger.log_info(...)``;
        without this method that call raises AttributeError.
        """
        self._push_log({"info": msg}, category="info")

    # Feature / signal frames are kept in memory only (stored as copies).
    def log_feature(self, tic, df):
        """Keep a copy of the feature frame for ``tic``."""
        self.feature_logs[tic] = df.copy()

    def log_raw_signal(self, tic, sig):
        """Keep a copy of the raw signal object for ``tic``."""
        self.raw_signal_logs[tic] = sig.copy()

    def log_filtered_signal(self, df):
        """Keep a copy of the filtered signal frame under the key ``"signal_df"``."""
        self.filtered_signal_logs["signal_df"] = df.copy()

    # ============================================================
    # Force flush
    # ============================================================
    def flush(self):
        """Ask the writer thread to write its buffer now (async mode only)."""
        if self.async_mode:
            self.log_queue.put("__FLUSH__")

    # ============================================================
    # Terminate writer thread
    # ============================================================
    def close(self):
        """
        Stop the writer thread and make sure nothing queued is lost.

        After asking the thread to stop, wait for it to exit, then write any
        record still sitting in its buffer or in the queue. The sweep is
        skipped if the thread is still alive after the wait, so the buffer is
        never touched from two threads.
        """
        if not self.async_mode or self.writer_thread is None:
            return

        writer = self.writer_thread
        writer.stop()
        if writer.is_alive():
            writer.join(timeout=5)
        if writer.is_alive():
            return

        leftovers = list(writer.buffer)
        writer.buffer = []
        writer.buffer_count = 0
        while True:
            try:
                item = self.log_queue.get_nowait()
            except queue.Empty:
                break
            # Only real records are persisted; control markers are discarded.
            if isinstance(item, dict):
                leftovers.append(item)
        writer.write_to_disk(leftovers)


### UniverseManager

In [146]:
import pandas as pd

class UniverseManager:
    """
    Expands periodic stock selections into a daily tradable universe.

    Each selection (a ``trade_date`` with its tickers) becomes active on the
    first trading session strictly after ``trade_date`` and stays active until
    the next selection becomes active; the last selection runs to the end of
    the trading calendar. The class only answers "is this ticker in the
    universe on this date" — it knows nothing about positions. Membership
    changes can be reported through an optional logger.

    Parameters
    ----------
    stock_selection_df : pd.DataFrame
        Selection table; must contain the two columns named in ``col_map``.
    col_map : dict
        ``{"tic_name": <ticker column>, "trade_date": <selection date column>}``.
    trading_calendar : iterable of datetime-like
        Trading sessions; sorted and stored as a ``DatetimeIndex``.
    logger : object, optional
        Needs ``log_error(msg)`` and ``log_universe(date=, symbol=, in_universe=,
        close_only=, has_position=)``.
    backtest_start, backtest_end : datetime-like, optional
        Inclusive bounds applied to the selection dates before expansion.
    """

    # Column layout of ``universe_df`` (also used for the empty result).
    _UNIVERSE_COLUMNS = ["date", "tic_name", "in_universe"]

    def __init__(
        self,
        stock_selection_df,
        col_map,
        trading_calendar,
        logger=None,
        backtest_start=None,
        backtest_end=None,
    ):
        self.logger = logger
        self.trading_calendar = pd.DatetimeIndex(sorted(trading_calendar))

        self.backtest_start = pd.to_datetime(backtest_start) if backtest_start else None
        self.backtest_end   = pd.to_datetime(backtest_end) if backtest_end else None

        # Normalise the caller's column names to the internal ones.
        df = stock_selection_df.copy()
        df = df.rename(columns={
            col_map["tic_name"]: "tic_name",
            col_map["trade_date"]: "trade_date"
        })
        df["trade_date"] = pd.to_datetime(df["trade_date"])

        # Keep only selections dated inside the backtest window.
        if self.backtest_start is not None:
            df = df[df["trade_date"] >= self.backtest_start]

        if self.backtest_end is not None:
            df = df[df["trade_date"] <= self.backtest_end]

        # Long daily table: one row per (date, ticker) in the universe.
        self.universe_df = self._build_universe(df)

        # date -> set of tickers, for O(1) membership checks.
        self.universe_map = self._build_fast_index(self.universe_df)

        # Universe seen by the previous log_universe_events_for_date call.
        self.prev_universe = set()

        # Load summary, sent through log_error (the free-text method used here).
        if self.logger:
            self.logger.log_error(
                f"[UniverseManager] Loaded {len(self.universe_df)} daily rows, "
                f"backtest=[{self.backtest_start} ~ {self.backtest_end}]"
            )

    # ============================================================
    # Internal helpers
    # ============================================================

    def _next_trade_date(self, date):
        """Return the first calendar session strictly after ``date``, or None if there is none."""
        date = pd.Timestamp(date)
        pos = self.trading_calendar.searchsorted(date, side="right")
        if pos >= len(self.trading_calendar):
            return None
        return self.trading_calendar[pos]

    def _build_universe(self, df):
        """
        Expand the selection table into daily (date, tic_name, in_universe) rows.

        Returns an empty frame with the standard columns when there is nothing
        to expand (no selections, or none with a later trading session).
        """
        empty = pd.DataFrame(columns=self._UNIVERSE_COLUMNS)
        if df.empty or len(self.trading_calendar) == 0:
            return empty

        df = df.copy()
        df["activate_date"] = df["trade_date"].apply(self._next_trade_date)
        # Selections with no session after them can never become active.
        df = df.dropna(subset=["activate_date"])
        if df.empty:
            return empty

        df = df.sort_values(["trade_date", "tic_name"])

        trade_dates = sorted(df["trade_date"].unique())
        activate_dates = [self._next_trade_date(d) for d in trade_dates]

        # A selection is switched off when the next one is switched on ...
        deactivate_map = dict(zip(activate_dates[:-1], activate_dates[1:]))
        # ... and the last one runs through the final calendar session.
        deactivate_map[activate_dates[-1]] = self.trading_calendar.max() + pd.Timedelta(days=1)

        cal = self.trading_calendar
        records = []
        for trade_date, group in df.groupby("trade_date"):
            act_date = self._next_trade_date(trade_date)
            deact_date = deactivate_map[act_date]

            tics = group["tic_name"].tolist()
            in_window = (cal >= act_date) & (cal < deact_date)

            for d in cal[in_window]:
                records.extend(
                    {"date": d, "tic_name": tic, "in_universe": 1} for tic in tics
                )

        if not records:
            return empty

        return (
            pd.DataFrame(records)
              .drop_duplicates(["date", "tic_name"])
              .sort_values(["date", "tic_name"])
        )

    def _build_fast_index(self, universe_df):
        """Build ``{date: set(tickers)}`` from the daily universe table."""
        return {
            pd.Timestamp(date): set(grp["tic_name"].tolist())
            for date, grp in universe_df.groupby("date")
        }

    # ============================================================
    # Public API
    # ============================================================

    def is_in_universe(self, tic_name, date):
        """Return True when ``tic_name`` is in the universe on ``date``."""
        tics = self.universe_map.get(pd.Timestamp(date))
        if tics is None:
            return False
        return tic_name in tics

    def get_universe(self, date):
        """Return the set of tickers in the universe on ``date`` (empty set if none)."""
        return self.universe_map.get(pd.Timestamp(date), set())

    # ============================================================
    # Universe logging (IN / OUT)
    # ============================================================

    def log_universe_events_for_date(self, date):
        """
        Log tickers that entered or left the universe since the previous call.

        Entries are logged with ``in_universe=1`` and exits with
        ``in_universe=0``; ``close_only`` and ``has_position`` are always
        passed as False here. Does nothing when no logger is attached.
        """
        if self.logger is None:
            return

        date = pd.Timestamp(date)
        today_u = self.get_universe(date)

        added = today_u - self.prev_universe
        removed = self.prev_universe - today_u

        for flag, tics in ((1, added), (0, removed)):
            for tic in sorted(tics):
                self.logger.log_universe(
                    date=date,
                    symbol=tic,
                    in_universe=flag,
                    close_only=False,
                    has_position=False
                )

        # Copy so later changes to the map cannot alter the remembered state.
        self.prev_universe = today_u.copy()



### ExecutionManager

In [167]:
import pandas as pd
from typing import Dict, Optional


class ExecutionManager:
    """
    Turns daily -1/0/1 signals into a date x ticker matrix of target weights.

    What it does:
      - ``step`` processes one day: it updates ``current_weights`` from universe
        membership, the day's signals, per-ticker cooldown counters and a
        close-only rule, then enforces ``max_positions`` and ``gross_leverage``.
      - ``generate_weight_matrix`` replays ``step`` over a whole signal frame
        and then overlays the monthly equal-weight schedule computed by
        ``_compute_target_weights``.
      - An optional logger receives one ``log_signal`` record per weight change.

    What it does not do: PnL, share / cash conversion, or trimming the
    backtest window (left to the caller).
    """

    def __init__(
        self,
        universe_mgr,
        max_positions: int = 20,
        max_weight: float = 0.20,
        min_weight: float = 0.05,
        weight_step: float = 0.05,
        allow_short: bool = True,
        gross_leverage: float = 1.0,
        cooling_days: int = 0,
        rebalance_freq: str = "D",  # "D" (daily), "W" (weekly) or "M" (monthly)
        logger: Optional[object] = None,
        ratio: float = 1.0,           # share of capital the target weights may use
        seed: int = 42                # seed for the random position sampling
    ):
        """
        Parameters
        ----------
        universe_mgr : object
            Must expose ``trading_calendar``, ``get_universe(date)`` and
            ``is_in_universe(tic, date)`` (e.g. a UniverseManager).
        max_positions : int
            Maximum number of names with non-zero weight.
        max_weight : float
            Absolute weight given to a name when it is opened, and the cap on
            the equal weight in ``_compute_target_weights``.
        min_weight : float
            Floor on the equal weight in ``_compute_target_weights`` and the
            threshold used by ``_apply_min_weight_threshold``.
        weight_step : float
            Stored only; not referenced by the methods of this class.
        allow_short : bool
            Whether negative weights may be opened.
        gross_leverage : float
            Cap on the sum of absolute weights produced by ``step``.
        cooling_days : int
            Number of ``step`` calls a name must wait after being closed
            before it can be opened again.
        rebalance_freq : str
            "D": every day. "W": first calendar session of each ISO week.
            "M": second and last calendar session of each month.
            Any other value never rebalances.
        logger : object or None
            Optional; ``log_signal(...)`` and ``log_error(msg)`` are used, and
            ``log_info(msg)`` when the logger provides it.
        ratio : float
            Numerator of the equal weight in ``_compute_target_weights``.
        seed : int
            Seeds the ``random`` module here and again before sampling in
            ``_compute_target_weights``.
        """
        self.universe_mgr = universe_mgr
        self.max_positions = int(max_positions)
        self.max_weight = float(max_weight)
        self.min_weight = float(min_weight)
        self.weight_step = float(weight_step)
        self.allow_short = allow_short
        self.gross_leverage = float(gross_leverage)
        self.cooling_days = int(cooling_days)
        self.rebalance_freq = rebalance_freq.upper()
        self.logger = logger
        self.ratio = ratio
        self.seed = seed
        random.seed(seed)

        # Current target weight per ticker (negative = short).
        self.current_weights: Dict[str, float] = {}

        # Remaining cooldown per ticker.
        self.cooldown: Dict[str, int] = {}

        # Date of the previous step() call, used by the close-only rule.
        self.prev_date: Optional[pd.Timestamp] = None

    def set_rebalance_frequency(self, freq: str):
        """Set the rebalance frequency: 'D' / 'W' / 'M' (case-insensitive)."""
        self.rebalance_freq = freq.upper()

    # =========================================================
    # Small internal helpers
    # =========================================================
    def _calendar(self) -> pd.DatetimeIndex:
        """Trading calendar as a DatetimeIndex (accepts any iterable of timestamps)."""
        return pd.DatetimeIndex(self.universe_mgr.trading_calendar)

    def _month_sessions(self, date: pd.Timestamp) -> pd.DatetimeIndex:
        """Sorted calendar sessions in the same year and month as ``date``."""
        cal = self._calendar()
        return cal[(cal.year == date.year) & (cal.month == date.month)].sort_values()

    def _week_sessions(self, date: pd.Timestamp) -> pd.DatetimeIndex:
        """Sorted calendar sessions in the Monday-to-Sunday week containing ``date``."""
        cal = self._calendar()
        week_start = date.normalize() - pd.Timedelta(days=date.weekday())
        week_end = week_start + pd.Timedelta(days=7)
        return cal[(cal >= week_start) & (cal < week_end)].sort_values()

    @staticmethod
    def _as_signal(value) -> int:
        """Coerce a raw signal to int, treating None / NaN as 0 (no signal)."""
        if value is None or pd.isna(value):
            return 0
        return int(value)

    @staticmethod
    def _classify_action(old_w: float, new_w: float) -> str:
        """Name the weight change for the signal log."""
        if old_w == 0 and new_w != 0:
            return "OPEN_LONG" if new_w > 0 else "OPEN_SHORT"
        if old_w != 0 and new_w == 0:
            return "CLOSE"
        if old_w * new_w < 0:
            return "FLIP"
        return "ADJUST"

    def _log_info(self, msg: str) -> None:
        """Send an informational message if the logger supports ``log_info``."""
        if self.logger is None:
            return
        log_info = getattr(self.logger, "log_info", None)
        if callable(log_info):
            log_info(msg)

    # =========================================================
    # Main entry point: full-history weight matrix
    # =========================================================
    def generate_weight_matrix(self, signal_df: pd.DataFrame) -> pd.DataFrame:
        """
        Build the weight matrix for every date in ``signal_df``.

        Parameters
        ----------
        signal_df : pd.DataFrame
            index: dates; columns: tickers; values: -1 / 0 / 1 (NaN = no signal).

        Returns
        -------
        pd.DataFrame
            index: dates (named "date"); columns: sorted tickers; values: weights.
            An empty frame is returned for an empty ``signal_df``.

        Notes
        -----
        ``step`` is run for each date (updating ``current_weights``, cooldowns
        and the signal log), after which every cell is overwritten with the
        result of ``_compute_target_weights``. If that computation raises, the
        error is logged and the step-based weights are returned instead.
        """
        dates = sorted(pd.to_datetime(signal_df.index.unique()))
        all_tics = sorted(signal_df.columns.unique())

        if not dates:
            return pd.DataFrame(
                index=pd.DatetimeIndex([], name="date"), columns=all_tics, dtype=float
            )

        records = []
        for dt in dates:
            row = signal_df.loc[dt]
            # Duplicate index entries yield a frame; use the first row.
            signal_series = row.iloc[0] if isinstance(row, pd.DataFrame) else row

            self.step(dt, signal_series)

            row_weights = {tic: self.current_weights.get(tic, 0.0) for tic in all_tics}
            row_weights["date"] = pd.Timestamp(dt)
            records.append(row_weights)

        weights_df = pd.DataFrame(records).set_index("date").sort_index()

        if hasattr(self, "_compute_target_weights"):
            try:
                target_df = self._compute_target_weights(signal_df)
                # Align to the step-based matrix, then overwrite it.
                target_df = target_df.reindex_like(weights_df).fillna(0.0)
                weights_df.update(target_df)
            except Exception as e:
                if self.logger is not None:
                    self.logger.log_error(f"[ExecutionManager] _compute_target_weights failed: {e}")
                else:
                    print(f"[WARN] _compute_target_weights failed: {e}")
            else:
                # Logged outside the try so a logger problem is never reported
                # as a weight-computation failure.
                self._log_info("[ExecutionManager] Applied _compute_target_weights successfully.")

        return weights_df

    # =========================================================
    # Rebalance frequency
    # =========================================================
    def _should_rebalance(self, date: pd.Timestamp) -> bool:
        """
        Decide whether ``date`` is a rebalance day.

          - "D": always.
          - "W": ``date`` is the first calendar session of its ISO week
            (weeks are matched by Monday-to-Sunday range, so a week that
            spans New Year is handled as one week).
          - "M": ``date`` is the second or the last calendar session of its month.
          - anything else: False.
        """
        date = pd.Timestamp(date)
        today = date.normalize()
        freq = self.rebalance_freq

        if freq == "D":
            return True

        if freq == "W":
            sessions = self._week_sessions(date)
            if len(sessions) == 0:
                return False
            return bool(today == sessions[0].normalize())

        if freq == "M":
            sessions = self._month_sessions(date)
            if len(sessions) == 0:
                return False
            second_day = sessions[1] if len(sessions) >= 2 else sessions[0]
            last_day = sessions[-1]
            return bool(today == second_day.normalize() or today == last_day.normalize())

        return False

    # =========================================================
    # Daily execution: updates self.current_weights
    # =========================================================
    def step(self, date, signal_series: pd.Series):
        """
        Process one day.

          1. Decrement every positive cooldown counter.
          2. Remember ``date`` as the previous date for the next call.
          3. On rebalance days only: recompute each name's weight from the
             signal, universe membership, close-only rule and cooldown, log
             the changes, then apply the portfolio-level limits.

        NaN / missing signals are treated as 0.
        """
        date = pd.Timestamp(date)
        signals = signal_series.to_dict()  # tic -> -1/0/1

        # Cooldowns tick on every call, including non-rebalance days.
        for tic, remaining in list(self.cooldown.items()):
            if remaining > 0:
                self.cooldown[tic] = remaining - 1

        prev_date = self.prev_date
        self.prev_date = date

        if not self._should_rebalance(date):
            return

        today_universe = self.universe_mgr.get_universe(date)
        current_positions = {tic for tic, w in self.current_weights.items() if abs(w) > 0}

        # Every name with a signal today or with an open position.
        all_tics = sorted(set(signals.keys()) | current_positions)

        new_weights = self.current_weights.copy()

        for tic in all_tics:
            old_w = float(self.current_weights.get(tic, 0.0))
            sig = self._as_signal(signals.get(tic, 0))

            cd = int(self.cooldown.get(tic, 0))
            has_pos = abs(old_w) > 0

            in_uni_today = tic in today_universe
            in_uni_yday = False
            if prev_date is not None:
                in_uni_yday = self.universe_mgr.is_in_universe(tic, prev_date)

            # Close-only applies on the step where a held name has just left
            # the universe (in on the previous step date, out today).
            # NOTE(review): on later steps the name is no longer "in yesterday",
            # so a held position with a non-zero signal falls to the final
            # branch below and is closed — confirm this is the intended rule.
            close_only = in_uni_yday and (not in_uni_today) and has_pos

            # Flat and still cooling down: ignore the signal.
            effective_sig = 0 if ((not has_pos) and cd > 0) else sig

            if effective_sig == 0:
                # No signal: close immediately.
                new_w = 0.0

            elif close_only:
                # Keep the existing position unchanged; nothing is added.
                new_w = old_w
                print(f"[CLOSE-ONLY] {tic} keep position {old_w:.2f} (still have positions in the pool)")

            elif effective_sig > 0 and in_uni_today:
                new_w = self._update_weight_one_name(
                    old_weight=old_w, target_sign=1, close_only=False, target_weight=self.max_weight,
                )

            elif effective_sig < 0 and in_uni_today and self.allow_short:
                new_w = self._update_weight_one_name(
                    old_weight=old_w, target_sign=-1, close_only=False, target_weight=self.max_weight,
                )

            else:
                # Out of the universe, or a short signal while shorting is off.
                new_w = 0.0

            new_weights[tic] = new_w

            # A position that goes to zero starts its cooldown.
            if (abs(old_w) > 0) and (abs(new_w) == 0) and (self.cooling_days > 0):
                self.cooldown[tic] = self.cooling_days

            if self.logger is not None and abs(old_w - new_w) > 1e-8:
                self.logger.log_signal(
                    date=date,
                    symbol=tic,
                    signal=effective_sig,
                    action=self._classify_action(old_w, new_w),
                    old_weight=old_w,
                    new_weight=new_w,
                    close_only=close_only,
                    cooldown_left=self.cooldown.get(tic, 0),
                )

        self.current_weights = self._apply_portfolio_constraints(new_weights)

    def _apply_portfolio_constraints(self, weights: Dict[str, float]) -> Dict[str, float]:
        """
        Enforce ``max_positions`` then ``gross_leverage`` on ``weights`` (in place).

        When there are too many positions, the ones with the largest absolute
        weight are kept and the rest are zeroed. When the sum of absolute
        weights exceeds ``gross_leverage``, all weights are scaled down
        proportionally.
        """
        held = [(tic, w) for tic, w in weights.items() if abs(w) > 0]
        if len(held) > self.max_positions:
            ranked = sorted(held, key=lambda item: abs(item[1]), reverse=True)
            keep = {tic for tic, _ in ranked[: self.max_positions]}
            for tic, _ in held:
                if tic not in keep:
                    weights[tic] = 0.0

        gross = sum(abs(w) for w in weights.values())
        if gross > 0 and gross > self.gross_leverage:
            scale = self.gross_leverage / gross
            for tic in weights:
                weights[tic] *= scale

        return weights

    # =========================================================
    # Single-name weight logic
    # =========================================================
    def _update_weight_one_name(
        self,
        old_weight: float,
        target_sign: int,
        close_only: bool,
        target_weight: float,
    ) -> float:
        """
        Return the new weight for one name.

        In close-only mode with a non-zero target the old weight is kept;
        a zero target closes the position; otherwise the weight is set
        straight to ``target_sign * target_weight``.
        """
        w = float(old_weight)

        if close_only and target_sign != 0:
            return w

        if target_sign == 0:
            return 0.0

        return target_sign * target_weight

    def _apply_min_weight_threshold(self, w: float) -> float:
        """Return 0.0 when ``|w| < min_weight`` (drops tiny residual positions), else ``w``."""
        if abs(w) < self.min_weight:
            return 0.0
        return w

    # =========================================================
    # Monthly equal-weight schedule
    # =========================================================
    def _compute_target_weights(self, signal_df: pd.DataFrame) -> pd.DataFrame:
        """
        Compute the monthly equal-weight target matrix from ``signal_df``.

        For every date in ``signal_df``:
          - second calendar session of the month (or the only one): rebuild
            the book from that day's non-zero, non-NaN signals. Short signals
            are skipped when ``allow_short`` is False. If more than
            ``max_positions`` names qualify, a random sample of that size is
            taken. Each selected name gets ``signal * w`` where
            ``w = min(max_weight, max(min_weight, ratio / n))``.
          - last calendar session of the month: the current holdings are
            re-equalised with the same formula, keeping each name's sign.
          - any other date: the previous weights are carried forward.
          - dates whose month has no calendar session are left at 0.

        Universe membership and cooldowns are not consulted here.

        Returns
        -------
        pd.DataFrame
            index: sorted dates; columns: ``signal_df.columns``; values: weights.
        """
        # Honour the constructor's seed (42 by default).
        random.seed(getattr(self, "seed", 42))
        max_weight = self.max_weight
        min_weight = self.min_weight
        ratio = getattr(self, "ratio", 1.0)
        max_positions = self.max_positions

        dates = sorted(pd.to_datetime(signal_df.index.unique()))
        all_tics = signal_df.columns
        weights_target = pd.DataFrame(index=dates, columns=all_tics, dtype=float).fillna(0.0)

        current_holdings = set()
        last_weights = pd.Series(0.0, index=all_tics)

        # (year, month) -> sessions, so the calendar is filtered once per month.
        month_cache = {}

        for date in dates:
            month_key = (date.year, date.month)
            if month_key not in month_cache:
                month_cache[month_key] = self._month_sessions(date)
            sessions = month_cache[month_key]
            if len(sessions) == 0:
                continue

            signal_day = sessions[1] if len(sessions) >= 2 else sessions[0]
            last_day = sessions[-1]
            today = date.normalize()
            is_signal_day = today == signal_day.normalize()
            is_month_end = today == last_day.normalize()

            if is_signal_day:
                row = signal_df.loc[date] if date in signal_df.index else None
                if isinstance(row, pd.DataFrame):
                    # Duplicate index entries: use the first row, as
                    # generate_weight_matrix does.
                    row = row.iloc[0]
                if row is None:
                    weights_target.loc[date] = last_weights
                    continue

                active_tics = []
                for tic, sig in row.items():
                    if pd.isna(sig) or sig == 0:
                        continue
                    if sig < 0 and not self.allow_short:
                        continue
                    active_tics.append(tic)

                if not active_tics:
                    weights_target.loc[date] = last_weights
                    continue

                if len(active_tics) > max_positions:
                    active_tics = random.sample(active_tics, max_positions)

                equal_w = min(max_weight, max(min_weight, ratio / len(active_tics)))
                new_weights = pd.Series(0.0, index=all_tics)
                for tic in active_tics:
                    new_weights[tic] = row[tic] * equal_w

                last_weights = new_weights.copy()
                weights_target.loc[date] = last_weights
                current_holdings = set(active_tics)

            elif is_month_end and len(current_holdings) > 0:
                equal_w = min(max_weight, max(min_weight, ratio / len(current_holdings)))
                new_weights = pd.Series(0.0, index=all_tics)
                for tic in current_holdings:
                    sign = 1 if last_weights.get(tic, 0) >= 0 else -1
                    new_weights[tic] = sign * equal_w

                last_weights = new_weights.copy()
                weights_target.loc[date] = last_weights

            else:
                weights_target.loc[date] = last_weights

        return weights_target.fillna(0.0)


### BaseSignalEngine

In [163]:
import os
import pandas as pd
from typing import Dict, Optional, Iterable

class BaseSignalEngine:
    """
    Base class for per-ticker signal engines.

    Responsibilities:
        - read price data in chunks, either one CSV per ticker or one large CSV
        - rename file columns to the internal names declared in ``col_map``
        - call ``generate_signal_one_ticker()`` for every requested ticker
        - expand weekly / monthly signals onto the trading calendar
        - zero out signals of tickers that are outside the universe on a date

    Subclasses must implement ``generate_signal_one_ticker()`` and may override
    ``get_signal_frequency()``.
    """

    def __init__(
        self,
        strategy_name="default",
        col_map=None,
        universe_mgr=None,
        logger=None,
        chunk_size=200000,
        multi_file=True,
        #signal are generated in this period
        signal_start_date=None,
        signal_end_date=None,

        # data are read in this period
        data_start_date=None,
        data_end_date=None
    ):
        """
        Args:
            strategy_name: Name passed to the default ``StrategyLogger``.
            col_map: ``{internal_name: file_column}`` mapping used to rename
                CSV columns; a date/OHLCV/tic default is used when falsy.
            universe_mgr: Object exposing ``trading_calendar`` and
                ``get_universe(date)``; required to expand signals to daily.
            logger: Logger exposing ``log_error`` and ``log_raw_signal``;
                a ``StrategyLogger(strategy_name)`` is created when falsy.
            chunk_size: Row count per ``pd.read_csv`` chunk.
            multi_file: True for one ``<tic>_daily.csv`` per ticker, False for
                a single file holding all tickers.
            signal_start_date / signal_end_date: Inclusive window in which
                signals are kept.
            data_start_date / data_end_date: Inclusive window of price rows
                that are read.
        """
        self.strategy_name = strategy_name
        self.universe_mgr = universe_mgr
        self.chunk_size = chunk_size
        self.multi_file = multi_file
        # Parse the window bounds once so later comparisons are Timestamp-based.
        self.signal_start_date = pd.to_datetime(signal_start_date) if signal_start_date else None
        self.signal_end_date   = pd.to_datetime(signal_end_date) if signal_end_date else None
        self.data_start_date   = pd.to_datetime(data_start_date) if data_start_date else None
        self.data_end_date     = pd.to_datetime(data_end_date) if data_end_date else None

        # Internal column name -> column name in the CSV file.
        self.col_map = col_map or {
            "datetime": "date",
            "open": "open",
            "high": "high",
            "low": "low",
            "close": "close",
            "volume": "volume",
            "tic": "tic"
        }

        self.logger = logger or StrategyLogger(strategy_name)

    # ===============================================================
    # Shared chunk helpers
    # ===============================================================
    def _standardize_chunk(self, chunk, missing_date_msg, tic=None):
        """Rename file columns to internal names and parse the ``date`` column.

        Args:
            chunk: Raw DataFrame chunk from ``pd.read_csv``.
            missing_date_msg: Message of the ValueError raised when the chunk
                has neither a ``datetime`` nor a ``date`` column after renaming.
            tic: When given, the ``tic`` column is overwritten with this value.

        Returns:
            A new DataFrame with a datetime64 ``date`` column.
        """
        # Invert col_map: file column -> internal column, for columns present.
        rename_map = {
            file_col: internal_col
            for internal_col, file_col in self.col_map.items()
            if file_col in chunk.columns
        }
        chunk = chunk.rename(columns=rename_map)

        if tic is not None:
            chunk["tic"] = tic

        if "datetime" in chunk.columns:
            chunk["date"] = pd.to_datetime(chunk["datetime"])
            chunk = chunk.drop(columns=["datetime"])  # drop the now-redundant column
        elif "date" in chunk.columns:
            chunk["date"] = pd.to_datetime(chunk["date"])
        else:
            raise ValueError(missing_date_msg)
        return chunk

    def _filter_data_window(self, df):
        """Keep rows whose ``date`` lies in [data_start_date, data_end_date]."""
        if self.data_start_date is not None:
            df = df[df["date"] >= self.data_start_date]
        if self.data_end_date is not None:
            df = df[df["date"] <= self.data_end_date]
        return df

    # ===============================================================
    # Multi-file mode: one CSV per ticker
    # ===============================================================
    def load_price_data_multi_file(self, folder, tics):
        """Load ``<folder>/<tic>_daily.csv`` for every ticker in ``tics``.

        Tickers whose file is missing, or whose rows all fall outside the data
        window, are logged through ``logger.log_error`` and skipped.

        Returns:
            dict mapping ticker -> DataFrame sorted by ``date``.

        Raises:
            ValueError: If a file has no date/datetime column.
        """
        price_dict = {}

        for tic in tics:
            path = os.path.join(folder, f"{tic}_daily.csv")
            if not os.path.exists(path):
                self.logger.log_error(f"[WARN] Missing file: {path}")
                continue

            print(f"[READ] {path} ...")

            chunks = []
            for chunk in pd.read_csv(path, chunksize=self.chunk_size):
                chunk = self._standardize_chunk(
                    chunk, f"{path} 缺少 date/datetime 列", tic=tic
                )
                chunk = self._filter_data_window(chunk)
                if chunk.empty:
                    continue
                chunks.append(chunk)

            # pd.concat([]) raises an opaque ValueError; skip the ticker instead.
            if not chunks:
                self.logger.log_error(
                    f"[WARN] No rows inside the data window for {tic}: {path}"
                )
                continue

            df = pd.concat(chunks, ignore_index=True)
            df = df.sort_values("date")

            price_dict[tic] = df
            print(f"      ✓ loaded {len(df)} rows for {tic}")

        return price_dict

    # ===============================================================
    # Single-file mode (rarely used)
    # ===============================================================
    def load_price_data_single_file(self, filepath):
        """Load one CSV that holds all tickers, reading it in chunks.

        Returns:
            DataFrame sorted by ``tic`` then ``date``.

        Raises:
            ValueError: If the file has no date/datetime column, or if no row
                falls inside the data window.
        """
        print(f"[READ] Big file in chunks: {filepath}")

        chunks = []
        for chunk in pd.read_csv(filepath, chunksize=self.chunk_size):
            chunk = self._standardize_chunk(chunk, "文件缺少 date/datetime 列")
            chunk = self._filter_data_window(chunk)
            if chunk.empty:
                continue
            chunks.append(chunk)

        if not chunks:
            raise ValueError(
                f"No rows inside the data window were found in {filepath}"
            )

        df = pd.concat(chunks, ignore_index=True)
        df = df.sort_values(["tic", "date"])
        return df

    # ===============================================================
    # Expand weekly / monthly signals to daily
    # ===============================================================
    @staticmethod
    def _hold_until_next_signal(signal_df, cal, last_step):
        """Carry each signal row forward over ``cal`` until the next signal date.

        The last signal row is held until ``last_signal_date + last_step``
        (exclusive). The result is indexed by the calendar dates, named ``date``.
        """
        if len(signal_df.index) == 0:
            return pd.DataFrame(
                columns=signal_df.columns,
                index=pd.DatetimeIndex([], name="date"),
            )

        signal_df = signal_df.sort_index()
        first, last = signal_df.index[0], signal_df.index[-1]
        window = cal[(cal >= first) & (cal < last + last_step)]

        # Index-based pad: every calendar date takes the row of the most recent
        # signal date at or before it.
        return signal_df.reindex(window, method="ffill").rename_axis("date")

    def _expand_signal_to_daily(self, signal_df):
        """Project ``signal_df`` onto the universe manager's trading calendar.

        Raises:
            ValueError: If no universe manager is configured, or if
                ``get_signal_frequency()`` returns an unsupported code.
        """
        freq = self.get_signal_frequency()

        # The trading calendar is supplied by the universe manager.
        if self.universe_mgr is None:
            raise ValueError(
                "universe_mgr with a trading_calendar is required to expand signals"
            )
        cal = pd.DatetimeIndex(self.universe_mgr.trading_calendar)

        # Daily: align to the calendar, missing dates become 0.
        # NOTE(review): reindex matches timestamps exactly; if the calendar
        # carries a time-of-day component while signals sit at midnight,
        # nothing will align — confirm against the calendar in use.
        if freq == "D":
            return signal_df.reindex(cal).fillna(0)

        # Weekly: hold each signal until the next weekly signal.
        if freq == "W":
            return self._hold_until_next_signal(signal_df, cal, pd.Timedelta(days=7))

        # Monthly: hold each signal until the next monthly signal.
        if freq == "M":
            return self._hold_until_next_signal(signal_df, cal, pd.offsets.MonthEnd(1))

        raise ValueError(f"Unsupported signal freq: {freq}")

    # ===============================================================
    # Per-ticker hook
    # ===============================================================
    def generate_signal_one_ticker(self, df):
        """Return the signal Series (indexed by date) for one ticker's prices.

        Must be implemented by subclasses; ``compute_signals`` calls it once
        per ticker.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement generate_signal_one_ticker()"
        )

    # ===============================================================
    # Main entry point: build signal_df (date x tic)
    # ===============================================================
    def compute_signals(self, price_source, tics, position_df=None):
        """Load prices, generate per-ticker signals and expand them to daily.

        Args:
            price_source: Folder (multi-file mode) or CSV path (single-file).
            tics: Tickers to generate signals for.
            position_df: Accepted for interface compatibility; not used here.

        Returns:
            DataFrame indexed by date with one column per ticker. Signals of
            tickers outside the universe on a date are set to 0.

        Side effects:
            Writes the raw (un-expanded) signals to ``./log/signal_df.csv`` and
            logs each ticker's signal via ``logger.log_raw_signal``.

        Raises:
            ValueError: If no price data or no signal could be produced.
        """
        # ---- Step 1: load prices ----
        if self.multi_file:
            price_dict = self.load_price_data_multi_file(price_source, tics)
            if not price_dict:
                raise ValueError(
                    f"No price data could be loaded from {price_source}"
                )
            full_df = pd.concat(price_dict.values(), ignore_index=True)
        else:
            full_df = self.load_price_data_single_file(price_source)
        full_df = self._filter_data_window(full_df)

        # ---- Step 2: one signal Series per ticker ----
        # Group once instead of scanning the whole frame for every ticker.
        grouped = full_df.groupby("tic", sort=False)
        signal_list = []
        for tic in tics:
            if tic not in grouped.groups:
                continue
            sub = grouped.get_group(tic)

            sig = self.generate_signal_one_ticker(sub)

            # Keep only signals inside the signal window.
            if self.signal_start_date is not None:
                sig = sig[sig.index >= self.signal_start_date]
            if self.signal_end_date is not None:
                sig = sig[sig.index <= self.signal_end_date]
            sig.name = tic
            signal_list.append(sig)
            self.logger.log_raw_signal(tic, sig)

        if not signal_list:
            raise ValueError(
                "No signals were generated: none of the requested tickers has price data"
            )

        signal_df = pd.concat(signal_list, axis=1).fillna(0)
        os.makedirs("./log", exist_ok=True)  # to_csv does not create directories
        signal_df.to_csv("./log/signal_df.csv")
        final_df = self._expand_signal_to_daily(signal_df)

        # ---- Step 3: zero out tickers that are outside the universe ----
        if self.universe_mgr is not None and len(final_df.index) > 0:
            all_tics = final_df.columns
            # One boolean row per date: True = keep, False = set to 0.
            mask_rows = [
                np.asarray(all_tics.isin(self.universe_mgr.get_universe(d)))
                for d in final_df.index
            ]
            mask_matrix = np.vstack(mask_rows)  # shape = (n_dates, n_tics)
            final_df = final_df.where(mask_matrix, 0)
        return final_df

    def get_signal_frequency(self) -> str:
        """
        Frequency at which the strategy emits signals:
            "D": daily
            "W": weekly
            "M": monthly
        Subclasses should override this.
        """
        return "D"  # daily by default


### TSMOM 策略子类

In [150]:
import os
import pandas as pd
from typing import Dict, Optional, Iterable
#from StrategyLogger import StrategyLogger
#from BaseSignalEngine import BaseSignalEngine
class TSMOMSignalEngine(BaseSignalEngine):
    """
    Time-series momentum engine (TS-MOM, Moskowitz et al., 2012).

    Signals are computed strictly on monthly prices:
        ret_12m = P(t-1m) / P(t-12m) - 1
    The signal frequency is monthly ("M"); BaseSignalEngine expands it to daily.
    """

    def __init__(
        self,
        strategy_name="tsmom",
        col_map=None,
        universe_mgr=None,
        logger=None,
        chunk_size=200000,
        multi_file=True,
        lookback_months=12,      # lookback length, in months
        neutral_band=0.10,       # |return| within this band maps to signal 0
        signal_start_date=None,
        signal_end_date=None,
        data_start_date=None,
        data_end_date=None
    ):
        """Store the momentum parameters and forward the rest to the base class."""
        base_kwargs = dict(
            strategy_name=strategy_name,
            col_map=col_map,
            universe_mgr=universe_mgr,
            logger=logger,
            chunk_size=chunk_size,
            multi_file=multi_file,
            signal_start_date=signal_start_date,
            signal_end_date=signal_end_date,
            data_start_date=data_start_date,
            data_end_date=data_end_date,
        )
        super().__init__(**base_kwargs)

        self.lookback_months = lookback_months
        self.neutral_band = neutral_band

        # When no data end date is given, read data up to the signal end date.
        if self.data_end_date is None:
            self.data_end_date = self.signal_end_date

        # Record the effective configuration through the logger.
        if self.logger:
            init_message = (
                f"[TSMOM INIT] signal=[{self.signal_start_date} ~ {self.signal_end_date}], "
                f"data=[{self.data_start_date} ~ {self.data_end_date}], "
                f"lookback_months={self.lookback_months}"
            )
            self.logger.log_error(init_message)

    def get_signal_frequency(self):
        """This engine emits monthly signals."""
        return "M"

    def generate_signal_one_ticker(self, df):
        """Build the monthly momentum signal for one ticker.

        Args:
            df: Price rows of one ticker with at least ``date`` and ``close``.

        Returns:
            Series indexed by month-end ``date`` with values 1, 0 or -1,
            restricted to the signal window.
        """
        # Restrict the prices to the data window.
        data_lo, data_hi = self.data_start_date, self.data_end_date
        if data_lo is not None:
            df = df[df["date"] >= data_lo]
        if data_hi is not None:
            df = df[df["date"] <= data_hi]

        ordered = df.sort_values("date").copy()

        # 1) Last available close of every calendar month.
        monthly = ordered.resample("M", on="date").last()[["close"]].dropna()
        close = monthly["close"]

        # 2) Momentum: previous month's close over the close `lookback_months` ago.
        momentum = close.shift(1) / close.shift(self.lookback_months) - 1

        # 3) Map the momentum to {-1, 0, +1}; NaN momentum stays 0.
        sig = pd.Series(0, index=monthly.index)
        sig[momentum > +self.neutral_band] = 1
        sig[momentum < -self.neutral_band] = -1
        sig = sig.rename_axis("date")

        # Keep only the signal window.
        sig_lo, sig_hi = self.signal_start_date, self.signal_end_date
        if sig_lo is not None:
            sig = sig[sig.index >= sig_lo]
        if sig_hi is not None:
            sig = sig[sig.index <= sig_hi]

        return sig


In [151]:
stock_selected = pd.read_csv(
    "./data/test_stock_selected.csv",
    parse_dates=["trade_date"]
)
# Drop any timezone so the dates compare cleanly with the tz-naive trading calendar.
stock_selected["trade_date"] = stock_selected["trade_date"].dt.tz_localize(None)

# Preview last: only a cell's final expression is displayed, so the original
# mid-cell head() call was silently discarded.
stock_selected.head()


In [152]:
# Internal name -> column name in the stock-selection file.
col_map = dict(
    tic_name="tic",            # internal tic_name is the file's "tic" column
    trade_date="trade_date",   # internal trade_date is the file's "trade_date" column
)

In [153]:
logger = StrategyLogger("TSMOM")

# Universe manager over the NYSE trading days for the backtest window.
uni_mgr = UniverseManager(
    stock_selection_df=stock_selected,
    col_map=col_map,
    trading_calendar=nyse_dates,
    backtest_start="2020-03-01",
    backtest_end="2021-12-31",
    logger=logger,  # or pass in your own logger
)

uni_mgr.universe_df.head(20)


Unnamed: 0,date,tic_name,in_universe
0,2020-03-02 21:00:00,AAPL,1
1,2020-03-02 21:00:00,NVDA,1
2,2020-03-03 21:00:00,AAPL,1
3,2020-03-03 21:00:00,NVDA,1
4,2020-03-04 21:00:00,AAPL,1
5,2020-03-04 21:00:00,NVDA,1
6,2020-03-05 21:00:00,AAPL,1
7,2020-03-05 21:00:00,NVDA,1
8,2020-03-06 21:00:00,AAPL,1
9,2020-03-06 21:00:00,NVDA,1




signal_df ： 

date	AAPL	MSFT	AMZN	…
2020-01-02	1	0	-1	…
2020-01-03	0	1	0	…
2020-01-06	-1	0	1	…

In [154]:
# Collect the full, sorted ticker list from the stock-selection table.
all_tics = sorted(stock_selected["tic"].unique())
print(f"Number of tickers: {len(all_tics)}")
print(all_tics[:10])

Number of tickers: 5
['AAPL', 'AMD', 'AMZN', 'NVDA', 'TSLA']


In [160]:

# ============================================================
# Step 2 — Signal engine (monthly TS-MOM)
# ============================================================
sig_engine = TSMOMSignalEngine(
    strategy_name="tsmom",
    lookback_months=12,
    neutral_band=0.10,
    multi_file=True,  # one CSV per ticker
    universe_mgr=uni_mgr,
    logger=logger,
    data_start_date="2019-01-01",
    data_end_date="2021-12-31",
    signal_start_date="2020-03-01",
    signal_end_date="2021-12-31",
)

# First argument is the price folder, second the tickers to process.
signal_df = sig_engine.compute_signals("./data/fmp_daily", all_tics)

print("Signal DF:")
print(signal_df.tail())
signal_df.to_csv("./log/signal_dffull.csv")

[READ] ./data/fmp_daily\AAPL_daily.csv ...
      ✓ loaded 757 rows for AAPL
[READ] ./data/fmp_daily\AMD_daily.csv ...
      ✓ loaded 757 rows for AMD
[READ] ./data/fmp_daily\AMZN_daily.csv ...
      ✓ loaded 757 rows for AMZN
[READ] ./data/fmp_daily\NVDA_daily.csv ...
      ✓ loaded 757 rows for NVDA
[READ] ./data/fmp_daily\TSLA_daily.csv ...
      ✓ loaded 757 rows for TSLA
Signal DF:
                     AAPL  AMD  AMZN  NVDA  TSLA
date                                            
2022-01-24 21:00:00     0    1     0     0     1
2022-01-25 21:00:00     0    1     0     0     1
2022-01-26 21:00:00     0    1     0     0     1
2022-01-27 21:00:00     0    1     0     0     1
2022-01-28 21:00:00     0    1     0     0     1


In [156]:
# Write the expanded daily signal matrix to CSV.
# NOTE(review): the signal-generation cell above already ends with this same
# export, so this cell looks redundant — confirm before removing.
signal_df.to_csv("./log/signal_dffull.csv")

In [168]:
# ============================================================
# Step 3 — ExecutionManager (monthly rebalancing)
# ============================================================
exe_mgr = ExecutionManager(
    universe_mgr=uni_mgr,
    logger=logger,
    rebalance_freq="M",
    cooling_days=0,
    max_positions=20,
    min_weight=0.05,
    max_weight=0.20,
)

# Turn the daily signal matrix into target weights and persist them.
weights_df = exe_mgr.generate_weight_matrix(signal_df)
weights_df.to_csv("./log/weights_df.csv")

print("Weights DF:")
print(weights_df.tail())
# Number of entries currently held in each in-memory log list, one per line.
print(
    len(logger.signal_logs),
    len(logger.portfolio_logs),
    len(logger.universe_logs),
    sep="\n",
)

logger.close()

Weights DF:
                     AAPL  AMD  AMZN  NVDA  TSLA
date                                            
2022-01-24 21:00:00   0.0  0.2   0.0   0.0   0.2
2022-01-25 21:00:00   0.0  0.2   0.0   0.0   0.2
2022-01-26 21:00:00   0.0  0.2   0.0   0.0   0.2
2022-01-27 21:00:00   0.0  0.2   0.0   0.0   0.2
2022-01-28 21:00:00   0.0  0.2   0.0   0.0   0.2
0
0
0


## Asynchronous mode: splitting the log

In [169]:
# After pipeline finished, split the log into three parts
import pandas as pd
import os

log_path = "./log/strategy_tsmom/2025-11-22/async_logs.csv" 
if os.path.exists(log_path):
    # NOTE(review): the recorded run of this cell failed here with
    # "ParserError: Expected 2 fields in line 8, saw 9", i.e. the log file
    # holds rows of different widths. That has to be fixed where the file is
    # written (one fixed column set per file); it cannot be repaired reliably
    # at read time without knowing the intended schema.
    df = pd.read_csv(log_path)

    # Split by category
    signals = df[df['category'] == 'signal']
    portfolios = df[df['category'] == 'portfolio']
    universe = df[df['category'] == 'universe']

    # Save all three parts (the universe slice used to be computed and dropped)
    signals.to_csv("./log/strategy_tsmom/clean_signal_logs.csv", index=False)
    portfolios.to_csv("./log/strategy_tsmom/clean_portfolio_logs.csv", index=False)
    universe.to_csv("./log/strategy_tsmom/clean_universe_logs.csv", index=False)
    print(
        f"Split logs: {len(signals)} signals, "
        f"{len(portfolios)} portfolios, {len(universe)} universe"
    )
else:
    # Report the missing file instead of silently doing nothing.
    print(f"Log file not found: {log_path}")

ParserError: Error tokenizing data. C error: Expected 2 fields in line 8, saw 9


## Single-file mode: loading all price data from daily_SPX_500_feature_engineering.csv

In [170]:
import pandas as pd
import pandas_market_calendars as mcal

# === Parameter configuration ===
CONFIG = dict(
    price_file="./feature/daily_SPX_500_feature_engineering.csv",
    universe_file="./data/stock_selected_updated.csv",

    data_start_date="2016-06-01",
    data_end_date="2024-12-31",
    backtest_start_date="2018-01-01",
    backtest_end_date="2024-12-31",

    # Column mapping derived from the file headers
    # (key: internal standard name, value: CSV column name).
    col_map=dict(
        # Stock-selection file
        tic_name="tic",
        trade_date="trade_date",

        # Price file
        datetime="datadate",   # date column
        tic="tic",             # ticker symbol

        # Price fields
        open="prcod",          # open (if empty, some logic may need fillna)
        high="prchd",          # high
        low="prcld",           # low
        close="adj_close",     # close; adjusted close is preferred for returns,
                               # use "prccd" when adj_close is unavailable
        volume="cshtrd",       # traded volume
    ),
)
# NYSE trading calendar, returned as tz-naive timestamps
nyse = mcal.get_calendar("NYSE")
schedule = nyse.schedule(start_date="2016-01-01", end_date="2025-12-31")
trading_days = mcal.date_range(schedule, frequency="1D")
trading_days = trading_days.tz_localize(None)

In [171]:
# Load the stock-selection table
stock_selected = pd.read_csv(CONFIG["universe_file"], parse_dates=["trade_date"])

# Logger created with async_mode disabled
logger = StrategyLogger("TSMOM", async_mode=False) 

# Universe manager over the backtest window
uni_mgr = UniverseManager(
    stock_selection_df=stock_selected,
    col_map=CONFIG["col_map"],
    trading_calendar=trading_days,
    logger=logger,
    backtest_start=CONFIG["backtest_start_date"],
    backtest_end=CONFIG["backtest_end_date"],
)

# Distinct tickers in the selection table, sorted
all_tics = sorted(stock_selected["tic"].unique())
print(f"Total tickers in universe: {len(all_tics)}")

Total tickers in universe: 705


In [172]:
# Initialise the signal engine in single-file mode
sig_engine = TSMOMSignalEngine(
    strategy_name="tsmom_spx",
    col_map=CONFIG["col_map"],
    universe_mgr=uni_mgr,
    logger=logger,

    # Single-file mode configuration
    multi_file=False,
    chunk_size=500000,

    # Strategy parameters
    lookback_months=12,
    neutral_band=0.10,

    # Data window, then signal window
    data_start_date=CONFIG["data_start_date"],          # 2016-06-01
    data_end_date=CONFIG["data_end_date"],
    signal_start_date=CONFIG["backtest_start_date"],    # 2018-01-01
    signal_end_date=CONFIG["backtest_end_date"],        # 2024-12-31
)

In [175]:
# === Signal generation ===
# In single-file mode the first argument is the CSV path and the second is the
# ticker list used to select which tickers get a signal.
signal_df = sig_engine.compute_signals(CONFIG["price_file"], all_tics)
signal_df.to_csv("./log/signal_dffull.csv")

print("Signal DF Head:")
print(signal_df.head())
print("Signal DF Tail:")
print(signal_df.tail())


[READ] Big file in chunks: ./feature/daily_SPX_500_feature_engineering.csv




Signal DF Head:
                       A  AAL  AAP  AAPL  ABBV  ABNB  ABT  ACGL  ACN  ADBE  \
date                                                                         
2018-01-31 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2018-02-01 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2018-02-02 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2018-02-05 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2018-02-06 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   

                     ...  XEL  XOM  XRAY  XRX  XYL  YUM  ZBH  ZBRA  ZION  ZTS  
date                 ...                                                       
2018-01-31 21:00:00  ...  0.0  0.0   0.0  0.0  0.0  0.0  0.0   0.0   0.0  0.0  
2018-02-01 21:00:00  ...  0.0  0.0   0.0  0.0  0.0  0.0  0.0   0.0   0.0  0.0  
2018-02-02 21:00:00  ...  0.0  0.0   0.0  0.0  0.0  0.0  0.0   0.0   0.0  0.0  
2018-02-05 21:00:00  ...  0.0  0.0   

In [174]:
# Monthly-rebalancing execution manager with position and weight limits
exe_mgr = ExecutionManager(
    universe_mgr=uni_mgr,
    logger=logger,
    rebalance_freq="M",
    cooling_days=0,
    max_positions=20,
    min_weight=0.05,
    max_weight=0.20,
)

# Turn the daily signal matrix into target weights and persist them.
weights_df = exe_mgr.generate_weight_matrix(signal_df)
weights_df.to_csv("./log/weights_df.csv")

print("Weights DF:")
print(weights_df.tail())
# Number of entries currently held in each in-memory log list, one per line.
print(
    len(logger.signal_logs),
    len(logger.portfolio_logs),
    len(logger.universe_logs),
    sep="\n",
)

logger.close()

Weights DF:
                       A  AAL  AAP  AAPL  ABBV  ABNB  ABT  ACGL  ACN  ADBE  \
date                                                                         
2025-01-24 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2025-01-27 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2025-01-28 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2025-01-29 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   
2025-01-30 21:00:00  0.0  0.0  0.0   0.0   0.0   0.0  0.0   0.0  0.0   0.0   

                     ...  XEL  XOM  XRAY  XRX  XYL  YUM  ZBH  ZBRA  ZION   ZTS  
date                 ...                                                        
2025-01-24 21:00:00  ...  0.0  0.0   0.0  0.0  0.0  0.0  0.0   0.0   0.0 -0.05  
2025-01-27 21:00:00  ...  0.0  0.0   0.0  0.0  0.0  0.0  0.0   0.0   0.0 -0.05  
2025-01-28 21:00:00  ...  0.0  0.0   0.0  0.0  0.0  0.0  0.0   0.0   0.0 -0.05  
2025-01-29 21:00:00  ...  0.0  0.0  