## Pretratamiento

In [1]:
# Importamos las librerias 
import pandas as pd 
import numpy as np 
import datetime as dt
from importlib import reload


In [2]:
# --- Global parameters ---
TICK = 0.25               # instrument tick size (price units per tick)
EU_IDX = (108, 185)       # inclusive idx_in_session window tagged "EU"
USA_IDX = (186, 264)      # inclusive idx_in_session window tagged "USA"


# --- Session helpers keyed on the intraday bar index ---
def add_session_tag_by_index(df, eu=EU_IDX, usa=USA_IDX, idx_col="idx_in_session"):
    """Return a copy of ``df`` with a ``session_tag`` column derived from a bar index.

    Rows whose ``idx_col`` value lies in the inclusive ``usa`` window get "USA",
    rows in the inclusive ``eu`` window get "EU", and everything else gets
    "ASIA". If the two windows overlap, "USA" takes precedence.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    eu, usa : tuple[int, int]
        Inclusive (start, end) bounds on ``idx_col``.
    idx_col : str
        Name of the column holding the intraday bar index.
    """
    positions = df[idx_col].to_numpy()
    in_usa = (positions >= usa[0]) & (positions <= usa[1])
    in_eu = (positions >= eu[0]) & (positions <= eu[1])

    # USA is tested first so it wins on overlap; cast to object so the column
    # holds plain Python strings rather than a fixed-width numpy string dtype.
    labels = np.where(in_usa, "USA", np.where(in_eu, "EU", "ASIA")).astype(object)

    tagged = df.copy()
    tagged["session_tag"] = labels
    return tagged

### Carga de datos

In [3]:
from ppz.io.mongo import ensure_indexes, read_klines
import ppz.features.zones as z; reload(z)
import ppz.features.vp as vp; reload(vp)

ensure_indexes()

# Load 2021-2024 bars (include_of=True presumably adds the order-flow fields),
# sort chronologically, then derive in order: intraday indices, ATR(14),
# previous-day high/low, USA initial balance anchored at the fixed open index,
# VWAP ±1σ, previous-day VAH/POC/VAL (daily profile) and the session tag.
df = (
    read_klines(start="2021-01-01", end="2024-12-31", include_of=True)
    .sort_values("Time")
    .reset_index(drop=True)
    .pipe(z.add_session_indices)
    .pipe(z.compute_atr, period=14)
    .pipe(z.compute_pdh_pdl_prev)
    .pipe(z.compute_usa_ib, ib_bars=12, fixed_open_idx=USA_IDX[0])
    .pipe(z.compute_vwap_and_bands, sigma=1.0)
    .pipe(vp.compute_vah_poc_val_prev, tick_size=TICK)
    .pipe(add_session_tag_by_index)
)
df.shape, df.head(2)

((282944, 29),
                        Time     Open    High     Low    Close  Delta  Volume  \
 0 2021-01-04 00:00:00+01:00  3748.75  3755.5  3742.5  3746.00   -955    7355   
 1 2021-01-04 00:05:00+01:00  3746.25  3750.0  3745.5  3749.75    274    1746   
 
        MVC  NewSession  NewWeek  ...  USA_IBH  USA_IBL         VWAP  VWAP_std  \
 0  3748.75        True     True  ...   3760.0  3726.25  3748.000000  0.000000   
 1  3746.25       False    False  ...   3760.0  3726.25  3748.079936  1.835032   
 
       VWAP_p1s     VWAP_m1s  VAH_D1  POC_D1  VAL_D1  session_tag  
 0  3748.000000  3748.000000     NaN     NaN     NaN         ASIA  
 1  3749.914968  3746.244904     NaN     NaN     NaN         ASIA  
 
 [2 rows x 29 columns])

In [4]:
# Ordering, duplicate timestamps, nulls and time span of the base frame
df = df.sort_values("Time").reset_index(drop=True)
time_col = df["Time"]
n_rows = len(df)
n_dup_times = time_col.duplicated().sum()
null_counts = df.isna().sum()

print("Rango:", time_col.min(), "→", time_col.max())
print("Filas:", n_rows, "Duplicados Time:", n_dup_times)
print("Nulos por columna:\n", null_counts)


Rango: 2021-01-04 00:00:00+01:00 → 2024-12-31 00:55:00+01:00
Filas: 282944 Duplicados Time: 0
Nulos por columna:
 Time                0
