# 周期模型

如何生成信号？
- 读取数据
- 根据算法生成信号，包括原始指标，衍生指标，交易信号
- 交易信号是分类变量，包含类别：中性，超卖/周期底部（看涨），超买/周期顶部（看空）
- 构建二维表格

In [13]:
import talib
import pandas as pd
import numpy as np

In [41]:
def fisher_transform(series: pd.Series, period: int = 10) -> pd.Series:
    """Compute the Fisher Transform of ``series``.

    For each element the position of the current value inside the rolling
    ``period``-wide high/low range is computed, re-centred around zero,
    smoothed recursively (0.66 * position + 0.67 * previous value), clamped
    to [-0.999, 0.999] and passed through ``0.5 * ln((1 + v) / (1 - v))``,
    which is then averaged with the previous Fisher value.

    Args:
        series: Input values.
        period: Rolling window length used for the high/low range. Windows
            shorter than ``period`` at the start of the series are used as
            they are (``min_periods=1``).

    Returns:
        A Series of Fisher values sharing the index of ``series``. The first
        element is always 0 because the recursion starts at the second one.
    """
    highest = series.rolling(period, min_periods=1).max().to_numpy(dtype=float)
    lowest = series.rolling(period, min_periods=1).min().to_numpy(dtype=float)
    raw = series.to_numpy(dtype=float)
    size = len(raw)

    # Relative position of each value inside its window's range. A flat
    # window has a zero-width range; the 0/0 division would yield NaN, and
    # because both recursions below feed on their previous element, a single
    # NaN would poison every later output. Treat a flat window as sitting at
    # the midpoint (0.5) so it contributes nothing instead.
    span = highest - lowest
    position = np.full(size, 0.5)
    np.divide(raw - lowest, span, out=position, where=span != 0)

    values = np.zeros(size)
    fishers = np.zeros(size)
    for i in range(1, size):
        smoothed = 0.66 * (position[i] - 0.5) + 0.67 * values[i - 1]
        # Keep strictly inside (-1, 1) so the logarithm below stays finite.
        smoothed = max(min(smoothed, 0.999), -0.999)
        values[i] = smoothed
        fishers[i] = (
            0.5 * np.log((1 + smoothed) / (1 - smoothed)) + 0.5 * fishers[i - 1]
        )

    return pd.Series(fishers, index=series.index)


def read_metrics(filepath_ohlcv: str, filepath_metric: str) -> pd.DataFrame:
    """Load the closing price and a metric file into one date-indexed frame.

    Both CSV files are read with their ``datetime`` column parsed as the
    index. The OHLCV ``close`` column is placed next to every column of the
    metric file, ``close`` is renamed to ``price``, and any row with a
    missing value is dropped.

    Args:
        filepath_ohlcv: CSV with at least ``datetime`` and ``close`` columns.
        filepath_metric: CSV with a ``datetime`` column plus metric columns.

    Returns:
        DataFrame with a ``price`` column followed by the metric columns.
    """
    read_options = {"index_col": "datetime", "parse_dates": True}
    ohlcv_frame = pd.read_csv(filepath_ohlcv, **read_options)
    metric_frame = pd.read_csv(filepath_metric, **read_options)

    # Outer-join on the date index first; incomplete rows are removed last.
    combined = pd.concat([ohlcv_frame["close"], metric_frame], axis=1, join="outer")
    combined = combined.rename(columns={"close": "price"})
    return combined.dropna()


