Skip to content

Commit

Permalink
Use channels for EventStream instead of diodes
Browse files Browse the repository at this point in the history
  • Loading branch information
deluan committed Apr 3, 2023
1 parent fea2de8 commit b22d036
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions server/events/sse.go
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/navidrome/navidrome/consts"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model/request"
"github.com/navidrome/navidrome/utils/diodes"
"github.com/navidrome/navidrome/utils/pl"
"github.com/navidrome/navidrome/utils/singleton"
)

Expand All @@ -24,6 +24,7 @@ type Broker interface {
const (
keepAliveFrequency = 15 * time.Second
writeTimeOut = 5 * time.Second
bufferSize = 1
)

type (
Expand All @@ -41,7 +42,7 @@ type (
username string
userAgent string
clientUniqueId string
diode *diodes.Diode[message]
msgC chan message
}
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func GetBroker() Broker {

func (b *broker) SendMessage(ctx context.Context, evt Event) {
msg := b.prepareMessage(ctx, evt)
log.Trace("Broker received new event", "event", msg)
log.Trace("Broker received new event", "type", msg.event, "data", msg.data)
b.publish <- msg
}

Expand Down Expand Up @@ -147,21 +148,18 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Each connection registers its own message channel with the Broker's connections registry
c := b.subscribe(r)
defer b.unsubscribe(c)
log.Debug(ctx, "New broker client", "client", c.String())
log.Debug(ctx, "Started new EventStream connection", "client", c.String())

for {
event := c.diode.Next()
if event == nil {
log.Trace(ctx, "Client closed the EventStream connection", "client", c.String())
return
}
log.Trace(ctx, "Sending event to client", "event", *event, "client", c.String())
err := writeEvent(ctx, w, *event, writeTimeOut)
for event := range pl.ReadOrDone(ctx, c.msgC) {
log.Trace(ctx, "Sending event to client", "event", event, "client", c.String())
err := writeEvent(ctx, w, event, writeTimeOut)
if err != nil {
log.Debug(ctx, "Error sending event to client. Closing connection", "event", *event, "client", c.String(), err)
log.Debug(ctx, "Error sending event to client. Closing connection", "event", event, "client", c.String(), err)
return
}
}
log.Trace(ctx, "Client EventStream connection closed", "client", c.String())
return

Check failure on line 162 in server/events/sse.go

View workflow job for this annotation

GitHub Actions / Lint Go code

S1023: redundant `return` statement (gosimple)
}

func (b *broker) subscribe(r *http.Request) client {
Expand All @@ -175,9 +173,7 @@ func (b *broker) subscribe(r *http.Request) client {
userAgent: r.UserAgent(),
clientUniqueId: clientUniqueId,
}
c.diode = diodes.New[message](ctx, 1024, diodes.AlertFunc(func(missed int) {
log.Debug("Dropped SSE events", "client", c.String(), "missed", missed)
}))
c.msgC = make(chan message, bufferSize)

// Signal the broker that we have a new client
b.subscribing <- c
Expand Down Expand Up @@ -220,18 +216,19 @@ func (b *broker) listen() {
// A new client has connected.
// Register their message channel
clients[c] = struct{}{}
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
log.Debug("Client added to EventStream broker", "numActiveClients", len(clients), "newClient", c.String())

// Send a serverStart event to new client
msg := b.prepareMessage(context.Background(),
&ServerStart{StartTime: consts.ServerStart, Version: consts.Version})
c.diode.Put(msg)
sendOrDrop(c, msg)

case c := <-b.unsubscribing:
// A client has detached, and we want to
// stop sending them messages.
close(c.msgC)
delete(clients, c)
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
log.Debug("Removed client from EventStream broker", "numActiveClients", len(clients), "client", c.String())

case msg := <-b.publish:
msg.id = getNextEventId()
Expand All @@ -241,7 +238,7 @@ func (b *broker) listen() {
for c := range clients {
if b.shouldSend(msg, c) {
log.Trace("Putting event on client's queue", "client", c.String(), "event", msg)
c.diode.Put(msg)
sendOrDrop(c, msg)
}
}

Expand All @@ -254,8 +251,18 @@ func (b *broker) listen() {
msg.id = getNextEventId()
for c := range clients {
log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg)
c.diode.Put(msg)
sendOrDrop(c, msg)
}
}
}
}

func sendOrDrop(client client, msg message) {
select {
case client.msgC <- msg:
default:
if log.CurrentLevel() >= log.LevelTrace {
log.Trace("Event dropped because client's channel is full", "event", msg, "client", client.String())
}
}
}

0 comments on commit b22d036

Please sign in to comment.