Skip to content

Commit

Permalink
feat: server 包增加消息统计相关函数
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Dec 22, 2023
1 parent e43185f commit 05aeed0
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 15 deletions.
2 changes: 1 addition & 1 deletion server/http_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (slf *HttpRouter[Context]) handlesConvert(handlers []HandlerFunc[Context])
for i := 0; i < len(handlers); i++ {
handler := handlers[i]
handles = append(handles, func(ctx *gin.Context) {
slf.srv.messageCounter.Add(1)
slf.srv.hitMessageStatistics()
defer func() {
slf.srv.messageCounter.Add(-1)
}()
Expand Down
45 changes: 32 additions & 13 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer"
"google.golang.org/grpc"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -28,19 +30,36 @@ type option struct {
}

type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
tickerPool *timer.Pool // 定时器池
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
packetWarnSize int // 数据包大小警告
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
tickerPool *timer.Pool // 定时器池
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
packetWarnSize int // 数据包大小警告
messageStatisticsDuration time.Duration // 消息统计时长
messageStatisticsLimit int // 消息统计数量
messageStatistics []*atomic.Int64 // 消息统计数量
messageStatisticsLock *sync.RWMutex // 消息统计锁
}

// WithMessageStatistics 通过消息统计的方式创建服务器
// - 默认不开启,当 duration 和 limit 均大于 0 的时候,服务器将记录每 duration 期间的消息数量,并保留最多 limit 条
func WithMessageStatistics(duration time.Duration, limit int) Option {
return func(srv *Server) {
if duration <= 0 || limit <= 0 {
return
}
srv.messageStatisticsDuration = duration
srv.messageStatisticsLimit = limit
srv.messageStatisticsLock = new(sync.RWMutex)
}
}

// WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志
Expand Down
82 changes: 81 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func New(network Network, options ...Option) *Server {
dispatcherMember: map[string]map[string]*Conn{},
currDispatcher: map[string]*dispatcher{},
}
server.ctx, server.cancel = context.WithCancel(server.ctx)
server.event = newEvent(server)

switch network {
Expand Down Expand Up @@ -92,6 +93,7 @@ type Server struct {
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
ctx context.Context // 上下文
cancel context.CancelFunc // 停止上下文
online *concurrent.BalanceMap[string, *Conn] // 在线连接
systemDispatcher *dispatcher // 系统消息分发器
network Network // 网络类型
Expand Down Expand Up @@ -130,6 +132,7 @@ func (slf *Server) Run(addr string) error {
}
slf.event.check()
slf.addr = addr
slf.startMessageStatistics()
slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage)
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
var messageInitFinish = make(chan struct{}, 1)
Expand Down Expand Up @@ -488,6 +491,7 @@ func (slf *Server) shutdown(err error) {
slf.multipleRuntimeErrorChan <- err
}
}()
slf.cancel()
if slf.gServer != nil && slf.isRunning {
if shutdownErr := gnet.Stop(context.Background(), fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil {
log.Error("Server", log.Err(shutdownErr))
Expand Down Expand Up @@ -669,7 +673,7 @@ func (slf *Server) pushMessage(message *Message) {
slf.messagePool.Release(message)
return
}
slf.messageCounter.Add(1)
slf.hitMessageStatistics()
dispatcher.put(message)
}

Expand Down Expand Up @@ -918,3 +922,79 @@ func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mar
func (slf *Server) PushShuntMessage(conn *Conn, caller func(), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToShuntMessage(conn, caller, mark...))
}

// startMessageStatistics 开始消息统计
func (slf *Server) startMessageStatistics() {
if !slf.HasMessageStatistics() {
return
}
slf.runtime.messageStatistics = append(slf.runtime.messageStatistics, new(atomic.Int64))
ticker := time.NewTicker(slf.runtime.messageStatisticsDuration)
go func(ctx context.Context, ticker *time.Ticker, r *runtime) {
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.messageStatisticsLock.Lock()
r.messageStatistics = append([]*atomic.Int64{new(atomic.Int64)}, r.messageStatistics...)
if len(r.messageStatistics) > r.messageStatisticsLimit {
r.messageStatistics = r.messageStatistics[:r.messageStatisticsLimit]
}
r.messageStatisticsLock.Unlock()
case <-ctx.Done():
return
}
}
}(slf.ctx, ticker, slf.runtime)
}

// hitMessageStatistics 命中消息统计
func (slf *Server) hitMessageStatistics() {
slf.messageCounter.Add(1)
if !slf.HasMessageStatistics() {
return
}
slf.runtime.messageStatisticsLock.RLock()
slf.runtime.messageStatistics[0].Add(1)
slf.runtime.messageStatisticsLock.RUnlock()
}

// GetDurationMessageCount 获取当前 WithMessageStatistics 设置的 duration 期间的消息量
func (slf *Server) GetDurationMessageCount() int64 {
return slf.GetDurationMessageCountByOffset(0)
}

// GetDurationMessageCountByOffset 获取特定偏移次数的 WithMessageStatistics 设置的 duration 期间的消息量
// - 该值小于 0 时,将与 GetDurationMessageCount 无异,否则将返回 +n 个期间的消息量,例如 duration 为 1 分钟,limit 为 10,那么 offset 为 1 的情况下,获取的则是上一分钟消息量
func (slf *Server) GetDurationMessageCountByOffset(offset int) int64 {
if !slf.HasMessageStatistics() {
return 0
}
slf.runtime.messageStatisticsLock.Lock()
if offset >= len(slf.runtime.messageStatistics)-1 {
slf.runtime.messageStatisticsLock.Unlock()
return 0
}
v := slf.runtime.messageStatistics[offset].Load()
slf.runtime.messageStatisticsLock.Unlock()
return v
}

// GetAllDurationMessageCount 获取所有 WithMessageStatistics 设置的 duration 期间的消息量
func (slf *Server) GetAllDurationMessageCount() []int64 {
if !slf.HasMessageStatistics() {
return nil
}
slf.runtime.messageStatisticsLock.Lock()
var vs = make([]int64, len(slf.runtime.messageStatistics))
for i, statistic := range slf.runtime.messageStatistics {
vs[i] = statistic.Load()
}
slf.runtime.messageStatisticsLock.Unlock()
return vs
}

// HasMessageStatistics 是否了开启消息统计
func (slf *Server) HasMessageStatistics() bool {
return slf.runtime.messageStatisticsLock != nil
}

0 comments on commit 05aeed0

Please sign in to comment.