def get_signal_sth_realized_price(
    prices: pd.Series, metrics: pd.Series, period: int = 200, threshold: float = 2.0
) -> pd.DataFrame:
    """Classify the price-minus-metric spread into peak/valley/neutral.

    The spread ``prices - metrics`` is normalized with ``fisher_transform``
    over ``period`` and each normalized value is labelled ``"peak"`` above
    ``threshold``, ``"valley"`` below ``-threshold`` and ``"neutral"``
    otherwise.

    Args:
        prices: Price series.
        metrics: Metric series subtracted from the price.
        period: Window passed to ``fisher_transform``.
        threshold: Absolute normalized level that triggers a signal.

    Returns:
        DataFrame with columns ``btcusd``, ``sth_realized_price``,
        ``normalized_diff`` and ``signal``.
    """

    def _classify(score: float) -> str:
        # NaN fails both comparisons and therefore falls through to neutral.
        if score > threshold:
            return "peak"
        if score < -threshold:
            return "valley"
        return "neutral"

    spread = prices - metrics
    score = fisher_transform(spread, period)

    result = pd.concat(
        {
            "btcusd": prices,
            "sth_realized_price": metrics,
            "normalized_diff": score,
        },
        axis=1,
    )
    result["signal"] = result["normalized_diff"].apply(_classify)
    return result


def get_signal_sth_sopr(
    metrics: pd.Series,
    bband_period: int = 200,
    bband_upper_std: float = 2.0,
    bband_lower_std: float = 1.5,
) -> pd.DataFrame:
    """Label the metric against its Bollinger Bands.

    A value above the upper band is ``"peak"``, a value below the lower band
    is ``"valley"``, anything else is ``"neutral"``. Rows where the metric or
    either band is missing are dropped, so the result can be shorter than
    the input.

    Args:
        metrics: Metric series.
        bband_period: Period passed to ``talib.BBANDS``.
        bband_upper_std: Upper-band deviation multiplier.
        bband_lower_std: Lower-band deviation multiplier.

    Returns:
        DataFrame with columns ``sth_sopr``, ``upper_band``, ``lower_band``
        and ``signal``.
    """
    upper, _middle, lower = talib.BBANDS(
        metrics, bband_period, bband_upper_std, bband_lower_std
    )

    bands = pd.concat(
        {"sth_sopr": metrics, "upper_band": upper, "lower_band": lower}, axis=1
    )
    bands.dropna(inplace=True)

    value = bands["sth_sopr"]
    # "valley" is tested first so it wins if both conditions ever hold.
    bands["signal"] = np.where(
        value < bands["lower_band"],
        "valley",
        np.where(value > bands["upper_band"], "peak", "neutral"),
    )
    return bands


def get_signal_sth_nupl(
    metrics: pd.Series,
    smooth_period: int = 10,
    normalized_period: int = 200,
    threshold: float = 2.0,
) -> pd.DataFrame:
    """Label the smoothed, Fisher-normalized metric as peak/valley/neutral.

    The metric is averaged over ``smooth_period`` (partial windows allowed),
    normalized with ``fisher_transform`` over ``normalized_period`` and then
    compared with ``+threshold`` / ``-threshold``.

    Args:
        metrics: Metric series.
        smooth_period: Rolling-mean window applied before normalization.
        normalized_period: Window passed to ``fisher_transform``.
        threshold: Absolute normalized level that triggers a signal.

    Returns:
        DataFrame with columns ``sth_nupl``, ``normalized_sth_nupl`` and
        ``signal``.
    """
    rolling_mean = metrics.rolling(smooth_period, min_periods=1).mean()
    score = fisher_transform(rolling_mean, normalized_period)

    table = pd.concat(
        [metrics, score], axis=1, keys=["sth_nupl", "normalized_sth_nupl"]
    )
    level = table["normalized_sth_nupl"]
    # "valley" is tested first so it wins if both conditions ever hold.
    table["signal"] = np.where(
        level < -threshold,
        "valley",
        np.where(level > threshold, "peak", "neutral"),
    )
    return table


