<a href="https://colab.research.google.com/github/skywalker0803r/RL/blob/main/crypto_dqn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 安裝 SDK

In [78]:
#!pip install gate-api pandas numpy

# 導入套件

In [79]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
import random
from typing import Optional, Tuple, Dict, Any
import warnings
warnings.filterwarnings('ignore')

# GateioDataFetcher

In [80]:
import pandas as pd
from gate_api import Configuration, ApiClient, SpotApi

def fetch_gateio_klines(pair: str, interval: str, limit: int = 1000) -> pd.DataFrame:
    """
    使用 Gate.io SDK 獲取 K 線數據。

    :param pair: 交易對, 例如 'BTC_USDT'
    :param interval: K 線間隔, 例如 '1d', '4h', '1h', '30m'
    :param limit: 獲取數據的數量上限 (Gate.io 限制單次最大為 1000)
    :return: 包含 OHLCV 數據的 Pandas DataFrame
    """

    # 設置 API Client
    # 注意：獲取公共市場數據不需要 API Key 和 Secret，但為了通用性可以配置。
    # 這裡使用默認配置，僅用於獲取公共數據。
    config = Configuration()
    client = ApiClient(config)
    spot_api = SpotApi(client)

    try:
        # 呼叫 API
        # Gate.io 的 K 線數據返回格式為 [時間戳, 交易量, 結算貨幣交易量, 收盤價, 最高價, 最低價, 開盤價]
        klines = spot_api.list_candlesticks(currency_pair=pair, interval=interval, limit=limit)

        # 將結果轉換為 DataFrame
        data = []
        for k in klines:
            # 確保數據的順序和類型
            # [time, volume, close_volume, close, high, low, open]
            data.append([
                float(k[0]), # Time (時間戳)
                float(k[6]), # Open
                float(k[4]), # High
                float(k[5]), # Low
                float(k[3]), # Close
                float(k[1]), # Volume
            ])

        df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

        # 設置時間索引並進行整理
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
        df = df.set_index('timestamp')
        df = df.sort_index() # 確保時間順序是遞增的 (舊 -> 新)

        # 增加必要的技術指標（這裡只是一個簡單的價格變動）
        df['price_change'] = df['close'].diff().fillna(0)

        return df

    except Exception as e:
        print(f"從 Gate.io 獲取數據失敗: {e}")
        return pd.DataFrame()

fetch_gateio_klines('XRP_USDT','15m').head()

Unnamed: 0_level_0,open,high,low,close,volume,price_change
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2025-11-19 07:15:00,585724.186,2.152,2.157,2.159,1262017.0,0.0
2025-11-19 07:30:00,575053.749,2.152,2.154,2.164,1241721.0,0.005
2025-11-19 07:45:00,753490.433,2.162,2.163,2.171,1633748.0,0.007
2025-11-19 08:00:00,555375.205,2.163,2.168,2.17,1203171.0,-0.001
2025-11-19 08:15:00,679393.654,2.151,2.165,2.165,1467478.0,-0.005


# GateioTradingEnv

In [81]:
import gym
import numpy as np

