Skip to content

Commit

Permalink
Don't rely on goroutines to send keepalive events
Browse files Browse the repository at this point in the history
  • Loading branch information
deluan committed Jul 1, 2021
1 parent 452c8dc commit ed286c7
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions server/events/sse.go
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"io"
"net/http"
"sync/atomic"
"time"

"code.cloudfoundry.org/go-diodes"
Expand All @@ -28,11 +27,6 @@ const (
writeTimeOut = 5 * time.Second
)

var (
eventId uint32
errWriteTimeOut = errors.New("write timeout")
)

type (
message struct {
id uint32
Expand Down Expand Up @@ -74,7 +68,7 @@ func GetBroker() Broker {
instance := singleton.Get(&broker{}, func() interface{} {
// Instantiate a broker
broker := &broker{
publish: make(messageChan, 100),
publish: make(messageChan, 2),
subscribing: make(clientsChan, 1),
unsubscribing: make(clientsChan, 1),
}
Expand All @@ -101,6 +95,8 @@ func (b *broker) prepareMessage(ctx context.Context, event Event) message {
return msg
}

var errWriteTimeOut = errors.New("write timeout")

// writeEvent Write to the ResponseWriter, Server Sent Events compatible
func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) {
flusher, _ := w.(http.Flusher)
Expand Down Expand Up @@ -170,7 +166,7 @@ func (b *broker) subscribe(r *http.Request) client {
clientUniqueId: clientUniqueId,
}
c.diode = newDiode(ctx, 1024, diodes.AlertFunc(func(missed int) {
log.Trace("Dropped SSE events", "client", c.String(), "missed", missed)
log.Debug("Dropped SSE events", "client", c.String(), "missed", missed)
}))

// Signal the broker that we have a new client
Expand Down Expand Up @@ -201,6 +197,12 @@ func (b *broker) listen() {
defer keepAlive.Stop()

clients := map[client]struct{}{}
var eventId uint32

getNextEventId := func() uint32 {
eventId++
return eventId
}

for {
select {
Expand All @@ -222,7 +224,7 @@ func (b *broker) listen() {
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())

case msg := <-b.publish:
msg.id = atomic.AddUint32(&eventId, 1)
msg.id = getNextEventId()
log.Trace("Got new published event", "event", msg)
// We got a new event from the outside!
// Send event to all connected clients
Expand All @@ -234,8 +236,16 @@ func (b *broker) listen() {
}

case ts := <-keepAlive.C:
// Send a keep alive message every 15 seconds
go b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()})
// Send a keep alive message every 15 seconds to all connected clients
if len(clients) == 0 {
continue
}
msg := b.prepareMessage(context.Background(), &KeepAlive{TS: ts.Unix()})
msg.id = getNextEventId()
for c := range clients {
log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg)
c.diode.put(msg)
}
}
}
}

0 comments on commit ed286c7

Please sign in to comment.