/
sse.go
260 lines (226 loc) · 7.06 KB
/
sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
// Based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go
package events
import (
"context"
"fmt"
"io"
"net/http"
"time"
"code.cloudfoundry.org/go-diodes"
"github.com/google/uuid"
"github.com/navidrome/navidrome/consts"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model/request"
"github.com/navidrome/navidrome/utils/singleton"
)
// Broker is the server-sent events hub. It is an http.Handler, so it can be
// mounted as the endpoint clients connect to, and it accepts events to be
// distributed to those clients.
type Broker interface {
http.Handler
// SendMessage publishes event through the broker. ctx is kept with the
// message as the sender's context.
SendMessage(ctx context.Context, event Event)
}
const (
// keepAliveFrequency is the period of the ticker that drives keep-alive
// events in the broker's listen loop.
keepAliveFrequency = 15 * time.Second
// writeTimeOut is the timeout ServeHTTP passes to writeEvent for each
// event written to a client.
writeTimeOut = 5 * time.Second
)
type (
// message is one event as it travels through the broker.
message struct {
// id is assigned by the listen loop from an incrementing counter.
id uint32
// event and data are the results of Event.Name and Event.Data.
event string
data string
// senderCtx is the context given to prepareMessage; shouldSend reads
// the originating client id and username from it.
senderCtx context.Context
}
// messageChan carries messages to the listen loop.
messageChan chan message
// clientsChan carries clients that are subscribing or unsubscribing.
clientsChan chan client
// client describes one connected event-stream consumer.
client struct {
// id is a UUID generated when the client subscribes.
id string
// address and userAgent are taken from the HTTP request.
address string
username string
userAgent string
// clientUniqueId is the client id read from the request context.
clientUniqueId string
// diode is the per-client queue: the listen loop puts messages on it
// and ServeHTTP reads them back with next.
diode *diode
}
)
// String renders the client for log output as "id (username - address -
// clientUniqueId)". The user agent is appended as an extra field only when
// log.CurrentLevel() is at least log.LevelTrace.
func (c client) String() string {
	if log.CurrentLevel() < log.LevelTrace {
		return fmt.Sprintf("%s (%s - %s - %s)", c.id, c.username, c.address, c.clientUniqueId)
	}
	return fmt.Sprintf("%s (%s - %s - %s - %s)", c.id, c.username, c.address, c.clientUniqueId, c.userAgent)
}
// broker is the Broker implementation. Its three channels are all consumed by
// the listen loop.
type broker struct {
// Events are pushed to this channel by the main events-gathering routine
publish messageChan
// New client connections
subscribing clientsChan
// Closed client connections
unsubscribing clientsChan
}
// GetBroker returns the broker obtained through singleton.GetInstance. The
// constructor handed to the singleton allocates the broker's channels and
// starts its listen loop in a background goroutine (presumably it is invoked
// only the first time an instance is requested).
func GetBroker() Broker {
	newBroker := func() *broker {
		instance := &broker{
			publish:       make(messageChan, 2),
			subscribing:   make(clientsChan, 1),
			unsubscribing: make(clientsChan, 1),
		}
		// Start listening and broadcasting before the broker is handed out.
		go instance.listen()
		return instance
	}
	return singleton.GetInstance(newBroker)
}
// SendMessage turns evt into a message tagged with ctx as its sender context
// and hands it to the listen loop through the publish channel. The send
// blocks while that channel's buffer is full.
func (b *broker) SendMessage(ctx context.Context, evt Event) {
	outgoing := b.prepareMessage(ctx, evt)
	log.Trace("Broker received new event", "event", outgoing)
	b.publish <- outgoing
}
// prepareMessage builds the message for event: its payload and name come from
// event.Data and event.Name, and ctx is recorded as the sender context. The
// id is left at zero; the listen loop assigns ids where it needs them.
func (b *broker) prepareMessage(ctx context.Context, event Event) message {
	return message{
		data:      event.Data(event),
		event:     event.Name(event),
		senderCtx: ctx,
	}
}
// writeEvent serializes event onto w in Server-Sent Events framing (id, event
// and data fields followed by a blank line).
//
// Before writing it tries to arm a write deadline of timeout on w; failing to
// do so is only logged at debug level and does not abort the write. When w
// implements http.Flusher the frame is flushed right away rather than left in
// a buffer. The returned error is the one from the write itself, if any.
func writeEvent(ctx context.Context, w io.Writer, event message, timeout time.Duration) error {
	if deadlineErr := setWriteTimeout(w, timeout); deadlineErr != nil {
		log.Debug(ctx, "Error setting write timeout", deadlineErr)
	}

	const frame = "id: %d\nevent: %s\ndata: %s\n\n"
	if _, writeErr := fmt.Fprintf(w, frame, event.id, event.event, event.data); writeErr != nil {
		return writeErr
	}

	// Push the frame out immediately when the writer supports it.
	if f, canFlush := w.(http.Flusher); canFlush {
		f.Flush()
	}
	return nil
}
// setWriteTimeout sets a write deadline of now+timeout on rw.
//
// If rw has a SetWriteDeadline method, that method is called and its result
// returned. Otherwise, if rw has an Unwrap method returning an
// http.ResponseWriter, the search continues on the unwrapped writer. When
// neither applies, an error wrapping http.ErrNotSupported is returned,
// prefixed with the dynamic type of the writer where the search stopped.
func setWriteTimeout(rw io.Writer, timeout time.Duration) error {
	type deadlineSetter interface {
		SetWriteDeadline(time.Time) error
	}
	type unwrapper interface {
		Unwrap() http.ResponseWriter
	}

	current := rw
	for {
		if ds, ok := current.(deadlineSetter); ok {
			return ds.SetWriteDeadline(time.Now().Add(timeout))
		}
		wrapped, ok := current.(unwrapper)
		if !ok {
			return fmt.Errorf("%T - %w", current, http.ErrNotSupported)
		}
		current = wrapped.Unwrap()
	}
}
// ServeHTTP streams events to one client as a Server-Sent Events response.
//
// It rejects writers that cannot flush with a 500, sets the event-stream
// response headers, subscribes the client to the broker and then forwards
// every event taken from the client's queue until either the queue yields nil
// (treated as the client having closed the connection) or a write to the
// client fails. The client is unsubscribed when the handler returns.
func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	user, _ := request.UserFrom(ctx)

	// Make sure that the writer supports flushing.
	if _, ok := w.(http.Flusher); !ok {
		log.Error(r, "Streaming unsupported! Events cannot be sent to this client", "address", r.RemoteAddr,
			"userAgent", r.UserAgent(), "user", user.UserName)
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache, no-transform")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	// Tells Nginx to not buffer this response. See https://stackoverflow.com/a/33414096
	w.Header().Set("X-Accel-Buffering", "no")

	// Each connection registers its own message channel with the Broker's connections registry
	c := b.subscribe(r)
	defer b.unsubscribe(c)
	log.Debug(ctx, "New broker client", "client", c.String())

	for {
		event := c.diode.next()
		if event == nil {
			log.Trace(ctx, "Client closed the EventStream connection", "client", c.String())
			return
		}
		log.Trace(ctx, "Sending event to client", "event", *event, "client", c.String())
		if err := writeEvent(ctx, w, *event, writeTimeOut); err != nil {
			// A failed write means the response can no longer be relied on.
			// Stop here so the deferred unsubscribe runs, instead of writing
			// every later event to a broken connection.
			log.Debug(ctx, "Error sending event to client. Closing connection", "event", *event, "client", c.String(), err)
			return
		}
	}
}
// subscribe builds a client record for the request r (fresh UUID, user name
// and client id from the request context, remote address and user agent from
// the request), gives it a queue created with newDiode, and announces it to
// the listen loop through the subscribing channel. The new client is returned
// so the caller can read from its queue and unsubscribe it later.
func (b *broker) subscribe(r *http.Request) client {
	ctx := r.Context()
	user, _ := request.UserFrom(ctx)
	uniqueID, _ := request.ClientUniqueIdFrom(ctx)

	c := client{
		id:             uuid.NewString(),
		username:       user.UserName,
		address:        r.RemoteAddr,
		userAgent:      r.UserAgent(),
		clientUniqueId: uniqueID,
	}

	// Alert callback handed to the diode; it logs the reported number of
	// missed events.
	onDrop := diodes.AlertFunc(func(missed int) {
		log.Debug("Dropped SSE events", "client", c.String(), "missed", missed)
	})
	c.diode = newDiode(ctx, 1024, onDrop)

	// Signal the broker that we have a new client
	b.subscribing <- c
	return c
}
// unsubscribe tells the listen loop, through the unsubscribing channel, that
// client c should be removed from the set of connected clients.
func (b *broker) unsubscribe(c client) {
b.unsubscribing <- c
}
// shouldSend decides whether msg is delivered to client c, based on what the
// message's sender context carries:
//   - no originating client id: deliver to everyone;
//   - originating client id equal to c's: do not echo the event back;
//   - a username present: deliver only to clients of that same user;
//   - otherwise: deliver.
func (b *broker) shouldSend(msg message, c client) bool {
	senderID, fromClient := request.ClientUniqueIdFrom(msg.senderCtx)
	switch {
	case !fromClient:
		return true
	case senderID == c.clientUniqueId:
		return false
	}

	username, hasUser := request.UsernameFrom(msg.senderCtx)
	return !hasUser || username == c.username
}
// listen is the broker's dispatch loop. It owns the set of connected clients
// and the event id counter, and reacts to four inputs:
//   - subscribing: register the client and queue a ServerStart event for it;
//   - unsubscribing: drop the client from the set;
//   - publish: stamp the message with the next id and queue it for every
//     client accepted by shouldSend;
//   - keep-alive tick: when at least one client is connected, queue a
//     KeepAlive event carrying the tick's Unix time for every client.
//
// The loop has no exit condition.
func (b *broker) listen() {
	ticker := time.NewTicker(keepAliveFrequency)
	defer ticker.Stop()

	registry := make(map[client]struct{})
	var lastID uint32

	for {
		select {
		case joined := <-b.subscribing:
			// A new client has connected; register it.
			registry[joined] = struct{}{}
			log.Debug("Client added to event broker", "numClients", len(registry), "newClient", joined.String())

			// Greet the new client with a serverStart event.
			welcome := b.prepareMessage(context.Background(),
				&ServerStart{StartTime: consts.ServerStart, Version: consts.Version})
			joined.diode.put(welcome)

		case left := <-b.unsubscribing:
			// A client has detached; stop sending it messages.
			delete(registry, left)
			log.Debug("Removed client from event broker", "numClients", len(registry), "client", left.String())

		case msg := <-b.publish:
			// A new event arrived from the outside; fan it out.
			lastID++
			msg.id = lastID
			log.Trace("Got new published event", "event", msg)
			for c := range registry {
				if !b.shouldSend(msg, c) {
					continue
				}
				log.Trace("Putting event on client's queue", "client", c.String(), "event", msg)
				c.diode.put(msg)
			}

		case tick := <-ticker.C:
			// Periodic keep-alive, skipped entirely while nobody is connected.
			if len(registry) > 0 {
				ping := b.prepareMessage(context.Background(), &KeepAlive{TS: tick.Unix()})
				lastID++
				ping.id = lastID
				for c := range registry {
					log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", ping)
					c.diode.put(ping)
				}
			}
		}
	}
}