# 上三交易机会扫描（H1）

本 Notebook 将 `find_upper_three.py` 拆分为可分步骤测试的模块，使用 `Data/split_by_year/EURUSD_2024.csv` 作为示例数据。

- 输入：无表头 CSV，列顺序 `Date, Time, Open, High, Low, Close, Volume...`
- 重采样：转换为 H1 K 线
- 规则（简化版）：
  - 释放段向上，幅度 ≥ 40 点，角度 ≥ 30°
  - 回撤不超过释放的 55%
  - 三角形累积：高点递降、低点递升，至少 3 个高低点交替
  - 累积时间 ≥ 释放时间 × 1.5
- 输出：
  - 控制台打印交易机会起始/结束时间

  - 快照保存到 `Data/pattern_snapshots/upper_three_*.png`
  - 汇总保存到 `Data/pattern_snapshots/upper_three_summary.csv`

> 建议从上到下依次运行每个单元格；如需微调阈值，修改第 4 步的参数。




In [9]:
from pathlib import Path
import math, pandas as pd, numpy as np
import mplfinance as mpf
import matplotlib.pyplot as plt

# 如果 notebook 放在 main 目录下
ROOT = Path.cwd().parent          # 当前目录是 main，则上一级是项目根目录
# 或者直接写死：
# ROOT = Path(r"E:\Quantitative trading model")

SRC = ROOT / "Data" / "split_by_year" / "EURUSD_2024.csv"
OUT_DIR = ROOT / "Data" / "pattern_snapshots"
OUT_DIR.mkdir(parents=True, exist_ok=True)

print(f"数据源: {SRC}")
print(f"输出目录: {OUT_DIR}")


数据源: e:\Quantitative trading model\Data\split_by_year\EURUSD_2024.csv
输出目录: e:\Quantitative trading model\Data\pattern_snapshots


In [10]:
def load_h1(csv_path: Path) -> pd.DataFrame:
    """读取无表头 CSV，转为 H1 K 线"""
    df = pd.read_csv(
        csv_path,
        header=None,
        names=["Date", "Clock", "Open", "High", "Low", "Close", "Volume", "Volume2", "Extra"],
    )
    df["Time"] = pd.to_datetime(df["Date"].astype(str) + " " + df["Clock"].astype(str), errors="coerce")
    df = df.dropna(subset=["Time"]).set_index("Time")
    df = df[["Open", "High", "Low", "Close", "Volume"]]
    # 重采样到 H1
    h1 = df.resample("1H").agg(
        {
            "Open": "first",
            "High": "max",
            "Low": "min",
            "Close": "last",
            "Volume": "sum",
        }
    ).dropna()
    return h1


df_h1 = load_h1(SRC)
print(df_h1.head())
print("H1 行数:", len(df_h1))


                        Open     High      Low    Close      Volume
Time                                                               
2024-01-01 22:00:00  1.10427  1.10447  1.10420  1.10438   819500000
2024-01-01 23:00:00  1.10446  1.10446  1.10356  1.10365  3104460000
2024-01-02 00:00:00  1.10366  1.10383  1.10342  1.10381  3844349992
2024-01-02 01:00:00  1.10380  1.10432  1.10342  1.10348  7218519992
2024-01-02 02:00:00  1.10347  1.10359  1.10182  1.10187  9343280008
H1 行数: 6250


In [11]:
def zigzag(df: pd.DataFrame, pct: float = 0.0015):
    """简单 ZigZag，返回 (时间, 价格, 类型)，类型 1=高点，-1=低点"""
    prices = df["Close"]
    pivots = []
    last_pivot_idx = prices.index[0]
    last_pivot_price = prices.iloc[0]
    last_type = 0

    for t, p in prices.iloc[1:].items():
        move = (p - last_pivot_price) / last_pivot_price
        if last_type >= 0 and move >= pct:
            pivots.append((last_pivot_idx, last_pivot_price, -1 if last_type == -1 else 0))
            pivots.append((t, p, 1))
            last_pivot_idx, last_pivot_price, last_type = t, p, 1
        elif last_type <= 0 and move <= -pct:
            pivots.append((last_pivot_idx, last_pivot_price, 1 if last_type == 1 else 0))
            pivots.append((t, p, -1))
            last_pivot_idx, last_pivot_price, last_type = t, p, -1
        else:
            if last_type >= 0 and p > last_pivot_price:
                last_pivot_idx, last_pivot_price = t, p
            if last_type <= 0 and p < last_pivot_price:
                last_pivot_idx, last_pivot_price = t, p
    # 重新标注高低点交替
    cleaned = []
    for i, (t, p, _) in enumerate(pivots):
        if i == 0:
            tp = 1 if (len(pivots) > 1 and pivots[1][1] < p) else -1
        else:
            tp = -cleaned[-1][2]
        cleaned.append((t, p, tp))
    return cleaned