Open                0
High                0
Low                 0
Close               0
Delta               0
Volume              0
MVC                 0
NewSession          0
NewWeek             0
NewMonth            0
Bid                 0
Ask                 0
session_id          0
idx_in_session      0
ATR_14              1
PDH_prev          273
PDL_prev          273
usa_open_idx        0
USA_IBH           378
USA_IBL           378
VWAP                0
VWAP_std            0
VWAP_p1s            0
VWAP_m1s            0
VAH_D1            273
POC_D1            273
VAL_D1            273
session_tag         0
dtype: int64


In [5]:
from ppz.utils.schema import (
    ensure_df_base_schema,
    ensure_events_schema,
    ensure_events_labeled_schema,
    assert_has_columns,
)

# Normalise the base schema (VWAP recomputed on request), then require the
# columns that the zone/event steps below read.
required_base_cols = ["session_id", "idx_in_session", "VWAP", "session_tag"]
df = ensure_df_base_schema(df, compute_vwap=True)
assert_has_columns(df, required_base_cols, "df")

  if is_datetime64tz_dtype(s):


### Zonas y otros parámetros

In [6]:
import ppz.features.zones as z
reload(z)

# NOTE(review): these calls repeat the ones made in the data-loading cell; if
# the helpers are idempotent this cell only re-derives identical columns.

# 1) intraday indices and ATR
df = z.add_session_indices(df)                    # creates session_id, idx_in_session
df = z.compute_atr(df, period=14)

# 2) previous-day levels
df = z.compute_pdh_pdl_prev(df)

# 3) USA IB (5m bars, open at the fixed index USA_IDX[0]) and VWAP ±σ.
# Use the global constant instead of a literal 186 so both places stay in sync.
df = z.compute_usa_ib(df, ib_bars=12, fixed_open_idx=USA_IDX[0])
df = z.compute_vwap_and_bands(df, sigma=1.0)

# 4) sanity check: columns the event detector needs
needed = ["PDH_prev","PDL_prev","USA_IBH","USA_IBL","VWAP","VWAP_p1s","VWAP_m1s"]
missing = [c for c in needed if c not in df.columns]
print("Missing:", missing)  # should be []
# Fail loudly rather than relying on someone reading the printed list.
assert not missing, f"Missing columns: {missing}"


Missing: []


### Eventos

In [7]:
from ppz.features.zones import EventSpec, build_event_table

# First-pass event zones. Each EventSpec receives the same string twice
# (presumably the zone label and the df column holding its level — confirm
# against EventSpec). Order is kept as-is in case it matters for dedupe.
zone_columns = (
    "PDH_prev", "PDL_prev",
    "USA_IBH", "USA_IBL",
    "VWAP",
    "VAH_D1", "POC_D1", "VAL_D1",
)
zones = [EventSpec(name, name) for name in zone_columns]

# Relative priority of each zone, passed to dedupe="priority".
zone_weights = dict(
    PDH_prev=1.00, PDL_prev=1.00,
    USA_IBH=0.95, USA_IBL=0.95,
    POC_D1=0.92, VAH_D1=0.90, VAL_D1=0.90,
    VWAP=0.85,
)

events_df = build_event_table(
    df, zones,
    r_prox=6, cooldown=10, tick_size=TICK,
    mode="cross", dedupe="priority",
    zone_weights=zone_weights,
)

# Distance as a whole number of ticks (floor(x + 0.5), i.e. round half up).
events_df["dist_ticks"] = np.floor(events_df["dist_ticks"] + 0.5).astype("int16")
len(events_df), events_df.zone_type.value_counts().head()


(26122,
 zone_type
 VWAP       7605
 USA_IBL    3595
 USA_IBH    3434
 POC_D1     3389
 VAH_D1     2450
 Name: count, dtype: int64)

### Cálculo de POC / VAH / VAL por sesión usando `Ask[]`/`Bid[]` de cada vela. Luego se asignan esos niveles a la sesión siguiente como `*_D1`.

In [8]:
from importlib import reload
import ppz.features.vp as vp
reload(vp)

# Reuse the global tick size instead of re-declaring 0.25, so a change to
# TICK cannot silently diverge here. `tick` is kept as an alias for any later
# cell that still refers to it.
tick = TICK
df = vp.compute_vah_poc_val_prev(df, tick_size=tick)  # adds VAH_D1, POC_D1, VAL_D1
df[["session_id","VAH_D1","POC_D1","VAL_D1"]].head()


