/
dynamic.go
143 lines (111 loc) · 3.12 KB
/
dynamic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package source
import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/solo-io/go-utils/contextutils"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// Stoppable is a stoppable source
type Stoppable interface {
source.Source
InjectStopChannel(<-chan struct{}) error
}
// DynamicSource is a funnel for sources that can be
// dynamically (de)registered before & after the controller has started
type Dynamic interface {
source.Source
// sources must be registered with a unique id
Add(id string, src source.Source) error
// remove a source. errors if not found
Remove(id string) error
}
// cache of sources
type cachedSource struct {
// the original source
source source.Source
// cancel function to stop it
cancel context.CancelFunc
}
// the args with which the dynamic source was started
type startArgs struct {
h handler.EventHandler
i workqueue.RateLimitingInterface
ps []predicate.Predicate
}
// DynamicSource implements Dynamic
type DynamicSource struct {
// cancel this context to stop all registered sources
ctx context.Context
// the cached sources that can be dynamically added/removed
cache map[string]cachedSource
// cache access
lock sync.RWMutex
// has source started?
started *startArgs
// the channel to which to push events
output source.Channel
}
func NewDynamicSource(ctx context.Context) *DynamicSource {
return &DynamicSource{
ctx: ctx,
cache: make(map[string]cachedSource),
}
}
// start all the sources
func (s *DynamicSource) Start(ctx context.Context, h handler.EventHandler, i workqueue.RateLimitingInterface, ps ...predicate.Predicate) error {
s.lock.Lock()
defer s.lock.Unlock()
if s.started != nil {
return errors.Errorf("source was already started")
}
for _, src := range s.cache {
if err := src.source.Start(ctx, h, i, ps...); err != nil {
return err
}
}
s.started = &startArgs{
h: h,
i: i,
ps: ps,
}
return nil
}
// only Stoppable sources are currently supported
func (s *DynamicSource) Add(id string, src Stoppable) error {
contextutils.LoggerFrom(s.ctx).DPanic("DynamicSource.Add() may not work as expected due to the removal of dependency injection functions from controller-runtime in 15.0. See https://github.com/kubernetes-sigs/controller-runtime/releases")
s.lock.Lock()
defer s.lock.Unlock()
if _, exists := s.cache[id]; exists {
return errors.Errorf("source %v already exists", id)
}
ctx, cancel := context.WithCancel(s.ctx)
if err := src.InjectStopChannel(ctx.Done()); err != nil {
return err
}
if s.started != nil {
if err := src.Start(ctx, s.started.h, s.started.i, s.started.ps...); err != nil {
return errors.Wrapf(err, "failed to start source %v", id)
}
}
s.cache[id] = cachedSource{
source: src,
cancel: cancel,
}
return nil
}
// remove (and stop) a source
func (s *DynamicSource) Remove(id string) error {
s.lock.Lock()
defer s.lock.Unlock()
src, ok := s.cache[id]
if !ok {
return errors.Errorf("no source in cache with id %v", id)
}
src.cancel()
delete(s.cache, id)
return nil
}