This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
workqueue.go
47 lines (41 loc) · 1.93 KB
/
workqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package controller
import (
"context"
"github.com/lyft/flytepropeller/pkg/controller/config"
"golang.org/x/time/rate"
"github.com/lyft/flytestdlib/logger"
"k8s.io/client-go/util/workqueue"
)
func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error) {
// TODO introduce bounds checks
logger.Infof(ctx, "WorkQueue type [%v] configured", cfg.Type)
switch cfg.Type {
case config.WorkqueueTypeBucketRateLimiter:
logger.Infof(ctx, "Using Bucket Ratelimited Workqueue, Rate [%v] Capacity [%v]", cfg.Rate, cfg.Capacity)
return workqueue.NewNamedRateLimitingQueue(
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{
Limiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity),
}, name), nil
case config.WorkqueueTypeExponentialFailureRateLimiter:
logger.Infof(ctx, "Using Exponential failure backoff Ratelimited Workqueue, Base Delay [%v], max Delay [%v]", cfg.BaseDelay, cfg.MaxDelay)
return workqueue.NewNamedRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(cfg.BaseDelay.Duration, cfg.MaxDelay.Duration),
name), nil
case config.WorkqueueTypeMaxOfRateLimiter:
logger.Infof(ctx, "Using Max-of Ratelimited Workqueue, Bucket {Rate [%v] Capacity [%v]} | FailureBackoff {Base Delay [%v], max Delay [%v]}", cfg.Rate, cfg.Capacity, cfg.BaseDelay, cfg.MaxDelay)
return workqueue.NewNamedRateLimitingQueue(
workqueue.NewMaxOfRateLimiter(
&workqueue.BucketRateLimiter{
Limiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity),
},
workqueue.NewItemExponentialFailureRateLimiter(cfg.BaseDelay.Duration,
cfg.MaxDelay.Duration),
), name), nil
case config.WorkqueueTypeDefault:
fallthrough
default:
logger.Infof(ctx, "Using Default Workqueue")
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), nil
}
}