Skip to content

Commit

Permalink
featuregates: process gates after version is read from payload
Browse files Browse the repository at this point in the history
Because of [OCPBUGS-30080](https://issues.redhat.com/browse/OCPBUGS-30080), we cannot easily determine running CVO version by a single `os.Getenv()`, like other operators can. CVO can determine its version from the initial payload it loads from disk though, but this happens a bit later in the code flow, after leadership lease is acquired and all informers are started. At that point we can provide the feature gate / featureset knowledge to the structures that need it: actual CVO controller and the feature changestopper, but these structures also need to be initialized earlier (they require informers which are already started). This leads to a slightly awkward delayed initialization scheme, where the controller structures are initialized early and populated with early content like informers etc. Later, when informers are started and CVO loads its initial payload, we can extract the version from it and use it to populate the feature gate in the controller structures. Because enabled feature gates are available later in the flow, it also means part of the CVO code cannot be gated by a feature gate (like controller initialization, or initial payload loading). We do not need that now but it may cause issues later.

The high-level sequence after this commit looks like this:

1. Initialize CVO and ChangeStopper controller structures with informers they need, and populate CVO's `enabledFeatureGate` checker with one that panics when used (no code can check for gates before we know them)
2. Acquire lease and start the informers
3. Fetch a FeatureGate resource from the cluster (using an informer) and determine the FeatureSet from it (needed to load the payload)
4. Load the initial payload from disk and extract the version from it
5. Use the version to determine the enabled feature gates from the FeatureGate resource
6. Populate the CVO and ChangeStopper controller structures with the newly discovered feature gates
  • Loading branch information
petr-muller committed Mar 6, 2024
1 parent e7ea6db commit bfaeda6
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ spec:
name: kube-api-access
readOnly: true
env:
# Unfortunately the placeholder is not replaced, reported as OCPBUGS-30080
- name: OPERATOR_IMAGE_VERSION
value: "0.0.1-snapshot"
- name: KUBERNETES_SERVICE_PORT # allows CVO to communicate with apiserver directly on same host. Is substituted with port from infrastructures.status.apiServerInternalURL if available.
Expand Down
45 changes: 25 additions & 20 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ type Operator struct {
// via annotation
exclude string

clusterFeatures featuregates.ClusterFeatures
enabledFeatureGates featuregates.CvoGateChecker

clusterProfile string
uid types.UID
Expand All @@ -186,7 +186,6 @@ func New(
client clientset.Interface,
kubeClient kubernetes.Interface,
exclude string,
clusterFeatures featuregates.ClusterFeatures,
clusterProfile string,
promqlTarget clusterconditions.PromQLTarget,
injectClusterIdIntoPromQL bool,
Expand Down Expand Up @@ -218,10 +217,14 @@ func New(
upgradeableQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "upgradeable"),

exclude: exclude,
clusterFeatures: clusterFeatures,
clusterProfile: clusterProfile,
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
injectClusterIdIntoPromQL: injectClusterIdIntoPromQL,

// Because of OCPBUGS-30080, we can only detect the enabled feature gates after Operator loads the initial payload
// from disk via LoadInitialPayload. We must not have any gate-checking code until that happens, so we initialize
// this field with a checker that panics when used.
enabledFeatureGates: featuregates.PanicOnUsageBeforeInitialization,
}

if _, err := cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()); err != nil {
Expand Down Expand Up @@ -253,10 +256,9 @@ func New(
return optr, nil
}

// InitializeFromPayload waits until a ClusterVersion object exists. It then retrieves the payload contents and verifies the
// initial state, then configures the controller that loads and applies content to the cluster. It returns an error if the
// payload appears to be in error rather than continuing.
func (optr *Operator) InitializeFromPayload(ctx context.Context, restConfig *rest.Config, burstRestConfig *rest.Config) error {
// LoadInitialPayload waits until a ClusterVersion object exists. It then retrieves the payload contents, verifies the
// initial state and returns it. If the payload is invalid, an error is returned.
func (optr *Operator) LoadInitialPayload(ctx context.Context, startingRequiredFeatureSet configv1.FeatureSet, restConfig *rest.Config) (*payload.Update, error) {

// wait until cluster version object exists
if err := wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(ctx context.Context) (bool, error) {
Expand All @@ -273,24 +275,19 @@ func (optr *Operator) InitializeFromPayload(ctx context.Context, restConfig *res
}
return true, nil
}); err != nil {
return fmt.Errorf("Error when attempting to get cluster version object: %w", err)
return nil, fmt.Errorf("Error when attempting to get cluster version object: %w", err)
}

update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.release.Image, optr.exclude, optr.clusterFeatures.StartingRequiredFeatureSet,
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.release.Image, optr.exclude, string(startingRequiredFeatureSet),
optr.clusterProfile, capability.GetKnownCapabilities())

if err != nil {
return fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err)
return nil, fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err)
}

