/
manager.go
119 lines (113 loc) · 3.02 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package ws
import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
)
type SessionManager struct {
ctx context.Context
cancel context.CancelFunc
TTL time.Duration
sessions map[uint64]*Session
wg sync.WaitGroup
rwLocker sync.RWMutex
hook SessionHook
retry int
rwCache int
isClosed int32
closeOnce sync.Once
logger *slog.Logger
}
func newManager(ctx context.Context, logger *slog.Logger, ttl time.Duration, hook SessionHook, retry, rwCache int) *SessionManager {
c, cancel := context.WithCancel(ctx)
sessions := make(map[uint64]*Session)
return &SessionManager{
ctx: c, cancel: cancel, TTL: ttl, hook: hook, retry: retry, rwCache: rwCache, sessions: sessions,
logger: logger,
}
}
func (manager *SessionManager) Context() context.Context {
return manager.ctx
}
func (manager *SessionManager) NewSession(conn *websocket.Conn) *Session {
session := newSession(conn, manager, manager.retry, manager.rwCache, manager.logger)
go manager.startSession(session)
return session
}
func (manager *SessionManager) startSession(session *Session) {
defer func(s *Session) {
if err := recover(); err != nil {
manager.logger.Debug("session run panic", "session", s.ID(), "error", err)
}
}(session)
err := session.Run()
if err != nil {
manager.logger.Info("session run error", "session", session.ID(), "error", err)
}
}
func (manager *SessionManager) Put(session *Session) error {
manager.rwLocker.Lock()
defer manager.rwLocker.Unlock()
if _, ok := manager.sessions[session.ID()]; ok {
return errors.New("session already exists")
}
manager.sessions[session.ID()] = session
manager.wg.Add(1)
manager.logger.Debug("add session ", "session", session.id)
return nil
}
func (manager *SessionManager) Closed() bool {
return atomic.LoadInt32(&manager.isClosed) > 0
}
func (manager *SessionManager) GetSession(id uint64) (*Session, bool) {
manager.rwLocker.RLock()
defer manager.rwLocker.RUnlock()
session, ok := manager.sessions[id]
return session, ok
}
func (manager *SessionManager) Close() {
manager.closeOnce.Do(manager.close)
}
func (manager *SessionManager) close() {
manager.cancel()
manager.wg.Wait()
}
func (manager *SessionManager) Remove(id uint64) {
manager.rwLocker.Lock()
defer manager.rwLocker.Unlock()
_, ok := manager.sessions[id]
if !ok {
return
}
delete(manager.sessions, id)
manager.logger.Debug("remove session", "id", id)
manager.wg.Done()
}
func (manager *SessionManager) Clients() int {
manager.rwLocker.RLock()
defer manager.rwLocker.Unlock()
return len(manager.sessions)
}
func (manager *SessionManager) Broadcast(m Message) (success int, failed int, err error) {
manager.logger.Info("publish Broadcast ")
manager.rwLocker.RLock()
defer manager.rwLocker.RUnlock()
if manager.isClosed > 0 {
err = closedSessionManager
return
}
for _, v := range manager.sessions {
manager.logger.Debug("send message to session", "id", v.id)
err := v.Send(m)
if err == nil {
success += 1
} else {
failed += 1
}
}
return
}