From dee449b88d539963af3ca0ab6824242a0f18594f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Wed, 10 Oct 2018 13:41:13 +0200 Subject: [PATCH] feat(api): add SEE metrics (#3432) --- engine/api/api_routes.go | 1 + engine/api/events.go | 20 +++++++++++++------- engine/api/router.go | 22 ++++++++++++++++++++-- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/engine/api/api_routes.go b/engine/api/api_routes.go index 9a78a2c2b3..973384ba5e 100644 --- a/engine/api/api_routes.go +++ b/engine/api/api_routes.go @@ -17,6 +17,7 @@ func (api *API) InitRouter() { api.Router.PostMiddlewares = append(api.Router.PostMiddlewares, api.deletePermissionMiddleware, TracingPostMiddleware) api.eventsBroker = &eventsBroker{ + router: api.Router, cache: api.Cache, clients: make(map[string]eventsBrokerSubscribe), dbFunc: api.DBConnectionFactory.GetDBMap, diff --git a/engine/api/events.go b/engine/api/events.go index 75d888480e..747351f11e 100644 --- a/engine/api/events.go +++ b/engine/api/events.go @@ -14,6 +14,7 @@ import ( "github.com/ovh/cds/engine/api/cache" "github.com/ovh/cds/engine/api/group" + "github.com/ovh/cds/engine/api/observability" "github.com/ovh/cds/engine/api/permission" "github.com/ovh/cds/engine/service" "github.com/ovh/cds/sdk" @@ -37,14 +38,16 @@ type eventsBroker struct { dbFunc func() *gorp.DbMap cache cache.Store clientsLen int64 + router *Router } // AddClient add a client to the client map -func (b *eventsBroker) addClient(client eventsBrokerSubscribe) { +func (b *eventsBroker) addClient(ctx context.Context, client eventsBrokerSubscribe) { b.mutex.Lock() defer b.mutex.Unlock() b.clients[client.UUID] = client b.clientsLen++ + observability.Record(ctx, b.router.Stats.SSEClients, 1) } // CleanAll cleans all clients @@ -55,22 +58,24 @@ func (b *eventsBroker) cleanAll() { for c, v := range b.clients { close(v.Queue) delete(b.clients, c) + observability.Record(b.router.Background, b.router.Stats.SSEClients, -1) } } b.clientsLen = 0 } -func (b *eventsBroker) disconnectClient(uuid string) { +func (b *eventsBroker) disconnectClient(ctx context.Context, uuid string) { b.disconnectedMutex.Lock() defer b.disconnectedMutex.Unlock() b.disconnected[uuid] = true + observability.Record(ctx, b.router.Stats.SSEClients, -1) } //Init the eventsBroker func (b *eventsBroker) Init(c context.Context) { // Start cache Subscription subscribeFunc := func() { - cacheSubscribe(c, b.messages, b.cache) + b.cacheSubscribe(c, b.messages, b.cache) } sdk.GoRoutine("eventsBroker.Init.CacheSubscribe", subscribeFunc) @@ -80,7 +85,7 @@ func (b *eventsBroker) Init(c context.Context) { sdk.GoRoutine("eventsBroker.Init.Start", startFunc) } -func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) { +func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) { pubSub := store.Subscribe("events_pubsub") tick := time.NewTicker(50 * time.Millisecond) defer tick.Stop() @@ -107,6 +112,7 @@ func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cach case "sdk.EventPipelineBuild", "sdk.EventJob": continue } + observability.Record(c, b.router.Stats.SSEEvents, 1) cacheMsgChan <- e } } @@ -145,7 +151,7 @@ func (b *eventsBroker) ServeHTTP() service.Handler { } // Add this client to the map of those that should receive updates - b.addClient(client) + b.addClient(ctx, client) // Set the headers related to event streaming. w.Header().Set("Content-Type", "text/event-stream") @@ -166,11 +172,11 @@ func (b *eventsBroker) ServeHTTP() service.Handler { select { case <-ctx.Done(): log.Info("events.Http: context done") - b.disconnectClient(client.UUID) + b.disconnectClient(ctx, client.UUID) break leave case <-r.Context().Done(): log.Info("events.Http: client disconnected") - b.disconnectClient(client.UUID) + b.disconnectClient(ctx, client.UUID) break leave case event := <-client.Queue: if ok := client.manageEvent(event); !ok { diff --git a/engine/api/router.go b/engine/api/router.go index e2c90d7450..7cb72bbb0c 100644 --- a/engine/api/router.go +++ b/engine/api/router.go @@ -47,8 +47,10 @@ type Router struct { nbPanic int lastPanic *time.Time Stats struct { - Errors *stats.Int64Measure - Hits *stats.Int64Measure + Errors *stats.Int64Measure + Hits *stats.Int64Measure + SSEClients *stats.Int64Measure + SSEEvents *stats.Int64Measure } } @@ -516,6 +518,10 @@ func (r *Router) InitStats(service, name string) error { r.Stats.Errors = stats.Int64(label, "number of errors", stats.UnitDimensionless) label = fmt.Sprintf("cds/%s/%s/router_hits", service, name) r.Stats.Hits = stats.Int64(label, "number of hits", stats.UnitDimensionless) + label = fmt.Sprintf("cds/%s/%s/sse_clients", service, name) + r.Stats.SSEClients = stats.Int64(label, "number of sse clients", stats.UnitDimensionless) + label = fmt.Sprintf("cds/%s/%s/sse_events", service, name) + r.Stats.SSEEvents = stats.Int64(label, "number of sse events", stats.UnitDimensionless) log.Info("api> Stats initialized") @@ -532,5 +538,17 @@ func (r *Router) InitStats(service, name string) error { Measure: r.Stats.Hits, Aggregation: view.Count(), }, + &view.View{ + Name: "sse_clients", + Description: r.Stats.SSEClients.Description(), + Measure: r.Stats.SSEClients, + Aggregation: view.Count(), + }, + &view.View{ + Name: "sse_events", + Description: r.Stats.SSEEvents.Description(), + Measure: r.Stats.SSEEvents, + Aggregation: view.Count(), + }, ) }