def get_signal_sth_mvrv(
    metrics: pd.Series,
    smooth_period: int = 10,
    normalized_period: int = 200,
    threshold: float = 2.0,
) -> pd.DataFrame:
    """Label the smoothed, Fisher-normalized metric as peak/valley/neutral.

    Steps: rolling mean over ``smooth_period`` (partial windows allowed),
    ``fisher_transform`` over ``normalized_period``, then a comparison with
    ``+threshold`` / ``-threshold``.

    Args:
        metrics: Metric series.
        smooth_period: Rolling-mean window applied before normalization.
        normalized_period: Window passed to ``fisher_transform``.
        threshold: Absolute normalized level that triggers a signal.

    Returns:
        DataFrame with columns ``sth_mvrv``, ``normalized_sth_mvrv`` and
        ``signal``.
    """
    averaged = metrics.rolling(smooth_period, min_periods=1).mean()
    fisher_score = fisher_transform(averaged, normalized_period)

    frame = pd.concat(
        [metrics, fisher_score], axis=1, keys=["sth_mvrv", "normalized_sth_mvrv"]
    )

    is_peak = frame["normalized_sth_mvrv"] > threshold
    is_valley = frame["normalized_sth_mvrv"] < -threshold
    # "valley" is applied outermost so it wins if both conditions ever hold.
    frame["signal"] = np.where(
        is_valley, "valley", np.where(is_peak, "peak", "neutral")
    )
    return frame


def get_signal_nrpl(
    metrics: pd.Series,
    bband_period: int = 200,
    bband_upper_std: float = 2.0,
    bband_lower_std: float = 2.0,
) -> pd.DataFrame:
    """Label the metric against its Bollinger Bands.

    ``"peak"`` when the metric is above the upper band, ``"valley"`` when it
    is below the lower band, ``"neutral"`` otherwise. Rows where the metric
    or either band is missing are dropped before labelling.

    Args:
        metrics: Metric series.
        bband_period: Period passed to ``talib.BBANDS``.
        bband_upper_std: Upper-band deviation multiplier.
        bband_lower_std: Lower-band deviation multiplier.

    Returns:
        DataFrame with columns ``nrpl``, ``upper_band``, ``lower_band`` and
        ``signal``.
    """
    band_high, _band_mid, band_low = talib.BBANDS(
        metrics, bband_period, bband_upper_std, bband_lower_std
    )

    table = pd.concat(
        [metrics, band_high, band_low],
        axis=1,
        keys=["nrpl", "upper_band", "lower_band"],
    )
    table.dropna(inplace=True)

    above_upper = table["nrpl"] > table["upper_band"]
    below_lower = table["nrpl"] < table["lower_band"]
    # "valley" is applied outermost so it wins if both conditions ever hold.
    table["signal"] = np.where(
        below_lower, "valley", np.where(above_upper, "peak", "neutral")
    )
    return table

In [42]:
# Parameters
filepath_ohlcv = "./data/btcusd.csv"
filepath_metric = "./data/nrpl.csv"

# Load the price and the metric into a single date-indexed frame
df = read_metrics(filepath_ohlcv, filepath_metric)
# Bare expression: displayed as the cell output.
df

Unnamed: 0_level_0,price,nrpl
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1
2014-09-17,457.334015,-2296.825620
2014-09-18,424.440002,-1921.845934
2014-09-19,394.795990,-1988.424538
2014-09-20,408.903992,-6969.753243
2014-09-21,398.821014,-6349.884731
...,...,...
2025-04-21,87518.906250,5416.091879
2025-04-22,93441.890625,6404.850481
2025-04-23,93699.109375,9536.229588
2025-04-24,93943.796875,10146.373583


In [38]:
# Compute one signal for inspection. The commented-out calls are the
# alternatives; each reads a different column of `df`, so `df` presumably has
# to be reloaded from the matching metric file first -- verify before use.
# signals = get_signal_sth_realized_price(
#     prices=df["price"], metrics=df["sth_realized_price"], period=200, threshold=2
# )
# signals = get_signal_sth_sopr(df["sth_sopr"], 200, 2.0, 1.5)
# signals = get_signal_sth_nupl(df["sth_nupl"])
# signals = get_signal_sth_mvrv(df["sth_mvrv"])
signals = get_signal_nrpl(df["nrpl"])

# Bare expression: displayed as the cell output.
signals

