# Promedios de ETA a próxima estación por línea y dirección

In [3]:
from pathlib import Path
import pandas as pd
import numpy as np

FEATS_DIR = Path("D:/2025/UVG/Tesis/repos/backend/features_ready_without_idle_rows")
files = sorted(FEATS_DIR.glob("*.parquet"))
assert files, "No hay .parquet"

# -------------------------
# Parámetros de robustez
# -------------------------
D_IN_M = 20.0      # umbral de llegada por distancia
T_IN_S = 20.0      # umbral alterno por ETA
V_SLOW_MPS = 2.5   # opcional: "llegada" con velocidad baja
USE_SLOW_VEL = False  # True para exigir velocidad baja

# -------------------------
# Utilidades
# -------------------------
def _runs_of_equal(values: pd.Series):
    v = values.astype("string").fillna("<NA>").values
    n = len(v)
    if n == 0:
        return []
    start = np.empty(n, dtype=bool); start[0] = True
    start[1:] = v[1:] != v[:-1]
    rid = np.cumsum(start) - 1
    out = []
    for r in np.unique(rid):
        idx = np.where(rid == r)[0]
        out.append((int(idx[0]), int(idx[-1]), v[idx[0]]))
    return out  # lista de (i_ini, i_fin, nombre_estacion_objetivo)

def _detect_arrival_idx(block: pd.DataFrame) -> int | None:
    # candidato: índice con mínima distancia
    i_min = int(block["dist_a_prox_m"].idxmin())
    row = block.loc[i_min]
    ok_dist = row.get("dist_a_prox_m", np.inf) <= D_IN_M
    ok_eta  = row.get("ETA_prox_est_s", np.inf) <= T_IN_S
    ok_vel  = (row.get("vel_mps", np.nan) <= V_SLOW_MPS) if USE_SLOW_VEL else True
    if (ok_dist or ok_eta) and ok_vel:
        return i_min
    # fallback: usar el último registro del bloque
    return int(block.index[-1])

def _first_move_after(df: pd.DataFrame, idx_after: int) -> int | None:
    # Buscar el primer índice > idx_after con "progreso"
    # Si no se tiene progress_event/is_no_progress, usa cambio de posición o vel_mps> threshold
    if "progress_event" in df.columns:
        s = df.loc[idx_after+1:, "progress_event"]
        got = s.index[s.values == 1]
        return int(got[0]) if len(got) else None
    elif "is_no_progress" in df.columns:
        s = df.loc[idx_after+1:, "is_no_progress"]
        got = s.index[s.values == 0]
        return int(got[0]) if len(got) else None
    else:
        # heurística: vel_mps > 0.8 m/s
        if "vel_mps" in df.columns:
            s = df.loc[idx_after+1:, "vel_mps"]
            got = s.index[s.values > 0.8]
            return int(got[0]) if len(got) else None
        return None

def _clip_outliers_iqr(x: pd.Series):
    q1, q3 = x.quantile([0.25, 0.75])
    iqr = q3 - q1
    lo, hi = q1 - 1.5*iqr, q3 + 1.5*iqr
    return x[(x >= lo) & (x <= hi)], lo, hi

# -------------------------
# Proceso principal
# -------------------------
all_rows = []

