# FX Predictor V2 (Trading-Oriented)

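Этот ноутбук разбит на логические этапы (конфиг и импорты находятся в первых двух ячейках):
1. Загрузка и базовая подготовка данных
2. Feature Engineering
3. Таргеты для торговли
4. Окна и масштабирование
5. Архитектура модели
6. Обучение
7. Подбор торговых порогов на validation
8. Финальная оценка на test
9. Backtest со Stop-Loss и Take-Profit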


In [None]:
from dataclasses import dataclass

import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.preprocessing import RobustScaler
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, mean_absolute_error
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau


In [None]:
@dataclass
class Config:
    """Central set of paths, cost assumptions and training settings for the notebook."""
    # CSV file read by load_base_dataframe.
    # NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
    data_path: str = "/Users/data/VsCodeProjects/DataScience/data/raw/eurusd_hour.csv"
    # Number of consecutive rows in every model input window (see make_windows).
    window_len: int = 72

    # Per-trade costs; they are subtracted directly from returns when net PnL is computed.
    spread_cost: float = 0.00003
    slippage_cost: float = 0.00001
    # Extra margin added on top of the two costs when labelling `target_quality`.
    min_edge_buffer: float = 0.00002

    # Chronological split fractions; whatever remains after train + val is the test split.
    train_ratio: float = 0.70
    val_ratio: float = 0.15

    # Arguments forwarded to model.fit, and the seed given to set_random_seed.
    epochs: int = 80
    batch_size: int = 64
    seed: int = 42

# Use the default configuration and seed the random number generators once,
# before any data or model is created, so that runs are repeatable.
cfg = Config()
tf.keras.utils.set_random_seed(cfg.seed)
print(cfg)


## 1) Загрузка и базовая подготовка


In [None]:
def load_base_dataframe(path: str) -> pd.DataFrame:
    """Load the raw quotes CSV and collapse bid/ask columns into mid values.

    The CSV must contain ``Date`` and ``Time`` columns plus the bid/ask pairs
    ``BO/AO``, ``BH/AH``, ``BL/AL``, ``BC/AC`` and ``BCh/ACh``.

    Args:
        path: Location of the CSV file.

    Returns:
        A frame indexed by the combined date and time (index name
        ``Datetime``).  ``Price_open``, ``Highest``, ``Lowest``,
        ``Price_close`` and ``change`` hold the row-wise mean of the
        corresponding bid/ask pair; the raw bid/ask, ``Date`` and ``Time``
        columns are removed.
    """
    df = pd.read_csv(path)

    # Build the timestamp column-wise rather than with a per-row ``apply``:
    # calendar day taken from ``Date`` plus the time-of-day offset taken from
    # ``Time`` (whatever date the parser attaches to ``Time`` is cancelled out).
    day = pd.to_datetime(df["Date"]).dt.normalize()
    clock = pd.to_datetime(df["Time"])
    time_of_day = clock - clock.dt.normalize()
    df.index = pd.DatetimeIndex(day + time_of_day, name="Datetime")
    df = df.drop(columns=["Date", "Time"])

    # Output column -> (bid column, ask column).  Insertion order defines the
    # order in which the new columns are appended.
    mid_columns = {
        "Price_open": ("BO", "AO"),
        "Highest": ("BH", "AH"),
        "Lowest": ("BL", "AL"),
        "Price_close": ("BC", "AC"),
        "change": ("BCh", "ACh"),
    }
    for mid_name, pair in mid_columns.items():
        df[mid_name] = df[list(pair)].mean(axis=1)

    raw_columns = [column for pair in mid_columns.values() for column in pair]
    return df.drop(columns=raw_columns)


# Load the raw quotes; the trailing expression previews the first rows in the notebook.
df = load_base_dataframe(cfg.data_path)
df.head()


## 2) Feature Engineering


In [None]:
def add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* extended with time, candle-shape and rolling features.

    Args:
        df: Frame with a datetime index (``.hour`` is read from it) and the
            columns ``Price_open``, ``Price_close``, ``Highest``, ``Lowest``.

    Returns:
        A new frame; the input is not modified.  Added columns, in order:
        ``hour_sin``/``hour_cos`` (cyclic hour encoding), ``log_ret_body``,
        ``range``, ``upper_wick``, ``lower_wick``, ``close_pos``, the session
        flags ``is_asia``/``is_frankfurt``/``is_london``/``is_ny``,
        ``if_change``, then ``mom_i``/``vol_i`` for i in 3, 6, 12 and
        ``pressure_i`` for i in 3, 6.  Rolling columns are NaN until their
        window is full.
    """
    out = df.copy()
    hour = out.index.hour

    out["hour_sin"] = np.sin(2 * np.pi * hour / 24)
    out["hour_cos"] = np.cos(2 * np.pi * hour / 24)

    price_open = out["Price_open"]
    body = out[["Price_open", "Price_close"]]
    bar_range = out["Highest"] - out["Lowest"]

    out["log_ret_body"] = np.log(out["Price_close"] / price_open)
    out["range"] = bar_range / price_open
    out["upper_wick"] = (out["Highest"] - body.max(axis=1)) / price_open
    out["lower_wick"] = (body.min(axis=1) - out["Lowest"]) / price_open

    # A flat bar (Highest == Lowest) would yield 0/0 = NaN here, which then
    # spreads into the rolling ``pressure_*`` columns.  Such a bar has no
    # directional information, so the close is placed at the midpoint (0.5).
    # NaNs caused by missing prices are left untouched.
    close_pos = (out["Price_close"] - out["Lowest"]) / bar_range
    out["close_pos"] = close_pos.mask(bar_range == 0, 0.5)

    # Session flags by index hour; both bounds are inclusive.
    sessions = {
        "is_asia": (2, 10),
        "is_frankfurt": (10, 11),
        "is_london": (11, 19),
    }
    for flag, (open_hour, close_hour) in sessions.items():
        out[flag] = ((hour >= open_hour) & (hour <= close_hour)).astype(int)

    # The NY window wraps past midnight, hence OR instead of AND.
    ny_open, ny_close = 16, 0
    out["is_ny"] = ((hour >= ny_open) | (hour <= ny_close)).astype(int)

    # 1 on rows where the Frankfurt, London or NY flag differs from the
    # previous row (the Asia flag is not part of this check).
    switched = (
        (out["is_frankfurt"].diff().abs() > 0)
        | (out["is_london"].diff().abs() > 0)
        | (out["is_ny"].diff().abs() > 0)
    )
    out["if_change"] = switched.astype(int)

    for span in (3, 6, 12):
        rolling_ret = out["log_ret_body"].rolling(span)
        out[f"mom_{span}"] = rolling_ret.sum()
        out[f"vol_{span}"] = rolling_ret.std()

    for span in (3, 6):
        out[f"pressure_{span}"] = out["close_pos"].rolling(span).mean()

    return out


# Extend the base frame with the engineered features and preview the last rows.
df = add_features(df)
print(df.shape)
df.tail(3)


## 3) Targets для торговли

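- `target_delta`: регрессия — лог-доходность следующего бара, `log(Price_close[t+1] / Price_close[t])`
- `target_dir`: классификация направления — 1, если `target_delta > 0`, иначе 0
- `target_quality`: есть ли edge после costs — 1, если `|target_delta|` больше `spread_cost + slippage_cost + min_edge_buffer`, иначе 0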


In [None]:
def add_targets(df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
    """Return a copy of *df* with the three training targets appended.

    Args:
        df: Frame containing a ``Price_close`` column.
        cfg: Supplies ``spread_cost``, ``slippage_cost`` and ``min_edge_buffer``.

    Returns:
        A new frame with these extra columns:

        * ``target_delta`` - log of next row's close over this row's close;
          NaN on the last row because there is no next close.
        * ``target_dir`` - 1 when ``target_delta`` is positive, else 0.
        * ``target_quality`` - 1 when ``|target_delta|`` exceeds the sum of
          the three cost settings, else 0.

        On the last row both integer labels are 0 (a NaN comparison is false).
    """
    labelled = df.copy()
    close = labelled["Price_close"]
    next_bar_log_ret = np.log(close.shift(-1) / close)
    edge_threshold = cfg.spread_cost + cfg.slippage_cost + cfg.min_edge_buffer

    labelled["target_delta"] = next_bar_log_ret
    labelled["target_dir"] = next_bar_log_ret.gt(0).astype(int)
    labelled["target_quality"] = next_bar_log_ret.abs().gt(edge_threshold).astype(int)
    return labelled


df = add_targets(df, cfg)

# Raw price columns and the targets themselves are never used as model inputs.
excluded = [
    "Price_open", "Price_close", "Highest", "Lowest", "change",
    "target_delta", "target_dir", "target_quality"
]
# Everything else becomes a feature; the order follows the column order of df.
features = [c for c in df.columns if c not in excluded]

# Drop rows with a missing feature or target (rolling warm-up rows and the
# last row, whose target_delta is NaN).
df = df.dropna(subset=features + ["target_delta", "target_dir", "target_quality"]).copy()

# Quick sanity report: data size and class balance of the two binary targets.
print("Rows:", len(df))
print("Num features:", len(features))
print("target_dir mean:", float(df["target_dir"].mean()))
print("target_quality mean:", float(df["target_quality"].mean()))


## 4) Окна и масштабирование


In [None]:
def make_windows(df: pd.DataFrame, features: list[str], cfg: Config):
    """Build sliding feature windows with their targets and split them chronologically.

    Window ``k`` holds rows ``k .. k + window_len - 1`` of ``df[features]`` and
    is paired with the targets stored on row ``j = k + window_len``.

    NOTE(review): row ``j`` itself is therefore not part of the input window
    although its targets describe the move after its close - confirm that
    this one-bar gap is intended.

    Args:
        df: Frame containing the feature columns and ``target_delta``,
            ``target_dir`` and ``target_quality``.
        features: Names of the feature columns, in model input order.
        cfg: Supplies ``window_len``, ``train_ratio`` and ``val_ratio``.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test, idx_train, idx_val,
        idx_test)``.  Each ``X_*`` has shape ``(n, window_len, n_features)``;
        each ``y_*`` is a list ``[delta, direction, quality]`` of ``(n, 1)``
        arrays; each ``idx_*`` holds the positional row number ``j`` of the
        target row in *df*.  The first ``train_ratio`` share of the windows is
        the train split, the next ``val_ratio`` share the validation split
        and the remainder the test split.

    Raises:
        ValueError: If ``window_len`` is smaller than 1 or *df* does not have
            more than ``window_len`` rows (no window/target pair exists).
    """
    window_len = cfg.window_len
    n_rows = len(df)
    if window_len < 1 or n_rows <= window_len:
        raise ValueError(
            f"Need more than window_len={window_len} rows to build windows, got {n_rows}."
        )

    X_all = df[features].values

    # sliding_window_view yields (n_rows - window_len + 1, n_features, window_len);
    # the last window has no target row after it, so it is dropped, and the
    # axes are swapped to (window, time step, feature).
    views = np.lib.stride_tricks.sliding_window_view(X_all, window_len, axis=0)[:-1]
    X = np.ascontiguousarray(views.transpose(0, 2, 1))

    target_idx = np.arange(window_len, n_rows)
    # np.array copies, so the returned targets do not alias the frame's data.
    y_d = np.array(df["target_delta"].values[window_len:]).reshape(-1, 1)
    y_c = np.array(df["target_dir"].values[window_len:]).reshape(-1, 1)
    y_q = np.array(df["target_quality"].values[window_len:]).reshape(-1, 1)

    train_end = int(len(X) * cfg.train_ratio)
    val_end = train_end + int(len(X) * cfg.val_ratio)

    def split(values: np.ndarray):
        """Cut *values* into its train / validation / test parts."""
        return values[:train_end], values[train_end:val_end], values[val_end:]

    X_train, X_val, X_test = split(X)
    d_train, d_val, d_test = split(y_d)
    c_train, c_val, c_test = split(y_c)
    q_train, q_val, q_test = split(y_q)
    idx_train, idx_val, idx_test = split(target_idx)

    y_train = [d_train, c_train, q_train]
    y_val = [d_val, c_val, q_val]
    y_test = [d_test, c_test, q_test]

    return X_train, X_val, X_test, y_train, y_val, y_test, idx_train, idx_val, idx_test


def scale_windows(X_train: np.ndarray, X_val: np.ndarray, X_test: np.ndarray):
    """Scale the three window arrays with a RobustScaler fitted on the train split only.

    Every array is flattened to ``(-1, n_features)`` over its last axis for
    the scaler and reshaped back to its original shape afterwards.

    Returns:
        ``(X_train_scaled, X_val_scaled, X_test_scaled, scaler)`` where
        ``scaler`` is the fitted ``RobustScaler``.
    """
    # Fitting on the train rows alone keeps validation/test statistics out of the scaler.
    train_rows = X_train.reshape(-1, X_train.shape[-1])
    scaler = RobustScaler()
    scaler.fit(train_rows)

    scaled = []
    for windows in (X_train, X_val, X_test):
        rows = windows.reshape(-1, windows.shape[-1])
        scaled.append(scaler.transform(rows).reshape(windows.shape))

    return scaled[0], scaled[1], scaled[2], scaler


# Build the windows, split them chronologically, then scale using train statistics only.
X_train, X_val, X_test, y_train, y_val, y_test, idx_train, idx_val, idx_test = make_windows(df, features, cfg)
X_train, X_val, X_test, scaler = scale_windows(X_train, X_val, X_test)

# Shape report; idx_* are positional row numbers into df.
print("X train/val/test:", X_train.shape, X_val.shape, X_test.shape)
print("y train lens:", [arr.shape for arr in y_train])
print("idx test range:", int(idx_test[0]), "->", int(idx_test[-1]))


## 5) Архитектура модели (3 головы)


In [None]:
def build_model(seq_len: int, n_features: int) -> Model:
    """Create and compile the three-headed recurrent network.

    A bidirectional LSTM (96 units) and a second LSTM (64 units), each
    followed by 20 % dropout, feed a shared 96-unit ReLU layer.  Three
    single-unit heads branch off it: ``delta`` (linear), ``direction`` and
    ``quality`` (both sigmoid).

    Args:
        seq_len: Number of time steps per input window.
        n_features: Number of features per time step.

    Returns:
        The compiled model; its outputs are ordered
        ``[delta, direction, quality]``.
    """
    sequence_in = Input(shape=(seq_len, n_features))

    # Recurrent encoder.
    encoded = Bidirectional(LSTM(96, return_sequences=True))(sequence_in)
    encoded = Dropout(0.2)(encoded)
    encoded = LSTM(64, return_sequences=False)(encoded)
    encoded = Dropout(0.2)(encoded)
    trunk = Dense(96, activation="relu")(encoded)

    # Task heads, all reading the same shared representation.
    delta_head = Dense(1, name="delta")(trunk)
    direction_head = Dense(1, activation="sigmoid", name="direction")(trunk)
    quality_head = Dense(1, activation="sigmoid", name="quality")(trunk)

    network = Model(inputs=sequence_in, outputs=[delta_head, direction_head, quality_head])

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    # Per-head settings are keyed by the output layer names given above.
    head_losses = {
        "delta": tf.keras.losses.Huber(delta=0.5),
        "direction": "binary_crossentropy",
        "quality": "binary_crossentropy",
    }
    head_weights = {
        "delta": 0.25,
        "direction": 0.45,
        "quality": 0.30,
    }
    head_metrics = {
        "delta": ["mse"],
        "direction": ["accuracy", tf.keras.metrics.AUC(name="auc")],
        "quality": ["accuracy", tf.keras.metrics.AUC(name="auc")],
    }
    network.compile(
        optimizer=optimizer,
        loss=head_losses,
        loss_weights=head_weights,
        metrics=head_metrics,
    )
    return network


# Input dimensions are taken from the scaled training windows: (samples, seq_len, n_features).
model = build_model(seq_len=X_train.shape[1], n_features=X_train.shape[2])
model.summary()


## 6) Обучение


In [None]:
# Both callbacks watch validation accuracy of the direction head (higher is better):
# training stops after 8 epochs without improvement and the best weights are restored;
# the learning rate is halved after 3 such epochs, down to at most 1e-5.
callbacks = [
    EarlyStopping(monitor="val_direction_accuracy", mode="max", patience=8, restore_best_weights=True),
    ReduceLROnPlateau(monitor="val_direction_accuracy", mode="max", factor=0.5, patience=3, min_lr=1e-5),
]

# shuffle=False keeps the training windows in their chronological order.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=cfg.epochs,
    batch_size=cfg.batch_size,
    shuffle=False,
    callbacks=callbacks,
    verbose=1,
)


## 7) Подбор торговых порогов на validation


In [None]:
def optimize_trading_thresholds(
    y_delta_true: np.ndarray,
    p_direction: np.ndarray,
    p_quality: np.ndarray,
    spread_cost: float,
    slippage_cost: float,
    min_trades: int = 50,
):
    """Grid-search the direction/quality thresholds that maximise net PnL.

    For every threshold pair a bar is traded when its quality probability is
    at least ``qual_t``; the position is long when the direction probability
    is at least ``dir_t`` and short otherwise.  Each trade earns the signed
    true return minus ``spread_cost + slippage_cost``.

    Args:
        y_delta_true: Realised next-bar returns, any shape (flattened).
        p_direction: Predicted direction probabilities, same length.
        p_quality: Predicted quality probabilities, same length.
        spread_cost: Spread cost charged per trade.
        slippage_cost: Slippage cost charged per trade.
        min_trades: Minimum number of trades a threshold pair must produce
            to be eligible.

    Returns:
        Dict with ``dir_t``, ``qual_t``, ``net_pnl`` and ``trades`` of the
        best eligible pair (the first one found wins ties).  When no pair
        reaches ``min_trades``, the default thresholds 0.5 / 0.5 are returned
        together with their actual PnL and trade count rather than a
        placeholder value.
    """
    y = np.asarray(y_delta_true).flatten()
    prob_dir = np.asarray(p_direction).flatten()
    prob_qual = np.asarray(p_quality).flatten()
    total_cost = spread_cost + slippage_cost

    def net_returns(dir_t: float) -> np.ndarray:
        """Net return of every bar if it were traded with direction threshold *dir_t*."""
        return np.where(prob_dir >= dir_t, 1, -1) * y - total_cost

    dir_grid = np.linspace(0.45, 0.60, 16)
    qual_grid = np.linspace(0.50, 0.75, 11)

    best = None
    best_pnl = float("-inf")
    for dt in dir_grid:
        # Depends on the direction threshold only, so compute it once per dt.
        net_if_traded = net_returns(dt)
        for qt in qual_grid:
            trade_mask = prob_qual >= qt
            n_trades = int(trade_mask.sum())
            if n_trades < min_trades:
                continue

            pnl = float(np.where(trade_mask, net_if_traded, 0.0).sum())
            if pnl > best_pnl:
                best_pnl = pnl
                best = {
                    "dir_t": float(dt),
                    "qual_t": float(qt),
                    "net_pnl": pnl,
                    "trades": n_trades,
                }

    if best is None:
        # Nothing was eligible: fall back to the neutral thresholds and report
        # what they actually achieve.
        trade_mask = prob_qual >= 0.5
        best = {
            "dir_t": 0.5,
            "qual_t": 0.5,
            "net_pnl": float(np.where(trade_mask, net_returns(0.5), 0.0).sum()),
            "trades": int(trade_mask.sum()),
        }

    return best


# Thresholds are tuned on validation predictions only; the test split is not touched here.
p_val_delta, p_val_dir, p_val_quality = model.predict(X_val, verbose=0)
best = optimize_trading_thresholds(
    y_delta_true=y_val[0],
    p_direction=p_val_dir,
    p_quality=p_val_quality,
    spread_cost=cfg.spread_cost,
    slippage_cost=cfg.slippage_cost,
)

# Trailing expression: shows the chosen thresholds in the notebook.
best


## 8) Финальная оценка на test


In [None]:
def evaluate_trading(
    y_delta_true: np.ndarray,
    y_dir_true: np.ndarray,
    p_delta: np.ndarray,
    p_direction: np.ndarray,
    p_quality: np.ndarray,
    dir_t: float,
    qual_t: float,
    spread_cost: float,
    slippage_cost: float,
):
    """Score the predictions as regression, classification and trading results.

    A bar is traded when its quality probability is at least ``qual_t``; the
    position is long when the direction probability is at least ``dir_t`` and
    short otherwise.  A trade earns the signed true return minus
    ``spread_cost + slippage_cost``; untraded bars contribute 0.

    Returns:
        Dict with ``mse`` and ``mae`` of the delta head, ``direction_acc_all``
        and ``direction_f1_all`` over all bars, and the trading figures
        ``trades``, ``net_pnl``, ``avg_trade`` and ``hit_rate_trades`` (share
        of trades with positive net return).  The trading figures are all
        zero when no bar is traded.
    """
    true_delta = y_delta_true.flatten()
    true_dir = y_dir_true.flatten()
    pred_delta = p_delta.flatten()

    pred_dir = (p_direction.flatten() >= dir_t).astype(int)
    trade_mask = p_quality.flatten() >= qual_t

    # Map class 1 -> long (+1) and class 0 -> short (-1).
    position = np.where(pred_dir == 1, 1, -1)
    per_bar_net = np.where(
        trade_mask,
        position * true_delta - (spread_cost + slippage_cost),
        0.0,
    )
    traded = np.where(trade_mask)[0]

    # Metrics that do not depend on whether anything was traded.
    report = {
        "mse": mean_squared_error(true_delta, pred_delta),
        "mae": mean_absolute_error(true_delta, pred_delta),
        "direction_acc_all": accuracy_score(true_dir, pred_dir),
        "direction_f1_all": f1_score(true_dir, pred_dir),
    }

    if len(traded) == 0:
        report.update(
            {"trades": 0, "net_pnl": 0.0, "avg_trade": 0.0, "hit_rate_trades": 0.0}
        )
        return report

    trade_net = per_bar_net[traded]
    report.update(
        {
            "trades": int(len(traded)),
            "net_pnl": float(per_bar_net.sum()),
            "avg_trade": float(trade_net.mean()),
            "hit_rate_trades": float((trade_net > 0).mean()),
        }
    )
    return report


# Final evaluation: the test predictions are scored with the thresholds chosen on validation.
p_test_delta, p_test_dir, p_test_quality = model.predict(X_test, verbose=0)
result = evaluate_trading(
    y_delta_true=y_test[0],
    y_dir_true=y_test[1],
    p_delta=p_test_delta,
    p_direction=p_test_dir,
    p_quality=p_test_quality,
    dir_t=best["dir_t"],
    qual_t=best["qual_t"],
    spread_cost=cfg.spread_cost,
    slippage_cost=cfg.slippage_cost,
)

print("=== Thresholds from VAL ===")
print(best)
print("=== Test Metrics ===")
print(result)


## 9) Backtest со Stop-Loss и Take-Profit

Логика сделки (горизонт 1 бар):
- вход по `Price_close[t]` на баре таргета,
- если есть сигнал (`direction` + `quality`), открываем позицию,
- внутри следующего бара (`t+1`) проверяем TP/SL по `Highest/Lowest`,
- если не сработали TP/SL, закрытие по `Price_close[t+1]`,
- если в одном баре и TP и SL, используем консервативно `SL`.


In [None]:
import matplotlib.pyplot as plt

def _resolve_exit(
    side: int,
    entry: float,
    nxt_high: float,
    nxt_low: float,
    nxt_close: float,
    sl_pct: float,
    tp_pct: float,
):
    """Return ``(gross_ret, exit_type)`` of one trade held for a single bar."""
    if side == 1:
        hit_tp = nxt_high >= entry * (1 + tp_pct)
        hit_sl = nxt_low <= entry * (1 - sl_pct)
        close_ret = (nxt_close - entry) / entry
    else:
        hit_tp = nxt_low <= entry * (1 - tp_pct)
        hit_sl = nxt_high >= entry * (1 + sl_pct)
        close_ret = (entry - nxt_close) / entry

    if hit_sl:
        # When both levels lie inside the bar the order of touches is unknown;
        # conservatively assume the stop-loss was hit first.
        return -sl_pct, ("SL_both" if hit_tp else "SL")
    if hit_tp:
        return tp_pct, "TP"
    return close_ret, "Close"


def run_backtest_sl_tp(
    df: pd.DataFrame,
    idx: np.ndarray,
    p_direction: np.ndarray,
    p_quality: np.ndarray,
    dir_t: float,
    qual_t: float,
    sl_pct: float = 0.0015,
    tp_pct: float = 0.0025,
    spread_cost: float = 0.00003,
    slippage_cost: float = 0.00001,
):
    """Backtest one-bar trades with a stop-loss and a take-profit level.

    For every position ``t`` in *idx* whose quality probability is at least
    ``qual_t`` a trade is opened at ``Price_close[t]`` - long when the
    direction probability is at least ``dir_t``, short otherwise.  The trade
    is resolved on row ``t + 1``: stop-loss or take-profit if ``Highest`` /
    ``Lowest`` reach the level (stop-loss wins when both do), otherwise it is
    closed at ``Price_close[t + 1]``.  ``spread_cost + slippage_cost`` is
    charged once per trade.

    Args:
        df: Frame with ``Price_close``, ``Highest`` and ``Lowest`` columns.
        idx: Positional row numbers into *df*, aligned with the predictions.
        p_direction: Direction probabilities, one per entry of *idx*.
        p_quality: Quality probabilities, one per entry of *idx*.
        dir_t: Direction threshold.
        qual_t: Quality threshold.
        sl_pct: Stop-loss distance as a fraction of the entry price.
        tp_pct: Take-profit distance as a fraction of the entry price.
        spread_cost: Spread cost charged per trade.
        slippage_cost: Slippage cost charged per trade.

    Returns:
        ``(result, equity, trades)``: a summary dict, the cumulative sum of
        per-bar net returns, and a DataFrame with one row per trade.  Bars
        without a following row in *df* are skipped entirely.
    """
    close = df["Price_close"].values
    high = df["Highest"].values
    low = df["Lowest"].values
    n_rows = len(df)

    pd_sig = p_direction.flatten()
    pq_sig = p_quality.flatten()
    cost = spread_cost + slippage_cost

    trades = []
    rets = []
    # Trade returns are collected explicitly: filtering ``rets`` for non-zero
    # values would silently drop a trade whose net return is exactly 0.
    trade_net = []

    for k, t in enumerate(idx):
        # The next bar is needed to check TP/SL.
        if t + 1 >= n_rows:
            continue

        if not pq_sig[k] >= qual_t:
            rets.append(0.0)
            continue

        side = 1 if pd_sig[k] >= dir_t else -1
        gross_ret, exit_type = _resolve_exit(
            side, close[t], high[t + 1], low[t + 1], close[t + 1], sl_pct, tp_pct
        )

        net_ret = gross_ret - cost
        rets.append(net_ret)
        trade_net.append(net_ret)
        trades.append({
            "idx": int(t),
            "side": "LONG" if side == 1 else "SHORT",
            "gross_ret": float(gross_ret),
            "net_ret": float(net_ret),
            "exit_type": exit_type,
        })

    rets = np.array(rets, dtype=float)
    trade_rets = np.array(trade_net, dtype=float)
    n_trades = len(trades)

    result = {
        "bars": int(len(rets)),
        "trades": int(n_trades),
        "net_pnl": float(rets.sum()),
        "avg_trade": float(trade_rets.mean()) if n_trades else 0.0,
        "hit_rate": float((trade_rets > 0).mean()) if n_trades else 0.0,
        "tp_share": float(sum(t["exit_type"] == "TP" for t in trades) / n_trades) if n_trades else 0.0,
        "sl_share": float(sum(t["exit_type"].startswith("SL") for t in trades) / n_trades) if n_trades else 0.0,
    }

    equity = rets.cumsum()
    return result, equity, pd.DataFrame(trades)


# Run the SL/TP backtest on the test split with the thresholds chosen on validation.
# idx_test holds positional row numbers into df, which is the frame passed in here.
bt_result, bt_equity, bt_trades = run_backtest_sl_tp(
    df=df,
    idx=idx_test,
    p_direction=p_test_dir,
    p_quality=p_test_quality,
    dir_t=best["dir_t"],
    qual_t=best["qual_t"],
    sl_pct=0.0015,  # 0.15%
    tp_pct=0.0025,  # 0.25%
    spread_cost=cfg.spread_cost,
    slippage_cost=cfg.slippage_cost,
)

print("=== SL/TP Backtest (Test) ===")
print(bt_result)

# Cumulative net return per processed test bar.
plt.figure(figsize=(12, 4))
plt.plot(bt_equity)
plt.title("Equity Curve (SL/TP backtest)")
plt.xlabel("Test bars")
plt.ylabel("Cumulative return")
plt.grid(True, alpha=0.3)
plt.show()

# Trailing expression: preview of the first trades in the notebook.
bt_trades.head(10)
