In [None]:
# ==== CELL 1: CONFIG & HELPERS (inline only; no file output) ====
import numpy as np, pandas as pd, yfinance as yf
import plotly.graph_objects as go
from IPython.display import display, Markdown

# ---------- CONFIG ----------
LOOKBACK_YEARS = 7
INTERVAL       = "1d"
AUTO_ADJUST    = True

# ค่าธรรมเนียม/สลิปเพจ (bps; 1% = 100 bps)
FEE_BPS      = 10
SLIPPAGE_BPS = 5

# รายชื่อหุ้น (SET25 — แก้ไขได้)
TICKERS = [
    "SCB.BK","SCC.BK","KBANK.BK","BBL.BK","KTB.BK",
    "CPALL.BK","CPN.BK","PTT.BK","PTTEP.BK","PTTGC.BK",
    "GULF.BK","ADVANC.BK","AOT.BK","BDMS.BK","BH.BK",
    "DELTA.BK","TRUE.BK","EA.BK","IVL.BK","MINT.BK",
    "BGRIM.BK","EGCO.BK","TOP.BK","OSP.BK","OR.BK",
]

# กริดพารามิเตอร์ (หา best SMA และ overlays แบบเร็ว)
SMA_GRID_SHORT = [3,5,7,9,11,13,15]
SMA_GRID_LONG  = list(range(80, 201, 10))
ENTRY_BUF_GRID = [1,2,3]
EXIT_BUF_GRID  = [1,2,3]
ATR_PERIOD_GRID= [10,14,20]
ATR_MULT_GRID  = [2.0,2.5,3.0]

# ---------- HELPERS ----------
def _flatten_yf(df: pd.DataFrame, ticker: str) -> pd.DataFrame:
    if df is None or df.empty: return df
    out = df.copy()
    if isinstance(out.columns, pd.MultiIndex):
        try:
            out = out.swaplevel(axis=1).sort_index(axis=1)
            if ticker in out.columns.get_level_values(0):
                out = out[ticker].copy()
        except Exception:
            try: out = out.xs(ticker, axis=1, level=1)
            except Exception: pass
    cols_lower = {c.lower(): c for c in out.columns}
    if "close" not in cols_lower and "adj close" in cols_lower:
        out["Close"] = out[cols_lower["adj close"]]
    if "close" in cols_lower and "Close" not in out.columns:
        out.rename(columns={cols_lower["close"]:"Close"}, inplace=True)
    if "high" in cols_lower and "High" not in out.columns:
        out.rename(columns={cols_lower["high"]:"High"}, inplace=True)
    if "low" in cols_lower and "Low" not in out.columns:
        out.rename(columns={cols_lower["low"]:"Low"}, inplace=True)
    if "High" not in out.columns: out["High"] = out["Close"]
    if "Low"  not in out.columns: out["Low"]  = out["Close"]
    return out

def download_data(ticker: str, years: int) -> pd.DataFrame:
    end = pd.Timestamp.today().normalize()
    start = end - pd.DateOffset(years=years)
    df = yf.download(ticker, start=start, end=end, interval=INTERVAL,
                     auto_adjust=AUTO_ADJUST, progress=False)
    if df is None or df.empty:
        raise ValueError("No data downloaded")
    df = _flatten_yf(df, ticker)[["Close","High","Low"]].dropna(subset=["Close"])
    df.index = pd.to_datetime(df.index)
    return df

def sma_pos(df: pd.DataFrame, short: int, long_: int) -> pd.Series:
    if short >= long_: return pd.Series(0, index=df.index, dtype=int)
    s = df["Close"].rolling(short, min_periods=short).mean()
    l = df["Close"].rolling(long_,  min_periods=long_).mean()
    return (s > l).astype(int)

def compute_atr(df: pd.DataFrame, period: int) -> pd.Series:
    hl = df["High"] - df["Low"]
    hc = (df["High"] - df["Close"].shift()).abs()
    lc = (df["Low"]  - df["Close"].shift()).abs()
    tr = pd.concat([hl,hc,lc], axis=1).max(axis=1)
    return tr.ewm(alpha=1/period, adjust=False).mean()