for f in files:
    print(f'Procesando {f.name}...')
    df = pd.read_parquet(f, columns=[
        "LINEA","DIR","Placa","trip_id","Fecha","vel_mps","dist_a_prox_m","ETA_proxima_est_s","proxima_est_teorica"
    ])
    
    if df.empty:
        print(f"Advertencia: archivo vacío {f.name}")
        continue

    # Normalizaciones mínimas
    if "Fecha" in df.columns:
        df["Fecha"] = pd.to_datetime(df["Fecha"])
    if "vel_mps" not in df.columns and "Velocidad (km/h)" in df.columns:
        df["vel_mps"] = df["Velocidad (km/h)"] / 3.6

    # Orden crítico
    df = df.sort_values(["LINEA","DIR","Placa","trip_id","Fecha"]).reset_index(drop=True)

    # Recorre por viaje
    for (linea, direc, placa, trip), g in df.groupby(["LINEA","DIR","Placa","trip_id"], sort=False, observed=False):
        if g.empty: 
            continue
        g = g.copy()

        # Construye runs por proxima_est_teorica
        runs = _runs_of_equal(g["proxima_est_teorica"])
        if len(runs) < 2:
            continue  # no hay siguiente estación

        # Detecta llegadas por bloque y arma pares consecutivos
        arrivals = []  # [(estacion, idx_arr, ts_arr)]
        for (i0, i1, est) in runs:
            blk = g.iloc[i0:i1+1]
            if est == "<NA>":
                continue
            idx_arr = _detect_arrival_idx(blk)
            ts_arr  = g.loc[idx_arr, "Fecha"]
            arrivals.append((str(est), idx_arr, ts_arr))

        # Pares (S_j -> S_{j+1})
        for j in range(len(arrivals)-1):
            est_j, idx_arr_j, t_arr_j = arrivals[j]
            est_k, idx_arr_k, t_arr_k = arrivals[j+1]

            if pd.isna(t_arr_j) or pd.isna(t_arr_k) or t_arr_k <= t_arr_j:
                continue

            # A→A
            aa_s = (t_arr_k - t_arr_j).total_seconds()

            # D→A (opcional)
            idx_dep = _first_move_after(g, idx_arr_j)
            da_s = None
            if idx_dep is not None:
                t_dep = g.loc[idx_dep, "Fecha"]
                if t_dep < t_arr_k:
                    da_s = (t_arr_k - t_dep).total_seconds()

            all_rows.append({
                "LINEA": linea,
                "DIR": direc,
                "estacion_actual": est_j,
                "siguiente_estacion": est_k,
                "trip_id": str(trip),
                "A2A_s": aa_s,
                "D2A_s": da_s,
                "archivo": f.name
            })

# Tabla cruda de observaciones por tramo
obs = pd.DataFrame(all_rows)
if obs.empty:
    raise SystemExit("No se generaron observaciones de tramos (revisar columnas y datos)")

def robust_agg(x: pd.Series, colname: str):
    x = x.dropna()
    n_total = len(x)
    if n_total == 0:
        return pd.Series({
            f"{colname}_n": 0,
            f"{colname}_p50": np.nan,
            f"{colname}_p10": np.nan,
            f"{colname}_p25": np.nan,
            f"{colname}_p75": np.nan,
            f"{colname}_p90": np.nan,
            f"{colname}_mean_trim10": np.nan,
            f"{colname}_std": np.nan,
            f"{colname}_discard_ratio": np.nan
        })
    x_f, lo, hi = _clip_outliers_iqr(x)
    discard_ratio = 1 - (len(x_f) / n_total)
    if len(x_f) == 0:
        x_f = x  # si filtró todo, usa sin filtrar pero lo sabrás por discard_ratio=1
    # media recortada 10%
    xf_sorted = x_f.sort_values().values
    k = int(0.10*len(xf_sorted))
    if len(xf_sorted) - 2*k > 0:
        mean_trim10 = xf_sorted[k:len(xf_sorted)-k].mean()
    else:
        mean_trim10 = x_f.mean()

    return pd.Series({
        f"{colname}_n": n_total,
        f"{colname}_p50": x_f.median(),
        f"{colname}_p10": x_f.quantile(0.10),
        f"{colname}_p25": x_f.quantile(0.25),
        f"{colname}_p75": x_f.quantile(0.75),
        f"{colname}_p90": x_f.quantile(0.90),
        f"{colname}_mean_trim10": mean_trim10,
        f"{colname}_std": x_f.std(ddof=1),
        f"{colname}_discard_ratio": discard_ratio
    })

# Agregación robusta por tramo
grp = ["LINEA","DIR","estacion_actual","siguiente_estacion"]
summary = (obs
    .groupby(grp, as_index=False)
    .apply(lambda g: pd.concat([robust_agg(g["A2A_s"], "A2A"),
                                robust_agg(g["D2A_s"], "D2A")], axis=0))
    .reset_index()
    .drop(columns=["level_0","level_1"], errors="ignore")
)

# ETA experimentado por el usuario (incluye dwell)
summary["ETA_usuario_s"] = summary["A2A_p50"]

# ETA operativo (sin dwell)
summary["ETA_operativo_s"] = summary["D2A_p50"]

# ETA sugerido = usuario, si no hay, operativo (casos raros)
summary["ETA_sugerido_s"] = np.where(
    summary["A2A_p50"].notna(), summary["A2A_p50"], summary["D2A_p50"]
)

