Skip to content

pseudocodes/open-md-gateway-go

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

CTP Market Data Gateway - Go Implementation

Go Version License

QuantAxis CTP 市场行情网关 的 go 实现,和该项目一样,本项目也是基于该项目的 Vibe Coding 作品 提供高性能期货行情数据网关的 Go 语言实现,支持 90+ 期货公司的 CTP 连接,提供 WebSocket 实时行情推送服务。

📋 目录

✨ 特性

  • 多 CTP 连接支持: 同时管理 90+ 期货公司的 CTP 连接
  • 智能负载均衡: 支持轮询、最少连接、连接质量和哈希等多种负载均衡策略
  • 高性能: <5ms p99 延迟,支持 10,000+ 行情更新/秒
  • 高并发: 支持 1,000+ 并发 WebSocket 连接
  • 自动故障转移: CTP 连接失败时自动迁移订阅到健康连接
  • 双协议支持: 同时支持 QIFI 标准的 aid 协议和传统 action 协议
  • Redis 集成: 自动存储最新行情和历史 tick 数据
  • 健康检查: 内置健康检查端点和统计信息
  • 优雅关闭: 支持信号处理和资源清理

🏗️ 架构

系统架构图

┌─────────────────────────────────────────────────────────────┐
│                     Go CTP Gateway                          │
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │   Main       │  │   Config     │  │   Logger     │       │
│  │   Process    │──│   Manager    │  │   (zap)      │       │
│  └──────────────┘  └──────────────┘  └──────────────┘       │
│         │                                                   │
│         ├─────────────┬──────────────┬─────────────┐        │
│         │             │              │             │        │
│  ┌──────▼──────┐ ┌────▼─────┐ ┌──────▼──────┐ ┌────▼────┐   │
│  │  WebSocket  │ │   CTP    │ │Subscription │ │ Redis   │   │
│  │   Server    │ │Connection│ │ Dispatcher  │ │ Client  │   │
│  │             │ │  Manager │ │             │ │         │   │
│  └──────┬──────┘ └────┬─────┘ └──────┬──────┘ └────┬────┘   │
│         │             │              │             │        │
└─────────┼─────────────┼──────────────┼─────────────┼────────┘
          │             │              │             │
     WebSocket      CTP API         Channels &     Redis DB
     Clients        90+ Brokers     Mutexes

核心组件

  • CTP Connection Manager: 管理多个 CTP 连接的生命周期
  • Subscription Dispatcher: 智能订阅分发和负载均衡
  • WebSocket Server: 处理客户端连接和消息路由
  • Market Data Processor: 行情数据转换和差异计算
  • Redis Client: 数据持久化和历史查询
  • Health Checker: 监控系统健康状态

🚀 快速开始

前置要求

  • Go 1.21 或更高版本
  • Redis 服务器(可选,用于数据持久化)
  • CTP 库文件(已包含在 libs/ 目录)

安装

# 克隆仓库
git clone <repository-url>
cd omg

# 下载依赖
make deps

# 检查依赖
make check-deps

构建

# 构建应用
make build

# 构建输出位于 bin/qactpmdgateway

运行

# 使用 SimNow 测试配置运行
make run

# 使用多 CTP 配置运行
make run-multi

# 或直接运行二进制文件
./bin/gateway --config config/multi_ctp_config.json

测试

# 运行所有测试
make test

# 运行基准测试
make bench

# 生成测试覆盖率报告
make coverage

⚙️ 配置说明

配置文件结构

配置文件使用 JSON 格式,示例:

{
  "websocket_port": 7799,
  "redis_host": "127.0.0.1",
  "redis_port": 6379,
  "load_balance_strategy": "connection_quality",
  "health_check_interval": 30,
  "maintenance_interval": 60,
  "max_retry_count": 3,
  "auto_failover": true,
  "connections": [
    {
      "connection_id": "simnow_01",
      "front_addr": "tcp://182.254.243.31:30011",
      "broker_id": "9999",
      "max_subscriptions": 500,
      "priority": 1,
      "enabled": true
    }
  ]
}

