In [None]:
import warnings, math, re
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
def load_ohlc(symbol: str, start="2025-10-20", end=None) -> pd.DataFrame:
    """
    Load daily OHLCV bars for ``symbol``.

    Returns a DataFrame with columns ['date','open','high','low','close','volume'],
    sorted ascending by date, with a fresh RangeIndex and no rows whose close is NaN.

    Sources, in order:
      1. vnstock3 (``Vnstock().stock(...).historical_data``);
      2. yfinance fallback (VN tickers -> '<SYMBOL>.VN'; the index -> '^VNINDEX').
    The fallback is used when vnstock3 raises OR returns no usable rows.

    Raises:
        RuntimeError: if the yfinance fallback fails or has no data.
    """
    cols = ["date", "open", "high", "low", "close", "volume"]

    # 1) vnstock3 -- best effort: any failure (missing package, network, schema) falls through.
    try:
        from vnstock3 import Vnstock
        raw = Vnstock().stock(symbol=symbol).historical_data(
            start=start, end=end, interval="1D", type="stock"
        )
        if raw is not None and len(raw) > 0:
            df = raw.rename(columns={"time": "date"})
            df["date"] = pd.to_datetime(df["date"])
            df = (df[cols].dropna(subset=["close"])
                  .sort_values("date").reset_index(drop=True))
            if len(df) > 0:
                return df
    except Exception:
        pass

    # 2) yfinance
    try:
        import yfinance as yf
        if symbol.upper() in {"VNINDEX", "^VNINDEX"}:
            yf_symbol = "^VNINDEX"
        else:
            yf_symbol = f"{symbol}.VN"
        hist = yf.Ticker(yf_symbol).history(
            start=start, end=end, interval="1d", auto_adjust=False
        )
        if hist is None or len(hist) == 0:
            raise RuntimeError(f"Yahoo Finance không có dữ liệu cho {yf_symbol}")
        hist = hist.reset_index().rename(columns={
            "Date":"date", "Open":"open", "High":"high",
            "Low":"low", "Close":"close", "Volume":"volume"
        })
        hist["date"] = pd.to_datetime(hist["date"])
        hist["volume"] = hist["volume"].fillna(0)
        return (hist[cols].dropna(subset=["close"])
                .sort_values("date").reset_index(drop=True))
    except Exception as e:
        raise RuntimeError(
            "Không tải được dữ liệu bằng vnstock3 hoặc yfinance. Kiểm tra mã/Internet."
        ) from e

In [None]:
import pandas as pd

def test_fetch_prices(symbol="VCB", start=None, end=None, min_len=250, max_years_back=10):
    """
    Tải dữ liệu giá cho symbol. Nếu start quá gần hiện tại và chuỗi ngắn,
    tự động lùi mốc start lùi dần 1,2,3,5,10 năm (tối đa max_years_back) cho đến khi đủ min_len.
    """
    # nếu người dùng không truyền start -> tự thử lùi dần
    tried = []
    if start is None:
        candidates_years = [1, 2, 3, 5, max_years_back]
        for yrs in candidates_years:
            s = (pd.Timestamp.today().normalize() - pd.DateOffset(years=yrs)).strftime("%Y-%m-%d")
            df = load_ohlc(symbol, start=s, end=end)
            tried.append((s, len(df)))
            if len(df) >= min_len:
                assert set(["date","open","high","low","close","volume"]).issubset(df.columns)
                print(f"[OK] Tải {symbol}: {len(df)} dòng, {df['date'].min().date()} → {df['date'].max().date()} (start={s})")
                return df
        # nếu vẫn không đủ
        raise AssertionError(f"Không đủ dữ liệu (>= {min_len} phiên). Các mốc đã thử: {tried}")
    else:
        # có start do người dùng chỉ định: nếu ngắn -> tự lùi thêm
        df = load_ohlc(symbol, start=start, end=end)
        if len(df) >= min_len:
            assert set(["date","open","high","low","close","volume"]).issubset(df.columns)
            print(f"[OK] Tải {symbol}: {len(df)} dòng, {df['date'].min().date()} → {df['date'].max().date()} (start={start})")
            return df
        # lùi thêm tối đa max_years_back năm
        base = pd.to_datetime(start)
        for yrs in [1, 2, 3, 5, max_years_back]:
            s = (base - pd.DateOffset(years=yrs)).strftime("%Y-%m-%d")
            df2 = load_ohlc(symbol, start=s, end=end)
            tried.append((s, len(df2)))
            if len(df2) >= min_len:
                assert set(["date","open","high","low","close","volume"]).issubset(df2.columns)
                print(f"[OK] Tải {symbol}: {len(df2)} dòng, {df2['date'].min().date()} → {df2['date'].max().date()} (start={s})")
                return df2
        raise AssertionError(f"Không đủ dữ liệu (>= {min_len} phiên) kể cả khi lùi từ {start}. Các mốc đã thử: {tried}")
    
# Load VCB history; the loader steps the lookback back until >= 250 sessions are available.
df = test_fetch_prices("VCB", start=None, min_len=250)

In [None]:
def to_log_returns(price: pd.Series) -> pd.Series:
    """
    Log returns r_t = log(P_t / P_{t-1}), aligned with ``price`` (same length/index).

    Element 0 has no predecessor. Whenever the result contains a NaN, element 0 is
    overwritten with the mean of the remaining returns, or 0.0 for a single-element
    series, so the output starts without a gap.
    """
    log_ret = np.log(price / price.shift(1))
    if not log_ret.isna().any():
        return log_ret
    fill = log_ret.iloc[1:].mean() if len(log_ret) > 1 else 0.0
    log_ret.iloc[0] = fill
    return log_ret

def returns_to_price(last_price: float, returns_forecast: np.ndarray) -> pd.Series:
    """Compound forecast log returns onto ``last_price``: P_k = last_price * exp(r_1 + ... + r_k)."""
    growth = np.exp(np.cumsum(returns_forecast))
    return pd.Series(last_price * growth)

In [None]:
def test_preprocess_to_returns(df: pd.DataFrame):
    """Test 2: chuyển giá -> returns và kiểm tra dừng cơ bản."""
    r = to_log_returns(df["close"])
    assert len(r) == len(df), "Returns không khớp số điểm."
    print(f"[OK] Tạo returns: mean={r.mean():.6f}, std={r.std():.6f}")
    return r

# Log returns of the close prices (same length as df).
r = test_preprocess_to_returns(df=df)

In [None]:
# 3) Chọn mô hình ARIMA tự động
#    - Có pmdarima: dùng auto_arima(stepwise)
#    - Không có pmdarima: fallback SARIMAX (grid nhỏ theo AIC)
# =========================================================
try:
    from pmdarima import auto_arima as _pm_auto_arima
    HAVE_PMDA = True
except Exception:
    HAVE_PMDA = False
from statsmodels.tsa.statespace.sarimax import SARIMAX

class _ARIMAWrapper:
    """Adapter giving a fitted results object a pmdarima-like API.

    Exposes ``.order`` and ``.predict(n_periods=...)``, which returns a numpy array.
    """

    def __init__(self, fitted, order):
        # ``fitted`` must provide ``get_forecast(steps=...)`` (e.g. a SARIMAX results object).
        self.order = order
        self.fitted = fitted

    def predict(self, n_periods: int) -> np.ndarray:
        """Point forecast for the next ``n_periods`` steps."""
        forecast = self.fitted.get_forecast(steps=n_periods)
        return forecast.predicted_mean.values

def auto_arima_select(y: pd.Series, d: int = 0, max_p: int = 5, max_q: int = 5) -> "_ARIMAWrapper":
    """
    Select and fit a non-seasonal ARIMA(p, d, q) on ``y``.

    Uses ``pmdarima.auto_arima`` (stepwise) when available (``HAVE_PMDA``).
    Otherwise it grid-searches p in [0, max_p], q in [0, max_q] with SARIMAX
    (no trend) and keeps the lowest-AIC fit.

    Returns:
        An object with ``.order`` and ``.predict(n_periods)``. ``predict`` always
        returns a 1-D numpy array, whichever backend was used.

    Raises:
        RuntimeError: if no grid candidate could be fitted (fallback path).
    """
    y = pd.Series(y).astype(float)
    if HAVE_PMDA:
        model = _pm_auto_arima(
            y, d=d, start_p=0, start_q=0, max_p=max_p, max_q=max_q,
            seasonal=False, stepwise=True,
            suppress_warnings=True, error_action="ignore", maxiter=200
        )

        class _PMWrap:
            """Wrap a pmdarima model so it matches _ARIMAWrapper's API."""
            def __init__(self, m):
                self.m = m
                self.order = m.order

            def predict(self, n_periods):
                # pmdarima may return a pandas Series (e.g. for Series input);
                # normalize so both backends return the same type.
                return np.asarray(self.m.predict(n_periods=n_periods))

        return _PMWrap(model)

    # Fallback: small SARIMAX grid ranked by AIC; candidates that fail to fit are skipped.
    best = {"aic": np.inf, "fit": None, "order": None}
    for p in range(max_p+1):
        for q in range(max_q+1):
            try:
                res = SARIMAX(
                    y, order=(p,d,q), seasonal_order=(0,0,0,0),
                    trend="n", enforce_stationarity=False, enforce_invertibility=False
                ).fit(disp=False)
                if res.aic < best["aic"]:
                    best = {"aic": res.aic, "fit": res, "order": (p,d,q)}
            except Exception:
                pass
    if best["fit"] is None:
        raise RuntimeError("Không fit được ARIMA với lưới (p,q). Hãy giảm max_p/max_q hoặc kiểm tra dữ liệu.")
    return _ARIMAWrapper(best["fit"], best["order"])

In [None]:
def train_arima_on_returns(r_train: pd.Series, d: int = 0, max_p: int = 5, max_q: int = 5):
    """Select and fit an ARIMA on the training returns; the model exposes ``.order`` and ``.predict``."""
    return auto_arima_select(r_train, d=d, max_p=max_p, max_q=max_q)

def forecast_prices_from_returns(model, last_train_price: float, steps: int) -> pd.Series:
    """Forecast ``steps`` log returns with ``model`` and compound them into a price path from ``last_train_price``."""
    return returns_to_price(last_train_price, model.predict(n_periods=steps))

def evaluate_forecast(y_true_price: pd.Series, y_pred_price: pd.Series) -> dict:
    """
    Score a price forecast against the actual prices.

    Returns a dict with:
      RMSE, MAE -- in price units;
      MAPE      -- as a fraction (0.05 == 5%); |y_true| is clipped at 1e-9 to avoid /0;
      DIR_ACC   -- share of consecutive steps where the sign of the step-to-step change
                   matches (NaN when there are fewer than 2 points).

    Raises:
        ValueError: if the inputs differ in length. This stops a length-1 input from
            being broadcast silently against the other array.
    """
    yt = np.asarray(y_true_price, dtype=float).ravel()
    yp = np.asarray(y_pred_price, dtype=float).ravel()
    if yt.shape != yp.shape:
        raise ValueError(f"Length mismatch: y_true has {yt.size} points, y_pred has {yp.size}.")
    err = yt - yp
    rmse = math.sqrt(np.mean(err ** 2))
    mae = float(np.mean(np.abs(err)))
    mape = float(np.mean(np.abs(err / np.clip(np.abs(yt), 1e-9, None))))
    # Directional accuracy: did the forecast move the same way as the market each step?
    if len(yt) > 1:
        dir_acc = float((np.sign(np.diff(yt)) == np.sign(np.diff(yp))).mean())
    else:
        dir_acc = np.nan
    return {"RMSE": rmse, "MAE": mae, "MAPE": mape, "DIR_ACC": dir_acc}


In [None]:
def test_train_arima(r: pd.Series, train_ratio: float = 0.8, d: int = 0, max_p: int = 5, max_q: int = 5):
    """Test 3: tách train/test returns và train ARIMA."""
    n = len(r); n_train = max(200, int(n * train_ratio))
    r_train, r_test = r.iloc[:n_train], r.iloc[n_train:]
    model = train_arima_on_returns(r_train, d=d, max_p=max_p, max_q=max_q)
    print(f"[OK] Chọn ARIMA order={getattr(model, 'order', None)}, train_len={len(r_train)}, test_len={len(r_test)}")
    return model, r_train, r_test

# Chronological split (train share 0.8, at least 200 points), then fit ARIMA(p, 0, q) on train.
model, r_train, r_test = test_train_arima(r, 0.8, d=0, max_p=5, max_q=5)


In [None]:
def test_forecast_future_prices(df: pd.DataFrame, r_train: pd.Series, r_test: pd.Series, model):
    """Test 4: dự báo giá test từ returns model và đánh giá."""
    # Giá thật: chia train/test cùng index
    n_train = len(r_train)
    price_train = df["close"].iloc[:n_train]
    price_test  = df["close"].iloc[n_train:]

    last_train_price = float(price_train.iloc[-1])
    pred_price_test = forecast_prices_from_returns(model, last_train_price, steps=len(r_test))
    pred_price_test.index = price_test.index  # căn index cho đẹp (nếu cần)

    metrics = evaluate_forecast(price_test, pred_price_test)
    print(f"[OK] Đánh giá: RMSE={metrics['RMSE']:.4f}  MAE={metrics['MAE']:.4f}  "
          f"MAPE={metrics['MAPE']*100:.2f}%  DIR_ACC={metrics['DIR_ACC']*100:.1f}%")

    # Vẽ nhanh phần cuối chuỗi
    tail = 180
    plt.figure(figsize=(12,5))
    plt.plot(df["date"].iloc[-(len(price_test)+tail):], df["close"].iloc[-(len(price_test)+tail):], label="Giá thực tế")
    plt.plot(df["date"].iloc[n_train:], pred_price_test.values, linestyle="--", label="Dự báo (ARIMA)")
    plt.title("So sánh giá thực tế vs. dự báo (ARIMA trên returns)")
    plt.xlabel("Ngày"); plt.ylabel("Giá đóng cửa"); plt.legend(); plt.tight_layout(); plt.show()

    return pred_price_test, metrics

# Test 4: forecast the test window from the fitted returns model and score it.
pred_price_test, metrics = test_forecast_future_prices(df, r_train, r_test, model=model)

In [None]:
def test_forecast_next_days(df: pd.DataFrame, model, future_days: int = 5):
    """Test 5: dự báo K phiên tới (không có giá thật để chấm điểm)."""
    last_price = float(df["close"].iloc[-1])
    pred_next = forecast_prices_from_returns(model, last_price, steps=future_days)
    # Tạo index là ngày làm việc kế tiếp
    future_idx = pd.bdate_range(start=df["date"].iloc[-1] + pd.Timedelta(days=1), periods=future_days)
    pred_next.index = future_idx
    print(f"[OK] Dự báo {future_days} phiên tới:")
    print(pred_next.round(2))
    return pred_next

# Test 5: forecast the next 5 sessions. Bound to its own name so the test-window
# forecast `pred_price_test` from Test 4 is not overwritten.
pred_next_days = test_forecast_next_days(df, model, future_days=5)

In [None]:
# -*- coding: utf-8 -*-
import warnings, math, numpy as np, pandas as pd, datetime as dt
warnings.filterwarnings("ignore")
import matplotlib.pyplot as plt
from statsmodels.tsa.statespace.sarimax import SARIMAX

# ===================== Thời gian & phiên =====================
ICT = dt.timezone(dt.timedelta(hours=7))  # Asia/Ho_Chi_Minh

def is_vn_trading_time(now: dt.datetime | None = None) -> bool:
    """HOSE: 09:00–11:30, 13:00–15:00 (Mon–Fri, ICT)."""
    now = now.astimezone(ICT) if now else dt.datetime.now(ICT)
    if now.weekday() >= 5:
        return False
    t = now.time()
    return (dt.time(9,0) <= t < dt.time(11,30)) or (dt.time(13,0) <= t < dt.time(15,0))

# ===================== Load dữ liệu =====================
def load_daily_ohlc(symbol: str, start="2023-01-01", end=None) -> pd.DataFrame:
    """
    Daily OHLCV bars as ['date','open','high','low','close','volume'], ascending by
    date, with NaN closes dropped.

    Tries vnstock3 first. Falls back to yfinance ('VCB' -> 'VCB.VN',
    'VNINDEX' -> '^VNINDEX') when vnstock3 raises OR returns no usable rows.

    Raises:
        RuntimeError: if yfinance also returns nothing. Errors raised by the yfinance
            call itself (e.g. package missing) propagate unchanged.
    """
    cols = ["date", "open", "high", "low", "close", "volume"]
    # vnstock3: best effort; any failure falls through to yfinance.
    try:
        from vnstock3 import Vnstock
        df = Vnstock().stock(symbol=symbol).historical_data(
            start=start, end=end, interval="1D", type="stock"
        )
        if df is not None and len(df) > 0:
            df = df.rename(columns={"time": "date"})
            df["date"] = pd.to_datetime(df["date"])
            df = df.sort_values("date")[cols].dropna(subset=["close"]).copy()
            if len(df) > 0:
                return df
    except Exception:
        pass
    # yfinance fallback
    import yfinance as yf
    yf_symbol = "^VNINDEX" if symbol.upper() in {"VNINDEX","^VNINDEX"} else f"{symbol}.VN"
    hist = yf.Ticker(yf_symbol).history(start=start, end=end, interval="1d", auto_adjust=False)
    if hist is None or len(hist)==0:
        raise RuntimeError(f"Không có daily OHLC cho {symbol}")
    hist = hist.reset_index().rename(columns={"Date":"date","Open":"open","High":"high","Low":"low","Close":"close","Volume":"volume"})
    hist["date"] = pd.to_datetime(hist["date"])
    hist["volume"] = hist["volume"].fillna(0)
    return hist[cols].sort_values("date").dropna(subset=["close"]).copy()

def load_intraday(symbol: str, interval="5m", period="1d") -> pd.DataFrame:
    """
    Today's intraday OHLCV bars from yfinance, with timestamps in ICT.

    Returns ['dt','open','high','low','close','volume'] for the current ICT date only,
    sorted by time, with NaN closes dropped. '.VN' tickers may have no intraday data,
    so callers should be ready to fall back to daily data.

    Raises:
        RuntimeError: if yfinance returns nothing, or nothing dated today (ICT).
    """
    import yfinance as yf
    ticker = "^VNINDEX" if symbol.upper() in {"VNINDEX","^VNINDEX"} else f"{symbol}.VN"
    raw = yf.Ticker(ticker).history(period=period, interval=interval, auto_adjust=False)
    if raw is None or len(raw) == 0:
        raise RuntimeError(f"Không có intraday cho {symbol} ({ticker})")
    bars = raw.reset_index().rename(columns={
        "Datetime": "dt", "Open": "open", "High": "high",
        "Low": "low", "Close": "close", "Volume": "volume",
    })
    stamps = pd.to_datetime(bars["dt"])
    # Aware timestamps are converted to ICT; naive ones are assumed to already be ICT.
    bars["dt"] = stamps.dt.tz_localize(ICT) if stamps.dt.tz is None else stamps.dt.tz_convert(ICT)
    today_ict = dt.datetime.now(ICT).date()
    bars = bars[bars["dt"].dt.date == today_ict]
    if len(bars) == 0:
        raise RuntimeError("Intraday không có dữ liệu của hôm nay.")
    return bars[["dt","open","high","low","close","volume"]].sort_values("dt").dropna(subset=["close"]).copy()

# ===================== Returns helpers =====================
def to_log_returns(series: pd.Series) -> pd.Series:
    """Log returns aligned with ``series``. The undefined first value is filled with
    the mean of the later returns (0.0 for a length-1 input)."""
    out = np.log(series / series.shift(1))
    if out.isna().any():
        out.iloc[0] = 0.0 if len(out) <= 1 else out.iloc[1:].mean()
    return out

def returns_to_price(last_price: float, r_fore: np.ndarray) -> pd.Series:
    """Price path obtained by compounding the log returns ``r_fore`` onto ``last_price``."""
    cumulative = np.cumsum(r_fore)
    return pd.Series(last_price * np.exp(cumulative))

# ===================== ARIMA chọn theo AIC (thử trend) =====================
def arima_select_fit(y: pd.Series, d=0, max_p=5, max_q=5, trends=("n","c")):
    """
    Grid-search a non-seasonal SARIMAX over trend x p x q (d fixed) and keep the lowest AIC.

    Candidates that fail to fit are skipped. On an AIC tie, the earliest candidate in
    (trend, p, q) iteration order wins.

    Returns:
        (fitted_results, (p, d, q), trend)

    Raises:
        RuntimeError: if no candidate could be fitted.
    """
    series = pd.Series(y).astype(float)
    best_aic, best_fit, best_order, best_trend = np.inf, None, None, None
    for trend in trends:
        for p in range(max_p + 1):
            for q in range(max_q + 1):
                order = (p, d, q)
                try:
                    fitted = SARIMAX(series, order=order,
                                     seasonal_order=(0, 0, 0, 0),
                                     trend=trend,
                                     enforce_stationarity=False,
                                     enforce_invertibility=False).fit(disp=False)
                    if fitted.aic < best_aic:
                        best_aic, best_fit, best_order, best_trend = fitted.aic, fitted, order, trend
                except Exception:
                    continue
    if best_fit is None:
        raise RuntimeError("ARIMA grid không hội tụ — thử giảm max_p/max_q hoặc kiểm tra dữ liệu.")
    return best_fit, best_order, best_trend

def arima_select_best_of_returns_or_logprice(close: pd.Series, max_p=5, max_q=5):
    """
    Fit two candidate families and return the one with the lower AIC:
      - "returns":  ARIMA(p,0,q) on the log returns of ``close``;
      - "logprice": ARIMA(p,1,q) on log(``close``);
    each with trend in {'n','c'}. On an AIC tie, "returns" wins.

    Returns:
        (kind, fitted_results, order, trend, aic)
    """
    candidates = []
    for kind, series, d in (("returns", to_log_returns(close), 0), ("logprice", np.log(close), 1)):
        fit, order, trend = arima_select_fit(series, d=d, max_p=max_p, max_q=max_q, trends=("n", "c"))
        candidates.append((kind, fit, order, trend, fit.aic))
    return min(candidates, key=lambda c: c[4])

# ===================== Trong phiên =====================
def predict_in_session(symbol: str, interval="5m"):
    """
    Intraday forecast from today's yfinance bars to the end of the current HOSE session.

    Fits a small ARIMA grid (p, q <= 3, d=0, trend 'n'/'c') on the log returns of
    today's closes and forecasts one step per bar. The horizon runs to 11:30 if the
    last bar starts before 11:30, otherwise to 15:00, with at least one step.
    ``interval`` must be in minutes, like "5m".

    Returns:
        (bars_df, y_pred, direction, info) -- forecast prices indexed by future bar
        times (ICT), per-step arrows ("↑"/"↓"/"=") vs the previous price, and metadata.
    """
    df = load_intraday(symbol, interval=interval, period="1d")
    close = df["close"]
    r = to_log_returns(close)

    # Number of bars left in the current session.
    last_dt = df["dt"].iloc[-1]
    now_t = last_dt.timetz()
    session_end = dt.time(11,30, tzinfo=ICT) if now_t < dt.time(11,30, tzinfo=ICT) else dt.time(15,0, tzinfo=ICT)
    step_min = int(interval.replace("m",""))
    # Use total_seconds(), not .seconds: if the last bar is at/after the session end the
    # delta is negative and .seconds wraps to ~24h (hundreds of bogus steps).
    remaining = (dt.datetime.combine(last_dt.date(), session_end)
                 - dt.datetime.combine(last_dt.date(), now_t))
    rem_minutes = max(0, int(remaining.total_seconds() // 60))
    steps = max(1, rem_minutes // step_min)

    # ARIMA on returns (small grid to stay fast intraday).
    fit, order, trend = arima_select_fit(r, d=0, max_p=3, max_q=3, trends=("n","c"))
    r_fore = fit.get_forecast(steps=steps).predicted_mean.values
    y_pred = returns_to_price(float(close.iloc[-1]), r_fore)

    future_times = pd.date_range(last_dt + pd.Timedelta(minutes=step_min),
                                 periods=steps, freq=f"{step_min}min", tz=ICT)
    y_pred.index = future_times
    # Direction of each step vs the previous price (the first step vs the last close).
    direction = np.sign(y_pred.values - np.r_[close.iloc[-1], y_pred.values[:-1]])
    direction = pd.Series(direction, index=y_pred.index).map({-1:"↓", 0:"=", 1:"↑"})

    info = {"order": order, "trend": trend, "last_price": float(close.iloc[-1]), "pred_until": future_times[-1]}
    print(f"[Trong phiên] {symbol} | ARIMA{order} trend={trend} | last={info['last_price']:.2f} | đến {info['pred_until']}")
    return df, y_pred, direction, info

# ===================== Ngoài giờ =====================
def predict_out_of_session(symbol: str, lookback_days=180):
    """
    Forecast the next session's close from daily data (used outside trading hours).

    Loads ``lookback_days`` calendar days of daily bars and keeps the last 120 sessions.
    It then picks the lower-AIC model between ARIMA on log returns (d=0) and on log
    price (d=1), and forecasts one step ahead.

    Returns:
        (daily_df, next_price, direction, info) -- ``next_price`` is a one-element Series
        indexed by the next business day; ``direction`` is "↑", "↓" or "=".
    """
    since = (pd.Timestamp.now(ICT) - pd.Timedelta(days=lookback_days)).strftime("%Y-%m-%d")
    df = load_daily_ohlc(symbol, start=since).tail(120)  # ~6 months of sessions
    close = df["close"]
    last_close = close.iloc[-1]

    kind, fit, order, trend, _aic = arima_select_best_of_returns_or_logprice(close, max_p=5, max_q=5)
    point = fit.get_forecast(steps=1).predicted_mean.values
    if kind == "returns":
        next_price = returns_to_price(float(last_close), point)
    else:
        # Log-price model: the forecast is already a log level.
        next_price = pd.Series(np.exp(point))
    next_price.index = pd.bdate_range(start=df["date"].iloc[-1] + pd.Timedelta(days=1), periods=1)

    first = next_price.iloc[0]
    if first > last_close:
        direction = "↑"
    elif first < last_close:
        direction = "↓"
    else:
        direction = "="

    info = {"mode": kind, "order": order, "trend": trend, "last_close": float(last_close), "last_close_date": df["date"].iloc[-1].date()}
    print(f"[Ngoài giờ] {symbol} | {kind.upper()} ARIMA{order} trend={trend} | last={info['last_close']:.2f} ({info['last_close_date']})")
    return df, next_price, direction, info

# ===================== Bộ điều phối =====================
def smart_predict(symbol: str, interval="5m", lookback_days=180):
    """
    Route to the intraday forecaster during HOSE trading hours, otherwise to the daily one.

    If the intraday path raises, a warning is printed and the daily path is used instead.
    Returns a dict with keys "mode" ("intraday"/"daily"), "df", "pred", "direction", "info".
    """
    def _pack(mode, parts):
        """Label the 4-tuple returned by a predictor and tag it with ``mode``."""
        return {"mode": mode, **dict(zip(("df", "pred", "direction", "info"), parts))}

    if is_vn_trading_time():
        try:
            return _pack("intraday", predict_in_session(symbol, interval=interval))
        except Exception as e:
            print(f"[WARN] Intraday lỗi ({e}). Fallback sang daily.")
    return _pack("daily", predict_out_of_session(symbol, lookback_days=lookback_days))

# ===================== Ví dụ dùng =====================
# Gọi tự động theo thời điểm:
result = smart_predict("VCB", interval="5m", lookback_days=180)
if result["mode"] != "intraday":
    # Daily mode: a single next-session price.
    print("Dự báo phiên kế tiếp:")
    print(result["pred"].round(2))
else:
    # Intraday mode: per-bar prices and directions until the session ends.
    display(result["pred"].round(2).to_frame("pred_price"))
    display(result["direction"].to_frame("dir"))


In [None]:
# -*- coding: utf-8 -*-
import warnings, numpy as np, pandas as pd, datetime as dt
warnings.filterwarnings("ignore")
from statsmodels.tsa.statespace.sarimax import SARIMAX

# ===================== Thời gian & phiên =====================
ICT = dt.timezone(dt.timedelta(hours=7))   # fixed UTC+7, Asia/Ho_Chi_Minh
# Exchange holidays as "YYYY-MM-DD" strings; empty by default.
# Example entries: "2025-04-30", "2025-05-01", "2025-09-02".
VN_HOLIDAYS = set()

def is_vn_holiday(d: dt.date) -> bool:
    """True if ``d``, formatted as YYYY-MM-DD, is listed in VN_HOLIDAYS."""
    iso_day = d.strftime("%Y-%m-%d")
    return iso_day in VN_HOLIDAYS

def is_vn_trading_time(now: dt.datetime | None = None) -> bool:
    """HOSE: 09:00–11:30, 13:00–15:00 (Mon–Fri, ICT)"""
    now = now.astimezone(ICT) if now else dt.datetime.now(ICT)
    if now.weekday() >= 5 or is_vn_holiday(now.date()):
        return False
    t = now.time()
    return (dt.time(9,0) <= t < dt.time(11,30)) or (dt.time(13,0) <= t < dt.time(15,0))

def next_trading_day(d: dt.date | pd.Timestamp) -> dt.date:
    if isinstance(d, pd.Timestamp):
        d = d.date()
    nxt = d + dt.timedelta(days=1)
    while nxt.weekday() >= 5 or is_vn_holiday(nxt):
        nxt += dt.timedelta(days=1)
    return nxt

# ===================== vnstock loaders =====================
def load_daily_ohlc_vnstock(symbol: str, start="2024-01-01", end=None, source="VCI") -> pd.DataFrame:
    """
    Daily OHLCV fetched directly from vnstock's ``quote.history``.

    Returns ['date','open','high','low','close','volume'] sorted by date, with NaN closes dropped.

    Raises:
        RuntimeError: if vnstock returns no rows.
    """
    from vnstock import Vnstock
    quote = Vnstock().stock(symbol=symbol, source=source).quote
    raw = quote.history(start=start, end=end, interval='1D')
    if raw is None or len(raw) == 0:
        raise RuntimeError(f"vnstock.history rỗng cho {symbol}")
    out = raw.rename(columns={"time": "date"}).dropna(subset=["close"]).copy()
    out["date"] = pd.to_datetime(out["date"])
    out = out.sort_values("date")
    return out[["date","open","high","low","close","volume"]]

def _vnstock_intraday_once(symbol: str, page_size=20000, source="VCI", show_log=False) -> pd.DataFrame:
    """
    Fetch the intraday ticks vnstock returns for the most recent day
    (no trading_date argument is passed).

    Column names vary by source, so they are auto-detected: time|time_report,
    price|close, volume|match_volume. Price and volume are coerced to numbers;
    unparseable prices make the row drop, and unparseable or missing volume becomes 0.
    Naive timestamps are assumed to be ICT; aware ones are converted to ICT.

    Returns:
        ['dt','price','volume','date'] sorted by time.

    Raises:
        RuntimeError: if vnstock returns nothing, the time/price columns cannot be
            identified, or no valid tick remains after cleaning.
    """
    from vnstock import Vnstock
    stock = Vnstock().stock(symbol=symbol, source=source)
    df = stock.quote.intraday(symbol=symbol, page_size=page_size, show_log=show_log)
    if df is None or len(df) == 0:
        raise RuntimeError("vnstock intraday trả về rỗng.")

    def _first_present(*names):
        """First of ``names`` that is a column of ``df``, else None."""
        return next((c for c in names if c in df.columns), None)

    time_col = _first_present("time", "time_report")
    price_col = _first_present("price", "close")
    volume_col = _first_present("volume", "match_volume")
    if not time_col or not price_col:
        raise RuntimeError("Không nhận diện được cột thời gian/giá từ intraday tick.")

    sub = pd.DataFrame({
        "dt": pd.to_datetime(df[time_col], errors="coerce"),
        # Coerce so that string-typed prices still resample into numeric OHLC downstream.
        "price": pd.to_numeric(df[price_col], errors="coerce"),
        "volume": pd.to_numeric(df[volume_col], errors="coerce").fillna(0) if volume_col else 0,
    })
    sub = sub.dropna(subset=["dt", "price"]).copy()
    if len(sub) == 0:
        raise RuntimeError("vnstock intraday không có tick hợp lệ sau khi làm sạch.")
    if sub["dt"].dt.tz is None:
        sub["dt"] = sub["dt"].dt.tz_localize(ICT)
    else:
        sub["dt"] = sub["dt"].dt.tz_convert(ICT)
    sub["date"] = sub["dt"].dt.date
    return sub.sort_values("dt")[["dt","price","volume","date"]]

def resample_to_5m_ohlcv(tick_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate ticks with columns 'dt', 'price' and 'volume' into 5-minute bars.

    Returns ['dt','open','high','low','close','volume']. Bins without any tick
    (e.g. the lunch break) are dropped.
    """
    by_time = tick_df.set_index("dt")
    bins = by_time.resample("5min")
    bars = bins["price"].ohlc()
    bars["volume"] = bins["volume"].sum()
    bars = bars.dropna(subset=["close"]).reset_index()
    return bars[["dt","open","high","low","close","volume"]]

# ===================== Returns & ARIMA helpers =====================
def to_log_returns(series: pd.Series) -> pd.Series:
    """Log returns of ``series`` (same length). Element 0 has no predecessor and is
    filled with the mean of the later returns, or 0.0 if there are none."""
    ratio = series / series.shift(1)
    rets = np.log(ratio)
    if rets.isna().any():
        head = rets.iloc[1:].mean() if len(rets) > 1 else 0.0
        rets.iloc[0] = head
    return rets

def returns_to_price(last_price: float, r_fore: np.ndarray) -> pd.Series:
    """Turn forecast log returns into prices: last_price * exp(cumulative return)."""
    path = last_price * np.exp(np.cumsum(r_fore))
    return pd.Series(path)

def session_returns_from_intraday(intra_5m: pd.DataFrame) -> pd.DataFrame:
    """
    Per-day morning/afternoon session log returns from 5-minute bars.

    For each calendar date of ``intra_5m['dt']`` it takes the close of:
      P_0900 -- the first bar at/after 09:00,  P_1130 -- the last bar at/before 11:30,
      P_1300 -- the first bar at/after 13:00,  P_1500 -- the last bar at/before 15:00,
    and computes AM_return = log(P_1130/P_0900) and PM_return = log(P_1500/P_1300).
    Days missing any of the four anchors are skipped.

    Returns:
        DataFrame ['date','P_0900','P_1130','P_1300','P_1500','AM_return','PM_return'],
        sorted by date.

    Raises:
        RuntimeError: if no day has all four anchors.
    """
    frame = intra_5m.copy()
    frame["date"] = frame["dt"].dt.date
    rows = []
    for day, bars in frame.groupby("date"):
        bars = bars.sort_values("dt")
        clock = bars["dt"].dt.time
        closes = bars["close"]
        anchors = [
            closes[clock >= dt.time(9, 0)].head(1),
            closes[clock <= dt.time(11, 30)].tail(1),
            closes[clock >= dt.time(13, 0)].head(1),
            closes[clock <= dt.time(15, 0)].tail(1),
        ]
        if any(a.empty for a in anchors):
            continue
        p0900, p1130, p1300, p1500 = (float(a.iloc[0]) for a in anchors)
        rows.append({
            "date": pd.to_datetime(day),
            "P_0900": p0900, "P_1130": p1130, "P_1300": p1300, "P_1500": p1500,
            "AM_return": np.log(p1130 / p0900),
            "PM_return": np.log(p1500 / p1300),
        })
    if not rows:
        raise RuntimeError("Không trích được AM/PM (thiếu mốc 09:00/11:30/13:00/15:00).")
    return pd.DataFrame(rows).sort_values("date").reset_index(drop=True)

def arima_select_fit(y: pd.Series, d=0, max_p=5, max_q=5, trends=("n","c")):
    """
    Pick the lowest-AIC non-seasonal SARIMAX over the (trend, p, q) grid, with d fixed.

    Fits that raise are ignored. On ties, the first candidate in (trend, p, q) order wins.

    Returns:
        (fitted_results, (p, d, q), trend)

    Raises:
        RuntimeError: if every candidate failed.
    """
    endog = pd.Series(y).astype(float)
    winner = {"aic": np.inf, "fit": None, "order": None, "trend": None}
    grid = ((tr, p, q) for tr in trends for p in range(max_p + 1) for q in range(max_q + 1))
    for tr, p, q in grid:
        try:
            model = SARIMAX(endog, order=(p, d, q),
                            seasonal_order=(0, 0, 0, 0),
                            trend=tr,
                            enforce_stationarity=False,
                            enforce_invertibility=False)
            res = model.fit(disp=False)
            if res.aic < winner["aic"]:
                winner = {"aic": res.aic, "fit": res, "order": (p, d, q), "trend": tr}
        except Exception:
            pass
    if winner["fit"] is None:
        raise RuntimeError("ARIMA grid không hội tụ — kiểm tra dữ liệu hoặc giảm max_p/max_q.")
    return winner["fit"], winner["order"], winner["trend"]

# ===================== Dự báo TRONG phiên =====================
def predict_in_session(symbol: str, source="VCI"):
    """
    Forecast 5-minute prices from the latest bar to the end of the current session.

    Pipeline: vnstock ticks for the most recent day -> 5-minute OHLCV bars -> log
    returns -> small ARIMA grid (p, q <= 3, d=0, trend 'n'/'c'). The horizon runs to
    11:30 if the last bar starts before 11:30, otherwise to 15:00, with at least one step.

    Returns:
        (bars_5m, y_pred, direction, info) -- predicted prices indexed by future bar
        times (ICT), per-step arrows vs the previous price, and a metadata dict.
    """
    tick = _vnstock_intraday_once(symbol, page_size=20000, source=source, show_log=False)
    intra = resample_to_5m_ohlcv(tick)
    close = intra["close"]; r = to_log_returns(close)

    last_dt = intra["dt"].iloc[-1]
    now_t = last_dt.timetz()
    session_end = dt.time(11,30, tzinfo=ICT) if now_t < dt.time(11,30, tzinfo=ICT) else dt.time(15,0, tzinfo=ICT)
    step_min = 5
    # Use total_seconds(), not .seconds: if the last bar is at/after the session end the
    # delta is negative and .seconds wraps to ~24h (hundreds of bogus steps).
    remaining = (dt.datetime.combine(last_dt.date(), session_end)
                 - dt.datetime.combine(last_dt.date(), now_t))
    rem_minutes = max(0, int(remaining.total_seconds() // 60))
    steps = max(1, rem_minutes // step_min)

    fit, order, trend = arima_select_fit(r, d=0, max_p=3, max_q=3, trends=("n","c"))
    r_fore = fit.get_forecast(steps=steps).predicted_mean.values
    y_pred = returns_to_price(float(close.iloc[-1]), r_fore)

    future_times = pd.date_range(last_dt + pd.Timedelta(minutes=step_min),
                                 periods=steps, freq=f"{step_min}min", tz=ICT)
    y_pred.index = future_times
    # Direction of each step vs the previous price (the first step vs the last close).
    direction = np.sign(y_pred.values - np.r_[close.iloc[-1], y_pred.values[:-1]])
    direction = pd.Series(direction, index=y_pred.index).map({-1:"↓", 0:"=", 1:"↑"})

    info = {"order": order, "trend": trend, "last_price": float(close.iloc[-1]), "pred_until": future_times[-1]}
    print(f"[Trong phiên] {symbol} | ARIMA{order} trend={trend} | last={info['last_price']:.2f} | đến {info['pred_until']}")
    return intra, y_pred, direction, info

# ===================== AM/PM (ngoài giờ) – KHÔNG CACHE =====================
def forecast_tomorrow_sessions_no_cache(symbol: str,
                                        source: str = "VCI",
                                        max_p: int = 5, max_q: int = 5,
                                        alpha: float = 0.10,
                                        min_days_arima: int = 10):
    """
    Forecast the next trading day's morning (AM) and afternoon (PM) session log returns.

    Data: vnstock intraday ticks for the most recent day -> 5-minute bars ->
    per-day AM/PM returns (``session_returns_from_intraday``). Usually this gives
    only one day of history.

    Model:
      - fewer than ``min_days_arima`` days: the historical mean, with a normal
        (1 - alpha) interval mean ± z*std (z ≈ 1.645 for alpha=0.10). With < 2 days
        or zero std, the interval falls back to mean ± 0.005 (±0.5% in log-return terms);
      - otherwise: ARIMA(p,0,q) per column (grid up to max_p/max_q, trend 'n'/'c'),
        with its (1 - alpha) forecast interval.

    Returns:
        {"target_day": next trading date after the last history date,
         "history": the per-day AM/PM DataFrame,
         "AM": {...}, "PM": {...}}, where each session dict has the keys
        "direction", "ret_pred", "ret_ci" ([lo, hi]), "order" and "trend".
    """
    from statistics import NormalDist

    intra5 = resample_to_5m_ohlcv(_vnstock_intraday_once(symbol, page_size=20000, source=source, show_log=False))
    sess = session_returns_from_intraday(intra5)
    n = len(sess)

    def _arrow(x):
        """Direction arrow for a predicted return (NaN maps to '=')."""
        return "↑" if x > 0 else ("↓" if x < 0 else "=")

    if n < min_days_arima:
        print(f"[FALLBACK] Lịch sử AM/PM chỉ có {n} ngày → dùng trung bình thay ARIMA.")
        # Two-sided normal quantile matching alpha, so both branches honour the same level.
        z = NormalDist().inv_cdf(1.0 - alpha / 2.0)

        def stat_block(col: str):
            mean = float(sess[col].mean())
            std = float(sess[col].std(ddof=1)) if n >= 2 else 0.0
            if n >= 2 and std > 0:
                lo, hi = mean - z * std, mean + z * std
            else:
                lo, hi = mean - 0.005, mean + 0.005
            return {"direction": _arrow(mean), "ret_pred": mean, "ret_ci": [lo, hi],
                    "order": ("mean","-","-"), "trend": "n"}

        AM = stat_block("AM_return"); PM = stat_block("PM_return")
    else:
        def arima_block(col: str):
            fit, order, trend = arima_select_fit(sess[col], d=0, max_p=max_p, max_q=max_q, trends=("n","c"))
            fc = fit.get_forecast(steps=1)
            mean = float(fc.predicted_mean.values[0])
            ci = fc.conf_int(alpha=alpha).values[0].tolist()
            return {"direction": _arrow(mean), "ret_pred": mean, "ret_ci": ci,
                    "order": order, "trend": trend}

        AM = arima_block("AM_return"); PM = arima_block("PM_return")

    last_date = sess["date"].iloc[-1].date()
    target_day = next_trading_day(last_date)
    return {"target_day": target_day, "history": sess, "AM": AM, "PM": PM}

# ===================== GAP (OPEN vs CLOSE hôm qua) & quy đổi sang GIÁ =====================
def price_from_ret(open_price: float, ret_mean: float, ret_ci: list[float]) -> dict:
    """Convert a log-return point forecast and its [lo, hi] interval into prices relative to ``open_price``."""
    lo_ret, hi_ret = ret_ci[0], ret_ci[1]
    return {
        "px_mean": float(open_price * np.exp(ret_mean)),
        "px_lo": float(open_price * np.exp(lo_ret)),
        "px_hi": float(open_price * np.exp(hi_ret)),
    }

def forecast_gap_open_vs_prev_close(symbol: str, lookback_days=240, source="VCI", alpha: float = 0.10):
    """
    Forecast the overnight GAP log-return (tomorrow's OPEN vs. the last CLOSE).

    Approximation: a daily ARIMA (d=0, order chosen by ``arima_select_fit``) is fit on
    close-to-close log-returns, and its 1-step-ahead forecast is used as the gap.

    Args:
        symbol: ticker passed to ``load_daily_ohlc_vnstock``.
        lookback_days: calendar days of daily history to load, ending now (ICT).
        source: vnstock data source.
        alpha: significance level of the returned interval (0.10 -> 90% CI).
            The default matches the value that used to be hard-coded.

    Returns:
        dict with ``gap_ret_mean``, ``gap_ret_ci`` ([lo, hi]), ``order``, ``trend``
        and ``last_close``.

    Raises:
        RuntimeError: no usable close prices / no returns in the lookback window.
    """
    start = (pd.Timestamp.now(ICT) - pd.Timedelta(days=lookback_days)).strftime("%Y-%m-%d")
    df = load_daily_ohlc_vnstock(symbol, start=start, source=source).dropna(subset=["close"]).copy()
    close = df["close"]
    if close.empty:
        raise RuntimeError(f"No daily close data for {symbol} since {start}.")

    r = np.log(close / close.shift(1)).dropna()
    if r.empty:
        raise RuntimeError(f"Not enough daily closes for {symbol} to compute returns.")

    fit, order, trend = arima_select_fit(r, d=0, max_p=3, max_q=3, trends=("n", "c"))
    fc = fit.get_forecast(steps=1)
    ret_mean = float(fc.predicted_mean.values[0])
    ci = fc.conf_int(alpha=alpha).values[0].tolist()  # (1 - alpha) interval
    return {"gap_ret_mean": ret_mean, "gap_ret_ci": ci, "order": order, "trend": trend,
            "last_close": float(close.iloc[-1])}

def apply_gap_then_sessions(gap_fc: dict, ampm_fc: dict):
    """
    Chain the GAP -> AM -> PM forecasts into expected PRICE bands at 09:00, 11:30 and 15:00.

    Each stage is anchored on the previous stage's point forecast (``px_mean``).
    A stage's lo/hi band therefore reflects only that stage's own return interval;
    uncertainty from earlier stages is not propagated.

    Returns:
        ``{"OPEN_am": ..., "AM_px": ..., "PM_px": ...}``, each value as built by
        ``price_from_ret``.
    """
    bands = {}
    bands["OPEN_am"] = price_from_ret(gap_fc["last_close"], gap_fc["gap_ret_mean"], gap_fc["gap_ret_ci"])

    anchor = bands["OPEN_am"]["px_mean"]
    for out_key, session_key in (("AM_px", "AM"), ("PM_px", "PM")):
        session = ampm_fc[session_key]
        bands[out_key] = price_from_ret(anchor, session["ret_pred"], session["ret_ci"])
        anchor = bands[out_key]["px_mean"]
    return bands

# ===================== Hàm gộp: DỰ BÁO NGÀY MAI (không cache) =====================
def predict_tomorrow_full(symbol: str, source: str = "VCI", alpha: float = 0.10):
    """
    Forecast TOMORROW's session and convert it into PRICE bands.

    Nothing is cached to CSV; vnstock is called directly.

    - GAP (09:00 OPEN vs. last CLOSE): daily ARIMA with a 90% interval. If that fails,
      it falls back to OPEN ≈ last close with a fixed ±0.5% band and prints a warning.
    - AM (09:00→11:30) and PM (13:00→15:00): from the most recent intraday session,
      with (1 - alpha) intervals.

    Args:
        symbol: ticker.
        source: vnstock data source.
        alpha: significance level passed to the AM/PM session forecast.

    Returns:
        dict with ``target_day``, ``gap``, ``ampm`` and ``bands``. Also prints a summary.

    Raises:
        RuntimeError: the GAP forecast failed and no daily close exists for the fallback.
    """
    # AM/PM from the most recent intraday session
    ampm = forecast_tomorrow_sessions_no_cache(symbol, source=source, max_p=5, max_q=5, alpha=alpha)
    target_day = ampm["target_day"]; AM, PM = ampm["AM"], ampm["PM"]

    # The AM/PM intervals follow `alpha` (this label used to be hard-coded to "CI90%").
    sess_ci = f"CI{round((1 - alpha) * 100, 2):g}%"
    # The GAP model below is called with its default 90% level.
    gap_ci = "CI90%"

    # GAP from daily data
    try:
        gap = forecast_gap_open_vs_prev_close(symbol, lookback_days=240, source=source)
        gap_dir = "↑" if gap["gap_ret_mean"] > 0 else ("↓" if gap["gap_ret_mean"] < 0 else "=")
    except Exception as e:
        # Daily model failed: anchor OPEN on the last valid close with a ±0.5% band.
        start = (pd.Timestamp.now(ICT) - pd.Timedelta(days=30)).strftime("%Y-%m-%d")
        df_daily = load_daily_ohlc_vnstock(symbol, start=start, source=source).dropna(subset=["close"])
        if df_daily.empty:
            raise RuntimeError(f"No daily close for {symbol} to anchor the GAP fallback.") from e
        last_close = float(df_daily["close"].iloc[-1])
        gap = {"gap_ret_mean": 0.0, "gap_ret_ci": [-0.005, 0.005], "last_close": last_close}
        gap_dir = "="
        print(f"[WARN] GAP ARIMA lỗi ({e}). Fallback OPEN≈last_close ±0.5%.")

    bands = apply_gap_then_sessions(gap, ampm)

    def pct(x): return f"{x*100:.2f}%"
    print(f"=== {symbol} | Dự báo NGÀY MAI {target_day} ===")
    print(f"GAP (OPEN 09:00 vs CLOSE hôm qua): {gap_dir} | ret≈{pct(gap['gap_ret_mean'])} | {gap_ci} ~ [{pct(gap['gap_ret_ci'][0])}, {pct(gap['gap_ret_ci'][1])}]")
    print(f"  → OPEN 09:00 ~ {bands['OPEN_am']['px_mean']:.0f}  ({gap_ci} {bands['OPEN_am']['px_lo']:.0f}–{bands['OPEN_am']['px_hi']:.0f})")

    print(f"AM (09:00→11:30): {AM['direction']} | ret≈{pct(AM['ret_pred'])} | {sess_ci} ~ [{pct(AM['ret_ci'][0])}, {pct(AM['ret_ci'][1])}]")
    print(f"  → 11:30 ~ {bands['AM_px']['px_mean']:.0f}  ({sess_ci} {bands['AM_px']['px_lo']:.0f}–{bands['AM_px']['px_hi']:.0f})")

    print(f"PM (13:00→15:00): {PM['direction']} | ret≈{pct(PM['ret_pred'])} | {sess_ci} ~ [{pct(PM['ret_ci'][0])}, {pct(PM['ret_ci'][1])}]")
    print(f"  → 15:00 ~ {bands['PM_px']['px_mean']:.0f}  ({sess_ci} {bands['PM_px']['px_lo']:.0f}–{bands['PM_px']['px_hi']:.0f})")

    return {"target_day": target_day, "gap": gap, "ampm": ampm, "bands": bands}

# ===================== Usage examples =====================
# Outside trading hours (recommended): full next-day forecast (GAP + AM/PM price bands).
# NOTE(review): both calls below hit vnstock over the network every time this cell runs.
out = predict_tomorrow_full("VCB", source="VCI", alpha=0.10)

# During trading hours: in-session forecast.
# NOTE(review): this call runs unconditionally, even when the market is closed;
# `predict_in_session` is defined elsewhere — confirm its behaviour off-hours.
intra, pred, direction, info = predict_in_session("VCB", source="VCI")
display(pred.round(2).to_frame("pred_price")); display(direction.to_frame("dir"))


In [None]:
# -*- coding: utf-8 -*-
import json, math, time, datetime as dt
import numpy as np
import pandas as pd
from typing import Optional, Literal

# ============== Time ==============
# Vietnam time as a fixed UTC+7 offset; used below to localize/convert intraday timestamps.
ICT = dt.timezone(dt.timedelta(hours=7))

# ============== vnstock loaders ==============
def _vnstock_intraday_once(symbol: str, page_size=20000, source="VCI", show_log=False) -> pd.DataFrame:
    """
    Fetch the most recent session's intraday ticks for ``symbol`` via vnstock.

    Column names vary between vnstock sources, so time/price/volume are picked from
    known aliases (time|time_report, price|close, volume|match_volume). Missing volume
    becomes 0. Naive timestamps are localized to ICT; tz-aware ones are converted to ICT.

    Returns:
        DataFrame ['dt', 'price', 'volume', 'date'] sorted by ``dt``.

    Raises:
        RuntimeError: empty response, or no recognizable time/price column.
    """
    from vnstock import Vnstock

    quote = Vnstock().stock(symbol=symbol, source=source).quote
    raw = quote.intraday(symbol=symbol, page_size=page_size, show_log=show_log)
    if raw is None or len(raw) == 0:
        raise RuntimeError("vnstock intraday trả về rỗng.")

    def first_present(*aliases):
        return next((alias for alias in aliases if alias in raw.columns), None)

    time_col = first_present("time", "time_report")
    price_col = first_present("price", "close")
    vol_col = first_present("volume", "match_volume")
    if not time_col or not price_col:
        raise RuntimeError("Không nhận diện được cột thời gian/giá từ intraday tick.")

    ticks = raw.rename(columns={time_col: "dt", price_col: "price"}).copy()
    ticks["volume"] = raw[vol_col] if vol_col else 0
    ticks["dt"] = pd.to_datetime(ticks["dt"], errors="coerce")
    ticks = ticks.dropna(subset=["dt", "price"]).copy()

    # Normalize to ICT: localize naive stamps, convert aware ones.
    stamps = ticks["dt"].dt
    ticks["dt"] = stamps.tz_localize(ICT) if stamps.tz is None else stamps.tz_convert(ICT)
    ticks["date"] = ticks["dt"].dt.date
    return ticks.sort_values("dt")[["dt", "price", "volume", "date"]]

def resample_to_5m_ohlcv(tick_df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate ticks into 5-minute OHLCV bars.

    OHLC comes from ``price``; ``volume`` is summed per bar. Bars with no trade
    (NaN close) are dropped.

    Returns:
        DataFrame ['dt', 'open', 'high', 'low', 'close', 'volume'].
    """
    by_time = tick_df.set_index("dt")
    bars = by_time["price"].resample("5min").ohlc()
    bars["volume"] = by_time["volume"].resample("5min").sum()
    bars = bars.dropna(subset=["close"])
    return bars.reset_index()[["dt", "open", "high", "low", "close", "volume"]]

# ============== Backend 1: Redis ==============
"""
Thiết kế:
- Mỗi symbol lưu trong 1 Sorted Set: key = f"intra:{symbol}:5m"
- score = epoch seconds (UTC), member = JSON packed candle {ts,open,high,low,close,volume}
- Truy xuất theo khoảng thời gian (ZRANGEBYSCORE)
Ưu điểm: cực nhanh, đúng bài time-series, không cần lưu file.
"""

def redis_connect(host="localhost", port=6379, db=0, password=None):
    """Open a Redis client whose replies are decoded to ``str`` (``decode_responses=True``)."""
    import redis

    settings = dict(host=host, port=port, db=db, password=password, decode_responses=True)
    return redis.Redis(**settings)

def candle_row_to_json(row: pd.Series) -> tuple[str, int]:
    """
    Serialize one 5m candle row into the Redis sorted-set ``(member, score)`` pair.

    Args:
        row: mapping with 'dt', 'open', 'high', 'low', 'close' and optionally 'volume'
            (a missing volume is stored as 0.0). 'dt' may be a ``pd.Timestamp``, a
            ``datetime.datetime`` or anything ``pd.Timestamp`` accepts.

    Returns:
        (member, score): ``member`` is the JSON payload
        {ts, open, high, low, close, volume}; ``score`` equals ``ts``, the candle start as
        integer epoch seconds (UTC). The annotation used to say ``str`` although a tuple
        is returned.

    NOTE: a naive ``dt`` is interpreted as UTC (unchanged behaviour), whereas the tick
    loader in this notebook treats naive times as ICT — pass tz-aware timestamps.
    """
    # pd.Timestamp(...) also handles datetime.datetime, which has no .tz_convert.
    stamp = pd.Timestamp(row["dt"])
    stamp = stamp.tz_localize("UTC") if stamp.tzinfo is None else stamp.tz_convert("UTC")
    ts = int(stamp.timestamp())
    payload = {
        "ts": ts,
        "open": float(row["open"]),
        "high": float(row["high"]),
        "low":  float(row["low"]),
        "close": float(row["close"]),
        "volume": float(row.get("volume", 0.0)),
    }
    # Key order / json.dumps defaults must stay stable: the member string is the set identity.
    return json.dumps(payload), ts

def redis_upsert_intraday_5m(r, symbol: str, df5: pd.DataFrame) -> int:
    """
    Upsert 5m candles for ``symbol`` into the sorted set ``intra:{symbol}:5m``.

    A sorted set is keyed by *member*, not by score. A candle whose values changed
    (e.g. the still-forming last bar re-fetched later in the session) used to be added
    as a second member with the same score, duplicating that bar on read. Each candle's
    score slot is now cleared before its new member is added.

    Args:
        r: Redis client (see ``redis_connect``).
        symbol: ticker, used in the key name.
        df5: 5m candles with 'dt', 'open', 'high', 'low', 'close', 'volume'.

    Returns:
        Number of candles written.
    """
    key = f"intra:{symbol}:5m"
    # MULTI/EXEC: readers never see a slot between its removal and re-insertion.
    pipe = r.pipeline(transaction=True)
    written = 0
    for _, row in df5.iterrows():
        member, score = candle_row_to_json(row)
        pipe.zremrangebyscore(key, score, score)
        pipe.zadd(key, {member: score})
        written += 1
    pipe.execute()
    return written

def redis_fetch_last_days_5m(r, symbol: str, days: int = 10) -> pd.DataFrame:
    """
    Read the 5m candles stored for ``symbol`` whose start lies in the last ``days`` days.

    Uses ZRANGEBYSCORE on ``intra:{symbol}:5m`` with epoch-second (UTC) bounds ending now.

    Returns:
        DataFrame ['dt','open','high','low','close','volume'] sorted by ``dt`` (tz=ICT);
        an empty frame with those columns when nothing is stored in the window.
    """
    columns = ["dt", "open", "high", "low", "close", "volume"]
    now_utc = pd.Timestamp.now("UTC")
    lo_score = int((now_utc - pd.Timedelta(days=days)).timestamp())
    hi_score = int(now_utc.timestamp())
    members = r.zrangebyscore(f"intra:{symbol}:5m", lo_score, hi_score)
    if not members:
        return pd.DataFrame(columns=columns)

    candles = pd.DataFrame([json.loads(member) for member in members])
    candles["dt"] = pd.to_datetime(candles["ts"], unit="s", utc=True).dt.tz_convert(ICT)
    return candles.sort_values("dt")[columns]

# ============== Backend 2: Qdrant ==============
"""
Qdrant chủ yếu cho vector search, nhưng ta vẫn có thể lưu candle làm "point":
- collection = "intraday_5m"
- vector_dim = 1 (dummy) hoặc tính vector đặc trưng nếu bạn muốn similarity
- payload: {symbol, ts, open, high, low, close, volume}
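- id: xác định (deterministic) theo (symbol, ts), vd. str(uuid.uuid5(uuid.NAMESPACE_URL, f"{symbol}:{ts}")); KHÔNG dùng hash(symbol) — hash() của str bị salt ngẫu nhiên mỗi process (PYTHONHASHSEED) nên id đổi sau mỗi lần khởi động kernel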
Truy xuất: dùng filter symbol & ts range → scroll, sau đó sort theo ts.
"""

def qdrant_connect(host="localhost", port=6333, api_key: Optional[str]=None):
    """Create a ``QdrantClient`` for ``host:port``; ``api_key`` is optional."""
    from qdrant_client import QdrantClient

    client = QdrantClient(host=host, port=port, api_key=api_key)
    return client

def qdrant_ensure_collection(client, collection="intraday_5m", vector_size=1, distance="Cosine",
                             recreate: bool = False):
    """
    Make sure ``collection`` exists, creating it only when it is missing.

    This used to call ``recreate_collection`` unconditionally, which DROPS an existing
    collection and every stored candle. That contradicts the "ensure" name. Pass
    ``recreate=True`` to get the old drop-and-create behaviour explicitly.

    Args:
        client: ``QdrantClient``.
        collection: collection name.
        vector_size: vector dimension (1 for the placeholder [close] vector).
        distance: Qdrant distance, given by value ("Cosine", "Euclid", "Dot", ...)
            or by enum name ("COSINE", ...). The enum members are upper-case, so the
            former ``getattr(Distance, "Cosine")`` raised AttributeError.
        recreate: drop and re-create the collection.
    """
    from qdrant_client.http.models import VectorParams, Distance

    try:
        dist = Distance(distance)              # by value, e.g. "Cosine"
    except ValueError:
        dist = Distance[str(distance).upper()]  # by member name, e.g. "COSINE"
    vectors_config = VectorParams(size=vector_size, distance=dist)

    if recreate:
        client.recreate_collection(collection_name=collection, vectors_config=vectors_config)
        return

    try:
        client.get_collection(collection_name=collection)
        return  # already exists: leave its data untouched
    except Exception:
        # Most likely "not found". A real connectivity problem will surface
        # from create_collection below.
        pass
    client.create_collection(collection_name=collection, vectors_config=vectors_config)

def qdrant_upsert_intraday_5m(client, symbol: str, df5: pd.DataFrame, collection="intraday_5m") -> int:
    """
    Upsert the 5m candles of ``symbol`` into a Qdrant collection, one point per candle.

    Point ids are derived deterministically from (symbol, ts) with ``uuid5``, so re-running
    the upsert overwrites the same point. The former id ``int(f"{ts}{abs(hash(symbol)) % 1000}")``
    was not stable:
    - ``hash()`` of a str is salted per interpreter process (PYTHONHASHSEED), so every new
      kernel produced new ids and re-upserts duplicated candles;
    - 1000 buckets meant different symbols at the same ``ts`` could overwrite each other.
    NOTE: points already written with the old integer ids are not replaced by this change.

    Payload: {symbol, ts, open, high, low, close, volume}. ``ts`` is epoch seconds UTC;
    a naive ``dt`` is interpreted as UTC. The vector is a 1-D placeholder ``[close]``.

    Returns:
        Number of points upserted.
    """
    import uuid
    from qdrant_client.http.models import PointStruct

    points = []
    for _, row in df5.iterrows():
        stamp = pd.Timestamp(row["dt"])
        stamp = stamp.tz_localize("UTC") if stamp.tzinfo is None else stamp.tz_convert("UTC")
        ts = int(stamp.timestamp())
        payload = {
            "symbol": symbol,
            "ts": ts,
            "open": float(row["open"]),
            "high": float(row["high"]),
            "low":  float(row["low"]),
            "close": float(row["close"]),
            "volume": float(row.get("volume", 0.0)),
        }
        # Dummy 1-D vector; replace with real features if similarity search is needed.
        vec = [float(row["close"])]
        pid = str(uuid.uuid5(uuid.NAMESPACE_URL, f"intra5m:{symbol}:{ts}"))
        points.append(PointStruct(id=pid, vector=vec, payload=payload))
    if points:
        client.upsert(collection_name=collection, points=points)
    return len(points)

def qdrant_fetch_last_days_5m(client, symbol: str, days: int = 10, collection="intraday_5m") -> pd.DataFrame:
    """
    Read the 5m candles of ``symbol`` from the last ``days`` days out of a Qdrant collection.

    Filters on payload ``symbol == symbol`` and ``ts`` in [now - days, now] (epoch seconds
    UTC). Pages through ``client.scroll`` until no next offset is returned, then sorts by time.

    Returns:
        DataFrame ['dt','open','high','low','close','volume'] with ``dt`` in ICT; an empty
        frame with those columns when nothing matches.
    """
    from qdrant_client.http.models import Filter, FieldCondition, Range, MatchValue

    columns = ["dt", "open", "high", "low", "close", "volume"]
    # One clock snapshot for both bounds (previously two separate Timestamp.now() calls).
    now_utc = pd.Timestamp.now("UTC")
    end = int(now_utc.timestamp())
    start = int((now_utc - pd.Timedelta(days=days)).timestamp())
    cond = Filter(
        must=[
            FieldCondition(key="symbol", match=MatchValue(value=symbol)),
            FieldCondition(key="ts", range=Range(gte=start, lte=end)),
        ]
    )

    points = []
    next_page = None
    while True:
        batch, next_page = client.scroll(collection_name=collection, scroll_filter=cond, limit=2000,
                                         with_payload=True, with_vectors=False, offset=next_page)
        points.extend(batch)
        if next_page is None:
            break

    if not points:
        return pd.DataFrame(columns=columns)
    df = pd.DataFrame([p.payload for p in points])
    df["dt"] = pd.to_datetime(df["ts"], unit="s", utc=True).dt.tz_convert(ICT)
    return df.sort_values("dt")[columns]

# ============== Orchestrator: cập nhật & truy vấn không cần CSV ==============
def update_store_today(symbol: str,
                       source: str = "VCI",
                       backend: Literal["redis","qdrant"]="redis",
                       redis_cfg: Optional[dict] = None,
                       qdrant_cfg: Optional[dict] = None,
                       qdrant_collection: str = "intraday_5m") -> int:
    """
    Fetch the latest session's intraday ticks, resample to 5m and store them.

    Steps:
    - call vnstock for the most recent intraday ticks;
    - resample them to 5m OHLCV;
    - upsert into Redis or Qdrant.

    Args:
        symbol: ticker.
        source: vnstock data source.
        backend: "redis" or "qdrant".
        redis_cfg / qdrant_cfg: keyword args for ``redis_connect`` / ``qdrant_connect``.
        qdrant_collection: target Qdrant collection.

    Returns:
        Number of candles stored.

    Raises:
        ValueError: ``backend`` is not "redis"/"qdrant". This is checked before any network
            call; previously any other value silently fell through to Qdrant.
    """
    if backend not in ("redis", "qdrant"):
        raise ValueError(f"Unknown backend {backend!r}; expected 'redis' or 'qdrant'.")

    ticks = _vnstock_intraday_once(symbol, page_size=20000, source=source, show_log=False)
    df5 = resample_to_5m_ohlcv(ticks)

    if backend == "redis":
        r = redis_connect(**(redis_cfg or {}))
        return redis_upsert_intraday_5m(r, symbol, df5)

    client = qdrant_connect(**(qdrant_cfg or {}))
    # The collection must already exist. Before enabling the call below, check that
    # qdrant_ensure_collection creates-if-missing rather than recreating
    # (recreating would drop all stored candles).
    # qdrant_ensure_collection(client, collection=qdrant_collection)
    return qdrant_upsert_intraday_5m(client, symbol, df5, collection=qdrant_collection)

def load_last_days(symbol: str,
                   days: int = 15,
                   backend: Literal["redis","qdrant"]="redis",
                   redis_cfg: Optional[dict] = None,
                   qdrant_cfg: Optional[dict] = None,
                   qdrant_collection: str = "intraday_5m") -> pd.DataFrame:
    """
    Load the stored 5m intraday candles of the last ``days`` days from Redis or Qdrant.

    Args:
        symbol: ticker.
        days: window length in days, ending now.
        backend: "redis" or "qdrant".
        redis_cfg / qdrant_cfg: keyword args for ``redis_connect`` / ``qdrant_connect``.
        qdrant_collection: Qdrant collection to read.

    Returns:
        DataFrame ['dt','open','high','low','close','volume'] (dt in ICT).

    Raises:
        ValueError: ``backend`` is not "redis"/"qdrant" (previously it silently used Qdrant).
    """
    if backend not in ("redis", "qdrant"):
        raise ValueError(f"Unknown backend {backend!r}; expected 'redis' or 'qdrant'.")

    if backend == "redis":
        r = redis_connect(**(redis_cfg or {}))
        return redis_fetch_last_days_5m(r, symbol, days=days)

    client = qdrant_connect(**(qdrant_cfg or {}))
    return qdrant_fetch_last_days_5m(client, symbol, days=days, collection=qdrant_collection)


In [None]:
# tests/test_ml_pipeline.py
# -*- coding: utf-8 -*-
"""
Standalone test runner cho ML ARIMA (SARIMAX + exog tin tức):
- Không dùng pytest
- Không cần __file__
- Chạy được trong notebook hoặc CLI
"""

import os
import math
import tempfile
import numpy as np
import pandas as pd
import importlib
from contextlib import contextmanager

# ---- modules under test ----
# Lưu ý: đường dẫn phải khớp với project của bạn: modules/ML/... (phân biệt hoa/thường, đúng như import bên dưới)
from modules.ML import pipeline as ml_pipe
from modules.ML import registry as ml_registry

# ========= redirect MODEL_DIR to a temporary directory for the tests =========
# A fresh temp dir per run keeps the tests from overwriting real saved models.
# NOTE(review): the directory is never removed, and MODEL_DIR stays set in os.environ for
# the rest of the kernel session (later cells importing the registry will use it too).
# `ml_pipe` was imported above, before this reload, and keeps any names it bound from the
# registry at import time — confirm the registry resolves MODEL_DIR at call time.
TMP_MODELS_DIR = tempfile.mkdtemp(prefix="models_")
os.environ["MODEL_DIR"] = TMP_MODELS_DIR
importlib.reload(ml_registry)  # re-execute the registry module so it presumably re-reads MODEL_DIR

# ========= helpers (synthetic data) =========
def _make_price_series(days=300, seed=42):
    rng = np.random.default_rng(seed)
    # random walk trên log-price
    rets = rng.normal(loc=0.0005, scale=0.01, size=days)  # ~0.05% drift
    logp = np.cumsum(rets) + math.log(100.0)
    prices = np.exp(logp)
    idx = pd.date_range(end=pd.Timestamp.now().normalize(), periods=days, freq="D")
    s = pd.Series(prices, index=idx, name="close")
    return s

def _make_news_features(start_ts, end_ts, days=300, seed=123):
    rng = np.random.default_rng(seed)
    idx = pd.date_range(end=pd.Timestamp.fromtimestamp(end_ts).normalize(), periods=days, freq="D")
    df = pd.DataFrame({
        "date": idx,
        "news_count": rng.integers(0, 8, size=days),
        "pos_count":  rng.integers(0, 5, size=days),
        "neg_count":  rng.integers(0, 5, size=days),
        "mean_sent":  rng.normal(0, 0.2, size=days),
        "sum_sent":   rng.normal(0, 1.0, size=days),
    })
    return df.sort_values("date").reset_index(drop=True)

# ========= simple patching utility (thay cho pytest.monkeypatch) =========
@contextmanager
def patch_attr(module, attr_name, new_value):
    """
    Temporarily set ``module.attr_name`` to ``new_value`` (a minimal monkeypatch).

    On exit, even on error, the original attribute is restored. If the attribute did not
    exist beforehand it is deleted again. Previously it was left behind as ``None``,
    which could mask a missing attribute in later tests.
    """
    missing = object()
    old = getattr(module, attr_name, missing)
    setattr(module, attr_name, new_value)
    try:
        yield
    finally:
        if old is missing:
            # The code under test may already have removed it; don't fail cleanup.
            if hasattr(module, attr_name):
                delattr(module, attr_name)
        else:
            setattr(module, attr_name, old)

# ========= simple test runner =========
# Global tallies updated by run_test and reported by summary.
PASSED, FAILED = 0, 0


def run_test(name, fn):
    """
    Run one zero-argument test callable and print a one-line verdict.

    An AssertionError is reported as FAIL and any other Exception as ERROR; both
    increment FAILED, a clean run increments PASSED.
    """
    global PASSED, FAILED
    try:
        fn()
    except AssertionError as e:
        passed, line = False, f"❌ FAIL: {name} -> {e}"
    except Exception as e:
        passed, line = False, f"❌ ERROR: {name} -> {type(e).__name__}: {e}"
    else:
        passed, line = True, f"✅ PASS: {name}"
    print(line)
    if passed:
        PASSED += 1
    else:
        FAILED += 1


def summary():
    """Print the accumulated pass/fail totals."""
    print(f"\nKết quả: {PASSED} passed / {FAILED} failed\n")

# ========= tests =========
def test_train_gap_model_with_exog():
    """Train with synthetic prices and news: metadata must report exog use and a saved model file."""
    def fake_get_close_series(symbol, days=395):
        return _make_price_series(days=days)

    def fake_get_prices_df(symbol, days=5):
        closes = _make_price_series(days=days)
        return pd.DataFrame({"close": closes.values}, index=closes.index)

    def fake_build_news_features(symbol, start_ts, end_ts):
        return _make_news_features(start_ts, end_ts, days=300)

    def fake_time():
        return "01-01-2025 09:00:00"

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "get_prices_df", fake_get_prices_df), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features), \
         patch_attr(ml_pipe, "get_time_vn", fake_time):
        result = ml_pipe.train_gap_model("VCB", lookback_days=240)
        assert result["symbol"] == "VCB"
        assert isinstance(result["order"], tuple)
        assert result["trend"] in ("n", "c")
        assert result["use_exog"] is True
        assert os.path.exists(result["saved"])

def test_train_gap_model_without_news():
    """With an empty news frame, training must fall back to a plain ARIMA (no exog)."""
    news_columns = ["date", "news_count", "pos_count", "neg_count", "mean_sent", "sum_sent"]

    def fake_get_close_series(symbol, days=395):
        return _make_price_series(days=days)

    def fake_build_news_features(symbol, start_ts, end_ts):
        return pd.DataFrame(columns=news_columns)

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features):
        result = ml_pipe.train_gap_model("HPG", lookback_days=240)
        assert result["symbol"] == "HPG"
        assert result["use_exog"] is False

def test_forecast_gap_after_training():
    """Train VNM with synthetic news, then forecast: check result types and exog usage."""
    def fake_get_close_series(symbol, days=395):
        return _make_price_series(days=days)

    def fake_get_prices_df(symbol, days=5):
        closes = _make_price_series(days=days)
        return pd.DataFrame({"close": closes.values}, index=closes.index)

    def fake_build_news_features(symbol, start_ts, end_ts):
        return _make_news_features(start_ts, end_ts, days=300)

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "get_prices_df", fake_get_prices_df), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features):
        ml_pipe.train_gap_model("VNM", lookback_days=240)
        forecast = ml_pipe.forecast_gap("VNM", alpha=0.10)
        assert isinstance(forecast["gap_ret_mean"], float)
        assert len(forecast["gap_ret_ci"]) == 2
        assert isinstance(forecast["last_close"], float)
        assert forecast["use_exog"] is True

def test_predict_tomorrow_full_exog_structure():
    """The next-day forecast exposes target_day, gap, ampm and three complete price bands."""
    def fake_get_close_series(symbol, days=395):
        return _make_price_series(days=days)

    def fake_get_prices_df(symbol, days=5):
        closes = _make_price_series(days=days)
        return pd.DataFrame({"close": closes.values}, index=closes.index)

    def fake_build_news_features(symbol, start_ts, end_ts):
        return _make_news_features(start_ts, end_ts, days=300)

    band_fields = ("px_mean", "px_lo", "px_hi")
    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "get_prices_df", fake_get_prices_df), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features):
        result = ml_pipe.predict_tomorrow_full_exog("VCB", alpha=0.10)
        assert "target_day" in result
        assert all(key in result for key in ("gap", "bands", "ampm"))
        bands = result["bands"]
        for band_name in ("OPEN_am", "AM_px", "PM_px"):
            assert band_name in bands and all(field in bands[band_name] for field in band_fields)

def test_smart_predict_in_session():
    """Force 'in session' and stub the intraday predictor: smart_predict must route to it."""
    stub_prediction = {
        "last_dt": pd.Timestamp.now(),
        "last_price": 100.0,
        "next_price": 100.2,
        "next_step_dir": "↑",
        "order": (1, 0, 1),
        "trend": "c",
        "path_pred": pd.Series([100.2, 100.4],
                               index=pd.date_range(pd.Timestamp.now(), periods=2, freq="5min")),
    }

    def always_trading():
        return True

    def stub_predict(sym, source="VCI"):
        return stub_prediction

    def fake_time():
        return "01-01-2025 10:00:00"

    with patch_attr(ml_pipe, "is_vn_trading_time", always_trading), \
         patch_attr(ml_pipe, "predict_next_step_in_session", stub_predict), \
         patch_attr(ml_pipe, "get_time_vn", fake_time):
        result = ml_pipe.smart_predict("VCB")
        assert result["mode"] == "in_session"
        assert result["next_step_dir"] == "↑"
        assert "pred_path" in result and isinstance(result["pred_path"], pd.Series)

def test_smart_predict_out_of_session():
    """Force 'out of session' and stub the next-day forecaster: smart_predict must route to it."""
    neutral_session = {"ret_pred": 0.0, "ret_ci": [-0.005, 0.005]}
    stub_forecast = {
        "target_day": pd.Timestamp.now().date(),
        "gap": {"gap_ret_mean": 0.001, "gap_ret_ci": [-0.002, 0.003], "last_close": 100.0, "use_exog": True},
        "ampm": {"AM": dict(neutral_session), "PM": dict(neutral_session)},
        "bands": {
            "OPEN_am": {"px_mean": 100.1, "px_lo": 99.8, "px_hi": 100.4},
            "AM_px":   {"px_mean": 100.1, "px_lo": 99.6, "px_hi": 100.6},
            "PM_px":   {"px_mean": 100.1, "px_lo": 99.3, "px_hi": 100.9},
        },
        "timestamp": "01-01-2025 20:00:00",
    }

    def never_trading():
        return False

    def stub_tomorrow(sym, alpha=0.10):
        return stub_forecast

    with patch_attr(ml_pipe, "is_vn_trading_time", never_trading), \
         patch_attr(ml_pipe, "predict_tomorrow_full_exog", stub_tomorrow):
        result = ml_pipe.smart_predict("VCB")
        assert result["mode"] == "out_of_session"
        assert "bands" in result and "gap" in result

# ========= run all immediately =========
# `or True`: also run when this cell/file is imported, not only as a script/notebook.
if __name__ == "__main__" or True:
    print(f"MODEL_DIR test: {TMP_MODELS_DIR}")
    # Each test is reported under its function name without the "test_" prefix.
    tests = [
        (test_fn.__name__[len("test_"):], test_fn)
        for test_fn in (
            test_train_gap_model_with_exog,
            test_train_gap_model_without_news,
            test_forecast_gap_after_training,
            test_predict_tomorrow_full_exog_structure,
            test_smart_predict_in_session,
            test_smart_predict_out_of_session,
        )
    ]
    for name, fn in tests:
        run_test(name, fn)
    summary()


In [None]:
# --- thêm ở đầu file (nếu chưa có) ---
import numpy as np
import pandas as pd
from typing import Optional, Tuple, Dict, Any
from statsmodels.tsa.statespace.sarimax import SARIMAX

from modules.api.stock_api import get_close_series, get_prices_df, get_time_vn
from modules.ML.features import build_news_features
from modules.ML.registry import save_model, load_model, MODEL_DIR

# ===== helper: build exog matrix theo same index của y & shift T-1 =====
def _build_exog_matrix(symbol: str, y: pd.Series, shift: int = 1) -> Tuple[pd.DataFrame, list[str]]:
    """
    Build the exogenous (news-feature) matrix aligned to the dates of ``y``.

    Args:
        symbol: ticker forwarded to ``build_news_features``.
        y: daily series whose index gives the dates. A ``pd.Index`` of dates is also
            accepted; a bare index used to fail on ``y.index``.
        shift: rows to lag the features. The default 1 makes row t hold the news of t-1
            (X_{t-1} explains r_t); 0 returns same-day features. The value used to be
            ignored (always a lag of 1). A negative value would leak future news.

    Returns:
        (X, feature_cols). X is indexed like ``y`` and holds the numeric news columns,
        gaps forward-filled and remaining NaN set to 0.0; feature_cols lists those columns.
        Without news (or without numeric columns), X has ``y``'s index and no columns,
        and feature_cols is [].
    """
    idx = y if isinstance(y, pd.Index) else (None if y is None else y.index)
    if idx is None or len(idx) == 0:
        return pd.DataFrame(index=pd.Index([], name="date")), []

    start_ts = int(idx.min().timestamp())
    end_ts   = int(idx.max().timestamp())

    news = build_news_features(symbol, start_ts, end_ts)
    if news is None or news.empty:
        return pd.DataFrame(index=idx), []

    # Make sure 'date' is a DatetimeIndex so it can be aligned to idx.
    news = news.copy()
    if "date" in news.columns:
        news["date"] = pd.to_datetime(news["date"])
        news = news.set_index("date")
    news = news.sort_index()

    # Numeric columns become exog features.
    feature_cols = [c for c in news.columns if pd.api.types.is_numeric_dtype(news[c])]
    if not feature_cols:
        return pd.DataFrame(index=idx), []

    X = news[feature_cols].reindex(idx).ffill().fillna(0.0)

    # Lag so that information from day t is only used for day t + shift.
    if shift:
        X = X.shift(shift).ffill().fillna(0.0)

    return X, feature_cols


# ===== Train GAP model (ARIMAX nếu có exog) =====
def train_gap_model(symbol: str, lookback_days: int = 240) -> Dict[str, Any]:
    """
    Fit the GAP model and save it via ``save_model(tag="gap")``.

    The model is ARIMA(p,0,q) on daily close log-returns, with one-day-lagged news
    features as exog when available. A small grid, p, q in {0,1,2} and trend in
    {"n","c"}, is searched; the lowest-AIC fit wins and candidates that raise are skipped.

    Changes:
    - The exog matrix is built once, from the returns series. An extra earlier call passed
      a bare DatetimeIndex, for which ``_build_exog_matrix`` (reading ``y.index``) raised
      AttributeError before training started; it also fetched the news twice.
    - ``meta`` additionally records ``k_exog``, the number of exog columns (0 without exog).

    Returns:
        dict with symbol, order, trend, use_exog, feature_cols and saved (path from save_model).

    Raises:
        ValueError: no returns to train on.
        RuntimeError: no grid candidate could be fit.
    """
    s = get_close_series(symbol, days=lookback_days + 10)  # a few days of buffer
    r = np.log(s / s.shift(1)).dropna()  # daily log-returns
    if r.empty:
        raise ValueError("Không có đủ dữ liệu returns để train.")

    # Exog aligned to r's dates, lagged one day (news of t-1 explains r_t).
    X, feature_cols = _build_exog_matrix(symbol, r, shift=1)

    use_exog = len(feature_cols) > 0 and not X.reindex(r.index).isna().all(axis=1).all()
    X_train = X.reindex(r.index).fillna(0.0) if use_exog else None

    endog = r.astype("float64")
    best = {"aic": np.inf, "fit": None, "order": None, "trend": None}
    for tr in ("n", "c"):
        for p in range(0, 3):
            for q in range(0, 3):
                try:
                    m = SARIMAX(
                        endog,
                        order=(p, 0, q),
                        seasonal_order=(0, 0, 0, 0),
                        trend=tr,
                        exog=X_train,
                        enforce_stationarity=False,
                        enforce_invertibility=False,
                    )
                    res = m.fit(disp=False)
                    if res.aic < best["aic"]:
                        best = {"aic": res.aic, "fit": res, "order": (p, 0, q), "trend": tr}
                except Exception:
                    continue  # best-effort grid: skip candidates that fail to fit

    if best["fit"] is None:
        raise RuntimeError("ARIMA grid không hội tụ — thử giảm p/q.")

    meta = {
        "symbol": symbol,
        "order": best["order"],
        "trend": best["trend"],
        "use_exog": use_exog,
        "feature_cols": feature_cols if use_exog else [],
        "k_exog": len(feature_cols) if use_exog else 0,
        "trained_until": str(r.index.max()),
    }
    saved_path = save_model(symbol=symbol, model=best["fit"], meta=meta, tag="gap")

    return {
        "symbol": symbol,
        "order": best["order"],
        "trend": best["trend"],
        "use_exog": use_exog,
        "feature_cols": feature_cols,
        "saved": saved_path,
    }


# ===== Forecast GAP (1 bước) =====
def forecast_gap(symbol: str, alpha: float = 0.10) -> Dict[str, Any]:
    """
    One-step GAP forecast (next day's log-return) from the saved "gap" model.

    Loads the model trained by ``train_gap_model``, training it first when none is saved.
    If the model was fit with news exog, the 1-step exog row is rebuilt using the
    training column order.

    Exog alignment: training paired r_t with the news of t-1 (``shift=1``). To forecast
    r_{T+1} the matching row is therefore the news of day T, i.e. the last row of the
    *unshifted* matrix. Previously the last row of the shifted matrix (news of T-1) was
    used, lagging the signal by an extra day.

    If the model has exog but no row can be built, a zero vector is passed. statsmodels
    refuses to forecast a regression model without exog values.

    Returns:
        dict with ``gap_ret_mean``, ``gap_ret_ci`` ([lo, hi] at level 1 - alpha),
        ``last_close`` and ``use_exog`` (True only when a news-derived exog row was used).
    """
    fit, meta = load_model(symbol=symbol, tag="gap")
    if fit is None:
        # No saved model yet: train once, then load it back.
        train_gap_model(symbol)
        fit, meta = load_model(symbol=symbol, tag="gap")

    meta = meta or {}
    use_exog = bool(meta.get("use_exog"))
    feature_cols = list(meta.get("feature_cols") or [])

    # Last close, used later to convert returns into prices.
    last_close = float(get_prices_df(symbol, days=5)["close"].iloc[-1])

    X1 = None
    if use_exog and feature_cols:
        s = get_close_series(symbol, days=260)
        r = np.log(s / s.shift(1)).dropna()
        # shift=0: row T holds the news of day T, which the model pairs with r_{T+1}.
        X_now, _ = _build_exog_matrix(symbol, r, shift=0)
        X_now = X_now.reindex(columns=feature_cols).fillna(0.0)  # training column order
        if not X_now.empty:
            X1 = X_now.iloc[[-1]]  # shape (1, k)
    exog_from_news = X1 is not None

    if use_exog and X1 is None and feature_cols:
        # No exog row could be built: neutral zero vector instead of a statsmodels error.
        X1 = np.zeros((1, len(feature_cols)))

    if use_exog and X1 is not None:
        fc = fit.get_forecast(steps=1, exog=X1)
    else:
        fc = fit.get_forecast(steps=1)

    ret_mean = float(fc.predicted_mean.values[0])
    ci = fc.conf_int(alpha=alpha).values[0].tolist()

    return {
        "gap_ret_mean": ret_mean,
        "gap_ret_ci": ci,
        "last_close": last_close,
        "use_exog": use_exog and exog_from_news,
    }


# ===== Dùng forecast_gap + fallback AM/PM để tạo bands =====
def _price_from_ret(open_price: float, ret_mean: float, ret_ci: list[float]) -> dict:
    mean_px = open_price * np.exp(ret_mean)
    lo_px   = open_price * np.exp(ret_ci[0])
    hi_px   = open_price * np.exp(ret_ci[1])
    return {"px_mean": float(mean_px), "px_lo": float(lo_px), "px_hi": float(hi_px)}

def _fallback_am_pm() -> dict:
    # Bạn có thể thay bằng block AM/PM intraday của bạn
    return {
        "AM": {"ret_pred": 0.0, "ret_ci": [-0.005, 0.005]},
        "PM": {"ret_pred": 0.0, "ret_ci": [-0.005, 0.005]},
    }

def predict_tomorrow_full_exog(symbol: str, alpha: float = 0.10) -> dict:
    """
    Next-session forecast: news-exog GAP model plus placeholder AM/PM bands.

    The 09:00 OPEN band is derived from the last close and the GAP forecast. The 11:30
    and 15:00 bands are chained from the previous stage's point forecast using the
    neutral ``_fallback_am_pm`` sessions.

    ``target_day`` is the next business day (Mon–Fri) after today in Asia/Ho_Chi_Minh.
    It used to be "today + 1 calendar day", which gave a Saturday when run on a Friday.
    Exchange holidays are still not taken into account.

    Returns:
        dict with ``target_day`` (datetime.date), ``gap``, ``ampm``, ``bands``
        and ``timestamp``.
    """
    gap = forecast_gap(symbol, alpha=alpha)
    ampm = _fallback_am_pm()

    open_am = _price_from_ret(gap["last_close"], gap["gap_ret_mean"], gap["gap_ret_ci"])
    am_px   = _price_from_ret(open_am["px_mean"], ampm["AM"]["ret_pred"], ampm["AM"]["ret_ci"])
    pm_px   = _price_from_ret(am_px["px_mean"], ampm["PM"]["ret_pred"], ampm["PM"]["ret_ci"])

    today_vn = pd.Timestamp.today(tz="Asia/Ho_Chi_Minh").normalize()
    target_day = (today_vn + pd.offsets.BDay(1)).date()  # skips Saturday/Sunday

    return {
        "target_day": target_day,
        "gap": gap,
        "ampm": ampm,
        "bands": {"OPEN_am": open_am, "AM_px": am_px, "PM_px": pm_px},
        "timestamp": get_time_vn(),
    }


# ===== Router thông minh (giữ như trước) =====
from modules.ML.intraday import is_vn_trading_time, predict_next_step_in_session

def smart_predict(symbol: str, alpha: float = 0.10, source: str = "VCI") -> dict:
    """
    Route to the right forecaster based on the VN market clock.

    - Trading hours: next-step intraday forecast (``mode="in_session"``). ``alpha`` is
      not used in this branch.
    - Otherwise: next-day forecast from ``predict_tomorrow_full_exog``
      (``mode="out_of_session"``), with its keys merged after ``mode``/``symbol``.
    """
    if not is_vn_trading_time():
        return {
            "mode": "out_of_session",
            "symbol": symbol,
            **predict_tomorrow_full_exog(symbol, alpha=alpha),
        }

    intra = predict_next_step_in_session(symbol, source=source)
    result = {"mode": "in_session", "symbol": symbol}
    # (output key, key in the intraday predictor's result)
    for out_key, in_key in (
        ("next_step_dir", "next_step_dir"),
        ("last_price", "last_price"),
        ("next_price", "next_price"),
        ("order", "order"),
        ("trend", "trend"),
        ("pred_path", "path_pred"),
    ):
        result[out_key] = intra[in_key]
    result["timestamp"] = get_time_vn()
    return result


In [None]:
# tests/test_ml_pipeline.py
# -*- coding: utf-8 -*-
"""
Standalone test runner cho modules.ML.pipeline:
- Kiểm tra train với exog (news) + meta (feature_cols, k_exog)
- Kiểm tra forecast có exog (truyền đúng vào get_forecast)
- Kiểm tra fallback zero-vector khi training có exog nhưng lúc dự báo thiếu news
- Kiểm tra trường hợp không có news (use_exog=False)
- Kiểm tra cấu trúc predict_tomorrow_full_exog
- Kiểm tra smart router trong/ngoài phiên
Chạy được trong notebook hoặc CLI: python tests/test_ml_pipeline.py
"""

import os
import math
import tempfile
import numpy as np
import pandas as pd
import importlib
from contextlib import contextmanager

# ===== Project imports
from modules.ML import pipeline as ml_pipe
from modules.ML import registry as ml_registry

# ===== Redirect MODEL_DIR to a temporary directory so the tests never overwrite real models
# NOTE(review): the directory is never removed, and MODEL_DIR stays set in os.environ for the
# rest of the kernel session. `ml_pipe` was imported above, before this reload — confirm the
# registry resolves MODEL_DIR at call time rather than only at import.
TMP_MODELS_DIR = tempfile.mkdtemp(prefix="models_")
os.environ["MODEL_DIR"] = TMP_MODELS_DIR
importlib.reload(ml_registry)  # re-execute the registry so it presumably picks up the new env value

# ===== Utilities
@contextmanager
def patch_attr(module, attr_name, new_value):
    """
    Temporarily set ``module.attr_name`` to ``new_value`` (a minimal monkeypatch).

    On exit, even on error, the original attribute is restored. If the attribute did not
    exist beforehand it is deleted again. Previously it was left behind as ``None``,
    which could mask a missing attribute in later tests.
    """
    missing = object()
    old = getattr(module, attr_name, missing)
    setattr(module, attr_name, new_value)
    try:
        yield
    finally:
        if old is missing:
            # The code under test may already have removed it; don't fail cleanup.
            if hasattr(module, attr_name):
                delattr(module, attr_name)
        else:
            setattr(module, attr_name, old)

def _make_price_series(days=300, seed=42):
    rng = np.random.default_rng(seed)
    rets = rng.normal(loc=0.0005, scale=0.01, size=days)  # drift ~0.05%
    logp = np.cumsum(rets) + math.log(100.0)
    prices = np.exp(logp)
    idx = pd.date_range(end=pd.Timestamp.now().normalize(), periods=days, freq="D")
    return pd.Series(prices, index=idx, name="close")

def _make_news_features(start_ts, end_ts, days=300, seed=123):
    rng = np.random.default_rng(seed)
    idx = pd.date_range(end=pd.Timestamp.fromtimestamp(end_ts).normalize(), periods=days, freq="D")
    df = pd.DataFrame({
        "date": idx,
        "news_count": rng.integers(0, 8, size=days),
        "pos_count":  rng.integers(0, 5, size=days),
        "neg_count":  rng.integers(0, 5, size=days),
        "mean_sent":  rng.normal(0, 0.2, size=days),
        "sum_sent":   rng.normal(0, 1.0, size=days),
    })
    return df.sort_values("date").reset_index(drop=True)

# ===== Tiny runner (không dùng pytest)
# Global tallies updated by run_test and reported by summary.
PASSED, FAILED = 0, 0


def run_test(name, fn):
    """
    Run one zero-argument test callable and print a one-line verdict.

    An AssertionError is reported as FAIL and any other Exception as ERROR; both
    increment FAILED, a clean run increments PASSED.
    """
    global PASSED, FAILED
    try:
        fn()
    except AssertionError as e:
        passed, line = False, f"❌ FAIL:  {name} -> {e}"
    except Exception as e:
        passed, line = False, f"❌ ERROR: {name} -> {type(e).__name__}: {e}"
    else:
        passed, line = True, f"✅ PASS: {name}"
    print(line)
    if passed:
        PASSED += 1
    else:
        FAILED += 1


def summary():
    """Print the accumulated pass/fail totals."""
    print(f"\nKết quả: {PASSED} passed / {FAILED} failed")

# ===== Tests

def test_train_with_exog_meta():
    """
    Training with news (exog) -> the saved meta records feature_cols and k_exog > 0.

    Price loading and news-feature building are replaced with synthetic data,
    a "gap" model is trained for VCB, and the model + meta are reloaded from
    the (temporary) registry to check they agree with the train output.
    """
    def fake_get_close_series(symbol, days=395):
        # Stand-in for ml_pipe.get_close_series: synthetic close prices.
        return _make_price_series(days=days)
    def fake_build_news_features(symbol, start_ts, end_ts):
        # Stand-in for ml_pipe.build_news_features: 300 days of random features ending at end_ts.
        return _make_news_features(start_ts, end_ts, days=300)

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features):
        out = ml_pipe.train_gap_model("VCB", lookback_days=240)
        assert out["symbol"] == "VCB"
        assert isinstance(out["order"], tuple)
        assert out["trend"] in ("n", "c")
        assert out["use_exog"] is True
        assert len(out["feature_cols"]) > 0
        # the saved meta must carry k_exog equal to the number of feature columns
        fit, meta = ml_registry.load_model_meta(symbol="VCB", tag="gap")
        assert fit is not None and isinstance(meta, dict)
        assert meta.get("use_exog") is True
        assert int(meta.get("k_exog", 0)) == len(out["feature_cols"])
        assert os.path.exists(out["saved"])

def test_forecast_gap_with_exog():
    """
    After training with exog, forecast_gap reports res['use_exog'] True.

    The original intent is that the forecast passes an exog row of shape (1, k);
    that shape is not asserted directly here. Trains HPG and forecasts within
    the same set of patches (prices, recent prices, news features).
    """
    def fake_get_close_series(symbol, days=395):
        # Synthetic daily closes for training.
        return _make_price_series(days=days)
    def fake_get_prices_df(symbol, days=5):
        # Recent-price frame with a single "close" column, same synthetic generator.
        s = _make_price_series(days=days)
        return pd.DataFrame({"close": s.values}, index=s.index)
    def fake_build_news_features(symbol, start_ts, end_ts):
        # Non-empty synthetic news features, so exog is available.
        return _make_news_features(start_ts, end_ts, days=300)

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "get_prices_df", fake_get_prices_df), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features):
        ml_pipe.train_gap_model("HPG", lookback_days=240)
        res = ml_pipe.forecast_gap("HPG", alpha=0.10)
        assert isinstance(res["gap_ret_mean"], float)
        assert len(res["gap_ret_ci"]) == 2
        assert isinstance(res["last_close"], float)
        assert res["use_exog"] is True  # news is available, so the forecast exog row (X1) gets built

def test_forecast_gap_zero_vector_fallback_when_news_missing_now():
    """
    Model trained with exog, but no news is available at forecast time.

    Expected pipeline behaviour (implemented in ml_pipe, not visible here):
    _build_exog_for_forecast falls back to a zero vector of shape (1, k_exog),
    so res['use_exog'] stays True. The model needs exog, and zeros are supplied.
    """
    def fake_get_close_series(symbol, days=395):
        return _make_price_series(days=days)
    def fake_get_prices_df(symbol, days=5):
        s = _make_price_series(days=days)
        return pd.DataFrame({"close": s.values}, index=s.index)
    def fake_build_news_features(symbol, start_ts, end_ts):
        return _make_news_features(start_ts, end_ts, days=300)

    # train with exog (news available)
    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features):
        ml_pipe.train_gap_model("VNM", lookback_days=240)

    # forecast with empty news -> zero-vector fallback expected
    def empty_news(symbol, start_ts, end_ts):
        # Empty frame with the usual feature columns.
        return pd.DataFrame(columns=["date","news_count","pos_count","neg_count","mean_sent","sum_sent"])
    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "get_prices_df", fake_get_prices_df), \
         patch_attr(ml_pipe, "build_news_features", empty_news):
        res = ml_pipe.forecast_gap("VNM", alpha=0.10)
        assert isinstance(res["gap_ret_mean"], float)
        assert len(res["gap_ret_ci"]) == 2
        assert isinstance(res["last_close"], float)
        # still True: the model uses exog and a zero fallback vector was supplied
        assert res["use_exog"] is True

def test_train_without_news_then_forecast_no_exog():
    """
    No news from the start -> the model is trained without exog (plain ARIMA).

    The forecast then uses no exog either, so res['use_exog'] must be False.
    """
    def fake_get_close_series(symbol, days=395):
        return _make_price_series(days=days)
    def empty_news(symbol, start_ts, end_ts):
        # Empty feature frame: no exog can be built at train or forecast time.
        return pd.DataFrame(columns=["date","news_count","pos_count","neg_count","mean_sent","sum_sent"])
    def fake_get_prices_df(symbol, days=5):
        s = _make_price_series(days=days)
        return pd.DataFrame({"close": s.values}, index=s.index)

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "build_news_features", empty_news):
        out = ml_pipe.train_gap_model("FPT", lookback_days=240)
        assert out["use_exog"] is False

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "get_prices_df", fake_get_prices_df), \
         patch_attr(ml_pipe, "build_news_features", empty_news):
        res = ml_pipe.forecast_gap("FPT", alpha=0.10)
        assert isinstance(res["gap_ret_mean"], float)
        assert len(res["gap_ret_ci"]) == 2
        assert isinstance(res["last_close"], float)
        assert res["use_exog"] is False

def test_predict_tomorrow_full_exog_structure():
    """
    Output structure of predict_tomorrow_full_exog.

    The VCB gap model is trained inside the same patches first, so the test is
    self-contained. Otherwise it would depend on test_train_with_exog_meta
    having run earlier, or on whatever model already sits in MODEL_DIR.
    Checks the top-level keys and the px_mean/px_lo/px_hi entries of each band.
    """
    def fake_get_close_series(symbol, days=395):
        return _make_price_series(days=days)
    def fake_get_prices_df(symbol, days=5):
        s = _make_price_series(days=days)
        return pd.DataFrame({"close": s.values}, index=s.index)
    def fake_build_news_features(symbol, start_ts, end_ts):
        return _make_news_features(start_ts, end_ts, days=300)

    with patch_attr(ml_pipe, "get_close_series", fake_get_close_series), \
         patch_attr(ml_pipe, "get_prices_df", fake_get_prices_df), \
         patch_attr(ml_pipe, "build_news_features", fake_build_news_features):
        # Train here so the prediction below does not depend on test execution order.
        ml_pipe.train_gap_model("VCB", lookback_days=240)
        res = ml_pipe.predict_tomorrow_full_exog("VCB", alpha=0.10)
        for key in ("target_day", "gap", "ampm", "bands"):
            assert key in res, f"missing top-level key {key!r}"
        bands = res["bands"]
        for k in ("OPEN_am", "AM_px", "PM_px"):
            assert k in bands, f"missing band {k!r}"
            for sub in ("px_mean", "px_lo", "px_hi"):
                assert sub in bands[k], f"band {k!r} missing {sub!r}"

def test_smart_predict_in_session_router():
    """
    Router: during trading hours smart_predict must delegate to
    predict_next_step_in_session and return mode "in_session" with a
    pd.Series under "pred_path".
    """
    def always_trading():
        return True

    canned_step = {
        "last_dt": pd.Timestamp.now(),
        "last_price": 100.0,
        "next_price": 100.2,
        "next_step_dir": "↑",
        "order": (1, 0, 1),
        "trend": "c",
        "path_pred": pd.Series(
            [100.2, 100.4],
            index=pd.date_range(pd.Timestamp.now(), periods=2, freq="5min"),
        ),
    }

    def fake_next_step(s, source="VCI"):
        return canned_step

    def fixed_clock():
        return "01-01-2025 10:00:00"

    with patch_attr(ml_pipe, "is_vn_trading_time", always_trading), \
         patch_attr(ml_pipe, "predict_next_step_in_session", fake_next_step), \
         patch_attr(ml_pipe, "get_time_vn", fixed_clock):
        res = ml_pipe.smart_predict("VCB")
    assert res["mode"] == "in_session"
    assert res["next_step_dir"] == "↑"
    assert "pred_path" in res and isinstance(res["pred_path"], pd.Series)

def test_smart_predict_out_of_session_router():
    """
    Router: outside trading hours smart_predict must delegate to
    predict_tomorrow_full_exog and return mode "out_of_session" with
    "bands" and "gap" in the result.

    NOTE(review): the smart_predict("FPT") output recorded later in this notebook
    has mode 'next_session' and an 'open_band' key instead of 'out_of_session' /
    'bands'. This test may be stale against the current pipeline — confirm.
    """
    def never_trading():
        return False

    canned_bands = {
        "OPEN_am": {"px_mean": 100.1, "px_lo": 99.8, "px_hi": 100.4},
        "AM_px":   {"px_mean": 100.1, "px_lo": 99.6, "px_hi": 100.6},
        "PM_px":   {"px_mean": 100.1, "px_lo": 99.3, "px_hi": 100.9},
    }
    canned = {
        "target_day": pd.Timestamp.now().date(),
        "gap": {"gap_ret_mean": 0.001, "gap_ret_ci": [-0.002, 0.003], "last_close": 100.0, "use_exog": True},
        "ampm": {
            "AM": {"ret_pred": 0.0, "ret_ci": [-0.005, 0.005]},
            "PM": {"ret_pred": 0.0, "ret_ci": [-0.005, 0.005]},
        },
        "bands": canned_bands,
        "timestamp": "01-01-2025 20:00:00",
    }

    def fake_next_day(sym, alpha=0.10):
        return canned

    with patch_attr(ml_pipe, "is_vn_trading_time", never_trading), \
         patch_attr(ml_pipe, "predict_tomorrow_full_exog", fake_next_day):
        res = ml_pipe.smart_predict("VCB")
    assert res["mode"] == "out_of_session"
    assert "bands" in res and "gap" in res

# ===== Run all
if __name__ == "__main__":
    # Runs in Jupyter and via `python tests/test_ml_pipeline.py` (both execute with
    # __name__ == "__main__"). Importing the module, e.g. during pytest collection,
    # does not trigger a second run of the suite.
    print(f"MODEL_DIR test: {TMP_MODELS_DIR}")
    tests = [
        ("train_with_exog_meta", test_train_with_exog_meta),
        ("forecast_gap_with_exog", test_forecast_gap_with_exog),
        ("forecast_gap_zero_vector_fallback_when_news_missing_now", test_forecast_gap_zero_vector_fallback_when_news_missing_now),
        ("train_without_news_then_forecast_no_exog", test_train_without_news_then_forecast_no_exog),
        ("predict_tomorrow_full_exog_structure", test_predict_tomorrow_full_exog_structure),
        ("smart_predict_in_session_router", test_smart_predict_in_session_router),
        ("smart_predict_out_of_session_router", test_smart_predict_out_of_session_router),
    ]
    for name, fn in tests:
        run_test(name, fn)
    summary()


In [None]:
from modules.ML.intraday import predict_next_step_in_session
from modules.api.stock_api import get_time_vn

def predict_direction_in_session(symbol: str):
    """
    Intraday next-step direction for ``symbol``.

    Returns a dict with the upper-cased symbol, the VN time string, the arrow
    ("↑" | "↓" | "=") from predict_next_step_in_session, the last and predicted
    next price, and their absolute (delta) and percentage (pct) difference,
    both rounded to 3 decimals. pct is 0.0 when the last price is 0.
    """
    ticker = symbol.upper()
    step = predict_next_step_in_session(ticker)  # per the original note: ARIMA fitted on 5m bars
    direction = step["next_step_dir"]
    prev_px = float(step["last_price"])
    nxt_px = float(step["next_price"])

    pct = 0.0
    if prev_px:
        pct = round((nxt_px / prev_px - 1.0) * 100, 3)

    return {
        "symbol": ticker,
        "when": get_time_vn(),
        "direction": direction,
        "last_price": prev_px,
        "next_price": nxt_px,
        "delta": round(nxt_px - prev_px, 3),
        "pct": pct,
    }

# Example: calls the live intraday model and prints the next 5-minute direction for VCB.
res = predict_direction_in_session("VCB")
print(f"⏱️ {res['when']} | {res['symbol']} • Next 5m: {res['direction']} "
      f"( {res['last_price']:.2f} → {res['next_price']:.2f} | {res['delta']:+.2f} | {res['pct']:+.3f}% )")


In [None]:
from modules.ML.pipeline import forecast_gap, predict_tomorrow_full_exog

def direction_from_return(ret_value: float, eps: float = 1e-6) -> str:
    """Arrow for a return: "↑" above +eps, "↓" below -eps, "=" otherwise (NaN included)."""
    rising = ret_value > eps
    falling = ret_value < -eps
    return "↑" if rising else ("↓" if falling else "=")

def predict_direction_tomorrow(symbol: str, alpha: float = 0.10):
    """
    Opening-gap direction for the next target session, plus price bands.

    Calls predict_tomorrow_full_exog once and reuses its embedded "gap" forecast.
    It falls back to forecast_gap only if that result carries no "gap". This
    way the arrow and the OPEN band come from the same model run instead of two
    separate forecasts.

    Args:
        symbol: ticker (case-insensitive).
        alpha: significance level forwarded to the pipeline. With the default
            0.10 the bands are presumably 90% intervals; the "open_ci90" key
            name stays fixed whatever alpha is.

    Returns:
        dict with symbol, gap_direction ("↑"/"↓"/"="), gap_ret_mean, last_close,
        open_mean, open_ci90 (lo, hi), use_exog, at_11h30 / at_15h00 band dicts,
        and target_day (None if the pipeline result has none).
    """
    sym = symbol.upper()
    full = predict_tomorrow_full_exog(sym, alpha=alpha)
    gap = full.get("gap") or forecast_gap(sym, alpha=alpha)
    bands_all = full["bands"]
    open_band = bands_all["OPEN_am"]

    return {
        "symbol": sym,
        "gap_direction": direction_from_return(gap["gap_ret_mean"]),
        "gap_ret_mean": gap["gap_ret_mean"],
        "last_close": gap["last_close"],
        "open_mean": open_band["px_mean"],
        "open_ci90": (open_band["px_lo"], open_band["px_hi"]),
        "use_exog": gap.get("use_exog", False),
        "at_11h30": bands_all["AM_px"],
        "at_15h00": bands_all["PM_px"],
        # Exposed like the pipeline's own predict_direction_tomorrow, which provides target_day.
        "target_day": full.get("target_day"),
    }

# Example: predicted opening gap for HPG (OPEN vs yesterday's CLOSE), prices printed without decimals.
# NOTE(review): the "CI90%" label assumes alpha=0.10.
out = predict_direction_tomorrow("HPG", alpha=0.10)
print(
    f"📅 {out['symbol']} • GAP (OPEN vs CLOSE hôm qua): {out['gap_direction']} "
    f"| last={out['last_close']:.0f} → OPEN~{out['open_mean']:.0f} "
    f"(CI90% {out['open_ci90'][0]:.0f}-{out['open_ci90'][1]:.0f}) "
    f"| exog_news={out['use_exog']}"
)


In [None]:
from modules.api.stock_api import get_history_df_vnstock, get_prices_df

df_raw = get_history_df_vnstock("HPG")     # raw history; per the original note, the last year by default
print("rows:", len(df_raw)); display(df_raw.tail())

df_model = get_prices_df("HPG", days=365)  # per the original note, already normalized for the model
print("model rows:", len(df_model)); display(df_model.tail())


In [None]:
from modules.ML.pipeline import (
    train_gap_model, forecast_gap,
    predict_tomorrow_full_exog,
    predict_next_step_in_session, smart_predict
)

from modules.ML.pipeline import is_vn_trading_time

# Train (first run for each symbol)
print(train_gap_model("VCB", lookback_days=365))

# Forecast the NEXT session (outside trading hours)
print(predict_tomorrow_full_exog("VCB", alpha=0.10))

# Next 5-minute step: only meaningful during trading hours, so it is skipped otherwise
if is_vn_trading_time():
    print(predict_next_step_in_session("VCB"))
else:
    print("Outside VN trading hours: skipping predict_next_step_in_session")

# Router picks the mode from the current VN time
print(smart_predict("VCB", alpha=0.10))


In [None]:
from modules.ML.pipeline import train_gap_model
print(train_gap_model("VCB", lookback_days=365))  # expect use_exog=True when news features are available


In [None]:
from modules.ML.pipeline import predict_tomorrow_full_exog
res = predict_tomorrow_full_exog("VCB", alpha=0.10)
# Arrow from the sign of the mean gap return (strict 0 threshold, no eps dead band).
arrow = "↑" if res["gap"]["gap_ret_mean"] > 0 else ("↓" if res["gap"]["gap_ret_mean"] < 0 else "=")
print(f"VCB • GAP: {arrow} | OPEN~{res['bands']['OPEN_am']['px_mean']:.2f} "
      f"(CI90% {res['bands']['OPEN_am']['px_lo']:.2f}-{res['bands']['OPEN_am']['px_hi']:.2f}) "
      f"| exog={res['gap']['use_exog']}")


In [None]:
from modules.ML.pipeline import train_gap_model, forecast_gap, predict_tomorrow_full_exog, predict_next_step_in_session, smart_predict

print(train_gap_model("VCB", lookback_days=365))
print(predict_tomorrow_full_exog("VCB", alpha=0.10))   # per the original note: before 09:00 / lunch break → target_day = today
print(smart_predict("VCB", alpha=0.10))                # picks in-session vs out-of-session from VN time


In [None]:
# -*- coding: utf-8 -*-
import json
from modules.ML.direction import predict_direction

def run(symbol: str = "VCB", alpha: float = 0.10):
    """Print predict_direction(symbol, alpha) as indented JSON, keeping non-ASCII text and str()-ing other values."""
    result = predict_direction(symbol, alpha)
    rendered = json.dumps(result, ensure_ascii=False, default=str, indent=2)
    print(rendered)

if __name__ == "__main__":
    # Fixed arguments instead of argparse: avoids argv errors from Jupyter's own kernel arguments.
    run("VCB", 0.10)


In [None]:
from modules.ML.features import build_news_features
import pandas as pd, pytz, time

ICT = pytz.timezone("Asia/Ho_Chi_Minh")
# Query window: the last 7 days, in epoch seconds.
end_ts = int(pd.Timestamp.now(tz=ICT).timestamp())
start_ts = end_ts - 7*24*3600
df_feat = build_news_features("VCB", start_ts, end_ts)
# Peek at the first and last feature rows.
print(df_feat.head(3))
print(df_feat.tail(3))


In [None]:
from modules.ML.features import build_news_features
import pandas as pd, pytz, time

ICT = pytz.timezone("Asia/Ho_Chi_Minh")
# Query window: the last 7 days, in epoch seconds.
end_ts = int(pd.Timestamp.now(tz=ICT).timestamp())
start_ts = end_ts - 7*24*3600

df_feat = build_news_features("VCB", start_ts, end_ts)
print("rows:", len(df_feat))
display(df_feat.tail(5))


In [None]:
from qdrant_client import QdrantClient
import os

# Connection settings come from the same env vars (and defaults) as the payload-dump cell,
# so this quick peek targets the same Qdrant instance and collection.
cli = QdrantClient(host=os.getenv("QDRANT_HOST", "localhost"), port=int(os.getenv("QDRANT_PORT", 6333)))
pts, _ = cli.scroll(os.getenv("QDRANT_COLLECTION", "cafef_articles"), limit=3, with_payload=True)
for p in pts:
    print(p.payload)


In [None]:
from modules.ML.features import build_news_features
import pandas as pd, pytz

ICT = pytz.timezone("Asia/Ho_Chi_Minh")
end_ts = int(pd.Timestamp.now(tz=ICT).timestamp())
start_ts = end_ts - 14*24*3600

# Symbols expected (per the original note) to have articles in the payloads
df_vix = build_news_features("VIX", start_ts, end_ts)
df_bid = build_news_features("BID", start_ts, end_ts)

# VCB: per the original note, rows appear only if a payload has symbol="VCB"
# or its title/content mentions VCB/Vietcombank (alias matching)
df_vcb = build_news_features("VCB", start_ts, end_ts)

display(df_vix.tail(3))
display(df_bid.tail(3))
display(df_vcb.tail(3))


In [None]:
from modules.ML.features import build_news_features
import pandas as pd, pytz, os
from qdrant_client import QdrantClient

ICT = pytz.timezone("Asia/Ho_Chi_Minh")
end_ts = int(pd.Timestamp.now(tz=ICT).timestamp())
start_ts = end_ts - 30*24*3600  # 30 days

# 1) Fetch VCB news features over the window (per the original note, with extended aliases)
df_vcb = build_news_features("VCB", start_ts, end_ts)
print("rows:", len(df_vcb))
display(df_vcb.tail(10))

# 2) Inspect raw payloads: scroll up to 50 points (Qdrant returns them in point-id
#    order, not newest first) and keep those whose timestamp lies in the window.
#    This runs regardless of whether df_vcb above is empty.
cli = QdrantClient(host=os.getenv("QDRANT_HOST","localhost"), port=int(os.getenv("QDRANT_PORT",6333)))
batch, _ = cli.scroll(collection_name=os.getenv("QDRANT_COLLECTION","cafef_articles"), limit=50, with_payload=True)
cand = []
for p in batch:
    pl = p.payload or {}
    # Some payloads store the epoch under "time_ts" (the field filtered on elsewhere).
    raw_ts = pl.get("timestamp", pl.get("time_ts"))
    try:
        ts = int(raw_ts)
    except (TypeError, ValueError):
        continue  # missing or non-numeric timestamp
    if ts > 10**12:  # milliseconds -> seconds
        ts //= 1000
    if start_ts <= ts <= end_ts:
        cand.append(pl)

print("Sample payloads in range:", len(cand))
for x in cand[:5]:
    print({k: x.get(k) for k in ["symbol","title","summary","timestamp","sentiment"]})


In [None]:
from modules.ML.pipeline import train_gap_model
# Retrain the VCB gap model on 365 days and print the returned summary.
print(train_gap_model("VCB", lookback_days=365))


In [None]:
from modules.ML.pipeline import predict_tomorrow_full_exog, forecast_gap
from modules.ML.pipeline import pick_target_trading_day

def direction_from_return(x: float, eps: float = 1e-6) -> str:
    """Arrow for a return: "↑" if x > eps, "↓" if x < -eps, otherwise "=" (NaN included)."""
    if x > eps:
        return "↑"
    if x < -eps:
        return "↓"
    return "="

# One model run: predict_tomorrow_full_exog already embeds the gap forecast
# (gap_ret_mean, gap_ret_ci, last_close, use_exog, symbol). Reusing it keeps the
# arrow and the OPEN band consistent instead of coming from two separate forecasts.
bands = predict_tomorrow_full_exog("VCB", alpha=0.10)
gap = bands["gap"]
day  = pick_target_trading_day()              # per the pipeline note: "today" during pre-open/lunch, otherwise the next trading day

arrow = direction_from_return(gap["gap_ret_mean"])
print(f"🎯 {day} • {gap['symbol']}  GAP: {arrow} | last={gap['last_close']:.2f} "
      f"→ OPEN~{bands['bands']['OPEN_am']['px_mean']:.2f} "
      f"(CI90% {bands['bands']['OPEN_am']['px_lo']:.2f}-{bands['bands']['OPEN_am']['px_hi']:.2f}) "
      f"| exog_news={gap['use_exog']}")


In [None]:
from modules.ML.pipeline import train_gap_model, predict_direction_tomorrow, smart_predict

print(train_gap_model("VCB", lookback_days=365))

# The pipeline's predict_direction_tomorrow (imported above) exposes target_day.
sig = predict_direction_tomorrow("VCB", alpha=0.10)
print(
    f"🎯 {sig['target_day']} • {sig['symbol']}  GAP: {sig['gap_direction']} "
    f"| last={sig['last_close']:.2f} → OPEN~{sig['open_mean']:.2f} "
    f"(CI90% {sig['open_ci90'][0]:.2f}-{sig['open_ci90'][1]:.2f}) "
    f"| exog_news={sig['use_exog']}"
)

print(smart_predict("VCB", alpha=0.10))


In [None]:
from modules.ML.pipeline import train_gap_model, predict_tomorrow_full_exog, predict_direction_tomorrow

# Train, optionally adding index-level news exog via add_index
print(train_gap_model("VCB", lookback_days=365, add_index=["VNINDEX","VN30"]))

# Forecast the target session (outside / before trading hours)
print(predict_tomorrow_full_exog("VCB", alpha=0.10))

# Direction arrow only: up / down / flat
print(predict_direction_tomorrow("VCB"))


In [None]:
import pandas as pd
import pytz
from modules.ML.features import build_news_features

ICT = pytz.timezone("Asia/Ho_Chi_Minh")

end_ts   = int(pd.Timestamp.now(tz=ICT).timestamp())
start_ts = end_ts - 14*24*3600  # last 14 days

fe = build_news_features("VCB", start_ts, end_ts, add_index=["VNINDEX","VN30"])
print(fe.tail(5))
# Per column (except date): number of rows with a non-zero value.
print("nonzero cols:", {c:int((fe[c]!=0).sum()) for c in fe.columns if c!="date"})
# NOTE(review): raises KeyError if the returned frame has no "mean_sent" column (e.g. an empty result).
print("mean_sent (VCB) last:", fe["mean_sent"].tail(5).values)


In [None]:
from qdrant_client import QdrantClient, models
import os

# Same env-driven connection settings as the payload-dump cell, instead of a hardcoded localhost.
cli = QdrantClient(host=os.getenv("QDRANT_HOST", "localhost"), port=int(os.getenv("QDRANT_PORT", 6333)))
res = cli.scroll(
    collection_name=os.getenv("QDRANT_COLLECTION", "cafef_articles"),
    scroll_filter=models.Filter(
        must=[models.FieldCondition(
          key="index_codes", match=models.MatchAny(any=["VNINDEX"])
        )]
    ),
    limit=5, with_payload=True
)
print(len(res[0]))  # > 0 means at least one article is tagged with VNINDEX


In [None]:
import pandas as pd, pytz, time
ICT = pytz.timezone("Asia/Ho_Chi_Minh")

# 7-day window in epoch seconds. The next cells read start_ts / end_ts from here,
# so run this cell first.
now_ts  = int(pd.Timestamp.now(tz=ICT).timestamp())
start_ts = now_ts - 7*24*3600
end_ts   = now_ts


In [None]:
from modules.utils.services import qdrant_services
from qdrant_client import models
# Up to 5 points tagged VNINDEX in index_codes whose time_ts lies in [start_ts, end_ts]
# (start_ts / end_ts come from the previous cell).
pts, _ = qdrant_services.client.scroll(
    collection_name=qdrant_services.collection_name,
    scroll_filter=models.Filter(must=[
        models.FieldCondition(key="index_codes", match=models.MatchAny(any=["VNINDEX"])),
        models.FieldCondition(key="time_ts", range=models.Range(gte=start_ts, lte=end_ts)),
    ]),
    with_payload=True,
    limit=5
)
# NOTE(review): p.payload.get fails if a point's payload is None.
print(len(pts), [p.payload.get("title") for p in pts])


In [None]:
from modules.ML.features import build_news_features
# Uses the start_ts / end_ts window defined in an earlier cell (hidden-state dependency).
fe = build_news_features("VCB", start_ts, end_ts, add_index=["VNINDEX","VN30"])
print(fe.tail())


In [None]:
# modules/ML/diagnostics.py
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pytz
from typing import Dict, Any, Optional, List, Tuple

from modules.api.stock_api import get_close_series
from modules.ML.features import build_news_features
from modules.ML.predictors.sarimax_exog import arima_select_fit
from modules.ML.registry import load_model_meta

# Vietnam time zone; _align_exog_to_y uses it to turn dates into epoch seconds.
ICT = pytz.timezone("Asia/Ho_Chi_Minh")


# -----------------------------
# Helpers (local, độc lập)
# -----------------------------
def _to_returns(close_s: pd.Series) -> pd.Series:
    """Log-return theo ngày (drop NA)."""
    r = np.log(close_s / close_s.shift(1))
    return r.dropna()


def _align_exog_to_y(
    symbol: str,
    y: pd.Series,
    add_index: Optional[List[str]] = None,
    shift: int = 1
) -> pd.DataFrame:
    """
    Dựng exog (tin tức) theo index của y.
    - shift=1 để tránh look-ahead (tin hôm nay ảnh hưởng ngày mai).
    """
    if y.empty:
        return pd.DataFrame(index=y.index)

    start_ts = int(pd.Timestamp(y.index[0], tz=ICT).timestamp())
    end_ts   = int(pd.Timestamp(y.index[-1], tz=ICT).timestamp())

    feats = build_news_features(symbol, start_ts, end_ts, add_index=add_index or [])
    if feats.empty:
        return pd.DataFrame(index=y.index)

    X = feats.set_index("date")
    # đảm bảo tz-naive và cùng index y
    X.index = pd.DatetimeIndex(X.index).tz_localize(None)
    X = X.reindex(pd.DatetimeIndex(y.index).tz_localize(None)).fillna(0.0)

    if shift:
        X = X.shift(shift).fillna(0.0)

    # đảm bảo numeric
    for c in X.columns:
        X[c] = pd.to_numeric(X[c], errors="coerce").fillna(0.0)

    return X


def _directional_accuracy(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Tỷ lệ đúng hướng (↑/↓/=) trên returns."""
    eps = 1e-12
    s_true = np.sign(np.where(np.abs(y_true) <= eps, 0.0, y_true))
    s_pred = np.sign(np.where(np.abs(y_pred) <= eps, 0.0, y_pred))
    return float((s_true == s_pred).mean())


def _series_1d_from_endog(endog_obj, index) -> pd.Series:
    """Chuẩn hoá endog (có thể là ndarray 1D/2D, Series/DataFrame) → Series 1D."""
    if isinstance(endog_obj, pd.Series):
        y = endog_obj.copy()
        y.index = pd.DatetimeIndex(index).tz_localize(None)
        return y
    if isinstance(endog_obj, pd.DataFrame):
        arr = np.asarray(endog_obj.values).reshape(-1)
    else:
        arr = np.asarray(endog_obj).reshape(-1)
    return pd.Series(arr, index=pd.DatetimeIndex(index).tz_localize(None))


# -----------------------------
# 1) News coverage (sanity)
# -----------------------------
def recent_news_coverage(
    symbol: str,
    days: int = 90,
    add_index: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Sanity-check news coverage for ``symbol`` over the last ``days`` days.

    Returns a dict with:
      - symbol (upper-cased), days_window
      - rows: number of feature rows returned
      - nonzero_days: rows whose news_count > 0 (0 if that column is absent)
      - nonzero_cols: {column: 1 if it has any non-zero value in the window else 0}
      - sample_tail: the last 5 feature rows (an empty frame with a "date" column when there is no data)
    """
    window_end = int(pd.Timestamp.now(tz=ICT).timestamp())
    window_start = window_end - days * 24 * 3600
    feats = build_news_features(symbol, window_start, window_end, add_index=add_index or [])

    report = {"symbol": symbol.upper(), "days_window": days}
    if feats.empty:
        report.update(
            rows=0,
            nonzero_days=0,
            nonzero_cols={},
            sample_tail=pd.DataFrame(columns=["date"]),
        )
        return report

    value_cols = [c for c in feats.columns if c != "date"]
    nonzero_cols = {c: int((feats[c].fillna(0) != 0).any()) for c in value_cols}
    if "news_count" in feats.columns:
        news_counts = feats["news_count"]
    else:
        news_counts = pd.Series([0] * len(feats))

    report.update(
        rows=int(len(feats)),
        nonzero_days=int((news_counts > 0).sum()),
        nonzero_cols=nonzero_cols,
        sample_tail=feats.tail(5),
    )
    return report


# -----------------------------
# 2) Exog alignment vs model meta
# -----------------------------
def check_exog_alignment(symbol: str, add_index: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Compare the exog matrix that can be rebuilt now with what the saved "gap" model expects.

    Loads the saved model + meta and recovers the training endog as a 1D Series
    on the model's row labels. It then rebuilds the news exog on that index
    (shift=1) and reports column / row mismatches against meta["feature_cols"].

    Args:
        symbol: ticker (case-insensitive).
        add_index: index codes forwarded to the feature builder. Presumably this
            should match what was used at training time, otherwise extra or
            missing columns are reported — confirm.

    Returns:
        {"ok": False, "error": ...} when no model/meta is saved. Otherwise a report
        with keys symbol, use_exog_meta, meta_feature_cols, built_X_shape,
        built_cols, issues (list of {"type", "detail"}) and ok (True iff no issues).

    NOTE(review): _align_exog_to_y reindexes X onto y's index, so "row_mismatch"
    should not be reachable when X is non-empty.
    """
    sym = symbol.upper()
    fit, meta = load_model_meta(sym, "gap")
    if fit is None or meta is None:
        return {"ok": False, "error": "Model/meta chưa tồn tại. Hãy train trước."}

    use_exog = bool(meta.get("use_exog", False))
    feat_cols: List[str] = list(meta.get("feature_cols", []))

    # Training endog -> 1D Series on the model's row labels (tz stripped)
    endog_idx = pd.DatetimeIndex(fit.model.data.row_labels).tz_localize(None)
    y = _series_1d_from_endog(fit.model.endog, endog_idx)

    # Rebuild exog on the same index
    X = _align_exog_to_y(sym, y, add_index=add_index or [], shift=1)

    report: Dict[str, Any] = {
        "symbol": sym,
        "use_exog_meta": use_exog,
        "meta_feature_cols": feat_cols,
        "built_X_shape": X.shape,
        "built_cols": list(X.columns),
        "issues": [],
        "ok": True,
    }

    if not use_exog:
        # The model was trained without exog -> nothing to compare column-wise
        return report

    if X.empty:
        report["issues"].append({"type": "no_exog_now", "detail": "Không tạo được X từ tin tức hiện có."})
        report["ok"] = False
        return report

    built_cols = list(X.columns)
    meta_cols  = list(feat_cols)

    missing_cols = [c for c in meta_cols  if c not in built_cols]
    extra_cols   = [c for c in built_cols if c not in meta_cols]
    if missing_cols:
        report["issues"].append({"type": "missing_cols", "detail": missing_cols})
    if extra_cols:
        report["issues"].append({"type": "extra_cols", "detail": extra_cols})
    # Same column set but different order still matters for positional exog.
    if not missing_cols and not extra_cols and built_cols != meta_cols:
        report["issues"].append({"type": "column_order_mismatch", "detail": {"built": built_cols, "meta": meta_cols}})

    if X.shape[0] != len(y):
        report["issues"].append({"type": "row_mismatch", "detail": (X.shape[0], len(y))})

    report["ok"] = (len(report["issues"]) == 0)
    return report


# -----------------------------
# 3) Backtest: exog vs no-exog
# -----------------------------
def backtest_arima_exog(
    symbol: str,
    lookback_days: int = 365,
    test_ratio: float = 0.2,
    add_index: Optional[List[str]] = None,
    d: int = 0,
    max_p: int = 3,
    max_q: int = 3,
    trends: Tuple[str, ...] = ("n", "c"),
) -> Dict[str, Any]:
    """
    Chronological-split backtest of two models on daily log returns:
      - SARIMA without exog
      - SARIMAX with the news exog from Qdrant (via _align_exog_to_y, shift=1)
    Each is selected/fitted with arima_select_fit on the training part. It then
    forecasts the whole test window in a single multi-step call (no rolling refit).

    Args:
        symbol: ticker (case-insensitive).
        lookback_days: price history length requested from get_close_series.
        test_ratio: fraction of returns held out; at least 10 test points.
        add_index: index codes forwarded to the news-feature builder.
        d, max_p, max_q, trends: search space forwarded to arima_select_fit.

    Returns:
        dict with n_total / n_train / n_test, the selected order/trend for each
        model, metrics dicts {"MAE", "RMSE", "DA"} on the test set (metrics_exog
        is None when no exog could be built), has_exog, and test_start / test_end.

    Raises:
        ValueError: fewer than 60 prices or fewer than 30 returns.
    """
    sym = symbol.upper()

    # 1) Endog = log-return
    close = get_close_series(sym, days=lookback_days)
    if close is None or len(close) < 60:
        raise ValueError("Không đủ dữ liệu giá để backtest.")
    y = _to_returns(close)
    if len(y) < 30:
        raise ValueError("Không đủ số điểm returns.")

    # 2) Exog aligned to y and lagged by one row (shift=1)
    X = _align_exog_to_y(sym, y, add_index=add_index or [], shift=1)

    # 3) Chronological split (no shuffling)
    n = len(y)
    n_test = max(10, int(n * test_ratio))
    n_train = n - n_test
    y_tr, y_te = y.iloc[:n_train], y.iloc[n_train:]
    X_tr = X.iloc[:n_train] if not X.empty else pd.DataFrame(index=y_tr.index)
    X_te = X.iloc[n_train:] if not X.empty else pd.DataFrame(index=y_te.index)

    # 4) Fit both models (the exog model only when exog columns exist)
    fit_no_exog, order0, trend0 = arima_select_fit(
        y_tr, d=d, max_p=max_p, max_q=max_q, trends=trends, exog=None
    )
    if not X_tr.empty:
        fit_exog, order1, trend1 = arima_select_fit(
            y_tr, d=d, max_p=max_p, max_q=max_q, trends=trends, exog=X_tr
        )
    else:
        fit_exog, order1, trend1 = None, None, None

    # 5) Forecast: one multi-step forecast over the whole test window
    fc_no = fit_no_exog.get_forecast(steps=len(y_te))
    yhat_no = np.asarray(fc_no.predicted_mean, dtype=float)

    if fit_exog is not None and not X_te.empty:
        fc_ex = fit_exog.get_forecast(steps=len(y_te), exog=X_te)
        yhat_ex = np.asarray(fc_ex.predicted_mean, dtype=float)
    else:
        yhat_ex = None

    # 6) Metrics
    def _metrics(y_true, y_pred) -> Dict[str, float]:
        # MAE, RMSE and directional accuracy between two equal-length arrays.
        y_true = np.asarray(y_true, dtype=float)
        y_pred = np.asarray(y_pred, dtype=float)
        mae  = float(np.mean(np.abs(y_true - y_pred)))
        rmse = float(np.sqrt(np.mean((y_true - y_pred) ** 2)))
        da   = _directional_accuracy(y_true, y_pred)
        return {"MAE": mae, "RMSE": rmse, "DA": da}

    m_no = _metrics(y_te.values, yhat_no)
    m_ex = _metrics(y_te.values, yhat_ex) if yhat_ex is not None else None

    return {
        "symbol": sym,
        "n_total": n,
        "n_train": n_train,
        "n_test": n_test,
        "order_no_exog": order0,
        "trend_no_exog": trend0,
        "order_exog": order1,
        "trend_exog": trend1,
        "metrics_no_exog": m_no,
        "metrics_exog": m_ex,
        "has_exog": bool(fit_exog is not None and yhat_ex is not None),
        "test_start": str(y_te.index[0].date()) if len(y_te) else None,
        "test_end": str(y_te.index[-1].date()) if len(y_te) else None,
    }


def compare_models_report(symbol: str, **kwargs) -> Dict[str, Any]:
    """
    Run backtest_arima_exog(symbol, **kwargs) and condense it into a comparison summary.

    "note" says exog helped only when an exog model exists and its test MAE is
    strictly lower than the no-exog model's MAE.
    """
    bt = backtest_arima_exog(symbol, **kwargs)
    exog_metrics = bt.get("metrics_exog")
    exog_wins = bool(exog_metrics) and exog_metrics["MAE"] < bt["metrics_no_exog"]["MAE"]
    verdict = "Exog giúp cải thiện" if exog_wins else "Exog chưa cải thiện (hoặc chưa dùng)"
    return {
        "symbol": bt["symbol"],
        "window": f"{bt['n_train']} train / {bt['n_test']} test",
        "order_no_exog": bt["order_no_exog"],
        "order_exog": bt["order_exog"],
        "metrics_no_exog": bt["metrics_no_exog"],
        "metrics_exog": bt["metrics_exog"],
        "has_exog": bt["has_exog"],
        "period_test": (bt["test_start"], bt["test_end"]),
        "note": verdict,
    }


# -----------------------------
# 4) Console helper
# -----------------------------
def print_diagnostics(
    symbol: str,
    add_index: Optional[List[str]] = None,
    recent_days: int = 90,
    backtest_lookback_days: int = 365,
    test_ratio: float = 0.25,
):
    """
    Print coverage, exog-alignment and backtest diagnostics for ``symbol``.

    Args:
        symbol: ticker, e.g. "VCB".
        add_index: optional index codes (e.g. ["VNINDEX", "VN30"]) forwarded to
            every diagnostic.
        recent_days: window (days) for the news-coverage check.
        backtest_lookback_days: price history length for the backtest (default 365).
        test_ratio: fraction of returns held out in the backtest (default 0.25).

    Raises:
        ValueError: propagated from the backtest when there is not enough price data.
    """
    print("=== [Coverage] ===")
    cov = recent_news_coverage(symbol, days=recent_days, add_index=add_index or [])
    print(f"Symbol: {cov['symbol']} | window: {cov['days_window']}d | rows: {cov['rows']}")
    print("nonzero_days(news_count>0):", cov["nonzero_days"])
    print("nonzero_cols:", cov["nonzero_cols"])
    print(cov["sample_tail"])

    print("\n=== [Alignment] ===")
    align = check_exog_alignment(symbol, add_index=add_index or [])
    if "error" in align:
        # No saved model/meta: the report holds only {"ok", "error"}, so show the reason.
        print("error:", align["error"])
    print("use_exog_meta:", align.get("use_exog_meta"))
    print("built_X_shape:", align.get("built_X_shape"))
    print("meta_feature_cols:", align.get("meta_feature_cols"))
    print("built_cols:", align.get("built_cols"))
    print("issues:", align.get("issues"))
    print("OK:", align.get("ok"))

    print("\n=== [Backtest] ===")
    rep = compare_models_report(
        symbol,
        lookback_days=backtest_lookback_days,
        test_ratio=test_ratio,
        add_index=add_index or [],
    )
    print("period_test:", rep["period_test"])
    print("order_no_exog:", rep["order_no_exog"], "| order_exog:", rep["order_exog"])
    print("metrics_no_exog:", rep["metrics_no_exog"])
    print("metrics_exog:", rep["metrics_exog"])
    print("has_exog:", rep["has_exog"])
    print("note:", rep["note"])


In [None]:
# Run the three diagnostics for VCB with index-level news (VNINDEX, VN30) included.
print(recent_news_coverage("VCB", days=90, add_index=["VNINDEX","VN30"]))
print(check_exog_alignment("VCB", add_index=["VNINDEX","VN30"]))
print(compare_models_report("VCB", lookback_days=400, test_ratio=0.25, add_index=["VNINDEX","VN30"]))


In [None]:
from modules.ML.pipeline import train_gap_model, predict_tomorrow_full_exog, predict_direction_tomorrow

# Retrain with index-level news, then print the full target-session forecast and the direction summary.
print(train_gap_model("VCB", lookback_days=365, add_index=["VNINDEX","VN30"]))
print(predict_tomorrow_full_exog("VCB"))
print(predict_direction_tomorrow("VCB"))


In [None]:
from modules.ML.pipeline import smart_predict, train_gap_model, forecast_gap
# Router output for VCB; the mode depends on the current VN time.
print(smart_predict("VCB"))


In [1]:
from modules.ML.pipeline import smart_predict

# Mode is chosen automatically: during a session -> next intraday step;
# outside trading hours -> the next session.
FPT_SYMBOL = "FPT"
res = smart_predict(FPT_SYMBOL)
print(res)


IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
W1024 15:01:06.511000 27848 site-packages\torch\distributed\elastic\multiprocessing\redirects.py:29] NOTE: Redirects are currently not supported in Windows or MacOs.
Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


Collection `cafef_articles` đã tồn tại.
EmbedderServices device: cuda
RerankerServices device: cuda


Device set to use cuda:0
`return_all_scores` is now deprecated,  if want a similar functionality use `top_k=None` instead of `return_all_scores=True` or `top_k=1` instead of `return_all_scores=False`.


{'mode': 'next_session', 'symbol': 'FPT', 'timestamp': '24-10-2025 15:01:26', 'next_session': 'AM', 'target_day': datetime.date(2025, 10, 27), 'open_band': {'px_mean': 97.7733318119878, 'px_lo': 95.3892856840437, 'px_hi': 100.21696194771023}, 'gap': {'symbol': 'FPT', 'gap_ret_mean': 0.0007502999488289655, 'gap_ret_ci': [-0.02393529629324368, 0.025435896190901613], 'last_close': 97.7, 'use_exog': True}, 'open_direction': 'tăng', 'open_gap_pct': 0.075, 'open_confidence': 'uncertain', 'note': 'AM dùng band OPEN (ước lượng khoảng mở cửa).'}


In [2]:
from modules.api.stock_api import get_intraday_df

# Fetch one day of 1-minute bars for FPT (debug logging on) and show the last five rows.
_intraday_opts = dict(interval="1m", days=1, debug=True)
df = get_intraday_df("FPT", **_intraday_opts)
print(df.iloc[-5:])


                     open  high   low  close   volume
time                                                 
2025-10-24 14:45:00  97.7  97.7  97.7   97.7  90500.0


In [2]:
# tests/test_forecast_next_session.py
import pprint
from modules.ML.pipeline import smart_predict, predict_next_session
from modules.api.stock_api import get_stock_quote
from modules.api.stock_api import format_market_summary

TEST_SYMBOL = "VCB"

def test_smart_predict():
    print("=== [TEST] smart_predict() ===")
    pack = smart_predict(TEST_SYMBOL)
    pprint.pprint(pack)

    # Gợi ý cách đọc kết quả
    mode = pack.get("mode")
    print(f"\nMode dự báo: {mode}")
    if mode == "in_session":
        print(f"- Đang ở phiên {pack.get('session')}")
        print(f"- next_step_dir (đi tiếp trong vài bước tới): {pack.get('next_step_dir')}")
        print(f"- step_confidence: {pack.get('step_confidence')}")
        print(f"- last_px: {pack.get('last_px')}")
        print(f"- path_pred (giá ước lượng các bước kế tiếp):\n{pack.get('path_pred')}")
    else:
        # out_of_session / next_session mode
        print(f"- next_session: {pack.get('next_session')}")
        if pack.get('open_direction'):
            print(f"  open_direction: {pack['open_direction']}")
            print(f"  open_gap_pct: {pack['open_gap_pct']}%")
            print(f"  open_confidence: {pack['open_confidence']}")
            print(f"  OPEN_am band: {pack.get('bands',{}).get('OPEN_am')}")

    print("\n")

def test_predict_next_session():
    print("=== [TEST] predict_next_session() ===")
    pack = predict_next_session(TEST_SYMBOL)
    pprint.pprint(pack)

    print(f"\nPhiên kế tiếp: {pack.get('next_session')}")
    if "open_band" in pack:
        # AM scenario
        print("→ Dự báo khoảng giá mở cửa (OPEN_am):", pack["open_band"])
        print("→ Hướng:", pack.get("open_direction"))
        print("→ Độ tự tin:", pack.get("open_confidence"))
        print("→ Gap % so với đóng cửa phiên trước:", pack.get("open_gap_pct"), "%")
    elif "pm_band" in pack:
        # PM scenario
        print("→ Dự báo khung giá buổi chiều (PM_band):", pack["pm_band"])
        print("→ Hướng:", pack.get("pm_direction"))
        print("→ Độ tự tin:", pack.get("pm_confidence"))
        print("→ Dựa trên:", pack.get("base_from"))

    print("\n")

def test_market_snapshot():
    print("=== [TEST] Snapshot thị trường chung ===")
    # Kiểm tra nhanh để đảm bảo API index / tổng quan thị trường không chết
    print(format_market_summary())

    # Kiểm tra realtime 1 mã để đối chiếu với dự báo
    print("\n--- Giá hiện tại mã", TEST_SYMBOL, "---")
    data = get_stock_quote(TEST_SYMBOL)
    pprint.pprint(data)
    print("\n")

if __name__ == "__main__":
    test_smart_predict()
    test_predict_next_session()
    test_market_snapshot()


=== [TEST] smart_predict() ===
{'error': None,
 'last_px': 59.7,
 'mode': 'in_session',
 'next_step_dir': 'giảm',
 'path_pred': t+step
1    59.680020
2    59.660047
3    59.640080
dtype: float64,
 'ret_mean': -0.00033472811160588297,
 'ret_std': 0.001831466880738437,
 'session': 'AM',
 'source_used': 'intraday',
 'step_confidence': 'low',
 'symbol': 'VCB',
 'timestamp': '27-10-2025 09:23:12'}

Mode dự báo: in_session
- Đang ở phiên AM
- next_step_dir (đi tiếp trong vài bước tới): giảm
- step_confidence: low
- last_px: 59.7
- path_pred (giá ước lượng các bước kế tiếp):
t+step
1    59.680020
2    59.660047
3    59.640080
dtype: float64


=== [TEST] predict_next_session() ===
{'base_from': 'AM_close',
 'mode': 'next_session',
 'next_session': 'PM',
 'note': 'PM dựa trên giá kết thúc buổi sáng và band PM mặc định.',
 'pm_band': {'px_hi': 59.99924749530624,
             'px_lo': 59.40224500780314,
             'px_mean': 59.7},
 'pm_confidence': 'uncertain',
 'pm_direction': 'không thay đổi

In [None]:
# tests/test_sarimax_train_eval.py
import numpy as np
import pandas as pd
from modules.ML.pipeline import train_gap_model
from modules.ML.registry import load_model_meta
from modules.ML.metrics import rmse, mape
from modules.api.stock_api import get_close_series

TEST_SYMBOL = "VCB"

def test_train_and_evaluate(lookback_days=365):
    print("=== [TEST] SARIMAX train & đánh giá nội bộ ===")

    # 1. Train và lưu model/meta
    model_path, meta_path = train_gap_model(TEST_SYMBOL, lookback_days=lookback_days)
    print(f"- Model saved: {model_path}")
    print(f"- Meta saved : {meta_path}")

    # 2. Load lại để chắc chắn có thể đọc
    model, meta = load_model_meta(TEST_SYMBOL, "gap")
    if model is None or meta is None:
        print("Không load được model sau khi train.")
        return
    
    print("\n--- META ---")
    for k,v in meta.items():
        print(f"{k}: {v}")

    # Lấy endog gốc (log-returns) từ model
    # statsmodels SARIMAXResults lưu data trong model.model
    try:
        endog = model.model.endog  # numpy array, log-return y_t
    except Exception as e:
        print("Không đọc được endog từ model:", e)
        return

    fitted = model.fittedvalues  # giá trị dự báo nội bộ in-sample
    if hasattr(fitted, "values"):
        fitted = fitted.values

    # align độ dài
    n = min(len(endog), len(fitted))
    y_true = np.array(endog[-n:], dtype="float64")
    y_pred = np.array(fitted[-n:], dtype="float64")

    err_rmse = rmse(y_true, y_pred)
    err_mape = mape(y_true, y_pred)

    print("\n--- Hiệu suất in-sample ---")
    print(f"RMSE (trên log-return): {err_rmse:.6f}")
    print(f"MAPE (trên log-return): {err_mape:.6f}")
    print("AIC  :", getattr(model, "aic", None))

    # 3. Diễn giải: quy đổi log-return dự báo vs giá thật
    closes = get_close_series(TEST_SYMBOL, days=lookback_days)
    closes = closes.dropna()
    print("\nClose series tail:")
    print(closes.tail())

    # Tính thử ví dụ: dùng last_close và dự báo one-step ahead
    if len(closes) > 0:
        last_close = float(closes.iloc[-1])
        if len(y_pred) > 0:
            ret_hat = y_pred[-1]
            px_hat  = last_close * np.exp(ret_hat)
            print(f"\nLast close: {last_close:.3f}  → Dự báo next(px): {px_hat:.3f} (ret_hat={ret_hat:.5f})")

if __name__ == "__main__":
    test_train_and_evaluate()


=== [TEST] SARIMAX train & đánh giá nội bộ ===


Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
Down

- Model saved: models\VCB_gap.pkl
- Meta saved : models\VCB_gap.json

--- META ---
symbol: VCB
order: [1, 0, 0]
trend: n
use_exog: True
feature_cols: ['news_count', 'pos_count', 'neg_count', 'neu_count', 'mean_sent', 'sum_sent', 'ret_lag1', 'ret_lag2', 'ret_lag5']
scaler: {'news_count': {'mu': 0.0, 'sd': 1.0}, 'pos_count': {'mu': 0.0, 'sd': 1.0}, 'neg_count': {'mu': 0.0, 'sd': 1.0}, 'neu_count': {'mu': 0.0, 'sd': 1.0}, 'mean_sent': {'mu': 0.0, 'sd': 1.0}, 'sum_sent': {'mu': 0.0, 'sd': 1.0}, 'ret_lag1': {'mu': -5.007558725072949e-05, 'sd': 0.012279129799331172}, 'ret_lag2': {'mu': -0.00010303191597674378, 'sd': 0.017594109387141794}, 'ret_lag5': {'mu': -0.0002205868864140254, 'sd': 0.024495925614274057}}
train_len: 364
timestamp: 25-10-2025 14:26:08
target: gap_ret
add_index: ['VNINDEX', 'VN30']

--- Hiệu suất in-sample ---
RMSE (trên log-return): 0.012487
MAPE (trên log-return): 49920.896584
AIC  : -2155.4299817259543

Close series tail:
_dt
2025-10-20    59.4
2025-10-21    59.3
2025-1

In [1]:
from importlib import reload
from modules.ML import pipeline
reload(pipeline)

fit, meta, eval_report = pipeline.train_gap_model("VCB")
print(">>> eval_report =", eval_report)

# Run both forecasting entry points of the freshly reloaded module, in order.
for _entry_name in ("smart_predict", "forecast_gap"):
    print(f"\n>>> {_entry_name}")
    print(getattr(pipeline, _entry_name)("VCB"))


IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
W1027 09:21:02.560000 21436 site-packages\torch\distributed\elastic\multiprocessing\redirects.py:29] NOTE: Redirects are currently not supported in Windows or MacOs.
Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


Collection `cafef_articles` đã tồn tại.
EmbedderServices device: cuda
RerankerServices device: cuda


Device set to use cuda:0
`return_all_scores` is now deprecated,  if want a similar functionality use `top_k=None` instead of `return_all_scores=True` or `top_k=1` instead of `return_all_scores=False`.
Maximum Likelihood optimization failed to converge. Check mle_retvals


- Model saved: models\VCB_gap.pkl
- Meta saved : models\VCB_gap.json

--- META ---
symbol: VCB
order: [1, 0, 0]
trend: n
use_exog: True
feature_cols: ['news_count', 'pos_count', 'neg_count', 'neu_count', 'mean_sent', 'sum_sent', 'idx_VNINDEX_news_count', 'idx_VNINDEX_pos_count', 'idx_VNINDEX_neg_count', 'idx_VNINDEX_neu_count', 'idx_VNINDEX_mean_sent', 'idx_VNINDEX_sum_sent', 'idx_VN30_news_count', 'idx_VN30_pos_count', 'idx_VN30_neg_count', 'idx_VN30_neu_count', 'idx_VN30_mean_sent', 'idx_VN30_sum_sent', 'ret_lag1', 'ret_lag2', 'ret_lag5']
scaler: {'news_count': {'mu': 0.008241758241758242, 'sd': 0.09053369044416568}, 'pos_count': {'mu': 0.0027472527472527475, 'sd': 0.052414241836095915}, 'neg_count': {'mu': 0.0, 'sd': 1.0}, 'neu_count': {'mu': 0.005494505494505495, 'sd': 0.0740227607951281}, 'mean_sent': {'mu': 0.0027386276972490354, 'sd': 0.035715182961767074}, 'sum_sent': {'mu': 0.0027386276972490354, 'sd': 0.035715182961767074}, 'idx_VNINDEX_news_count': {'mu': 0.00824175824175824

In [2]:
from modules.api.stock_api import format_forecast_text
from modules.ML.pipeline import smart_predict

# Render VCB's smart_predict result as the user-facing forecast text.
_fmt_symbol = "VCB"
pack = smart_predict(_fmt_symbol)
print(format_forecast_text(_fmt_symbol, pack))


📅 Phiên sáng (AM) sắp tới của VCB (2025-10-27):
- Giá mở cửa dự kiến khoảng 59.43 VNĐ (dải 58.26 ~ 60.62).
- Dự kiến giảm khoảng -0.12%.
- Mức độ tự tin mô hình: uncertain.
⚠️ Đây chỉ là ước lượng dựa trên tin tức & hành vi giá gần nhất, không phải khuyến nghị đầu tư.
