/
controller_manager.go
57 lines (47 loc) · 1.48 KB
/
controller_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package manager
import (
"context"
"sync"
"k8s.io/klog/v2"
"github.com/openshift/library-go/pkg/controller/factory"
)
// ControllerManager collects controllers and runs them together.
type ControllerManager interface {
	// Start runs the registered controllers using ctx. The implementation in
	// this file blocks until every controller has returned.
	Start(ctx context.Context)
	// WithController registers controller to be run with the given number of
	// workers and returns a ControllerManager so that calls can be chained.
	WithController(controller factory.Controller, workers int) ControllerManager
}
// NewControllerManager creates an empty ControllerManager with no controllers
// registered. Add controllers with WithController before calling Start.
func NewControllerManager() ControllerManager {
	return new(controllerManager)
}
// runnableController holds the configuration the manager needs to launch a
// single controller.
type runnableController struct {
	// run is the function invoked to execute the controller; it receives the
	// context and the worker count.
	run func(ctx context.Context, workers int)
	// workersCount is the value passed to run as its workers argument.
	workersCount int
	// name identifies the controller in the termination log message.
	name string
}
// controllerManager is the ControllerManager implementation returned by
// NewControllerManager.
type controllerManager struct {
	// controllers lists the registered controllers in the order they were added.
	controllers []runnableController
}
// Compile-time check that *controllerManager satisfies ControllerManager.
var _ ControllerManager = (*controllerManager)(nil)
// WithController records controller, together with the number of workers it
// should be run with, at the end of the manager's controller list. The
// controller's Run method and name are captured at registration time. The
// receiver itself is returned so that registrations can be chained.
func (c *controllerManager) WithController(controller factory.Controller, workers int) ControllerManager {
	registration := runnableController{
		run:          controller.Run,
		workersCount: workers,
		name:         controller.Name(),
	}
	c.controllers = append(c.controllers, registration)
	return c
}
// Start runs every registered controller in its own goroutine and blocks until
// all of them have returned.
//
// ctx is handed to each controller along with the worker count it was
// registered with, so cancelling ctx is how the caller signals the controllers
// to shut down. Start does not cancel anything itself.
func (c *controllerManager) Start(ctx context.Context) {
	// Work on a snapshot of the slice header so the set of controllers being
	// started and waited on is fixed at the moment Start is called.
	controllers := c.controllers

	var wg sync.WaitGroup
	wg.Add(len(controllers))
	for i := range controllers {
		go func(rc runnableController) {
			// Deferred calls run last-in-first-out: registering wg.Done first
			// makes the termination message get logged before the WaitGroup is
			// released, so Start cannot return (and the process exit) ahead of
			// the log line.
			defer wg.Done()
			defer klog.Infof("%s controller terminated", rc.name)
			rc.run(ctx, rc.workersCount)
		}(controllers[i])
	}
	wg.Wait()
}