diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index fd6433dc90..f93d6f0e4d 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -22,32 +22,39 @@ import ( // shrinkQueue is used to shrink the polled queue 200% of the channel capacity (l) // it returns as reference date the date of the last element in the shrinkked queue -func shrinkQueue(queue *sdk.WorkflowQueue, l int) time.Time { +func shrinkQueue(queue *sdk.WorkflowQueue, nbJobsToKeep int) time.Time { if len(*queue) == 0 { return time.Time{} } - if l < 1 { - l = 1 + if nbJobsToKeep < 1 { + nbJobsToKeep = 1 } - l = l * 3 - t0 := (*queue)[len(*queue)-1].Queued + + // nbJobsToKeep is by default the concurrent max worker provisionning. + // we keep 2x this number + nbJobsToKeep = nbJobsToKeep * 2 queue.Sort() - if len(*queue) > l { - t0 = (*queue)[l].Queued - newQueue := (*queue)[:l] + if len(*queue) > nbJobsToKeep { + newQueue := (*queue)[:nbJobsToKeep] *queue = newQueue } + t0 := time.Now() + for _, q := range *queue { + if q.Queued.Before(t0) { + t0 = q.Queued + } + } return t0 } func (c *client) QueuePolling(ctx context.Context, jobs chan<- sdk.WorkflowNodeJobRun, pbjobs chan<- sdk.PipelineBuildJob, errs chan<- error, delay time.Duration, graceTime int, exceptWfJobID *int64) error { t0 := time.Unix(0, 0) jobsTicker := time.NewTicker(delay) - pbjobsTicker := time.NewTicker(delay * 5) - oldJobsTicker := time.NewTicker(delay * 10) + pbjobsTicker := time.NewTicker(delay * 10) + oldJobsTicker := time.NewTicker(delay * 5) for { select { diff --git a/sdk/cdsclient/client_queue_test.go b/sdk/cdsclient/client_queue_test.go new file mode 100644 index 0000000000..e1aa6acdf4 --- /dev/null +++ b/sdk/cdsclient/client_queue_test.go @@ -0,0 +1,84 @@ +package cdsclient + +import ( + "reflect" + "testing" + "time" + + "github.com/ovh/cds/sdk" +) + +func Test_shrinkQueue(t *testing.T) { + now := time.Now() + t10, _ := time.Parse(time.RFC3339, "2018-09-01T10:00:00+00:00") + t11, _ := time.Parse(time.RFC3339, "2018-09-01T11:00:00+00:00") + t12, _ := time.Parse(time.RFC3339, "2018-09-01T12:00:00+00:00") + t13, _ := time.Parse(time.RFC3339, "2018-09-01T13:00:00+00:00") + t14, _ := time.Parse(time.RFC3339, "2018-09-01T14:00:00+00:00") + t15, _ := time.Parse(time.RFC3339, "2018-09-01T15:00:00+00:00") + + type args struct { + queue *sdk.WorkflowQueue + l int + } + tests := []struct { + name string + args args + want time.Time + }{ + { + name: "simple", + args: args{queue: &sdk.WorkflowQueue{ + { + ProjectID: 1, + ID: 1, + Queued: t10, + QueuedSeconds: now.Unix() - t10.Unix(), + }, + { + ProjectID: 1, + ID: 2, + Queued: t11, + QueuedSeconds: now.Unix() - t11.Unix(), + }, + { + ProjectID: 1, + ID: 3, + Queued: t12, + QueuedSeconds: now.Unix() - t12.Unix(), + }, + { + ProjectID: 2, + ID: 4, + Queued: t13, + QueuedSeconds: now.Unix() - t13.Unix(), + }, + { + ProjectID: 1, + ID: 5, + Queued: t14, + QueuedSeconds: now.Unix() - t14.Unix(), + }, + { + ProjectID: 3, + ID: 6, + Queued: t15, + QueuedSeconds: now.Unix() - t15.Unix(), + }, + }, + l: 2, + }, + want: t10, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := shrinkQueue(tt.args.queue, tt.args.l); !reflect.DeepEqual(got, tt.want) { + t.Errorf("shrinkQueue() = %v, want %v", got, tt.want) + } + for _, q := range *tt.args.queue { + t.Logf("Project:%d ID:%d Queued: %s\n", q.ProjectID, q.ID, q.Queued) + } + }) + } +} diff --git a/sdk/hatchery/hatchery.go b/sdk/hatchery/hatchery.go index cbe91bcc3c..67c4cc81e8 100644 --- a/sdk/hatchery/hatchery.go +++ b/sdk/hatchery/hatchery.go @@ -96,8 +96,8 @@ func Create(h Interface) error { tickerGetModels.Stop() }() - pbjobs := make(chan sdk.PipelineBuildJob, 10) - wjobs := make(chan sdk.WorkflowNodeJobRun, 10) + pbjobs := make(chan sdk.PipelineBuildJob, 2) + wjobs := make(chan sdk.WorkflowNodeJobRun, h.Configuration().Provision.MaxConcurrentProvisioning) errs := make(chan error, 1) // Create a cache with a default expiration time of 3 second, and which diff --git a/sdk/workflow_run_test.go b/sdk/workflow_run_test.go index 808689055c..894c33a4bc 100644 --- a/sdk/workflow_run_test.go +++ b/sdk/workflow_run_test.go @@ -2,6 +2,7 @@ package sdk import ( "testing" + "time" "github.com/ovh/venom" "github.com/stretchr/testify/assert" @@ -124,6 +125,15 @@ func TestWorkflowRunReport(t *testing.T) { } func TestWorkflowQueue_Sort(t *testing.T) { + now := time.Now() + t10, _ := time.Parse(time.RFC3339, "2018-09-01T10:00:00+00:00") + t11, _ := time.Parse(time.RFC3339, "2018-09-01T11:00:00+00:00") + t12, _ := time.Parse(time.RFC3339, "2018-09-01T12:00:00+00:00") + t13, _ := time.Parse(time.RFC3339, "2018-09-01T13:00:00+00:00") + t14, _ := time.Parse(time.RFC3339, "2018-09-01T14:00:00+00:00") + t15, _ := time.Parse(time.RFC3339, "2018-09-01T15:00:00+00:00") + t16, _ := time.Parse(time.RFC3339, "2018-09-01T16:00:00+00:00") + tests := []struct { name string q WorkflowQueue @@ -133,30 +143,90 @@ func TestWorkflowQueue_Sort(t *testing.T) { name: "test sort 1", q: WorkflowQueue{ { - ProjectID: 1, - ID: 1, + ProjectID: 1, + ID: 1, + Queued: t10, + QueuedSeconds: now.Unix() - t10.Unix(), + }, + { + ProjectID: 1, + ID: 2, + Queued: t11, + QueuedSeconds: now.Unix() - t11.Unix(), + }, + { + ProjectID: 2, + ID: 3, + Queued: t12, + QueuedSeconds: now.Unix() - t12.Unix(), + }, + { + ProjectID: 1, + ID: 4, + Queued: t13, + QueuedSeconds: now.Unix() - t13.Unix(), + }, + { + ProjectID: 1, + ID: 5, + Queued: t14, + QueuedSeconds: now.Unix() - t14.Unix(), }, { - ProjectID: 1, - ID: 2, + ProjectID: 2, + ID: 6, + Queued: t15, + QueuedSeconds: now.Unix() - t15.Unix(), }, { - ProjectID: 2, - ID: 3, + ProjectID: 1, + ID: 7, + Queued: t16, + QueuedSeconds: now.Unix() - t16.Unix(), }, }, expected: WorkflowQueue{ { - ProjectID: 2, - ID: 3, + ProjectID: 2, + ID: 3, + Queued: t12, + QueuedSeconds: now.Unix() - t12.Unix(), + }, + { + ProjectID: 2, + ID: 6, + Queued: t15, + QueuedSeconds: now.Unix() - t15.Unix(), + }, + { + ProjectID: 1, + ID: 1, + Queued: t10, + QueuedSeconds: now.Unix() - t10.Unix(), + }, + { + ProjectID: 1, + ID: 2, + Queued: t11, + QueuedSeconds: now.Unix() - t11.Unix(), + }, + { + ProjectID: 1, + ID: 4, + Queued: t13, + QueuedSeconds: now.Unix() - t13.Unix(), }, { - ProjectID: 1, - ID: 1, + ProjectID: 1, + ID: 5, + Queued: t14, + QueuedSeconds: now.Unix() - t14.Unix(), }, { - ProjectID: 1, - ID: 2, + ProjectID: 1, + ID: 7, + Queued: t16, + QueuedSeconds: now.Unix() - t16.Unix(), }, }, },