Skip to content

Commit

Permalink
fix(api): sse mutexes (#3451)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and yesnault committed Oct 11, 2018
1 parent 4b4d1af commit 387f0ce
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 45 deletions.
14 changes: 6 additions & 8 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ func (api *API) InitRouter() {
api.Router.PostMiddlewares = append(api.Router.PostMiddlewares, api.deletePermissionMiddleware, TracingPostMiddleware)

api.eventsBroker = &eventsBroker{
router: api.Router,
cache: api.Cache,
clients: make(map[string]eventsBrokerSubscribe),
dbFunc: api.DBConnectionFactory.GetDBMap,
messages: make(chan sdk.Event),
mutex: &sync.Mutex{},
disconnectedMutex: &sync.Mutex{},
disconnected: make(map[string]bool),
router: api.Router,
cache: api.Cache,
clients: make(map[string]eventsBrokerSubscribe),
dbFunc: api.DBConnectionFactory.GetDBMap,
messages: make(chan sdk.Event),
mutex: &sync.Mutex{},
}
api.eventsBroker.Init(context.Background())

Expand Down
69 changes: 33 additions & 36 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,48 @@ type eventsBrokerSubscribe struct {

// lastUpdateBroker keeps connected client of the current route,
type eventsBroker struct {
clients map[string]eventsBrokerSubscribe
messages chan sdk.Event
mutex *sync.Mutex
disconnected map[string]bool
disconnectedMutex *sync.Mutex
dbFunc func() *gorp.DbMap
cache cache.Store
clientsLen int64
router *Router
clients map[string]eventsBrokerSubscribe
messages chan sdk.Event
mutex *sync.Mutex
dbFunc func() *gorp.DbMap
cache cache.Store
router *Router
}

// AddClient add a client to the client map
func (b *eventsBroker) addClient(ctx context.Context, client eventsBrokerSubscribe) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.clients[client.UUID] = client
b.clientsLen++
observability.Record(ctx, b.router.Stats.SSEClients, 1)
b.mutex.Unlock()
go observability.Record(ctx, b.router.Stats.SSEClients, 1)
}

// CleanAll cleans all clients
func (b *eventsBroker) cleanAll() {
b.mutex.Lock()
defer b.mutex.Unlock()
if b.clients != nil {
defer observability.Record(b.router.Background, b.router.Stats.SSEClients, -1*int64(len(b.clients)))
for c, v := range b.clients {
close(v.Queue)
delete(b.clients, c)
observability.Record(b.router.Background, b.router.Stats.SSEClients, -1)
}
}
b.clientsLen = 0
b.mutex.Unlock()
}

func (b *eventsBroker) disconnectClient(ctx context.Context, uuid string) {
b.disconnectedMutex.Lock()
defer b.disconnectedMutex.Unlock()
b.disconnected[uuid] = true
observability.Record(ctx, b.router.Stats.SSEClients, -1)
b.mutex.Lock()
defer b.mutex.Unlock()

client, has := b.clients[uuid]
if !has {
return
}

close(client.Queue)
delete(b.clients, uuid)

go observability.Record(ctx, b.router.Stats.SSEClients, -1)
}

//Init the eventsBroker
Expand Down Expand Up @@ -209,26 +212,20 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
}

func (b *eventsBroker) manageEvent(receivedEvent sdk.Event) {
// Create a slice of clients with a mutex
b.mutex.Lock()
defer b.mutex.Unlock()
for _, i := range b.clients {
if b.canSend(i) {
i.Queue <- receivedEvent
}
clients := make([]eventsBrokerSubscribe, len(b.clients))
var i int
for _, client := range b.clients {
clients[i] = client
i++
}
}

// canSend Test if client is connected. If not, close channel and remove client from map
func (b *eventsBroker) canSend(client eventsBrokerSubscribe) bool {
b.disconnectedMutex.Lock()
defer b.disconnectedMutex.Unlock()
if _, ok := b.disconnected[client.UUID]; !ok {
return true
b.mutex.Unlock()
// Then iterate over it outside the mutex
for _, c := range clients {
log.Debug("send data to %s", c.UUID)
c.Queue <- receivedEvent
}
close(client.Queue)
delete(b.clients, client.UUID)
b.clientsLen--
return false
}

func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
Expand Down
1 change: 0 additions & 1 deletion engine/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (api *API) Status() sdk.MonitoringStatus {
m.Lines = append(m.Lines, getStatusLine(objectstore.Status()))
m.Lines = append(m.Lines, getStatusLine(mail.Status()))
m.Lines = append(m.Lines, getStatusLine(api.DBConnectionFactory.Status()))
m.Lines = append(m.Lines, getStatusLine(sdk.MonitoringStatusLine{Component: "SSE Connected", Value: fmt.Sprintf("%d", api.eventsBroker.clientsLen), Status: sdk.MonitoringStatusOK}))
m.Lines = append(m.Lines, getStatusLine(worker.Status(api.mustDB())))

return m
Expand Down

0 comments on commit 387f0ce

Please sign in to comment.