Unnamed: 0,session_id,VAH_D1,POC_D1,VAL_D1
0,1,,,
1,1,,,
2,1,,,
3,1,,,
4,1,,,


#### Los añadimos al detector de zonas

In [9]:
# Añadimos al detector de zonas
from ppz.features.zones import EventSpec, build_event_table

# Rebuild the event table with the previous-day value-area levels included.
zones = [
    EventSpec("PDH_prev","PDH_prev"),
    EventSpec("PDL_prev","PDL_prev"),
    EventSpec("USA_IBH","USA_IBH"),
    EventSpec("USA_IBL","USA_IBL"),
    EventSpec("VWAP","VWAP"),
    # EventSpec("VWAP_p1s","VWAP_p1s"),   # optional
    # EventSpec("VWAP_m1s","VWAP_m1s"),   # optional
    EventSpec("VAH_D1","VAH_D1"),
    EventSpec("POC_D1","POC_D1"),
    EventSpec("VAL_D1","VAL_D1"),
]

# The VWAP band weights are only used if the optional specs above are enabled.
zone_weights = {
    "PDH_prev": 1.00, "PDL_prev": 1.00,
    "USA_IBH": 0.95, "USA_IBL": 0.95,
    "VAH_D1": 0.90, "POC_D1": 0.92, "VAL_D1": 0.90,
    "VWAP": 0.85, "VWAP_p1s": 0.80, "VWAP_m1s": 0.80,
}
events_df = build_event_table(df, zones, r_prox=6, cooldown=10, tick_size=TICK,
                              mode="cross", dedupe="priority", zone_weights=zone_weights)

# This table replaces the first-pass events_df, so apply the same rounding of
# dist_ticks to a whole number of ticks; otherwise labeling and features would
# receive unrounded float distances.
events_df["dist_ticks"] = np.floor(events_df["dist_ticks"] + 0.5).astype("int16")
len(events_df), events_df.zone_type.value_counts().head()


(26122,
 zone_type
 VWAP       7605
 USA_IBL    3595
 USA_IBH    3434
 POC_D1     3389
 VAH_D1     2450
 Name: count, dtype: int64)

### Etiquetamos los eventos según: `rebound`, `breakout`, `none` (`none` significa que, en el rango de velas considerado, no se puede afirmar ni que rechaza ni que rompe)

In [10]:
# --- 0) Make sure we hold the ALL events table (no USA filter) ---
# If 'events_first_pass_2021_2024.parquet' was already USA-filtered, REBUILD it
# before continuing (without applying usa_mask). If the ALL table is already in
# memory, skip this and use the one already created.
all_events_path = "data/processed/events/events_first_pass_2021_2024_ALL.parquet"
try:
    events_df_all = pd.read_parquet(all_events_path)
except FileNotFoundError:
    # No ALL table on disk: fall back to the in-memory events_df
    # (assumed not to be USA-filtered).
    events_df_all = events_df.copy()

print("Events (ALL) ->", len(events_df_all))

Events (ALL) -> 26122


In [11]:
# --- 1) Make sure df has every column the labeler reads ---
need = {"session_id", "idx_in_session", "Open", "High", "Low", "Close", "VWAP", "ATR_14"}
missing = need - set(df.columns)
if missing:
    # Recompute only the derived columns that are absent, session indices
    # first (presumably the ATR/VWAP helpers rely on them — confirm).
    # OHLC columns cannot be rebuilt here; the assert below reports them.
    from ppz.features import zones as z
    reload(z)
    if not ("session_id" in df and "idx_in_session" in df):
        df = z.add_session_indices(df)
    if "ATR_14" not in df:
        df = z.compute_atr(df, period=14)
    if "VWAP" not in df:
        df = z.compute_vwap_and_bands(df, sigma=1.0)
    missing2 = need - set(df.columns)
    assert not missing2, f"Faltan columnas incluso tras recomputar: {missing2}"

In [12]:
# --- 2) Labeling (ALL) ---
import ppz.labeling.events as lab; reload(lab)
from dataclasses import asdict
from pathlib import Path


params = lab.LabelParams(
    tick_size=TICK, H_horizon_bars=12,
    p_inval_ticks=6, X_rebound_ticks=10, Y_break_confirm_ticks=16,
    restrict_same_session=True
)

# Label the ALL table prepared in step 0 (events_df_all). Passing the bare
# events_df here would silently ignore an ALL table loaded from disk.
labels_df = lab.label_events(df, events_df_all, params)
events_labeled = lab.join_labels_with_events(events_df_all, labels_df)