def angle_deg(p1: float, p2: float, bars: int) -> float:
    if bars <= 0:
        return 90.0
    slope = ((p2 - p1) / p1) * 100 / bars * 2
    return abs(math.degrees(math.atan(slope)))


def find_patterns(
    df: pd.DataFrame,
    pivots,
    min_release_pips: float = 40,
    min_release_angle: float = 30,
    max_retrace_ratio: float = 0.55,
    min_time_ratio: float = 1.5,
):
    results = []
    for i in range(len(pivots) - 5):
        t1, p1, tp1 = pivots[i]
        t2, p2, tp2 = pivots[i + 1]
        t3, p3, tp3 = pivots[i + 2]
        t4, p4, tp4 = pivots[i + 3]
        t5, p5, tp5 = pivots[i + 4]
        t6, p6, tp6 = pivots[i + 5]

        # 需要 低-高-低-高-低-高
        if not (tp1 == -1 and tp2 == 1 and tp3 == -1 and tp4 == 1 and tp5 == -1 and tp6 == 1):
            continue

        # 释放段
        release_pips = (p2 - p1) * 10000
        release_bars = int((t2 - t1).total_seconds() / 3600)
        if release_pips < min_release_pips or release_bars <= 0:
            continue
        ang = angle_deg(p1, p2, release_bars)
        if ang < min_release_angle:
            continue

        # 回撤
        retrace_pips = (p2 - p3) * 10000
        if retrace_pips < 0 or retrace_pips > release_pips * max_retrace_ratio:
            continue

        # 三角形：高点递降，低点递升
        if not (p4 < p2 and p6 < p2 and p4 < p6):
            continue
        if not (p3 < p5 < p2):
            continue

        # 累积时间
        acc_bars = int((t6 - t2).total_seconds() / 3600)
        if acc_bars < release_bars * min_time_ratio:
            continue

        results.append(
            dict(
                start=t1,
                release_end=t2,
                pattern_end=t6,
                release_pips=release_pips,
                release_bars=release_bars,
                angle=ang,
                acc_bars=acc_bars,
                acc_ratio=acc_bars / release_bars if release_bars else None,
            )
        )
    return results


In [12]:
def plot_pattern(df_h1: pd.DataFrame, res: dict, out_path: Path):
    start, end = res["start"], res["pattern_end"]
    # 扩大前后各 30 根 K 线
    df_sub = df_h1.loc[start - pd.Timedelta(hours=30): end + pd.Timedelta(hours=30)]
    title = (
        f"上三机会 | 释放: {res['start']} -> {res['release_end']} "
        f"| 累积结束: {res['pattern_end']} "
        f"| 幅度: {res['release_pips']:.1f}p | 角度: {res['angle']:.1f}° | 时间比: {res['acc_ratio']:.1f}x"
    )
    mpf.plot(
        df_sub,
        type="candle",
        style="yahoo",
        title=title,
        volume=True,
        savefig=dict(fname=str(out_path), dpi=160, bbox_inches="tight"),
    )


print("绘图函数已定义")


绘图函数已定义


In [13]:
# 运行检测

# 1) 构建拐点
pivots = zigzag(df_h1, pct=0.001)  # 可调，例如 0.001 (更敏感) 或 0.002 (更稳)

# 2) 查找形态
patterns = find_patterns(
    df_h1,
    pivots,
    min_release_pips=30,    # 可调：30~60
    min_release_angle=20,   # 可调：25~45
    max_retrace_ratio=0.6, # 可调：0.5~0.6
    min_time_ratio=1.2,     # 可调：1.2~2.0
)

print(f"发现 {len(patterns)} 个上三交易机会")

# 3) 保存快照与汇总
rows = []
for idx, res in enumerate(patterns, 1):
    out_png = OUT_DIR / f"upper_three_{idx:02d}.png"
    plot_pattern(df_h1, res, out_png)
    rows.append(
        {
            "start": res["start"],
            "release_end": res["release_end"],
            "pattern_end": res["pattern_end"],
            "release_pips": res["release_pips"],
            "release_bars": res["release_bars"],
            "angle": res["angle"],
            "acc_bars": res["acc_bars"],
            "acc_ratio": res["acc_ratio"],
            "snapshot": out_png.name,
        }
    )
    print(f"{idx}. {res['start']} -> {res['pattern_end']} | 快照: {out_png.name}")

if rows:
    summary_path = OUT_DIR / "upper_three_summary.csv"
    pd.DataFrame(rows).to_csv(summary_path, index=False, encoding="utf-8-sig")
    print(f"汇总已保存: {summary_path}")
else:
    print("未找到形态，可尝试：降低 min_release_pips、降低 min_release_angle、或调小 pct")


发现 0 个上三交易机会
未找到形态，可尝试：降低 min_release_pips、降低 min_release_angle、或调小 pct