配置参数说明

全局配置

参数 类型 默认值 说明
websocket_port int 7799 WebSocket 服务器监听端口
redis_host string "127.0.0.1" Redis 服务器地址
redis_port int 6379 Redis 服务器端口
load_balance_strategy string "connection_quality" 负载均衡策略
health_check_interval int 30 健康检查间隔(秒)
maintenance_interval int 60 维护任务间隔(秒)
max_retry_count int 3 最大重试次数
auto_failover bool true 是否启用自动故障转移

负载均衡策略

  • round_robin: 轮询选择连接
  • least_connections: 选择订阅数最少的连接
  • connection_quality: 选择质量分数最高的连接(推荐)
  • hash_based: 基于合约 ID 哈希选择连接

连接配置

参数 类型 说明
connection_id string 连接唯一标识符
front_addr string CTP 前置机地址
broker_id string 期货公司代码
max_subscriptions int 最大订阅数限制
priority int 连接优先级(1-10)
enabled bool 是否启用此连接

环境变量

支持通过环境变量覆盖配置:

export QACTPMD_WEBSOCKET_PORT=8080
export QACTPMD_REDIS_HOST=redis.example.com
export QACTPMD_REDIS_PORT=6380

📡 API 文档

WebSocket 连接

连接到 WebSocket 服务器:

ws://localhost:7799/

aid 协议(QIFI 标准)

订阅行情

{
  "aid": "subscribe_quote",
  "ins_list": "rb2512,cu2512,au2512"
}

响应:

{
  "aid": "subscribe_quote",
  "status": "ok"
}

长轮询获取行情

{
  "aid": "peek_message"
}

响应(仅返回变化的字段):

{
  "aid": "rtn_data",
  "data": [
    {
      "quotes": {
        "rb2501": {
          "last_price": 4180.0,
          "volume": 123456,
          "timestamp": 1699600000000
        }
      }
    },
    {
      "account_id": "",
      "ins_list": "",
      "mdhis_more_data": false
    }
  ]
}

action 协议(传统兼容)

订阅行情

{
  "action": "subscribe",
  "instruments": ["rb2501", "cu2501"]
}

取消订阅

{
  "action": "unsubscribe",
  "instruments": ["rb2501"]
}

查询已订阅合约

{
  "action": "list_instruments"
}

响应:

{
  "action": "list_instruments",
  "instruments": ["rb2501", "cu2501", "au2506"]
}

行情推送

服务器主动推送(无需 peek_message):

{
  "action": "market_data",
  "instrument_id": "rb2501",
  "data": {
    "instrument_id": "rb2501",
    "trading_day": "20231110",
    "update_time": "09:30:00",
    "update_millisec": 500,
    "last_price": 4180.0,
    "volume": 123456,
    "open_interest": 234567.0,
    "bid_price1": 4179.0,
    "bid_volume1": 100,
    "ask_price1": 4181.0,
    "ask_volume1": 150,
    "timestamp": 1699600000500,
    "datetime": "2023-11-10 09:30:00.500"
  }
}

行情数据字段说明

字段 类型 说明
instrument_id string 合约代码
trading_day string 交易日
update_time string 更新时间
update_millisec int 更新毫秒
last_price float64 最新价
volume int 成交量
turnover float64 成交额
open_interest float64 持仓量
bid_price1 float64 买一价
bid_volume1 int 买一量
ask_price1 float64 卖一价
ask_volume1 int 卖一量
open_price float64 开盘价
highest_price float64 最高价
lowest_price float64 最低价
close_price float64 收盘价
upper_limit_price float64 涨停价
lower_limit_price float64 跌停价
timestamp int64 时间戳(毫秒)
datetime string 日期时间字符串

健康检查端点

