Skip to content

Latest commit

 

History

History
103 lines (62 loc) · 53.5 KB

framework.rst

File metadata and controls

103 lines (62 loc) · 53.5 KB

TqSdk整体结构

文件结构

File Description
api.py TqApi 接口主文件
tqhelper.py TqApi 辅助代码
exception.py 异常类型定义
objs.py 主要业务数据结构定义
sim.py 本地模拟交易
backtest.py 回测支持
lib.py 交易辅助工具
ta.py 技术指标
tafunc.py 技术分析函数
ctpse/* 穿透式监管信息采集模块
test/* 单元测试用例
demo/* 示例程序

数据流

TqSdk中以数据流的方式连接各组件。TqChan(本质是一个asyncio.Queue)被用作两个组件间的单向数据流管道,一个组件向 TqChan 中放入数据包,另一个组件从 TqChan 中依次取出数据包。

实盘运行时,整个数据流结构如下图:

Websocket Client
To OpenTradeGateway
Websocket Client<br>To OpenTradeGateway
Websocket Client
To OpenMdGateway
Websocket Client<br>To OpenMdGateway
TqAccount
TqAccount
api_recv_chan
api_recv_chan
api_send_chan
api_send_chan
TqApi
TqApi
td_recv_chan
td_recv_chan
td_send_chan
td_send_chan
md_recv_chan
md_recv_chan
md_send_chan
md_send_chan

数据包上行流程(以报单为例):

  1. 用户程序调用 TqApi 中的某些需要发出数据包的功能函数, 以 TqApi.insert_order 为例
  2. TqApi.insert_order 函数生成一个需要发出的数据包, 将此数据包放入 api_send_chan
  3. TqAccount 从 api_send_chan 中取出此数据包,根据 aid 字段,决定将此数据包放入 td_send_chan
  4. 连接到交易网关的 websocket client 从 td_send_chan 中取出此数据包,通过网络发出

数据包下行流程(以接收行情为例):

  1. 连接到行情网关的 websocket client 从网络收到一个数据包,将其放入 md_recv_chan
  2. TqAccount 从md_recv_chan中取出此数据包,将它放入 api_recv_chan
  3. TqApi 从api_recv_chan中取出此数据包,将数据包中携带的行情数据合并到内存存储区中

基于这样的数据流结构,可以通过简单更换部分组件的方式实现不同工作模式。例如模拟交易时,我们用 TqSim 替换 TqAccount:

Websocket Client
To OpenMdGateway
Websocket Client<br>To OpenMdGateway
TqSim
TqSim
api_recv_chan
api_recv_chan
api_send_chan
api_send_chan
TqApi
TqApi
md_recv_chan
md_recv_chan
md_send_chan
md_send_chan

策略回测则是这样:

Websocket Client
To OpenMdGateway
Websocket Client<br>To OpenMdGateway
TqSim
TqSim
api_recv_chan
api_recv_chan
api_send_chan
api_send_chan
TqApi
TqApi
md_recv_chan
md_recv_chan
md_send_chan
md_send_chan
TqBacktest
TqBacktest
backtest_recv_chan
backtest_recv_chan
backtest_send_chan
backtest_send_chan

内存数据存储与更新

按照 DIFF 协议推荐的客户端最佳实践,TqApi 使用单一变量(TqApi._data)存储所有业务数据, 它的结构如下:

/
/
quotes
quotes
SHFE.cu1901
SHFE.cu1901
last_price
last_price
volume
volume
SHFE.cu1902
SHFE.cu1902
klines
klines
ticks
ticks
trade
trade
user1
user1
positions
positions
SHFE.cu1901
SHFE.cu1901
pos_long
pos_long
pos_short
pos_short

在每次收到数据包时,TqApi都会将数据包内容合并到 TqApi._data 中. 具体的代码流程如下:

  1. websocket client 收到数据包, 放入 TqApi._pending_diffs
  2. wait_update 函数发现 TqApi._pending_diffs 有待处理数据包, 中止异步循环以处理此数据包:

    while not self._wait_timeout and not self._pending_diffs:     # 这里发现 self._pending_diffs 非空, 中止 while 循环
        self._run_once()
  3. wait_update 调用 self._merge_diff 函数:

    for d in self._diffs:
        self._merge_diff(self._data, d, self._prototype, False)
  4. TqApi._merge_diff 函数将收到的数据包并入本地存储.
  5. 对于k线之类的序列数据, 后续继续将更新的数据复制到 pandas dataframe 中

异步任务调度

TqApi 在 wait_update 函数中完成所有异步任务的调度执行. 每当用户程序执行 api.wait_update 函数时, 会调度所有 task 运行, 直到收到新数据包或超时 wait_update函数返回, 继续执行后续用户代码

用户代码
用户代码
api.wait_update
api.wait_update
用户代码
用户代码
api.wait_update
api.wait_update
websocket client
发送 task
websocket client<br>发送 task
websocket client
接收 task
websocket client<br>接收 task
用户创建的其它task
用户创建的其它task
TqSdk创建的tqsdk
TqSdk创建的tqsdk
用户代码
用户代码
api.wait_update
api.wait_update
用户代码
用户代码
api.wait_update
api.wait_update