Skip to content

Commit

Permalink
feat(api): add SEE metrics (#3432)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and richardlt committed Oct 10, 2018
1 parent 4d80334 commit dee449b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 9 deletions.
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func (api *API) InitRouter() {
api.Router.PostMiddlewares = append(api.Router.PostMiddlewares, api.deletePermissionMiddleware, TracingPostMiddleware)

api.eventsBroker = &eventsBroker{
router: api.Router,
cache: api.Cache,
clients: make(map[string]eventsBrokerSubscribe),
dbFunc: api.DBConnectionFactory.GetDBMap,
Expand Down
20 changes: 13 additions & 7 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/group"
"github.com/ovh/cds/engine/api/observability"
"github.com/ovh/cds/engine/api/permission"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/sdk"
Expand All @@ -37,14 +38,16 @@ type eventsBroker struct {
dbFunc func() *gorp.DbMap
cache cache.Store
clientsLen int64
router *Router
}

// AddClient add a client to the client map
func (b *eventsBroker) addClient(client eventsBrokerSubscribe) {
func (b *eventsBroker) addClient(ctx context.Context, client eventsBrokerSubscribe) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.clients[client.UUID] = client
b.clientsLen++
observability.Record(ctx, b.router.Stats.SSEClients, 1)
}

// CleanAll cleans all clients
Expand All @@ -55,22 +58,24 @@ func (b *eventsBroker) cleanAll() {
for c, v := range b.clients {
close(v.Queue)
delete(b.clients, c)
observability.Record(b.router.Background, b.router.Stats.SSEClients, -1)
}
}
b.clientsLen = 0
}

func (b *eventsBroker) disconnectClient(uuid string) {
func (b *eventsBroker) disconnectClient(ctx context.Context, uuid string) {
b.disconnectedMutex.Lock()
defer b.disconnectedMutex.Unlock()
b.disconnected[uuid] = true
observability.Record(ctx, b.router.Stats.SSEClients, -1)
}

//Init the eventsBroker
func (b *eventsBroker) Init(c context.Context) {
// Start cache Subscription
subscribeFunc := func() {
cacheSubscribe(c, b.messages, b.cache)
b.cacheSubscribe(c, b.messages, b.cache)
}
sdk.GoRoutine("eventsBroker.Init.CacheSubscribe", subscribeFunc)

Expand All @@ -80,7 +85,7 @@ func (b *eventsBroker) Init(c context.Context) {
sdk.GoRoutine("eventsBroker.Init.Start", startFunc)
}

func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) {
func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) {
pubSub := store.Subscribe("events_pubsub")
tick := time.NewTicker(50 * time.Millisecond)
defer tick.Stop()
Expand All @@ -107,6 +112,7 @@ func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cach
case "sdk.EventPipelineBuild", "sdk.EventJob":
continue
}
observability.Record(c, b.router.Stats.SSEEvents, 1)
cacheMsgChan <- e
}
}
Expand Down Expand Up @@ -145,7 +151,7 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
}

// Add this client to the map of those that should receive updates
b.addClient(client)
b.addClient(ctx, client)

// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
Expand All @@ -166,11 +172,11 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
select {
case <-ctx.Done():
log.Info("events.Http: context done")
b.disconnectClient(client.UUID)
b.disconnectClient(ctx, client.UUID)
break leave
case <-r.Context().Done():
log.Info("events.Http: client disconnected")
b.disconnectClient(client.UUID)
b.disconnectClient(ctx, client.UUID)
break leave
case event := <-client.Queue:
if ok := client.manageEvent(event); !ok {
Expand Down
22 changes: 20 additions & 2 deletions engine/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ type Router struct {
nbPanic int
lastPanic *time.Time
Stats struct {
Errors *stats.Int64Measure
Hits *stats.Int64Measure
Errors *stats.Int64Measure
Hits *stats.Int64Measure
SSEClients *stats.Int64Measure
SSEEvents *stats.Int64Measure
}
}

Expand Down Expand Up @@ -516,6 +518,10 @@ func (r *Router) InitStats(service, name string) error {
r.Stats.Errors = stats.Int64(label, "number of errors", stats.UnitDimensionless)
label = fmt.Sprintf("cds/%s/%s/router_hits", service, name)
r.Stats.Hits = stats.Int64(label, "number of hits", stats.UnitDimensionless)
label = fmt.Sprintf("cds/%s/%s/sse_clients", service, name)
r.Stats.SSEClients = stats.Int64(label, "number of sse clients", stats.UnitDimensionless)
label = fmt.Sprintf("cds/%s/%s/sse_events", service, name)
r.Stats.SSEEvents = stats.Int64(label, "number of sse events", stats.UnitDimensionless)

log.Info("api> Stats initialized")

Expand All @@ -532,5 +538,17 @@ func (r *Router) InitStats(service, name string) error {
Measure: r.Stats.Hits,
Aggregation: view.Count(),
},
&view.View{
Name: "sse_clients",
Description: r.Stats.SSEClients.Description(),
Measure: r.Stats.SSEClients,
Aggregation: view.Count(),
},
&view.View{
Name: "sse_events",
Description: r.Stats.SSEEvents.Description(),
Measure: r.Stats.SSEEvents,
Aggregation: view.Count(),
},
)
}

0 comments on commit dee449b

Please sign in to comment.