curl http://localhost:7799/health

响应:

{
  "status": "healthy",
  "timestamp": "2023-11-10T09:30:00Z",
  "connections": [
    {
      "id": "simnow_01",
      "status": "logged_in",
      "quality": 95,
      "subscriptions": 150
    }
  ],
  "websocket": {
    "active_sessions": 25,
    "total_messages": 123456
  },
  "redis": {
    "connected": true,
    "latency": "1.2ms"
  }
}

🐳 部署指南

Docker 部署

构建镜像

# 构建 Docker 镜像
make docker-build

# 或指定版本
make docker-build VERSION=v1.0.0

运行容器

# 运行容器
make docker-run

# 或使用 docker 命令
docker run -d \
  --name qactpmd-gateway \
  -p 7799:7799 \
  -v $(pwd)/config:/app/config \
  -v $(pwd)/ctpflow:/app/ctpflow \
  -v $(pwd)/logs:/app/logs \
  --restart unless-stopped \
  qactpmdgateway-go:latest

查看日志

# 使用 Makefile
make docker-logs

# 或使用 docker 命令
docker logs -f qactpmd-gateway

停止容器

make docker-stop

Docker Compose 部署

创建 docker-compose.yml

version: '3.8'

services:
  gateway:
    image: qactpmdgateway-go:latest
    container_name: qactpmd-gateway
    ports:
      - "7799:7799"
    volumes:
      - ./config:/app/config
      - ./ctpflow:/app/ctpflow
      - ./logs:/app/logs
    environment:
      - TZ=Asia/Shanghai
    restart: unless-stopped
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    container_name: qactpmd-redis
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    restart: unless-stopped

volumes:
  redis-data:

启动服务:

docker-compose up -d

生产环境部署建议

  1. 资源配置

    • CPU: 4 核心或更多
    • 内存: 8GB 或更多
    • 网络: 低延迟网络连接
  2. 监控和日志

    • 配置日志级别为 infowarn
    • 使用日志聚合工具(如 ELK、Loki)
    • 监控健康检查端点
  3. 高可用性

    • 部署多个实例
    • 使用负载均衡器
    • 配置自动重启策略
  4. 安全性

    • 使用防火墙限制访问
    • 配置 TLS/SSL(如需要)
    • 定期更新依赖和镜像

💻 开发指南

项目结构

omg/
├── cmd/
│   └── gateway/
│       └── main.go              # 应用程序入口点
├── internal/
│   ├── config/
│   │   └── config.go           # 配置管理
│   ├── ctp/
│   │   ├── api.go              # CTP API 包装器
│   │   ├── connection.go       # 单个连接管理
│   │   ├── manager.go          # 多连接管理器
│   │   ├── types.go            # 类型定义
│   │   └── errors.go           # 错误处理
│   ├── websocket/
│   │   ├── server.go           # WebSocket 服务器
│   │   ├── session.go          # 会话管理
│   │   ├── protocol_aid.go     # aid 协议处理
│   │   ├── protocol_action.go  # action 协议处理
│   │   └── broadcast.go        # 广播逻辑
│   ├── dispatcher/
│   │   ├── dispatcher.go       # 订阅分发器
│   │   ├── loadbalancer.go     # 负载均衡
│   │   └── subscription.go     # 订阅管理
│   ├── redis/
│   │   ├── client.go           # Redis 客户端
│   │   ├── storage.go          # 存储逻辑
│   │   └── batch.go            # 批量操作
│   ├── market/
│   │   ├── data.go             # 数据结构
│   │   ├── converter.go        # 数据转换
│   │   ├── diff.go             # 差异计算
│   │   └── zerocopy.go         # 零拷贝优化
│   ├── health/
│   │   └── checker.go          # 健康检查
│   ├── stats/
│   │   └── stats.go            # 统计系统
│   └── worker/
│       └── pool.go             # Goroutine 池
├── pkg/
│   └── logger/
│       └── logger.go           # 日志包装器
├── config/                      # 配置文件
├── libs/                        # CTP 库文件
├── go.mod                       # Go 模块定义
├── go.sum                       # 依赖锁定
├── Makefile                     # 构建脚本
├── Dockerfile                   # Docker 镜像定义
└── README.md                    # 本文档

