In [1]:
import nbformat as nbf
from pathlib import Path

# Output location of the generated sentiment-pipeline notebook.
nb_path = Path("06_sentiment_pipeline.ipynb")

# Start from an empty v4 notebook and tag it with an empty Colab provenance list.
nb = nbf.v4.new_notebook()
nb.metadata["colab"] = dict(provenance=[])

# Cells of the generated notebook, in execution order. Every code cell's source is a
# plain string: nothing inside it runs here, only when the generated notebook is executed.
nb.cells = [
    # Title cell.
    nbf.v4.new_markdown_cell("# 06 — Sentiment Pipeline (Thai, lexicon v1, dummy-ready)"),
    # Dependency install, scoped to the kernel environment via %pip.
    nbf.v4.new_code_cell("""\
%pip -q install pandas numpy matplotlib pythainlp
"""),
    # Imports, then load the base feature/label table; fails fast if the CSV is missing.
    nbf.v4.new_code_cell("""\
import os, pandas as pd, numpy as np, matplotlib.pyplot as plt
from pythainlp.tokenize import word_tokenize

# โหลด dataset หลัก
BASE_FILE = "dataset_features_labels.csv"
if not os.path.exists(BASE_FILE):
    raise FileNotFoundError("❌ ไม่พบ dataset_features_labels.csv — โปรดรัน 02_feature_label.ipynb ก่อน")

df = pd.read_csv(BASE_FILE, index_col=0, parse_dates=True)
print("Loaded base dataset:", df.shape)
"""),
    # Lexicon scorer: (#positive tokens - #negative tokens) / sqrt(#tokens), 0.0 for no tokens.
    nbf.v4.new_code_cell("""\
# Lexicon
POS = {"บวก","พุ่ง","กำไร","เติบโต","ฟื้นตัว","ดี","ทะลุ","สูงขึ้น","แข็งแกร่ง","สดใส"}
NEG = {"ลบ","ร่วง","ขาดทุน","ชะลอ","ถดถอย","แย่","ดิ่ง","ต่ำลง","วิกฤต","ซบเซา"}

def sentiment_score_th(text: str) -> float:
    toks = word_tokenize(str(text), keep_whitespace=False)
    if not toks: return 0.0
    s = sum((t in POS) - (t in NEG) for t in toks)
    return s / np.sqrt(len(toks))
"""),
    # News input: read news_th.csv when present, otherwise fabricate five weekly dummy
    # headlines starting at the dataset's first date and save them as a template CSV.
    nbf.v4.new_code_cell("""\
NEWS_FILE = "news_th.csv"
if os.path.exists(NEWS_FILE):
    news = pd.read_csv(NEWS_FILE)
    news["date"] = pd.to_datetime(news["date"])
else:
    # dummy news
    news = pd.DataFrame({
        "date": pd.date_range(df.index.min(), periods=5, freq="7D"),
        "symbol": ["^SET50"]*5,
        "text": [
            "หุ้นไทยพุ่งแรงหลังเศรษฐกิจฟื้นตัว",
            "ตลาดปรับตัวลบจากความกังวลเศรษฐกิจถดถอย",
            "ผลประกอบการกำไรเติบโต",
            "ความเชื่อมั่นชะลอตัว หุ้นร่วง",
            "แนวโน้มดีขึ้นอย่างต่อเนื่อง",
        ]
    })
    news.to_csv("news_th_template.csv", index=False)
    print("⚠️ ไม่มี news_th.csv → สร้าง news_th_template.csv ให้แล้ว")
"""),
    # Score each headline, average per calendar day, and left-join onto the base table.
    # Fix: only the new Sentiment_Daily column is defaulted to 0.0. The previous blanket
    # .fillna(0.0) on the joined frame also overwrote NaNs in every pre-existing column.
    nbf.v4.new_code_cell("""\
# Sentiment
news["score"] = news["text"].apply(sentiment_score_th)
sent_daily = news.groupby(pd.Grouper(key="date", freq="D"))["score"].mean().rename("Sentiment_Daily").to_frame()
sent_daily.to_csv("sent_daily_preview.csv")
df_out = df.join(sent_daily, how="left")
# days without news get neutral sentiment; NaNs in other columns are left untouched
df_out["Sentiment_Daily"] = df_out["Sentiment_Daily"].fillna(0.0)
df_out.to_csv("dataset_features_labels_with_sentiment.csv")
print("✅ Saved dataset_features_labels_with_sentiment.csv", df_out.shape)
"""),
]

