-
Notifications
You must be signed in to change notification settings - Fork 3
/
typeProcessor.go
95 lines (84 loc) · 2.66 KB
/
typeProcessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package sseKit
import (
"github.com/gin-gonic/gin"
"github.com/richelieu-yang/chimera/v2/src/component/web/httpKit"
"github.com/richelieu-yang/chimera/v2/src/component/web/push/pushKit"
"github.com/richelieu-yang/chimera/v2/src/concurrency/mutexKit"
"github.com/richelieu-yang/chimera/v2/src/core/errorKit"
"github.com/richelieu-yang/chimera/v2/src/core/strKit"
"net/http"
)
type SseProcessor struct {
pushKit.Processor
idGenerator func() (string, error)
listeners pushKit.Listeners
msgType messageType
}
func (p *SseProcessor) ProcessWithGin(ctx *gin.Context) {
p.Process(ctx.Writer, ctx.Request)
}
func (p *SseProcessor) Process(w http.ResponseWriter, r *http.Request) {
if err := IsSseSupported(w, r); err != nil {
p.listeners.OnFailure(w, r, err.Error())
return
}
// 设置 response header
SetHeaders(w)
channel, err := p.newChannel(w, r, make(chan string, 1))
if err != nil {
err = errorKit.Wrap(err, "Fail to new channel")
p.listeners.OnFailure(w, r, err.Error())
return
}
if err := channel.Initialize(); err != nil {
err = errorKit.Wrap(err, "Fail to initialize channel")
p.listeners.OnFailure(w, r, err.Error())
return
}
p.listeners.OnHandshake(w, r, channel)
/*
!!!: gin.Context.Done() 和 r.Context().Done() 不同,因为 gin.Context.Done() 返回nil(普通Gin Server情况下).
(1) case为 w.(http.CloseNotifier).CloseNotify() 和 gin.Context.Done(),前端断开会走到 w.(http.CloseNotifier).CloseNotify()
(2) case为 w.(http.CloseNotifier).CloseNotify() 和 r.Context().Done(),前端断开会走到 r.Context().Done()
*/
select {
//case <-w.(http.CloseNotifier).CloseNotify():
// p.listeners.OnClose(channel, "Connection closed")
case <-r.Context().Done():
if channel.SetClosed() {
p.listeners.OnClose(channel, "Context done")
}
case closeInfo := <-channel.GetCloseCh():
p.listeners.OnClose(channel, closeInfo)
}
}
func (p *SseProcessor) newChannel(w http.ResponseWriter, r *http.Request, closeCh chan string) (pushKit.Channel, error) {
id, err := p.idGenerator()
if err != nil {
return nil, errorKit.Wrap(err, "Fail to generate id with idGenerator")
}
if err := strKit.AssertNotBlank(id, "id"); err != nil {
return nil, err
}
ip := httpKit.GetClientIP(r)
channel := &SseChannel{
BaseChannel: pushKit.BaseChannel{
RWMutex: mutexKit.RWMutex{},
CloseCh: closeCh,
ClientIP: ip,
Type: "SSE",
Id: id,
Bsid: "",
User: "",
Group: "",
Data: nil,
Closed: false,
Listeners: p.listeners,
PongInterval: pushKit.GetPongInterval(),
},
w: w,
r: r,
msgType: p.msgType,
}
return channel, nil
}