In [8]:
# install finrl library
# !pip install git+https://github.com/AI4Finance-Foundation/FinRL.git

# 导入库

In [9]:
# 导入必要的库
import copy
import gc
import glob
import os
import re
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from stable_baselines3.common.callbacks import (
    BaseCallback,
    CallbackList,
    CheckpointCallback,
    EvalCallback,
)
from stable_baselines3.common.logger import configure
from stable_baselines3.common.utils import set_random_seed
from finrl.agents.stablebaselines3.models import DRLAgent
from finrl.config import INDICATORS, TRAINED_MODEL_DIR, RESULTS_DIR
from finrl.main import check_and_make_directories
from finrl.meta.env_stock_trading.env_stocktrading import StockTradingEnv

# Matplotlib: SimHei supplies the CJK glyphs for the Chinese plot labels; with
# unicode_minus disabled, minus signs are drawn with a plain ASCII hyphen.
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False

# Report whether a CUDA GPU is available; `use_cuda` drives device placement later on.
print("检查GPU可用性...")
use_cuda = torch.cuda.is_available()
if use_cuda:
    cuda_device_count = torch.cuda.device_count()
    cuda_device_name = torch.cuda.get_device_name(0)
    print(f"✓ 发现 {cuda_device_count} 个可用的GPU设备")
    print(f"✓ 当前使用: {cuda_device_name}")
else:
    print("✗ 未发现可用的GPU，将使用CPU进行训练")

# Make sure the model output directory exists.
check_and_make_directories([TRAINED_MODEL_DIR])

# Seed Python's `random`, NumPy and PyTorch. Passing using_cuda makes cuDNN deterministic
# (and disables its autotuner) when training on GPU; without it, GPU runs are not
# reproducible even with a fixed seed. Deterministic cuDNN may make GPU training slower.
SEED = 0
set_random_seed(SEED, using_cuda=use_cuda)

检查GPU可用性...
✓ 发现 1 个可用的GPU设备
✓ 当前使用: NVIDIA GeForce RTX 3060 Laptop GPU


# 加载数据

In [10]:
# Preprocessed training set produced by process_data.ipynb.
processed_data_file = "data/processed_data/train_data_20150101~20250101.csv"

# Fail fast with a hint when the preprocessing notebook has not been run yet.
if not os.path.exists(processed_data_file):
    missing_file_message = f"找不到处理后的数据文件: {processed_data_file}，请先运行 process_data.ipynb"
    raise FileNotFoundError(missing_file_message)

# The first CSV column holds the row index written by to_csv; restore it as the index
# and blank out its name.
train = pd.read_csv(processed_data_file, index_col=0)
train.index.names = [""]


# 在加载数据后添加以下代码来减少股票数量
def reduce_stock_dataset(df, top_n_stocks=15):
    """仅保留市值最大的top_n_stocks支股票"""
    print(f"原始股票数量: {len(df.tic.unique())}")

    # 计算每支股票的平均成交量，作为选择大市值股票的简单依据
    stock_volume = df.groupby("tic")["volume"].mean().sort_values(ascending=False)
    selected_stocks = stock_volume.head(top_n_stocks).index.tolist()

    # 过滤数据
    reduced_df = df[df.tic.isin(selected_stocks)].copy()
    print(f"缩减后股票数量: {len(reduced_df.tic.unique())}")
    return reduced_df


# Restrict training to the 15 highest-volume tickers.
train = reduce_stock_dataset(df=train, top_n_stocks=15)

record_count = train.shape[0]
print(f"加载训练数据: {record_count} 条记录")
train.head()

ParserError: Error tokenizing data. C error: Calling read(nbytes) on source failed. Try engine='python'.

# 构建交易环境

In [None]:
# Trading-environment parameters, derived from the (already reduced) training frame.
stock_dimension = len(train.tic.unique())
# Presumably 1 cash balance + per-stock price + per-stock holdings + per-stock indicators
# (FinRL StockTradingEnv state layout — confirm against the installed FinRL version).
state_space = 1 + 2 * stock_dimension + len(INDICATORS) * stock_dimension
print(f"股票数量: {stock_dimension}, 状态空间维度: {state_space}")

