From dea32935c56c2983825e424922f018274507b995 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Mon, 5 Jun 2017 18:31:37 +0200 Subject: [PATCH] fix (api): dequeue with context --- engine/api/action/requirement_cache.go | 2 +- engine/api/cache/redis.go | 2 +- engine/api/event/event.go | 5 +-- engine/api/hook.go | 7 +++- engine/api/main.go | 12 +++---- engine/api/repositoriesmanager/events.go | 14 +++++--- engine/api/stats/worker.go | 46 ++++++++++++------------ engine/api/worker/heartbeat.go | 2 +- 8 files changed, 52 insertions(+), 38 deletions(-) diff --git a/engine/api/action/requirement_cache.go b/engine/api/action/requirement_cache.go index a2a42d9c37..f0ba46adb7 100644 --- a/engine/api/action/requirement_cache.go +++ b/engine/api/action/requirement_cache.go @@ -20,7 +20,7 @@ func RequirementsCacheLoader(c context.Context, delay time.Duration, DBFunc func select { case <-c.Done(): if c.Err() != nil { - log.Error("RequirementsCacheLoader> Exiting RequirementsCacheLoader: %v", c.Err()) + log.Error("Exiting RequirementsCacheLoader: %v", c.Err()) return } case <-tick: diff --git a/engine/api/cache/redis.go b/engine/api/cache/redis.go index 2d99c3eb20..212d70ffa0 100644 --- a/engine/api/cache/redis.go +++ b/engine/api/cache/redis.go @@ -174,7 +174,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val elemChan := make(chan string) var once sync.Once go func() { - ticker := time.NewTicker(50 * time.Millisecond).C + ticker := time.NewTicker(100 * time.Millisecond).C for { select { case <-ticker: diff --git a/engine/api/event/event.go b/engine/api/event/event.go index 580e88addf..688bb625c1 100644 --- a/engine/api/event/event.go +++ b/engine/api/event/event.go @@ -59,8 +59,9 @@ func DequeueEvent(c context.Context) { for { e := sdk.Event{} cache.DequeueWithContext(c, "events", &e) - if c.Err() != nil { - log.Error("event.DequeueEvent error : %v", e) + err := c.Err() + if err != nil { + log.Error("Exiting event.DequeueEvent : %v", err) return } diff --git a/engine/api/hook.go b/engine/api/hook.go index a3c1db588a..6b15c1aef9 100644 --- a/engine/api/hook.go +++ b/engine/api/hook.go @@ -215,7 +215,12 @@ func hookRecoverer(c ctx.Context, DBFunc func() *gorp.DbMap) { } case <-tick: h := hook.ReceivedHook{} - cache.Dequeue("hook:recovery", &h) + cache.DequeueWithContext(c, "hook:recovery", &h) + err := c.Err() + if err != nil { + log.Error("Exiting hookRecoverer: %v", err) + return + } if h.Repository != "" { if err := processHook(DBFunc(), h); err != nil { hook.Recovery(h, err) diff --git a/engine/api/main.go b/engine/api/main.go index 2d6c4b103e..4f79ed8b2b 100644 --- a/engine/api/main.go +++ b/engine/api/main.go @@ -70,9 +70,7 @@ var mainCmd = &cobra.Command{ // Gracefully shutdown sql connections c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - signal.Notify(c, syscall.SIGTERM) - signal.Notify(c, syscall.SIGKILL) + signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL) defer func() { signal.Stop(c) cancel() @@ -82,8 +80,10 @@ var mainCmd = &cobra.Command{ case <-c: log.Warning("Cleanup SQL connections") database.Close() - event.Publish(sdk.EventEngine{Message: "shutdown"}) + cancel() + //event.Publish(sdk.EventEngine{Message: "shutdown"}) event.Close() + case <-ctx.Done(): } }() @@ -262,9 +262,9 @@ var mainCmd = &cobra.Command{ go hatchery.Heartbeat(ctx, database.GetDBMap) go auditCleanerRoutine(ctx, database.GetDBMap) - go repositoriesmanager.ReceiveEvents() + go repositoriesmanager.ReceiveEvents(ctx, database.GetDBMap) - go stats.StartRoutine() + go stats.StartRoutine(ctx, database.GetDBMap) go action.RequirementsCacheLoader(ctx, 5*time.Second, database.GetDBMap) go hookRecoverer(ctx, database.GetDBMap) diff --git a/engine/api/repositoriesmanager/events.go b/engine/api/repositoriesmanager/events.go index 623fb4b67a..b97b5af03c 100644 --- a/engine/api/repositoriesmanager/events.go +++ b/engine/api/repositoriesmanager/events.go @@ -1,23 +1,29 @@ package repositoriesmanager import ( + "context" "fmt" "github.com/go-gorp/gorp" "github.com/mitchellh/mapstructure" "github.com/ovh/cds/engine/api/cache" - "github.com/ovh/cds/engine/api/database" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) //ReceiveEvents has to be launched as a goroutine. -func ReceiveEvents() { +func ReceiveEvents(c context.Context, DBFunc func() *gorp.DbMap) { for { e := sdk.Event{} - cache.Dequeue("events_repositoriesmanager", &e) - db := database.DBMap(database.DB()) + cache.DequeueWithContext(c, "events_repositoriesmanager", &e) + err := c.Err() + if err != nil { + log.Error("Exiting repositoriesmanager.ReceiveEvents: %v", err) + return + } + + db := DBFunc() if db != nil { if err := processEvent(db, e); err != nil { log.Error("ReceiveEvents> err while processing error=%s : %v", err, e) diff --git a/engine/api/stats/worker.go b/engine/api/stats/worker.go index e38c8e08ec..c9f2f21d40 100644 --- a/engine/api/stats/worker.go +++ b/engine/api/stats/worker.go @@ -1,41 +1,43 @@ package stats import ( + "context" "database/sql" "time" "github.com/go-gorp/gorp" - "github.com/ovh/cds/engine/api/database" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) // StartRoutine starts a routine collecting regular build statistics -func StartRoutine() { - +func StartRoutine(c context.Context, DBFunc func() *gorp.DbMap) { go func() { - defer sdk.Exit("StatsRoutine exited") - + tick := time.NewTicker(10 * time.Second).C for { - - time.Sleep(2 * time.Second) - - db := database.DBMap(database.DB()) - if db != nil { - err := createTodaysRow(db) - if err != nil { - log.Error("StatsRoutine: Cannot create today's row: %s\n", err) - continue - } - - err = updateWorkerStats(db) - if err != nil { - log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting Stat routine: %v", c.Err()) } - err = updatePipelineStats(db) - if err != nil { - log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + return + case <-tick: + db := DBFunc() + if db != nil { + err := createTodaysRow(db) + if err != nil { + log.Error("StatsRoutine: Cannot create today's row: %s\n", err) + continue + } + err = updateWorkerStats(db) + if err != nil { + log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + } + err = updatePipelineStats(db) + if err != nil { + log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + } } } } diff --git a/engine/api/worker/heartbeat.go b/engine/api/worker/heartbeat.go index 0a40ea514b..2aca73341a 100644 --- a/engine/api/worker/heartbeat.go +++ b/engine/api/worker/heartbeat.go @@ -19,7 +19,7 @@ func CheckHeartbeat(c context.Context, DBFunc func() *gorp.DbMap) { select { case <-c.Done(): if c.Err() != nil { - log.Error("WorkerHeartbeat> Exiting CheckHeartbeat: %v", c.Err()) + log.Error("Exiting WorkerHeartbeat: %v", c.Err()) } return case <-tick: