Feature Engineering

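Trend buckets (a–e), defined from the close price relative to its EMAs and from RSI:

a = close >= EMA(5) and RSI >= 70 (strong uptrend)
b = close >= EMA(35) and EMA(35) >= EMA(89) (uptrend)
c = close >= EMA(89) with EMA(5), EMA(15), EMA(35), EMA(89) packed closely together; sideways (EMA(200) is the long-term reference)
d = close < EMA(89) and close < EMA(200), with EMA(89) < EMA(200); downtrend
e = close < EMA(5) < EMA(15) < EMA(35) < EMA(89) < EMA(200) and RSI <= 30; crash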

In [1]:
import numpy as np
import pandas as pd

import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    f1_score, accuracy_score, balanced_accuracy_score, matthews_corrcoef,
    classification_report, confusion_matrix, log_loss
)
import plotly.graph_objects as go
from cassandra.cluster import Cluster
from datetime import datetime

In [2]:
# Connect to the Cassandra node on localhost and select the keyspace that the
# later `session.execute(...)` queries run against.
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
session.set_keyspace('data_stock')

eps pe pbv percentyield marketcap


ทำไมตั้งแบบนี้

EPS>0 = บริษัทมีกำไร (คุณภาพพื้นฐานขั้นต่ำ)

P/E 8–25 = กรองถูก/แพงเกินไป (ค่า default ทั่วไป—ปรับได้ตามอุตสาหกรรม)

P/BV 0.8–2.5 = ไม่ต่ำกว่าทุนมากเกิน (เสี่ยง) และไม่แพงเวอร์

Dividend Yield ≥ 3% (≤ 20%) = ให้ผลตอบแทนเงินสดพอเหมาะ (กัน outlier yield สูงผิดปกติ)

Quantile mode ใช้สัดส่วนภายในชุดข้อมูล → ดีสำหรับเปรียบเทียบ “สัมพัทธ์” ในตลาด/อุตสาหกรรมเดียวกัน

ถ้าต้องการ “น้ำหนัก” ไม่เท่ากัน (เช่นให้ EPS/PE หนักกว่า PBV/Yield) บอกได้ ผมปรับให้เป็น weighted score ได้เลย เช่น EPS/PE = 2 คะแนน, อีกสองตัว=1 คะแนน แล้วค่อยแปลงเป็น A–E ตามสัดส่วนคะแนนครับ

In [17]:
# Load the fundamentals table into a DataFrame; the `columns` list mirrors the
# order of the fields in the SELECT statement.
rows_fin = session.execute("""
    SELECT symbol,close, pe, pbv, dividendyield,marketcap,bvps FROM smartset_finan_data ALLOW FILTERING
""")
df_financial = pd.DataFrame(rows_fin, columns=["symbol","close","pe","pbv","dividendyield","marketcap","bvps"])
# Drop every row that has a missing value in any of the selected columns.
df_financial = df_financial.dropna()

CONFIG

In [3]:
# Global experiment configuration read by the dataset and training code below.
TARGET_MODE = "quadrant"   # "trend" (5 groups a–e) or "quadrant" (25 groups A–E × a–e)
WINDOW = 60            # number of consecutive rows in each input sequence (see SeqDS)
HORIZON = 1            # row offset between the input window and the labelled row (see SeqDS)
EPOCHS = 20            # maximum number of training epochs
BATCH = 256            # DataLoader batch size
LR = 1e-3              # AdamW learning rate
WEIGHT_DECAY = 1e-4    # AdamW weight decay
PATIENCE = 5           # epochs without validation macro-F1 improvement before early stop
SEED = 42              # default seed used by set_seed

Utils: seed + มาตรฐานคอลัมน์

In [4]:
def set_seed(seed=SEED):
    """Seed Python's `random`, NumPy and PyTorch (CPU and every CUDA device)."""
    import random

    random.seed(seed)
    np.random.seed(seed)
    for seeder in (torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

def ensure_cols(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with canonical column names, numeric dtypes and row order.

    * ``close_price/open_price/high_price/low_price`` are renamed to
      ``close/open/high/low`` and ``percentyield`` to ``dividendyield``.
      A rename is skipped when the canonical column already exists, so the
      result never contains duplicate column names.
    * Price and fundamental columns that are present are coerced to numbers
      (unparseable values become NaN).
    * ``date`` is parsed to datetime (unparseable values become NaT).
    * Rows are sorted by ``symbol`` then ``date`` and the index is reset.

    Raises:
        KeyError: if *df* has no ``date`` (or ``symbol``) column.
    """
    df = df.copy()

    aliases = {
        "close_price": "close",
        "open_price": "open",
        "high_price": "high",
        "low_price": "low",
        "percentyield": "dividendyield",
    }
    # Renaming onto an existing column would create two columns with the same
    # name; df[name] would then be a DataFrame and pd.to_numeric would fail.
    rename_map = {
        src: dst
        for src, dst in aliases.items()
        if src in df.columns and dst not in df.columns
    }
    df = df.rename(columns=rename_map)

    numeric_cols = ["open", "high", "low", "close", "pe", "pbv",
                    "dividendyield", "marketcap", "eps"]
    for c in numeric_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    return df.sort_values(["symbol", "date"]).reset_index(drop=True)

Indicators & Labels (a–e, A–E, quadrant)

In [5]:
def add_ema(df: pd.DataFrame, span: int) -> str:
    """Ensure *df* has a per-symbol EMA of ``close`` and return the column name.

    The column is called ``ema_<span>`` and is written into *df* in place.
    If it already exists it is left untouched.
    """
    name = f"ema_{span}"
    if name in df.columns:
        return name

    def _ema(prices):
        # Recursive (adjust=False) exponential moving average.
        return prices.ewm(span=span, adjust=False).mean()

    df[name] = df.groupby("symbol")["close"].transform(_ema)
    return name

In [6]:
def add_rsi(df: pd.DataFrame, period: int = 14) -> str:
    """Ensure *df* has a per-symbol RSI column named ``rsi`` and return that name.

    Gains and losses of ``close`` are smoothed per symbol with a recursive
    exponential average using ``alpha = 1 / period``. The result is written
    into *df* in place. An existing ``rsi`` column is left untouched,
    whatever period it was built with.

    RSI is computed as ``100 * avg_gain / (avg_gain + avg_loss)``, which is
    algebraically the same as ``100 - 100 / (1 + avg_gain / avg_loss)`` but
    stays defined when ``avg_loss`` is zero: a run of pure gains yields 100
    instead of NaN. The value is NaN only when both averages are zero (flat
    price) or not yet available (first row of each symbol).
    """
    col = "rsi"
    if col in df.columns:
        return col

    symbols = df["symbol"]
    delta = df.groupby("symbol")["close"].diff()
    gain = delta.clip(lower=0.0)
    loss = -delta.clip(upper=0.0)

    def _smooth(s):
        return s.ewm(alpha=1 / period, adjust=False).mean()

    avg_gain = gain.groupby(symbols).transform(_smooth)
    avg_loss = loss.groupby(symbols).transform(_smooth)

    # 0 / 0 (no movement at all) deliberately stays NaN.
    df[col] = 100.0 * (avg_gain / (avg_gain + avg_loss))
    return col

In [7]:
def label_trend_ae(df: pd.DataFrame, sideway_band_pct=0.015, rsi_hi=70, rsi_lo=30) -> pd.DataFrame:
    """Return a copy of *df* with EMA/RSI columns and a ``trend_bucket`` label (a–e).

    Bucket rules (all on the ``close`` column and its per-symbol EMAs):

    * ``a`` – close >= EMA5 and RSI >= ``rsi_hi``
    * ``b`` – close >= EMA35 and EMA35 >= EMA89
    * ``c`` – close >= EMA89 and the spread of EMA5/15/35/89 relative to
      their mean is at most ``sideway_band_pct``
    * ``d`` – close < EMA89, close < EMA200 and EMA89 < EMA200
    * ``e`` – close < EMA5 < EMA15 < EMA35 < EMA89 < EMA200 and RSI <= ``rsi_lo``

    When several rules match a row the first one in the order
    e, d, a, b, c wins; rows matching nothing fall back to ``c``.

    The previous implementation looped over the same order but overwrote
    earlier assignments (its "keep existing labels" step was a no-op), so the
    effective priority was reversed and the fallback bucket ``c`` beat every
    other label.
    """
    df = df.copy()
    e5 = add_ema(df, 5)
    e15 = add_ema(df, 15)
    e35 = add_ema(df, 35)
    e89 = add_ema(df, 89)
    e200 = add_ema(df, 200)
    rsi = add_rsi(df, 14)

    close = df["close"]

    a_mask = (close >= df[e5]) & (df[rsi] >= rsi_hi)
    b_mask = (close >= df[e35]) & (df[e35] >= df[e89])

    # Relative spread of the four shorter EMAs: a small value means they are
    # packed together.
    short_pack = df[[e5, e15, e35, e89]]
    band = (short_pack.max(axis=1) - short_pack.min(axis=1)) / short_pack.mean(axis=1)
    c_mask = (close >= df[e89]) & (band <= sideway_band_pct)

    d_mask = (close < df[e89]) & (close < df[e200]) & (df[e89] < df[e200])
    e_mask = (
        (close < df[e5]) &
        (df[e5] < df[e15]) & (df[e15] < df[e35]) &
        (df[e35] < df[e89]) & (df[e89] < df[e200]) &
        (df[rsi] <= rsi_lo)
    )

    # np.select picks the first matching condition per row.
    ordered = [("e", e_mask), ("d", d_mask), ("a", a_mask), ("b", b_mask), ("c", c_mask)]
    labels = np.select(
        [m.to_numpy(dtype=bool) for _, m in ordered],
        [lab for lab, _ in ordered],
        default="c",
    )
    df["trend_bucket"] = pd.Series(labels, index=df.index).astype("category")
    return df

In [8]:
def label_financial_level(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with an ordered ``financial_level`` grade (A–E).

    One point is awarded for each satisfied criterion:
    EPS > 0, 8 <= P/E <= 25, 0.8 <= P/BV <= 2.5, 3 <= dividend yield <= 20.
    Four points map to "A", three to "B", two to "C", one to "D", none to "E".
    When ``eps`` is missing and ``close``/``pe`` are present, EPS is derived
    as ``close / pe`` for rows with a positive P/E.
    """
    df = df.copy()

    eps_missing = "eps" not in df.columns
    if eps_missing and {"close", "pe"}.issubset(df.columns):
        usable = (df["pe"] > 0) & df["close"].notna()
        df["eps"] = np.where(usable, df["close"] / df["pe"], np.nan)

    criteria = [
        df["eps"] > 0,
        df["pe"].between(8, 25),
        df["pbv"].between(0.8, 2.5),
        df["dividendyield"].between(3.0, 20.0),
    ]
    points = sum(flag.fillna(False).astype(int) for flag in criteria)

    # Position in this array equals the number of points scored.
    grade_by_points = np.array(list("EDCBA"))
    grades = grade_by_points[points.to_numpy()]

    df["financial_level"] = pd.Categorical(grades, categories=list("ABCDE"), ordered=True)
    return df

In [9]:
def make_quadrant(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise *df*, attach trend and financial labels and combine them.

    The ``quadrant`` column is the financial grade (A–E) concatenated with
    the trend bucket (a–e), e.g. ``"Ab"``, stored as a categorical.
    """
    labelled = label_financial_level(label_trend_ae(ensure_cols(df)))
    grade = labelled["financial_level"].astype(str)
    bucket = labelled["trend_bucket"].astype(str)
    labelled["quadrant"] = (grade + bucket).astype("category")
    return labelled

เติมฟีเจอร์ลำดับเวลา (ATR/Vol/Gap/Range/Bollinger)

In [10]:
def enrich_seq_features(fe: pd.DataFrame, atr_period: int = 14, window: int = 20,
                        bb_k: float = 2.0) -> pd.DataFrame:
    """Return a copy of *fe* with per-symbol sequence features added.

    Features (each one is only computed when its column is missing and its
    input columns are present):

    * ``atr``       – smoothed true range (needs high, low, close)
    * ``vol_20``    – rolling std of close-to-close returns
    * ``gap_pct``   – open relative to the previous close (needs open)
    * ``range_pct`` – (high - low) / close (needs high, low)
    * ``bb_width`` / ``bb_pos`` – Bollinger band width and position of close

    Args:
        fe: frame with at least ``symbol`` and ``close``.
            NOTE(review): rows are presumably already ordered by date within
            each symbol; this function does not sort them.
        atr_period: smoothing period of the ATR (alpha = 1 / atr_period).
        window: rolling window for ``vol_20`` and the Bollinger bands. The
            column is still called ``vol_20`` when a different window is used.
        bb_k: number of standard deviations for the Bollinger bands.

    Columns created here have +/-inf (from zero denominators) replaced by NaN
    so that later NaN imputation also covers them.
    """
    fe = fe.copy()
    have = set(fe.columns)
    created = []

    # ATR needs high/low; skip it (like the other optional features) instead
    # of failing with a KeyError when they are absent.
    if "atr" not in have and {"high", "low", "close"}.issubset(have):
        prev_close = fe.groupby("symbol")["close"].shift(1)
        true_range = pd.concat(
            [
                (fe["high"] - fe["low"]).abs(),
                (fe["high"] - prev_close).abs(),
                (fe["low"] - prev_close).abs(),
            ],
            axis=1,
        ).max(axis=1)
        fe["atr"] = true_range.groupby(fe["symbol"]).transform(
            lambda s: s.ewm(alpha=1 / atr_period, adjust=False).mean()
        )
        created.append("atr")

    if "vol_20" not in have:
        ret = fe.groupby("symbol")["close"].pct_change()
        fe["vol_20"] = (
            ret.groupby(fe["symbol"]).rolling(window).std().reset_index(level=0, drop=True)
        )
        created.append("vol_20")

    if "gap_pct" not in have and {"open", "close"}.issubset(have):
        prev_close = fe.groupby("symbol")["close"].shift(1)
        fe["gap_pct"] = fe["open"] / prev_close - 1.0
        created.append("gap_pct")

    if "range_pct" not in have and {"high", "low", "close"}.issubset(have):
        fe["range_pct"] = (fe["high"] - fe["low"]) / fe["close"]
        created.append("range_pct")

    if "bb_width" not in have or "bb_pos" not in have:
        rolling_close = fe.groupby("symbol")["close"].rolling(window)
        mid = rolling_close.mean().reset_index(level=0, drop=True)
        std = rolling_close.std().reset_index(level=0, drop=True)
        upper = mid + bb_k * std
        lower = mid - bb_k * std
        fe["bb_width"] = (upper - lower) / mid
        fe["bb_pos"] = (fe["close"] - lower) / (upper - lower)
        created.extend(["bb_width", "bb_pos"])

    if created:
        fe[created] = fe[created].replace([np.inf, -np.inf], np.nan)
    return fe

Dataset & Collate

In [11]:
class SeqDS(Dataset):
    """Sliding-window dataset: a window of past rows -> class label of a later row.

    For every symbol and every in-symbol position ``i >= window + horizon``
    a sample is made of

    * the ``window`` rows at in-symbol positions
      ``i - window - horizon .. i - horizon - 1`` (sequence features), and
    * row ``i`` (static features and the target ``y``).

    Rows whose ``y`` is missing are skipped as targets.

    All row bookkeeping uses positions in ``self.df``, which is sorted by
    (symbol, date) and re-indexed 0..N-1. The original index of *df* is
    irrelevant, so filtered frames and multi-symbol frames are indexed
    correctly.

    Normalisation in ``__getitem__`` reads the module-level globals
    ``seq_med/seq_mu/seq_sd`` and ``stat_med/stat_mu/stat_sd``; they must be
    set before items are fetched (``train_tcn_and_predict`` does this).

    Args:
        df: frame with ``symbol``, ``date``, ``y`` and the feature columns.
        seq_cols: columns fed to the sequence tower.
        static_cols: columns fed to the static tower (may be empty).
        window: window length; defaults to the module constant ``WINDOW``.
        horizon: offset described above; defaults to ``HORIZON``.
    """

    def __init__(self, df: pd.DataFrame, seq_cols, static_cols, window=None, horizon=None):
        window = WINDOW if window is None else window
        horizon = HORIZON if horizon is None else horizon

        # drop=True: after this, index label == row position.
        self.df = df.sort_values(["symbol", "date"]).reset_index(drop=True)
        self.seq_cols = seq_cols
        self.static_cols = static_cols
        self.window = window
        self.horizon = horizon
        self.samples = []

        has_label = self.df["y"].notna().to_numpy()
        span = window + horizon
        for _, g in self.df.groupby("symbol"):
            pos = g.index.to_numpy()  # global row positions of this symbol
            for i in range(span, len(pos)):
                target = pos[i]
                if has_label[target]:
                    self.samples.append((pos[i - span:i - horizon], target))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, i):
        idx_win, idx_row = self.samples[i]

        # Impute with training medians, then z-score with training statistics.
        seq = self.df.loc[idx_win, self.seq_cols].fillna(seq_med).astype(float)
        seq = (seq - seq_mu) / seq_sd
        x_seq = torch.tensor(seq.values, dtype=torch.float32)

        if self.static_cols:
            stat = self.df.loc[idx_row, self.static_cols].fillna(stat_med).astype(float)
            stat = (stat - stat_mu) / stat_sd
            x_static = torch.tensor(stat.values, dtype=torch.float32)
        else:
            x_static = torch.empty(0)

        y = int(self.df.loc[idx_row, "y"])
        return x_seq, x_static, y

In [12]:
def collate_fn(batch):
    """Stack ``(x_seq, x_static, y)`` samples into batch tensors.

    Returns ``(seq_batch, static_batch, labels)``. When the samples carry no
    static features (zero-length tensors) ``static_batch`` is an empty tensor
    of shape ``(batch_size, 0)``. ``labels`` is a ``torch.long`` tensor.
    """
    seqs, statics, labels = zip(*batch)
    seq_batch = torch.stack(seqs)
    if len(statics[0]):
        static_batch = torch.stack(statics)
    else:
        static_batch = torch.empty((len(batch), 0))
    label_batch = torch.tensor(labels, dtype=torch.long)
    return seq_batch, static_batch, label_batch


โมเดล TCN two-tower

In [13]:
class Chomp1d(nn.Module):
    """Drop the last ``s`` steps along the final dimension of the input.

    With ``s == 0`` the input is returned unchanged. A plain ``x[..., :-0]``
    slice would return an empty tensor in that case, e.g. for kernel size 1
    where no padding is added.
    """

    def __init__(self, s):
        super().__init__()
        self.s = s

    def forward(self, x):
        if self.s == 0:
            return x
        return x[..., :-self.s].contiguous()

class TBlock(nn.Module):
    """Residual block of two dilated 1-D convolutions.

    Each convolution is padded by ``(k - 1) * d`` on both sides and the extra
    trailing steps are removed by ``Chomp1d``, followed by ReLU and dropout.
    The skip path is a 1x1 convolution when the channel count changes and an
    identity otherwise.
    """

    def __init__(self, c_in, c_out, k, d, p=0.2):
        super().__init__()
        pad = (k - 1) * d

        stages = []
        for channels_in in (c_in, c_out):
            stages.extend([
                nn.Conv1d(channels_in, c_out, k, padding=pad, dilation=d),
                Chomp1d(pad),
                nn.ReLU(),
                nn.Dropout(p),
            ])
        self.net = nn.Sequential(*stages)

        if c_in != c_out:
            self.down = nn.Conv1d(c_in, c_out, 1)
        else:
            self.down = nn.Identity()

    def forward(self, x):
        out = self.net(x)
        skip = self.down(x)
        return torch.relu(out + skip)

class TCN(nn.Module):
    """Two-tower classifier: a TCN over the sequence plus optional static features.

    The sequence tower stacks one ``TBlock`` per entry of ``chans`` with
    dilation ``2**i``, then average-pools over time. Static features, when
    given and non-empty, are concatenated to the pooled vector before the
    two-layer classification head.
    """

    def __init__(self, in_feat, n_classes, static_dim=0, chans=(64,64,128), k=3, p=0.2):
        super().__init__()
        widths = (in_feat, *chans)
        blocks = [
            TBlock(w_in, w_out, k, d=2 ** level, p=p)
            for level, (w_in, w_out) in enumerate(zip(widths, widths[1:]))
        ]
        self.tcn = nn.Sequential(*blocks)
        self.pool = nn.AdaptiveAvgPool1d(1)

        head_in = widths[-1] + static_dim
        self.head = nn.Sequential(
            nn.Linear(head_in, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, n_classes),
        )

    def forward(self, x_seq, x_static=None):
        # The convolutions expect channels before time, so swap dims 1 and 2.
        feats = self.tcn(x_seq.transpose(1, 2))
        feats = self.pool(feats).squeeze(-1)

        has_static = x_static is not None and x_static.numel() > 0
        if has_static:
            feats = torch.cat([feats, x_static], dim=1)
        return self.head(feats)

Train TCN + ดึงผลทำนายสำหรับเทียบ

In [14]:
def train_tcn_and_predict(df: pd.DataFrame):
    """Train the TCN on *df* and return the model with its validation outputs.

    Pipeline: label and enrich the frame, encode the target selected by
    ``TARGET_MODE`` as integers, split by date (earliest 80% of dates for
    training, the rest for validation), fit normalisation statistics on the
    training part, train with class-weighted cross entropy and early stopping
    on validation macro-F1, then reload the best checkpoint.

    Side effects:
        * sets the module globals ``seq_mu, seq_sd, stat_mu, stat_sd,
          seq_med, stat_med`` that ``SeqDS.__getitem__`` reads;
        * writes the best weights to ``tcn_best.pt`` in the working directory.

    Returns:
        ``(model, classes, (y_true_val, y_pred, y_prob), df_va)`` where
        ``classes`` is the sorted list of label strings and ``df_va`` is the
        validation frame with a fresh index. The prediction arrays hold one
        entry per validation *window*, so they are shorter than ``df_va``.

    Raises:
        ValueError: if the training or validation split yields no windows.
    """
    set_seed(SEED)
    fe = make_quadrant(df)
    fe = enrich_seq_features(fe)
    fe = fe.sort_values(["symbol","date"]).reset_index(drop=True)

    SEQ_COLS = [c for c in ["close","ema_5","ema_15","ema_35","ema_89","ema_200","rsi","bb_width","bb_pos","atr","vol_20","gap_pct","range_pct"] if c in fe.columns]
    STATIC_COLS = [c for c in ["eps","pe","pbv","dividendyield","marketcap"] if c in fe.columns]

    target_col = "trend_bucket" if TARGET_MODE == "trend" else "quadrant"
    target = fe[target_col].astype(str)
    classes = sorted(target.unique())
    fe["y"] = target.map({c: i for i, c in enumerate(classes)})
    all_labels = list(range(len(classes)))

    cut = fe["date"].quantile(0.8)
    df_tr, df_va = fe[fe["date"]<=cut].copy(), fe[fe["date"]>cut].copy()

    # Normalisation statistics come from the training split only.
    global seq_mu, seq_sd, stat_mu, stat_sd, seq_med, stat_med
    seq_mu, seq_sd = df_tr[SEQ_COLS].mean(), df_tr[SEQ_COLS].std().replace(0,1.0)
    seq_med = df_tr[SEQ_COLS].median()
    if STATIC_COLS:
        stat_mu = df_tr[STATIC_COLS].mean()
        stat_sd = df_tr[STATIC_COLS].std().replace(0,1.0)
        stat_med = df_tr[STATIC_COLS].median()
    else:
        stat_mu = pd.Series(dtype=float); stat_sd = pd.Series(dtype=float); stat_med = pd.Series(dtype=float)

    tr_ds, va_ds = SeqDS(df_tr, SEQ_COLS, STATIC_COLS), SeqDS(df_va, SEQ_COLS, STATIC_COLS)
    if len(tr_ds) == 0 or len(va_ds) == 0:
        raise ValueError(
            f"not enough rows per symbol to build windows "
            f"(train windows={len(tr_ds)}, validation windows={len(va_ds)})"
        )
    tr_ld = DataLoader(tr_ds, batch_size=BATCH, shuffle=True, collate_fn=collate_fn)
    va_ld = DataLoader(va_ds, batch_size=BATCH, shuffle=False, collate_fn=collate_fn)

    device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model=TCN(in_feat=len(SEQ_COLS), n_classes=len(classes), static_dim=len(STATIC_COLS)).to(device)

    # Count the training targets straight from the stored labels instead of
    # running every sample through __getitem__ just to read y.
    train_targets = [int(tr_ds.df.loc[row, "y"]) for _, row in tr_ds.samples]
    cc = np.bincount(train_targets, minlength=len(classes))
    # Inverse-frequency weights. Classes absent from training get weight 0
    # rather than a huge value that would dominate the normalisation; the
    # ratios between present classes are unchanged.
    present = cc > 0
    w = np.where(present, cc.sum() / np.maximum(cc, 1), 0.0)
    w = w / w[present].mean()
    w = torch.tensor(w, dtype=torch.float32, device=device)
    crit=nn.CrossEntropyLoss(weight=w)
    opt=torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

    def evaluate(ld):
        """Return (macro-F1, y_true, y_pred, class probabilities) over a loader."""
        model.eval(); ys=[]; ps=[]; probs=[]
        with torch.no_grad():
            for xs,ss,y in ld:
                xs, y = xs.to(device), y.to(device)
                ss = ss.to(device) if len(STATIC_COLS)>0 else None
                logits = model(xs, ss)
                pred = logits.argmax(1)
                ys.append(y.cpu().numpy()); ps.append(pred.cpu().numpy())
                probs.append(torch.softmax(logits,1).cpu().numpy())
        y=np.concatenate(ys); p=np.concatenate(ps); pr=np.concatenate(probs)
        return f1_score(y,p,average="macro"), y, p, pr

    BEST=-1; patience=0
    for ep in range(1, EPOCHS+1):
        model.train(); tot=0.0; n=0
        for xs,ss,y in tr_ld:
            xs,y=xs.to(device),y.to(device)
            ss=ss.to(device) if len(STATIC_COLS)>0 else None
            opt.zero_grad()
            logits=model(xs,ss)
            loss=crit(logits,y); loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(),1.0)
            opt.step()
            tot += loss.item()*xs.size(0); n += xs.size(0)
        f1, _, _, _ = evaluate(va_ld)
        print(f"Epoch {ep:02d}  train_loss={tot/max(1,n):.4f}  val_macroF1={f1:.4f}")
        if f1>BEST+1e-4:
            BEST=f1; patience=0; torch.save(model.state_dict(),"tcn_best.pt")
        else:
            patience+=1
            if patience>=PATIENCE:
                print("Early stop"); break

    model.load_state_dict(torch.load("tcn_best.pt", map_location=device))
    f1, y_true_val, y_pred_dl, y_prob_dl = evaluate(va_ld)
    print("Best Macro-F1:", f1)
    # Pass the full label list: the validation split may not contain every
    # class, and target_names alone would then not match the labels found.
    print(classification_report(y_true_val, y_pred_dl, labels=all_labels,
                                target_names=classes, zero_division=0))
    return model, classes, (y_true_val, y_pred_dl, y_prob_dl), df_va.reset_index(drop=True)


Financial-only Logistic Regression baseline

In [15]:
def financial_lr_baseline(fe_df: pd.DataFrame, classes):
    """Fit a logistic-regression baseline on the financial columns only.

    The frame is split by date (earliest 80% of dates for training, the rest
    for validation). Missing feature values are imputed with training
    medians and features are standardised with training statistics.

    Args:
        fe_df: labelled frame with ``date`` and at least one of the columns
            eps, pe, pbv, dividendyield, marketcap, bvps. If it has no integer
            target column ``y``, one is derived from ``trend_bucket`` or
            ``quadrant`` (chosen by ``TARGET_MODE``) using *classes*, so the
            output of ``make_quadrant`` can be passed directly.
        classes: ordered list of class names; position == integer label.

    Returns:
        ``(y_val, pred, prob)`` where ``prob`` has one column per entry of
        *classes* (columns of classes unseen in training are all zero).

    Raises:
        ValueError: if no financial column is present, or ``y`` is missing
            and cannot be derived.
    """
    FIN_FEATS = ["eps","pe","pbv","dividendyield","marketcap","bvps"]
    fe_df = fe_df.copy()
    if "dividendyield" not in fe_df.columns and "percentyield" in fe_df.columns:
        fe_df["dividendyield"] = fe_df["percentyield"]
    if "eps" not in fe_df.columns and {"close","pe"}.issubset(fe_df.columns):
        fe_df["eps"] = np.where((fe_df["pe"]>0) & fe_df["close"].notna(), fe_df["close"]/fe_df["pe"], np.nan)

    X_cols = [c for c in FIN_FEATS if c in fe_df.columns]
    if not X_cols:
        raise ValueError("ไม่พบคอลัมน์ financial ในชุดข้อมูล")

    if "y" not in fe_df.columns:
        target_col = "trend_bucket" if TARGET_MODE == "trend" else "quadrant"
        if target_col not in fe_df.columns:
            raise ValueError(f"fe_df has neither 'y' nor '{target_col}' to derive it from")
        fe_df["y"] = fe_df[target_col].astype(str).map({c: i for i, c in enumerate(classes)})
    # Rows without a usable label cannot be trained on or scored.
    fe_df = fe_df[fe_df["y"].notna()]

    all_labels = list(range(len(classes)))

    cut = fe_df["date"].quantile(0.8)
    tr, va = fe_df[fe_df["date"]<=cut].copy(), fe_df[fe_df["date"]>cut].copy()

    fill_tr = tr[X_cols].median(numeric_only=True)
    Xtr = tr[X_cols].fillna(fill_tr).astype(float)
    Xva = va[X_cols].fillna(fill_tr).astype(float)
    ytr = tr["y"].astype(int).values
    yva = va["y"].astype(int).values

    scaler = StandardScaler().fit(Xtr)
    Xtr_s = scaler.transform(Xtr); Xva_s = scaler.transform(Xva)

    # `multi_class` is deprecated in recent scikit-learn; the default solver
    # already fits a multinomial model for multi-class targets.
    lr = LogisticRegression(max_iter=1000, class_weight="balanced")
    lr.fit(Xtr_s, ytr)
    pred_lr = lr.predict(Xva_s)

    # predict_proba only has columns for classes seen in training; spread them
    # over the full class list so the matrix is comparable across models.
    prob_lr = np.zeros((Xva_s.shape[0], len(classes)))
    prob_lr[:, lr.classes_.astype(int)] = lr.predict_proba(Xva_s)

    print("\n[Financial-only LR]")
    print("Macro-F1:", f1_score(yva, pred_lr, average="macro"))
    print(classification_report(yva, pred_lr, labels=all_labels,
                                target_names=classes, zero_division=0))
    print("Confusion matrix:\n", confusion_matrix(yva, pred_lr, labels=all_labels))
    return (yva, pred_lr, prob_lr)

Compare Scores (DL vs LR) + McNemar + Bootstrap CI

In [16]:
def compare_models(dl_tuple, lr_tuple, classes):
    """Print a metric table, McNemar test and bootstrap CI for DL vs. LR.

    Args:
        dl_tuple: ``(y_true, y_pred, y_prob)`` of the deep-learning model.
        lr_tuple: ``(y_true, y_pred, y_prob)`` of the logistic regression.
        classes: ordered list of class names; position == integer label.

    Each model is scored against its *own* label vector. The paired
    statistics (McNemar, bootstrap of the macro-F1 difference) are only
    meaningful when both tuples describe the same samples in the same order;
    when the label vectors differ a warning is printed and both inputs are
    truncated to the shorter length, as before.

    Raises:
        ValueError: if either tuple is empty.
    """
    y_true_val, y_pred_dl, y_prob_dl = dl_tuple
    yva, pred_lr, prob_lr = lr_tuple

    n = min(len(y_true_val), len(yva))
    if n == 0:
        raise ValueError("compare_models needs at least one prediction per model")

    yt_dl = np.asarray(y_true_val)[:n]
    yt_lr = np.asarray(yva)[:n]
    p_dl = np.asarray(y_pred_dl)[:n]; pr_dl = y_prob_dl[:n] if y_prob_dl is not None else None
    p_lr = np.asarray(pred_lr)[:n];   pr_lr = prob_lr[:n] if prob_lr is not None else None

    aligned = len(y_true_val) == len(yva) and np.array_equal(yt_dl, yt_lr)
    if not aligned:
        print("WARNING: the two models were evaluated on different label vectors; "
              "paired statistics (McNemar, bootstrap) below are not reliable.")

    def summarize(y_true, y_pred, y_prob, name):
        """Collect the headline metrics of one model into a dict."""
        return {
            "model": name,
            "macro_f1": f1_score(y_true, y_pred, average="macro"),
            "balanced_acc": balanced_accuracy_score(y_true, y_pred),
            "accuracy": accuracy_score(y_true, y_pred),
            "mcc": matthews_corrcoef(y_true, y_pred),
            "log_loss": (log_loss(y_true, y_prob, labels=list(range(len(classes))))
                         if y_prob is not None and y_prob.shape[1]==len(classes) else np.nan)
        }

    scores = pd.DataFrame([
        summarize(yt_lr, p_lr, pr_lr, "Financial-only LR"),
        summarize(yt_dl, p_dl, pr_dl, "Deep Learning (TCN)")
    ])
    try:
        display(scores)  # rich table inside IPython / Jupyter
    except NameError:
        print(scores.to_string(index=False))

    from scipy.stats import chi2, binomtest
    correct_dl = (p_dl == yt_dl); correct_lr = (p_lr == yt_lr)
    b = int((correct_dl & ~correct_lr).sum())   # DL right, LR wrong
    c = int((~correct_dl & correct_lr).sum())   # DL wrong, LR right
    N = b + c
    if N > 0:
        # McNemar chi-square with continuity correction, plus the exact
        # binomial version for small discordant counts.
        stat = (abs(b - c) - 1)**2 / (b + c)
        pval = chi2.sf(stat, df=1)
        p_exact = binomtest(min(b,c), n=N, p=0.5, alternative="two-sided").pvalue
        print(f"\nMcNemar: b={b}, c={c}, chi2={stat:.3f}, p={pval:.4f}, exact_p={p_exact:.4f}")
    else:
        print("\nMcNemar: b+c=0 (คำตอบสองโมเดลตรงกันเกือบหมด)")

    # Percentile bootstrap of the macro-F1 difference over resampled rows.
    rng = np.random.default_rng(123)
    diffs = []
    for _ in range(1000):
        idx = rng.integers(0, n, n)
        diffs.append(
            f1_score(yt_dl[idx], p_dl[idx], average="macro")
            - f1_score(yt_lr[idx], p_lr[idx], average="macro")
        )
    lo, hi = np.percentile(diffs, [2.5, 97.5])
    print(f"ΔMacro-F1 (DL − LR) bootstrap 95% CI: [{lo:.4f}, {hi:.4f}]")


 การใช้งาน 

In [None]:
# Assumes a DataFrame `df` that combines prices and fundamentals; it needs at least the columns symbol, date, open, high, low, close.
# 1) Train the deep-learning model and collect its validation results
# model, classes, dl_tuple, val_df = train_tcn_and_predict(df)
# 2) Fit the statistical baseline that uses only the financial features
# lr_tuple = financial_lr_baseline(val_df.assign(**{}).pipe(lambda d: pd.concat([d,], axis=1)) if True else val_df, classes)
#    (Note: the LR function does its own train/validation split on the frame it receives from the pipeline above.)
# Simpler alternative: use the same feature frame as train_tcn_and_predict:
#   build fe_df = make_quadrant(df) first, then pass it to financial_lr_baseline(fe_df, classes)

# Short example:
# fe_df = make_quadrant(df)
# lr_tuple = financial_lr_baseline(fe_df, classes)
# compare_models(dl_tuple, lr_tuple, classes)