In [None]:
"""
Lyft Motion Prediction for Autonomous Vehicles
================================================

本代码实现了基于 Lyft 数据集的运动预测任务，使用了 PyTorch Lightning 框架，
并整合了 l5kit 库来处理数据集和场景栅格图像。代码中包含了数据加载、预处理、
模型训练、验证和测试的完整流程，同时提供了详细的注释说明设计初衷及参数选择。
"""

### Imports and general settings

In [5]:
pip install timm==0.4.12

Collecting timm==0.4.12
  Downloading timm-0.4.12-py3-none-any.whl (376 kB)
[K     |████████████████████████████████| 376 kB 7.8 MB/s eta 0:00:01
Installing collected packages: timm
Successfully installed timm-0.4.12
You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [6]:
# ---------------------------
# Imports 以及全局设置
# ---------------------------
import argparse    # 命令行参数解析模块
import os          # 操作系统交互模块（路径、环境变量等）
import random      # Python 内置随机数模块
import sys         # 访问Python解释器相关的变量
from typing import Tuple  # 用于类型提示

import matplotlib.pyplot as plt  # 用于数据可视化
import numpy as np               # 数值计算模块
import pytorch_lightning as pl   # PyTorch Lightning 框架，用于简化训练过程
import torch                     # PyTorch 框架核心模块
import l5kit                     # L5Kit 提供数据集处理和栅格化功能（Kaggle环境下需预装）
from l5kit.configs import load_config_data  # 用于加载配置信息（yaml格式）
from l5kit.data import ChunkedDataset, LocalDataManager  # 数据集加载和管理工具
from l5kit.dataset import AgentDataset  # 封装了代理（agent）数据集
from l5kit.evaluation import compute_metrics_csv, write_pred_csv  # 评估指标计算和保存预测结果的工具
from l5kit.evaluation.metrics import neg_multi_log_likelihood, time_displace  # 计算多模态损失和时间偏移误差
from l5kit.geometry import transform_points  # 坐标转换工具
from l5kit.rasterization import build_rasterizer  # 构建场景栅格图像的工具
from l5kit.visualization import TARGET_POINTS_COLOR, draw_trajectory  # 用于绘制轨迹及设置目标点颜色
from torch.utils.data import DataLoader  # PyTorch 数据加载工具

# 自定义模块，用于定义损失函数、模型结构和一些工具函数
import lyft_loss
import lyft_models
import lyft_utils

In [7]:
# 定义数据集规模和验证时采样间隔的常数
ALL_DATA_SIZE = 198474478
VAL_INTERVAL_SAMPLES = 250000

# 定义栅格化配置文件路径，该文件中包含了场景渲染需要的各项配置参数
CFG_PATH = "../input/lyft-mpred-seresnext26-pretrained/agent_motion_config.yaml"
# 预测结果输出的CSV文件路径，后续测试模式会生成该文件
CSV_PATH = "./submission.csv"

# 为保证训练和测试环境的一致性，设置历史帧和未来帧的阈值
MIN_FRAME_HISTORY = 0    # 筛选代理：过去至少需要的帧数
MIN_FRAME_FUTURE = 10    # 筛选代理：未来至少需要的帧数
# 验证集中用到的特定帧（比如帧99）用来与测试数据保持一致
VAL_SELECTED_FRAME = (99,)

# ---------------------------
# 固定随机种子设置，确保实验结果的可重复性
# ---------------------------
SEED = 42
os.environ["PYTHONHASHSEED"] = str(SEED)  # Python 的hash随机化的种子
random.seed(SEED)  # Python自带随机种子
np.random.seed(SEED)  # NumPy随机种子

### Run configuration

In [8]:
# ---------------------------
# 加载配置文件
# ---------------------------
cfg = load_config_data(CFG_PATH)  # 从yaml配置文件中加载配置信息

# ---------------------------
# 参数解析与运行配置
# ---------------------------
parser = argparse.ArgumentParser(
    description="Run lyft motion prediction learning",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

# 定义数据集路径
parser.add_argument(
    "--l5kit_data_folder",
    default="/your/dataset/path",
    type=str,
    help="root directory path for lyft motion prediction dataset",
)

# 定义优化器选择，可选 "adam" 或 "sgd"
parser.add_argument(
    "--optim_name",
    choices=["adam", "sgd"],
    default="sgd",
    help="optimizer name",
)

# 定义预测模式数（多模态预测）
parser.add_argument(
    "--num_modes",
    type=int,
    default=3,
    help="number of the modes on each prediction",
)

# 定义学习率
parser.add_argument("--lr", default=7.0e-4, type=float, help="learning rate")

# 定义批次大小
parser.add_argument("--batch_size", type=int, default=220, help="batch size")

# 定义训练轮数
parser.add_argument("--epochs", type=int, default=1, help="epochs for training")

# 定义网络骨干结构，可选 efficientnet_b1 或 seresnext26d_32x4d
parser.add_argument(
    "--backbone_name",
    choices=["efficientnet_b1", "seresnext26d_32x4d"],
    default="seresnext26d_32x4d",
    help="backbone name",
)

# 选择是否在训练时只使用部分帧（4帧），加快收敛但会增加损失值
parser.add_argument(
    "--downsample_train",
    action="store_true",
    help="using only 4 frames from each scene, the loss converge is \
much faster than using all data, but it will get larger loss",
)

# 测试模式的标记，用于区分训练和测试阶段
parser.add_argument("--is_test", action="store_true", help="test mode")

# 模型检查点路径，用于测试模式加载预训练模型权重
parser.add_argument(
    "--ckpt_path",
    type=str,
    default="./model.pth",
    help="path for model checkpoint at test mode",
)

# 定义训练时的数值精度
parser.add_argument(
    "--precision",
    default=16,
    choices=[16, 32],
    type=int,
    help="float precision at training",
)

# 指定使用的GPU设备编号，逗号分隔，如 "0" 或 "0,1"
parser.add_argument(
    "--visible_gpus",
    type=str,
    default="0",
    help="Select gpu ids with comma separated format",
)

# 是否启用自动寻找学习率（如 FastAI 实现）
parser.add_argument(
    "--find_lr",
    action="store_true",
    help="find lr with fast ai implementation",
)

# 定义 DataLoader 使用的 CPU 核数
parser.add_argument(
    "--num_workers",
    default="16",
    type=int,
    help="number of cpus for DataLoader",
)

# 调试模式的标记，用于快速验证代码运行而不消耗全部数据
parser.add_argument("--is_debug", action="store_true", help="debug mode")

# 为了解决笔记本或者Kaggle环境下不能通过命令行传参的问题，这里直接模拟传参
args = parser.parse_args([
    "--l5kit_data_folder",
    "../input/lyft-motion-prediction-autonomous-vehicles",
    "--is_test",
    "--ckpt_path",
    "../input/lyft-mpred-seresnext26-pretrained/epoch-v0.ckpt",
    "--num_workers",
    "4",
    "--batch_size",
    "32"
])

# 根据debug标识调整一些参数，比如数据集、采样间隔、batch_size等
if args.is_debug:
    DEBUG = True
    print("\t ---- DEBUG RUN ---- ")
    cfg["train_data_loader"]["key"] = "scenes/sample.zarr"
    cfg["val_data_loader"]["key"] = "scenes/sample.zarr"
    VAL_INTERVAL_SAMPLES = 5000
    args.batch_size = 16
else:
    DEBUG = False
    print("\t ---- NORMAL RUN ---- ")

# 输出当前的所有参数，方便调试和验证选择的配置
lyft_utils.print_argparse_arguments(args)

# 设置 GPU 可见性，确保程序仅使用指定的 GPU
os.environ["CUDA_VISIBLE_DEVICES"] = args.visible_gpus


	 ---- NORMAL RUN ---- 
PARAMETER SETTING
--------------------------------------------------
backbone_name            :seresnext26d_32x4d
batch_size               :32
ckpt_path                :../input/lyft-mpred-seresnext26-pretrained/epoch-v0.ckpt
downsample_train         :False
epochs                   :1
find_lr                  :False
is_debug                 :False
is_test                  :True
l5kit_data_folder        :../input/lyft-motion-prediction-autonomous-vehicles
lr                       :0.0007
num_modes                :3
num_workers              :4
optim_name               :sgd
precision                :16
visible_gpus             :0
--------------------------------------------------


## Dataloader preparation
`pl.LightningDataModule` for train, validation and test dataloader.

In [9]:
# ---------------------------
# 数据加载模块：LyftMpredDatamodule (继承自 pl.LightningDataModule)
# ---------------------------
class LyftMpredDatamodule(pl.LightningDataModule):
    """
    该数据模块封装了训练、验证和测试数据集的加载逻辑。包含数据准备、数据集分割和可视化的功能。
    """
    def __init__(
        self,
        l5kit_data_folder: str,
        cfg: dict,
        batch_size: int = 440,
        num_workers: int = 16,
        downsample_train: bool = False,
        is_test: bool = False,
        is_debug: bool = False,
    ) -> None:
        super().__init__()
        # 设置数据存放的根目录（通过环境变量设置，l5kit库会自动读取）
        os.environ["L5KIT_DATA_FOLDER"] = l5kit_data_folder
        self.cfg = cfg
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.downsample_train = downsample_train
        self.is_test = is_test
        self.is_debug = is_debug

    def prepare_data(self):
        """
        仅在单个 GPU 上调用，用于下载和预处理数据。
        这里主要初始化本地数据管理器和构建场景栅格化器。
        """
        self.dm = LocalDataManager(None)
        self.rasterizer = build_rasterizer(cfg, self.dm)

    def setup(self):
        """
        在每个 GPU 上调用，用于实际加载和处理数据集。
        根据 is_test 标志加载测试或者训练/验证数据集，
        并对数据集做必要的下采样和可视化处理。
        """
        if self.is_test:
            print("test mode setup")
            self.test_path, test_zarr, self.test_dataset = self.load_zarr_dataset(
                loader_name="test_data_loader"
            )
        else:
            print("train mode setup")
            # 加载训练数据集
            self.train_path, train_zarr, self.train_dataset = self.load_zarr_dataset(
                loader_name="train_data_loader"
            )
            # 加载验证数据集
            self.val_path, val_zarr, self.val_dataset = self.load_zarr_dataset(
                loader_name="val_data_loader"
            )
            # 绘制部分训练数据用于直观检查数据加载情况及渲染效果
            self.plot_dataset(self.train_dataset)

            # 如果设置了 downsample_train，则仅选取部分帧以加速训练
            if self.downsample_train:
                print(
                    "downsampling agents, using only {} frames from each scene".format(
                        len(lyft_utils.TRAIN_DSAMPLE_FRAMES)
                    )
                )
                train_agents_list = lyft_utils.downsample_agents(
                    train_zarr,
                    self.train_dataset,
                    selected_frames=lyft_utils.TRAIN_DSAMPLE_FRAMES,
                )
                self.train_dataset = torch.utils.data.Subset(
                    self.train_dataset, train_agents_list
                )
            # 对验证数据集下采样，保证与测试集采样一致（可以参考 l5kit.evaluation.create_chopped_dataset）
            val_agents_list = lyft_utils.downsample_agents(
                val_zarr, self.val_dataset, selected_frames=VAL_SELECTED_FRAME
            )
            self.val_dataset = torch.utils.data.Subset(
                self.val_dataset, val_agents_list
            )

    def train_dataloader(self):
        """
        返回训练阶段用到的数据加载器，采用随机打乱（shuffle=True）。
        """
        return DataLoader(
            self.train_dataset,
            shuffle=True,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

    def val_dataloader(self):
        """
        返回验证阶段用到的数据加载器，不进行随机打乱。
        """
        return DataLoader(
            self.val_dataset,
            shuffle=False,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

    def test_dataloader(self):
        """
        返回测试阶段用到的数据加载器，不进行随机打乱。
        """
        return DataLoader(
            self.test_dataset,
            shuffle=False,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

    def load_zarr_dataset(
        self, loader_name: str = "train_data_loder"
    ) -> Tuple[str, ChunkedDataset, AgentDataset]:
        """
        加载 zarr 格式的序列化数据集，并根据 loader_name 选择不同的数据预处理方式。
        对于测试数据还会加载对应的 mask 文件，用于标识哪些代理数据可用。
        """
        zarr_path = self.dm.require(self.cfg[loader_name]["key"])
        print("load zarr data:", zarr_path)
        # 打开 zarr 数据集（分块存储，提升加载效率）
        zarr_dataset = ChunkedDataset(zarr_path).open()
        if loader_name == "test_data_loader":
            # 对于测试数据，加载场景中代理的 mask（有效性标识）
            mask_path = os.path.join(os.path.dirname(zarr_path), "mask.npz")
            agents_mask = np.load(mask_path)["arr_0"]
            agent_dataset = AgentDataset(
                self.cfg, zarr_dataset, self.rasterizer, agents_mask=agents_mask
            )
        else:
            # 对于训练和验证数据，使用 min_frame_history 和 min_frame_future 进行筛选
            agent_dataset = AgentDataset(
                self.cfg,
                zarr_dataset,
                self.rasterizer,
                min_frame_history=MIN_FRAME_HISTORY,
                min_frame_future=MIN_FRAME_FUTURE,
            )
        print(zarr_dataset)
        return zarr_path, zarr_dataset, agent_dataset

    def plot_dataset(self, agent_dataset: AgentDataset, plot_num: int = 10) -> None:
        """
        随机挑选若干个样本，生成栅格图像，叠加目标轨迹，用于直观验证数据渲染效果。
        如果处于调试模式下，可以通过 plt.show() 展示图像。
        """
        print("Ploting dataset")
        # 随机选取 plot_num 个样本
        ind = np.random.randint(0, len(agent_dataset), size=plot_num)
        for i in range(plot_num):
            data = agent_dataset[ind[i]]
            # 将数据中的图像张量格式转换为 (H, W, C) 以便可视化
            im = data["image"].transpose(1, 2, 0)
            # 利用栅格化器将图像转换成 RGB 格式
            im = agent_dataset.rasterizer.to_rgb(im)
            # 将目标轨迹点从代理坐标转换为像素坐标
            target_positions_pixels = transform_points(
                data["target_positions"], data["raster_from_agent"]
            )
            # 绘制目标轨迹（使用预定义的颜色）
            draw_trajectory(
                im,
                target_positions_pixels,
                TARGET_POINTS_COLOR,
                yaws=data["target_yaws"],
            )
            # 反转图像的 Y 轴进行正确显示
            plt.imshow(im[::-1])
            if self.is_debug:
                plt.show()

## Train, validation and test steps
 with `pl.LightningModule`.

In [10]:
# ---------------------------
# 模型训练模块：LightningModule 封装了训练、验证及测试步骤
# ---------------------------
class LitModel(pl.LightningModule):
    """
    继承自 PyTorch LightningModule，封装了前向计算、训练、验证和测试流程。
    同时负责配置优化器和学习率调度策略。
    """
    def __init__(
        self,
        cfg: dict,
        num_modes: int = 3,
        ba_size: int = 128,
        lr: float = 3.0e-4,
        backbone_name: str = "efficientnet_b1",
        epochs: int = 1,
        total_steps: int = 100,
        data_size: int = ALL_DATA_SIZE,
        optim_name: str = "adam",
    ) -> None:
        super().__init__()
        # 保存所有超参数，方便日志记录和后续恢复
        self.save_hyperparameters(
            "lr",
            "backbone_name",
            "num_modes",
            "ba_size",
            "epochs",
            "optim_name",
            "data_size",
            "total_steps",
        )
        # 使用自定义模型构造函数，构建多模态预测模型
        self.model = lyft_models.LyftMultiModel(
            cfg, num_modes=num_modes, backbone_name=backbone_name
        )
        # 测试阶段需要用到的关键数据字段
        self.test_keys = ("world_from_agent", "centroid", "timestamp", "track_id")

    def forward(self, x):
        """
        前向传播函数，直接调用内部的模型。
        """
        x = self.model(x)
        return x

    def training_step(self, batch, batch_idx):
        """
        单个训练批次的处理逻辑：
        1. 获取输入图像及对应的目标轨迹和可用性标记。
        2. 前向传播获得预测值及置信度。
        3. 利用自定义损失函数计算负多模态对数似然损失。
        4. 记录并返回损失值以供反向传播。
        """
        inputs = batch["image"]
        target_availabilities = batch["target_availabilities"].unsqueeze(-1)
        targets = batch["target_positions"]

        outputs, confidences = self.model(inputs)
        loss = lyft_loss.pytorch_neg_multi_log_likelihood_batch(
            targets,
            outputs,
            confidences.squeeze(),
            target_availabilities.squeeze(),
        )
        self.log(
            "train_epoch_loss",
            loss,
            prog_bar=False,
            on_epoch=True,
            on_step=False,
        )
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """
        与训练步骤类似，验证时计算损失值用于监控模型表现，调参时也作为早停的依据。
        """
        inputs = batch["image"]
        target_availabilities = batch["target_availabilities"].unsqueeze(-1)
        targets = batch["target_positions"]

        outputs, confidences = self.model(inputs)
        loss = lyft_loss.pytorch_neg_multi_log_likelihood_batch(
            targets,
            outputs,
            confidences.squeeze(),
            target_availabilities.squeeze(),
        )
        self.log("val_loss", loss)
        return loss

    def test_step(self, batch, batch_idx):
        """
        测试步骤：
        1. 前向传播获得模型预测的轨迹及对应置信度。
        2. 同时保留批次中的其他关键信息（如世界坐标转换矩阵、车体中心等）。
        这些信息将在 test_epoch_end 中进一步转换处理并保存为 CSV 文件。
        """
        inputs = batch["image"]
        outputs, confidences = self.model(inputs)
        test_batch = {key_: batch[key_] for key_ in self.test_keys}

        return outputs, confidences, test_batch

    def test_epoch_end(self, outputs):
        """
        测试阶段所有批次结束后：
        1. 将批次结果中的预测轨迹转换回世界坐标，计算相对偏移。
        2. 整合所有批次的预测结果、置信度、时间戳和轨迹ID。
        3. 调用 write_pred_csv 函数，将预测结果保存为 CSV 文件以供提交评估。
        """
        pred_coords_list = []
        confidences_list = []
        timestamps_list = []
        track_id_list = []

        # 对所有批次输出结果进行处理
        for outputs, confidences, batch in outputs:
            # 将 tensor 数据转换为 numpy 格式
            outputs = outputs.cpu().numpy()

            world_from_agents = batch["world_from_agent"].cpu().numpy()
            centroids = batch["centroid"].cpu().numpy()
            # 对每个样本和每个模态，转换预测结果到世界坐标，并调整为相对于车体中心的坐标
            for idx in range(len(outputs)):
                for mode in range(3):
                    outputs[idx, mode, :, :] = (
                        transform_points(
                            outputs[idx, mode, :, :], world_from_agents[idx]
                        )
                        - centroids[idx][:2]
                    )
            pred_coords_list.append(outputs)
            confidences_list.append(confidences)
            timestamps_list.append(batch["timestamp"])
            track_id_list.append(batch["track_id"])

        # 将多个批次的预测结果拼接起来
        coords = np.concatenate(pred_coords_list)
        confs = torch.cat(confidences_list).cpu().numpy()
        track_ids = torch.cat(track_id_list).cpu().numpy()
        timestamps = torch.cat(timestamps_list).cpu().numpy()

        # 将测试预测结果写入 CSV 文件
        write_pred_csv(
            CSV_PATH,
            timestamps=timestamps,
            track_ids=track_ids,
            coords=coords,
            confs=confs,
        )
        print(f"Saved to {CSV_PATH}")

    def configure_optimizers(self):
        """
        根据命令行选择的优化器类型，配置相应的优化器（SGD 或 Adam）
        并采用 OneCycleLR 调度器控制学习率变化。
        """
        if self.hparams.optim_name == "sgd":
            optimizer = torch.optim.SGD(
                self.parameters(),
                lr=self.hparams.lr,
                momentum=0.9,
                weight_decay=4e-5,
            )
        elif self.hparams.optim_name == "adam":
            optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
        else:
            raise NotImplementedError

        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=self.hparams.lr, total_steps=self.hparams.total_steps
        )
        return [optimizer], [scheduler]

In [11]:
# ---------------------------
# 实例化数据模块并准备数据
# ---------------------------
# 使用命令行参数配置，创建数据模块对象，该对象负责加载训练、验证和测试数据
mpred_dm = LyftMpredDatamodule(  # type: ignore[abstract]
    args.l5kit_data_folder,
    cfg,
    batch_size=args.batch_size,
    num_workers=args.num_workers,
    downsample_train=args.downsample_train,
    is_test=args.is_test,
    is_debug=args.is_debug,
)
mpred_dm.prepare_data()  # 预处理数据，仅在单个 GPU 上运行
mpred_dm.setup()         # 配置数据集（加载文件、下采样、可视化等）

test mode setup
load zarr data: ../input/lyft-motion-prediction-autonomous-vehicles/scenes/test.zarr




+------------+------------+------------+---------------+-----------------+----------------------+----------------------+----------------------+---------------------+
| Num Scenes | Num Frames | Num Agents | Num TR lights | Total Time (hr) | Avg Frames per Scene | Avg Agents per Frame | Avg Scene Time (sec) | Avg Frame frequency |
+------------+------------+------------+---------------+-----------------+----------------------+----------------------+----------------------+---------------------+
|   11314    |  1131400   |  88594921  |    7854144    |      31.43      |        100.00        |        78.31         |        10.00         |        10.00        |
+------------+------------+------------+---------------+-----------------+----------------------+----------------------+----------------------+---------------------+


## Running test 

In [14]:
# ---------------------------
# 模型测试流程
# ---------------------------
if args.is_test:
    print("\t\t ==== TEST MODE ====")
    print("load from: ", args.ckpt_path)
    # 从给定的检查点路径加载模型权重，构建模型实例
    model = LitModel.load_from_checkpoint(args.ckpt_path, cfg=cfg)
    # 使用指定的 GPU 数量创建 Trainer 对象
    trainer = pl.Trainer(gpus=len(args.visible_gpus.split(",")))
    # 执行测试流程，内部调用 test_dataloader、test_step、test_epoch_end 等方法
    trainer.test(model, datamodule=mpred_dm)

		 ==== TEST MODE ====
load from:  ../input/lyft-mpred-seresnext26-pretrained/epoch-v0.ckpt


GPU available: True, used: True
TPU available: False, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Testing', layout=Layout(flex='2'), max=…

Saved to ./submission.csv
--------------------------------------------------------------------------------



## Running train

In [15]:
if not args.is_test:
    print("\t\t ==== TRAIN MODE ====")
    print(
        "training samples: {}, valid samples: {}".format(
            len(mpred_dm.train_dataset), len(mpred_dm.val_dataset)
        )
    )
    # 根据数据集的大小和批次数计算训练总步数，以及每个间隔内的验证步数
    total_steps = args.epochs * len(mpred_dm.train_dataset) // args.batch_size
    val_check_interval = VAL_INTERVAL_SAMPLES // args.batch_size

    # 实例化 LightningModule，构造模型以及保存超参数信息
    model = LitModel(
        cfg,
        lr=args.lr,
        backbone_name=args.backbone_name,
        num_modes=args.num_modes,
        optim_name=args.optim_name,
        ba_size=args.batch_size,
        epochs=args.epochs,
        data_size=len(mpred_dm.train_dataset),
        total_steps=total_steps,
    )

    # 定义模型检查点回调，监控验证损失，并保存在验证集上表现最好的模型状态
    checkpoint_callback = pl.callbacks.ModelCheckpoint(
        monitor="val_loss",
        save_last=True,
        mode="min",
        verbose=True,
    )
    # 固定整体种子，确保多GPU训练中各个进程的随机性一致
    pl.trainer.seed_everything(seed=SEED)
    # 构造 Trainer 对象，包含训练的 GPU 数量、最大步数、验证间隔、精度选项和检查点回调
    trainer = pl.Trainer(
        gpus=len(args.visible_gpus.split(",")),
        max_steps=total_steps,
        val_check_interval=val_check_interval,
        precision=args.precision,
        benchmark=True,
        deterministic=False,
        checkpoint_callback=checkpoint_callback,
    )
    # 开始模型训练
    trainer.fit(model, datamodule=mpred_dm)