Skip to content

Commit

Permalink
fix (api): dequeue with context
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin committed Jun 5, 2017
1 parent 1994b4a commit dea3293
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 38 deletions.
2 changes: 1 addition & 1 deletion engine/api/action/requirement_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func RequirementsCacheLoader(c context.Context, delay time.Duration, DBFunc func
select {
case <-c.Done():
if c.Err() != nil {
log.Error("RequirementsCacheLoader> Exiting RequirementsCacheLoader: %v", c.Err())
log.Error("Exiting RequirementsCacheLoader: %v", c.Err())
return
}
case <-tick:
Expand Down
2 changes: 1 addition & 1 deletion engine/api/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, val
elemChan := make(chan string)
var once sync.Once
go func() {
ticker := time.NewTicker(50 * time.Millisecond).C
ticker := time.NewTicker(100 * time.Millisecond).C
for {
select {
case <-ticker:
Expand Down
5 changes: 3 additions & 2 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ func DequeueEvent(c context.Context) {
for {
e := sdk.Event{}
cache.DequeueWithContext(c, "events", &e)
if c.Err() != nil {
log.Error("event.DequeueEvent error : %v", e)
err := c.Err()
if err != nil {
log.Error("Exiting event.DequeueEvent : %v", err)
return
}

Expand Down
7 changes: 6 additions & 1 deletion engine/api/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,12 @@ func hookRecoverer(c ctx.Context, DBFunc func() *gorp.DbMap) {
}
case <-tick:
h := hook.ReceivedHook{}
cache.Dequeue("hook:recovery", &h)
cache.DequeueWithContext(c, "hook:recovery", &h)
err := c.Err()
if err != nil {
log.Error("Exiting hookRecoverer: %v", err)
return
}
if h.Repository != "" {
if err := processHook(DBFunc(), h); err != nil {
hook.Recovery(h, err)
Expand Down
12 changes: 6 additions & 6 deletions engine/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ var mainCmd = &cobra.Command{

// Gracefully shutdown sql connections
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
signal.Notify(c, syscall.SIGKILL)
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)
defer func() {
signal.Stop(c)
cancel()
Expand All @@ -82,8 +80,10 @@ var mainCmd = &cobra.Command{
case <-c:
log.Warning("Cleanup SQL connections")
database.Close()
event.Publish(sdk.EventEngine{Message: "shutdown"})
cancel()
//event.Publish(sdk.EventEngine{Message: "shutdown"})
event.Close()

case <-ctx.Done():
}
}()
Expand Down Expand Up @@ -262,9 +262,9 @@ var mainCmd = &cobra.Command{
go hatchery.Heartbeat(ctx, database.GetDBMap)
go auditCleanerRoutine(ctx, database.GetDBMap)

go repositoriesmanager.ReceiveEvents()
go repositoriesmanager.ReceiveEvents(ctx, database.GetDBMap)

go stats.StartRoutine()
go stats.StartRoutine(ctx, database.GetDBMap)
go action.RequirementsCacheLoader(ctx, 5*time.Second, database.GetDBMap)
go hookRecoverer(ctx, database.GetDBMap)

Expand Down
14 changes: 10 additions & 4 deletions engine/api/repositoriesmanager/events.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package repositoriesmanager

import (
"context"
"fmt"

"github.com/go-gorp/gorp"
"github.com/mitchellh/mapstructure"

"github.com/ovh/cds/engine/api/cache"
"github.com/ovh/cds/engine/api/database"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

//ReceiveEvents has to be launched as a goroutine.
func ReceiveEvents() {
func ReceiveEvents(c context.Context, DBFunc func() *gorp.DbMap) {
for {
e := sdk.Event{}
cache.Dequeue("events_repositoriesmanager", &e)
db := database.DBMap(database.DB())
cache.DequeueWithContext(c, "events_repositoriesmanager", &e)
err := c.Err()
if err != nil {
log.Error("Exiting repositoriesmanager.ReceiveEvents: %v", err)
return
}

db := DBFunc()
if db != nil {
if err := processEvent(db, e); err != nil {
log.Error("ReceiveEvents> err while processing error=%s : %v", err, e)
Expand Down
46 changes: 24 additions & 22 deletions engine/api/stats/worker.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,43 @@
package stats

import (
"context"
"database/sql"
"time"

"github.com/go-gorp/gorp"

"github.com/ovh/cds/engine/api/database"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/log"
)

// StartRoutine starts a routine collecting regular build statistics
func StartRoutine() {

func StartRoutine(c context.Context, DBFunc func() *gorp.DbMap) {
go func() {
defer sdk.Exit("StatsRoutine exited")

tick := time.NewTicker(10 * time.Second).C
for {

time.Sleep(2 * time.Second)

db := database.DBMap(database.DB())
if db != nil {
err := createTodaysRow(db)
if err != nil {
log.Error("StatsRoutine: Cannot create today's row: %s\n", err)
continue
}

err = updateWorkerStats(db)
if err != nil {
log.Warning("StatsRoutine> Cannot update build stats: %s\n", err)
select {
case <-c.Done():
if c.Err() != nil {
log.Error("Exiting Stat routine: %v", c.Err())
}
err = updatePipelineStats(db)
if err != nil {
log.Warning("StatsRoutine> Cannot update build stats: %s\n", err)
return
case <-tick:
db := DBFunc()
if db != nil {
err := createTodaysRow(db)
if err != nil {
log.Error("StatsRoutine: Cannot create today's row: %s\n", err)
continue
}
err = updateWorkerStats(db)
if err != nil {
log.Warning("StatsRoutine> Cannot update build stats: %s\n", err)
}
err = updatePipelineStats(db)
if err != nil {
log.Warning("StatsRoutine> Cannot update build stats: %s\n", err)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/worker/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func CheckHeartbeat(c context.Context, DBFunc func() *gorp.DbMap) {
select {
case <-c.Done():
if c.Err() != nil {
log.Error("WorkerHeartbeat> Exiting CheckHeartbeat: %v", c.Err())
log.Error("Exiting WorkerHeartbeat: %v", c.Err())
}
return
case <-tick:
Expand Down

0 comments on commit dea3293

Please sign in to comment.