ทดลองสร้างaiพยากรหุ้น

In [None]:
%pip install stable-baselines3 sb3-contrib ta gymnasium pandas numpy torch scikit-learn

import pandas as pd
import numpy as np
import os
import torch
import torch.nn as nn
from ta.momentum import RSIIndicator
from ta.trend import MACD, SMAIndicator, EMAIndicator
from ta.volatility import BollingerBands
from ta.volume import VolumeWeightedAveragePrice
import matplotlib.pyplot as plt
import random
from sklearn.preprocessing import RobustScaler
import traceback  


In [None]:
class Config:
    INPUT_FILE = 'stock_data.xlsx'
    OUTPUT_DIR = './data/features/'
    SYMBOLS = ["PTT", "KTB", "SCB", "TTB", "CPALL", "DELTA", "TRUE", "ADVANC", "PTTEP", "ADVICE", "SIRI", "KBANK"]
    WINDOW_SIZE = 30
    INITIAL_BALANCE = 1_000_000
    COMMISSION_RATE = 0.0025
    MAX_HOLD_DAYS = 14
    SHARPE_WEIGHT = 50
    NUM_FEATURES = None

os.makedirs(Config.OUTPUT_DIR, exist_ok=True)


In [None]:
def calculate_indicators(df, symbol):
    suffix = symbol

    # ตรวจสอบว่าคอลัมน์หลักมีครบ
    required_base_cols = [f'Open_{suffix}', f'High_{suffix}', 
                          f'Low_{suffix}', f'Close_{suffix}', f'Volume_{suffix}']
    for col in required_base_cols:
        if col not in df.columns:
            raise ValueError(f"Missing base column {col} for {symbol}")

    # สร้างชื่อคอลัมน์แบบ dynamic
    open_col = f'Open_{suffix}'
    high_col = f'High_{suffix}'
    low_col = f'Low_{suffix}'
    close_col = f'Close_{suffix}'
    volume_col = f'Volume_{suffix}'

    # ------------------ Indicators ------------------

    # Moving Averages
    df[f'MA_5_{suffix}'] = SMAIndicator(close=df[close_col], window=5).sma_indicator()
    df[f'MA_10_{suffix}'] = SMAIndicator(close=df[close_col], window=10).sma_indicator()
    df[f'MA_20_{suffix}'] = EMAIndicator(close=df[close_col], window=20).ema_indicator()

    # Bollinger Bands
    bb = BollingerBands(close=df[close_col], window=20)
    df[f'BB_upper_{suffix}'] = bb.bollinger_hband()
    df[f'BB_lower_{suffix}'] = bb.bollinger_lband()

    # VWAP
    df[f'VWAP_{suffix}'] = VolumeWeightedAveragePrice(
        high=df[high_col],
        low=df[low_col],
        close=df[close_col],
        volume=df[volume_col],
        window=20
    ).volume_weighted_average_price()

    # RSI
    df[f'RSI_{suffix}'] = RSIIndicator(close=df[close_col], window=14).rsi()

    # MACD
    macd = MACD(close=df[close_col])
    df[f'MACD_{suffix}'] = macd.macd()
    df[f'MACD_signal_{suffix}'] = macd.macd_signal()
    df[f'MACD_diff_{suffix}'] = macd.macd_diff()

    # Returns
    for window in [3, 5, 10]:
        df[f'Return_{window}d_{suffix}'] = df[close_col].pct_change(window)

    # Price Change % (1 วัน)
    df[f'Price_Change_%_{suffix}'] = df[close_col].pct_change(periods=1) * 100

    # Volume Change % (1 วัน)
    df[f'Volume_Change_%_{suffix}'] = df[volume_col].pct_change(periods=1) * 100

    # ------------------ ตรวจสอบผล ------------------
    print(f"[{symbol}] Columns after indicators: {df.columns.tolist()}")
    print(f"[{symbol}] Sample data after indicators:\n{df.tail()}")

    # ------------------ Drop NaN ------------------
    print(f"[{symbol}] Data length before dropna: {len(df)}")

    required_cols = [
        f'Close_{suffix}', f'MA_5_{suffix}', f'MA_10_{suffix}', f'MA_20_{suffix}',
        f'RSI_{suffix}', f'MACD_{suffix}', f'MACD_signal_{suffix}', f'MACD_diff_{suffix}',
        f'Price_Change_%_{suffix}', f'Volume_Change_%_{suffix}'
    ]
    df = df.dropna(subset=required_cols).reset_index(drop=True)

    print(f"[{symbol}] Data length after dropna: {len(df)}")

    # ------------------ ตรวจว่าข้อมูลเพียงพอ ------------------
    if len(df) < Config.WINDOW_SIZE + 1:
        raise ValueError(f"Data for {symbol} is too short after processing")

    return df