def overlays(df, raw_pos, entry_buf, exit_buf, atr_mult, atr_period) -> pd.Series:
    atr = compute_atr(df, atr_period)
    pos=0; cin=0; cout=0; maxp=np.nan; out=[]
    close = df["Close"].values; raw = raw_pos.fillna(0).astype(int).values
    for i in range(len(df)):
        if pos==0:
            cin = cin+1 if raw[i]==1 else 0
            if cin >= max(1, entry_buf):
                pos=1; maxp=close[i]; cout=0
        else:
            if np.isnan(maxp) or close[i]>maxp: maxp=close[i]
            ts = maxp - atr_mult * atr.iloc[i] if not np.isnan(atr.iloc[i]) else np.nan
            if not np.isnan(ts) and close[i] <= ts:
                pos=0; cin=0; maxp=np.nan
            else:
                if raw[i]==0: cout+=1
                else: cout=0
                if cout >= max(1, exit_buf):
                    pos=0; cin=0; maxp=np.nan
        out.append(pos)
    return pd.Series(out, index=df.index, dtype=int)

def backtest(df: pd.DataFrame, pos: pd.Series) -> pd.DataFrame:
    out = df.copy()
    out["Pos"] = pos.reindex(out.index).fillna(0).astype(int)
    out["Ret"] = out["Close"].pct_change().fillna(0.0)
    per_trade_cost = (FEE_BPS + SLIPPAGE_BPS) / 1e4
    trades = out["Pos"].diff().fillna(0)
    out["NetRet"] = out["Pos"].shift(1).fillna(0)*out["Ret"] - np.abs(trades)*per_trade_cost
    out["Equity"] = (1 + out["NetRet"]).cumprod()
    out["Drawdown"] = out["Equity"] / out["Equity"].cummax() - 1.0
    return out

def perf_stats(bt: pd.DataFrame, interval: str = INTERVAL) -> dict:
    equity, netret = bt["Equity"], bt["NetRet"]
    periods = 252 if interval == "1d" else (52 if interval == "1wk" else 12)
    n = len(equity)
    if n <= 1: return {"CAGR":0,"MDD":0,"Sharpe":0,"WinRate":0}
    cagr = equity.iloc[-1] ** (periods / n) - 1.0
    mdd  = (equity / equity.cummax() - 1.0).min()
    vol  = netret.std(ddof=0)
    sharpe = (netret.mean()/vol)*np.sqrt(periods) if vol>0 else 0.0
    winrate = (netret>0).mean()
    return {"CAGR":float(cagr), "MDD":float(mdd), "Sharpe":float(sharpe), "WinRate":float(winrate)}

def optimize_sma(df_tr, df_te) -> dict:
    rows=[]
    for sh in SMA_GRID_SHORT:
        for lg in SMA_GRID_LONG:
            if sh>=lg: continue
            bt_te = backtest(df_te, sma_pos(df_te, sh, lg))
            st = perf_stats(bt_te)
            rows.append({"short":sh,"long":lg,"test_Sharpe":st["Sharpe"],"test_CAGR":st["CAGR"],"test_MDD":st["MDD"]})
    res = pd.DataFrame(rows)
    if res.empty: return {"short":5,"long":150,"test_Sharpe":0.0}
    res = res.sort_values(["test_Sharpe","test_CAGR"], ascending=[False,False])
    return res.iloc[0].to_dict()

def optimize_overlays(df_tr, df_te, sh, lg) -> dict:
    raw_tr = sma_pos(df_tr, sh, lg); raw_te = sma_pos(df_te, sh, lg)
    rows=[]
    for ebuf in ENTRY_BUF_GRID:
        for xbuf in EXIT_BUF_GRID:
            for ap in ATR_PERIOD_GRID:
                for am in ATR_MULT_GRID:
                    bt_te = backtest(df_te, overlays(df_te, raw_te, ebuf, xbuf, am, ap))
                    st = perf_stats(bt_te)
                    rows.append({"entry_buf":ebuf,"exit_buf":xbuf,"atr_period":ap,"atr_mult":am,
                                 "test_Sharpe":st["Sharpe"],"test_CAGR":st["CAGR"],"test_MDD":st["MDD"]})
    res = pd.DataFrame(rows)
    if res.empty: return {"entry_buf":1,"exit_buf":3,"atr_period":14,"atr_mult":3.0}
    res = res.sort_values(["test_Sharpe","test_CAGR"], ascending=[False,False])
    return res.iloc[0].to_dict()

def last_signal(df, pos):
    ch = pos.diff().fillna(0)
    trade = 1 if ch.iloc[-1]>0 else (-1 if ch.iloc[-1]<0 else 0)
    status = "LONG" if pos.iloc[-1]==1 else "CASH"
    return trade, status


In [None]:
# ==== 7-day window banner ====
import pandas as pd
from IPython.display import display, HTML