# Attach session_tag: each event's "idx" is looked up in df's index.
events_labeled["session_tag"] = events_labeled["idx"].map(df["session_tag"])
# An idx missing from df (e.g. an ALL table on disk built from a different df)
# would yield a NaN tag and silently drop out of the EU/USA subsets below.
assert events_labeled["session_tag"].notna().all(), \
    "events_labeled has idx values that are not in df.index"

print("Distribución de etiquetas (ALL):")
print(events_labeled["label"].value_counts(dropna=False))

# --- Save ALL / EU+USA / USA ---
Path("data/processed/events").mkdir(parents=True, exist_ok=True)
Path("data/interim").mkdir(parents=True, exist_ok=True)

events_labeled.to_parquet("data/processed/events/events_labeled_2021_2024_ALL.parquet",
                          engine="pyarrow", compression="zstd", index=False)

ev_euusa = events_labeled[events_labeled["session_tag"].isin(["EU","USA"])].reset_index(drop=True)
ev_usa   = events_labeled[events_labeled["session_tag"]=="USA"].reset_index(drop=True)

ev_euusa.to_parquet("data/processed/events/events_labeled_2021_2024_EUUSA.parquet",
                    engine="pyarrow", compression="zstd", index=False)
ev_usa.to_parquet("data/processed/events/events_labeled_2021_2024_USA.parquet",
                  engine="pyarrow", compression="zstd", index=False)

# Base frame without the heavy Ask/Bid columns (optional)
df_light = df.drop(columns=[c for c in ["Ask","Bid"] if c in df.columns])
df_light.to_parquet("data/interim/ES_5m_2021_2024_light.parquet",
                    engine="pyarrow", compression="zstd", index=False)


Distribución de etiquetas (ALL):
label
rebound     14355
none         6402
breakout     5365
Name: count, dtype: int64


### Construir dataset supervisado X,y (ALL y EU+USA) y guardar

In [18]:
%load_ext autoreload
%autoreload 2

from importlib import reload
import ppz.features.mvc as mvc; reload(mvc)
import inspect; print(inspect.signature(mvc.compute_mvc_features))
# debe mostrar: (df, idx: Optional[Sequence[int]] = None, *, tick_size: float = 0.25)

import ppz.pipelines.build_dataset as bd; reload(bd)

X_all, y_all = bd.make_supervised_from_events(
    df, events_labeled,
    tick_size=TICK, n_short=20, n_long=60,
    L_prev_touches=60, r_touch_ticks=6,
    drop_none=False
)



(df: 'pd.DataFrame', idx: 'Optional[Sequence[int]]' = None, *, tick_size: 'float' = 0.25) -> 'pd.DataFrame'


  (np.nanmax(buy_ratio)  >= k_ext) if np.isfinite(np.nanmax(buy_ratio))  else False
  (np.nanmax(sell_ratio) >= k_ext) if np.isfinite(np.nanmax(sell_ratio)) else False


In [19]:
import ppz.pipelines.build_dataset as bd; reload(bd)

# Feature-building settings shared by both event subsets.
supervised_kwargs = dict(
    tick_size=TICK, n_short=20, n_long=60,
    L_prev_touches=60, r_touch_ticks=6,
    drop_none=False,
)

# ALL sessions
X_all, y_all = bd.make_supervised_from_events(df, events_labeled, **supervised_kwargs)
# EU+USA sessions only
X_euusa, y_euusa = bd.make_supervised_from_events(df, ev_euusa, **supervised_kwargs)

# Compact dtypes
def compact(dfX):
    """Shrink a feature frame's dtypes in place and return the same frame.

    Every numeric column is cast to float32 (cast failures are ignored via
    ``errors="ignore"``), then every bool column is cast to int8 (0/1).
    Other columns are left untouched. The input frame is mutated.
    """
    for col in dfX.select_dtypes(include=[np.number]).columns:
        dfX[col] = dfX[col].astype(np.float32, errors="ignore")
    bool_cols = [col for col in dfX.columns if dfX[col].dtype == bool]
    for col in bool_cols:
        dfX[col] = dfX[col].astype(np.int8)
    return dfX

X_all = compact(X_all)
X_euusa = compact(X_euusa)

# Store features and target in one file each, for reproducibility.
Xa = X_all.assign(target=y_all.values)
Xe = X_euusa.assign(target=y_euusa.values)

