Skip to content

Commit

Permalink
other: reactor 实现
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Mar 31, 2024
1 parent ef1bb32 commit 1408fdc
Show file tree
Hide file tree
Showing 25 changed files with 554 additions and 243 deletions.
10 changes: 6 additions & 4 deletions server/v2/conn.go → server/internal/v2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package server

import (
"context"
"github.com/kercylan98/minotaur/server/v2/actor"
"github.com/kercylan98/minotaur/server/internal/v2/dispatcher"
"net"
)

type ConnWriter func(packet Packet) error

type Conn interface {
}

func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter, handler actor.MessageHandler[Packet]) Conn {
func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter) Conn {
return &conn{
conn: c,
writer: connWriter,
actor: actor.NewActor[Packet](ctx, handler),
actor: dispatcher.NewActor[Packet](ctx, handler),
}
}

type conn struct {
conn net.Conn
writer ConnWriter
actor *actor.Actor[Packet]
actor *dispatcher.Actor[Packet]
}
4 changes: 4 additions & 0 deletions server/internal/v2/conn_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package server

type ConnContext interface {
}
25 changes: 25 additions & 0 deletions server/internal/v2/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package server

import "net"

type Controller interface {
Run() error
Shutdown() error
}

type controller struct {
*server
}

func (s *controller) init(srv *server) *controller {
s.server = srv
return s
}

func (s *controller) RegisterConn(conn net.Conn, writer ConnWriter) {

}

func (s *controller) UnRegisterConn() {

}
80 changes: 80 additions & 0 deletions server/internal/v2/loadbalancer/consistent_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package loadbalancer

import (
"github.com/kercylan98/minotaur/utils/super"
"hash/fnv"
"sort"
"sync"
)

func NewConsistentHash(replicas int) *ConsistentHash {
return &ConsistentHash{
replicas: replicas,
keys: []int{},
hashMap: make(map[int]string),
mutex: sync.RWMutex{},
}
}

type ConsistentHash struct {
replicas int // 虚拟节点倍数
keys []int // 哈希环上的所有节点的哈希值
hashMap map[int]string // 哈希值到真实节点的映射
mutex sync.RWMutex // 用于保护数据结构
}

// Add 添加一个节点到哈希环
func (c *ConsistentHash) Add(node string) {
c.mutex.Lock()
defer c.mutex.Unlock()

for i := 0; i < c.replicas; i++ {
hash := c.hash(node + super.IntToString(i))
c.keys = append(c.keys, hash)
c.hashMap[hash] = node
}
sort.Ints(c.keys)
}

// Remove 从哈希环中移除一个节点
func (c *ConsistentHash) Remove(node string) {
c.mutex.Lock()
defer c.mutex.Unlock()

for i := 0; i < c.replicas; i++ {
hash := c.hash(node + super.IntToString(i))
delete(c.hashMap, hash)
// 从 keys 中移除节点的哈希值
for j, k := range c.keys {
if k == hash {
c.keys = append(c.keys[:j], c.keys[j+1:]...)
break
}
}
}
}

// Get 返回给定 key 所在的节点
func (c *ConsistentHash) Get(key string) string {
c.mutex.RLock()
defer c.mutex.RUnlock()

if len(c.keys) == 0 {
return ""
}

hash := c.hash(key)
// 顺时针找到第一个比 key 大的哈希值,即对应的节点
idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })
if idx == len(c.keys) {
idx = 0 // 如果 key 大于所有哈希值,则返回第一个节点
}
return c.hashMap[c.keys[idx]]
}

// hash 计算字符串的哈希值
func (c *ConsistentHash) hash(key string) int {
h := fnv.New32a()
_, _ = h.Write([]byte(key))
return int(h.Sum32())
}
78 changes: 78 additions & 0 deletions server/internal/v2/loadbalancer/round_robin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package loadbalancer

import "sync"

func NewRoundRobin[Id comparable, T RoundRobinItem[Id]]() *RoundRobin[Id, T] {
return &RoundRobin[Id, T]{
head: nil,
curr: nil,
size: 0,
}
}

type roundRobinNode[Id comparable, T RoundRobinItem[Id]] struct {
Value T
Next *roundRobinNode[Id, T]
}

type RoundRobin[Id comparable, T RoundRobinItem[Id]] struct {
head *roundRobinNode[Id, T]
curr *roundRobinNode[Id, T]
size int
rw sync.RWMutex
}

