Skip to content

Commit

Permalink
other: v2 server 慢消息检测及追踪
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed May 7, 2024
1 parent 4c6af40 commit 7e12b58
Show file tree
Hide file tree
Showing 15 changed files with 240 additions and 102 deletions.
44 changes: 15 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,23 @@ Minotaur 是一个用于服务端开发的支持库,其中采用了大量泛
![qq-group](https://img.shields.io/badge/QQ%20Group-758219443-green.svg?style=flat&logo=tencent-qq&link=https://qm.qq.com/cgi-bin/qm/qr?k=WzRWJIDLzuJbH6-VjdFiTCd1_qA_Ug-D&jump_from=webapi&authKey=ktLEw3XyY9yO+i9rPbI6Fk0UA0uEhACcUidOFdblaiToZtbHcXyU7sFb31FEc9JJ&noverify=0)
![telegram](https://img.shields.io/badge/Telegram-ziv__siren-green.svg?style=flat&logo=telegram&link=https://telegram.me/ziv_siren)

> - 这是支持快速搭建多功能游戏服务器及 HTTP 服务器的 `Golang` 服务端框架;
> - 网络传输基于 [`gorilla/websocket`](https://github.com/gorilla/websocket)[`gin-gonic/gin`](https://github.com/gin-gonic/gin)[`grpc/grpc-go`](https://github.com/grpc/grpc-go)[`panjf2000/gnet`](https://github.com/panjf2000/gnet)[`xtaci/kcp-go`](https://github.com/xtaci/kcp-go) 构建;
> - 这是支持快速搭建多功能服务器的 `Golang` 服务端框架,支持 WebSocket、TCP、UDP、KCP、HTTP 等常见网络协议
> - 网络传输基于 [`panjf2000/gnet`](https://github.com/panjf2000/gnet)[`xtaci/kcp-go`](https://github.com/xtaci/kcp-go) 构建;
> - 该项目的目标是提供一个简单、高效、可扩展的游戏服务器框架,让开发者可以专注于游戏逻辑的开发,而不用花费大量时间在网络传输、配置导表、日志、监控等基础功能的开发上;
***
在 Minotaur 中不包括任何跨服实现,但支持基于多级路由器快速实现跨服功能。推荐使用 [`NATS.io`](https://nats.io/) 作为跨服消息中间件。
- 目前已实践的弹幕游戏项目以 `NATS.io` 作为消息队列,实现了跨服、埋点日志收集等功能,部署在 `Kubernetes` 集群中;
- 该项目客户端与服务端采用 `WebSocket` 进行通讯,服务端暴露 `HTTP` 接口接收互动数据消息回调,通过负载均衡器进入 `Kubernetes` 集群中的 `Minotaur` 服务,最终通过 `NATS.io` 消息队列转发至对应所在的 `Pod` 中进行处理;

<details>
<summary>关于 Pod 配置参数及非极限压测数据</summary>

> 本次压测 `Pod` 扩容数量为 1,但由于压测连接是最开始就建立好的,所以该扩容的 `Pod` 并没有接受到压力。
> 理论上来说该 `Pod` 也应该接受 `HTTP` 回调压力,实测过程中,这个扩容的 `Pod` 没有接受到任何压力
**Pod 配置参数**

![pod](.github/images/pod.png)

**压测结果**

![压测数据](.github/images/yc1.png)
![压测数据](.github/images/yc2.png)

**监控数据**

![事件](./.github/images/yc-event.png)
![CPU](./.github/images/yc-cpu.png)
![内存](./.github/images/yc-memory.png)

</details>
#### v0.5.X 之后的版本为重构后的版本,重构后的版本主要有以下特点:
- 移除了大量的无用或不合理的实现,减少了代码的复杂度;
- 所有设计均优先考虑了泛型的使用,提高了代码的复用性;
- 关于 `server` 包,对于整体进行了重构,具体如下:
- 移除了对于 `gRPC` 的支持及依赖;
- 移除了对于 `gin` 的依赖,默认的 HTTP 服务将采用 `http.ServeMux` 进行处理,支持使用自定义 `http.Handler`
- 关于 `WebSocket` 更改为使用 `gnet``github.com/gobwas/ws` 进行处理;
- 开放 `server.Network` 接口,支持自定义网络协议;
- `pprof` 支持运行时动态开关;
- 基于 `shunt` 分流概念的 `actor` 模型整体重构、解耦,并将名称调整为 `queue`,连接所在队列不再与连接绑定,现在支持直接向特定队列发送消息;
- 对外开放服务器消息驱动模型接口,支持自定义消息驱动模型;
-`utils` 包更名为 `toolkit`,并对其中的大量函数、设计、目录结构进行了调整;
- 内置了基于 `nats.io``rpc` 支持;

***

Expand Down
7 changes: 0 additions & 7 deletions example/internal/basic-template/main.go

This file was deleted.

16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tealeg/xlsx v1.0.5
github.com/tidwall/gjson v1.17.0
github.com/xtaci/kcp-go/v5 v5.6.7
github.com/xtaci/kcp-go/v5 v5.6.8
go.uber.org/atomic v1.11.0
golang.org/x/crypto v0.22.0
golang.org/x/crypto v0.23.0
google.golang.org/grpc v1.60.1
)

Expand All @@ -47,8 +47,8 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/klauspost/reedsolomon v1.12.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/reedsolomon v1.12.1 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -75,11 +75,11 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
Expand Down
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.12.0 h1:I5FEp3xSwVCcEh3F5A7dofEfhXdF/bWhQWPH+XwBFno=
github.com/klauspost/reedsolomon v1.12.0/go.mod h1:EPLZJeh4l27pUGC3aXOjheaoh1I9yut7xTURiW3LQ9Y=
github.com/klauspost/reedsolomon v1.12.1 h1:NhWgum1efX1x58daOBGCFWcxtEhOhXKKl1HAPQUp03Q=
github.com/klauspost/reedsolomon v1.12.1/go.mod h1:nEi5Kjb6QqtbofI6s+cbG/j1da11c96IBYBSnVGtuBs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
Expand Down Expand Up @@ -202,6 +206,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/xtaci/kcp-go/v5 v5.6.7 h1:7+rnxNFIsjEwTXQk4cSZpXM4pO0hqtpwE1UFFoJBffA=
github.com/xtaci/kcp-go/v5 v5.6.7/go.mod h1:oE9j2NVqAkuKO5o8ByKGch3vgVX3BNf8zqP8JiGq0bM=
github.com/xtaci/kcp-go/v5 v5.6.8 h1:jlI/0jAyjoOjT/SaGB58s4bQMJiNS41A2RKzR6TMWeI=
github.com/xtaci/kcp-go/v5 v5.6.8/go.mod h1:oE9j2NVqAkuKO5o8ByKGch3vgVX3BNf8zqP8JiGq0bM=
github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM=
github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down Expand Up @@ -234,6 +240,8 @@ golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 h1:QfTh0HpN6hlw6D3vu8DAwC8pBIwikq0AI1evdm+FksE=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
Expand Down Expand Up @@ -261,6 +269,8 @@ golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -294,6 +304,8 @@ golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
Expand All @@ -302,6 +314,7 @@ golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -311,6 +324,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
13 changes: 2 additions & 11 deletions rpc/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ func TestClient(t *testing.T) {
panic(err)
}

discovery, err := rpcbuiltin.NewDiscoveryWithNats(conn, js)
if err != nil {
panic(err)
}
var cli = rpc.NewClient(discovery)
var cli = rpc.NewClient(rpcbuiltin.NewDiscoveryWithNats(conn, js))

for {
if err := func() error {
Expand Down Expand Up @@ -76,15 +72,10 @@ func TestApplication(t *testing.T) {
panic(err)
}

registry, err := rpcbuiltin.NewRegistryWithNats(conn, js)
if err != nil {
panic(err)
}

var app = rpc.NewApplication(rpc.Service{
Name: "test-app",
InstanceId: "test-app-1",
}, registry)
}, rpcbuiltin.NewRegistryWithNats(conn, js))
app.Register("account", "login").Unary(func(reader rpc.Reader) any {
var params struct {
Username string `json:"username"`
Expand Down
14 changes: 12 additions & 2 deletions rpc/rpcbuiltin/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ import (
"github.com/nats-io/nats.go"
)

// NewDiscoveryWithNats 创建基于 Nats 的服务发现器
func NewDiscoveryWithNats(conn *nats.Conn, js nats.JetStreamContext, opts ...*NatsDiscoveryOptions) (rpc.Discovery, error) {
// NewDiscoveryWithNatsE 创建基于 Nats 的服务发现器
func NewDiscoveryWithNatsE(conn *nats.Conn, js nats.JetStreamContext, opts ...*NatsDiscoveryOptions) (rpc.Discovery, error) {
var opt = NewNatsDiscoveryOptions().Apply(opts...)
return discovery.NewNats(conn, js, opt.BucketName, opt.BucketDesc, opt.KeyPrefix, opt.TTL)
}

// NewDiscoveryWithNats 创建基于 Nats 的服务注册器,当创建失败时会 panic
func NewDiscoveryWithNats(conn *nats.Conn, js nats.JetStreamContext, opts ...*NatsDiscoveryOptions) rpc.Discovery {
var opt = NewNatsDiscoveryOptions().Apply(opts...)
d, err := discovery.NewNats(conn, js, opt.BucketName, opt.BucketDesc, opt.KeyPrefix, opt.TTL)
if err != nil {
panic(err)
}
return d
}
14 changes: 12 additions & 2 deletions rpc/rpcbuiltin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ import (
"github.com/nats-io/nats.go"
)

// NewRegistryWithNats 创建基于 Nats 注册中心的注册器
func NewRegistryWithNats(conn *nats.Conn, js nats.JetStreamContext, opts ...*NatsRegistryOptions) (rpc.Registry, error) {
// NewRegistryWithNatsE 创建基于 Nats 注册中心的注册器
func NewRegistryWithNatsE(conn *nats.Conn, js nats.JetStreamContext, opts ...*NatsRegistryOptions) (rpc.Registry, error) {
var opt = NewNatsRegistryOptions().Apply(opts...)
return registry.NewNats(conn, js, opt.BucketName, opt.BucketDesc, opt.KeyPrefix, opt.TTL, opt.KeepAliveInterval)
}

// NewRegistryWithNats 创建基于 Nats 注册中心的注册器,如果出现错误则 panic
func NewRegistryWithNats(conn *nats.Conn, js nats.JetStreamContext, opts ...*NatsRegistryOptions) rpc.Registry {
var opt = NewNatsRegistryOptions().Apply(opts...)
r, err := registry.NewNats(conn, js, opt.BucketName, opt.BucketDesc, opt.KeyPrefix, opt.TTL, opt.KeepAliveInterval)
if err != nil {
panic(err)
}
return r
}
1 change: 1 addition & 0 deletions server/network/gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type gNetCore struct {
func (w *gNetCore) OnSetup(ctx context.Context, controller server.Controller) (err error) {
w.ctx = ctx
w.controller = controller
w.handler.OnInit(w)
return
}

Expand Down
33 changes: 25 additions & 8 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"github.com/kercylan98/minotaur/toolkit/log"
"github.com/kercylan98/minotaur/toolkit/nexus"
"os"
"runtime"
"sync"
Expand All @@ -25,14 +26,15 @@ func DefaultOptions() *Options {
type Options struct {
server *server
rw sync.RWMutex
messageChannelSize int // 服务器消息处理管道大小
messageBufferInitialSize int // 服务器消息写入缓冲区初始化大小
lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器
logger *log.Logger // 日志记录器
debug bool // Debug 模式
syncLowMessageDuration time.Duration // 同步慢消息时间
asyncLowMessageDuration time.Duration // 异步慢消息时间
sparseGoroutineNum int // 稀疏 goroutine 数量
messageChannelSize int // 服务器消息处理管道大小
messageBufferInitialSize int // 服务器消息写入缓冲区初始化大小
lifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器
logger *log.Logger // 日志记录器
debug bool // Debug 模式
syncLowMessageDuration time.Duration // 同步慢消息时间
asyncLowMessageDuration time.Duration // 异步慢消息时间
sparseGoroutineNum int // 稀疏 goroutine 数量
eventOptions *nexus.EventOptions // 事件选项
}

func (opt *Options) init(srv *server) *Options {
Expand All @@ -54,6 +56,7 @@ func (opt *Options) Apply(options ...*Options) {
opt.syncLowMessageDuration = option.syncLowMessageDuration
opt.asyncLowMessageDuration = option.asyncLowMessageDuration
opt.sparseGoroutineNum = option.sparseGoroutineNum
opt.eventOptions = option.eventOptions

option.rw.RUnlock()
}
Expand All @@ -66,6 +69,20 @@ func (opt *Options) active() {
opt.server.notify.lifeCycleTime <- opt.GetLifeCycleLimit()
}

// WithEventOptions 设置服务器事件选项
// - 该函数支持运行时设置
func (opt *Options) WithEventOptions(eventOptions *nexus.EventOptions) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.eventOptions = eventOptions
})
}

func (opt *Options) GetEventOptions() *nexus.EventOptions {
return getOptionsValue(opt, func(opt *Options) *nexus.EventOptions {
return opt.eventOptions
})
}

// WithSparseGoroutineNum 设置服务器稀疏 goroutine 数量
// - 该函数在运行时设置无效
func (opt *Options) WithSparseGoroutineNum(num int) *Options {
Expand Down
File renamed without changes.
14 changes: 9 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ func NewServer(network Network, options ...*Options) Server {
srv.events = new(events).init(srv)
srv.state = new(State).init(srv)
srv.Options.init(srv).Apply(options...)
srv.broker = brokers.NewSparseGoroutine(srv.Options.GetSparseGoroutineNum(), func(index int) nexus.Queue[int, string] {
return queues.NewNonBlockingRW[int, string](index, srv.Options.GetServerMessageChannelSize(), srv.Options.GetServerMessageBufferInitialSize())
}, srv.onEventProcess)
srv.broker = brokers.NewSparseGoroutine(
srv.Options.GetSparseGoroutineNum(),
func(index int) nexus.Queue[int, string] {
return queues.NewNonBlockingRW[int, string](index, srv.Options.GetServerMessageChannelSize(), srv.Options.GetServerMessageBufferInitialSize())
},
srv.onEventProcess,
)
antsPool, err := ants.NewPool(ants.DefaultAntsPoolSize, ants.WithOptions(ants.Options{
ExpiryDuration: 10 * time.Second,
Nonblocking: true,
Expand Down Expand Up @@ -145,7 +149,7 @@ func (s *server) PublishSystemMessage(event nexus.Event[int, string]) {
}

func (s *server) PublishSyncMessage(topic string, handler messageEvents.SynchronousHandler) {
s.PublishMessage(topic, messageEvents.Synchronous[int, string](handler))
s.PublishMessage(topic, messageEvents.Synchronous[int, string](handler, s.GetEventOptions()))
}

func (s *server) PublishAsyncMessage(topic string, handler messageEvents.AsynchronousHandler, callback ...messageEvents.AsynchronousCallbackHandler) {
Expand All @@ -156,7 +160,7 @@ func (s *server) PublishAsyncMessage(topic string, handler messageEvents.Asynchr
s.GetLogger().Warn("Minotaur Server", log.String("topic", topic), log.String("error", err.Error()))
go f(ctx)
}
}, handler, collection.FindFirstOrDefaultInSlice(callback, nil)))
}, handler, collection.FindFirstOrDefaultInSlice(callback, nil), s.GetEventOptions()))
}

func (s *server) PublishSystemSyncMessage(handler messageEvents.SynchronousHandler) {
Expand Down
Loading

0 comments on commit 7e12b58

Please sign in to comment.