In [None]:
def normalize_features(df):
    scaler = RobustScaler()
    date_col = df['Date']

    # คอลัมน์ที่ไม่ต้องการ Normalize (รวมถึงคอลัมน์ใหม่จาก add_holiday_features)
    exclude_cols = ['Date', 'IsWeekend', 'IsHoliday', 'IsTradingDay']
    numeric_cols = [col for col in df.columns if col not in exclude_cols]
    
    df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
    df['Date'] = date_col
    print(f"After normalization sample:\n{df.iloc[Config.WINDOW_SIZE]}")  # ตรวจค่าหลัง normalize
    return df
    

In [None]:
def check_trading_day_gaps(df, symbol):
    # ตรวจสอบเฉพาะวันที่ที่เป็นวันทำการ (Trading Day)
    trading_days = df[df['IsTradingDay']]['Date']
    
    # คำนวณความต่างระหว่างวันทำการ
    day_diff = trading_days.diff().dt.days.dropna()
    
    # อนุญาตให้มีช่องว่างได้เฉพาะช่วงวันหยุดยาว (เช่น สงกรานต์)
    if not (day_diff <= 3).all():  # อนุญาตให้ขาดได้ไม่เกิน 3 วันทำการ
        gaps = day_diff[day_diff > 3]
        print(f"คำเตือน: {symbol} มีช่องว่างวันที่เทรดยาว {gaps.unique()} วัน")


In [None]:
def validate_data(df, symbol):
    suffix = symbol  # ตั้งค่า suffix ตาม symbol

    # ตรวจสอบค่าที่เป็น infinite เฉพาะคอลัมน์ตัวเลข
    numeric_df = df.drop(columns=['Date']).select_dtypes(include=[np.number])
    if np.isinf(numeric_df.values).any():
        raise ValueError(f"พบค่าอนันต์ในข้อมูลของ {symbol}")
    
    # ตรวจสอบว่ามีคอลัมน์ที่จำเป็นครบหรือไม่
    required_cols = [
        'Date',
        'IsTradingDay',
        f'Close_{suffix}',
        f'Volume_{suffix}',
        f'High_{suffix}',
        f'Low_{suffix}',
        f'Open_{suffix}'
    ]
    missing = [col for col in required_cols if col not in df.columns]
    if missing:
        raise ValueError(f"Missing columns for {symbol}: {missing}")
    
    # ตรวจสอบว่ามีค่า null หรือไม่
    if df[required_cols].isnull().any().any():
        null_counts = df[required_cols].isnull().sum()
        raise ValueError(f"Null values in {symbol}:\n{null_counts}")
    
    # ตรวจสอบว่ามีการเว้นช่วงวันเทรดหรือไม่
    check_trading_day_gaps(df, symbol)
    
    return df

In [None]:
import gymnasium as gym
from gymnasium import Env, spaces

