In [None]:
# Databento 数据目录

[NautilusTrader](https://nautilustrader.io/docs/) 高性能算法交易平台和事件驱动回测器教程。

[在 GitHub 上查看源代码](https://github.com/nautechsystems/nautilus_trader/blob/develop/docs/tutorials/databento_data_catalog.ipynb)。

:::info
我们正在开发这个教程。
:::


In [None]:
## 概览

本教程将引导您如何使用各种Databento架构设置Nautilus Parquet数据目录。


In [None]:
## 先决条件

- 已安装 Python 3.11+
- 已安装 [JupyterLab](https://jupyter.org/) 或类似工具 (`pip install -U jupyterlab`)
- 已安装 [NautilusTrader](https://pypi.org/project/nautilus_trader/) 最新版本 (`pip install -U nautilus_trader`)
- 已安装 [databento](https://pypi.org/project/databento/) Python 客户端库用于数据请求 (`pip install -U databento`)
- [Databento](https://databento.com) 账户


In [None]:
## 请求数据


In [None]:
我们将在本教程的其余部分使用Databento历史客户端。您可以通过将Databento API密钥传递给构造函数来初始化一个，或者隐式使用`DATABENTO_API_KEY`环境变量（如下所示）。


In [None]:
import databento as db


client = db.Historical()  # 这将使用DATABENTO_API_KEY环境变量（推荐的最佳实践）


In [None]:
**重要的是要注意，从`timeseries.get_range`的每个历史流请求都会产生费用（即使对于相同的数据），因此我们需要**：
- 在请求之前了解并理解费用
- 不要对相同数据进行多次请求（效率低下）
- 通过写入zstd压缩的DBN文件将响应持久化到磁盘（这样我们就不必再次请求）


In [None]:
我们可以使用Databento API的元数据[get_cost端点](https://databento.com/docs/api-reference-historical/metadata/metadata-get-cost?historical=python&live=python)来获取数据费用报价，在每个请求之前。
每个请求序列将首先请求数据费用，然后仅在磁盘上尚不存在数据时才发出请求。

注意返回的响应以美元为单位，显示为分的小数。


In [None]:
以下请求仅针对少量数据（如本Medium文章[在Python中使用Databento和sklearn构建高频交易信号](https://databento.com/blog/hft-sklearn-python)中使用的），只是为了演示基本工作流程。


In [None]:
from pathlib import Path

from databento import DBNStore


In [None]:
我们将为原始Databento DBN格式数据准备一个目录，我们将在教程的其余部分使用它。


In [None]:
DATABENTO_DATA_DIR = Path("databento")
DATABENTO_DATA_DIR.mkdir(exist_ok=True)


In [None]:
# 请求费用报价（美元）- 此端点是'免费'的
client.metadata.get_cost(
    dataset="GLBX.MDP3",
    symbols=["ES.n.0"],
    stype_in="continuous",
    schema="mbp-10",
    start="2023-12-06T14:30:00",
    end="2023-12-06T20:30:00",
)


In [None]:
使用历史API请求Medium文章中使用的数据。


In [None]:
path = DATABENTO_DATA_DIR / "es-front-glbx-mbp10.dbn.zst"

if not path.exists():
    # 请求数据
    client.timeseries.get_range(
        dataset="GLBX.MDP3",
        symbols=["ES.n.0"],
        stype_in="continuous",
        schema="mbp-10",
        start="2023-12-06T14:30:00",
        end="2023-12-06T20:30:00",
        path=path,  # <-- 传递`path`参数将确保数据写入磁盘
    )


In [None]:
通过从磁盘读取并转换为pandas.DataFrame来检查数据


In [None]:
data = DBNStore.from_file(path)

df = data.to_df()
df


In [None]:
## 写入数据目录


In [None]:
import shutil
from pathlib import Path

from nautilus_trader.adapters.databento.loaders import DatabentoDataLoader
from nautilus_trader.model import InstrumentId
from nautilus_trader.persistence.catalog import ParquetDataCatalog


In [None]:
CATALOG_PATH = Path.cwd() / "catalog"

# 如果已经存在则清除
if CATALOG_PATH.exists():
    shutil.rmtree(CATALOG_PATH)
CATALOG_PATH.mkdir()

# 创建目录实例
catalog = ParquetDataCatalog(CATALOG_PATH)


In [None]:
现在我们已经准备好数据目录，我们需要一个 `DatabentoDataLoader`，我们将用它来解码数据并将其加载为 Nautilus 对象。


In [None]:
loader = DatabentoDataLoader()


In [None]:
接下来，我们将通过设置 `as_legacy_cython=False` 来加载 Rust pyo3 对象以写入目录（我们可以使用传统的 Cython 对象，但这稍微更高效）。

我们还传递一个 `instrument_id`，这不是必需的，但可以使数据加载更快，因为不需要符号映射。


In [None]:
path = DATABENTO_DATA_DIR / "es-front-glbx-mbp10.dbn.zst"
instrument_id = InstrumentId.from_str("ES.n.0")  # 这应该是原始符号（更新）

depth10 = loader.from_dbn_file(
    path=path,
    instrument_id=instrument_id,
    as_legacy_cython=False,
)


In [None]:
# 将数据写入目录（目前写入 MBP-10 大约需要 ~20 秒或约 250,000/秒）
catalog.write_data(depth10)


In [None]:
# 测试从目录读取
depths = catalog.order_book_depth10()
len(depths)


In [None]:
## 准备一个月的 AAPL 交易数据


In [None]:
现在我们将扩展这个工作流程，通过使用 Databento `trade` 架构准备纳斯达克交易所的一个月 AAPL 交易数据，这将转换为 Nautilus `TradeTick` 对象。


In [None]:
# 请求费用报价（美元）- 此端点是'免费'的
client.metadata.get_cost(
    dataset="XNAS.ITCH",
    symbols=["AAPL"],
    schema="trades",
    start="2024-01",
)


In [None]:
当使用 Databento `Historical` 数据客户端请求历史数据时，确保传递 `path` 参数将数据写入磁盘。


In [None]:
path = DATABENTO_DATA_DIR / "aapl-xnas-202401.trades.dbn.zst"

if not path.exists():
    # 请求数据
    client.timeseries.get_range(
        dataset="XNAS.ITCH",
        symbols=["AAPL"],
        schema="trades",
        start="2024-01",
        path=path,  # <-- 传递 `path` 参数
    )


In [None]:
通过从磁盘读取并转换为 pandas.DataFrame 来检查数据


In [None]:
data = DBNStore.from_file(path)

df = data.to_df()
df


In [None]:
我们将使用 `"AAPL.XNAS"` 的 `InstrumentId`，其中 XNAS 是纳斯达克交易所的 ISO 10383 MIC（市场标识符代码）。

虽然向加载器传递 `instrument_id` 并不是严格必需的，但它通过消除符号映射的需要来加速数据加载。此外，将 `as_legacy_cython` 选项设置为 False 进一步优化了过程，因为我们将把加载的数据写入目录。虽然我们可以使用传统的 Cython 对象，但这种方法在加载方面更高效。


In [None]:
instrument_id = InstrumentId.from_str("AAPL.XNAS")

trades = loader.from_dbn_file(
    path=path,
    instrument_id=instrument_id,
    as_legacy_cython=False,
)


In [None]:
在这里，我们将把数据组织为每月一个文件，这是一个任意选择，每天一个文件也同样有效。


In [None]:
# 将数据写入目录
catalog.write_data(trades)


In [None]:
trades = catalog.trade_ticks([instrument_id])


In [None]:
len(trades)