Unnamed: 0_level_0,nrpl,upper_band,lower_band,signal
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2015-04-04,-11260.155052,1904.296179,-24196.120715,neutral
2015-04-05,-10281.309949,1804.561353,-24176.230734,neutral
2015-04-06,-9430.024195,1702.953354,-24149.704517,neutral
2015-04-07,-8844.575881,1606.742848,-24122.055524,neutral
2015-04-08,-8189.237125,1593.664310,-24121.171825,neutral
...,...,...,...,...
2025-04-21,5416.091879,34178.073037,-3676.208268,neutral
2025-04-22,6404.850481,34175.538538,-3668.210256,neutral
2025-04-23,9536.229588,34164.793620,-3621.293147,neutral
2025-04-24,10146.373583,34151.035693,-3560.590593,neutral


In [43]:
# Parameters
filepath_ohlcv = "./data/btcusd.csv"

# Signal function per metric; each key is also the CSV file stem in ./data
signal_mappings = {
    "sth_realized_price": get_signal_sth_realized_price,
    "sth_sopr": get_signal_sth_sopr,
    "sth_nupl": get_signal_sth_nupl,
    "sth_mvrv": get_signal_sth_mvrv,
    "nrpl": get_signal_nrpl,
}

# Compute every signal and collect the resulting frames by metric name
all_signals = {}
for name, signal_func in signal_mappings.items():
    filepath_metric = f"./data/{name}.csv"
    metrics = read_metrics(filepath_ohlcv, filepath_metric)
    # Only the realized-price signal takes the price series as well.
    signals = (
        signal_func(metrics["price"], metrics[name])
        if name == "sth_realized_price"
        else signal_func(metrics[name])
    )
    all_signals[name] = signals

In [52]:
# One column per metric holding its "signal" labels, aligned on the date index.
# Gaps are forward-filled with the last known label; rows before a metric's
# first label stay empty.
signals_df = pd.concat(
    {name: frame["signal"] for name, frame in all_signals.items()}, axis=1
).ffill()
signals_df

Unnamed: 0_level_0,sth_realized_price,sth_sopr,sth_nupl,sth_mvrv,nrpl
datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2014-09-17,neutral,,neutral,neutral,
2014-09-18,neutral,,neutral,neutral,
2014-09-19,neutral,,neutral,neutral,
2014-09-20,neutral,,neutral,neutral,
2014-09-21,neutral,,neutral,neutral,
...,...,...,...,...,...
2025-04-22,neutral,neutral,valley,valley,neutral
2025-04-23,neutral,neutral,valley,valley,neutral
2025-04-24,neutral,neutral,valley,valley,neutral
2025-04-25,neutral,neutral,neutral,valley,neutral


In [62]:
# Date window shown in the dashboard; used as the bounds of a label slice on
# the date index of signals_df.
start_date = "2024-12-01"
end_date = "2024-12-10"


def color_signal(value: str) -> str:
    """Return the CSS ``color`` declaration for one signal label.

    ``"peak"`` maps to red and ``"valley"`` to green. Any other value yields
    a declaration with an empty color.
    """
    color = "red" if value == "peak" else "green" if value == "valley" else ""
    return f"color: {color}"


# Slice the date window and transpose: one row per signal, one column per date.
dashboard = signals_df.loc[start_date:end_date].T
# Compact YYYYMMDD column labels.
dashboard.columns = [col.strftime("%Y%m%d") for col in dashboard.columns]
# Style every cell with color_signal; the resulting Styler is the cell output.
dashboard.style.map(color_signal)

Unnamed: 0,20241201,20241202,20241203,20241204,20241205,20241206,20241207,20241208,20241209,20241210
sth_realized_price,neutral,neutral,neutral,neutral,neutral,neutral,neutral,neutral,neutral,neutral
sth_sopr,neutral,neutral,neutral,neutral,neutral,neutral,neutral,neutral,peak,neutral
sth_nupl,peak,peak,peak,peak,peak,peak,peak,peak,peak,peak
sth_mvrv,peak,peak,peak,peak,peak,peak,peak,peak,neutral,neutral
nrpl,neutral,neutral,neutral,neutral,peak,peak,peak,peak,peak,peak
