diff --git a/README.md b/README.md index 32aee1eaa0..996c3fde62 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [![Join the chat at https://gitter.im/ovh-cds/Lobby](https://badges.gitter.im/ovh-cds/Lobby.svg)](https://gitter.im/ovh-cds/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Go Report Card](https://goreportcard.com/badge/github.com/ovh/cds)](https://goreportcard.com/report/github.com/ovh/cds) +[![Coverage Status](https://coveralls.io/repos/github/ovh/cds/badge.svg?branch=master)](https://coveralls.io/github/ovh/cds?branch=master) diff --git a/engine/api/action/requirement_cache.go b/engine/api/action/requirement_cache.go index b126640270..d3add17946 100644 --- a/engine/api/action/requirement_cache.go +++ b/engine/api/action/requirement_cache.go @@ -1,6 +1,7 @@ package action import ( + "context" "fmt" "time" @@ -12,25 +13,34 @@ import ( ) //RequirementsCacheLoader set all action requirement in the cache -func RequirementsCacheLoader(delay time.Duration, DBFunc func() *gorp.DbMap) { +func RequirementsCacheLoader(c context.Context, delay time.Duration, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(delay).C + for { - time.Sleep(delay * time.Second) - db := DBFunc() - if db != nil { - var mayIWork string - loaderKey := cache.Key("action", "requirements", "loading") - if cache.Get(loaderKey, &mayIWork) { - cache.SetWithTTL(loaderKey, "true", 60) - actions, err := LoadActions(db) - if err != nil { - log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err) - continue - } - for _, a := range actions { - k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID)) - cache.Set(k, a.Requirements) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting RequirementsCacheLoader: %v", c.Err()) + return + } + case <-tick: + db := DBFunc() + if db != nil { + var mayIWork string + loaderKey := cache.Key("action", "requirements", "loading") + if cache.Get(loaderKey, &mayIWork) { + cache.SetWithTTL(loaderKey, "true", 60) + actions, err := LoadActions(db) + if err != nil { + log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err) + continue + } + for _, a := range actions { + k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID)) + cache.Set(k, a.Requirements) + } + cache.Delete(loaderKey) } - cache.Delete(loaderKey) } } } @@ -45,7 +55,7 @@ func GetRequirements(db gorp.SqlExecutor, id int64) ([]sdk.Requirement, error) { var err error req, err = LoadActionRequirements(db, id) if err != nil { - return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s\n", err) + return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s", err) } cache.Set(k, req) } diff --git a/engine/api/audit.go b/engine/api/audit.go index 570e265ada..2e92d2e4a9 100644 --- a/engine/api/audit.go +++ b/engine/api/audit.go @@ -1,30 +1,38 @@ package main import ( + "context" "time" "github.com/go-gorp/gorp" - "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) const ( maxVersion = 10 + delay = 1 ) -func auditCleanerRoutine(DBFunc func() *gorp.DbMap) { - defer sdk.Exit("AuditCleanerRoutine exited") +func auditCleanerRoutine(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(delay * time.Minute).C for { - db := DBFunc() - if db != nil { - err := actionAuditCleaner(db) - if err != nil { - log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting auditCleanerRoutine: %v", c.Err()) + } + return + case <-tick: + db := DBFunc() + if db != nil { + err := actionAuditCleaner(db) + if err != nil { + log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err) + } } } - time.Sleep(1 * time.Minute) } } diff --git a/engine/api/auth/auth.go b/engine/api/auth/auth.go index 03d3883679..336bbfa409 100644 --- a/engine/api/auth/auth.go +++ b/engine/api/auth/auth.go @@ -1,6 +1,7 @@ package auth import ( + "context" "encoding/base64" "errors" "fmt" @@ -11,7 +12,7 @@ import ( "github.com/go-gorp/gorp" "github.com/ovh/cds/engine/api/cache" - "github.com/ovh/cds/engine/api/context" + ctx "github.com/ovh/cds/engine/api/context" "github.com/ovh/cds/engine/api/sessionstore" "github.com/ovh/cds/engine/api/user" "github.com/ovh/cds/engine/api/worker" @@ -25,13 +26,13 @@ type Driver interface { Store() sessionstore.Store Authentify(db gorp.SqlExecutor, username, password string) (bool, error) AuthentifyUser(db gorp.SqlExecutor, user *sdk.User, password string) (bool, error) - GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *context.Ctx) error + GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *ctx.Ctx) error } //GetDriver is a factory -func GetDriver(mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) { +func GetDriver(c context.Context, mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) { log.Info("Auth> Intializing driver (%s)", mode) - store, err := sessionstore.Get(storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL) + store, err := sessionstore.Get(c, storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL) if err != nil { return nil, fmt.Errorf("Unable to get AuthDriver : %s\n", err) } @@ -109,7 +110,7 @@ func GetUsername(store sessionstore.Store, token string) (string, error) { } //CheckPersistentSession check persistent session token from CLI -func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool { +func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool { if headers.Get(sdk.RequestedWithHeader) == sdk.RequestedWithValue { if getUserPersistentSession(db, store, headers, ctx) { return true @@ -121,7 +122,7 @@ func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, heade return false } -func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool { +func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool { h := headers.Get(sdk.SessionTokenHeader) if h != "" { ok, _ := store.Exists(sessionstore.SessionKey(h)) @@ -141,7 +142,7 @@ func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, hea return false } -func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool { +func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool { authHeaderValue := headers.Get("Authorization") if authHeaderValue == "" { log.Warning("ReloadUserPersistentSession> No Authorization Header") @@ -218,7 +219,7 @@ func GetWorker(db gorp.SqlExecutor, workerID string) (*sdk.Worker, error) { return w, nil } -func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *context.Ctx) error { +func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *ctx.Ctx) error { id, err := base64.StdEncoding.DecodeString(auth) if err != nil { return fmt.Errorf("bad worker key syntax: %s", err) diff --git a/engine/api/auth/test.go b/engine/api/auth/test.go index c6d3db0f9a..c744d587b2 100644 --- a/engine/api/auth/test.go +++ b/engine/api/auth/test.go @@ -1,13 +1,14 @@ package auth import ( + "context" "testing" "github.com/ovh/cds/engine/api/sessionstore" ) func TestLocalAuth(t *testing.T) Driver { - authDriver, err := GetDriver("local", nil, sessionstore.Options{Mode: "local"}) + authDriver, err := GetDriver(context.Background(), "local", nil, sessionstore.Options{Mode: "local"}) if err != nil { panic(err) } diff --git a/engine/api/cache/cache.go b/engine/api/cache/cache.go index 2e4339b7cc..d9b0bd32e6 100644 --- a/engine/api/cache/cache.go +++ b/engine/api/cache/cache.go @@ -27,6 +27,7 @@ type Store interface { Enqueue(queueName string, value interface{}) Dequeue(queueName string, value interface{}) DequeueWithContext(c context.Context, queueName string, value interface{}) + QueueLen(queueName string) int } //Initialize the global cache in memory, or redis @@ -119,3 +120,11 @@ func DequeueWithContext(c context.Context, queueName string, value interface{}) } s.DequeueWithContext(c, queueName, value) } + +//QueueLen returns the length of a queue +func QueueLen(queueName string) int { + if s == nil { + return 0 + } + return s.QueueLen(queueName) +} diff --git a/engine/api/cache/local.go b/engine/api/cache/local.go index dce3b72fbc..21aacdc8b5 100644 --- a/engine/api/cache/local.go +++ b/engine/api/cache/local.go @@ -130,6 +130,15 @@ func (s *LocalStore) Dequeue(queueName string, value interface{}) { return } +//QueueLen returns the length of a queue +func (s *LocalStore) QueueLen(queueName string) int { + l := s.Queues[queueName] + if l == nil { + return 0 + } + return l.Len() +} + //DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context func (s *LocalStore) DequeueWithContext(c context.Context, queueName string, value interface{}) { l := s.Queues[queueName] diff --git a/engine/api/cache/redis.go b/engine/api/cache/redis.go index 2d99c3eb20..12c7ece09c 100644 --- a/engine/api/cache/redis.go +++ b/engine/api/cache/redis.go @@ -164,6 +164,20 @@ read: } } +//QueueLen returns the length of a queue +func (s *RedisStore) QueueLen(queueName string) int { + if s.Client == nil { + log.Error("redis> cannot get redis client") + return 0 + } + + res, err := s.Client.LLen(queueName).Result() + if err != nil { + log.Warning("redis> Cannot read %s :%s", queueName, err) + } + return int(res) +} + //DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, value interface{}) { if s.Client == nil { @@ -172,13 +186,14 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val } elemChan := make(chan string) + var elemChanClosed bool var once sync.Once go func() { - ticker := time.NewTicker(50 * time.Millisecond).C + ticker := time.NewTicker(250 * time.Millisecond).C for { select { case <-ticker: - res, err := s.Client.RPop(queueName).Result() + res, err := s.Client.BRPop(200*time.Millisecond, queueName).Result() if err == redis.Nil { continue } @@ -186,12 +201,12 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val time.Sleep(1 * time.Second) continue } - if len(res) != 2 { - continue + if err == nil && len(res) == 2 && !elemChanClosed { + elemChan <- res[1] } - elemChan <- res case <-c.Done(): once.Do(func() { + elemChanClosed = true close(elemChan) }) return @@ -205,6 +220,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val json.Unmarshal(b, value) } once.Do(func() { + elemChanClosed = true close(elemChan) }) } diff --git a/engine/api/database/database.go b/engine/api/database/database.go index e6dfe04ecf..3c255999d0 100644 --- a/engine/api/database/database.go +++ b/engine/api/database/database.go @@ -134,3 +134,11 @@ func Status() string { return fmt.Sprintf("Database: %s OK (%d conns)", dbDriver, db.Stats().OpenConnections) } + +// Close closes the database, releasing any open resources. +func Close() error { + if db != nil { + return db.Close() + } + return nil +} diff --git a/engine/api/event/event.go b/engine/api/event/event.go index 0ec796e663..a19d8bac1a 100644 --- a/engine/api/event/event.go +++ b/engine/api/event/event.go @@ -1,6 +1,7 @@ package event import ( + "context" "fmt" "os" @@ -54,10 +55,15 @@ func Initialize(k KafkaConfig) error { } // DequeueEvent runs in a goroutine and dequeue event from cache -func DequeueEvent() { +func DequeueEvent(c context.Context) { for { e := sdk.Event{} - cache.Dequeue("events", &e) + cache.DequeueWithContext(c, "events", &e) + if err := c.Err(); err != nil { + log.Error("Exiting event.DequeueEvent : %v", err) + return + } + for _, b := range brokers { if err := b.sendEvent(&e); err != nil { log.Warning("Error while sending message: %s", err) diff --git a/engine/api/hatchery/heartbeat.go b/engine/api/hatchery/heartbeat.go index 841fd8bcdd..77061c5c01 100644 --- a/engine/api/hatchery/heartbeat.go +++ b/engine/api/hatchery/heartbeat.go @@ -1,6 +1,7 @@ package hatchery import ( + "context" "time" "github.com/go-gorp/gorp" @@ -13,30 +14,33 @@ var HatcheryHeartbeatTimeout = 30.0 // Heartbeat runs in a goroutine and check last beat from all hatcheries // on a 10s basis -func Heartbeat(DBFunc func() *gorp.DbMap) { - // If this goroutine exit, then it's a crash - defer log.Fatalf("Goroutine of hatchery.Heartbeat exited - Exit CDS Engine") +func Heartbeat(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(5 * time.Second).C for { - db := DBFunc() - if db != nil { - w, err := LoadDeadHatcheries(db, HatcheryHeartbeatTimeout) - if err != nil { - log.Warning("HatcheryHeartbeat> Cannot load hatcherys: %s\n", err) - // add extra sleep if db is unavailable - time.Sleep(5 * time.Second) - continue + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting HatcheryHeartbeat: %v", c.Err()) + return } - - for i := range w { - err = DeleteHatchery(db, w[i].ID, w[i].Model.ID) + case <-tick: + db := DBFunc() + if db != nil { + w, err := LoadDeadHatcheries(db, HatcheryHeartbeatTimeout) if err != nil { - log.Warning("HatcheryHeartbeat> Cannot delete hatchery %d: %s\n", w[i].ID, err) + log.Warning("HatcheryHeartbeat> Cannot load hatcherys: %s\n", err) continue } - log.Debug("HatcheryHeartbeat> Hatchery %s removed.\n", w[i].Name) + + for i := range w { + if err = DeleteHatchery(db, w[i].ID, w[i].Model.ID); err != nil { + log.Warning("HatcheryHeartbeat> Cannot delete hatchery %d: %s\n", w[i].ID, err) + continue + } + log.Debug("HatcheryHeartbeat> Hatchery %s removed.\n", w[i].Name) + } } } - time.Sleep(5 * time.Second) } } diff --git a/engine/api/hook.go b/engine/api/hook.go index 051a91fc59..7c88ed0700 100644 --- a/engine/api/hook.go +++ b/engine/api/hook.go @@ -1,6 +1,7 @@ package main import ( + ctx "context" "fmt" "io/ioutil" "net/http" @@ -203,16 +204,28 @@ func deleteHook(w http.ResponseWriter, r *http.Request, db *gorp.DbMap, c *conte } //hookRecoverer is the go-routine which catches on-error hook -func hookRecoverer(DBFunc func() *gorp.DbMap) { +func hookRecoverer(c ctx.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(10 * time.Second).C for { - h := hook.ReceivedHook{} - cache.Dequeue("hook:recovery", &h) - if h.Repository != "" { - if err := processHook(DBFunc(), h); err != nil { - hook.Recovery(h, err) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting hookRecoverer: %v", c.Err()) + return + } + case <-tick: + h := hook.ReceivedHook{} + cache.DequeueWithContext(c, "hook:recovery", &h) + if err := c.Err(); err != nil { + log.Error("Exiting hookRecoverer: %v", err) + return + } + if h.Repository != "" { + if err := processHook(DBFunc(), h); err != nil { + hook.Recovery(h, err) + } } } - time.Sleep(10 * time.Second) } } diff --git a/engine/api/main.go b/engine/api/main.go index 22aebe158a..94f65c720f 100644 --- a/engine/api/main.go +++ b/engine/api/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "net/http" "os" @@ -63,6 +64,31 @@ var mainCmd = &cobra.Command{ startupTime = time.Now() + //Initliaze context + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // Gracefully shutdown sql connections + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL) + defer func() { + signal.Stop(c) + cancel() + }() + go func() { + select { + case <-c: + log.Warning("Cleanup SQL connections") + database.Close() + cancel() + event.Publish(sdk.EventEngine{Message: "shutdown"}) + event.Close() + os.Exit(0) + + case <-ctx.Done(): + } + }() + //Initialize secret driver secretBackend := viper.GetString(viperServerSecretBackend) secretBackendOptions := viper.GetStringSlice(viperServerSecretBackendOption) @@ -122,12 +148,12 @@ var mainCmd = &cobra.Command{ }, } - if err := objectstore.Initialize(cfg); err != nil { + if err := objectstore.Initialize(ctx, cfg); err != nil { log.Fatalf("Cannot initialize storage: %s", err) } //Intialize database - db, err := database.Init( + if _, err := database.Init( viper.GetString(viperDBUser), viper.GetString(viperDBPassword), viper.GetString(viperDBName), @@ -136,8 +162,7 @@ var mainCmd = &cobra.Command{ viper.GetString(viperDBSSLMode), viper.GetInt(viperDBTimeout), viper.GetInt(viperDBMaxConn), - ) - if err != nil { + ); err != nil { log.Error("Cannot connect to database: %s", err) os.Exit(3) } @@ -150,20 +175,6 @@ var mainCmd = &cobra.Command{ log.Error("Cannot setup databases: %s", err) } - // Gracefully shutdown sql connections - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - signal.Notify(c, syscall.SIGTERM) - signal.Notify(c, syscall.SIGKILL) - go func() { - <-c - log.Warning("Cleanup SQL connections") - db.Close() - event.Publish(sdk.EventEngine{Message: "shutdown"}) - event.Close() - os.Exit(0) - }() - router = &Router{ mux: mux.NewRouter(), } @@ -226,7 +237,7 @@ var mainCmd = &cobra.Command{ RedisPassword: viper.GetString(viperCacheRedisPassword), } - router.authDriver, _ = auth.GetDriver(authMode, authOptions, storeOptions) + router.authDriver, _ = auth.GetDriver(ctx, authMode, authOptions, storeOptions) cache.Initialize(viper.GetString(viperCacheMode), viper.GetString(viperCacheRedisHost), viper.GetString(viperCacheRedisPassword), viper.GetInt(viperCacheTTL)) @@ -240,32 +251,32 @@ var mainCmd = &cobra.Command{ if err := event.Initialize(kafkaOptions); err != nil { log.Warning("⚠ Error while initializing event system: %s", err) } else { - go event.DequeueEvent() + go event.DequeueEvent(ctx) } - if err := worker.Initialize(); err != nil { + if err := worker.Initialize(ctx, database.GetDBMap); err != nil { log.Warning("⚠ Error while initializing workers routine: %s", err) } - go queue.Pipelines() - go pipeline.AWOLPipelineKiller(database.GetDBMap) - go hatchery.Heartbeat(database.GetDBMap) - go auditCleanerRoutine(database.GetDBMap) + go queue.Pipelines(ctx, database.GetDBMap) + go pipeline.AWOLPipelineKiller(ctx, database.GetDBMap) + go hatchery.Heartbeat(ctx, database.GetDBMap) + go auditCleanerRoutine(ctx, database.GetDBMap) - go repositoriesmanager.ReceiveEvents() + go repositoriesmanager.ReceiveEvents(ctx, database.GetDBMap) - go stats.StartRoutine() - go action.RequirementsCacheLoader(5, database.GetDBMap) - go hookRecoverer(database.GetDBMap) + go stats.StartRoutine(ctx, database.GetDBMap) + go action.RequirementsCacheLoader(ctx, 5*time.Second, database.GetDBMap) + go hookRecoverer(ctx, database.GetDBMap) if !viper.GetBool(viperVCSPollingDisabled) { - go poller.Initialize(database.GetDBMap, 10) + go poller.Initialize(ctx, 10, database.GetDBMap) } else { log.Warning("⚠ Repositories polling is disabled") } if !viper.GetBool(viperSchedulersDisabled) { - go scheduler.Initialize(database.GetDBMap, 10) + go scheduler.Initialize(ctx, 10, database.GetDBMap) } else { log.Warning("⚠ Cron Scheduler is disabled") } diff --git a/engine/api/objectstore/objectstore.go b/engine/api/objectstore/objectstore.go index cd17e030be..25f0b48391 100644 --- a/engine/api/objectstore/objectstore.go +++ b/engine/api/objectstore/objectstore.go @@ -1,6 +1,7 @@ package objectstore import ( + "context" "fmt" "io" @@ -101,9 +102,9 @@ type Driver interface { } // Initialize setup wanted ObjectStore driver -func Initialize(cfg Config) error { +func Initialize(c context.Context, cfg Config) error { var err error - storage, err = New(cfg) + storage, err = New(c, cfg) if err != nil { return err } @@ -150,10 +151,10 @@ type ConfigOptionsFilesystem struct { } // New initialise a new ArtifactStorage -func New(cfg Config) (Driver, error) { +func New(c context.Context, cfg Config) (Driver, error) { switch cfg.Kind { case Openstack, Swift: - return NewOpenstackStore(cfg.Options.Openstack.Address, + return NewOpenstackStore(c, cfg.Options.Openstack.Address, cfg.Options.Openstack.Username, cfg.Options.Openstack.Password, cfg.Options.Openstack.Tenant, diff --git a/engine/api/objectstore/openstack.go b/engine/api/objectstore/openstack.go index 4b176a53b5..353c58582d 100644 --- a/engine/api/objectstore/openstack.go +++ b/engine/api/objectstore/openstack.go @@ -1,6 +1,7 @@ package objectstore import ( + "context" "fmt" "io" "net/url" @@ -22,7 +23,7 @@ type OpenstackStore struct { } // NewOpenstackStore create a new ObjectStore with openstack driver and check configuration -func NewOpenstackStore(address, user, password, tenant, region, containerprefix string) (*OpenstackStore, error) { +func NewOpenstackStore(c context.Context, address, user, password, tenant, region, containerprefix string) (*OpenstackStore, error) { log.Info("Objectstore> Initialize Swift(Openstack) driver on address: %s, tenant: %s, region: %s, prefix: %s", address, tenant, region, containerprefix) if address == "" { return nil, fmt.Errorf("artifact storage is openstack, but flag --artifact_address is not provided") @@ -54,7 +55,7 @@ func NewOpenstackStore(address, user, password, tenant, region, containerprefix if err != nil { return nil, err } - go ops.refreshTokenRoutine() + go ops.refreshTokenRoutine(c) log.Debug("NewOpenstackStore> Got token %dchar at %s\n", len(ops.token.ID), ops.endpoint) return ops, nil diff --git a/engine/api/objectstore/openstack_handlers.go b/engine/api/objectstore/openstack_handlers.go index bbd9e515a7..141a32988a 100644 --- a/engine/api/objectstore/openstack_handlers.go +++ b/engine/api/objectstore/openstack_handlers.go @@ -2,6 +2,7 @@ package objectstore import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -13,17 +14,24 @@ import ( "github.com/ovh/cds/sdk/log" ) -func (ops *OpenstackStore) refreshTokenRoutine() { +func (ops *OpenstackStore) refreshTokenRoutine(c context.Context) { + tick := time.NewTicker(20 * time.Hour).C for { - time.Sleep(20 * time.Hour) - - tk, endpoint, err := getToken(ops.user, ops.password, ops.address, ops.tenant, ops.region) - if err != nil { - log.Error("refreshTokenRoutine> Cannot refresh token: %s\n", err) - continue + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting refreshTokenRoutine: %v", c.Err()) + return + } + case <-tick: + tk, endpoint, err := getToken(ops.user, ops.password, ops.address, ops.tenant, ops.region) + if err != nil { + log.Error("refreshTokenRoutine> Cannot refresh token: %s\n", err) + continue + } + ops.token = tk + ops.endpoint = endpoint } - ops.token = tk - ops.endpoint = endpoint } } diff --git a/engine/api/pipeline/awol.go b/engine/api/pipeline/awol.go index c03692acac..6f2a8d4f2a 100644 --- a/engine/api/pipeline/awol.go +++ b/engine/api/pipeline/awol.go @@ -1,6 +1,7 @@ package pipeline import ( + "context" "time" "github.com/go-gorp/gorp" @@ -17,25 +18,28 @@ type awolPipelineBuildJob struct { // AWOLPipelineKiller will search in database for actions : // - Having building status // - Without any logs output in the last 15 minutes -func AWOLPipelineKiller(DBFunc func() *gorp.DbMap) { - // If this goroutine exits, then it's a crash - defer log.Fatalf("Goroutine of pipeline.AWOLPipelineKiller exited - Exit CDS Engine") - +func AWOLPipelineKiller(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(1 * time.Minute).C for { - time.Sleep(1 * time.Minute) - db := DBFunc() - - if db != nil { - pbJobDatas, err := loadAWOLPipelineBuildJob(db) - if err != nil { - log.Warning("AWOLPipelineKiller> Cannot load awol building actions: %s\n", err) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting AWOLPipelineKiller: %v", c.Err()) } - - for _, data := range pbJobDatas { - err = killAWOLPipelineBuildJob(db, data) + return + case <-tick: + db := DBFunc() + if db != nil { + pbJobDatas, err := loadAWOLPipelineBuildJob(db) if err != nil { - log.Warning("AWOLPipelineKiller> Cannot kill action build %d: %s\n", data.pipelineBuildJobID, err) - time.Sleep(1 * time.Second) // Do not spam an unavailable database + log.Warning("AWOLPipelineKiller> Cannot load awol building actions: %s\n", err) + } + + for _, data := range pbJobDatas { + err = killAWOLPipelineBuildJob(db, data) + if err != nil { + log.Warning("AWOLPipelineKiller> Cannot kill action build %d: %s\n", data.pipelineBuildJobID, err) + } } } } diff --git a/engine/api/plugin_test.go b/engine/api/plugin_test.go index 81a652ea55..075f7eb2e1 100644 --- a/engine/api/plugin_test.go +++ b/engine/api/plugin_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + ctx "context" "encoding/json" "io" "io/ioutil" @@ -158,7 +159,8 @@ func TestAddPluginHandlerSuccess(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) u, _ := assets.InsertAdminUser(db) if err := actionplugin.Delete(db, "plugin-download", u.ID); err != nil { @@ -229,7 +231,8 @@ func TestAddPluginHandlerFailWithInvalidPlugin(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) u, _ := assets.InsertAdminUser(db) actionplugin.Delete(db, "plugin-download", u.ID) @@ -271,7 +274,8 @@ func TestAddPluginHandlerFailWithConflict(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) u, _ := assets.InsertAdminUser(db) actionplugin.Delete(db, "plugin-download", u.ID) @@ -321,7 +325,8 @@ func TestUpdatePluginHandlerSuccess(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) u, _ := assets.InsertAdminUser(db) actionplugin.Delete(db, "plugin-download", u.ID) @@ -384,7 +389,8 @@ func TestDeletePluginHandlerSuccess(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) u, _ := assets.InsertAdminUser(db) actionplugin.Delete(db, "plugin-download", u.ID) diff --git a/engine/api/poller/cleaner.go b/engine/api/poller/cleaner.go index 1b2b640d55..45dba0c981 100644 --- a/engine/api/poller/cleaner.go +++ b/engine/api/poller/cleaner.go @@ -1,6 +1,7 @@ package poller import ( + "context" "time" "github.com/go-gorp/gorp" @@ -10,13 +11,20 @@ import ( ) //Cleaner is the cleaner main goroutine -func Cleaner(DBFunc func() *gorp.DbMap, nbToKeep int) { - defer log.Error("poller.Cleaner> has been exited !") +func Cleaner(c context.Context, DBFunc func() *gorp.DbMap, nbToKeep int) { + tick := time.NewTicker(30 * time.Minute).C for { - time.Sleep(30 * time.Minute) - if _, err := CleanerRun(DBFunc(), nbToKeep); err != nil { - log.Warning("poller.Cleaner> Error : %s", err) - continue + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting poller.Cleaner: %v", c.Err()) + return + } + case <-tick: + if _, err := CleanerRun(DBFunc(), nbToKeep); err != nil { + log.Warning("poller.Cleaner> Error : %s", err) + continue + } } } } diff --git a/engine/api/poller/executer.go b/engine/api/poller/executer.go index 6ea3974d15..47ac0bde55 100644 --- a/engine/api/poller/executer.go +++ b/engine/api/poller/executer.go @@ -1,6 +1,7 @@ package poller import ( + "context" "database/sql" "regexp" "time" @@ -16,17 +17,24 @@ import ( ) //Executer is the goroutine which run the pipelines -func Executer(DBFunc func() *gorp.DbMap) { - defer log.Error("poller.Executer> has been exited !") +func Executer(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(5 * time.Second).C for { - time.Sleep(5 * time.Second) - exs, err := ExecuterRun(DBFunc()) - if err != nil { - log.Warning("poller.Executer> Error : %s", err) - continue - } - if len(exs) > 0 { - log.Debug("poller.Executer> %d has been executed", len(exs)) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting poller.Executer: %v", c.Err()) + return + } + case <-tick: + exs, err := ExecuterRun(DBFunc()) + if err != nil { + log.Warning("poller.Executer> Error : %s", err) + continue + } + if len(exs) > 0 { + log.Debug("poller.Executer> %d has been executed", len(exs)) + } } } } @@ -65,6 +73,16 @@ func executerRun(db *gorp.DbMap, e *sdk.RepositoryPollerExecution) { p, errl := LoadByApplicationAndPipeline(tx, e.ApplicationID, e.PipelineID) if errl != nil { + //If the poller doesn't exist: clean this execution and exit + if errl == sql.ErrNoRows { + if err := DeleteExecution(tx, e); err != nil { + log.Error("poller.ExecuterRun> Unable to delete execution %d: %s", e.ID, err) + } + if err := tx.Commit(); err != nil { + log.Error("poller.ExecuterRun> %s", err) + } + return + } log.Error("poller.ExecuterRun> Unable to load poller appID=%d pipID=%d: %s", e.ApplicationID, e.PipelineID, errl) return } diff --git a/engine/api/poller/init.go b/engine/api/poller/init.go index ad13a9f2fd..9212239014 100644 --- a/engine/api/poller/init.go +++ b/engine/api/poller/init.go @@ -1,10 +1,14 @@ package poller -import "github.com/go-gorp/gorp" +import ( + "context" + + "github.com/go-gorp/gorp" +) //Initialize starts the 3 goroutines for pipeline schedulers -func Initialize(DBFunc func() *gorp.DbMap, nbExecToKeep int) { - go Cleaner(DBFunc, nbExecToKeep) - go Executer(DBFunc) - go Scheduler(DBFunc) +func Initialize(c context.Context, nbExecToKeep int, DBFunc func() *gorp.DbMap) { + go Cleaner(c, DBFunc, nbExecToKeep) + go Executer(c, DBFunc) + go Scheduler(c, DBFunc) } diff --git a/engine/api/poller/scheduler.go b/engine/api/poller/scheduler.go index fb3e6cbaaf..8ae0b3d952 100644 --- a/engine/api/poller/scheduler.go +++ b/engine/api/poller/scheduler.go @@ -1,6 +1,7 @@ package poller import ( + "context" "time" "github.com/go-gorp/gorp" @@ -15,15 +16,22 @@ var ( ) //Scheduler is the goroutine which compute date of next execution for pipeline scheduler -func Scheduler(DBFunc func() *gorp.DbMap) { - defer log.Error("poller.Scheduler> has been exited !") +func Scheduler(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(10 * time.Second).C for { - _, status, err := SchedulerRun(DBFunc()) - if err != nil { - log.Error("poller.Scheduler> %s: %s", status, err) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting poller.Scheduler: %v", c.Err()) + return + } + case <-tick: + _, status, err := SchedulerRun(DBFunc()) + if err != nil { + log.Error("poller.Scheduler> %s: %s", status, err) + } + pollerStatus = status } - pollerStatus = status - time.Sleep(10 * time.Second) } } diff --git a/engine/api/queue/queue.go b/engine/api/queue/queue.go index 5ddc678a87..cfd72114af 100644 --- a/engine/api/queue/queue.go +++ b/engine/api/queue/queue.go @@ -1,6 +1,7 @@ package queue import ( + "context" "database/sql" "fmt" "time" @@ -11,7 +12,6 @@ import ( "github.com/ovh/cds/engine/api/action" "github.com/ovh/cds/engine/api/application" "github.com/ovh/cds/engine/api/cache" - "github.com/ovh/cds/engine/api/database" "github.com/ovh/cds/engine/api/environment" "github.com/ovh/cds/engine/api/event" "github.com/ovh/cds/engine/api/pipeline" @@ -22,33 +22,35 @@ import ( ) // Pipelines is a goroutine responsible for pushing actions of a building pipeline in queue, in the wanted order -func Pipelines() { - // If this goroutine exits, then it's a crash - defer log.Fatalf("Goroutine of scheduler.Schedule exited - Exit CDS Engine") - +func Pipelines(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(2 * time.Second).C for { - time.Sleep(2 * time.Second) - - //Check if CDS is in maintenance mode - var m bool - cache.Get("maintenance", &m) - if m { - log.Warning("⚠ CDS maintenance in ON") - time.Sleep(30 * time.Second) - } - - db := database.DBMap(database.DB()) - if db != nil && !m { - ids, err := pipeline.LoadBuildingPipelinesIDs(db) - if err != nil { - log.Warning("queue.Pipelines> Cannot load building pipelines: %s\n", err) - // Add some extra sleep if db is down... - time.Sleep(3 * time.Second) - continue + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting queue.Pipelines: %v", c.Err()) + return + } + case <-tick: + //Check if CDS is in maintenance mode + var m bool + cache.Get("maintenance", &m) + if m { + log.Info("⚠ CDS maintenance in ON") + time.Sleep(30 * time.Second) } - for _, id := range ids { - runPipeline(db, id) + db := DBFunc() + if db != nil && !m { + ids, err := pipeline.LoadBuildingPipelinesIDs(db) + if err != nil { + log.Warning("queue.Pipelines> Cannot load building pipelines: %s\n", err) + continue + } + + for _, id := range ids { + runPipeline(db, id) + } } } } diff --git a/engine/api/repositoriesmanager/events.go b/engine/api/repositoriesmanager/events.go index 623fb4b67a..28b314f532 100644 --- a/engine/api/repositoriesmanager/events.go +++ b/engine/api/repositoriesmanager/events.go @@ -1,23 +1,34 @@ package repositoriesmanager import ( + "context" "fmt" "github.com/go-gorp/gorp" "github.com/mitchellh/mapstructure" "github.com/ovh/cds/engine/api/cache" - "github.com/ovh/cds/engine/api/database" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) +//EventsStatus returns info about length of events queue +func EventsStatus() string { + return fmt.Sprintf("%d", cache.QueueLen("events_repositoriesmanager")) +} + //ReceiveEvents has to be launched as a goroutine. -func ReceiveEvents() { +func ReceiveEvents(c context.Context, DBFunc func() *gorp.DbMap) { for { e := sdk.Event{} - cache.Dequeue("events_repositoriesmanager", &e) - db := database.DBMap(database.DB()) + cache.DequeueWithContext(c, "events_repositoriesmanager", &e) + err := c.Err() + if err != nil { + log.Error("Exiting repositoriesmanager.ReceiveEvents: %v", err) + return + } + + db := DBFunc() if db != nil { if err := processEvent(db, e); err != nil { log.Error("ReceiveEvents> err while processing error=%s : %v", err, e) diff --git a/engine/api/scheduler/cleaner.go b/engine/api/scheduler/cleaner.go index 732ce412e6..ddf1928800 100644 --- a/engine/api/scheduler/cleaner.go +++ b/engine/api/scheduler/cleaner.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "time" "github.com/go-gorp/gorp" @@ -10,10 +11,18 @@ import ( ) //Cleaner is the cleaner main goroutine -func Cleaner(DBFunc func() *gorp.DbMap, nbToKeep int) { +func Cleaner(c context.Context, DBFunc func() *gorp.DbMap, nbToKeep int) { + tick := time.NewTicker(10 * time.Minute).C for { - CleanerRun(DBFunc(), nbToKeep) - time.Sleep(10 * time.Minute) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting scheduler.Cleaner: %v", c.Err()) + return + } + case <-tick: + CleanerRun(DBFunc(), nbToKeep) + } } } diff --git a/engine/api/scheduler/executer.go b/engine/api/scheduler/executer.go index 4ca7a69d0f..4109452b7f 100644 --- a/engine/api/scheduler/executer.go +++ b/engine/api/scheduler/executer.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "time" "github.com/go-gorp/gorp" @@ -15,10 +16,18 @@ import ( ) //Executer is the goroutine which run the pipelines -func Executer(DBFunc func() *gorp.DbMap) { +func Executer(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(5 * time.Second).C for { - time.Sleep(5 * time.Second) - ExecuterRun(DBFunc()) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting scheduler.Executer: %v", c.Err()) + return + } + case <-tick: + ExecuterRun(DBFunc()) + } } } diff --git a/engine/api/scheduler/init.go b/engine/api/scheduler/init.go index 49ae2ad280..cfe73fd8a1 100644 --- a/engine/api/scheduler/init.go +++ b/engine/api/scheduler/init.go @@ -1,10 +1,14 @@ package scheduler -import "github.com/go-gorp/gorp" +import ( + "context" + + "github.com/go-gorp/gorp" +) //Initialize starts the 3 goroutines for pipeline schedulers -func Initialize(DBFunc func() *gorp.DbMap, nbExecToKeep int) { - go Cleaner(DBFunc, nbExecToKeep) - go Executer(DBFunc) - go Scheduler(DBFunc) +func Initialize(c context.Context, nbExecToKeep int, DBFunc func() *gorp.DbMap) { + go Cleaner(c, DBFunc, nbExecToKeep) + go Executer(c, DBFunc) + go Scheduler(c, DBFunc) } diff --git a/engine/api/scheduler/scheduler.go b/engine/api/scheduler/scheduler.go index 2cbe16d05a..b5dec1ab2e 100644 --- a/engine/api/scheduler/scheduler.go +++ b/engine/api/scheduler/scheduler.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "fmt" "time" @@ -14,15 +15,23 @@ import ( var schedulerStatus = "Not Running" //Scheduler is the goroutine which compute date of next execution for pipeline scheduler -func Scheduler(DBFunc func() *gorp.DbMap) { +func Scheduler(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(2 * time.Second).C for { - time.Sleep(2 * time.Second) - _, status, err := Run(DBFunc()) - - if err != nil { - log.Error("%s: %s", status, err) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting scheduler.Scheduler: %v", c.Err()) + return + } + case <-tick: + _, status, err := Run(DBFunc()) + + if err != nil { + log.Error("%s: %s", status, err) + } + schedulerStatus = status } - schedulerStatus = status } } diff --git a/engine/api/sessionstore/factory.go b/engine/api/sessionstore/factory.go index 2f052ccacd..a8e98283a4 100644 --- a/engine/api/sessionstore/factory.go +++ b/engine/api/sessionstore/factory.go @@ -1,6 +1,7 @@ package sessionstore import ( + "context" "sync" "github.com/ovh/cds/engine/api/cache" @@ -11,12 +12,12 @@ import ( var Status string //Get is a factory -func Get(mode, redisHost, redisPassword string, ttl int) (Store, error) { +func Get(c context.Context, mode, redisHost, redisPassword string, ttl int) (Store, error) { log.Info("SessionStore> Intializing store (%s)", mode) switch mode { case "redis": Status = "Redis " - r, err := NewRedis(redisHost, redisPassword, ttl) + r, err := NewRedis(c, redisHost, redisPassword, ttl) if err != nil { log.Error("sessionstore.factory> unable to connect to redis %s : %s", redisHost, err) Status += "KO" diff --git a/engine/api/sessionstore/inmemory.go b/engine/api/sessionstore/inmemory.go index b15ab3467d..34dc9ec9c6 100644 --- a/engine/api/sessionstore/inmemory.go +++ b/engine/api/sessionstore/inmemory.go @@ -34,13 +34,12 @@ func (s *InMemory) New(k SessionKey) (SessionKey, error) { s.data[k] = cache s.lock.Unlock() - go func(k SessionKey) { - time.Sleep(time.Duration(s.ttl) * time.Minute) + time.AfterFunc(time.Duration(s.ttl)*time.Minute, func() { log.Info("session> delete session %s after %d minutes", k, s.ttl) s.lock.Lock() delete(s.data, k) s.lock.Unlock() - }(k) + }) return k, nil } diff --git a/engine/api/sessionstore/redis.go b/engine/api/sessionstore/redis.go index 5bc0a61a3a..558755b318 100644 --- a/engine/api/sessionstore/redis.go +++ b/engine/api/sessionstore/redis.go @@ -1,6 +1,7 @@ package sessionstore import ( + "context" "encoding/json" "strings" "time" @@ -17,38 +18,45 @@ type Redis struct { } //Keep redis in good health and remove HSet for expired session -func (s *Redis) vacuumCleaner() { - log.Info("Redis> Starting Session Vacuum Cleaner") +func (s *Redis) vacuumCleaner(c context.Context) { + tick := time.NewTicker(5 * time.Minute).C for { - keys, err := s.store.Client.Keys("session:*:data").Result() - if err != nil { - log.Error("RedisSessionStore> Unable to get keys in store : %s", err) - } - for _, k := range keys { - sessionKey := strings.Replace(k, ":data", "", -1) - sessionExist, err := s.store.Client.Exists(sessionKey).Result() + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting sessionstore.vacuumCleaner: %v", c.Err()) + return + } + case <-tick: + keys, err := s.store.Client.Keys("session:*:data").Result() if err != nil { - log.Warning("RedisSessionStore> Unable to get key %s from store : %s", sessionKey, err) + log.Error("RedisSessionStore> Unable to get keys in store : %s", err) } - if !sessionExist { - if err := s.store.Client.Del(k).Err(); err != nil { - log.Error("RedisSessionStore> Unable to clear session %s from store : %s", sessionKey, err) + for _, k := range keys { + sessionKey := strings.Replace(k, ":data", "", -1) + sessionExist, err := s.store.Client.Exists(sessionKey).Result() + if err != nil { + log.Warning("RedisSessionStore> Unable to get key %s from store : %s", sessionKey, err) + } + if !sessionExist { + if err := s.store.Client.Del(k).Err(); err != nil { + log.Error("RedisSessionStore> Unable to clear session %s from store : %s", sessionKey, err) + } } } } - time.Sleep(5 * time.Minute) } } //NewRedis creates a ready to use redisstore -func NewRedis(redisHost, redisPassword string, ttl int) (*Redis, error) { +func NewRedis(c context.Context, redisHost, redisPassword string, ttl int) (*Redis, error) { r, err := cache.NewRedisStore(redisHost, redisPassword, ttl*60) if err != nil { return nil, err } log.Info("Redis> Store ready") redisStore := &Redis{ttl * 1440, r} - go redisStore.vacuumCleaner() + go redisStore.vacuumCleaner(c) return redisStore, nil } diff --git a/engine/api/stats/worker.go b/engine/api/stats/worker.go index e38c8e08ec..c9f2f21d40 100644 --- a/engine/api/stats/worker.go +++ b/engine/api/stats/worker.go @@ -1,41 +1,43 @@ package stats import ( + "context" "database/sql" "time" "github.com/go-gorp/gorp" - "github.com/ovh/cds/engine/api/database" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) // StartRoutine starts a routine collecting regular build statistics -func StartRoutine() { - +func StartRoutine(c context.Context, DBFunc func() *gorp.DbMap) { go func() { - defer sdk.Exit("StatsRoutine exited") - + tick := time.NewTicker(10 * time.Second).C for { - - time.Sleep(2 * time.Second) - - db := database.DBMap(database.DB()) - if db != nil { - err := createTodaysRow(db) - if err != nil { - log.Error("StatsRoutine: Cannot create today's row: %s\n", err) - continue - } - - err = updateWorkerStats(db) - if err != nil { - log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting Stat routine: %v", c.Err()) } - err = updatePipelineStats(db) - if err != nil { - log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + return + case <-tick: + db := DBFunc() + if db != nil { + err := createTodaysRow(db) + if err != nil { + log.Error("StatsRoutine: Cannot create today's row: %s\n", err) + continue + } + err = updateWorkerStats(db) + if err != nil { + log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + } + err = updatePipelineStats(db) + if err != nil { + log.Warning("StatsRoutine> Cannot update build stats: %s\n", err) + } } } } diff --git a/engine/api/status.go b/engine/api/status.go index 50778d2c87..2ae2c6bf91 100644 --- a/engine/api/status.go +++ b/engine/api/status.go @@ -15,6 +15,7 @@ import ( "github.com/ovh/cds/engine/api/internal" "github.com/ovh/cds/engine/api/mail" "github.com/ovh/cds/engine/api/objectstore" + "github.com/ovh/cds/engine/api/repositoriesmanager" "github.com/ovh/cds/engine/api/scheduler" "github.com/ovh/cds/engine/api/secret" "github.com/ovh/cds/engine/api/sessionstore" @@ -63,6 +64,10 @@ func statusHandler(w http.ResponseWriter, r *http.Request, db *gorp.DbMap, c *co output = append(output, fmt.Sprintf("Event: %s", event.Status())) log.Debug("Status> Event: %s", event.Status()) + // Check Event + output = append(output, fmt.Sprintf("Internal Events Queue: %s", repositoriesmanager.EventsStatus())) + log.Debug("Status> Internal Events Queue: %s", repositoriesmanager.EventsStatus()) + // Check redis output = append(output, fmt.Sprintf("Cache: %s", cache.Status)) log.Debug("Status> Cache: %s", cache.Status) diff --git a/engine/api/templates_test.go b/engine/api/templates_test.go index 62cf4709c5..2171846386 100644 --- a/engine/api/templates_test.go +++ b/engine/api/templates_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + ctx "context" "encoding/json" "fmt" "io" @@ -88,7 +89,8 @@ func Test_addTemplateHandler(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) defer os.RemoveAll(tmpDir) @@ -219,7 +221,8 @@ func Test_deleteTemplateHandler(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) defer os.RemoveAll(tmpDir) @@ -331,7 +334,8 @@ func Test_updateTemplateHandler(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) defer os.RemoveAll(tmpDir) //Create admin user @@ -499,7 +503,8 @@ func Test_getBuildTemplatesHandler(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) defer os.RemoveAll(tmpDir) @@ -623,7 +628,8 @@ func Test_applyTemplatesHandler(t *testing.T) { }, }, } - objectstore.Initialize(cfg) + c := ctx.Background() + objectstore.Initialize(c, cfg) defer os.RemoveAll(tmpDir) diff --git a/engine/api/test/test.go b/engine/api/test/test.go index 08437c0720..d68b7b5c39 100644 --- a/engine/api/test/test.go +++ b/engine/api/test/test.go @@ -79,6 +79,7 @@ func SetupPG(t *testing.T, bootstrapFunc ...bootstrapf) *gorp.DbMap { } } } else { + t.Logf("Error reading %s: %v", f, err) DBDriver = flag.Lookup("dbDriver").Value.String() dbUser = flag.Lookup("dbUser").Value.String() dbPassword = flag.Lookup("dbPassword").Value.String() diff --git a/engine/api/worker/heartbeat.go b/engine/api/worker/heartbeat.go index 46e171adf5..2aca73341a 100644 --- a/engine/api/worker/heartbeat.go +++ b/engine/api/worker/heartbeat.go @@ -1,36 +1,42 @@ package worker import ( + "context" "time" - "github.com/ovh/cds/engine/api/database" + "github.com/go-gorp/gorp" "github.com/ovh/cds/sdk/log" ) // WorkerHeartbeatTimeout defines the number of seconds allowed for workers to refresh their beat -var WorkerHeartbeatTimeout = 1200.0 +var WorkerHeartbeatTimeout = 600.0 -// Heartbeat runs in a goroutine and check last beat from all workers -func Heartbeat() { - // If this goroutine exit, then it's a crash - defer log.Fatalf("Goroutine of worker.Heartbeat exited - Exit CDS Engine") +// CheckHeartbeat runs in a goroutine and check last beat from all workers +func CheckHeartbeat(c context.Context, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(10 * time.Second).C for { - time.Sleep(10 * time.Second) - if db := database.DB(); db != nil { - w, err := LoadDeadWorkers(database.DBMap(db), WorkerHeartbeatTimeout) - if err != nil { - log.Warning("WorkerHeartbeat> Cannot load dead workers: %s", err) - time.Sleep(10 * time.Second) - continue + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting WorkerHeartbeat: %v", c.Err()) } - - for i := range w { - log.Debug("WorkerHeartbeat> Delete worker %s[%s] LastBeat:%d hatchery:%d status:%s", w[i].Name, w[i].ID, w[i].LastBeat, w[i].HatcheryID, w[i].Status) - if err = DeleteWorker(database.DBMap(db), w[i].ID); err != nil { - log.Warning("WorkerHeartbeat> Cannot delete worker %s: %s", w[i].ID, err) + return + case <-tick: + if db := DBFunc(); db != nil { + w, err := LoadDeadWorkers(db, WorkerHeartbeatTimeout) + if err != nil { + log.Warning("WorkerHeartbeat> Cannot load dead workers: %s", err) continue } + + for i := range w { + log.Debug("WorkerHeartbeat> Delete worker %s[%s] LastBeat:%d hatchery:%d status:%s", w[i].Name, w[i].ID, w[i].LastBeat, w[i].HatcheryID, w[i].Status) + if err = DeleteWorker(db, w[i].ID); err != nil { + log.Warning("WorkerHeartbeat> Cannot delete worker %s: %s", w[i].ID, err) + continue + } + } } } } diff --git a/engine/api/worker/init.go b/engine/api/worker/init.go index 5076969e51..01e70a750f 100644 --- a/engine/api/worker/init.go +++ b/engine/api/worker/init.go @@ -1,8 +1,15 @@ package worker +import ( + "context" + "time" + + "github.com/go-gorp/gorp" +) + //Initialize init the package -func Initialize() error { - go Heartbeat() - go ModelCapabilititiesCacheLoader(5) +func Initialize(c context.Context, DBFunc func() *gorp.DbMap) error { + go CheckHeartbeat(c, DBFunc) + go ModelCapabilititiesCacheLoader(c, 10*time.Second, DBFunc) return nil } diff --git a/engine/api/worker/model_capabilities.go b/engine/api/worker/model_capabilities.go index d45c81129b..76fac5d047 100644 --- a/engine/api/worker/model_capabilities.go +++ b/engine/api/worker/model_capabilities.go @@ -1,38 +1,45 @@ package worker import ( + "context" "fmt" "time" "github.com/go-gorp/gorp" "github.com/ovh/cds/engine/api/cache" - "github.com/ovh/cds/engine/api/database" "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/log" ) //ModelCapabilititiesCacheLoader set all model Capabilities in the cache -func ModelCapabilititiesCacheLoader(delay time.Duration) { +func ModelCapabilititiesCacheLoader(c context.Context, delay time.Duration, DBFunc func() *gorp.DbMap) { + tick := time.NewTicker(delay).C for { - time.Sleep(delay * time.Second) - db := database.DB() - dbmap := database.DBMap(db) - if db != nil { - var mayIWork string - loaderKey := cache.Key("worker", "modelcapabilitites", "loading") - if cache.Get(loaderKey, &mayIWork) { - cache.SetWithTTL(loaderKey, "true", 60) - wms, err := LoadWorkerModels(dbmap) - if err != nil { - log.Warning("ModelCapabilititiesCacheLoader> Unable to load worker models: %s", err) - continue - } - for _, wm := range wms { - modelKey := cache.Key("worker", "modelcapabilitites", fmt.Sprintf("%d", wm.ID)) - cache.Set(modelKey, wm.Capabilities) + select { + case <-c.Done(): + if c.Err() != nil { + log.Error("Exiting worker.ModelCapabilititiesCacheLoader: %v", c.Err()) + return + } + case <-tick: + dbmap := DBFunc() + if dbmap != nil { + var mayIWork string + loaderKey := cache.Key("worker", "modelcapabilitites", "loading") + if cache.Get(loaderKey, &mayIWork) { + cache.SetWithTTL(loaderKey, "true", 60) + wms, err := LoadWorkerModels(dbmap) + if err != nil { + log.Warning("ModelCapabilititiesCacheLoader> Unable to load worker models: %s", err) + continue + } + for _, wm := range wms { + modelKey := cache.Key("worker", "modelcapabilitites", fmt.Sprintf("%d", wm.ID)) + cache.Set(modelKey, wm.Capabilities) + } + cache.Delete(loaderKey) } - cache.Delete(loaderKey) } } } diff --git a/engine/worker/builtin_script.go b/engine/worker/builtin_script.go index f0fe6ba506..97a353b80a 100644 --- a/engine/worker/builtin_script.go +++ b/engine/worker/builtin_script.go @@ -146,7 +146,7 @@ func runScriptAction(a *sdk.Action, pbJob sdk.PipelineBuildJob, stepOrder int) s for _, p := range pbJob.Parameters { envName := strings.Replace(p.Name, ".", "_", -1) envName = strings.ToUpper(envName) - if sdk.NeedPlaceholder(p.Type) { + if !sdk.NeedPlaceholder(p.Type) { cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", envName, p.Value)) } else { cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", envName, sdk.PasswordPlaceholder))