TODAY = pd.Timestamp.today().normalize()
START_7D = TODAY - pd.Timedelta(days=7)

# แบนเนอร์ช่วง 7 วัน
display(HTML(f"""
<div style="margin:10px 0;padding:8px 12px;border:1px solid #e5e7eb;border-radius:10px;
            background:#f8fafc;font-size:16px;">
  <b>ช่วง 7 วันล่าสุด:</b> {START_7D.date()} – {TODAY.date()}
</div>
"""))


In [None]:
# ==== CELL 2: Run WFO for all tickers & show summary table (NO FILE OUTPUT) ====
summary_rows = []

for tk in TICKERS:
    try:
        df = download_data(tk, LOOKBACK_YEARS)
        cut = int(len(df)*0.7)
        df_tr, df_te = df.iloc[:cut], df.iloc[cut:]

        # 1) หา best SMA บน test
        best_sma = optimize_sma(df_tr, df_te)
        sh, lg = int(best_sma["short"]), int(best_sma["long"])

        # 2) หา best overlays บน test
        best_ov = optimize_overlays(df_tr, df_te, sh, lg)
        ebuf, xbuf = int(best_ov["entry_buf"]), int(best_ov["exit_buf"])
        ap, am     = int(best_ov["atr_period"]), float(best_ov["atr_mult"])

        # 3) backtest ทั้งช่วงด้วย params ล่าสุด
        raw = sma_pos(df, sh, lg)
        pos = overlays(df, raw, ebuf, xbuf, am, ap)
        bt  = backtest(df, pos)
        st  = perf_stats(bt)

        trd, status = last_signal(df, pos)
        last_sig = "BUY" if trd==1 else ("SELL" if trd==-1 else "")
        last_date = df.index[-1].date() if last_sig else ""

        summary_rows.append({
            "ticker": tk,
            "status_now": status,
            "last_signal": last_sig,
            "last_signal_date": str(last_date) if last_sig else "",
            "short": sh, "long": lg,
            "entry_buf": ebuf, "exit_buf": xbuf,
            "atr_period": ap, "atr_mult": am,
            "Sharpe": round(st["Sharpe"],3),
            "CAGR": round(st["CAGR"],3),
            "MDD": round(st["MDD"],3),
        })
    except Exception as e:
        summary_rows.append({"ticker": tk, "status_now": "ERR", "last_signal": str(e)})

summary_df = pd.DataFrame(summary_rows).sort_values(["Sharpe","CAGR"], ascending=[False,False]).reset_index(drop=True)
display(Markdown("### สรุปผล WFO (inline) — ไม่สร้างไฟล์/โฟลเดอร์"))
display(summary_df)


In [None]:
# ==== CELL 3: Plot เฉพาะหุ้นที่มี "last signal" ภายใน 7 วัน (inline only) ====
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from IPython.display import display, Markdown

# ตรวจว่ามี summary_df จาก Cell 2 หรือไม่
if 'summary_df' not in globals() or summary_df.empty:
    display(Markdown("**ไม่มี summary_df จาก Cell 2 — โปรดรัน Cell 2 ก่อน**"))