# Per-stock transaction cost rates. Buy and sell are built as two independent lists so
# they can never alias the same list object.
TRANSACTION_COST_PCT = 0.001
buy_cost_list = [TRANSACTION_COST_PCT] * stock_dimension
sell_cost_list = [TRANSACTION_COST_PCT] * stock_dimension
num_stock_shares = [0] * stock_dimension  # start with no shares held

env_kwargs = {
    "hmax": 100,  # maximum number of shares traded per stock in one step
    "initial_amount": 1000000,  # starting cash
    "num_stock_shares": num_stock_shares,  # initial holdings per stock
    "buy_cost_pct": buy_cost_list,  # buy cost rate per stock
    "sell_cost_pct": sell_cost_list,  # sell cost rate per stock
    "state_space": state_space,  # observation vector length
    "stock_dim": stock_dimension,  # number of stocks
    "tech_indicator_list": INDICATORS,  # technical indicator columns
    "action_space": stock_dimension,  # one action per stock
    "reward_scaling": 1e-3,  # reward scaling factor
}

# Build the gym environment and its Stable-Baselines3 vectorized wrapper.
e_train_gym = StockTradingEnv(df=train, **env_kwargs)
env_train, _ = e_train_gym.get_sb_env()
print(f"环境类型: {type(env_train)}")

# 早停机制 