class GateioTradingEnv(gym.Env):
    """
    基於 Gate.io K 線數據的 RL 交易環境 (MDP 格式)。

    狀態 (State): 過去 N 根 K 線數據 + 當前賬戶狀態
    動作 (Action): 離散的 (買入/賣出/持有)
    獎勵 (Reward): 淨值變動
    """

    metadata = {'render_modes': ['human'], 'render_fps': 1}

    # 定義動作：離散空間
    ACTION_HOLD = 0
    ACTION_BUY = 1
    ACTION_SELL = 2

    def __init__(self, df: pd.DataFrame, window_size: int = 15, initial_balance: float = 10000.0, fee_rate: float = 0.00075):
        super().__init__()

        self.df = df
        self.window_size = window_size
        self.initial_balance = initial_balance
        self.fee_rate = fee_rate # 交易手續費率 (Gate.io 現貨 VIP0 約 0.075%)

        self.current_step = self.window_size # 從第 window_size 步開始回測

        # --- 動作空間 (Action Space) ---
        # 0: Hold (保持/空倉), 1: Buy (全倉買入), 2: Sell (全倉賣出)
        self.action_space = spaces.Discrete(3)

        # --- 狀態空間 (Observation Space) ---
        # 狀態維度 = window_size * (OHLCV + 指標數) + 賬戶狀態數
        N_FEATURES = 6 # (Open, High, Low, Close, Volume, Price_Change)
        N_ACCOUNT_VARS = 2 # (持倉量, 當前淨值)

        # 狀態是 Box 類型
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(self.window_size * N_FEATURES + N_ACCOUNT_VARS,),
            dtype=np.float32
        )

        # 賬戶變數初始化 (在 reset 中完成)
        self.balance = 0.0      # 當前持有的 USD/USDT
        self.crypto_held = 0.0  # 當前持有的交易對基礎幣 (例如 BTC)
        self.net_worth = 0.0    # 淨值 = balance + crypto_held * current_price
        self.last_net_worth = 0.0 # 用於計算獎勵
        self.trade_history = [] # 交易紀錄 (可選)

    def _get_observation(self) -> np.ndarray:
        """
        獲取當前時間步 t 的狀態 s_t。
        """
        # 1. 提取 K 線數據 (過去 window_size 根)
        start = self.current_step - self.window_size
        end = self.current_step

        # 提取 OHLCV + 指標
        window_data = self.df.iloc[start:end, :]

        # 確保數據形狀正確
        market_features = window_data.values.flatten()

        # 2. 獲取當前價格 (用作淨值計算)
        current_price = self.df.iloc[self.current_step - 1]['close']

        # 3. 更新當前淨值 (用當前價格計算)
        self.net_worth = self.balance + self.crypto_held * current_price

        # 4. 賬戶狀態特徵
        account_state = np.array([
            self.crypto_held,
            self.net_worth / self.initial_balance # 淨值比例 (歸一化)
        ], dtype=np.float32)

        # 5. 合併為最終狀態
        observation = np.concatenate([market_features, account_state])

        return observation

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> Tuple[np.ndarray, Dict[str, Any]]:
        """
        重設環境到初始狀態。
        """
        super().reset(seed=seed)

        # 隨機選擇起始點，確保有足夠的 window_size 歷史數據
        self.current_step = self.window_size + random.randint(0, len(self.df) - self.window_size - 1)

        # 交易變數重設
        self.balance = self.initial_balance
        self.crypto_held = 0.0
        self.net_worth = self.initial_balance
        self.last_net_worth = self.initial_balance
        self.trade_history = []

        observation = self._get_observation()
        info = {'net_worth': self.net_worth, 'step': self.current_step}

        return observation, info

    def _take_action(self, action: int):
        """
        執行動作並更新賬戶狀態。
        """
        current_price = self.df.iloc[self.current_step - 1]['close']
        trade_amount = 0.0 # 交易量

        if action == self.ACTION_BUY and self.balance > 0:
            # 全倉買入 (假設不考慮最小交易量限制)

            # 可用來購買的 USDT/USD 數量
            buy_power = self.balance / (1 + self.fee_rate)

            # 購買的幣數量
            amount_to_buy = buy_power / current_price

            fee = self.balance - buy_power # 手續費

            self.crypto_held += amount_to_buy
            self.balance = 0.0 # 餘額清零

            trade_amount = amount_to_buy
            self.trade_history.append((self.current_step, 'BUY', trade_amount, current_price, fee))

        elif action == self.ACTION_SELL and self.crypto_held > 0:
            # 全倉賣出
            amount_to_sell = self.crypto_held

            # 獲得的 USDT/USD 數量 (扣除手續費)
            proceeds = amount_to_sell * current_price * (1 - self.fee_rate)

            fee = amount_to_sell * current_price * self.fee_rate

            self.balance += proceeds
            self.crypto_held = 0.0 # 持倉清零

            trade_amount = amount_to_sell
            self.trade_history.append((self.current_step, 'SELL', trade_amount, current_price, fee))

        # ACTION_HOLD (0) 不執行任何交易，保持當前狀態

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        """
        執行一個時間步。
        :param action: Agent 選擇的動作 (0, 1, 2)
        :return: (observation, reward, terminated, truncated, info)
        """

        # 1. 執行動作 (更新 self.balance, self.crypto_held)
        self._take_action(action)

        # 2. 推進時間
        self.current_step += 1

        # 3. 獲取新狀態 s' (並同時計算當前淨值 self.net_worth)
        terminated = False
        truncated = False

        # 檢查是否到達數據結尾
        if self.current_step >= len(self.df):
            terminated = True
            # 最後一步強制平倉計算最終淨值
            if self.crypto_held > 0:
                 self._take_action(self.ACTION_SELL)

            observation = self._get_observation()

        else:
            observation = self._get_observation()

        # 4. 計算獎勵 (Reward)

        # 獎勵 = 淨值變化率
        # 最小化負值，最大化正值
        reward = (self.net_worth - self.last_net_worth) / self.last_net_worth

        # 更新上一步淨值
        self.last_net_worth = self.net_worth

        # 5. 檢查結束條件 (Termination/Truncation)

        # 簡易爆倉/虧損清算 (可選)
        if self.net_worth < self.initial_balance * 0.5: # 虧損超過 50% 強制結束
             terminated = True
             reward = -10.0 # 爆倉給予極大懲罰


        info = {
            'net_worth': self.net_worth,
            'crypto_held': self.crypto_held,
            'balance': self.balance,
            'reward': reward,
            'step': self.current_step
        }

        return observation, reward, terminated, truncated, info

    def render(self, mode='human'):
        """
        環境可視化 (簡單輸出)。
        """
        if mode == 'human':
            print(f"Step: {self.current_step} | Net Worth: ${self.net_worth:,.2f} | Crypto Held: {self.crypto_held:.4f} | Action: {self.action_space.names[self.action]}")
        pass

    def close(self):
        """
        清理資源 (例如關閉可視化窗口)。
        """
        pass