# Create the output folder (and parents) if missing
features_dir = Path("data/processed/features")
features_dir.mkdir(parents=True, exist_ok=True)

parquet_opts = dict(engine="pyarrow", compression="zstd", index=False)
Xa.to_parquet("data/processed/features/supervised_ALL.parquet", **parquet_opts)
Xe.to_parquet("data/processed/features/supervised_EUUSA.parquet", **parquet_opts)

X_all.shape, y_all.value_counts(), X_euusa.shape, y_euusa.value_counts()


  (np.nanmax(buy_ratio)  >= k_ext) if np.isfinite(np.nanmax(buy_ratio))  else False
  (np.nanmax(sell_ratio) >= k_ext) if np.isfinite(np.nanmax(sell_ratio)) else False
  (np.nanmax(buy_ratio)  >= k_ext) if np.isfinite(np.nanmax(buy_ratio))  else False
  (np.nanmax(sell_ratio) >= k_ext) if np.isfinite(np.nanmax(sell_ratio)) else False


((26122, 33),
 label
 rebound     14355
 none         6402
 breakout     5365
 Name: count, dtype: int64,
 (15854, 32),
 label
 rebound     8454
 breakout    3897
 none        3503
 Name: count, dtype: int64)

## Añadimos características de Order Flow

In [25]:
# opcional: fija CWD al raíz del repo (si el notebook vive en ./notebooks/)
import os, sys
from pathlib import Path
# Pin the CWD to the repository root so the relative "data/..." paths in the
# following cells resolve against it.
# NOTE(review): cells above this one already wrote files relative to whatever
# CWD was active before this chdir; consider moving this to the top.
if "__file__" in globals():
    repo_root = Path(__file__).resolve().parents[1]
else:
    cwd = Path.cwd()
    # Only step up when running from ./notebooks/. An unconditional
    # Path.cwd().parents[0] climbs one more directory on every re-run.
    repo_root = cwd.parent if cwd.name == "notebooks" else cwd
os.chdir(repo_root)
print("CWD fijado a:", os.getcwd())


CWD fijado a: c:\Users\jmbf2\OneDrive\Trading\Machine Learning\ZoneBasedPricePrediction


In [26]:
from importlib import reload
import ppz.pipelines.build_dataset as bd; reload(bd)

# Carga base
# Base load (paths relative to the CWD fixed above)
# NOTE(review): neither input below is written by this notebook. The labeling
# cell writes events_labeled_2021_2024_{ALL,EUUSA,USA}.parquet and
# data/interim/ES_5m_2021_2024_light.parquet (the latter without Ask/Bid).
# The recorded output (8541 rows) also differs from the EU+USA set built above
# (15854 rows), so the "EUUSA_EXT" name may be misleading — TODO confirm
# which events file is intended.
df = pd.read_parquet("data/interim/ES_5m_2021_2024.parquet")
events_labeled = pd.read_parquet("data/processed/events/events_labeled_2021_2024.parquet")

# Extended features (MVC + order flow); same base settings as the other datasets.
X_ext, y_ext = bd.make_supervised_from_events(
    df, events_labeled,
    tick_size=TICK, n_short=20, n_long=60,
    L_prev_touches=60, r_touch_ticks=6,
    drop_none=False,
    add_mvc=True,
    add_orderflow=True,
    of_k=3.0, of_k_ext=5.0,
)

# Compact dtypes exactly like the ALL / EU+USA datasets (numeric -> float32,
# bool -> int8) so all saved feature files follow the same conventions.
X_ext = compact(X_ext)

# Save the extended dataset (features + target)
outp = "data/processed/features/supervised_EUUSA_EXT.parquet"
Xe = X_ext.copy(); Xe["target"] = y_ext.values
Xe.to_parquet(outp, engine="pyarrow", compression="zstd", index=False)
outp, X_ext.shape, y_ext.value_counts()


  imb_buy_strength_avg=float(buy_strength[buy_strength>0].mean() if buy_cnt>0 else 0.0),
  ret = ret.dtype.type(ret / rcount)
  imb_sell_strength_avg=float(sell_strength[sell_strength>0].mean() if sell_cnt>0 else 0.0),


('data/processed/features/supervised_EUUSA_EXT.parquet',
 (8541, 31),
 label
 rebound     4173
 breakout    2286
 none        2082
 Name: count, dtype: int64)