开发工作流

  1. 创建功能分支

    git checkout -b feature/your-feature
  2. 编写代码

    • 遵循 Go 代码规范
    • 添加必要的注释
    • 编写单元测试
  3. 格式化和检查

    make fmt
    make lint
  4. 运行测试

    make test
  5. 提交代码

    git add .
    git commit -m "feat: add your feature"
    git push origin feature/your-feature

代码规范

  • 使用 gofmt 格式化代码
  • 使用 golangci-lint 进行代码检查
  • 遵循 Effective Go 指南
  • 为导出的函数和类型添加文档注释
  • 保持函数简短和单一职责

测试指南

# 运行所有测试
make test

# 运行特定包的测试
go test -v ./internal/market/...

# 运行基准测试
make bench

# 生成覆盖率报告
make coverage

📊 性能指标

基准测试结果

  • 行情数据处理延迟: <5ms (p99)
  • 并发 WebSocket 连接: 1,000+
  • 行情更新吞吐量: 10,000+ 更新/秒
  • 内存使用: ~200MB(1000 连接)
  • CPU 使用: ~50%(4 核心,满负载)

性能优化

  • Goroutine 池限制并发
  • 对象池减少内存分配
  • 零拷贝 JSON 编码
  • 批量 Redis 操作
  • 连接复用和长连接

🔧 故障排除

常见问题

1. CTP 连接失败

症状: 日志显示 "Failed to connect to CTP"

解决方案:

  • 检查网络连接
  • 验证 front_addr 配置是否正确
  • 确认 CTP 库文件存在且可执行
  • 检查防火墙设置

2. WebSocket 连接断开

症状: 客户端频繁断开连接

解决方案:

  • 检查网络稳定性
  • 增加心跳间隔
  • 检查服务器资源使用情况
  • 查看错误日志

3. Redis 连接失败

症状: 日志显示 "Redis connection failed"

解决方案:

  • 确认 Redis 服务器正在运行
  • 检查 redis_hostredis_port 配置
  • 验证网络连接
  • 检查 Redis 认证配置

4. 内存使用过高

症状: 内存持续增长

解决方案:

  • 检查 goroutine 泄漏
  • 使用 pprof 分析内存使用
  • 调整 goroutine 池大小
  • 检查订阅清理逻辑

调试技巧

启用调试日志

修改配置或设置环境变量:

export LOG_LEVEL=debug
./bin/gateway --config config/multi_ctp_config.json

使用 pprof 分析性能

# 启动应用时启用 pprof
go run cmd/gateway/main.go --config config/multi_ctp_config.json --pprof

# 在另一个终端中分析
go tool pprof http://localhost:6060/debug/pprof/heap
go tool pprof http://localhost:6060/debug/pprof/goroutine

查看统计信息

# 使用 --status 标志查看连接状态
./bin/gateway --config config/multi_ctp_config.json --status

📝 更新日志

v1.0.0 (2025-11-10)

  • ✅ 完整的 CTP 连接管理
  • ✅ WebSocket 服务器实现
  • ✅ aid 和 action 双协议支持
  • ✅ 智能订阅分发和负载均衡
  • ✅ Redis 数据持久化
  • ✅ 健康检查和监控
  • ✅ Docker 部署支持
  • ✅ 完整的文档和示例

🤝 贡献

欢迎贡献代码、报告问题或提出建议!

📄 许可证

本项目采用 MIT 许可证。详见 LICENSE 文件。

Reference


QuantAxis - 让量化交易更简单

About

兼容 diff 的多交易所行情网关 go 版本

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published