# 運行與測試 (使用真實 API 數據)

In [83]:
if __name__ == '__main__':

    # --- 1. 數據獲取配置 ---
    TARGET_PAIR = 'BTC_USDT'
    INTERVAL = '4h' # 4 小時 K 線
    DATA_LIMIT = 500 # 獲取最近 500 條 K 線
    WINDOW_SIZE = 30

    print(f"--- 1. 嘗試從 Gate.io 獲取 {TARGET_PAIR} 的 K 線數據 ({DATA_LIMIT} 條, {INTERVAL} 間隔) ---")

    # 呼叫真實的 API 數據獲取函數
    df_data = fetch_gateio_klines(TARGET_PAIR, INTERVAL, DATA_LIMIT)

    if df_data.empty or len(df_data) < WINDOW_SIZE:
        print("數據不足或獲取失敗，無法創建環境。")
    else:
        print(f"成功獲取數據。數據長度: {len(df_data)} 條 K 線")

        # --- 2. 創建環境 ---
        ENV_CONFIG = {
            'df': df_data,
            'window_size': WINDOW_SIZE,
            'initial_balance': 10000.0,
            'fee_rate': 0.00075
        }

        env = GateioTradingEnv(**ENV_CONFIG)

        # ... (後續的運行和測試邏輯與前一個回答相同) ...
        # 3. 運行一個 Episode
        print("\n--- 3. 運行一個 Episode ---")

        observation, info = env.reset()
        terminated = False
        truncated = False
        total_reward = 0.0

        print(f"起始淨值: ${info['net_worth']:,.2f}")

        while not terminated and not truncated:
            action = env.action_space.sample() # 隨機動作
            observation, reward, terminated, truncated, info = env.step(action)
            total_reward += reward

            if env.current_step % 100 == 0 or terminated:
                print(f"Step {env.current_step}/{len(df_data)} | Net Worth: ${info['net_worth']:,.2f} | Last Reward: {reward:.4f}")

        print("\n--- 4. Episode 結束 ---")
        final_net_worth = info['net_worth']
        profit_rate = (final_net_worth / env.initial_balance - 1) * 100

        print(f"最終淨值: ${final_net_worth:,.2f}")
        print(f"總報酬率: {profit_rate:.2f}%")
        env.close()

--- 1. 嘗試從 Gate.io 獲取 BTC_USDT 的 K 線數據 (500 條, 4h 間隔) ---
成功獲取數據。數據長度: 500 條 K 線

--- 3. 運行一個 Episode ---
起始淨值: $10,000.00
Step 400/500 | Net Worth: $8,942.49 | Last Reward: 0.0060
Step 500/500 | Net Worth: $7,949.07 | Last Reward: -0.0009

--- 4. Episode 結束 ---
最終淨值: $7,949.07
總報酬率: -20.51%


# Deep Q Learning


In [102]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
import numpy as np
import os

# 設置計算設備
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用的設備: {DEVICE}")

