Skip to content

Commit

Permalink
fix (api): dequeue with context
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin committed Jun 5, 2017
1 parent ed2cc0d commit ec062e4
Show file tree
Hide file tree
Showing 31 changed files with 447 additions and 264 deletions.
46 changes: 28 additions & 18 deletions engine/api/action/requirement_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"time"

Expand All @@ -12,25 +13,34 @@ import (
)

//RequirementsCacheLoader set all action requirement in the cache
func RequirementsCacheLoader(delay time.Duration, DBFunc func() *gorp.DbMap) {
func RequirementsCacheLoader(c context.Context, delay time.Duration, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(delay * time.Second).C

for {
time.Sleep(delay * time.Second)
db := DBFunc()
if db != nil {
var mayIWork string
loaderKey := cache.Key("action", "requirements", "loading")
if cache.Get(loaderKey, &mayIWork) {
cache.SetWithTTL(loaderKey, "true", 60)
actions, err := LoadActions(db)
if err != nil {
log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err)
continue
}
for _, a := range actions {
k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID))
cache.Set(k, a.Requirements)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("RequirementsCacheLoader> Exiting RequirementsCacheLoader: %v", c.Err())
return
}
case <-tick:
db := DBFunc()
if db != nil {
var mayIWork string
loaderKey := cache.Key("action", "requirements", "loading")
if cache.Get(loaderKey, &mayIWork) {
cache.SetWithTTL(loaderKey, "true", 60)
actions, err := LoadActions(db)
if err != nil {
log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err)
continue
}
for _, a := range actions {
k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID))
cache.Set(k, a.Requirements)
}
cache.Delete(loaderKey)
}
cache.Delete(loaderKey)
}
}
}
Expand All @@ -45,7 +55,7 @@ func GetRequirements(db gorp.SqlExecutor, id int64) ([]sdk.Requirement, error) {
var err error
req, err = LoadActionRequirements(db, id)
if err != nil {
return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s\n", err)
return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s", err)
}
cache.Set(k, req)
}
Expand Down
26 changes: 17 additions & 9 deletions engine/api/audit.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
package main

import (
"context"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

const (
maxVersion = 10
delay = 1
)

func auditCleanerRoutine(DBFunc func() *gorp.DbMap) {
defer sdk.Exit("AuditCleanerRoutine exited")
func auditCleanerRoutine(c context.Context, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(delay * time.Minute).C

for {
db := DBFunc()
if db != nil {
err := actionAuditCleaner(db)
if err != nil {
log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting auditCleanerRoutine: %v", c.Err())
}
return
case <-tick:
db := DBFunc()
if db != nil {
err := actionAuditCleaner(db)
if err != nil {
log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err)
}
}
}
time.Sleep(1 * time.Minute)
}
}

Expand Down
17 changes: 9 additions & 8 deletions engine/api/auth/auth.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package auth

import (
"context"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/context"
ctx "github.com/ovh/cds/engine/api/context"
"github.com/ovh/cds/engine/api/sessionstore"
"github.com/ovh/cds/engine/api/user"
"github.com/ovh/cds/engine/api/worker"
Expand All @@ -25,13 +26,13 @@ type Driver interface {
Store() sessionstore.Store
Authentify(db gorp.SqlExecutor, username, password string) (bool, error)
AuthentifyUser(db gorp.SqlExecutor, user *sdk.User, password string) (bool, error)
GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *context.Ctx) error
GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *ctx.Ctx) error
}

