Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (api): all goroutines are managed with context #678

Merged
merged 11 commits into from
Jun 6, 2017
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

[![Join the chat at https://gitter.im/ovh-cds/Lobby](https://badges.gitter.im/ovh-cds/Lobby.svg)](https://gitter.im/ovh-cds/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Go Report Card](https://goreportcard.com/badge/github.com/ovh/cds)](https://goreportcard.com/report/github.com/ovh/cds)
[![Coverage Status](https://coveralls.io/repos/github/ovh/cds/badge.svg?branch=master)](https://coveralls.io/github/ovh/cds?branch=master)

<img align="right" src="https://raw.githubusercontent.com/ovh/cds/master/logo-background.png" width="25%">

Expand Down
46 changes: 28 additions & 18 deletions engine/api/action/requirement_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"context"
"fmt"
"time"

Expand All @@ -12,25 +13,34 @@ import (
)

//RequirementsCacheLoader set all action requirement in the cache
func RequirementsCacheLoader(delay time.Duration, DBFunc func() *gorp.DbMap) {
func RequirementsCacheLoader(c context.Context, delay time.Duration, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(delay).C

for {
time.Sleep(delay * time.Second)
db := DBFunc()
if db != nil {
var mayIWork string
loaderKey := cache.Key("action", "requirements", "loading")
if cache.Get(loaderKey, &mayIWork) {
cache.SetWithTTL(loaderKey, "true", 60)
actions, err := LoadActions(db)
if err != nil {
log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err)
continue
}
for _, a := range actions {
k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID))
cache.Set(k, a.Requirements)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting RequirementsCacheLoader: %v", c.Err())
return
}
case <-tick:
db := DBFunc()
if db != nil {
var mayIWork string
loaderKey := cache.Key("action", "requirements", "loading")
if cache.Get(loaderKey, &mayIWork) {
cache.SetWithTTL(loaderKey, "true", 60)
actions, err := LoadActions(db)
if err != nil {
log.Warning("RequirementsCacheLoader> Unable to load worker models: %s", err)
continue
}
for _, a := range actions {
k := cache.Key("action", "requirements", fmt.Sprintf("%d", a.ID))
cache.Set(k, a.Requirements)
}
cache.Delete(loaderKey)
}
cache.Delete(loaderKey)
}
}
}
Expand All @@ -45,7 +55,7 @@ func GetRequirements(db gorp.SqlExecutor, id int64) ([]sdk.Requirement, error) {
var err error
req, err = LoadActionRequirements(db, id)
if err != nil {
return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s\n", err)
return nil, fmt.Errorf("GetRequirements> cannot LoadActionRequirements: %s", err)
}
cache.Set(k, req)
}
Expand Down
26 changes: 17 additions & 9 deletions engine/api/audit.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
package main

import (
"context"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

const (
maxVersion = 10
delay = 1
)

func auditCleanerRoutine(DBFunc func() *gorp.DbMap) {
defer sdk.Exit("AuditCleanerRoutine exited")
func auditCleanerRoutine(c context.Context, DBFunc func() *gorp.DbMap) {
tick := time.NewTicker(delay * time.Minute).C

for {
db := DBFunc()
if db != nil {
err := actionAuditCleaner(db)
if err != nil {
log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting auditCleanerRoutine: %v", c.Err())
}
return
case <-tick:
db := DBFunc()
if db != nil {
err := actionAuditCleaner(db)
if err != nil {
log.Warning("AuditCleanerRoutine> Action clean failed: %s\n", err)
}
}
}
time.Sleep(1 * time.Minute)
}
}

Expand Down
17 changes: 9 additions & 8 deletions engine/api/auth/auth.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package auth

import (
"context"
"encoding/base64"
"errors"
"fmt"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/context"
ctx "github.com/ovh/cds/engine/api/context"
"github.com/ovh/cds/engine/api/sessionstore"
"github.com/ovh/cds/engine/api/user"
"github.com/ovh/cds/engine/api/worker"
Expand All @@ -25,13 +26,13 @@ type Driver interface {
Store() sessionstore.Store
Authentify(db gorp.SqlExecutor, username, password string) (bool, error)
AuthentifyUser(db gorp.SqlExecutor, user *sdk.User, password string) (bool, error)
GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *context.Ctx) error
GetCheckAuthHeaderFunc(options interface{}) func(db *gorp.DbMap, headers http.Header, c *ctx.Ctx) error
}

//GetDriver is a factory
func GetDriver(mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) {
func GetDriver(c context.Context, mode string, options interface{}, storeOptions sessionstore.Options) (Driver, error) {
log.Info("Auth> Intializing driver (%s)", mode)
store, err := sessionstore.Get(storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL)
store, err := sessionstore.Get(c, storeOptions.Mode, storeOptions.RedisHost, storeOptions.RedisPassword, storeOptions.TTL)
if err != nil {
return nil, fmt.Errorf("Unable to get AuthDriver : %s\n", err)
}
Expand Down Expand Up @@ -109,7 +110,7 @@ func GetUsername(store sessionstore.Store, token string) (string, error) {
}

//CheckPersistentSession check persistent session token from CLI
func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
if headers.Get(sdk.RequestedWithHeader) == sdk.RequestedWithValue {
if getUserPersistentSession(db, store, headers, ctx) {
return true
Expand All @@ -121,7 +122,7 @@ func CheckPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, heade
return false
}

func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
h := headers.Get(sdk.SessionTokenHeader)
if h != "" {
ok, _ := store.Exists(sessionstore.SessionKey(h))
Expand All @@ -141,7 +142,7 @@ func getUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, hea
return false
}

func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *context.Ctx) bool {
func reloadUserPersistentSession(db gorp.SqlExecutor, store sessionstore.Store, headers http.Header, ctx *ctx.Ctx) bool {
authHeaderValue := headers.Get("Authorization")
if authHeaderValue == "" {
log.Warning("ReloadUserPersistentSession> No Authorization Header")
Expand Down Expand Up @@ -218,7 +219,7 @@ func GetWorker(db gorp.SqlExecutor, workerID string) (*sdk.Worker, error) {
return w, nil
}

func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *context.Ctx) error {
func checkWorkerAuth(db *gorp.DbMap, auth string, ctx *ctx.Ctx) error {
id, err := base64.StdEncoding.DecodeString(auth)
if err != nil {
return fmt.Errorf("bad worker key syntax: %s", err)
Expand Down
3 changes: 2 additions & 1 deletion engine/api/auth/test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package auth

import (
"context"
"testing"

"github.com/ovh/cds/engine/api/sessionstore"
)

func TestLocalAuth(t *testing.T) Driver {
authDriver, err := GetDriver("local", nil, sessionstore.Options{Mode: "local"})
authDriver, err := GetDriver(context.Background(), "local", nil, sessionstore.Options{Mode: "local"})
if err != nil {
panic(err)
}
Expand Down
9 changes: 9 additions & 0 deletions engine/api/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Store interface {
Enqueue(queueName string, value interface{})
Dequeue(queueName string, value interface{})
DequeueWithContext(c context.Context, queueName string, value interface{})
QueueLen(queueName string) int
}

//Initialize the global cache in memory, or redis
Expand Down Expand Up @@ -119,3 +120,11 @@ func DequeueWithContext(c context.Context, queueName string, value interface{})
}
s.DequeueWithContext(c, queueName, value)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, but comment is on the wrong line

//QueueLen returns the length of a queue
func QueueLen(queueName string) int {
if s == nil {
return 0
}
return s.QueueLen(queueName)
}
9 changes: 9 additions & 0 deletions engine/api/cache/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ func (s *LocalStore) Dequeue(queueName string, value interface{}) {
return
}

//QueueLen returns the length of a queue
func (s *LocalStore) QueueLen(queueName string) int {
l := s.Queues[queueName]
if l == nil {
return 0
}
return l.Len()
}

//DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context
func (s *LocalStore) DequeueWithContext(c context.Context, queueName string, value interface{}) {
l := s.Queues[queueName]
Expand Down
26 changes: 21 additions & 5 deletions engine/api/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ read:
}
}

//QueueLen returns the length of a queue
func (s *RedisStore) QueueLen(queueName string) int {
if s.Client == nil {
log.Error("redis> cannot get redis client")
return 0
}

res, err := s.Client.LLen(queueName).Result()
if err != nil {
log.Warning("redis> Cannot read %s :%s", queueName, err)
}
return int(res)
}

//DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context
func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, value interface{}) {
if s.Client == nil {
Expand All @@ -172,26 +186,27 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val
}

elemChan := make(chan string)
var elemChanClosed bool
var once sync.Once
go func() {
ticker := time.NewTicker(50 * time.Millisecond).C
ticker := time.NewTicker(250 * time.Millisecond).C
for {
select {
case <-ticker:
res, err := s.Client.RPop(queueName).Result()
res, err := s.Client.BRPop(200*time.Millisecond, queueName).Result()
if err == redis.Nil {
continue
}
if err == io.EOF {
time.Sleep(1 * time.Second)
continue
}
if len(res) != 2 {
continue
if err == nil && len(res) == 2 && !elemChanClosed {
elemChan <- res[1]
}
elemChan <- res
case <-c.Done():
once.Do(func() {
elemChanClosed = true
close(elemChan)
})
return
Expand All @@ -205,6 +220,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val
json.Unmarshal(b, value)
}
once.Do(func() {
elemChanClosed = true
close(elemChan)
})
}
8 changes: 8 additions & 0 deletions engine/api/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,11 @@ func Status() string {

return fmt.Sprintf("Database: %s OK (%d conns)", dbDriver, db.Stats().OpenConnections)
}

// Close closes the database, releasing any open resources.
func Close() error {
if db != nil {
return db.Close()
}
return nil
}
10 changes: 8 additions & 2 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package event

import (
"context"
"fmt"
"os"

Expand Down Expand Up @@ -54,10 +55,15 @@ func Initialize(k KafkaConfig) error {
}

// DequeueEvent runs in a goroutine and dequeue event from cache
func DequeueEvent() {
func DequeueEvent(c context.Context) {
for {
e := sdk.Event{}
cache.Dequeue("events", &e)
cache.DequeueWithContext(c, "events", &e)
if err := c.Err(); err != nil {
log.Error("Exiting event.DequeueEvent : %v", err)
return
}

for _, b := range brokers {
if err := b.sendEvent(&e); err != nil {
log.Warning("Error while sending message: %s", err)
Expand Down
Loading