# Serialize the notebook to its JSON text first, then write that text to disk as UTF-8.
notebook_json = nbf.writes(nb)
nb_path.write_text(notebook_json, encoding="utf-8")

# NOTE(review): news_th_template.csv is written by the generated notebook when it runs
# without news_th.csv, not by this cell; the status line below is kept as-is.
print("✅ Created 06_sentiment_pipeline.ipynb และ news_th_template.csv (ถ้าไม่มีไฟล์ข่าวจริง)")


✅ Created 06_sentiment_pipeline.ipynb และ news_th_template.csv (ถ้าไม่มีไฟล์ข่าวจริง)


In [2]:
import nbformat as nbf
from pathlib import Path

# Output location of the generated LSTM-vs-Transformer notebook.
nb_path = Path("05_transformer_upgrade.ipynb")

# Empty v4 notebook tagged with an empty Colab provenance list; cells are collected
# in a plain list and attached to the notebook once all of them have been appended.
nb = nbf.v4.new_notebook()
nb.metadata["colab"] = dict(provenance=[])
cells = list()

# Title: markdown heading plus a one-line workflow summary (two adjacent string
# literals, concatenated by Python into a single markdown source).
cells.append(nbf.v4.new_markdown_cell("# 05 — Transformer Upgrade (LSTM vs Transformer, with Sentiment & Indicators)\n"
"Workflow: เตรียมข้อมูล → LSTM benchmark → Transformer upgrade (multi-head, multi-block) → เปรียบเทียบผล + Backtest"))

# Setup: quiet, kernel-scoped install of the generated notebook's dependencies;
# TensorFlow is constrained to the 2.x series by the "==2.*" specifier.
cells.append(nbf.v4.new_code_cell("""\
%pip -q install pandas numpy matplotlib scikit-learn tensorflow==2.*
"""))

# Imports + Data Load
# Generated cell: imports the numeric/ML stack, seeds the Python, NumPy and TensorFlow
# RNGs, then loads the feature table. The sentiment-augmented CSV is preferred; the
# plain feature/label CSV is the fallback when that file does not exist.
# Fix: a fixed seed is set so that weight initialisation and dropout are repeatable and
# the LSTM-vs-Transformer comparison can be reproduced from a fresh kernel.
cells.append(nbf.v4.new_code_cell("""\
import numpy as np, pandas as pd, matplotlib.pyplot as plt, tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import os

# Reproducibility: seeds Python's random, NumPy and TensorFlow in one call
SEED = 42
tf.keras.utils.set_random_seed(SEED)

# Load dataset (with sentiment if available)
FILE = "dataset_features_labels_with_sentiment.csv"
if not os.path.exists(FILE):
    FILE = "dataset_features_labels.csv"

df = pd.read_csv(FILE, index_col=0, parse_dates=True)
print("Loaded dataset:", df.shape)
df.head(3)
"""))

