Skip to content

Commit

Permalink
fix (api): all goroutines are managed with context (#678)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and yesnault committed Jun 6, 2017
1 parent ed2cc0d commit b292a4f
Show file tree
Hide file tree
Showing 39 changed files with 539 additions and 298 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

[![Join the chat at https://gitter.im/ovh-cds/Lobby](https://badges.gitter.im/ovh-cds/Lobby.svg)](https://gitter.im/ovh-cds/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Go Report Card](https://goreportcard.com/badge/github.com/ovh/cds)](https://goreportcard.com/report/github.com/ovh/cds)
[![Coverage Status](https://coveralls.io/repos/github/ovh/cds/badge.svg?branch=master)](https://coveralls.io/github/ovh/cds?branch=master)

<img align="right" src="https://raw.githubusercontent.com/ovh/cds/master/logo-background.png" width="25%">

Expand Down
46 changes: 28 additions & 18 deletions engine/api/action/requirement_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"time"

Expand All @@ -12,25 +13,34 @@ import (
)

//RequirementsCacheLoader set all action requirement in the cache
func RequirementsCacheLoader(delay time.Duration, DBFunc func() *gorp.DbMap) {
func RequirementsCacheLoader(c context.Context, delay time.Duration, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(delay).C

for {
time.Sleep(delay * time.Second)
db := DBFunc()
if db != nil {
var mayIWork string
loaderKey := cache.Key("action", "requirements", "loading")
if cache.Get(loaderKey, &mayIWork) {
cache.SetWithTTL(loaderKey, "true", 60)
actions, err := LoadActions(db)
if err != nil {
log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err)
continue
}
for _, a := range actions {
k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID))
cache.Set(k, a.Requirements)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting RequirementsCacheLoader: %v", c.Err())
return
}
case <-tick:
db := DBFunc()
if db != nil {
var mayIWork string
loaderKey := cache.Key("action", "requirements", "loading")
if cache.Get(loaderKey, &mayIWork) {
cache.SetWithTTL(loaderKey, "true", 60)
actions, err := LoadActions(db)
if err != nil {
log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err)
continue
}
for _, a := range actions {
k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID))
cache.Set(k, a.Requirements)
}
cache.Delete(loaderKey)
}
cache.Delete(loaderKey)
}
}
}
Expand All @@ -45,7 +55,7 @@ func GetRequirements(db gorp.SqlExecutor, id int64) ([]sdk.Requirement, error) {
var err error
req, err = LoadActionRequirements(db, id)
if err != nil {
return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s\n", err)
return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s", err)
}
cache.Set(k, req)
}
Expand Down
26 changes: 17 additions & 9 deletions engine/api/audit.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
package main

import (
"context"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

const (
maxVersion = 10
delay = 1
)

func auditCleanerRoutine(DBFunc func() *gorp.DbMap) {
defer sdk.Exit("AuditCleanerRoutine exited")
func auditCleanerRoutine(c context.Context, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(delay * time.Minute).C

for {
db := DBFunc()
if db != nil {
err := actionAuditCleaner(db)
if err != nil {
log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting auditCleanerRoutine: %v", c.Err())
}
return
case <-tick:
db := DBFunc()
if db != nil {
err := actionAuditCleaner(db)
if err != nil {
log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err)
}
}
}
time.Sleep(1 * time.Minute)
}
}

Expand Down
17 changes: 9 additions & 8 deletions engine/api/auth/auth.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package auth

import (
"context"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/context"
ctx "github.com/ovh/cds/engine/api/context"
"github.com/ovh/cds/engine/api/sessionstore"
"github.com/ovh/cds/engine/api/user"
"github.com/ovh/cds/engine/api/worker"
Expand All @@ -25,13 +26,13 @@ type Driver interface {
Store() sessionstore.Store
Authentify(db gorp.SqlExecutor, username, password string) (bool, error)
AuthentifyUser(db gorp.SqlExecutor, user *sdk.User, password string) (bool, error)
GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *context.Ctx) error
GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *ctx.Ctx) error
}

//GetDriver is a factory
func GetDriver(mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) {
func GetDriver(c context.Context, mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) {
log.Info("Auth> Intializing driver (%s)", mode)
store, err := sessionstore.Get(storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL)
store, err := sessionstore.Get(c, storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL)
if err != nil {
return nil, fmt.Errorf("Unable to get AuthDriver : %s\n", err)
}
Expand Down Expand Up @@ -109,7 +110,7 @@ func GetUsername(store sessionstore.Store, token string) (string, error) {
}

//CheckPersistentSession check persistent session token from CLI
func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
if headers.Get(sdk.RequestedWithHeader) == sdk.RequestedWithValue {
if getUserPersistentSession(db, store, headers, ctx) {
return true
Expand All @@ -121,7 +122,7 @@ func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, heade
return false
}

func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
h := headers.Get(sdk.SessionTokenHeader)
if h != "" {
ok, _ := store.Exists(sessionstore.SessionKey(h))
Expand All @@ -141,7 +142,7 @@ func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, hea
return false
}

func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
authHeaderValue := headers.Get("Authorization")
if authHeaderValue == "" {
log.Warning("ReloadUserPersistentSession> No Authorization Header")
Expand Down Expand Up @@ -218,7 +219,7 @@ func GetWorker(db gorp.SqlExecutor, workerID string) (*sdk.Worker, error) {
return w, nil
}

func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *context.Ctx) error {
func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *ctx.Ctx) error {
id, err := base64.StdEncoding.DecodeString(auth)
if err != nil {
return fmt.Errorf("bad worker key syntax: %s", err)
Expand Down
3 changes: 2 additions & 1 deletion engine/api/auth/test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package auth

import (
"context"
"testing"

"github.com/ovh/cds/engine/api/sessionstore"
)

func TestLocalAuth(t *testing.T) Driver {
authDriver, err := GetDriver("local", nil, sessionstore.Options{Mode: "local"})
authDriver, err := GetDriver(context.Background(), "local", nil, sessionstore.Options{Mode: "local"})
if err != nil {
panic(err)
}
Expand Down
9 changes: 9 additions & 0 deletions engine/api/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Store interface {
Enqueue(queueName string, value interface{})
Dequeue(queueName string, value interface{})
DequeueWithContext(c context.Context, queueName string, value interface{})
QueueLen(queueName string) int
}

//Initialize the global cache in memory, or redis
Expand Down Expand Up @@ -119,3 +120,11 @@ func DequeueWithContext(c context.Context, queueName string, value interface{})
}
s.DequeueWithContext(c, queueName, value)
}

//QueueLen returns the length of a queue
func QueueLen(queueName string) int {
if s == nil {
return 0
}
return s.QueueLen(queueName)
}
9 changes: 9 additions & 0 deletions engine/api/cache/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ func (s *LocalStore) Dequeue(queueName string, value interface{}) {
return
}

//QueueLen returns the length of a queue
func (s *LocalStore) QueueLen(queueName string) int {
l := s.Queues[queueName]
if l == nil {
return 0
}
return l.Len()
}

//DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context
func (s *LocalStore) DequeueWithContext(c context.Context, queueName string, value interface{}) {
l := s.Queues[queueName]
Expand Down
26 changes: 21 additions & 5 deletions engine/api/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ read:
}
}

//QueueLen returns the length of a queue
func (s *RedisStore) QueueLen(queueName string) int {
if s.Client == nil {
log.Error("redis> cannot get redis client")
return 0
}

res, err := s.Client.LLen(queueName).Result()
if err != nil {
log.Warning("redis> Cannot read %s :%s", queueName, err)
}
return int(res)
}

//DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context
func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, value interface{}) {
if s.Client == nil {
Expand All @@ -172,26 +186,27 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val
}

elemChan := make(chan string)
var elemChanClosed bool
var once sync.Once
go func() {
ticker := time.NewTicker(50 * time.Millisecond).C
ticker := time.NewTicker(250 * time.Millisecond).C
for {
select {
case <-ticker:
res, err := s.Client.RPop(queueName).Result()
res, err := s.Client.BRPop(200*time.Millisecond, queueName).Result()
if err == redis.Nil {
continue
}
if err == io.EOF {
time.Sleep(1 * time.Second)
continue
}
if len(res) != 2 {
continue
if err == nil && len(res) == 2 && !elemChanClosed {
elemChan <- res[1]
}
elemChan <- res
case <-c.Done():
once.Do(func() {
elemChanClosed = true
close(elemChan)
})
return
Expand All @@ -205,6 +220,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val
json.Unmarshal(b, value)
}
once.Do(func() {
elemChanClosed = true
close(elemChan)
})
}
8 changes: 8 additions & 0 deletions engine/api/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,11 @@ func Status() string {

return fmt.Sprintf("Database: %s OK (%d conns)", dbDriver, db.Stats().OpenConnections)
}

// Close closes the database, releasing any open resources.
func Close() error {
if db != nil {
return db.Close()
}
return nil
}
10 changes: 8 additions & 2 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package event

import (
"context"
"fmt"
"os"

Expand Down Expand Up @@ -54,10 +55,15 @@ func Initialize(k KafkaConfig) error {
}

// DequeueEvent runs in a goroutine and dequeue event from cache
func DequeueEvent() {
func DequeueEvent(c context.Context) {
for {
e := sdk.Event{}
cache.Dequeue("events", &e)
cache.DequeueWithContext(c, "events", &e)
if err := c.Err(); err != nil {
log.Error("Exiting event.DequeueEvent : %v", err)
return
}

for _, b := range brokers {
if err := b.sendEvent(&e); err != nil {
log.Warning("Error while sending message: %s", err)
Expand Down
Loading

0 comments on commit b292a4f

Please sign in to comment.