Skip to content

Commit

Permalink
feat(sdk): run goroutine with pprof labels (#3440)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and yesnault committed Oct 11, 2018
1 parent b8020d7 commit 64909bd
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 187 deletions.
65 changes: 46 additions & 19 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,21 +635,48 @@ func (a *API) Serve(ctx context.Context) error {
event.Subscribe(a.warnChan)

log.Info("Initializing internal routines...")
sdk.GoRoutine("workflow.ComputeAudit", func() { workflow.ComputeAudit(ctx, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("warning.Start", func() { warning.Start(ctx, a.DBConnectionFactory.GetDBMap, a.warnChan) })
sdk.GoRoutine("queue.Pipelines", func() { queue.Pipelines(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("pipeline.AWOLPipelineKiller", func() { pipeline.AWOLPipelineKiller(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("auditCleanerRoutine(ctx", func() { auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("metrics.Initialize", func() { metrics.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.Name) })
sdk.GoRoutine("repositoriesmanager.ReceiveEvents", func() { repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("action.RequirementsCacheLoader", func() { action.RequirementsCacheLoader(ctx, 5*time.Second, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("hookRecoverer(ctx", func() { hookRecoverer(ctx, a.DBConnectionFactory.GetDBMap, a.Cache) })
sdk.GoRoutine("services.KillDeadServices", func() { services.KillDeadServices(ctx, a.mustDB) })
sdk.GoRoutine("migrate.CleanOldWorkflow", func() { migrate.CleanOldWorkflow(ctx, a.Cache, a.DBConnectionFactory.GetDBMap, a.Config.URL.API) })
sdk.GoRoutine("migrate.KeyMigration", func() { migrate.KeyMigration(a.Cache, a.DBConnectionFactory.GetDBMap, &sdk.User{Admin: true}) })
sdk.GoRoutine("broadcast.Initialize", func() { broadcast.Initialize(ctx, a.DBConnectionFactory.GetDBMap) })
//sdk.GoRoutine("workflow.RestartAwolJobs", func() { workflow.RestartAwolJobs(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("a.serviceAPIHeartbeat(ctx", func() { a.serviceAPIHeartbeat(ctx) })
sdk.GoRoutine(ctx, "workflow.ComputeAudit", func(ctx context.Context) {
workflow.ComputeAudit(ctx, a.DBConnectionFactory.GetDBMap)
})
sdk.GoRoutine(ctx, "warning.Start", func(ctx context.Context) {
warning.Start(ctx, a.DBConnectionFactory.GetDBMap, a.warnChan)
})
sdk.GoRoutine(ctx, "queue.Pipelines", func(ctx context.Context) {
queue.Pipelines(ctx, a.Cache, a.DBConnectionFactory.GetDBMap)
})
sdk.GoRoutine(ctx, "pipeline.AWOLPipelineKiller", func(ctx context.Context) {
pipeline.AWOLPipelineKiller(ctx, a.DBConnectionFactory.GetDBMap, a.Cache)
})
sdk.GoRoutine(ctx, "auditCleanerRoutine(ctx", func(ctx context.Context) {
auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap)
})
sdk.GoRoutine(ctx, "metrics.Initialize", func(ctx context.Context) {
metrics.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.Name)
})
sdk.GoRoutine(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) {
repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap, a.Cache)
})
sdk.GoRoutine(ctx, "action.RequirementsCacheLoader", func(ctx context.Context) {
action.RequirementsCacheLoader(ctx, 5*time.Second, a.DBConnectionFactory.GetDBMap, a.Cache)
})
sdk.GoRoutine(ctx, "hookRecoverer(ctx", func(ctx context.Context) {
hookRecoverer(ctx, a.DBConnectionFactory.GetDBMap, a.Cache)
})
sdk.GoRoutine(ctx, "services.KillDeadServices", func(ctx context.Context) {
services.KillDeadServices(ctx, a.mustDB)
})
sdk.GoRoutine(ctx, "migrate.CleanOldWorkflow", func(ctx context.Context) {
migrate.CleanOldWorkflow(ctx, a.Cache, a.DBConnectionFactory.GetDBMap, a.Config.URL.API)
})
sdk.GoRoutine(ctx, "migrate.KeyMigration", func(ctx context.Context) {
migrate.KeyMigration(a.Cache, a.DBConnectionFactory.GetDBMap, &sdk.User{Admin: true})
})
sdk.GoRoutine(ctx, "broadcast.Initialize", func(ctx context.Context) {
broadcast.Initialize(ctx, a.DBConnectionFactory.GetDBMap)
})
sdk.GoRoutine(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) {
a.serviceAPIHeartbeat(ctx)
})

//Temporary migration code
go migrate.WorkflowNodeRunArtifacts(a.Cache, a.DBConnectionFactory.GetDBMap)
Expand Down Expand Up @@ -683,7 +710,7 @@ func (a *API) Serve(ctx context.Context) error {
if err := services.InitExternal(a.mustDB, a.Cache, externalServices); err != nil {
return fmt.Errorf("unable to init external service: %v", err)
}
sdk.GoRoutine("pings-external-services", func() { services.Pings(ctx, a.mustDB, externalServices) })
sdk.GoRoutine(ctx, "pings-external-services", func(ctx context.Context) { services.Pings(ctx, a.mustDB, externalServices) })

// TODO: to delete after migration
if os.Getenv("CDS_MIGRATE_GIT_CLONE") == "true" {
Expand All @@ -700,12 +727,12 @@ func (a *API) Serve(ctx context.Context) error {
log.Warning("⚠ Cron Scheduler is disabled")
}

sdk.GoRoutine("workflow.Initialize", func() {
sdk.GoRoutine(ctx, "workflow.Initialize", func(ctx context.Context) {
workflow.Initialize(ctx, a.DBConnectionFactory.GetDBMap, a.Config.URL.UI, a.Config.DefaultOS, a.Config.DefaultArch)
})
sdk.GoRoutine("PushInElasticSearch", func() { event.PushInElasticSearch(ctx, a.mustDB(), a.Cache) })
sdk.GoRoutine(ctx, "PushInElasticSearch", func(ctx context.Context) { event.PushInElasticSearch(ctx, a.mustDB(), a.Cache) })
metrics.Init(ctx, a.DBConnectionFactory.GetDBMap)
sdk.GoRoutine("Purge", func() { purge.Initialize(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine(ctx, "Purge", func(ctx context.Context) { purge.Initialize(ctx, a.Cache, a.DBConnectionFactory.GetDBMap) })

s := &http.Server{
Addr: fmt.Sprintf("%s:%d", a.Config.HTTP.Addr, a.Config.HTTP.Port),
Expand Down
4 changes: 2 additions & 2 deletions engine/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *gorp.DbM
api.warnChan = make(chan sdk.Event)
event.Subscribe(api.warnChan)

sdk.GoRoutine("workflow.ComputeAudit", func() { workflow.ComputeAudit(context.Background(), api.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine("warning.Start", func() { warning.Start(context.Background(), api.DBConnectionFactory.GetDBMap, api.warnChan) })
sdk.GoRoutine(context.TODO(), "workflow.ComputeAudit", func(ctx context.Context) { workflow.ComputeAudit(ctx, api.DBConnectionFactory.GetDBMap) })
sdk.GoRoutine(context.TODO(), "warning.Start", func(ctx context.Context) { warning.Start(ctx, api.DBConnectionFactory.GetDBMap, api.warnChan) })

return api, db, router
}
Expand Down
16 changes: 7 additions & 9 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,15 @@ func (b *eventsBroker) disconnectClient(ctx context.Context, uuid string) {
}

//Init the eventsBroker
func (b *eventsBroker) Init(c context.Context) {
func (b *eventsBroker) Init(ctx context.Context) {
// Start cache Subscription
subscribeFunc := func() {
b.cacheSubscribe(c, b.messages, b.cache)
}
sdk.GoRoutine("eventsBroker.Init.CacheSubscribe", subscribeFunc)
sdk.GoRoutine(ctx, "eventsBroker.Init.CacheSubscribe", func(ctx context.Context) {
b.cacheSubscribe(ctx, b.messages, b.cache)
})

startFunc := func() {
b.Start(c)
}
sdk.GoRoutine("eventsBroker.Init.Start", startFunc)
sdk.GoRoutine(ctx, "eventsBroker.Init.Start", func(ctx context.Context) {
b.Start(ctx)
})
}

func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) {
Expand Down
5 changes: 3 additions & 2 deletions engine/api/metrics/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (

var metricsChan chan sdk.Metric

func Init(c context.Context, DBFunc func() *gorp.DbMap) {
// Init the metrics package which push to elasticSearch service
func Init(ctx context.Context, DBFunc func() *gorp.DbMap) {
metricsChan = make(chan sdk.Metric, 50)
sdk.GoRoutine("metrics.PushInElasticSearch", func() { pushInElasticSearch(c, DBFunc) })
sdk.GoRoutine(ctx, "metrics.PushInElasticSearch", func(c context.Context) { pushInElasticSearch(c, DBFunc) })
}

func pushInElasticSearch(c context.Context, DBFunc func() *gorp.DbMap) {
Expand Down
4 changes: 2 additions & 2 deletions engine/api/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ func (api *API) deleteWorkflowHandler() service.Handler {

event.PublishWorkflowDelete(key, *oldW, getUser(ctx))

sdk.GoRoutine("deleteWorkflowHandler",
func() {
sdk.GoRoutine(ctx, "deleteWorkflowHandler",
func(ctx context.Context) {
txg, errT := api.mustDB().Begin()
if errT != nil {
log.Error("deleteWorkflowHandler> Cannot start transaction: %v", errT)
Expand Down
89 changes: 0 additions & 89 deletions engine/api/workflow/awol.go

This file was deleted.

12 changes: 5 additions & 7 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,13 +721,11 @@ func (api *API) postWorkflowRunHandler() service.Handler {
go workflow.SendEvent(api.mustDB(), p.Key, report)

// Purge workflow run
sdk.GoRoutine(
"workflow.PurgeWorkflowRun",
func() {
if err := workflow.PurgeWorkflowRun(api.mustDB(), *wf); err != nil {
log.Error("workflow.PurgeWorkflowRun> error %v", err)
}
})
sdk.GoRoutine(ctx, "workflow.PurgeWorkflowRun", func(ctx context.Context) {
if err := workflow.PurgeWorkflowRun(api.mustDB(), *wf); err != nil {
log.Error("workflow.PurgeWorkflowRun> error %v", err)
}
})

var wr *sdk.WorkflowRun
if len(report.WorkflowRuns()) > 0 {
Expand Down
8 changes: 4 additions & 4 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func New() *HatcheryKubernetes {

// Init register local hatchery with its worker model
func (h *HatcheryKubernetes) Init() error {
sdk.GoRoutine("hatchery kubernetes routines", func() {
h.routines(context.Background())
sdk.GoRoutine(context.Background(), "hatchery kubernetes routines", func(ctx context.Context) {
h.routines(ctx)
})
return nil
}
Expand Down Expand Up @@ -471,13 +471,13 @@ func (h *HatcheryKubernetes) routines(ctx context.Context) {
for {
select {
case <-ticker.C:
sdk.GoRoutine("getServicesLogs", func() {
sdk.GoRoutine(ctx, "getServicesLogs", func(ctx context.Context) {
if err := h.getServicesLogs(); err != nil {
log.Error("Hatchery> Kubernetes> Cannot get service logs : %v", err)
}
})

sdk.GoRoutine("killAwolWorker", func() {
sdk.GoRoutine(ctx, "killAwolWorker", func(ctx context.Context) {
_ = h.killAwolWorkers()
})
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions engine/hatchery/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (h *HatcheryLocal) WorkersStartedByModel(model *sdk.Model) int {
// Init register local hatchery with its worker model
func (h *HatcheryLocal) Init() error {
h.workers = make(map[string]workerCmd)
sdk.GoRoutine("startKillAwolWorkerRoutine", h.startKillAwolWorkerRoutine)
sdk.GoRoutine(context.Background(), "startKillAwolWorkerRoutine", h.startKillAwolWorkerRoutine)
return nil
}

Expand All @@ -375,7 +375,7 @@ func (h *HatcheryLocal) localWorkerIndexCleanup() {
}
}

func (h *HatcheryLocal) startKillAwolWorkerRoutine() {
func (h *HatcheryLocal) startKillAwolWorkerRoutine(ctx context.Context) {
t := time.NewTicker(5 * time.Second)
for range t.C {
if err := h.killAwolWorkers(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions engine/hatchery/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (h *HatcherySwarm) Init() error {
}
}

sdk.GoRoutine("swarm", func() { h.routines(context.Background()) })
sdk.GoRoutine(context.Background(), "swarm", func(ctx context.Context) { h.routines(ctx) })

return nil
}
Expand Down Expand Up @@ -613,13 +613,13 @@ func (h *HatcherySwarm) routines(ctx context.Context) {
for {
select {
case <-ticker.C:
sdk.GoRoutine("getServicesLogs", func() {
sdk.GoRoutine(ctx, "getServicesLogs", func(ctx context.Context) {
if err := h.getServicesLogs(); err != nil {
log.Error("Hatchery> swarm> Cannot get service logs : %v", err)
}
})

sdk.GoRoutine("killAwolWorker", func() {
sdk.GoRoutine(ctx, "killAwolWorker", func(ctx context.Context) {
_ = h.killAwolWorker()
})
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion engine/worker/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI
}()

chanRes := make(chan sdk.Result, 1)
sdk.GoRoutine("runGRPCPlugin", func() {
sdk.GoRoutine(ctx, "runGRPCPlugin", func(ctx context.Context) {
params := *params
//For the moment we consider that plugin name = action name
pluginName := a.Name
Expand Down
2 changes: 1 addition & 1 deletion sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJ

// This goroutine call the SSE route
chanSSEvt := make(chan SSEvent)
sdk.GoRoutine("RequestSSEGet", func() {
sdk.GoRoutine(ctx, "RequestSSEGet", func(ctx context.Context) {
for ctx.Err() == nil {
if err := c.RequestSSEGet(ctx, "/events", chanSSEvt); err != nil {
log.Println(err)
Expand Down
15 changes: 11 additions & 4 deletions sdk/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sdk
import (
"bufio"
"bytes"
"context"
"crypto/md5"
"crypto/sha512"
"encoding/hex"
Expand All @@ -13,6 +14,7 @@ import (
"reflect"
"regexp"
"runtime"
"runtime/pprof"

"github.com/go-gorp/gorp"

Expand Down Expand Up @@ -108,9 +110,13 @@ func FileSHA512sum(filePath string) (string, error) {
}

// GoRoutine runs the function within a goroutine with a panic recovery
func GoRoutine(name string, fn func(), writerFactories ...func(s string) (io.WriteCloser, error)) {
func GoRoutine(c context.Context, name string, fn func(ctx context.Context), writerFactories ...func(s string) (io.WriteCloser, error)) {
hostname, _ := os.Hostname()
go func() {
go func(ctx context.Context) {
labels := pprof.Labels("goroutine-name", name, "goroutine-hostname", hostname)
goroutineCtx := pprof.WithLabels(ctx, labels)
pprof.SetGoroutineLabels(goroutineCtx)

defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
Expand All @@ -134,6 +140,7 @@ func GoRoutine(name string, fn func(), writerFactories ...func(s string) (io.Wri
}
}
}()
fn()
}()

fn(goroutineCtx)
}(c)
}
Loading

0 comments on commit 64909bd

Please sign in to comment.