optr.release = update.Release
optr.releaseCreated = update.ImageRef.CreationTimestamp.Time
optr.SetArchitecture(update.Architecture)

httpClientConstructor := sigstore.NewCachedHTTPClientConstructor(optr.HTTPClient, nil)
configClient, err := coreclientsetv1.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("unable to create a configuration client: %v", err)
return nil, fmt.Errorf("unable to create a configuration client: %v", err)
}

customSignatureStore := &customsignaturestore.Store{
Expand All @@ -302,7 +299,7 @@ func (optr *Operator) InitializeFromPayload(ctx context.Context, restConfig *res
// attempt to load a verifier as defined in the payload
verifier, signatureStore, err := loadConfigMapVerifierDataFromUpdate(update, httpClientConstructor.HTTPClient, configClient, customSignatureStore)
if err != nil {
return err
return nil, err
}
if verifier != nil {
klog.Infof("Verifying release authenticity: %v", verifier)
Expand All @@ -312,6 +309,16 @@ func (optr *Operator) InitializeFromPayload(ctx context.Context, restConfig *res
}
optr.verifier = verifier
optr.signatureStore = signatureStore
return update, nil
}

// InitializeFromPayload configures the controller that loads and applies content to the cluster given an initial payload
// and feature gate data.
func (optr *Operator) InitializeFromPayload(update *payload.Update, requiredFeatureSet configv1.FeatureSet, cvoFlags featuregates.CvoGateChecker, restConfig *rest.Config, burstRestConfig *rest.Config) {
optr.enabledFeatureGates = cvoFlags
optr.release = update.Release
optr.releaseCreated = update.ImageRef.CreationTimestamp.Time
optr.SetArchitecture(update.Architecture)

// after the verifier has been loaded, initialize the sync worker with a payload retriever
// which will consume the verifier
Expand All @@ -327,12 +334,10 @@ func (optr *Operator) InitializeFromPayload(ctx context.Context, restConfig *res
Cap: time.Second * 15,
},
optr.exclude,
optr.clusterFeatures.StartingRequiredFeatureSet,
requiredFeatureSet,
optr.eventRecorder,
optr.clusterProfile,
)

return nil
}

// ownerReferenceModifier sets the owner reference to the current CV resource if no other reference exists. It also resets
Expand Down
20 changes: 11 additions & 9 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/client-go/config/clientset/versioned/fake"
"github.com/openshift/library-go/pkg/manifest"

"github.com/openshift/cluster-version-operator/pkg/featuregates"
"github.com/openshift/cluster-version-operator/pkg/payload"
"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
"github.com/openshift/library-go/pkg/manifest"
)

var architecture string
Expand Down Expand Up @@ -108,14 +109,15 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]apiruntime.Object, *
}

o := &Operator{
namespace: "test",
name: "version",
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"),
client: client,
cvLister: &clientCVLister{client: client},
exclude: "exclude-test",
eventRecorder: record.NewFakeRecorder(100),
clusterProfile: payload.DefaultClusterProfile,
namespace: "test",
name: "version",
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"),
client: client,
enabledFeatureGates: featuregates.DefaultCvoGates("version"),
cvLister: &clientCVLister{client: client},
exclude: "exclude-test",
eventRecorder: record.NewFakeRecorder(100),
clusterProfile: payload.DefaultClusterProfile,
}

