forked from openshift/library-go
/
management_state_controller.go
144 lines (114 loc) · 4.33 KB
/
management_state_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package management
import (
"fmt"
"strings"
"time"
"k8s.io/klog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/operator/condition"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/v1helpers"
operatorv1helpers "github.com/openshift/library-go/pkg/operator/v1helpers"
)
// workQueueKey is the single fixed key used for every queue item: each event
// triggers a full resync of the operator state, so one key is sufficient.
// It is never mutated, hence a const rather than a var.
const workQueueKey = "instance"
// ManagementStateController watches changes of the `managementState` field and reacts in case that field is set to an unsupported value.
// As each operator can opt out of supporting the `unmanaged` or `removed` states, this controller will add a failing condition when the
// value of this field is set to one of those states for such operators.
type ManagementStateController struct {
// operatorName is used in condition messages, event messages and the queue name.
operatorName string
// operatorClient provides access to the operator's spec/status and its informer.
operatorClient operatorv1helpers.OperatorClient
// cachesToSync lists informer-synced funcs Run waits on before starting workers.
cachesToSync []cache.InformerSynced
// queue holds the single work item ("instance"); rate-limits failed syncs.
queue workqueue.RateLimitingInterface
// eventRecorder emits warning events (e.g. when operator status cannot be read).
eventRecorder events.Recorder
}
// NewOperatorManagementStateController returns a controller that observes the
// operator's `managementState` and degrades the operator when the state is
// one this operator does not support.
func NewOperatorManagementStateController(
	name string,
	operatorClient operatorv1helpers.OperatorClient,
	recorder events.Recorder,
) *ManagementStateController {
	// Queue names may not contain dashes, so normalize the operator name.
	queueName := "ManagementStateController_" + strings.ReplaceAll(name, "-", "_")

	controller := &ManagementStateController{
		operatorName:   name,
		operatorClient: operatorClient,
		eventRecorder:  recorder,
		queue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	controller.cachesToSync = append(controller.cachesToSync, operatorClient.Informer().HasSynced)
	operatorClient.Informer().AddEventHandler(controller.eventHandler())

	return controller
}
// sync reads the operator's current managementState and sets the
// ManagementStateDegraded condition: Degraded=True with a matching reason when
// the state is unsupported by this operator, Degraded=False otherwise.
func (c *ManagementStateController) sync() error {
	detailedSpec, _, _, err := c.operatorClient.GetOperatorState()
	if apierrors.IsNotFound(err) {
		// Operator resource missing: warn and retry on the next event.
		c.eventRecorder.Warningf("StatusNotFound", "Unable to determine current operator status for %s", c.operatorName)
		return nil
	}
	if err != nil {
		// Any other error must be returned here: detailedSpec may be nil and
		// dereferencing it below would panic. (This check was missing before.)
		return err
	}

	// Default to not degraded; the checks below flip it for unsupported states.
	cond := operatorv1.OperatorCondition{
		Type:   condition.ManagementStateDegradedConditionType,
		Status: operatorv1.ConditionFalse,
	}

	if IsOperatorAlwaysManaged() && detailedSpec.ManagementState == operatorv1.Unmanaged {
		cond.Status = operatorv1.ConditionTrue
		cond.Reason = "Unmanaged"
		cond.Message = fmt.Sprintf("Unmanaged is not supported for %s operator", c.operatorName)
	}

	if IsOperatorNotRemovable() && detailedSpec.ManagementState == operatorv1.Removed {
		cond.Status = operatorv1.ConditionTrue
		cond.Reason = "Removed"
		cond.Message = fmt.Sprintf("Removed is not supported for %s operator", c.operatorName)
	}

	if IsOperatorUnknownState(detailedSpec.ManagementState) {
		cond.Status = operatorv1.ConditionTrue
		cond.Reason = "Unknown"
		cond.Message = fmt.Sprintf("Unsupported management state %q for %s operator", detailedSpec.ManagementState, c.operatorName)
	}

	// GetOperatorState succeeded, so a failure to write status is reported to
	// the caller (and rate-limited retried by the work queue).
	if _, _, updateError := v1helpers.UpdateStatus(c.operatorClient, v1helpers.UpdateConditionFn(cond)); updateError != nil {
		return updateError
	}
	return nil
}
// Run starts the controller and blocks until stopCh is closed. The workers
// argument is accepted for interface consistency but ignored: exactly one
// worker goroutine is started.
func (c *ManagementStateController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	klog.Infof("Starting management-state-controller-" + c.operatorName)
	defer klog.Infof("Shutting down management-state-controller-" + c.operatorName)

	if synced := cache.WaitForCacheSync(stopCh, c.cachesToSync...); !synced {
		return
	}

	// doesn't matter what workers say, only start one.
	go wait.Until(c.runWorker, time.Second, stopCh)

	<-stopCh
}
// runWorker drains the work queue until it is shut down.
func (c *ManagementStateController) runWorker() {
	for {
		if !c.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem handles one item from the queue. It returns false only
// when the queue has been shut down; otherwise true, so the worker keeps going.
func (c *ManagementStateController) processNextWorkItem() bool {
	key, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(key)

	if err := c.sync(); err != nil {
		// Report the failure and re-enqueue with rate limiting for a retry.
		utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
		c.queue.AddRateLimited(key)
		return true
	}

	// Successful sync: clear any rate-limit history for this key.
	c.queue.Forget(key)
	return true
}
// eventHandler queues the operator to check spec and status on any add,
// update, or delete of the watched operator resource.
func (c *ManagementStateController) eventHandler() cache.ResourceEventHandler {
	enqueue := func() { c.queue.Add(workQueueKey) }
	return cache.ResourceEventHandlerFuncs{
		AddFunc:    func(_ interface{}) { enqueue() },
		UpdateFunc: func(_, _ interface{}) { enqueue() },
		DeleteFunc: func(_ interface{}) { enqueue() },
	}
}