Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support concurrent-podgroup-syncs #2997

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/controller-manager/app/options/options.go
Expand Up @@ -60,6 +60,9 @@ type ServerOption struct {
EnableHealthz bool
// To determine whether inherit owner's annotations for pods when create podgroup
InheritOwnerAnnotations bool
// WorkerThreadsForPG is the number of threads syncing podgroup operations
// The larger the number, the faster the podgroup processing, but requires more CPU load.
WorkerThreadsForPG uint32
}

type DecryptFunc func(c *ServerOption) error
Expand Down Expand Up @@ -90,6 +93,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", 1, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
}

// CheckOptionOrDie checks the LockObjectNamespace.
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/options/options_test.go
Expand Up @@ -54,6 +54,7 @@ func TestAddFlags(t *testing.T) {
InheritOwnerAnnotations: true,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 1,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/server.go
Expand Up @@ -124,6 +124,7 @@ func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx c
controllerOpt.VolcanoClient = vcclientset.NewForConfigOrDie(config)
controllerOpt.SharedInformerFactory = informers.NewSharedInformerFactory(controllerOpt.KubeClient, 0)
controllerOpt.InheritOwnerAnnotations = opt.InheritOwnerAnnotations
controllerOpt.WorkerThreadsForPG = opt.WorkerThreadsForPG

return func(ctx context.Context) {
framework.ForeachController(func(c framework.Controller) {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/framework/interface.go
Expand Up @@ -33,6 +33,7 @@ type ControllerOption struct {
MaxRequeueNum int

InheritOwnerAnnotations bool
WorkerThreadsForPG uint32
}

// Controller is the interface of all controllers.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/podgroup/pg_controller.go
Expand Up @@ -81,7 +81,7 @@ func (pg *pgcontroller) Name() string {
func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error {
pg.kubeClient = opt.KubeClient
pg.vcClient = opt.VolcanoClient
pg.workers = opt.WorkerNum
pg.workers = opt.WorkerThreadsForPG

pg.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

Expand Down