dynamicScheme := apiruntime.NewScheme()
Expand Down
6 changes: 4 additions & 2 deletions pkg/cvo/cvo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ import (
configv1 "github.com/openshift/api/config/v1"
clientset "github.com/openshift/client-go/config/clientset/versioned"
"github.com/openshift/client-go/config/clientset/versioned/fake"

"github.com/openshift/cluster-version-operator/pkg/payload"
"github.com/openshift/library-go/pkg/manifest"
"github.com/openshift/library-go/pkg/verify/store/serial"
"github.com/openshift/library-go/pkg/verify/store/sigstore"

"github.com/openshift/cluster-version-operator/pkg/featuregates"
"github.com/openshift/cluster-version-operator/pkg/payload"
)

var (
Expand Down Expand Up @@ -2273,6 +2274,7 @@ func TestOperator_sync(t *testing.T) {
optr.configSync = &fakeSyncRecorder{Returns: expectStatus}
}
optr.eventRecorder = record.NewFakeRecorder(100)
optr.enabledFeatureGates = featuregates.DefaultCvoGates("version")

ctx := context.Background()
err := optr.sync(ctx, optr.queueKey())
Expand Down
6 changes: 3 additions & 3 deletions pkg/cvo/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (optr *Operator) syncStatus(ctx context.Context, original, config *configv1
original = config.DeepCopy()
}

updateClusterVersionStatus(&config.Status, status, optr.release, optr.getAvailableUpdates, optr.clusterFeatures.StartingCvoFeatureGates, validationErrs)
updateClusterVersionStatus(&config.Status, status, optr.release, optr.getAvailableUpdates, optr.enabledFeatureGates, validationErrs)

if klog.V(6).Enabled() {
klog.Infof("Apply config: %s", diff.ObjectReflectDiff(original, config))
Expand All @@ -211,7 +211,7 @@ func (optr *Operator) syncStatus(ctx context.Context, original, config *configv1

// updateClusterVersionStatus updates the passed cvStatus with the latest status information
func updateClusterVersionStatus(cvStatus *configv1.ClusterVersionStatus, status *SyncWorkerStatus,
release configv1.Release, getAvailableUpdates func() *availableUpdates, enabledGates featuregates.CvoGates,
release configv1.Release, getAvailableUpdates func() *availableUpdates, enabledGates featuregates.CvoGateChecker,
validationErrs field.ErrorList) {

cvStatus.ObservedGeneration = status.Generation
Expand Down Expand Up @@ -382,7 +382,7 @@ func updateClusterVersionStatus(cvStatus *configv1.ClusterVersionStatus, status
}

oldRriCondition := resourcemerge.FindOperatorStatusCondition(cvStatus.Conditions, resourceReconciliationIssuesConditionType)
if enabledGates.ResourceReconciliationIssuesCondition || (oldRriCondition != nil && enabledGates.UnknownVersion) {
if enabledGates.ResourceReconciliationIssuesCondition() || (oldRriCondition != nil && enabledGates.UnknownVersion()) {
rriCondition := configv1.ClusterOperatorStatusCondition{
Type: resourceReconciliationIssuesConditionType,
Status: configv1.ConditionFalse,
Expand Down
25 changes: 20 additions & 5 deletions pkg/cvo/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/openshift/client-go/config/clientset/versioned/fake"

"github.com/openshift/cluster-version-operator/lib/resourcemerge"
"github.com/openshift/cluster-version-operator/pkg/featuregates"
)

func Test_mergeEqualVersions(t *testing.T) {
Expand Down Expand Up @@ -197,6 +196,19 @@ func TestOperator_syncFailingStatus(t *testing.T) {
}
}

// fakeRriFlags is a test double used where a gate checker is expected; it
// returns canned answers for the two gates consulted by
// updateClusterVersionStatus (UnknownVersion and
// ResourceReconciliationIssuesCondition).
type fakeRriFlags struct {
	unknownVersion                        bool
	resourceReconciliationIssuesCondition bool
}

// UnknownVersion reports the canned value configured for the
// UnknownVersion gate.
func (frf fakeRriFlags) UnknownVersion() bool {
	return frf.unknownVersion
}

// ResourceReconciliationIssuesCondition reports the canned value configured
// for the ResourceReconciliationIssuesCondition gate.
func (frf fakeRriFlags) ResourceReconciliationIssuesCondition() bool {
	return frf.resourceReconciliationIssuesCondition
}

func TestUpdateClusterVersionStatus_UnknownVersionAndRRI(t *testing.T) {
ignoreLastTransitionTime := cmpopts.IgnoreFields(configv1.ClusterOperatorStatusCondition{}, "LastTransitionTime")

Expand Down Expand Up @@ -247,9 +259,9 @@ func TestUpdateClusterVersionStatus_UnknownVersionAndRRI(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
gates := featuregates.CvoGates{
UnknownVersion: tc.unknownVersion,
ResourceReconciliationIssuesCondition: false,
gates := fakeRriFlags{
unknownVersion: tc.unknownVersion,
resourceReconciliationIssuesCondition: false,
}
release := configv1.Release{}
getAvailableUpdates := func() *availableUpdates { return nil }
Expand Down Expand Up @@ -317,7 +329,10 @@ func TestUpdateClusterVersionStatus_ResourceReconciliationIssues(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
gates := featuregates.CvoGates{ResourceReconciliationIssuesCondition: tc.enabled}
gates := fakeRriFlags{
unknownVersion: false,
resourceReconciliationIssuesCondition: tc.enabled,
}
release := configv1.Release{}
getAvailableUpdates := func() *availableUpdates { return nil }
var noErrors field.ErrorList
Expand Down
8 changes: 4 additions & 4 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ type SyncWorker struct {

// requiredFeatureSet is set to the value of Feature.config.openshift.io|spec.featureSet, which contributes to
// whether or not some manifests are included for reconciliation.
requiredFeatureSet string
requiredFeatureSet configv1.FeatureSet

clusterProfile string
}

// NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, requiredFeatureSet string, eventRecorder record.EventRecorder, clusterProfile string) *SyncWorker {
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, requiredFeatureSet configv1.FeatureSet, eventRecorder record.EventRecorder, clusterProfile string) *SyncWorker {
return &SyncWorker{
retriever: retriever,
builder: builder,
Expand All @@ -210,7 +210,7 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,
// NewSyncWorkerWithPreconditions initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
// It allows providing preconditions for loading payload.
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, requiredFeatureSet string, eventRecorder record.EventRecorder, clusterProfile string) *SyncWorker {
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, requiredFeatureSet configv1.FeatureSet, eventRecorder record.EventRecorder, clusterProfile string) *SyncWorker {
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude, requiredFeatureSet, eventRecorder, clusterProfile)
worker.preconditions = preconditions
return worker
Expand Down Expand Up @@ -315,7 +315,7 @@ func (w *SyncWorker) syncPayload(ctx context.Context, work *SyncWork) ([]configv

// Capability filtering is not done here since unknown capabilities are allowed
// during updated payload load and enablement checking only occurs during apply.
payloadUpdate, err := payload.LoadUpdate(info.Directory, desired.Image, w.exclude, w.requiredFeatureSet, w.clusterProfile, nil)
payloadUpdate, err := payload.LoadUpdate(info.Directory, desired.Image, w.exclude, string(w.requiredFeatureSet), w.clusterProfile, nil)

if err != nil {
msg := fmt.Sprintf("Loading payload failed version=%q image=%q failure=%v", desired.Version, desired.Image, err)
Expand Down

0 comments on commit bfaeda6

Please sign in to comment.