# Train/Val/Test split
# Generated cell: every column except "Target" is a feature. Rows are split in order
# into 70% train / 15% validation / 15% test, the scaler is fitted on the training rows
# only, and each split is turned into sliding windows of WIN rows. The window for
# sample i covers rows [i-WIN, i) and is paired with y[i], so the first WIN rows of each
# split produce no sample; idx_test therefore starts at n_val + WIN.
# Fix: a split with WIN rows or fewer used to yield empty arrays and fail later inside
# Keras with an unrelated-looking shape error; it is now rejected up front.
cells.append(nbf.v4.new_code_cell("""\
FEATURES = [c for c in df.columns if c != "Target"]
X_all = df[FEATURES].values
y_all = df["Target"].values

n = len(df); n_train = int(n*0.7); n_val = int(n*0.85)
X_tr, y_tr = X_all[:n_train], y_all[:n_train]
X_va, y_va = X_all[n_train:n_val], y_all[n_train:n_val]
X_te, y_te = X_all[n_val:], y_all[n_val:]

WIN = 20
# every split must be longer than the window, otherwise make_seq yields no samples
for split_name, split_X in (("train", X_tr), ("val", X_va), ("test", X_te)):
    if len(split_X) <= WIN:
        raise ValueError(f"{split_name} split has {len(split_X)} rows; need more than WIN={WIN}")

# scaling
scaler = StandardScaler().fit(X_tr)
X_tr = scaler.transform(X_tr); X_va = scaler.transform(X_va); X_te = scaler.transform(X_te)

# make sequences: window X[i-win:i] is paired with target y[i]
def make_seq(X, y, win=20):
    xs, ys = [], []
    for i in range(win, len(X)):
        xs.append(X[i-win:i])
        ys.append(y[i])
    return np.array(xs), np.array(ys)

Xtr, ytr = make_seq(X_tr, y_tr, WIN)
Xva, yva = make_seq(X_va, y_va, WIN)
Xte, yte = make_seq(X_te, y_te, WIN)
idx_test = df.index[n_val+WIN:]

Xtr.shape, Xva.shape, Xte.shape, len(idx_test)
"""))

# LSTM Benchmark
# Generated cell: defines build_lstm (Input -> LSTM(units) -> Dropout -> Dense(32, relu)
# -> Dense(1), compiled with Adam, MSE loss and an MAE metric), trains it for up to 80
# epochs with early stopping on val_loss (patience 10, best weights restored), and
# stores the flattened test-set predictions in yhat_lstm.
# The cell reads Xtr/ytr/Xva/yva/Xte, which the generated split cell defines.
cells.append(nbf.v4.new_code_cell("""\
from tensorflow.keras import layers, models, callbacks

def build_lstm(input_shape, units=64, dropout=0.2):
    m = models.Sequential([
        layers.Input(shape=input_shape),
        layers.LSTM(units),
        layers.Dropout(dropout),
        layers.Dense(32, activation="relu"),
        layers.Dense(1)
    ])
    m.compile(optimizer="adam", loss="mse", metrics=["mae"])
    return m

lstm = build_lstm(Xtr.shape[1:])
es = callbacks.EarlyStopping(patience=10, restore_best_weights=True, monitor="val_loss")
lstm.fit(Xtr, ytr, validation_data=(Xva, yva), epochs=80, batch_size=64, callbacks=[es], verbose=0)
yhat_lstm = lstm.predict(Xte).ravel()
"""))

