diff --git a/server/v2/conn.go b/server/internal/v2/conn.go similarity index 54% rename from server/v2/conn.go rename to server/internal/v2/conn.go index bfbb0f64..51bb9e92 100644 --- a/server/v2/conn.go +++ b/server/internal/v2/conn.go @@ -2,23 +2,25 @@ package server import ( "context" - "github.com/kercylan98/minotaur/server/v2/actor" + "github.com/kercylan98/minotaur/server/internal/v2/dispatcher" "net" ) +type ConnWriter func(packet Packet) error + type Conn interface { } -func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter, handler actor.MessageHandler[Packet]) Conn { +func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter) Conn { return &conn{ conn: c, writer: connWriter, - actor: actor.NewActor[Packet](ctx, handler), + actor: dispatcher.NewActor[Packet](ctx, handler), } } type conn struct { conn net.Conn writer ConnWriter - actor *actor.Actor[Packet] + actor *dispatcher.Actor[Packet] } diff --git a/server/internal/v2/conn_context.go b/server/internal/v2/conn_context.go new file mode 100644 index 00000000..16b7d0f8 --- /dev/null +++ b/server/internal/v2/conn_context.go @@ -0,0 +1,4 @@ +package server + +type ConnContext interface { +} diff --git a/server/internal/v2/controller.go b/server/internal/v2/controller.go new file mode 100644 index 00000000..74ee17d8 --- /dev/null +++ b/server/internal/v2/controller.go @@ -0,0 +1,25 @@ +package server + +import "net" + +type Controller interface { + Run() error + Shutdown() error +} + +type controller struct { + *server +} + +func (s *controller) init(srv *server) *controller { + s.server = srv + return s +} + +func (s *controller) RegisterConn(conn net.Conn, writer ConnWriter) { + +} + +func (s *controller) UnRegisterConn() { + +} diff --git a/server/internal/v2/loadbalancer/consistent_hash.go b/server/internal/v2/loadbalancer/consistent_hash.go new file mode 100644 index 00000000..e6d5895d --- /dev/null +++ b/server/internal/v2/loadbalancer/consistent_hash.go @@ -0,0 +1,80 @@ +package loadbalancer + +import ( + "github.com/kercylan98/minotaur/utils/super" + "hash/fnv" + "sort" + "sync" +) + +func NewConsistentHash(replicas int) *ConsistentHash { + return &ConsistentHash{ + replicas: replicas, + keys: []int{}, + hashMap: make(map[int]string), + mutex: sync.RWMutex{}, + } +} + +type ConsistentHash struct { + replicas int // 虚拟节点倍数 + keys []int // 哈希环上的所有节点的哈希值 + hashMap map[int]string // 哈希值到真实节点的映射 + mutex sync.RWMutex // 用于保护数据结构 +} + +// Add 添加一个节点到哈希环 +func (c *ConsistentHash) Add(node string) { + c.mutex.Lock() + defer c.mutex.Unlock() + + for i := 0; i < c.replicas; i++ { + hash := c.hash(node + super.IntToString(i)) + c.keys = append(c.keys, hash) + c.hashMap[hash] = node + } + sort.Ints(c.keys) +} + +// Remove 从哈希环中移除一个节点 +func (c *ConsistentHash) Remove(node string) { + c.mutex.Lock() + defer c.mutex.Unlock() + + for i := 0; i < c.replicas; i++ { + hash := c.hash(node + super.IntToString(i)) + delete(c.hashMap, hash) + // 从 keys 中移除节点的哈希值 + for j, k := range c.keys { + if k == hash { + c.keys = append(c.keys[:j], c.keys[j+1:]...) + break + } + } + } +} + +// Get 返回给定 key 所在的节点 +func (c *ConsistentHash) Get(key string) string { + c.mutex.RLock() + defer c.mutex.RUnlock() + + if len(c.keys) == 0 { + return "" + } + + hash := c.hash(key) + // 顺时针找到第一个比 key 大的哈希值,即对应的节点 + idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash }) + if idx == len(c.keys) { + idx = 0 // 如果 key 大于所有哈希值,则返回第一个节点 + } + return c.hashMap[c.keys[idx]] +} + +// hash 计算字符串的哈希值 +func (c *ConsistentHash) hash(key string) int { + h := fnv.New32a() + _, _ = h.Write([]byte(key)) + return int(h.Sum32()) +} diff --git a/server/internal/v2/loadbalancer/round_robin.go b/server/internal/v2/loadbalancer/round_robin.go new file mode 100644 index 00000000..40d946bd --- /dev/null +++ b/server/internal/v2/loadbalancer/round_robin.go @@ -0,0 +1,78 @@ +package loadbalancer + +import "sync" + +func NewRoundRobin[Id comparable, T RoundRobinItem[Id]]() *RoundRobin[Id, T] { + return &RoundRobin[Id, T]{ + head: nil, + curr: nil, + size: 0, + } +} + +type roundRobinNode[Id comparable, T RoundRobinItem[Id]] struct { + Value T + Next *roundRobinNode[Id, T] +} + +type RoundRobin[Id comparable, T RoundRobinItem[Id]] struct { + head *roundRobinNode[Id, T] + curr *roundRobinNode[Id, T] + size int + rw sync.RWMutex +} + +func (r *RoundRobin[Id, T]) Add(t T) { + r.rw.Lock() + defer r.rw.Unlock() + + newNode := &roundRobinNode[Id, T]{Value: t} + + if r.head == nil { + r.head = newNode + r.curr = newNode + newNode.Next = newNode + } else { + newNode.Next = r.head.Next + r.head.Next = newNode + } + r.size++ +} + +func (r *RoundRobin[Id, T]) Remove(t T) { + r.rw.Lock() + defer r.rw.Unlock() + + if r.head == nil { + return + } + + prev := r.head + for i := 0; i < r.size; i++ { + if prev.Next.Value.Id() == t.Id() { + if prev.Next == r.curr { + r.curr = prev + } + prev.Next = prev.Next.Next + r.size-- + if r.size == 0 { + r.head = nil + r.curr = nil + } + return + } + prev = prev.Next + } +} + +func (r *RoundRobin[Id, T]) Next() (t T) { + r.rw.Lock() + defer r.rw.Unlock() + + if r.curr == nil { + return + } + + r.curr = r.curr.Next + return r.curr.Value +} diff --git a/server/internal/v2/loadbalancer/round_robin_item.go b/server/internal/v2/loadbalancer/round_robin_item.go new file mode 100644 index 00000000..50c971da --- /dev/null +++ b/server/internal/v2/loadbalancer/round_robin_item.go @@ -0,0 +1,6 @@ +package loadbalancer + +type RoundRobinItem[Id comparable] interface { + // Id 返回唯一标识 + Id() Id +} diff --git a/server/v2/network.go b/server/internal/v2/network.go similarity index 63% rename from server/v2/network.go rename to server/internal/v2/network.go index 05ffee93..053563db 100644 --- a/server/v2/network.go +++ b/server/internal/v2/network.go @@ -5,7 +5,7 @@ import ( ) type Network interface { - OnSetup(ctx context.Context, event NetworkCore) error + OnSetup(ctx context.Context, controller Controller) error OnRun() error diff --git a/server/v2/network/http.go b/server/internal/v2/network/http.go similarity index 92% rename from server/v2/network/http.go rename to server/internal/v2/network/http.go index 0faeb331..88faa1d7 100644 --- a/server/v2/network/http.go +++ b/server/internal/v2/network/http.go @@ -2,14 +2,14 @@ package network import ( "context" - "github.com/kercylan98/minotaur/server/v2" + "github.com/kercylan98/minotaur/server/internal/v2" "github.com/pkg/errors" "net" "net/http" "time" ) -func Http(addr string) server.Network { +func Http(addr string) server.server { return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()}) } diff --git a/server/v2/network/http_serve.go b/server/internal/v2/network/http_serve.go similarity index 100% rename from server/v2/network/http_serve.go rename to server/internal/v2/network/http_serve.go diff --git a/server/v2/network/websocket.go b/server/internal/v2/network/websocket.go similarity index 88% rename from server/v2/network/websocket.go rename to server/internal/v2/network/websocket.go index 39b2f036..f10c746a 100644 --- a/server/v2/network/websocket.go +++ b/server/internal/v2/network/websocket.go @@ -3,13 +3,13 @@ package network import ( "context" "fmt" - "github.com/kercylan98/minotaur/server/v2" + "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/utils/collection" "github.com/panjf2000/gnet/v2" "time" ) -func WebSocket(addr string, pattern ...string) server.Network { +func WebSocket(addr string, pattern ...string) server.server { ws := &websocketCore{ addr: addr, pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"), diff --git a/server/v2/network/websocket_handler.go b/server/internal/v2/network/websocket_handler.go similarity index 96% rename from server/v2/network/websocket_handler.go rename to server/internal/v2/network/websocket_handler.go index 63efdea8..c88e3dd0 100644 --- a/server/v2/network/websocket_handler.go +++ b/server/internal/v2/network/websocket_handler.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" - "github.com/kercylan98/minotaur/server/v2" + "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/utils/log" "github.com/panjf2000/gnet/v2" "time" @@ -35,7 +35,7 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) { func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { wrapper := newWebsocketWrapper(w.core.ctx, c) c.SetContext(wrapper) - w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.Packet) error { + w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.server) error { return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes()) }) return diff --git a/server/v2/network/wsbsocket_wrapper.go b/server/internal/v2/network/wsbsocket_wrapper.go similarity index 100% rename from server/v2/network/wsbsocket_wrapper.go rename to server/internal/v2/network/wsbsocket_wrapper.go diff --git a/server/v2/packet.go b/server/internal/v2/packet.go similarity index 100% rename from server/v2/packet.go rename to server/internal/v2/packet.go diff --git a/server/internal/v2/reactor/handlers.go b/server/internal/v2/reactor/handlers.go new file mode 100644 index 00000000..e43bc67d --- /dev/null +++ b/server/internal/v2/reactor/handlers.go @@ -0,0 +1,7 @@ +package reactor + +type queueMessageHandler[M any] func(q *queue[M], msg M) + +type MessageHandler[M any] func(msg M) + +type ErrorHandler[M any] func(msg M, err error) diff --git a/server/internal/v2/reactor/queue.go b/server/internal/v2/reactor/queue.go new file mode 100644 index 00000000..b34ea204 --- /dev/null +++ b/server/internal/v2/reactor/queue.go @@ -0,0 +1,71 @@ +package reactor + +import ( + "errors" + "github.com/kercylan98/minotaur/utils/buffer" + "sync" + "sync/atomic" +) + +func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] { + q := &queue[M]{ + c: make(chan M, chanSize), + buf: buffer.NewRing[M](bufferSize), + rw: sync.NewCond(&sync.Mutex{}), + } + q.QueueState = &QueueState[M]{ + queue: q, + idx: idx, + status: QueueStatusNone, + } + return q +} + +type queue[M any] struct { + *QueueState[M] + c chan M // 通道 + buf *buffer.Ring[M] // 缓冲区 + rw *sync.Cond // 读写锁 +} + +func (q *queue[M]) Id() int { + return q.idx +} + +func (q *queue[M]) run() { + atomic.StoreInt32(&q.status, QueueStatusRunning) + defer func(q *queue[M]) { + atomic.StoreInt32(&q.status, QueueStatusClosed) + }(q) + for { + q.rw.L.Lock() + for q.buf.IsEmpty() { + if atomic.LoadInt32(&q.status) >= QueueStatusClosing { + q.rw.L.Unlock() + close(q.c) + return + } + q.rw.Wait() + } + items := q.buf.ReadAll() + q.rw.L.Unlock() + for _, item := range items { + q.c <- item + } + } +} + +func (q *queue[M]) push(m M) error { + if atomic.LoadInt32(&q.status) != QueueStatusRunning { + return errors.New("queue status exception") + } + q.rw.L.Lock() + q.buf.Write(m) + q.rw.Signal() + q.rw.L.Unlock() + return nil +} + +func (q *queue[M]) read() <-chan M { + return q.c +} diff --git a/server/internal/v2/reactor/queue_state.go b/server/internal/v2/reactor/queue_state.go new file mode 100644 index 00000000..93de96a3 --- /dev/null +++ b/server/internal/v2/reactor/queue_state.go @@ -0,0 +1,38 @@ +package reactor + +import ( + "sync/atomic" +) + +const ( + QueueStatusNone = iota - 1 // 队列未运行 + QueueStatusRunning // 队列运行中 + QueueStatusClosing // 队列关闭中 + QueueStatusClosed // 队列已关闭 +) + +type QueueState[M any] struct { + queue *queue[M] + idx int // 队列索引 + status int32 // 状态标志 +} + +// IsClosed 判断队列是否已关闭 +func (q *QueueState[M]) IsClosed() bool { + return atomic.LoadInt32(&q.status) == QueueStatusClosed +} + +// IsClosing 判断队列是否正在关闭 +func (q *QueueState[M]) IsClosing() bool { + return atomic.LoadInt32(&q.status) == QueueStatusClosing +} + +// IsRunning 判断队列是否正在运行 +func (q *QueueState[M]) IsRunning() bool { + return atomic.LoadInt32(&q.status) == QueueStatusRunning +} + +// Close 关闭队列 +func (q *QueueState[M]) Close() { + atomic.CompareAndSwapInt32(&q.status, QueueStatusRunning, QueueStatusClosing) +} diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go new file mode 100644 index 00000000..ffbf4e2b --- /dev/null +++ b/server/internal/v2/reactor/reactor.go @@ -0,0 +1,153 @@ +package reactor + +import ( + "fmt" + "github.com/alphadose/haxmap" + "github.com/kercylan98/minotaur/server/internal/v2/loadbalancer" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/super" + "log/slog" + "runtime" + "runtime/debug" + "sync" + "time" +) + +// NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列 +func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { + r := &Reactor[M]{ + logger: log.Default().Logger, + systemQueue: newQueue[M](-1, systemQueueSize, 1024*16), + identifiers: haxmap.New[string, int](), + lb: loadbalancer.NewRoundRobin[int, *queue[M]](), + errorHandler: errorHandler, + socketQueueSize: socketQueueSize, + } + + r.handler = func(q *queue[M], msg M) { + defer func(msg M) { + if err := super.RecoverTransform(recover()); err != nil { + defer func(msg M) { + if err = super.RecoverTransform(recover()); err != nil { + fmt.Println(err) + debug.PrintStack() + } + }(msg) + if r.errorHandler != nil { + r.errorHandler(msg, err) + } else { + fmt.Println(err) + debug.PrintStack() + } + } + }(msg) + var startedAt = time.Now() + if handler != nil { + handler(msg) + } + r.log(log.String("action", "handle"), log.Int("queue", q.Id()), log.Int64("cost/ns", time.Since(startedAt).Nanoseconds())) + } + + return r +} + +// Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列 +type Reactor[M any] struct { + logger *slog.Logger // 日志记录器 + systemQueue *queue[M] // 系统级别的队列 + socketQueueSize int // Socket 队列大小 + queues []*queue[M] // Socket 使用的队列 + identifiers *haxmap.Map[string, int] // 标识符到队列索引的映射 + lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器 + wg sync.WaitGroup // 等待组 + handler queueMessageHandler[M] // 消息处理器 + errorHandler ErrorHandler[M] // 错误处理器 + debug bool // 是否开启调试模式 +} + +// SetLogger 设置日志记录器 +func (r *Reactor[M]) SetLogger(logger *slog.Logger) *Reactor[M] { + r.logger = logger + return r +} + +// SetDebug 设置是否开启调试模式 +func (r *Reactor[M]) SetDebug(debug bool) *Reactor[M] { + r.debug = debug + return r +} + +// SystemDispatch 将消息分发到系统级别的队列 +func (r *Reactor[M]) SystemDispatch(msg M) error { + return r.systemQueue.push(msg) +} + +// Dispatch 将消息分发到 identifier 使用的队列,当 identifier 首次使用时,将会根据负载均衡策略选择一个队列 +func (r *Reactor[M]) Dispatch(identifier string, msg M) error { + next := r.lb.Next() + if next == nil { + return r.Dispatch(identifier, msg) + } + idx, _ := r.identifiers.GetOrSet(identifier, next.Id()) + q := r.queues[idx] + r.log(log.String("action", "dispatch"), log.String("identifier", identifier), log.Int("queue", q.Id())) + return q.push(msg) +} + +// Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列 +func (r *Reactor[M]) Run() { + r.initQueue(r.systemQueue) + for i := 0; i < runtime.NumCPU(); i++ { + r.addQueue() + } + r.wg.Wait() +} + +func (r *Reactor[M]) addQueue() { + r.log(log.String("action", "add queue"), log.Int("queue", len(r.queues))) + r.wg.Add(1) + q := newQueue[M](len(r.queues), r.socketQueueSize, 1024*8) + r.initQueue(q) + r.queues = append(r.queues, q) +} + +func (r *Reactor[M]) removeQueue(q *queue[M]) { + idx := q.Id() + if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q { + return + } + r.queues = append(r.queues[:idx], r.queues[idx+1:]...) + for i := idx; i < len(r.queues); i++ { + r.queues[i].idx = i + } + r.log(log.String("action", "remove queue"), log.Int("queue", len(r.queues))) +} + +func (r *Reactor[M]) initQueue(q *queue[M]) { + r.wg.Add(1) + go func(r *Reactor[M], q *queue[M]) { + defer r.wg.Done() + go q.run() + if q.idx >= 0 { + r.lb.Add(q) + } + for m := range q.read() { + r.handler(q, m) + } + }(r, q) + r.log(log.String("action", "run queue"), log.Int("queue", q.Id())) +} + +func (r *Reactor[M]) Close() { + queues := append(r.queues, r.systemQueue) + for _, q := range queues { + q.Close() + } +} + +func (r *Reactor[M]) log(args ...any) { + if !r.debug { + return + } + r.logger.Debug("Reactor", args...) +} diff --git a/server/internal/v2/reactor/reactor_test.go b/server/internal/v2/reactor/reactor_test.go new file mode 100644 index 00000000..3f418658 --- /dev/null +++ b/server/internal/v2/reactor/reactor_test.go @@ -0,0 +1,56 @@ +package reactor_test + +import ( + "github.com/kercylan98/minotaur/server/internal/v2/reactor" + "github.com/kercylan98/minotaur/utils/random" + "testing" + "time" +) + +func BenchmarkReactor_Dispatch(b *testing.B) { + + var r = reactor.NewReactor(1024*16, 1024, func(msg func()) { + msg() + }, func(msg func(), err error) { + b.Error(err) + }).SetDebug(false) + + go r.Run() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := r.Dispatch(random.HostName(), func() { + + }); err != nil { + + } + } + }) +} + +func TestReactor_Dispatch(t *testing.T) { + var r = reactor.NewReactor(1024*16, 1024, func(msg func()) { + msg() + }, func(msg func(), err error) { + t.Error(err) + }).SetDebug(false) + + go r.Run() + + for i := 0; i < 10000; i++ { + go func() { + id := random.HostName() + for { + // 每秒 50 次 + time.Sleep(time.Millisecond * 20) + if err := r.Dispatch(id, func() { + + }); err != nil { + t.Error(err) + } + } + }() + } + + time.Sleep(time.Second * 10) +} diff --git a/server/v2/server.go b/server/internal/v2/server.go similarity index 59% rename from server/v2/server.go rename to server/internal/v2/server.go index 17852e41..3934b2c1 100644 --- a/server/v2/server.go +++ b/server/internal/v2/server.go @@ -1,9 +1,6 @@ package server -import ( - "context" - "github.com/kercylan98/minotaur/utils/super" -) +import "golang.org/x/net/context" type Server interface { Run() error @@ -11,17 +8,18 @@ type Server interface { } type server struct { - *networkCore - ctx *super.CancelContext + *controller + ctx context.Context + cancel context.CancelFunc network Network } func NewServer(network Network) Server { srv := &server{ - ctx: super.WithCancelContext(context.Background()), network: network, } - srv.networkCore = new(networkCore).init(srv) + srv.ctx, srv.cancel = context.WithCancel(context.Background()) + srv.controller = new(controller).init(srv) return srv } @@ -30,14 +28,14 @@ func (s *server) Run() (err error) { return } - if err = s.network.OnRun(s.ctx); err != nil { + if err = s.network.OnRun(); err != nil { panic(err) } return } func (s *server) Shutdown() (err error) { - defer s.ctx.Cancel() + defer s.server.cancel() err = s.network.OnShutdown() return } diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go new file mode 100644 index 00000000..74170d0f --- /dev/null +++ b/server/internal/v2/server_test.go @@ -0,0 +1,14 @@ +package server_test + +import ( + "github.com/kercylan98/minotaur/server/internal/v2" + "github.com/kercylan98/minotaur/server/internal/v2/network" + "testing" +) + +func TestNewServer(t *testing.T) { + srv := server.server.NewServer(network.WebSocket(":9999")) + if err := srv.Run(); err != nil { + panic(err) + } +} diff --git a/server/v2/actor/actor.go b/server/v2/actor/actor.go deleted file mode 100644 index bb971848..00000000 --- a/server/v2/actor/actor.go +++ /dev/null @@ -1,153 +0,0 @@ -package actor - -import ( - "context" - "github.com/kercylan98/minotaur/utils/buffer" - "github.com/kercylan98/minotaur/utils/super" - "sync" - "time" -) - -// MessageHandler 定义了处理消息的函数类型 -type MessageHandler[M any] func(message M) - -// NewActor 创建一个新的 Actor,并启动其消息处理循环 -func NewActor[M any](ctx context.Context, handler MessageHandler[M]) *Actor[M] { - a := newActor(ctx, handler) - a.counter = new(super.Counter[int]) - go a.run() - return a -} - -// newActor 创建一个新的 Actor -func newActor[M any](ctx context.Context, handler MessageHandler[M]) (actor *Actor[M]) { - a := &Actor[M]{ - buf: buffer.NewRing[M](1024), - handler: handler, - } - a.cond = sync.NewCond(&a.rw) - a.ctx, a.cancel = context.WithCancel(ctx) - return a -} - -// Actor 是一个消息驱动的并发实体 -type Actor[M any] struct { - idx int // Actor 在其父 Actor 中的索引 - ctx context.Context // Actor 的上下文 - cancel context.CancelFunc // 用于取消 Actor 的函数 - buf *buffer.Ring[M] // 用于缓存消息的环形缓冲区 - handler MessageHandler[M] // 处理消息的函数 - rw sync.RWMutex // 读写锁,用于保护 Actor 的并发访问 - cond *sync.Cond // 条件变量,用于触发消息处理流程 - counter *super.Counter[int] // 消息计数器,用于统计处理的消息数量 - dying bool // 标识 Actor 是否正在关闭中 - parent *Actor[M] // 父 Actor - subs []*Actor[M] // 子 Actor 切片 - gap []int // 用于记录已经关闭的子 Actor 的索引位置,以便复用 -} - -// run 启动 Actor 的消息处理循环 -func (a *Actor[M]) run() { - var ctx = a.ctx - var clearGap = time.NewTicker(time.Second * 30) - defer func(a *Actor[M], clearGap *time.Ticker) { - clearGap.Stop() - a.cancel() - a.parent.removeSub(a) - }(a, clearGap) - for { - select { - case <-a.ctx.Done(): - a.rw.Lock() - if ctx == a.ctx { - a.dying = true - } else { - ctx = a.ctx - } - a.rw.Unlock() - a.cond.Signal() - case <-clearGap.C: - a.rw.Lock() - for _, idx := range a.gap { - a.subs = append(a.subs[:idx], a.subs[idx+1:]...) - } - for idx, sub := range a.subs { - sub.idx = idx - } - a.gap = a.gap[:0] - a.rw.Unlock() - default: - a.rw.Lock() - if a.buf.IsEmpty() { - if a.dying && a.counter.Val() == 0 { - return - } - a.cond.Wait() - } - messages := a.buf.ReadAll() - a.rw.Unlock() - for _, message := range messages { - a.handler(message) - } - a.counter.Add(-len(messages)) - } - } -} - -// Reuse 重用 Actor,Actor 会重新激活 -func (a *Actor[M]) Reuse(ctx context.Context) { - before := a.cancel - defer before() - - a.rw.Lock() - a.ctx, a.cancel = context.WithCancel(ctx) - a.dying = false - for _, sub := range a.subs { - sub.Reuse(a.ctx) - } - a.rw.Unlock() - a.cond.Signal() -} - -// Send 发送消息 -func (a *Actor[M]) Send(message M) { - a.rw.Lock() - a.counter.Add(1) - a.buf.Write(message) - a.rw.Unlock() - a.cond.Signal() -} - -// Sub 派生一个子 Actor,该子 Actor 生命周期将继承父 Actor 的生命周期 -func (a *Actor[M]) Sub() { - a.rw.Lock() - defer a.rw.Unlock() - - sub := newActor(a.ctx, a.handler) - sub.counter = a.counter.Sub() - sub.parent = a - if len(a.gap) > 0 { - sub.idx = a.gap[0] - a.gap = a.gap[1:] - } else { - sub.idx = len(a.subs) - } - a.subs = append(a.subs, sub) - go sub.run() -} - -// removeSub 从父 Actor 中移除指定的子 Actor -func (a *Actor[M]) removeSub(sub *Actor[M]) { - if a == nil { - return - } - - a.rw.Lock() - defer a.rw.Unlock() - if sub.idx == len(a.subs)-1 { - a.subs = a.subs[:sub.idx] - return - } - a.subs[sub.idx] = nil - a.gap = append(a.gap, sub.idx) -} diff --git a/server/v2/message.go b/server/v2/message.go deleted file mode 100644 index c9487cd3..00000000 --- a/server/v2/message.go +++ /dev/null @@ -1,4 +0,0 @@ -package server - -type message struct { -} diff --git a/server/v2/network_core.go b/server/v2/network_core.go deleted file mode 100644 index 46f398fc..00000000 --- a/server/v2/network_core.go +++ /dev/null @@ -1,50 +0,0 @@ -package server - -import ( - "github.com/kercylan98/minotaur/utils/hub" - "golang.org/x/net/context" - "net" -) - -type ConnWriter func(message Packet) error - -type NetworkCore interface { - OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter) - - OnConnectionClosed(conn Conn) - - OnReceivePacket(packet Packet) - - GeneratePacket(data []byte) Packet -} - -type networkCore struct { - *server - packetPool *hub.ObjectPool[*packet] -} - -func (ne *networkCore) init(srv *server) *networkCore { - ne.server = srv - ne.packetPool = hub.NewObjectPool(func() *packet { - return new(packet) - }, func(data *packet) { - data.reset() - }) - return ne -} - -func (ne *networkCore) OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter) { - -} - -func (ne *networkCore) OnConnectionClosed(conn Conn) { - -} - -func (ne *networkCore) OnReceivePacket(packet Packet) { - -} - -func (ne *networkCore) GeneratePacket(data []byte) Packet { - return ne.packetPool.Get().init(data) -} diff --git a/server/v2/server_test.go b/server/v2/server_test.go deleted file mode 100644 index 0130dd63..00000000 --- a/server/v2/server_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package server_test - -import ( - "github.com/kercylan98/minotaur/server/v2" - "github.com/kercylan98/minotaur/server/v2/network" - "testing" -) - -func TestNewServer(t *testing.T) { - srv := server.NewServer(network.WebSocket(":9999")) - if err := srv.Run(); err != nil { - panic(err) - } -} diff --git a/utils/super/counter.go b/utils/super/counter.go index 2585945f..a84f1b2f 100644 --- a/utils/super/counter.go +++ b/utils/super/counter.go @@ -20,7 +20,7 @@ func (c *Counter[T]) Sub() *Counter[T] { func (c *Counter[T]) Add(delta T) { c.rw.Lock() c.v += delta - c.rw.RUnlock() + c.rw.Unlock() if c.p != nil { c.p.Add(delta) }