Skip to content

Commit

Permalink
fix(sdk,hatchery): limit queue read to maxcontainers (#3322)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault committed Sep 14, 2018
1 parent 15998bf commit 5f461f6
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 24 deletions.
27 changes: 17 additions & 10 deletions sdk/cdsclient/client_queue.go
Expand Up @@ -22,32 +22,39 @@ import (

// shrinkQueue is used to shrink the polled queue 200% of the channel capacity (l)
// it returns as reference date the date of the last element in the shrinkked queue
func shrinkQueue(queue *sdk.WorkflowQueue, l int) time.Time {
func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time {
if len(*queue) == 0 {
return time.Time{}
}

if l < 1 {
l = 1
if nbJobsToKeep < 1 {
nbJobsToKeep = 1
}
l = l * 3
t0 := (*queue)[len(*queue)-1].Queued

// nbJobsToKeep is by default the concurrent max worker provisionning.
// we keep 2x this number
nbJobsToKeep = nbJobsToKeep * 2

queue.Sort()

if len(*queue) > l {
t0 = (*queue)[l].Queued
newQueue := (*queue)[:l]
if len(*queue) > nbJobsToKeep {
newQueue := (*queue)[:nbJobsToKeep]
*queue = newQueue
}
t0 := time.Now()
for _, q := range *queue {
if q.Queued.Before(t0) {
t0 = q.Queued
}
}
return t0
}

func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJobRun, pbjobs chan<- sdk.PipelineBuildJob, errs chan<- error, delay time.Duration, graceTime int, exceptWfJobID *int64) error {
t0 := time.Unix(0, 0)
jobsTicker := time.NewTicker(delay)
pbjobsTicker := time.NewTicker(delay * 5)
oldJobsTicker := time.NewTicker(delay * 10)
pbjobsTicker := time.NewTicker(delay * 10)
oldJobsTicker := time.NewTicker(delay * 5)

for {
select {
Expand Down
84 changes: 84 additions & 0 deletions sdk/cdsclient/client_queue_test.go
@@ -0,0 +1,84 @@
package cdsclient

import (
"reflect"
"testing"
"time"

"github.com/ovh/cds/sdk"
)

// Test_shrinkQueue checks that shrinkQueue truncates the queue around the
// provisioning capacity and returns the queued date of the oldest job left.
func Test_shrinkQueue(t *testing.T) {
	now := time.Now()

	// six jobs queued one hour apart on 2018-09-01
	t10, _ := time.Parse(time.RFC3339, "2018-09-01T10:00:00+00:00")
	t11, _ := time.Parse(time.RFC3339, "2018-09-01T11:00:00+00:00")
	t12, _ := time.Parse(time.RFC3339, "2018-09-01T12:00:00+00:00")
	t13, _ := time.Parse(time.RFC3339, "2018-09-01T13:00:00+00:00")
	t14, _ := time.Parse(time.RFC3339, "2018-09-01T14:00:00+00:00")
	t15, _ := time.Parse(time.RFC3339, "2018-09-01T15:00:00+00:00")

	t.Run("simple", func(t *testing.T) {
		queue := &sdk.WorkflowQueue{
			{ProjectID: 1, ID: 1, Queued: t10, QueuedSeconds: now.Unix() - t10.Unix()},
			{ProjectID: 1, ID: 2, Queued: t11, QueuedSeconds: now.Unix() - t11.Unix()},
			{ProjectID: 1, ID: 3, Queued: t12, QueuedSeconds: now.Unix() - t12.Unix()},
			{ProjectID: 2, ID: 4, Queued: t13, QueuedSeconds: now.Unix() - t13.Unix()},
			{ProjectID: 1, ID: 5, Queued: t14, QueuedSeconds: now.Unix() - t14.Unix()},
			{ProjectID: 3, ID: 6, Queued: t15, QueuedSeconds: now.Unix() - t15.Unix()},
		}
		want := t10

		if got := shrinkQueue(queue, 2); !reflect.DeepEqual(got, want) {
			t.Errorf("shrinkQueue() = %v, want %v", got, want)
		}
		for _, q := range *queue {
			t.Logf("Project:%d ID:%d Queued: %s\n", q.ProjectID, q.ID, q.Queued)
		}
	})
}
4 changes: 2 additions & 2 deletions sdk/hatchery/hatchery.go
Expand Up @@ -96,8 +96,8 @@ func Create(h Interface) error {
tickerGetModels.Stop()
}()

pbjobs := make(chan sdk.PipelineBuildJob, 10)
wjobs := make(chan sdk.WorkflowNodeJobRun, 10)
pbjobs := make(chan sdk.PipelineBuildJob, 2)
wjobs := make(chan sdk.WorkflowNodeJobRun, h.Configuration().Provision.MaxConcurrentProvisioning)
errs := make(chan error, 1)

// Create a cache with a default expiration time of 3 second, and which
Expand Down
94 changes: 82 additions & 12 deletions sdk/workflow_run_test.go
Expand Up @@ -2,6 +2,7 @@ package sdk

import (
"testing"
"time"

"github.com/ovh/venom"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -124,6 +125,15 @@ func TestWorkflowRunReport(t *testing.T) {
}

func TestWorkflowQueue_Sort(t *testing.T) {
now := time.Now()
t10, _ := time.Parse(time.RFC3339, "2018-09-01T10:00:00+00:00")
t11, _ := time.Parse(time.RFC3339, "2018-09-01T11:00:00+00:00")
t12, _ := time.Parse(time.RFC3339, "2018-09-01T12:00:00+00:00")
t13, _ := time.Parse(time.RFC3339, "2018-09-01T13:00:00+00:00")
t14, _ := time.Parse(time.RFC3339, "2018-09-01T14:00:00+00:00")
t15, _ := time.Parse(time.RFC3339, "2018-09-01T15:00:00+00:00")
t16, _ := time.Parse(time.RFC3339, "2018-09-01T16:00:00+00:00")

tests := []struct {
name string
q WorkflowQueue
Expand All @@ -133,30 +143,90 @@ func TestWorkflowQueue_Sort(t *testing.T) {
name: "test sort 1",
q: WorkflowQueue{
{
ProjectID: 1,
ID: 1,
ProjectID: 1,
ID: 1,
Queued: t10,
QueuedSeconds: now.Unix() - t10.Unix(),
},
{
ProjectID: 1,
ID: 2,
Queued: t11,
QueuedSeconds: now.Unix() - t11.Unix(),
},
{
ProjectID: 2,
ID: 3,
Queued: t12,
QueuedSeconds: now.Unix() - t12.Unix(),
},
{
ProjectID: 1,
ID: 4,
Queued: t13,
QueuedSeconds: now.Unix() - t13.Unix(),
},
{
ProjectID: 1,
ID: 5,
Queued: t14,
QueuedSeconds: now.Unix() - t14.Unix(),
},
{
ProjectID: 1,
ID: 2,
ProjectID: 2,
ID: 6,
Queued: t15,
QueuedSeconds: now.Unix() - t15.Unix(),
},
{
ProjectID: 2,
ID: 3,
ProjectID: 1,
ID: 7,
Queued: t16,
QueuedSeconds: now.Unix() - t16.Unix(),
},
},
expected: WorkflowQueue{
{
ProjectID: 2,
ID: 3,
ProjectID: 2,
ID: 3,
Queued: t12,
QueuedSeconds: now.Unix() - t12.Unix(),
},
{
ProjectID: 2,
ID: 6,
Queued: t15,
QueuedSeconds: now.Unix() - t15.Unix(),
},
{
ProjectID: 1,
ID: 1,
Queued: t10,
QueuedSeconds: now.Unix() - t10.Unix(),
},
{
ProjectID: 1,
ID: 2,
Queued: t11,
QueuedSeconds: now.Unix() - t11.Unix(),
},
{
ProjectID: 1,
ID: 4,
Queued: t13,
QueuedSeconds: now.Unix() - t13.Unix(),
},
{
ProjectID: 1,
ID: 1,
ProjectID: 1,
ID: 5,
Queued: t14,
QueuedSeconds: now.Unix() - t14.Unix(),
},
{
ProjectID: 1,
ID: 2,
ProjectID: 1,
ID: 7,
Queued: t16,
QueuedSeconds: now.Unix() - t16.Unix(),
},
},
},
Expand Down

0 comments on commit 5f461f6

Please sign in to comment.