-
Notifications
You must be signed in to change notification settings - Fork 0
/
session.go
77 lines (65 loc) · 1.69 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package account
import (
"github.com/sohaha/zlsgo/zarray"
"github.com/sohaha/zlsgo/zerror"
"github.com/sohaha/zlsgo/znet"
"github.com/sohaha/zlsgo/zstring"
"github.com/sohaha/zlsgo/ztime"
)
var (
sessionHub = zarray.NewHashMap[string, *session]()
)
type session struct {
sessions *zarray.Maper[int64, *znet.SSE]
}
func (m *session) addSession(sse *znet.SSE) int64 {
id := zstring.UUID()
m.sessions.Set(id, sse)
return id
}
func (m *session) removeSession(id int64) {
m.sessions.Delete(id)
}
func (m *Module) newSession(c *znet.Context) (sse *znet.SSE, remove func(), err error) {
uid := Request.UID(c)
if uid == "" {
return nil, nil, zerror.InvalidInput.Text("用户未登录")
}
session, _, _ := sessionHub.ProvideGet(uid, func() (*session, bool) {
return &session{
sessions: zarray.NewHashMap[int64, *znet.SSE](),
}, true
})
sse = znet.NewSSE(c, func(lastID string, opts *znet.SSEOption) {
if m.Options.SSE.HeartbeatsTime != 0 {
opts.HeartbeatsTime = m.Options.SSE.HeartbeatsTime
}
if m.Options.SSE.RetryTime != 0 {
opts.RetryTime = m.Options.SSE.RetryTime
}
})
id := session.addSession(sse)
if m.Options.SSEReconnect != nil && sse.LastEventID() != "" {
go m.Options.SSEReconnect(uid, sse.LastEventID())
}
return sse, func() {
session.removeSession(id)
if session.sessions.Len() == 0 {
sessionHub.Delete(uid)
}
}, nil
}
func SendRealtime(uid string, data string, event ...string) bool {
if session, ok := sessionHub.Get(uid); ok {
if session.sessions.Len() == 0 {
return false
}
id := ztime.Now()
session.sessions.ForEach(func(k int64, v *znet.SSE) bool {
_ = v.Send(id, data, event...)
return true
})
return true
}
return false
}