class QNetwork(nn.Module):
    """
    深度 Q 網絡 (DQN)
    輸入: 狀態向量 (Observation Space shape)
    輸出: 每個動作的 Q 值 (Action Space size)
    """
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()

        # 定義一個簡單的多層感知機 (MLP)
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, x):
        x = x.float().to(DEVICE) # 確保輸入是 float 且在正確的設備上
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class ReplayBuffer:
    """
    經驗回放緩衝區
    """
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """添加經驗: (s, a, r, s', done)"""
        # 將 numpy 轉換為 list 或 tuple
        state = state.tolist() if isinstance(state, np.ndarray) else state
        next_state = next_state.tolist() if isinstance(next_state, np.ndarray) else next_state

        experience = (state, action, reward, next_state, done)
        self.buffer.append(experience)

    def sample(self, batch_size):
        """從緩衝區中隨機採樣 batch_size 個經驗"""
        if len(self.buffer) < batch_size:
            return None

        batch = random.sample(self.buffer, batch_size)

        # 轉換為 PyTorch Tensor
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.tensor(states, dtype=torch.float32, device=DEVICE)
        actions = torch.tensor(actions, dtype=torch.long, device=DEVICE)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=DEVICE)
        next_states = torch.tensor(next_states, dtype=torch.float32, device=DEVICE)
        # Done 標誌需要是浮點數以進行乘法運算
        dones = torch.tensor(dones, dtype=torch.float32, device=DEVICE)

        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)