class StockTradingEnv(gym.Env):
    def __init__(self, df_list):
        super().__init__()
        self.df_list = df_list  # รายการข้อมูลหุ้นทั้งหมด
        self.current_stock_idx = 0  # ดัชนีหุ้นปัจจุบัน
        self.action_space = spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float32)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(Config.NUM_FEATURES,),
            dtype=np.float32
        )

    def _load_random_stock(self):
        """โหลดข้อมูลหุ้นแบบสุ่มจากรายการ"""
        stock_data = random.choice(self.df_list)
        return stock_data['data'], stock_data['symbol']

    def _get_current_state(self):
        state = self.df.drop(columns=['Date']).iloc[self.current_step].values.astype(np.float32)
        if np.isnan(state).any():
            raise ValueError("NaN values detected in state vector")
        return state
    
    def reset(self, seed=None, options=None):
        # โหลดข้อมูลหุ้นแบบสุ่ม
        self.df, self.symbol = self._load_random_stock()
        
        # ตรวจสอบความถูกต้องของข้อมูล
        if 'Date' not in self.df.columns:
            raise KeyError("Column 'Date' not found in dataframe")
            
        self.current_step = 0
        self.portfolio_value = Config.INITIAL_BALANCE
        self.returns = []
        return self._get_current_state(), {}

    def step(self, action):
        self.current_step += 1
        terminated = self.current_step >= len(self.df) - 1
        
        # คำนวณ reward
        reward = self._calculate_reward(action)
        
        # ได้รับสถานะต่อไป
        next_state = self._get_current_state() if not terminated else None
        
        return next_state, reward, terminated, False, {}

    def _calculate_reward(self, action):
        if self.current_step >= len(self.df) or self.current_step < 1:
            return 0.0

        investment_ratio = np.clip(action[0], -1.0, 1.0)

        suffix = f"{self.symbol}"
        close_col = f"Close_{suffix}"
        current_price = self.df.iloc[self.current_step][close_col]
        prev_price = self.df.iloc[self.current_step - 1][close_col]

        price_change_pct = (current_price - prev_price) / (prev_price + 1e-8)
        prev_portfolio_value = self.portfolio_value

        gross_return = investment_ratio * price_change_pct * prev_portfolio_value
        commission = abs(investment_ratio) * prev_portfolio_value * Config.COMMISSION_RATE
        net_reward = gross_return - commission
        self.portfolio_value = max(self.portfolio_value + net_reward, 0.0)

        daily_return = (self.portfolio_value / prev_portfolio_value) - 1
        self.returns.append(daily_return)

        if len(self.returns) > 1:
            sharpe_ratio = np.mean(self.returns) / (np.std(self.returns) + 1e-8) * np.sqrt(252)
        else:
            sharpe_ratio = 0

        volatility_penalty = np.std(self.returns[-10:]) * 1000 if len(self.returns) >= 10 else 0
        trend_bonus = 0.001 * abs(investment_ratio) if (investment_ratio > 0 and price_change_pct > 0) or (investment_ratio < 0 and price_change_pct < 0) else 0

        reward = net_reward + (sharpe_ratio * Config.SHARPE_WEIGHT) - volatility_penalty + trend_bonus

        # ตรวจสอบ reward ที่ผิดปกติ
        if np.isnan(reward) or np.isinf(reward):
            reward = 0.
        return float(reward)


In [None]:
def load_and_preprocess(symbol):
    try:
        print(f"\nProcessing {symbol}:")
        df = pd.read_excel(Config.INPUT_FILE, sheet_name=symbol)
        print(f"Initial rows: {len(df)}")
        print(f"Raw columns: {df.columns.tolist()}")  # Debug
        
        # ตรวจสอบคอลัมน์ที่มีอยู่จริงในไฟล์ Excel
        actual_columns = df.columns.tolist()
        df = df.rename(columns=lambda x: x.strip().lower())
        print(f"Columns after lowercase: {df.columns.tolist()}")  # Debug

        required_columns = ['date', 'open', 'high', 'low', 'close', 'volume']
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing columns in {symbol}: {missing_cols}")

        # แปลงคอลัมน์วันที่
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        df = df.dropna(subset=['date']).reset_index(drop=True)
        df = df.sort_values('date')

        # ตั้งชื่อคอลัมน์ใหม่ตามสัญลักษณ์
        suffix = symbol
        new_columns = {
            'open': f'Open_{suffix}',
            'high': f'High_{suffix}',
            'low': f'Low_{suffix}',
            'close': f'Close_{suffix}',
            'volume': f'Volume_{suffix}',
            'date': 'Date'  
        }
        df = df.rename(columns=new_columns)
        
        # แปลงคอลัมน์ตัวเลขให้ถูกต้อง
        numeric_cols = [f'Open_{suffix}', f'High_{suffix}', f'Low_{suffix}', 
                        f'Close_{suffix}', f'Volume_{suffix}']
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce')  # แปลงค่าที่ไม่ถูกต้องเป็น NaN
        
        print(f"Columns after renaming: {df.columns.tolist()}")  # Debug        
        return df
    except Exception as e:
        raise Exception(f"Error processing {symbol}: {e}")