//GetDriver is a factory
func GetDriver(mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) {
func GetDriver(c context.Context, mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) {
log.Info("Auth> Intializing driver (%s)", mode)
store, err := sessionstore.Get(storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL)
store, err := sessionstore.Get(c, storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL)
if err != nil {
return nil, fmt.Errorf("Unable to get AuthDriver : %s\n", err)
}
Expand Down Expand Up @@ -109,7 +110,7 @@ func GetUsername(store sessionstore.Store, token string) (string, error) {
}

//CheckPersistentSession check persistent session token from CLI
func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
if headers.Get(sdk.RequestedWithHeader) == sdk.RequestedWithValue {
if getUserPersistentSession(db, store, headers, ctx) {
return true
Expand All @@ -121,7 +122,7 @@ func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, heade
return false
}

func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
h := headers.Get(sdk.SessionTokenHeader)
if h != "" {
ok, _ := store.Exists(sessionstore.SessionKey(h))
Expand All @@ -141,7 +142,7 @@ func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, hea
return false
}

func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
authHeaderValue := headers.Get("Authorization")
if authHeaderValue == "" {
log.Warning("ReloadUserPersistentSession> No Authorization Header")
Expand Down Expand Up @@ -218,7 +219,7 @@ func GetWorker(db gorp.SqlExecutor, workerID string) (*sdk.Worker, error) {
return w, nil
}

func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *context.Ctx) error {
func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *ctx.Ctx) error {
id, err := base64.StdEncoding.DecodeString(auth)
if err != nil {
return fmt.Errorf("bad worker key syntax: %s", err)
Expand Down
3 changes: 2 additions & 1 deletion engine/api/auth/test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package auth

import (
"context"
"testing"

"github.com/ovh/cds/engine/api/sessionstore"
)

func TestLocalAuth(t *testing.T) Driver {
authDriver, err := GetDriver("local", nil, sessionstore.Options{Mode: "local"})
authDriver, err := GetDriver(context.Background(), "local", nil, sessionstore.Options{Mode: "local"})
if err != nil {
panic(err)
}
Expand Down
8 changes: 8 additions & 0 deletions engine/api/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,11 @@ func Status() string {

return fmt.Sprintf("Database: %s OK (%d conns)", dbDriver, db.Stats().OpenConnections)
}

// Close closes the database, releasing any open resources.
func Close() error {
if db != nil {
return db.Close()
}
return nil
}
10 changes: 8 additions & 2 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package event

import (
"context"
"fmt"
"os"

Expand Down Expand Up @@ -54,10 +55,15 @@ func Initialize(k KafkaConfig) error {
}

// DequeueEvent runs in a goroutine and dequeue event from cache
func DequeueEvent() {
func DequeueEvent(c context.Context) {
for {
e := sdk.Event{}
cache.Dequeue("events", &e)
cache.DequeueWithContext(c, "events", &e)
if c.Err() != nil {
log.Error("event.DequeueEvent error : %v", e)
return
}

for _, b := range brokers {
if err := b.sendEvent(&e); err != nil {
log.Warning("Error while sending message: %s", err)
Expand Down
39 changes: 22 additions & 17 deletions engine/api/hatchery/heartbeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hatchery

import (
"context"
"time"

"github.com/go-gorp/gorp"
Expand All @@ -13,30 +14,34 @@ var HatcheryHeartbeatTimeout = 30.0

// Heartbeat runs in a goroutine and check last beat from all hatcheries
// on a 10s basis
func Heartbeat(DBFunc func() *gorp.DbMap) {
// If this goroutine exit, then it's a crash
defer log.Fatalf("Goroutine of hatchery.Heartbeat exited - Exit CDS Engine")
func Heartbeat(c context.Context, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(5 * time.Second).C

for {
db := DBFunc()
if db != nil {
w, err := LoadDeadHatcheries(db, HatcheryHeartbeatTimeout)
if err != nil {
log.Warning("HatcheryHeartbeat> Cannot load hatcherys: %s\n", err)
// add extra sleep if db is unavailable
time.Sleep(5 * time.Second)
continue
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting HatcheryHeartbeat: %v", c.Err())
return
}

for i := range w {
err = DeleteHatchery(db, w[i].ID, w[i].Model.ID)
case <-tick:
db := DBFunc()
if db != nil {
w, err := LoadDeadHatcheries(db, HatcheryHeartbeatTimeout)
if err != nil {
log.Warning("HatcheryHeartbeat> Cannot delete hatchery %d: %s\n", w[i].ID, err)
log.Warning("HatcheryHeartbeat> Cannot load hatcherys: %s\n", err)
continue
}
log.Debug("HatcheryHeartbeat> Hatchery %s removed.\n", w[i].Name)

for i := range w {
err = DeleteHatchery(db, w[i].ID, w[i].Model.ID)
if err != nil {
log.Warning("HatcheryHeartbeat> Cannot delete hatchery %d: %s\n", w[i].ID, err)
continue
}
log.Debug("HatcheryHeartbeat> Hatchery %s removed.\n", w[i].Name)
}
}
}
time.Sleep(5 * time.Second)
}
}
23 changes: 16 additions & 7 deletions engine/api/hook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
ctx "context"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -203,16 +204,24 @@ func deleteHook(w http.ResponseWriter, r *http.Request, db *gorp.DbMap, c *conte
}

//hookRecoverer is the go-routine which catches on-error hook
func hookRecoverer(DBFunc func() *gorp.DbMap) {
func hookRecoverer(c ctx.Context, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(10 * time.Second).C
for {
h := hook.ReceivedHook{}
cache.Dequeue("hook:recovery", &h)
if h.Repository != "" {
if err := processHook(DBFunc(), h); err != nil {
hook.Recovery(h, err)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting hookRecoverer: %v", c.Err())
return
}
case <-tick:
h := hook.ReceivedHook{}
cache.Dequeue("hook:recovery", &h)
if h.Repository != "" {
if err := processHook(DBFunc(), h); err != nil {
hook.Recovery(h, err)
}
}
}
time.Sleep(10 * time.Second)
}
}

Expand Down
Loading

0 comments on commit ec062e4

Please sign in to comment.