func (r *RoundRobin[Id, T]) Add(t T) {
r.rw.Lock()
defer r.rw.Unlock()

newNode := &roundRobinNode[Id, T]{Value: t}

if r.head == nil {
r.head = newNode
r.curr = newNode
newNode.Next = newNode
} else {
newNode.Next = r.head.Next
r.head.Next = newNode
}
r.size++
}

func (r *RoundRobin[Id, T]) Remove(t T) {
r.rw.Lock()
defer r.rw.Unlock()

if r.head == nil {
return
}

prev := r.head
for i := 0; i < r.size; i++ {
if prev.Next.Value.Id() == t.Id() {
if prev.Next == r.curr {
r.curr = prev
}
prev.Next = prev.Next.Next
r.size--
if r.size == 0 {
r.head = nil
r.curr = nil
}
return
}
prev = prev.Next
}
}

func (r *RoundRobin[Id, T]) Next() (t T) {
r.rw.Lock()
defer r.rw.Unlock()

if r.curr == nil {
return
}

r.curr = r.curr.Next
return r.curr.Value
}
6 changes: 6 additions & 0 deletions server/internal/v2/loadbalancer/round_robin_item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package loadbalancer

type RoundRobinItem[Id comparable] interface {
// Id 返回唯一标识
Id() Id
}
2 changes: 1 addition & 1 deletion server/v2/network.go → server/internal/v2/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type Network interface {
OnSetup(ctx context.Context, event NetworkCore) error
OnSetup(ctx context.Context, controller Controller) error

OnRun() error

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package network

import (
"context"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/pkg/errors"
"net"
"net/http"
"time"
)

func Http(addr string) server.Network {
func Http(addr string) server.server {
return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()})
}

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package network
import (
"context"
"fmt"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/utils/collection"
"github.com/panjf2000/gnet/v2"
"time"
)

func WebSocket(addr string, pattern ...string) server.Network {
func WebSocket(addr string, pattern ...string) server.server {
ws := &websocketCore{
addr: addr,
pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/gnet/v2"
"time"
Expand Down Expand Up @@ -35,7 +35,7 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) {
func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
wrapper := newWebsocketWrapper(w.core.ctx, c)
c.SetContext(wrapper)
w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.Packet) error {
w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.server) error {
return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes())
})
return
Expand Down
File renamed without changes.
File renamed without changes.
7 changes: 7 additions & 0 deletions server/internal/v2/reactor/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package reactor

type queueMessageHandler[M any] func(q *queue[M], msg M)

type MessageHandler[M any] func(msg M)

type ErrorHandler[M any] func(msg M, err error)
71 changes: 71 additions & 0 deletions server/internal/v2/reactor/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package reactor

import (
"errors"
"github.com/kercylan98/minotaur/utils/buffer"
"sync"
"sync/atomic"
)

func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] {
q := &queue[M]{
c: make(chan M, chanSize),
buf: buffer.NewRing[M](bufferSize),
rw: sync.NewCond(&sync.Mutex{}),
}
q.QueueState = &QueueState[M]{
queue: q,
idx: idx,
status: QueueStatusNone,
}
return q
}

type queue[M any] struct {
*QueueState[M]
c chan M // 通道
buf *buffer.Ring[M] // 缓冲区
rw *sync.Cond // 读写锁
}

func (q *queue[M]) Id() int {
return q.idx
}

func (q *queue[M]) run() {
atomic.StoreInt32(&q.status, QueueStatusRunning)
defer func(q *queue[M]) {
atomic.StoreInt32(&q.status, QueueStatusClosed)
}(q)
for {
q.rw.L.Lock()
for q.buf.IsEmpty() {
if atomic.LoadInt32(&q.status) >= QueueStatusClosing {
q.rw.L.Unlock()
close(q.c)
return
}
q.rw.Wait()
}
items := q.buf.ReadAll()
q.rw.L.Unlock()
for _, item := range items {
q.c <- item
}
}
}

func (q *queue[M]) push(m M) error {
if atomic.LoadInt32(&q.status) != QueueStatusRunning {
return errors.New("queue status exception")
}
q.rw.L.Lock()
q.buf.Write(m)
q.rw.Signal()
q.rw.L.Unlock()
return nil
}

func (q *queue[M]) read() <-chan M {
return q.c
}
Loading

0 comments on commit 1408fdc

Please sign in to comment.