In [None]:
def load_holidays(holiday_file='holiday.csv'):
    try:
        holidays_df = pd.read_csv(holiday_file, encoding='utf-8-sig')  
        print("ตัวอย่างข้อมูลวันหยุด:\n", holidays_df.head())  # <-- ตรวจสอบการโหลดวันหยุด
        
        # ตรวจสอบคอลัมน์ที่จำเป็น
        required_cols = ['ปี', 'เดือน', 'วัน']
        if not all(col in holidays_df.columns for col in required_cols):
            raise ValueError("ไฟล์วันหยุดขาดคอลัมน์จำเป็น (ปี, เดือน, วัน)")
        
        # แปลงวันที่แบบพุทธศักราชเป็นคริสต์ศักราช (2566 → 2023)
        holidays_df['year_ad'] = holidays_df['ปี'] - 543
        
        # สร้างคอลัมน์ Date
        holidays_df['Date'] = pd.to_datetime(
            holidays_df[['year_ad', 'เดือน', 'วัน']].rename(columns={'year_ad':'year', 'เดือน':'month', 'วัน':'day'}))
        
        return set(holidays_df['Date'].dt.date)
    
    except Exception as e:
        print(f"คำเตือน: โหลดวันหยุดไม่สำเร็จ ({e}) ใช้เฉพาะเสาร์-อาทิตย์")
        return set()

def add_holiday_features(df, holidays):
    """เพิ่มคอลัมน์ระบุว่าเป็นวันหยุด (รวมเสาร์อาทิตย์และวันหยุดพิเศษ)"""
    df['IsWeekend'] = df['Date'].dt.dayofweek.isin([5, 6]).astype(int)  # 5=Saturday, 6=Sunday
    df['IsHoliday'] = df['Date'].dt.date.isin(holidays).astype(int)
    # แก้ไขตรงนี้: ใช้การเปรียบเทียบเพื่อให้ได้ boolean แทนการใช้ bitwise operators
    df['IsTradingDay'] = (df['IsWeekend'] == 0) & (df['IsHoliday'] == 0)
    return df

# โหลดรายชื่อวันหยุด
holidays = load_holidays()

processed_data = []
for symbol in Config.SYMBOLS:
    try:
        df = load_and_preprocess(symbol)
        df = add_holiday_features(df, holidays)
        df = validate_data(df, symbol)
        df = calculate_indicators(df, symbol)
        df = normalize_features(df)
        processed_data.append({'symbol': symbol, 'data': df})
        print(f"Successfully processed {symbol} with shape {df.shape}")
    except Exception as e:
        print(f"Error details for {symbol}:")
        traceback.print_exc()  
        continue

In [None]:
from stable_baselines3.common.callbacks import CheckpointCallback
from sb3_contrib import RecurrentPPO
from sb3_contrib.ppo_recurrent.policies import RecurrentActorCriticPolicy
from stable_baselines3.common.vec_env import DummyVecEnv

def train_model(processed_data):
    env = StockTradingEnv(processed_data)  # ใช้ Environment โดยตรง
    policy_kwargs = dict(
        lstm_hidden_size=128,
        enable_critic_lstm=True,
        net_arch=dict(pi=[64, 64], vf=[64, 64])
    )
    
    model = RecurrentPPO(
        "MlpLstmPolicy",
        env,
        policy_kwargs=policy_kwargs,
        verbose=1,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        tensorboard_log="./logs/",
        device='cuda' if torch.cuda.is_available() else 'auto'
    )
    
    callbacks = [CheckpointCallback(save_freq=10000, save_path='./models/')]
    model.learn(total_timesteps=1_000_000, callback=callbacks)
    return model


In [None]:
def backtest(model, test_data):
    plt.figure(figsize=(15, 6))
    for item in test_data:
        symbol = item['symbol']
        df = item['data']
        env = StockTradingEnv([{'symbol': symbol, 'data': df}])
        obs = env.reset()[0]
        portfolio_values = [env.portfolio_value]
        
        while True:
            action, _ = model.predict(obs)
            obs, reward, terminated, truncated, _ = env.step(action)
            portfolio_values.append(env.portfolio_value)
            if terminated:
                break
        plt.plot(portfolio_values, label=symbol)
    
    plt.title('Portfolio Value During Backtesting')
    plt.legend()
    plt.show()


In [None]:
# เพิ่มส่วนคำนวณ NUM_FEATURES ก่อนสร้าง Environment
if len(processed_data) >= 1:
    # กำหนดจำนวน Features จากข้อมูลตัวอย่างแรก (หลังตัดคอลัมน์ Date)
    sample_data = processed_data[0]['data'].drop(columns=['Date'])
    Config.NUM_FEATURES = sample_data.shape[1]
    print(f"กำหนดจำนวน Features เป็น: {Config.NUM_FEATURES}")

    trained_model = train_model(processed_data)
    backtest(trained_model, processed_data[:4])
else:
    print("ข้อมูลไม่เพียงพอสำหรับการฝึกโมเดล")