1 change: 1 addition & 0 deletions Makefile
@@ -97,6 +97,7 @@ manifests: generate yq kustomize
$(MV_TMP_DIR)/v1_service_platform-operators-controller-manager-metrics-service.yaml manifests/0000_50_cluster-platform-operator-manager_02-metricsservice.yaml
$(MV_TMP_DIR)/apps_v1_deployment_platform-operators-controller-manager.yaml manifests/0000_50_cluster-platform-operator-manager_06-deployment.yaml
$(MV_TMP_DIR)/config.openshift.io_v1_clusteroperator_platform-operators-aggregated.yaml manifests/0000_50_cluster-platform-operator-manager_07-aggregated-clusteroperator.yaml
$(MV_TMP_DIR)/config.openshift.io_v1_clusteroperator_platform-operators-core.yaml manifests/0000_50_cluster-platform-operator-manager_07-core-clusteroperator.yaml

@# cluster-platform-operator-manager rbacs
rm -f manifests/0000_50_cluster-platform-operator-manager_03_rbac.yaml
17 changes: 15 additions & 2 deletions cmd/main.go
@@ -23,6 +23,7 @@ import (
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/utils/clock"

configv1 "github.com/openshift/api/config/v1"
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
@@ -35,6 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

platformv1alpha1 "github.com/openshift/api/platform/v1alpha1"
"github.com/openshift/platform-operators/internal/checker"
"github.com/openshift/platform-operators/internal/clusteroperator"
"github.com/openshift/platform-operators/internal/controllers"
"github.com/openshift/platform-operators/internal/sourcer"
@@ -99,13 +101,24 @@ func main() {
}
//+kubebuilder:scaffold:builder

// Add Aggregated CO controller to manager
// Add the core and aggregate ClusterOperator controllers to the manager.
if err = (&controllers.AggregatedClusterOperatorReconciler{
Client: mgr.GetClient(),
ReleaseVersion: clusteroperator.GetReleaseVariable(),
SystemNamespace: util.PodNamespace(systemNamespace),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AggregatedCO")
setupLog.Error(err, "unable to create controller", "controller", "AggregatedClusterOperator")
os.Exit(1)
}
if err = (&controllers.CoreClusterOperatorReconciler{
Client: mgr.GetClient(),
Clock: clock.RealClock{},
Checker: checker.ListChecker{Client: mgr.GetClient()},
AvailabilityThreshold: clusteroperator.DefaultUnavailabilityThreshold,
ReleaseVersion: clusteroperator.GetReleaseVariable(),
SystemNamespace: util.PodNamespace(systemNamespace),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CoreClusterOperator")
os.Exit(1)
}

22 changes: 22 additions & 0 deletions config/clusteroperator/core_clusteroperator.yaml
@@ -0,0 +1,22 @@
apiVersion: config.openshift.io/v1
kind: ClusterOperator
metadata:
name: core
spec: {}
status:
versions:
- name: operator
version: "0.0.1-snapshot"
relatedObjects:
- group: ''
name: openshift-platform-operators
resource: namespaces
- group: platform.openshift.io
name: ""
resource: platformoperators
- group: core.rukpak.io
name: ""
resource: bundles
- group: core.rukpak.io
name: ""
resource: bundledeployments
1 change: 1 addition & 0 deletions config/clusteroperator/kustomization.yaml
@@ -1,2 +1,3 @@
resources:
- aggregated_clusteroperator.yaml
- core_clusteroperator.yaml
2 changes: 1 addition & 1 deletion go.mod
@@ -13,6 +13,7 @@ require (
k8s.io/api v0.25.0
k8s.io/apimachinery v0.25.0
k8s.io/client-go v0.25.0
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
sigs.k8s.io/controller-runtime v0.12.3
)

@@ -85,7 +86,6 @@ require (
k8s.io/component-base v0.24.2 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
31 changes: 31 additions & 0 deletions internal/checker/checker.go
@@ -0,0 +1,31 @@
package checker

import (
"context"

configv1 "github.com/openshift/api/config/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

platformv1alpha1 "github.com/openshift/api/platform/v1alpha1"
)

type Checker interface {
CheckAvailability(context.Context, *configv1.ClusterOperator) bool
}

type ListChecker struct {
client.Client
}

func (c ListChecker) CheckAvailability(ctx context.Context, _ *configv1.ClusterOperator) bool {
poList := &platformv1alpha1.PlatformOperatorList{}
return c.List(ctx, poList) == nil
}

type NoopChecker struct {
Available bool
}

func (c NoopChecker) CheckAvailability(_ context.Context, _ *configv1.ClusterOperator) bool {
return c.Available
}
4 changes: 4 additions & 0 deletions internal/clusteroperator/builder.go
@@ -8,6 +8,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
ReasonAsExpected = "AsExpected"
)

// From https://github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorstatus/builder.go
// Note: the clock api-machinery library in that package is now deprecated, so it has been removed here.

7 changes: 5 additions & 2 deletions internal/clusteroperator/types.go
@@ -1,6 +1,9 @@
package clusteroperator

import "time"

const (
CoreResourceName = "platform-operators-core"
AggregateResourceName = "platform-operators-aggregated"
CoreResourceName = "platform-operators-core"
AggregateResourceName = "platform-operators-aggregated"
DefaultUnavailabilityThreshold = 5 * time.Minute
)
45 changes: 45 additions & 0 deletions internal/clusteroperator/util.go
@@ -0,0 +1,45 @@
package clusteroperator

import (
configv1 "github.com/openshift/api/config/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// FindStatusCondition finds the conditionType in conditions.
// Note: manually vendored from o/library-go/pkg/config/clusteroperator/v1helpers/status.go.
func FindStatusCondition(conditions []configv1.ClusterOperatorStatusCondition, conditionType configv1.ClusterStatusConditionType) *configv1.ClusterOperatorStatusCondition {
for i := range conditions {
if conditions[i].Type == conditionType {
return &conditions[i]
}
}
return nil
}

func NewClusterOperator(name string) *configv1.ClusterOperator {
return &configv1.ClusterOperator{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Status: configv1.ClusterOperatorStatus{},
}
}

// SetDefaultStatusConditions adds the default ClusterOperator status conditions to
// the current Builder parameter. Those default status conditions are
// Progressing=True, Degraded=False, and Available=False.
func SetDefaultStatusConditions(builder *Builder, version string) {
builder.WithProgressing(configv1.ConditionTrue, "")
builder.WithDegraded(configv1.ConditionFalse)
builder.WithAvailable(configv1.ConditionFalse, "", "")
builder.WithVersion("operator", version)
}

// SetDefaultRelatedObjects adds the default ClusterOperator related object
// configurations to the Builder parameter.
func SetDefaultRelatedObjects(builder *Builder, namespace string) {
builder.WithRelatedObject("", "namespaces", "", namespace)
builder.WithRelatedObject("platform.openshift.io", "platformoperators", "", "")
builder.WithRelatedObject("core.rukpak.io", "bundles", "", "")
builder.WithRelatedObject("core.rukpak.io", "bundledeployments", "", "")
}
10 changes: 2 additions & 8 deletions internal/controllers/aggregated_clusteroperator_controller.go
@@ -67,14 +67,8 @@ func (r *AggregatedClusterOperatorReconciler) Reconcile(ctx context.Context, req
}()

// Set the default CO status conditions: Progressing=True, Degraded=False, Available=False
coBuilder.WithProgressing(configv1.ConditionTrue, "")
coBuilder.WithDegraded(configv1.ConditionFalse)
coBuilder.WithAvailable(configv1.ConditionFalse, "", "")
coBuilder.WithVersion("operator", r.ReleaseVersion)
coBuilder.WithRelatedObject("", "namespaces", "", r.SystemNamespace)
coBuilder.WithRelatedObject("platform.openshift.io", "platformoperators", "", "")
coBuilder.WithRelatedObject("core.rukpak.io", "bundles", "", "")
coBuilder.WithRelatedObject("core.rukpak.io", "bundledeployments", "", "")
clusteroperator.SetDefaultStatusConditions(coBuilder, r.ReleaseVersion)
clusteroperator.SetDefaultRelatedObjects(coBuilder, r.SystemNamespace)

poList := &platformv1alpha1.PlatformOperatorList{}
if err := r.List(ctx, poList); err != nil {
166 changes: 166 additions & 0 deletions internal/controllers/core_clusteroperator_controller.go
@@ -0,0 +1,166 @@
/*
Copyright 2022.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"errors"
"fmt"
"time"

configv1 "github.com/openshift/api/config/v1"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
logr "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/openshift/platform-operators/internal/checker"
"github.com/openshift/platform-operators/internal/clusteroperator"
)

// TODO(tflannag): Appropriately set the "Progressing" status condition
// type during cluster upgrade events.
// FIXME(tflannag): I'm seeing unit test flakes where we're bumping
// the lastTransitionTime value despite being in the same state as
// before, which is a bug.

var (
errUnavailable = errors.New("platform operators manager has failed an availability check")
)

type CoreClusterOperatorReconciler struct {
client.Client
clock.Clock
checker.Checker

ReleaseVersion string
SystemNamespace string
AvailabilityThreshold time.Duration
}

//+kubebuilder:rbac:groups=platform.openshift.io,resources=platformoperators,verbs=list
//+kubebuilder:rbac:groups=config.openshift.io,resources=clusteroperators,verbs=get;list;watch
//+kubebuilder:rbac:groups=config.openshift.io,resources=clusteroperators/status,verbs=update;patch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *CoreClusterOperatorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logr.FromContext(ctx)
log.Info("reconciling request", "req", req.NamespacedName)
defer log.Info("finished reconciling request", "req", req.NamespacedName)

coBuilder := clusteroperator.NewBuilder()
coWriter := clusteroperator.NewWriter(r.Client)

core := &configv1.ClusterOperator{}
if err := r.Get(ctx, req.NamespacedName, core); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
defer func() {
if err := coWriter.UpdateStatus(ctx, core, coBuilder.GetStatus()); err != nil {
log.Error(err, "error updating CO status")
}
}()

// Add the default ClusterOperator status configurations to the builder instance.
clusteroperator.SetDefaultStatusConditions(coBuilder, r.ReleaseVersion)
clusteroperator.SetDefaultRelatedObjects(coBuilder, r.SystemNamespace)

// check whether we're currently passing the availability checks. note: in
// the case where we were previously failing these checks and have now passed
// them, the expectation is that we set an A=T state and purge any
// D=T states.
if available := r.CheckAvailability(ctx, core); available {
coBuilder.WithAvailable(configv1.ConditionTrue, fmt.Sprintf("The platform operator manager is available at %s", r.ReleaseVersion), clusteroperator.ReasonAsExpected)
coBuilder.WithProgressing(configv1.ConditionFalse, "")
coBuilder.WithDegraded(configv1.ConditionFalse)
return ctrl.Result{}, nil
}

log.Info("manager failed an availability check")
// we failed the availability checks, and now need to determine whether to set
// D=T. Setting D=T on the first failed availability check avoids
// prematurely setting A=F during transient events.
if meetsDegradedStatusCriteria(core) {
log.Info("setting degraded=true since this is the first violation")
// avoid stomping on the current A=T status condition value if that
// status condition type was previously set.
available := clusteroperator.FindStatusCondition(core.Status.Conditions, configv1.OperatorAvailable)
if available != nil && available.Status == configv1.ConditionTrue {
coBuilder.WithAvailable(configv1.ConditionTrue, available.Message, available.Reason)
}
coBuilder.WithDegraded(configv1.ConditionTrue)
return ctrl.Result{}, errUnavailable
}
// check whether the time spent in the D=T state has exceeded the configured
// threshold, and mark the ClusterOperator as unavailable.
if timeInDegradedStateExceedsThreshold(ctx, core, r.Now(), r.AvailabilityThreshold) {
log.Info("adjusted timestamp has exceeded unavailability theshold: setting A=F and P=F")

coBuilder.WithAvailable(configv1.ConditionFalse, "Exceeded platform operator availability timeout", "ExceededUnavailabilityThreshold")
coBuilder.WithProgressing(configv1.ConditionFalse, "Exceeded platform operator availability timeout")
coBuilder.WithDegraded(configv1.ConditionTrue)
}
return ctrl.Result{}, errUnavailable
}

func meetsDegradedStatusCriteria(co *configv1.ClusterOperator) bool {
degraded := clusteroperator.FindStatusCondition(co.Status.Conditions, configv1.OperatorDegraded)
return degraded == nil || degraded.Status != configv1.ConditionTrue
}

func timeInDegradedStateExceedsThreshold(
ctx context.Context,
co *configv1.ClusterOperator,
startTime time.Time,
threshold time.Duration,
) bool {
degraded := clusteroperator.FindStatusCondition(co.Status.Conditions, configv1.OperatorDegraded)
if degraded == nil {
return false
}
lastEncounteredTime := degraded.LastTransitionTime
adjustedTime := lastEncounteredTime.Add(threshold)

logr.FromContext(ctx).Info("checking whether time spent in degraded state has exceeded the configured threshold",
"threshold", threshold.String(),
"current", startTime.String(),
"last", lastEncounteredTime.String(),
"adjusted", adjustedTime.String(),
)
// check whether we've exceeded the availability threshold by comparing
// the currently recorded lastTransitionTime, adding the threshold buffer, and
// verifying whether that adjusted timestamp is less than the current clock timestamp.
return adjustedTime.Before(startTime)
}

// SetupWithManager sets up the controller with the Manager.
func (r *CoreClusterOperatorReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&configv1.ClusterOperator{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool {
// TODO(tflannag): Investigate using a label selector to avoid caching
// all clusteroperator resources, and then filtering for the "core" clusteroperator
// resource from that shared cache.
return object.GetName() == clusteroperator.CoreResourceName
}))).
Complete(r)
}
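The availability timeout reduces to a single comparison: lastTransitionTime plus the threshold must fall before the current clock reading. A standalone sketch of that arithmetic, using metav1.Time the way the controller does (the values are illustrative):

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	threshold := 5 * time.Minute
	// Pretend Degraded=True was recorded six minutes ago.
	lastTransition := metav1.NewTime(time.Now().Add(-6 * time.Minute))

	// Same check as timeInDegradedStateExceedsThreshold: shift the recorded
	// transition time forward by the threshold and compare against "now".
	adjusted := lastTransition.Add(threshold)
	fmt.Println(adjusted.Before(time.Now())) // true: 6m in D=T exceeds the 5m threshold
}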