In [None]:
# 早停机制
class EarlyStoppingCallback(BaseCallback):
    def __init__(self, check_freq=1000, patience=5, min_delta=0.01, verbose=1):
        super(EarlyStoppingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.patience = patience
        self.min_delta = min_delta
        self.best_mean_reward = -np.inf
        self.no_improvement_count = 0

    def _on_step(self) -> bool:
        if self.n_calls % self.check_freq == 0:
            if len(self.model.ep_info_buffer) > 0:
                mean_reward = np.mean(
                    [ep_info["r"] for ep_info in self.model.ep_info_buffer]
                )
                if self.verbose > 0:
                    print(f"步数: {self.num_timesteps}, 平均奖励: {mean_reward:.2f}")
                if mean_reward - self.best_mean_reward < self.min_delta:
                    self.no_improvement_count += 1
                    if self.verbose > 0:
                        print(
                            f"无显著改进: {self.no_improvement_count}/{self.patience}"
                        )
                    if self.no_improvement_count >= self.patience:
                        if self.verbose > 0:
                            print("早停: 已连续多次无显著改进")
                        return False
                else:
                    self.no_improvement_count = 0
                    self.best_mean_reward = mean_reward
        return True


def setup_callbacks(model_name, env_train, eval_freq=5000, check_freq=1000, eval_env=None):
    """Create the checkpoint, evaluation and early-stopping callbacks for one model.

    Args:
        model_name: Short model key (e.g. "a2c"). Used for the output sub-directories and
            the checkpoint file prefix.
        env_train: The vectorized environment the model trains on.
        eval_freq: Callback calls between evaluations; also used as the checkpoint interval.
        check_freq: Callback calls between early-stopping checks.
        eval_env: Environment used by ``EvalCallback``. When None, an independent deep copy
            of ``env_train`` is used (so evaluation still runs on the training data).
            Evaluating on the training env object itself would reset and step the env the
            model is collecting experience from. Training would then continue from a state
            that no longer matches the model's last observation.

    Returns:
        CallbackList of [checkpoint_callback, eval_callback, early_stopping_callback].
    """
    eval_log_dir = f"{RESULTS_DIR}/eval/{model_name}"
    best_model_dir = f"{TRAINED_MODEL_DIR}/best/{model_name}"
    checkpoint_dir = f"{TRAINED_MODEL_DIR}/checkpoints/{model_name}"
    for directory in (eval_log_dir, best_model_dir, checkpoint_dir):
        os.makedirs(directory, exist_ok=True)

    if eval_env is None:
        try:
            eval_env = copy.deepcopy(env_train)
        except Exception as exc:  # deliberately broad: fall back to evaluating on env_train
            print(f"警告: 无法复制训练环境用于评估 ({exc})，将直接使用训练环境进行评估")
            eval_env = env_train

    eval_callback = EvalCallback(
        eval_env,
        best_model_save_path=f"{best_model_dir}/",
        log_path=f"{eval_log_dir}/",
        eval_freq=eval_freq,
        deterministic=True,
        render=False,
        verbose=1,
    )
    checkpoint_callback = CheckpointCallback(
        save_freq=eval_freq,
        save_path=f"{checkpoint_dir}/",
        name_prefix=f"{model_name}_model",
        save_replay_buffer=True,
        save_vecnormalize=True,
        verbose=1,
    )
    early_stopping_callback = EarlyStoppingCallback(
        check_freq=check_freq, patience=5, min_delta=0.01, verbose=1
    )
    return CallbackList([checkpoint_callback, eval_callback, early_stopping_callback])

# 算法选择

In [None]:
# Algorithm switches: each model's training cell runs only when its flag is True.
if_using_a2c = True
if_using_ddpg = True
if_using_ppo = True
if_using_td3 = True
if_using_sac = True

# Devices: A2C is configured with cpu_device; the other models use gpu_device, which
# falls back to the CPU when no CUDA GPU is available.
if not use_cuda:
    cpu_device = gpu_device = torch.device("cpu")
    print("未检测到GPU，所有模型将使用CPU")
else:
    cpu_device, gpu_device = torch.device("cpu"), torch.device("cuda")
    print(f"CPU设备: {cpu_device}")
    print(f"GPU设备: {gpu_device}")

# Training budget (environment steps) per algorithm.
a2c_timesteps = ddpg_timesteps = ppo_timesteps = td3_timesteps = sac_timesteps = 20000

accelerated_device_label = "GPU" if use_cuda else "CPU"
print("选中的算法及其训练设备:")
for algo_label, algo_enabled, device_label in (
    ("A2C", if_using_a2c, "CPU"),
    ("DDPG", if_using_ddpg, accelerated_device_label),
    ("PPO", if_using_ppo, accelerated_device_label),
    ("TD3", if_using_td3, accelerated_device_label),
    ("SAC", if_using_sac, accelerated_device_label),
):
    status_mark = "✓" if algo_enabled else "✗"
    print(f"{algo_label}: {status_mark} (设备: {device_label})")

## A2C 模型

In [None]:
# A2C model, configured to train on cpu_device.
if if_using_a2c:
    print("\n======== 开始训练 A2C 模型 ========")
    A2C_PARAMS = dict(
        n_steps=16,
        ent_coef=0.01,
        learning_rate=0.001,
        device=cpu_device,
    )
    agent = DRLAgent(env=env_train)
    model_a2c = agent.get_model("a2c", model_kwargs=A2C_PARAMS)

    # Log this run to stdout, CSV and TensorBoard under RESULTS_DIR/a2c.
    tmp_path = f"{RESULTS_DIR}/a2c"
    new_logger_a2c = configure(tmp_path, ["stdout", "csv", "tensorboard"])
    model_a2c.set_logger(new_logger_a2c)
    callbacks = setup_callbacks("a2c", env_train, eval_freq=5000, check_freq=1000)

    train_start_time = time.time()
    print(f"开始训练，总步数: {a2c_timesteps}")
    trained_a2c = model_a2c.learn(
        total_timesteps=a2c_timesteps,
        callback=callbacks,
        tb_log_name="a2c",
    )
    train_end_time = time.time()
    train_time = train_end_time - train_start_time
    print(f"A2C 训练完成，耗时: {train_time:.2f}秒 ({train_time/60:.2f}分钟)")

    final_model_path = f"{TRAINED_MODEL_DIR}/agent_a2c_20150101~20250101"
    trained_a2c.save(final_model_path)
    print(f"最终模型已保存至 {final_model_path}")
    print(f"检查点保存在 {TRAINED_MODEL_DIR}/checkpoints/a2c/")
    print(f"最佳模型已保存至 {TRAINED_MODEL_DIR}/best/a2c/")

## DDPG 模型

In [None]:
# DDPG model, trained in stages on gpu_device with a garbage-collection pass between stages.
if if_using_ddpg:
    print("\n======== 开始训练 DDPG 模型 ========")
    agent = DRLAgent(env=env_train)
    DDPG_PARAMS = {
        "buffer_size": 10000,
        "learning_rate": 0.0005,
        "batch_size": 128,
        "device": gpu_device,
    }
    model_ddpg = agent.get_model("ddpg", model_kwargs=DDPG_PARAMS)
    tmp_path = RESULTS_DIR + "/ddpg"
    new_logger_ddpg = configure(tmp_path, ["stdout", "csv", "tensorboard"])
    model_ddpg.set_logger(new_logger_ddpg)
    callbacks = setup_callbacks("ddpg", env_train, eval_freq=5000, check_freq=1000)

    train_start_time = time.time()
    total_steps = ddpg_timesteps
    steps_per_stage = 5000
    # Full stages plus a final partial stage, so a budget that is not a multiple of
    # steps_per_stage (or is smaller than it) is still trained in full.
    stage_sizes = [steps_per_stage] * (total_steps // steps_per_stage)
    if total_steps % steps_per_stage:
        stage_sizes.append(total_steps % steps_per_stage)
    stages = len(stage_sizes)
    print(f"开始分阶段训练DDPG，总步数: {total_steps}，分为{stages}个阶段")

    for stage, stage_steps in enumerate(stage_sizes):
        print(f"阶段 {stage+1}/{stages}，训练步数: {stage_steps}")
        # Only the first stage resets the timestep counter. Later stages continue it, which:
        # - avoids resetting the env between stages,
        # - avoids repeating the learning_starts warm-up,
        # - keeps checkpoint file names and eval timesteps increasing instead of
        #   rewriting the same "5000 steps" entry every stage.
        target_timesteps = (model_ddpg.num_timesteps if stage > 0 else 0) + stage_steps
        model_ddpg.learn(
            total_timesteps=stage_steps,
            callback=callbacks,
            tb_log_name=f"ddpg_stage_{stage}",
            reset_num_timesteps=(stage == 0),
        )
        gc.collect()
        # learn() falls short of its target when a callback (presumably early stopping)
        # stopped it; do not start further stages in that case.
        if model_ddpg.num_timesteps < target_timesteps:
            print(f"早停已触发，在第 {stage+1} 阶段结束训练")
            break

    train_end_time = time.time()
    train_time = train_end_time - train_start_time
    print(f"DDPG 训练完成，耗时: {train_time:.2f}秒 ({train_time/60:.2f}分钟)")
    model_ddpg.save(TRAINED_MODEL_DIR + "/agent_ddpg_20150101~20250101")
    print(f"最终模型已保存至 {TRAINED_MODEL_DIR}/agent_ddpg_20150101~20250101")
    print(f"检查点保存在 {TRAINED_MODEL_DIR}/checkpoints/ddpg/")
    print(f"最佳模型已保存至 {TRAINED_MODEL_DIR}/best/ddpg/")
    gc.collect()
    print("内存已清理")

## PPO 模型

In [None]:
# PPO model, trained in stages on gpu_device; a checkpoint is saved after every stage,
# and Ctrl+C saves the current weights before the final save.
if if_using_ppo:
    print("\n======== 开始训练 PPO 模型 ========")
    agent = DRLAgent(env=env_train)
    PPO_PARAMS = {
        "n_steps": 512,
        "ent_coef": 0.01,
        "learning_rate": 0.001,
        "batch_size": 256,
        "device": gpu_device,
    }
    model_ppo = agent.get_model("ppo", model_kwargs=PPO_PARAMS)
    tmp_path = RESULTS_DIR + "/ppo"
    new_logger_ppo = configure(tmp_path, ["stdout", "csv", "tensorboard"])
    model_ppo.set_logger(new_logger_ppo)
    callbacks = setup_callbacks("ppo", env_train, eval_freq=5000, check_freq=1000)

    train_start_time = time.time()
    total_steps = ppo_timesteps
    steps_per_stage = 5000
    # Full stages plus a final partial stage, so a budget that is not a multiple of
    # steps_per_stage (or is smaller than it) is still trained in full.
    stage_sizes = [steps_per_stage] * (total_steps // steps_per_stage)
    if total_steps % steps_per_stage:
        stage_sizes.append(total_steps % steps_per_stage)
    stages = len(stage_sizes)
    print(f"开始分阶段训练PPO，总步数: {total_steps}，分为{stages}个阶段")

    try:
        for stage, stage_steps in enumerate(stage_sizes):
            print(f"阶段 {stage+1}/{stages}，训练步数: {stage_steps}")
            # Only the first stage resets the timestep counter. Later stages continue it so
            # the env is not reset, and checkpoint names / eval timesteps keep increasing
            # instead of being rewritten at the same step count.
            target_timesteps = (model_ppo.num_timesteps if stage > 0 else 0) + stage_steps
            model_ppo.learn(
                total_timesteps=stage_steps,
                callback=callbacks,
                tb_log_name=f"ppo_stage_{stage}",
                reset_num_timesteps=(stage == 0),
            )
            gc.collect()
            checkpoint_path = f"{TRAINED_MODEL_DIR}/agent_ppo_checkpoint_{stage+1}"
            model_ppo.save(checkpoint_path)
            print(f"保存阶段性检查点: {checkpoint_path}")
            # learn() falls short of its target when a callback (presumably early stopping)
            # stopped it; do not start further stages in that case.
            if model_ppo.num_timesteps < target_timesteps:
                print(f"早停已触发，在第 {stage+1} 阶段结束训练")
                break
    except KeyboardInterrupt:
        print("\n训练被用户中断，保存当前模型...")
        interrupt_path = f"{TRAINED_MODEL_DIR}/agent_ppo_interrupted"
        model_ppo.save(interrupt_path)
        print(f"中断模型已保存至: {interrupt_path}")

    train_end_time = time.time()
    train_time = train_end_time - train_start_time
    print(f"PPO 训练完成，耗时: {train_time:.2f}秒 ({train_time/60:.2f}分钟)")
    model_ppo.save(TRAINED_MODEL_DIR + "/agent_ppo_20150101~20250101")
    print(f"最终模型已保存至 {TRAINED_MODEL_DIR}/agent_ppo_20150101~20250101")
    print(f"检查点保存在 {TRAINED_MODEL_DIR}/checkpoints/ppo/")
    print(f"最佳模型已保存至 {TRAINED_MODEL_DIR}/best/ppo/")
    gc.collect()
    print("内存已清理")

## TD3 模型

In [None]:
# TD3 model, configured to train on gpu_device.
if if_using_td3:
    print("\n======== 开始训练 TD3 模型 ========")
    TD3_PARAMS = dict(
        batch_size=256,
        buffer_size=300000,
        learning_rate=0.001,
        device=gpu_device,
    )
    agent = DRLAgent(env=env_train)
    model_td3 = agent.get_model("td3", model_kwargs=TD3_PARAMS)

    # Log this run to stdout, CSV and TensorBoard under RESULTS_DIR/td3.
    tmp_path = f"{RESULTS_DIR}/td3"
    new_logger_td3 = configure(tmp_path, ["stdout", "csv", "tensorboard"])
    model_td3.set_logger(new_logger_td3)
    callbacks = setup_callbacks("td3", env_train, eval_freq=5000, check_freq=1000)

    train_start_time = time.time()
    print(f"开始训练，总步数: {td3_timesteps}")
    trained_td3 = model_td3.learn(
        total_timesteps=td3_timesteps,
        callback=callbacks,
        tb_log_name="td3",
    )
    train_end_time = time.time()
    train_time = train_end_time - train_start_time
    print(f"TD3 训练完成，耗时: {train_time:.2f}秒 ({train_time/60:.2f}分钟)")

    final_model_path = f"{TRAINED_MODEL_DIR}/agent_td3_20150101~20250101"
    trained_td3.save(final_model_path)
    print(f"最终模型已保存至 {final_model_path}")
    for location_label, artifact_subdir in (("检查点保存在", "checkpoints"), ("最佳模型已保存至", "best")):
        print(f"{location_label} {TRAINED_MODEL_DIR}/{artifact_subdir}/td3/")

## SAC 模型

In [None]:
# SAC model, configured to train on gpu_device.
if if_using_sac:
    print("\n======== 开始训练 SAC 模型 ========")
    SAC_PARAMS = dict(
        batch_size=256,
        buffer_size=300000,
        learning_rate=0.001,
        learning_starts=100,
        ent_coef="auto_0.1",
        device=gpu_device,
    )
    agent = DRLAgent(env=env_train)
    model_sac = agent.get_model("sac", model_kwargs=SAC_PARAMS)

    # Log this run to stdout, CSV and TensorBoard under RESULTS_DIR/sac.
    tmp_path = f"{RESULTS_DIR}/sac"
    new_logger_sac = configure(tmp_path, ["stdout", "csv", "tensorboard"])
    model_sac.set_logger(new_logger_sac)
    callbacks = setup_callbacks("sac", env_train, eval_freq=5000, check_freq=1000)

    train_start_time = time.time()
    print(f"开始训练，总步数: {sac_timesteps}")
    trained_sac = model_sac.learn(
        total_timesteps=sac_timesteps,
        callback=callbacks,
        tb_log_name="sac",
    )
    train_end_time = time.time()
    train_time = train_end_time - train_start_time
    print(f"SAC 训练完成，耗时: {train_time:.2f}秒 ({train_time/60:.2f}分钟)")

    final_model_path = f"{TRAINED_MODEL_DIR}/agent_sac_20150101~20250101"
    trained_sac.save(final_model_path)
    print(f"最终模型已保存至 {final_model_path}")
    for location_label, artifact_subdir in (("检查点保存在", "checkpoints"), ("最佳模型已保存至", "best")):
        print(f"{location_label} {TRAINED_MODEL_DIR}/{artifact_subdir}/sac/")

# 可视化训练过程

In [None]:
# Training-process visualisation
def visualize_training_results(model_name):
    """Draw a 2x2 training dashboard for one model and save it as a PNG.

    Panels:
        1. Rewards from the first ``{RESULTS_DIR}/{model_name}/*.monitor.csv`` file.
           Shown with a 100-row rolling mean and markers every 5000 rows.
        2. Mean ± std evaluation reward from
           ``{RESULTS_DIR}/eval/{model_name}/evaluations.npz``, with a least-squares
           trend line.
        3. For each checkpoint ``*_<steps>_steps.zip`` in
           ``{TRAINED_MODEL_DIR}/checkpoints/{model_name}``: the evaluation reward
           recorded closest to that step count, with the best one highlighted.
        4. Rolling-mean reward sampled every 5000 monitor rows, plus its change between
           samples. Marked "converged" when the last three changes are all below 0.01
           in magnitude.

    NOTE(review): SB3 monitor.csv rows are episodes, so panels 1 and 4 are indexed by
    episode although their axis is labelled as training steps. Nothing in this notebook
    appears to write ``*.monitor.csv`` files (the loggers use stdout/csv/tensorboard);
    those panels stay empty unless a Monitor wrapper is added — confirm.

    The figure is saved to ``{RESULTS_DIR}/figures/{model_name}_training_analysis.png``.

    Args:
        model_name: Lower-case model key used in the result/checkpoint paths, e.g. "ppo".
    """
    fig = plt.figure(figsize=(15, 12))
    window_size = 100  # rows in the rolling-mean window
    sample_every = 5000  # row spacing of the markers in panels 1 and 4

    def show_missing(message):
        """Write a centred placeholder message into the current subplot."""
        plt.text(0.5, 0.5, message, ha="center", va="center", transform=plt.gca().transAxes)

    # Load each data source once: panels 1/4 share the monitor log, panels 2/3 the eval log.
    csv_files = glob.glob(os.path.join(RESULTS_DIR, model_name, "*.monitor.csv"))
    monitor_data = pd.read_csv(csv_files[0], skiprows=1) if csv_files else None

    eval_path = os.path.join(RESULTS_DIR, "eval", model_name, "evaluations.npz")
    eval_steps = eval_results = None
    if os.path.exists(eval_path):
        # The context manager closes the file handle np.load keeps open for .npz archives.
        with np.load(eval_path) as eval_data:
            eval_steps = eval_data["timesteps"]
            eval_results = eval_data["results"]

    # --- Panel 1: training rewards ---
    plt.subplot(2, 2, 1)
    if monitor_data is not None:
        rewards = monitor_data["r"]
        plt.plot(rewards, label="单步奖励", alpha=0.3, color="gray")
        if len(monitor_data) > window_size:
            rolling_mean = rewards.rolling(window=window_size).mean()
            plt.plot(
                rolling_mean,
                label=f"奖励滚动平均(窗口={window_size})",
                color="blue",
                linewidth=2,
            )
        marker_rows = np.arange(0, len(monitor_data), sample_every)
        if len(marker_rows) > 0:
            marker_rewards = rewards.iloc[marker_rows].to_numpy()
            plt.scatter(
                marker_rows, marker_rewards, color="red", s=50, zorder=5,
                label=f"每{sample_every}步检查点",
            )
            # One dashed, unlabelled polyline through the markers (kept out of the legend).
            plt.plot(marker_rows, marker_rewards, "r--", alpha=0.7, linewidth=1.5)
        plt.title(f"{model_name.upper()} 训练奖励")
        plt.xlabel("训练步数")
        plt.ylabel("奖励")
        plt.legend()
        plt.grid(alpha=0.3)
    else:
        show_missing("未找到训练记录")

    # --- Panel 2: evaluation rewards ---
    plt.subplot(2, 2, 2)
    if eval_results is not None:
        mean_rewards = eval_results.mean(axis=1)
        std_rewards = eval_results.std(axis=1)
        plt.plot(eval_steps, mean_rewards, label="平均评估奖励", marker="o", color="green")
        plt.fill_between(
            eval_steps, mean_rewards - std_rewards, mean_rewards + std_rewards,
            alpha=0.2, color="green",
        )
        # A linear fit needs at least two distinct step values to be meaningful.
        if len(np.unique(eval_steps)) > 1:
            slope, intercept = np.polyfit(eval_steps, mean_rewards, 1)
            plt.plot(
                eval_steps, slope * eval_steps + intercept, "r--", alpha=0.7,
                label=f"趋势线 (斜率: {slope:.6f})",
            )
        plt.title(f"{model_name.upper()} 评估性能")
        plt.xlabel("训练步数")
        plt.ylabel("平均评估奖励")
        plt.legend()
        plt.grid(alpha=0.3)
    else:
        show_missing("未找到评估记录")

    # --- Panel 3: checkpoint comparison ---
    plt.subplot(2, 2, 3)
    checkpoint_dir = os.path.join(TRAINED_MODEL_DIR, "checkpoints", model_name)
    checkpoint_steps = []
    for checkpoint_file in glob.glob(os.path.join(checkpoint_dir, "*.zip")):
        # Checkpoint files are named "<prefix>_<num_timesteps>_steps.zip".
        match = re.search(r"_(\d+)_steps\.zip$", os.path.basename(checkpoint_file))
        if match:
            checkpoint_steps.append(int(match.group(1)))
    # Numeric sort: a lexicographic sort of file names would put 10000 before 5000.
    checkpoint_steps.sort()
    if not checkpoint_steps:
        show_missing("未找到检查点文件")
    elif eval_results is None or len(eval_steps) == 0:
        show_missing("未找到评估记录")
    else:
        eval_means = eval_results.mean(axis=1)
        # Use the evaluation recorded closest to each checkpoint's step count.
        checkpoint_rewards = [
            eval_means[np.argmin(np.abs(eval_steps - step))] for step in checkpoint_steps
        ]
        positions = np.arange(len(checkpoint_steps))
        plt.bar(positions, checkpoint_rewards, color="skyblue", alpha=0.7)
        plt.xticks(positions, [f"{step // 1000}k" for step in checkpoint_steps], rotation=45)
        best_idx = int(np.argmax(checkpoint_rewards))
        plt.bar(best_idx, checkpoint_rewards[best_idx], color="green", alpha=0.7)
        plt.title(f"{model_name.upper()} 检查点性能比较")
        plt.xlabel("训练步数")
        plt.ylabel("平均评估奖励")
        plt.grid(alpha=0.3)

    # --- Panel 4: convergence analysis ---
    plt.subplot(2, 2, 4)
    if monitor_data is not None and len(monitor_data) > window_size:
        rolling_rewards = monitor_data["r"].rolling(window=window_size).mean()
        sample_rows = np.arange(0, len(monitor_data), sample_every)
        # Keep only rows where a full rolling window is available.
        sample_rows = sample_rows[sample_rows >= window_size]
        if len(sample_rows) > 1:
            sample_rewards = rolling_rewards.iloc[sample_rows].to_numpy()
            reward_changes = np.diff(sample_rewards)
            # Repeat the last change so both series have one value per sample row.
            reward_changes = np.append(reward_changes, reward_changes[-1])

            ax1 = plt.gca()
            ax1.set_xlabel("训练步数")
            ax1.set_ylabel("滚动平均奖励", color="blue")
            ax1.plot(sample_rows, sample_rewards, "o-", color="blue", label="滚动平均奖励")
            ax1.tick_params(axis="y", labelcolor="blue")
            ax2 = ax1.twinx()
            ax2.set_ylabel("奖励变化率", color="red")
            ax2.plot(sample_rows, reward_changes, "o--", color="red", label="奖励变化率")
            ax2.tick_params(axis="y", labelcolor="red")

            converged = len(reward_changes) >= 3 and bool(
                np.all(np.abs(reward_changes[-3:]) < 0.01)
            )
            status = "已收敛" if converged else "训练中"
            ax1.set_title(f"{model_name.upper()} 收敛分析 - {status}")
            lines1, labels1 = ax1.get_legend_handles_labels()
            lines2, labels2 = ax2.get_legend_handles_labels()
            ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
            ax1.grid(alpha=0.3)

    fig.tight_layout()
    figures_dir = os.path.join(RESULTS_DIR, "figures")
    os.makedirs(figures_dir, exist_ok=True)
    figure_path = os.path.join(figures_dir, f"{model_name}_training_analysis.png")
    fig.savefig(figure_path, dpi=300)
    print(f"可视化结果已保存至 {figure_path}")
    plt.show()
    # Release the figure so repeated calls (one per model) do not accumulate open figures.
    plt.close(fig)

# 总结训练结果

In [None]:
# Summarise which models were selected for training and where their artifacts live.
algorithm_flags = {
    "A2C": if_using_a2c,
    "DDPG": if_using_ddpg,
    "PPO": if_using_ppo,
    "TD3": if_using_td3,
    "SAC": if_using_sac,
}
trained_models = [algo_name for algo_name, selected in algorithm_flags.items() if selected]

device_description = "GPU (CUDA)" if use_cuda else "CPU"
summary_lines = [
    "\n======== 训练完成 ========",
    f"使用设备: {device_description}",
    f"训练的模型: {', '.join(trained_models)}",
    f"所有模型已保存至 {TRAINED_MODEL_DIR} 目录",
    "\n下一步: 运行 back_test.ipynb 来评估训练好的模型表现",
]
for summary_line in summary_lines:
    print(summary_line)