# Ordenar por línea y dirección
summary = summary.sort_values(grp).reset_index(drop=True)

# Vista previa
summary.head(15)


Procesando u049_trips_with_next_station_features.parquet...
Procesando u050_trips_with_next_station_features.parquet...
Procesando u051_trips_with_next_station_features.parquet...
Procesando u052_trips_with_next_station_features.parquet...
Procesando u053_trips_with_next_station_features.parquet...
Procesando u055_trips_with_next_station_features.parquet...
Procesando u056_trips_with_next_station_features.parquet...
Procesando u057_trips_with_next_station_features.parquet...
Procesando u059_trips_with_next_station_features.parquet...
Procesando u060_trips_with_next_station_features.parquet...
Procesando u061_trips_with_next_station_features.parquet...
Procesando u062_trips_with_next_station_features.parquet...
Procesando u063_trips_with_next_station_features.parquet...
Procesando u064_trips_with_next_station_features.parquet...
Procesando u066_trips_with_next_station_features.parquet...
Procesando u067_trips_with_next_station_features.parquet...
Procesando u068_trips_with_next_station_

Unnamed: 0,index,LINEA,DIR,estacion_actual,siguiente_estacion,A2A_n,A2A_p50,A2A_p10,A2A_p25,A2A_p75,...,D2A_p10,D2A_p25,D2A_p75,D2A_p90,D2A_mean_trim10,D2A_std,D2A_discard_ratio,ETA_usuario_s,ETA_operativo_s,ETA_sugerido_s
0,0,Linea_1,CIRCULAR,BEATAS DE BELÉN,CENTRO CÍVICO,424.0,552.0,371.6,426.5,788.0,...,63.0,77.0,209.5,300.0,151.767857,86.660171,0.027778,552.0,140.0,552.0
1,1,Linea_1,CIRCULAR,BEATAS DE BELÉN,CORREOS,84.0,25.0,2.0,3.0,60.0,...,60.3,60.75,193.25,217.1,133.5,87.026816,0.2,25.0,120.5,25.0
2,2,Linea_1,CIRCULAR,BEATAS DE BELÉN,GÓMEZ CARRILLO,5.0,300.0,300.0,300.0,300.0,...,,,,,,,,300.0,,300.0
3,3,Linea_1,CIRCULAR,BEATAS DE BELÉN,MERCADO CENTRAL,21.0,883.0,401.3,571.25,1158.5,...,218.0,300.0,1010.0,1063.0,573.333333,381.918673,0.153846,883.0,418.0,883.0
4,4,Linea_1,CIRCULAR,BEATAS DE BELÉN,PARQUE CENTENARIO,23.0,428.0,300.0,300.0,503.0,...,45.2,92.0,300.0,445.8,244.8,205.85116,0.0,428.0,275.0,428.0
5,5,Linea_1,CIRCULAR,BEATAS DE BELÉN,PASEO DE LAS LETRAS,43081.0,193.0,120.0,180.0,243.0,...,60.0,60.0,180.0,240.0,124.810186,72.189201,0.024493,193.0,120.0,193.0
6,6,Linea_1,CIRCULAR,BEATAS DE BELÉN,SAN AGUSTÍN,14.0,347.0,93.8,105.0,440.0,...,5.0,26.0,265.25,453.0,200.833333,233.022245,0.142857,347.0,144.5,347.0
7,7,Linea_1,CIRCULAR,BEATAS DE BELÉN,SAN SEBASTIÁN,29.0,546.0,462.6,511.0,596.0,...,300.0,300.0,300.0,300.0,300.0,,0.0,546.0,300.0,546.0
8,8,Linea_1,CIRCULAR,CENTRO CÍVICO,BEATAS DE BELÉN,19.0,923.0,32.4,35.0,1304.0,...,300.0,300.0,300.0,300.0,300.0,,0.0,923.0,300.0,923.0
9,9,Linea_1,CIRCULAR,CENTRO CÍVICO,CORREOS,11.0,768.0,28.0,289.0,1204.5,...,4.2,4.5,5.5,5.8,5.0,1.414214,0.0,768.0,5.0,768.0


In [None]:
# Guardar resultados
OUT_DIR = Path("D:/2025/UVG/Tesis/repos/backend/models")
summary.to_csv(OUT_DIR / "eta_tramos_robusto.csv", index=False)