🚧 开发状态: v0.0.1 迭代中 | 当前版本覆盖 REST/gRPC/WebSocket 多协议,默认用于本地联调与策略验证,请勿直接用于生产环境
基于 FastAPI + gRPC + WebSocket 的 xtquant 量化交易代理服务
提供 RESTful API、gRPC 与 WebSocket 多协议接口,封装国金 QMT xtquant SDK 的数据和交易功能
- 🌐 REST API: 基于 FastAPI,提供 HTTP/HTTPS 接口,自动生成 Swagger 文档
- ⚡ gRPC: 高性能 RPC 框架,支持流式调用和双向通信
- 🔔 WebSocket: 提供行情订阅实时推送,内置心跳与限流控制
- 🔄 统一服务: 两种协议共享相同的业务逻辑层,一次部署同时服务
- 🔐 API Key 认证: 多环境 API Key 管理
- 🚦 交易拦截: dev 模式自动拦截真实交易,保护账户安全
- 📝 完整日志: 基于 Loguru 的结构化日志,支持日志轮转和压缩
- 🔒 异常保护: 全局异常处理,xtdata 连接超时保护
- 📈 市场数据: K线、分时、tick、财务数据、板块数据、行情订阅
- 💼 交易功能: 下单、撤单、持仓查询、订单管理
- ❤️ 健康检查: REST 和 gRPC 双协议健康检查
- 🎯 三种模式: mock/dev/prod 灵活切换
- 📋 统一配置: config.yml 集中管理所有配置
- 🔄 环境切换: 通过 APP_MODE 环境变量一键切换模式
- 🎨 单例模式: 配置和服务单例加载,避免重复初始化
quant-qmt-proxy/
├── app/
│ ├── main.py # FastAPI 应用入口
│ ├── grpc_server.py # gRPC 服务器入口
│ ├── grpc_client.py # gRPC 客户端封装
│ ├── config.py # 配置管理(单例)
│ ├── dependencies.py # 依赖注入(单例服务)
│ ├── models/ # Pydantic 数据模型
│ │ ├── data_models.py # 数据相关模型
│ │ └── trading_models.py # 交易相关模型
│ ├── routers/ # REST API 路由
│ │ ├── data.py # 数据服务 API
│ │ ├── trading.py # 交易服务 API
│ │ ├── health.py # 健康检查 API
│ │ └── websocket.py # WebSocket 行情推送
│ ├── grpc_services/ # gRPC 服务实现
│ │ ├── data_grpc_service.py # 数据服务 gRPC
│ │ ├── trading_grpc_service.py # 交易服务 gRPC
│ │ └── health_grpc_service.py # 健康检查 gRPC
│ ├── services/ # 业务服务层(REST 和 gRPC 共享)
│ │ ├── data_service.py # 数据服务(xtdata 封装)
│ │ ├── trading_service.py # 交易服务(xttrader 封装)
│ │ └── subscription_manager.py # 行情订阅管理器
│ └── utils/ # 工具函数
│ ├── exceptions.py # 自定义异常
│ ├── helpers.py # 辅助函数
│ └── logger.py # 日志配置
├── proto/ # Protocol Buffers 定义
│ ├── common.proto # 公共消息定义
│ ├── data.proto # 数据服务定义
│ ├── trading.proto # 交易服务定义
│ └── health.proto # 健康检查定义
├── generated/ # protobuf 生成的 Python 代码
│ ├── data_pb2.py # 数据服务消息
│ ├── data_pb2_grpc.py # 数据服务 stub
│ ├── trading_pb2.py # 交易服务消息
│ ├── trading_pb2_grpc.py # 交易服务 stub
│ └── ...
├── tests/ # 测试套件
│ ├── rest/ # REST API 测试
│ │ ├── test_data_api.py
│ │ ├── test_trading_api.py
│ │ └── test_health_api.py
│ └── grpc/ # gRPC 测试
│ ├── test_data_grpc_service.py
│ ├── test_trading_grpc_service.py
│ └── test_health_grpc_service.py
├── xtquant/ # xtquant SDK(国金 QMT)请自行下载
├── scripts/ # 工具脚本
│ └── generate_proto.py # protobuf 代码生成脚本
├── logs/ # 日志文件目录
├── config.yml # 统一配置文件
├── requirements.txt # Python 依赖
├── run.py # 启动脚本(同时启动 REST + gRPC)
└── README.md # 项目说明
核心修复与增强:
- ✅ 修复 gRPC 行情订阅: 解决 asyncio.Queue 在 gRPC 线程中无法创建的问题,实现惰性队列初始化
- ✅ 复权参数透传: 实现
adjust_type(前复权/后复权) 正确传递到 xtdata API - ✅ 空标的校验: 添加多层验证(Pydantic + 业务逻辑 + gRPC),禁止创建空股票列表订阅
- ✅ 事件循环管理: gRPC 服务自动检测并创建事件循环,确保订阅管理器正常运行
- ✅ 测试覆盖增强: gRPC 订阅测试全部通过 (7 passed, 1 skipped)
测试结果:
tests/grpc/test_subscription_grpc.py::TestSubscriptionGrpc
✅ test_subscribe_quote_mock_mode - 基本订阅功能
✅ test_unsubscribe_quote - 取消订阅
✅ test_get_subscription_info - 获取订阅信息
✅ test_subscribe_with_adjust_type - 复权类型订阅
✅ test_subscribe_with_empty_symbols - 空标的校验
7 passed, 1 skipped in 1.35s
- Python 3.10+
- 国金 QMT 客户端(dev/prod 模式需要)
- Windows 系统(QMT 仅支持 Windows)
pip install -r requirements.txt建议先使用
python -m venv .venv创建虚拟环境并激活,再安装依赖;根据实际凭证将env.example复制为.env后补全机密配置。
编辑 config.yml,修改 QMT 安装路径:
xtquant:
qmt_userdata_path: "C:/quant/国金QMT交易端模拟/userdata_mini"服务默认同时启动 REST API (端口 8000) 和 gRPC (端口 50051)。
# mock 模式 - 不连接 QMT,使用模拟数据(无需 QMT)
$env:APP_MODE="mock"; python run.py
# dev 模式 - 连接 QMT,获取真实数据,禁止交易(默认,推荐开发使用)
$env:APP_MODE="dev"; python run.py
# prod 模式 - 连接 QMT,获取真实数据,允许交易(生产环境)
$env:APP_MODE="prod"; python run.py启动后可访问:
| 服务 | 地址 | 说明 |
|---|---|---|
| REST API | http://localhost:8000 | RESTful API 主入口 |
| gRPC | localhost:50051 | gRPC 服务端口 |
| Swagger UI | http://localhost:8000/docs | 交互式 API 文档 |
| ReDoc | http://localhost:8000/redoc | API 文档(阅读友好) |
| 健康检查 | http://localhost:8000/health/ | 服务健康状态 |
| WebSocket 测试页 | http://localhost:8000/ws/test | 行情推送调试页面 |
# REST API 测试
pytest tests/rest/ -v
# gRPC 测试
pytest tests/grpc/ -v
# 所有测试
pytest tests/ -v项目支持三种运行模式,通过环境变量 APP_MODE 切换:
| 模式 | 连接 xtquant | 真实交易 | 使用场景 |
|---|---|---|---|
| mock | ❌ 否 | ❌ 禁止 | 开发测试,无需 QMT 客户端 |
| dev | ✅ 是 | ❌ 禁止 | 开发调试,获取真实数据但不下单 |
| prod | ✅ 是 | ✅ 允许 | 生产环境,真实交易 |
- ✅ 不需要运行 QMT 客户端
- ✅ 使用模拟数据响应 API 请求
- ✅ 适合前端开发、接口测试
- 📝 订单 ID 前缀:
mock_order_*
- ✅ 需要运行 QMT 客户端
- ✅ 获取真实的市场数据、财务数据
- ✅ 交易请求被拦截,返回模拟订单
- ✅ 适合策略开发、回测验证
- 📝 订单 ID 前缀:
mock_order_* - 🔒 日志记录所有拦截操作
- ✅ 需要运行 QMT 客户端
- ✅ 获取真实数据
⚠️ 允许真实交易下单⚠️ 适合生产环境、实盘交易- 📝 订单 ID: xttrader 返回的真实订单号
POST /api/v1/data/market- 获取市场行情数据POST /api/v1/data/financial- 获取财务数据GET /api/v1/data/sectors- 获取板块列表POST /api/v1/data/sector- 获取板块成分股POST /api/v1/data/index-weight- 获取指数权重GET /api/v1/data/trading-calendar/{year}- 获取交易日历GET /api/v1/data/instrument/{stock_code}- 获取合约信息GET /api/v1/data/etf/{etf_code}- 获取 ETF 基础信息POST /api/v1/data/subscription- 创建行情订阅(支持前复权/后复权)GET /api/v1/data/subscription/{subscription_id}- 查询订阅详情GET /api/v1/data/subscriptions- 获取订阅列表DELETE /api/v1/data/subscription/{subscription_id}- 取消订阅
POST /api/v1/trading/connect- 连接交易账户POST /api/v1/trading/disconnect/{session_id}- 断开账户GET /api/v1/trading/account/{session_id}- 获取账户信息GET /api/v1/trading/positions/{session_id}- 获取持仓信息POST /api/v1/trading/order/{session_id}- 提交订单POST /api/v1/trading/cancel/{session_id}- 撤销订单GET /api/v1/trading/orders/{session_id}- 获取订单列表GET /api/v1/trading/trades/{session_id}- 获取成交记录GET /api/v1/trading/asset/{session_id}- 获取资产信息GET /api/v1/trading/risk/{session_id}- 获取风险指标GET /api/v1/trading/strategies/{session_id}- 获取策略列表GET /api/v1/trading/status/{session_id}- 查询连接状态
GET /health/- 健康检查GET /health/ready- 就绪检查GET /health/live- 存活检查
GetMarketData()- 获取市场数据GetFinancialData()- 获取财务数据GetSectorList()- 获取板块列表GetStockListInSector()- 获取板块成分股GetIndexWeight()- 获取指数权重GetTradingCalendar()- 获取交易日历GetInstrumentDetail()- 获取合约信息SubscribeQuote()- 订阅行情(流式推送,支持复权)SubscribeWholeQuote()- 订阅全推行情(流式推送)UnsubscribeQuote()- 取消订阅GetSubscriptionInfo()- 获取订阅详情ListSubscriptions()- 列出所有订阅
Connect()- 连接交易账户Disconnect()- 断开连接GetAccountInfo()- 获取账户信息GetPositions()- 获取持仓SubmitOrder()- 提交订单CancelOrder()- 撤销订单GetOrders()- 查询订单GetTrades()- 查询成交GetAsset()- 查询资产GetRiskInfo()- 查询风险GetStrategies()- 查询策略列表
Check()- 健康检查Watch()- 健康状态订阅(流式)
GET /ws/quote/{subscription_id}- 行情订阅推送,支持ping/pong心跳GET /ws/test- 内置测试页面,可浏览器直接调试订阅
✨ 行情订阅特性:
- 支持多股票同时订阅
- 支持复权类型选择(none/front/back)
- 自动队列管理,防止内存溢出
- gRPC 流式推送,低延迟高吞吐
- WebSocket 实时推送,内置心跳与重连
- FastAPI: 现代高性能 Web 框架
- gRPC: 高性能 RPC 框架
- Protocol Buffers: 数据序列化协议
- Pydantic: 数据验证和序列化
- uvicorn: ASGI 服务器
- Loguru: 结构化日志库
- xtquant: 国金 QMT Python SDK
- ✅ 依赖注入: 使用 FastAPI 的 Depends 系统
- ✅ 单例模式: 服务实例全局唯一,避免重复初始化
- ✅ 策略模式: 不同模式下的不同行为
- ✅ 拦截器模式: 交易请求拦截保护
- ✅ 适配器模式: REST 和 gRPC 共享业务逻辑
- 🎯 分层架构: Router → Service → SDK,职责清晰
- 🔄 代码复用: REST 和 gRPC 共享同一套业务服务
- ⚡ 高性能: gRPC 二进制传输,FastAPI 异步处理,惰性队列初始化
- 🛡️ 可靠性: 完整的异常处理、多层数据校验、事件循环自动管理
- 📈 可扩展: 易于添加新接口和功能
- 🧵 线程安全: gRPC 订阅支持多线程环境,自动处理事件循环
- ✅ 健康检查测试
- ✅ 数据服务测试(市场数据、财务数据、板块数据)
- ✅ 交易服务测试(下单、撤单、持仓查询)
- ✅ 行情订阅测试(订阅创建、查询、取消、空标的校验)
- ✅ 健康检查服务测试(Check、Watch)
- ✅ 数据服务测试(14 个接口)
- ✅ 交易服务测试(8 个接口)
- ✅ 行情订阅测试(订阅、取消、复权、空标的、流式推送)
- ✅ 订阅管理器单元测试(初始化、多订阅、流式数据)
- 🎯 使用 pytest 框架
- 🔄 支持 mock/dev/prod 三种模式测试
- 📊 详细的测试报告和日志
- ✅ gRPC 订阅测试 7 passed, 1 skipped
- ✅ 订阅管理器测试 3 passed, 1 skipped
- 🚀 CI/CD 集成就绪
💡 接口覆盖率: 20/125+ ≈ 16% | ✨ 最新更新: gRPC 行情订阅完整实现
- ✅ 市场数据获取(K线、分时、tick)
- ✅ 财务数据查询
- ✅ 板块数据管理
- ✅ 指数权重查询
- ✅ 交易日历查询
- ✅ 合约信息查询
- ✅ ETF 信息查询(占位)
- ✅ 行情订阅管理(REST API 完整实现)
- ✅ gRPC 行情订阅(流式推送,支持复权)
- ✅ WebSocket 行情推送(实时数据,心跳保活)
- ✅ 订阅队列管理(惰性初始化,防溢出)
- ✅ 空标的校验(多层验证)
- ✅ 复权参数透传(前复权/后复权)
- ✅ 事件循环自动管理(gRPC 线程安全)
- ✅ 账户连接管理
- ✅ 下单/撤单
- ✅ 持仓查询
- ✅ 订单查询
- ✅ 交易模式拦截
- ✅ 资产/风险/策略查询(mock 数据)
- ✅ 配置管理(单例模式)
- ✅ 日志系统(Loguru,结构化日志)
- ✅ 健康检查(REST + gRPC)
- ✅ API 认证(API Key)
- ✅ 异常处理(统一错误码映射)
- ✅ 行情订阅管理(SubscriptionManager)
- ✅ WebSocket 推送(心跳保活,限流控制)
- ✅ 事件循环管理(gRPC 线程安全)
- ✅ 队列管理(惰性初始化,自动溢出控制)
- 🔄 L2 行情数据接口(Level2 逐笔数据)
- ❌ 资产查询接口(真实数据)
- ❌ 成交查询接口(真实数据)
- ❌ 异步下单/撤单
- ❌ 交易回调推送(WebSocket)
- ❌ 历史数据下载管理
- ❌ 财务数据下载管理
- ❌ 新股申购功能
- ❌ 行情订阅性能优化(批量订阅)
- ❌ 信用交易(融资融券)
- ❌ 资金管理(银证转账)
- ❌ 约券功能
- ❌ 板块管理
当前进度: 数据模块 14/50+,交易模块 6/60+,系统模块核心功能完成
app:
name: "xtquant-proxy"
version: "1.0.0"
# gRPC 配置
grpc:
enabled: true
host: "0.0.0.0"
port: 50051
max_workers: 10
# 日志配置
logging:
file: "logs/app.log"
error_file: "logs/error.log"
rotation: "10 MB"
retention: "30 days"
compression: "zip"
# xtquant 配置
xtquant:
qmt_userdata_path: "C:/quant/国金QMT交易端模拟/userdata_mini"
# 运行模式配置
modes:
mock:
connect_xtquant: false
allow_real_trading: false
dev:
connect_xtquant: true
allow_real_trading: false # 🔒 拦截交易
prod:
connect_xtquant: true
allow_real_trading: true # ⚠️ 允许真实交易- 使用 mock 模式 进行前端开发和接口测试
- 使用 dev 模式 连接真实 QMT 进行策略开发
- 充分测试后切换到 prod 模式 进行实盘交易
默认使用 API Key 认证,请求头格式:
X-API-Key: your-api-key
dev 模式可用的 API Key(在 config.yml 中配置):
dev-api-key-001dev-api-key-002
# 查看应用日志
Get-Content logs/app.log -Wait -Tail 50
# 查看错误日志
Get-Content logs/error.log -Wait -Tail 50- 在
app/services/中添加新的业务服务 - 在
app/models/中定义 Pydantic 数据模型 - 在
app/routers/中添加 REST API 路由 - 在
proto/中定义 gRPC 服务和消息 - 运行
python scripts/generate_proto.py生成代码 - 在
app/grpc_services/中实现 gRPC 服务 - 在
tests/中添加测试用例
⚠️ 生产环境必须修改默认 API Key⚠️ prod 模式会真实下单,请谨慎使用⚠️ 建议使用 HTTPS 和更严格的 CORS 配置⚠️ 不要将包含真实账号密码的配置文件提交到 Git
- xtquant 仅支持 Windows 系统
- 需要 QMT 客户端正在运行(dev/prod 模式)
- xtdata 连接有 5 秒超时限制
- 连接失败会自动降级到模拟模式
- gRPC 最大消息大小限制为 50MB
- 订阅队列默认最大长度 1000,超出会丢弃旧数据
- 单实例最大订阅数默认限制为 100
- ✅ 使用单例模式避免重复初始化
- ✅ xtdata 连接成功后会复用
- ✅ gRPC 使用连接池提升性能
- ✅ 订阅队列惰性初始化,减少内存占用
- ✅ 队列满时自动丢弃旧数据,防止内存溢出
- 📋 考虑添加 Redis 缓存高频查询数据(可选)
- 📋 考虑添加数据库持久化订阅配置(可选)
- 📋 批量订阅优化,减少 xtdata 调用次数(规划中)
- 检查 Python 版本 >= 3.10
- 确认所有依赖已安装:
pip install -r requirements.txt - 检查端口 8000 和 50051 是否被占用
- 查看启动日志:
logs/app.log
- 确认 QMT 客户端正在运行
- 检查
config.yml中的 QMT 路径是否正确 - 查看错误日志:
logs/error.log - 尝试重启 QMT 客户端
- 检查请求头是否包含
X-API-Key: xxx - 确认 API Key 在 config.yml 的当前模式配置中
- 检查 API Key 是否正确
- 确认 gRPC 服务已启动(查看启动日志)
- 检查端口 50051 是否被占用
- 确认客户端使用正确的地址:
localhost:50051 - 检查防火墙设置
- 使用 REST 接口确认订阅状态为
active - 检查 WebSocket 是否按需发送
ping保持心跳 - mock 模式下推送为模拟数据,若需真实行情请使用 dev/prod 模式
- 查看
logs/app.log是否有订阅队列过载或 xtdata 异常 - 检查股票代码列表是否为空(已添加多层校验)
- 确认使用 mock 模式测试:
$env:APP_MODE="mock" - 检查是否传入空股票列表(会返回 INVALID_ARGUMENT)
- 查看是否有 RuntimeError 关于 asyncio.Queue(已修复)
- 检查复权参数是否正确: none/front/back
- 这是正常行为!dev 模式会拦截所有交易请求
- 如需真实交易,切换到 prod 模式
- 检查订单 ID 前缀区分模拟/真实订单
欢迎提交 Issue 和 Pull Request!
# 克隆仓库
git clone https://github.com/liqimore/quant-qmt-proxy.git
cd quant-qmt-proxy
# 安装依赖
pip install -r requirements.txt
# 生成 protobuf 代码
python scripts/generate_proto.py
# 运行测试
pytest tests/ -vfeat: 新功能fix: 修复 bugdocs: 文档更新refactor: 代码重构test: 测试相关chore: 构建/工具相关
MIT License - 详见 LICENSE 文件
如果觉得项目有帮助,请给个 ⭐ Star 支持一下!
Made with ❤️ by liqimore