Skip to content

Commit

Permalink
feat(api): rework sse (#3452)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault committed Oct 12, 2018
1 parent d08f3d6 commit 8005c4a
Show file tree
Hide file tree
Showing 35 changed files with 159 additions and 4,429 deletions.
30 changes: 7 additions & 23 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"context"
"net/http"
"sync"

"github.com/ovh/cds/engine/api/observability"
"github.com/ovh/cds/sdk"
Expand All @@ -19,10 +18,9 @@ func (api *API) InitRouter() {
api.eventsBroker = &eventsBroker{
router: api.Router,
cache: api.Cache,
clients: make(map[string]eventsBrokerSubscribe),
clients: make(map[string]*eventsBrokerSubscribe),
dbFunc: api.DBConnectionFactory.GetDBMap,
messages: make(chan sdk.Event),
mutex: &sync.Mutex{},
}
api.eventsBroker.Init(context.Background())

Expand Down
142 changes: 67 additions & 75 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"time"

"github.com/tevino/abool"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
Expand All @@ -23,55 +25,22 @@ import (

// eventsBrokerSubscribe is the information needed to subscribe
type eventsBrokerSubscribe struct {
UUID string
User *sdk.User
Queue chan sdk.Event
UUID string
User *sdk.User
Queue chan sdk.Event
Mutex *sync.Mutex
IsAlive *abool.AtomicBool
}

// lastUpdateBroker keeps connected client of the current route,
type eventsBroker struct {
clients map[string]eventsBrokerSubscribe
messages chan sdk.Event
mutex *sync.Mutex
dbFunc func() *gorp.DbMap
cache cache.Store
router *Router
}

// AddClient add a client to the client map
func (b *eventsBroker) addClient(ctx context.Context, client eventsBrokerSubscribe) {
b.mutex.Lock()
b.clients[client.UUID] = client
b.mutex.Unlock()
go observability.Record(ctx, b.router.Stats.SSEClients, 1)
}

// CleanAll cleans all clients
func (b *eventsBroker) cleanAll() {
b.mutex.Lock()
if b.clients != nil {
defer observability.Record(b.router.Background, b.router.Stats.SSEClients, -1*int64(len(b.clients)))
for c, v := range b.clients {
close(v.Queue)
delete(b.clients, c)
}
}
b.mutex.Unlock()
}

func (b *eventsBroker) disconnectClient(ctx context.Context, uuid string) {
b.mutex.Lock()
defer b.mutex.Unlock()

client, has := b.clients[uuid]
if !has {
return
}

close(client.Queue)
delete(b.clients, uuid)

go observability.Record(ctx, b.router.Stats.SSEClients, -1)
clients map[string]*eventsBrokerSubscribe
messages chan sdk.Event
dbFunc func() *gorp.DbMap
cache cache.Store
router *Router
chanAddClient chan (*eventsBrokerSubscribe)
chanRemoveClient chan (string)
}

//Init the eventsBroker
Expand Down Expand Up @@ -120,17 +89,52 @@ func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk
}

// Start the broker
func (b *eventsBroker) Start(c context.Context) {
func (b *eventsBroker) Start(ctx context.Context) {
b.chanAddClient = make(chan (*eventsBrokerSubscribe))
b.chanRemoveClient = make(chan (string))

for {
select {
case <-c.Done():
b.cleanAll()
if c.Err() != nil {
log.Error("eventsBroker.Start> Exiting: %v", c.Err())
case <-ctx.Done():
if b.clients != nil {
go observability.Record(b.router.Background, b.router.Stats.SSEClients, -1*int64(len(b.clients)))
for c, v := range b.clients {
close(v.Queue)
delete(b.clients, c)
}

}
if ctx.Err() != nil {
log.Error("eventsBroker.Start> Exiting: %v", ctx.Err())
return
}
case receivedEvent := <-b.messages:
b.manageEvent(receivedEvent)
for i := range b.clients {
go func(c *eventsBrokerSubscribe) {
c.Mutex.Lock()
defer c.Mutex.Unlock()
if c.IsAlive.IsSet() {
log.Debug("send data to %s", c.UUID)
c.Queue <- receivedEvent
}
}(b.clients[i])
}
case client := <-b.chanAddClient:
b.clients[client.UUID] = client
go observability.Record(ctx, b.router.Stats.SSEClients, 1)
case uuid := <-b.chanRemoveClient:
client, has := b.clients[uuid]
if !has {
return
}
go func(c *eventsBrokerSubscribe) {
c.Mutex.Lock()
close(c.Queue)
c.IsAlive.UnSet()
c.Mutex.Unlock()
observability.Record(ctx, b.router.Stats.SSEClients, -1)
}(client)
delete(b.clients, uuid)
}
}
}
Expand All @@ -145,14 +149,16 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
}

uuid := sdk.UUID()
client := eventsBrokerSubscribe{
UUID: uuid,
User: getUser(ctx),
Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
client := &eventsBrokerSubscribe{
UUID: uuid,
User: getUser(ctx),
Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
Mutex: new(sync.Mutex),
IsAlive: abool.NewBool(true),
}

// Add this client to the map of those that should receive updates
b.addClient(ctx, client)
b.chanAddClient <- client

// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
Expand All @@ -173,11 +179,11 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
select {
case <-ctx.Done():
log.Info("events.Http: context done")
b.disconnectClient(ctx, client.UUID)
b.chanRemoveClient <- client.UUID
break leave
case <-r.Context().Done():
log.Info("events.Http: client disconnected")
b.disconnectClient(ctx, client.UUID)
b.chanRemoveClient <- client.UUID
break leave
case event := <-client.Queue:
if ok := client.manageEvent(event); !ok {
Expand All @@ -195,6 +201,9 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
buffer.Write(msg)
buffer.WriteString("\n\n")

if !client.IsAlive.IsSet() {
break leave
}
if _, err := w.Write(buffer.Bytes()); err != nil {
return sdk.WrapError(err, "events.write> Unable to write to client")
}
Expand All @@ -211,23 +220,6 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
}
}

func (b *eventsBroker) manageEvent(receivedEvent sdk.Event) {
// Create a slice of clients with a mutex
b.mutex.Lock()
clients := make([]eventsBrokerSubscribe, len(b.clients))
var i int
for _, client := range b.clients {
clients[i] = client
i++
}
b.mutex.Unlock()
// Then iterate over it outside the mutex
for _, c := range clients {
log.Debug("send data to %s", c.UUID)
c.Queue <- receivedEvent
}
}

func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
var isSharedInfra bool
for _, g := range s.User.Groups {
Expand Down
Loading

0 comments on commit 8005c4a

Please sign in to comment.