else:
    # คัดเฉพาะตัวที่มี last_signal (BUY/SELL) และวันที่อยู่ใน 7 วันล่าสุด
    today = pd.Timestamp.today().normalize()
    df_sig = summary_df.copy()
    df_sig['last_signal_date'] = pd.to_datetime(df_sig['last_signal_date'], errors='coerce')
    recent = df_sig[
        df_sig['last_signal'].astype(str).str.len().gt(0) &
        df_sig['last_signal_date'].notna() &
        (df_sig['last_signal_date'] >= today - pd.Timedelta(days=7))
    ].reset_index(drop=True)

    if recent.empty:
        display(Markdown("**7 วันที่ผ่านมาไม่มีสัญญาณ BUY/SELL ใหม่ในรายการนี้**"))
    else:
        display(Markdown(f"### หุ้นที่มีสัญญาณในรอบ 7 วัน ({len(recent)} ตัว)"))
        display(recent[['ticker','last_signal','last_signal_date','status_now','short','long','entry_buf','exit_buf','atr_period','atr_mult','Sharpe','CAGR','MDD']])

        for _, row in recent.iterrows():
            tk   = row['ticker']
            sh   = int(row['short']); lg = int(row['long'])
            ebuf = int(row['entry_buf']); xbuf = int(row['exit_buf'])
            ap   = int(row['atr_period']); am  = float(row['atr_mult'])

            # เตรียมข้อมูลและเส้น
            df = download_data(tk, LOOKBACK_YEARS)
            s  = df["Close"].rolling(sh, min_periods=sh).mean()
            l  = df["Close"].rolling(lg, min_periods=lg).mean()
            raw = (s > l).astype(int)
            pos = overlays(df, raw, ebuf, xbuf, am, ap)

            # BUY/SELL points
            ch = pos.diff().fillna(0)
            trades = (ch > 0).astype(int) + (ch < 0).astype(int) * -1
            buys  = df[trades == 1]
            sells = df[trades == -1]

            # Trailing stop (ไว้แสดง)
            atr = compute_atr(df, ap)
            ts = pd.Series(np.nan, index=df.index, dtype=float)
            p=0; cin=0; cout=0; maxp=np.nan
            for i,(dt_i,rowp) in enumerate(df.iterrows()):
                raw_i = 1 if s.iloc[i] > l.iloc[i] else 0
                if p == 0:
                    cin = cin + 1 if raw_i == 1 else 0
                    if cin >= max(1, ebuf): p = 1; maxp = rowp["Close"]; cout = 0
                else:
                    if np.isnan(maxp) or rowp["Close"] > maxp: maxp = rowp["Close"]
                    if not np.isnan(atr.iloc[i]):
                        ts.iloc[i] = maxp - am * atr.iloc[i]
                        if rowp["Close"] <= ts.iloc[i]: p = 0; cin = 0; maxp = np.nan
                    if p == 1:
                        if raw_i == 0: cout += 1
                        else: cout = 0
                        if cout >= max(1, xbuf): p = 0; cin = 0; maxp = np.nan

            # Plotly figure (หนึ่งรูปต่อหนึ่งหุ้นที่มีสัญญาณ)
            fig = go.Figure()
            fig.add_trace(go.Scatter(x=df.index, y=df["Close"], name="Close", mode="lines",
                                     hovertemplate="Date=%{x|%Y-%m-%d}<br>Close=%{y:.2f}<extra></extra>"))
            fig.add_trace(go.Scatter(x=df.index, y=s, name=f"SMA({sh})", mode="lines",
                                     hovertemplate="Date=%{x|%Y-%m-%d}<br>SMA(short)=%{y:.2f}<extra></extra>"))
            fig.add_trace(go.Scatter(x=df.index, y=l, name=f"SMA({lg})", mode="lines",
                                     hovertemplate="Date=%{x|%Y-%m-%d}<br>SMA(long)=%{y:.2f}<extra></extra>"))
            fig.add_trace(go.Scatter(x=df.index, y=ts, name=f"TS {am}×ATR{ap}",
                                     mode="lines", line=dict(dash="dash"),
                                     hovertemplate="Date=%{x|%Y-%m-%d}<br>TS=%{y:.2f}<extra></extra>"))
            fig.add_trace(go.Scatter(x=buys.index, y=buys["Close"], name="BUY",
                                     mode="markers",
                                     marker=dict(symbol="triangle-up", size=12, color="green",
                                                 line=dict(width=1, color="darkgreen")),
                                     hovertemplate="BUY<br>Date=%{x|%Y-%m-%d}<br>Price=%{y:.2f}<extra></extra>"))
            fig.add_trace(go.Scatter(x=sells.index, y=sells["Close"], name="SELL",
                                     mode="markers",
                                     marker=dict(symbol="triangle-down", size=12, color="red",
                                                 line=dict(width=1, color="darkred")),
                                     hovertemplate="SELL<br>Date=%{x|%Y-%m-%d}<br>Price=%{y:.2f}<extra></extra>"))
            fig.update_layout(
                title=f"{tk} — สัญญาณภายใน 7 วันล่าสุด: {row['last_signal']} @ {row['last_signal_date'].date()} | "
                      f"SMA {sh}/{lg}, EB/XB {ebuf}/{xbuf}, ATR{ap}×{am}",
                template="plotly_white",
                hovermode="x unified",
                xaxis=dict(rangeslider=dict(visible=False)),
                yaxis_title="Price (THB)",
            )
            fig.show(config={"scrollZoom": True, "displayModeBar": True})


In [None]:
import pandas as pd
from IPython.display import display, Markdown
now = pd.Timestamp.now(tz='Asia/Bangkok').strftime('%Y-%m-%d %H:%M %Z')
display(Markdown(f"**รันเสร็จแล้ว:** {now}"))