class DQNAgent:
    def __init__(self, state_dim, action_dim, gamma, lr, buffer_capacity, batch_size, target_update_freq):

        self.gamma = gamma          # 折扣因子
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq
        self.action_dim = action_dim
        self.learn_step_counter = 0

        # 初始化 Q 網絡和目標 Q 網絡
        self.q_net = QNetwork(state_dim, action_dim).to(DEVICE)
        self.target_net = QNetwork(state_dim, action_dim).to(DEVICE)
        self.target_net.load_state_dict(self.q_net.state_dict()) # 初始同步
        self.target_net.eval() # 目標網絡只用於預測，設為評估模式

        # 初始化優化器和經驗回放
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
        self.buffer = ReplayBuffer(buffer_capacity)

        # 探索參數
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.9995 # 每次動作後的衰減率

    def choose_action(self, state):
        """Epsilon-Greedy 策略選擇動作"""
        if random.random() < self.epsilon:
            return random.randrange(self.action_dim) # 探索
        else:
            with torch.no_grad():
                # 網絡預測 Q 值
                state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(DEVICE)
                q_values = self.q_net(state_tensor)
                return q_values.argmax(dim=1).item() # 利用

    def learn(self):
        """訓練 Q 網絡"""

        # 經驗回放緩衝區不足，不訓練
        if len(self.buffer) < self.batch_size:
            return

        # 1. 從緩衝區採樣經驗
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)

        # 2. 計算當前 Q 值 Q(s, a)
        # self.q_net(states) 得到所有動作的 Q 值，然後用 gather 提取實際動作的 Q 值
        q_current = self.q_net(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

        # 3. 計算目標 Q 值 R + γ * max Q'(s', a')

        # 獲取下一狀態的目標 Q 值 (使用 Target Network)
        # target_net.max(1)[0] 找到每個下一狀態的最高 Q 值
        with torch.no_grad():
            q_next_target = self.target_net(next_states).max(1)[0]

            # 如果 Episode 結束 (done=1)，則目標 Q 值只剩 R (因為 Q'(s', a') = 0)
            q_target = rewards + self.gamma * q_next_target * (1 - dones)

        # 4. 計算損失 (L = MSE(Q_current, Q_target))
        loss = F.mse_loss(q_current, q_target)

        # 5. 優化
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # 6. 軟更新 epsilon (探索率)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        # 7. 硬更新目標網絡
        self.learn_step_counter += 1
        if self.learn_step_counter % self.target_update_freq == 0:
            self.target_net.load_state_dict(self.q_net.state_dict())

        return loss.item()

    def save(self, path):
        torch.save(self.q_net.state_dict(), path)

    def load(self, path):
        self.q_net.load_state_dict(torch.load(path))
        self.target_net.load_state_dict(self.q_net.state_dict())



def train_dqn(env, agent, total_episodes, warm_up_steps):

    # 訓練參數
    MAX_STEPS_PER_EPISODE = len(env.df) - env.window_size # 最大步數

    print(f"\n--- 訓練參數 ---")
    print(f"總 Episode 數: {total_episodes}")
    print(f"環境每 Episode 最大步數: {MAX_STEPS_PER_EPISODE}")
    print(f"暖機步數 (Warm-up Steps): {warm_up_steps}")
    print(f"------------------")

    episode_rewards = []

    for episode in range(1, total_episodes + 1):

        # 1. 重設環境
        state, info = env.reset()
        episode_reward = 0
        step_count = 0
        terminated = False
        truncated = False

        # 2. 運行 Episode
        while not terminated and not truncated:

            # 3. Agent 選擇動作
            # 由於是單環境，直接使用 agent.choose_action
            action = agent.choose_action(state)

            # 4. 環境推進
            next_state, reward, terminated, truncated, info = env.step(action)

            # 5. 存儲經驗到回放緩衝區
            agent.buffer.push(state, action, reward, next_state, terminated)

            # 6. 訓練 (暖機步數後才開始訓練)
            if len(agent.buffer) >= warm_up_steps:
                agent.learn()

            state = next_state
            episode_reward += reward
            step_count += 1

            if step_count >= MAX_STEPS_PER_EPISODE:
                truncated = True

        # 7. 記錄和報告
        episode_rewards.append(episode_reward)

        if episode % 10 == 0 or episode == 1:
            avg_net_worth = info['net_worth']
            epsilon = agent.epsilon * 100
            print(f"Episode: {episode}/{total_episodes} | Steps: {step_count} | Avg Reward: {np.mean(episode_rewards[-10:]):.4f} | Final Net Worth: ${avg_net_worth:,.2f} | Epsilon: {epsilon:.2f}%")

    # 8. 保存最終模型
    model_save_path = "./gateio_dqn_model.pth"
    agent.save(model_save_path)
    print(f"\nDQN 訓練完成，模型已保存至 {model_save_path}")

使用的設備: cpu


# 訓練

In [103]:
if __name__ == '__main__':
    df_data = fetch_gateio_klines('BTC_USDT', '4h', 500)
    if df_data.empty or len(df_data) < 30:
        print("數據不足，無法進行訓練。")
    else:
        # --- 1. 環境初始化 ---
        ENV_CONFIG = {
            'df': df_data,
            'window_size': 30,
            'initial_balance': 10000.0,
            'fee_rate': 0.00075
        }
        env = GateioTradingEnv(**ENV_CONFIG)

        # --- 2. Agent 初始化 ---
        STATE_DIM = env.observation_space.shape[0]
        ACTION_DIM = env.action_space.n

        AGENT_CONFIG = {
            'state_dim': STATE_DIM,
            'action_dim': ACTION_DIM,
            'gamma': 0.99,                  # 折扣因子
            'lr': 1e-4,                     # 學習率
            'buffer_capacity': 10000,       # 經驗回放緩衝區容量
            'batch_size': 64,               # 訓練批次大小
            'target_update_freq': 1000      # 目標網絡更新頻率 (每 1000 步更新一次)
        }

        agent = DQNAgent(**AGENT_CONFIG)

        # --- 3. 啟動訓練 ---
        TOTAL_EPISODES = 500
        # 緩衝區需要填滿到至少 batch_size，但通常設一個較大的值
        WARM_UP_STEPS = 2000

        train_dqn(env, agent, TOTAL_EPISODES, WARM_UP_STEPS)

        env.close()


--- 訓練參數 ---
總 Episode 數: 500
環境每 Episode 最大步數: 470
暖機步數 (Warm-up Steps): 2000
------------------
Episode: 1/500 | Steps: 402 | Avg Reward: -0.2910 | Final Net Worth: $7,417.55 | Epsilon: 100.00%
Episode: 10/500 | Steps: 399 | Avg Reward: -0.1328 | Final Net Worth: $8,035.25 | Epsilon: 83.19%
Episode: 20/500 | Steps: 427 | Avg Reward: -0.2088 | Final Net Worth: $6,680.51 | Epsilon: 17.82%
Episode: 30/500 | Steps: 357 | Avg Reward: -0.2145 | Final Net Worth: $8,817.40 | Epsilon: 4.72%
Episode: 40/500 | Steps: 304 | Avg Reward: -0.1529 | Final Net Worth: $8,344.16 | Epsilon: 1.23%
Episode: 50/500 | Steps: 127 | Avg Reward: -0.1550 | Final Net Worth: $9,080.18 | Epsilon: 1.00%
Episode: 60/500 | Steps: 175 | Avg Reward: -0.1659 | Final Net Worth: $7,712.16 | Epsilon: 1.00%
Episode: 70/500 | Steps: 331 | Avg Reward: -0.1401 | Final Net Worth: $7,979.49 | Epsilon: 1.00%
Episode: 80/500 | Steps: 220 | Avg Reward: -0.1162 | Final Net Worth: $8,835.21 | Epsilon: 1.00%
Episode: 90/500 | Steps: 