# Transformer Upgrade
# Generated cell: builds an encoder-only Transformer regressor. Inputs are projected to
# d_model with a Dense layer, a fixed sinusoidal positional encoding is added, then
# num_layers blocks of self-attention and a two-layer feed-forward network follow, each
# wrapped in a residual connection and LayerNormalization. The sequence is averaged over
# time and passed through Dense(64, relu) -> Dense(1). Training uses Adam/MSE with early
# stopping on val_loss (patience 12, best weights restored); predictions go to yhat_tf.
# Fix: the cell calls callbacks.EarlyStopping but did not import `callbacks` itself, so it
# only worked when another cell had already imported it. The import is now local.
cells.append(nbf.v4.new_code_cell("""\
from tensorflow.keras import callbacks, layers, models, optimizers

# Returns a (seq_len, d_model) float32 constant: sin on even columns, cos on odd columns
def sinusoidal_position_encoding(seq_len, d_model):
    pos = np.arange(seq_len)[:, None]
    i = np.arange(d_model)[None, :]
    angle_rates = 1 / np.power(10000, (2*(i//2)) / np.float32(d_model))
    angles = pos * angle_rates
    pe = np.zeros((seq_len, d_model))
    pe[:, 0::2] = np.sin(angles[:, 0::2])
    pe[:, 1::2] = np.cos(angles[:, 1::2])
    return tf.constant(pe, dtype=tf.float32)

# input_shape is (seq_len, n_features); returns a compiled Keras model with one output
def build_transformer(input_shape, num_layers=2, num_heads=4, d_model=64, ff_dim=128, dropout=0.2):
    seq_len, n_feat = input_shape
    inp = layers.Input(shape=input_shape)
    x = layers.Dense(d_model)(inp)
    pe = sinusoidal_position_encoding(seq_len, d_model)
    x = x + pe
    for _ in range(num_layers):
        attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model//num_heads)(x, x)
        attn = layers.Dropout(dropout)(attn)
        x = layers.LayerNormalization(epsilon=1e-6)(x + attn)
        f = layers.Dense(ff_dim, activation="relu")(x)
        f = layers.Dropout(dropout)(f)
        f = layers.Dense(d_model)(f)
        x = layers.LayerNormalization(epsilon=1e-6)(x + f)
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(64, activation="relu")(x)
    out = layers.Dense(1)(x)
    model = models.Model(inp, out)
    model.compile(optimizer="adam", loss="mse", metrics=["mae"])
    return model

trf = build_transformer(Xtr.shape[1:], num_layers=3, num_heads=4, d_model=64, ff_dim=128, dropout=0.2)
es2 = callbacks.EarlyStopping(patience=12, restore_best_weights=True, monitor="val_loss")
trf.fit(Xtr, ytr, validation_data=(Xva, yva), epochs=100, batch_size=64, callbacks=[es2], verbose=0)
yhat_tf = trf.predict(Xte).ravel()
"""))

# Metrics + Backtest
# Generated cell: metrics() reports RMSE, MAE, R2 and "DA" (the fraction of samples
# where the sign of the prediction equals the sign of the target). Both models are
# scored on the test sequences and the results printed.
# The backtest frame is indexed by idx_test and uses yte as the per-step "Return"; each
# strategy column multiplies that return by +1 when the model's prediction is > 0 and by
# -1 otherwise. The plot shows the cumulative sums of the three columns.
# mean_squared_error / mean_absolute_error / r2_score are not imported in this cell;
# they come from the generated imports cell.
cells.append(nbf.v4.new_code_cell("""\
import numpy as np, pandas as pd, matplotlib.pyplot as plt

def metrics(y_true, y_pred):
    return {
        "RMSE": float(np.sqrt(mean_squared_error(y_true, y_pred))),
        "MAE": float(mean_absolute_error(y_true, y_pred)),
        "R2": float(r2_score(y_true, y_pred)),
        "DA": float((np.sign(y_true)==np.sign(y_pred)).mean())
    }

m_lstm = metrics(yte, yhat_lstm)
m_trf  = metrics(yte, yhat_tf)

print("LSTM:", m_lstm)
print("Transformer:", m_trf)

bt = pd.DataFrame(index=idx_test)
bt["Return"] = yte
bt["LSTM"] = np.where(yhat_lstm>0, 1, -1) * bt["Return"]
bt["Transformer"] = np.where(yhat_tf>0, 1, -1) * bt["Return"]

bt[["Return","LSTM","Transformer"]].cumsum().plot(figsize=(10,5), title="Backtest Comparison")
plt.show()
"""))

# Attach the collected cells, serialize to JSON text, then write it to disk as UTF-8.
nb.cells = cells
notebook_json = nbf.writes(nb)
nb_path.write_text(notebook_json, encoding="utf-8")

print("✅ Created", nb_path)


✅ Created 05_transformer_upgrade.ipynb
