Skip to content

Commit

Permalink
Changes to latency-profile-observer, controller
Browse files Browse the repository at this point in the history
- use a slice for suppress fns in NewLatencyProfileObserver
- add new suppressor fn: NewSuppressConfigUpdateForExtremeProfilesFunc
- add checkRejectFn in controller to update status when rejection occurs

Signed-off-by: Swarup Ghosh <swghosh@redhat.com>
  • Loading branch information
swghosh committed May 26, 2022
1 parent 5e5ccf6 commit 594ea24
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 49 deletions.
49 changes: 30 additions & 19 deletions pkg/operator/configobserver/node/observe_latency_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@ type LatencyConfigProfileTuple struct {
ProfileConfigValues map[configv1.WorkerLatencyProfileType]string
}

// LatencyProfileRejectionScenario describes a transition between two worker latency
// profile types (FromProfile -> ToProfile) for which config updates should be
// rejected in favour of cluster stability.
type LatencyProfileRejectionScenario struct {
	FromProfile configv1.WorkerLatencyProfileType
	ToProfile   configv1.WorkerLatencyProfileType
}

// ShouldSuppressConfigUpdatesFunc reports whether the latency-profile config observer
// should suppress (skip) emitting newly observed config for the current sync.
// A non-nil error aborts the observation without suppressing.
type ShouldSuppressConfigUpdatesFunc func() (bool, error)

// latencyProfileObserver projects the cluster's worker latency profile into observed
// operator config; each suppress function can gate the update for a sync.
//
// NOTE: the scraped diff left both the pre-change and post-change fields in place,
// which declared `latencyConfigs` twice (a compile error). Resolved to the
// post-change shape with a slice of suppress functions.
type latencyProfileObserver struct {
	latencyConfigs                   []LatencyConfigProfileTuple
	shouldSuppressConfigUpdatesFuncs []ShouldSuppressConfigUpdatesFunc
}

// NewLatencyProfileObserver is used to create ObserveConfigFunc that can be used with an configobservation controller to trigger
// changes to different arg val pairs in observedConfig.* fields and update them on the basis of current worker latency profile.
// ShouldSuppressConfigUpdatesFunc is used to pass a function that returns a boolean and config updates by the observer function are
// only passed iff the bool value is false, it is helpful to gate the config updates in case a pre-req condition is not satisfied.
func NewLatencyProfileObserver(latencyConfigs []LatencyConfigProfileTuple, shouldSuppressConfigUpdatesFunc func() (bool, error)) configobserver.ObserveConfigFunc {
func NewLatencyProfileObserver(latencyConfigs []LatencyConfigProfileTuple, shouldSuppressConfigUpdatesFuncs []ShouldSuppressConfigUpdatesFunc) configobserver.ObserveConfigFunc {
ret := latencyProfileObserver{
latencyConfigs: latencyConfigs,
shouldSuppressConfigUpdatesFunc: shouldSuppressConfigUpdatesFunc,
latencyConfigs: latencyConfigs,
shouldSuppressConfigUpdatesFuncs: shouldSuppressConfigUpdatesFuncs,
}
return ret.observeLatencyProfile
}
Expand All @@ -53,6 +62,22 @@ func (l *latencyProfileObserver) observeLatencyProfile(
ret = configobserver.Pruned(ret, configPaths...)
}()

// if condition is not satisfied, return existing config back without any changes
// eg. shouldSuppressConfigUpdatesFunc is to check if rollouts are in progressing state
// then reject the update and do not set any newly observed values till the last rollout is complete
for _, shouldSupressConfigUpdatesFn := range l.shouldSuppressConfigUpdatesFuncs {
suppress, err := shouldSupressConfigUpdatesFn()
if err != nil {
errs = append(errs, err)
return existingConfig, errs
}
if suppress {
// log that latency profile couldn't be updated due to conditional
klog.Warningf("latency profile config observer did not update observed config due to unsatisified pre-requisite")
return existingConfig, errs
}
}

listers := genericListers.(NodeLister)
configNode, err := listers.NodeLister().Get("cluster")
// we got an error so without the node object we are not able to determine worker latency profile
Expand All @@ -64,20 +89,6 @@ func (l *latencyProfileObserver) observeLatencyProfile(
return existingConfig, errs
}

// if condition is not satisfied, return existing config back without any changes
// eg. shouldSuppressConfigUpdatesFunc is to check if rollouts are in progressing state
// then reject the update and do not set any newly observed values till the last rollout is complete
suppress, err := l.shouldSuppressConfigUpdatesFunc()
if err != nil {
errs = append(errs, err)
return existingConfig, errs
}
if suppress {
// log that latency profile couldn't be updated due to conditional
klog.Warningf("latency profile config observer did not update observed config due to unsatisified pre-requisite")
return existingConfig, errs
}

if configNode.Spec.WorkerLatencyProfile == "" {
// in case worker latency profile is not set on cluster
return existingConfig, errs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,8 @@ func multiScenarioLatencyProfilesTest(t *testing.T, observeFn configobserver.Obs
}
}

// alwaysFalse is a test stub suppress-check: it never suppresses and never errors.
func alwaysFalse() (suppress bool, err error) {
	// named results default to their zero values: false, nil
	return
}

func TestCreateLatencyProfileObserverKAS(t *testing.T) {
kasoObserveLatencyProfile := NewLatencyProfileObserver(kasLatencyConfigs, alwaysFalse)
kasoObserveLatencyProfile := NewLatencyProfileObserver(kasLatencyConfigs, nil)

scenarios := []workerLatencyProfileTestScenario{
// scenario 1: empty worker latency profile
Expand Down Expand Up @@ -138,7 +134,7 @@ func TestCreateLatencyProfileObserverKAS(t *testing.T) {
}

func TestCreateLatencyProfileObserverKCM(t *testing.T) {
kcmoObserveLatencyProfile := NewLatencyProfileObserver(kcmLatencyConfigs, alwaysFalse)
kcmoObserveLatencyProfile := NewLatencyProfileObserver(kcmLatencyConfigs, nil)

scenarios := []workerLatencyProfileTestScenario{
// scenario 1: empty worker latency profile
Expand Down
102 changes: 101 additions & 1 deletion pkg/operator/configobserver/node/suppress_config_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"fmt"
"reflect"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
listersv1 "k8s.io/client-go/listers/core/v1"

configv1 "github.com/openshift/api/config/v1"
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
"github.com/openshift/library-go/pkg/operator/configobserver"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
"github.com/openshift/library-go/pkg/operator/v1helpers"
Expand Down Expand Up @@ -34,7 +38,7 @@ func NewSuppressConfigUpdateUntilSameProfileFunc(
operatorClient v1helpers.StaticPodOperatorClient,
configMapLister listersv1.ConfigMapNamespaceLister,
latencyConfigs []LatencyConfigProfileTuple,
) (f func() (bool, error)) {
) ShouldSuppressConfigUpdatesFunc {

usedConfigPaths := make([][]string, len(latencyConfigs))
for i, latencyConfig := range latencyConfigs {
Expand Down Expand Up @@ -110,3 +114,99 @@ func (s *revisionDiffProfileSuppressor) shouldSuppressConfigUpdates() (suppress
// suppress=false, when all config values are identical
return false, nil
}

// rejectExtremeProfilesSupressor carries the state needed to decide whether an
// observed latency-profile transition should be suppressed because it matches a
// prohibited (rejection) scenario.
// NOTE(review): "Supressor" is a misspelling of "Suppressor"; kept as-is because the
// name is referenced elsewhere in this package.
type rejectExtremeProfilesSupressor struct {
	operatorClient     v1helpers.StaticPodOperatorClient
	configNodeLister   configlistersv1.NodeLister
	latencyConfigs     []LatencyConfigProfileTuple
	usedConfigPaths    [][]string // ConfigPath of each latency config; used to prune observed config
	rejectionScenarios []LatencyProfileRejectionScenario
}

// NewSuppressConfigUpdateForExtremeProfilesFunc returns a ShouldSuppressConfigUpdatesFunc
// that suppresses config updates whenever the currently observed config corresponds to the
// "from" profile of one of the given rejection scenarios while the cluster Node config
// requests that scenario's "to" profile.
func NewSuppressConfigUpdateForExtremeProfilesFunc(
	operatorClient v1helpers.StaticPodOperatorClient,
	configNodeLister configlistersv1.NodeLister,
	latencyConfigs []LatencyConfigProfileTuple,
	rejectionScenarios []LatencyProfileRejectionScenario,
) ShouldSuppressConfigUpdatesFunc {

	// collect, once up front, the config paths the latency configs control
	configPaths := make([][]string, 0, len(latencyConfigs))
	for _, latencyConfig := range latencyConfigs {
		configPaths = append(configPaths, latencyConfig.ConfigPath)
	}

	suppressor := rejectExtremeProfilesSupressor{
		operatorClient:     operatorClient,
		configNodeLister:   configNodeLister,
		latencyConfigs:     latencyConfigs,
		usedConfigPaths:    configPaths,
		rejectionScenarios: rejectionScenarios,
	}
	return suppressor.shouldSuppressConfigUpdates
}

// shouldSuppressConfigUpdates reports whether the observer should suppress config
// updates for the current sync: it returns true when the currently observed (merged,
// pruned) operator config matches the "from" profile of some rejection scenario and
// the cluster Node object requests that scenario's "to" profile. Errors abort the
// check without suppressing.
func (s *rejectExtremeProfilesSupressor) shouldSuppressConfigUpdates() (bool, error) {
	operatorSpec, _, _, err := s.operatorClient.GetStaticPodOperatorState()
	if err != nil {
		return false, err
	}

	// render the currently observed operator config into raw bytes
	observedConfigRaw, err := resourcemerge.MergeProcessConfig(
		nil,
		operatorSpec.OperatorSpec.ObservedConfig.Raw,
	)
	if err != nil {
		return false, err
	}

	var observedConfig map[string]interface{}
	err = json.Unmarshal(observedConfigRaw, &observedConfig)
	if err != nil {
		return false, err
	}
	// keep only the paths latency profiles control so the DeepEqual comparison below
	// is not affected by unrelated observed fields
	observedConfigPruned := configobserver.Pruned(observedConfig, s.usedConfigPaths...)

	// build, per profile, the exact pruned config map that profile would produce;
	// values are stored as single-element string slices to mirror the arg-val shape
	// used in the observed config
	knownProfileConfigs := make(map[configv1.WorkerLatencyProfileType]map[string]interface{})
	for _, latencyConfig := range s.latencyConfigs {
		for profile := range latencyConfig.ProfileConfigValues {
			profileValue := latencyConfig.ProfileConfigValues[profile]

			_, ok := knownProfileConfigs[profile]
			if !ok {
				knownProfileConfigs[profile] = make(map[string]interface{})
			}
			err = unstructured.SetNestedStringSlice(knownProfileConfigs[profile], []string{profileValue}, latencyConfig.ConfigPath...)
			if err != nil {
				return false, err
			}
		}
	}

	// determine the profile currently in effect by matching the pruned observed config
	// against each known profile's config; fromProfile stays the empty profile type
	// when nothing matches (and then no scenario with a non-empty FromProfile fires)
	var fromProfile, toProfile configv1.WorkerLatencyProfileType
	for profile := range knownProfileConfigs {
		if reflect.DeepEqual(observedConfigPruned, knownProfileConfigs[profile]) {
			fromProfile = profile
			break
		}
	}

	configNode, err := s.configNodeLister.Get("cluster")
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return false, err
		}
		// no cluster Node config object exists: there is no desired profile to
		// transition to, so do not suppress
		// suppress=False
		return false, nil
	}

	toProfile = configNode.Spec.WorkerLatencyProfile

	// check whether to reject or not
	for _, rejectionScenario := range s.rejectionScenarios {
		if fromProfile == rejectionScenario.FromProfile && toProfile == rejectionScenario.ToProfile {
			// suppress=True
			return true, nil
		}
	}
	return false, nil
}
44 changes: 33 additions & 11 deletions pkg/operator/latencyprofilecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ import (

const (
	// set of reasons used for updating status
	// NOTE: the scraped diff left the pre-change reason constants alongside the
	// post-change ones, duplicating three const names (a compile error); resolved
	// to the post-change set which adds ProfileUpdateProhibited.
	reasonLatencyProfileUpdated          = "ProfileUpdated"
	reasonLatencyProfileUpdateTriggered  = "ProfileUpdateTriggered"
	reasonLatencyProfileEmpty            = "ProfileEmpty"
	reasonLatencyProfileUpdateProhibited = "ProfileUpdateProhibited"

	// prefix used with status types
	workerLatencyProfileProgressing = "WorkerLatencyProfileProgressing"
	workerLatencyProfileComplete    = "WorkerLatencyProfileComplete"
)

// MatchProfileRevisionConfigsFunc reports whether the configs rendered for the given
// revisions already carry the arg/val pairs expected for the given latency profile.
type MatchProfileRevisionConfigsFunc func(profile configv1.WorkerLatencyProfileType, revisions []int32) (match bool, err error)

// CheckProfileRejectionFunc determines whether transitioning to desiredProfile from the
// profile currently rendered in currentRevisions should be rejected; when isRejected is
// true, rejectMsg explains why.
type CheckProfileRejectionFunc func(
	desiredProfile configv1.WorkerLatencyProfileType,
	currentRevisions []int32,
) (isRejected bool, rejectMsg string, err error)

// LatencyProfileController either instantly via the informers
// or periodically via resync, lists the config/v1/node object
Expand All @@ -40,26 +45,29 @@ type MatchProfileRevisionConfigsFunc func(profile configv1.WorkerLatencyProfileT
// because of an "unknown latency profile" error.

// LatencyProfileController reconciles the cluster's worker latency profile into
// operator status for the target namespace.
//
// NOTE: the scraped diff left both the pre-change and post-change field lists in
// place, duplicating every field (a compile error); resolved to the post-change
// shape which adds checkProfileRejectionFn.
type LatencyProfileController struct {
	operatorClient          v1helpers.StaticPodOperatorClient
	targetNamespace         string
	configNodeLister        listerv1.NodeLister
	checkProfileRejectionFn CheckProfileRejectionFunc
	matchRevisionsFn        MatchProfileRevisionConfigsFunc
}

func NewLatencyProfileController(
operatorClient v1helpers.StaticPodOperatorClient,
targetNamespace string,
checkProfileRejectionFn CheckProfileRejectionFunc,
matchRevisionsFn MatchProfileRevisionConfigsFunc,
nodeInformer configv1informers.NodeInformer,
kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces,
eventRecorder events.Recorder,
) factory.Controller {

ret := &LatencyProfileController{
operatorClient: operatorClient,
targetNamespace: targetNamespace,
matchRevisionsFn: matchRevisionsFn,
configNodeLister: nodeInformer.Lister(),
operatorClient: operatorClient,
targetNamespace: targetNamespace,
checkProfileRejectionFn: checkProfileRejectionFn,
matchRevisionsFn: matchRevisionsFn,
configNodeLister: nodeInformer.Lister(),
}

return factory.New().WithInformers(
Expand Down Expand Up @@ -114,6 +122,20 @@ func (c *LatencyProfileController) sync(ctx context.Context, syncCtx factory.Syn
}
}

isRejected, rejectMsg, err := c.checkProfileRejectionFn(configNodeObj.Spec.WorkerLatencyProfile, uniqueRevisions)
if err != nil {
return err
}
if isRejected {
_, err = c.updateStatus(
ctx,
false, true, // not progressing, complete=True
reasonLatencyProfileUpdateProhibited,
rejectMsg,
)
return err
}

// For each revision, check that the configmap for that revision have correct arg val pairs or not
revisionsHaveSynced, err := c.matchRevisionsFn(configNodeObj.Spec.WorkerLatencyProfile, uniqueRevisions)
if err != nil {
Expand Down
90 changes: 90 additions & 0 deletions pkg/operator/latencyprofilecontroller/profilerejectionchecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package latencyprofilecontroller

import (
"fmt"
"reflect"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
listersv1 "k8s.io/client-go/listers/core/v1"

configv1 "github.com/openshift/api/config/v1"
nodeobserver "github.com/openshift/library-go/pkg/operator/configobserver/node"
)

// profileRejectRevisionChecker decides whether the latency profile rendered into the
// highest current installer revision may transition to a desired profile, based on a
// list of prohibited (rejection) scenarios.
type profileRejectRevisionChecker struct {
	configMapLister           listersv1.ConfigMapNamespaceLister
	profileRejectionScenarios []nodeobserver.LatencyProfileRejectionScenario
	latencyConfigs            []nodeobserver.LatencyConfigProfileTuple
}

// NewInstallerProfileRejectionChecker returns a CheckProfileRejectionFunc that reads the
// installer-rendered config of the highest given revision and rejects any profile
// transition listed in profileRejectionScenarios.
func NewInstallerProfileRejectionChecker(
	configMapLister listersv1.ConfigMapNamespaceLister,
	profileRejectionScenarios []nodeobserver.LatencyProfileRejectionScenario,
	latencyConfigs []nodeobserver.LatencyConfigProfileTuple,
) CheckProfileRejectionFunc {

	checker := &profileRejectRevisionChecker{
		configMapLister:           configMapLister,
		profileRejectionScenarios: profileRejectionScenarios,
		latencyConfigs:            latencyConfigs,
	}
	return checker.checkProfileRejection
}

// checkProfileRejection reports whether the transition from the profile currently
// rendered into the highest of the given revisions to targetProfile is prohibited by
// one of the configured rejection scenarios. When rejected, rejectMsg carries a
// human-readable explanation for status reporting. An empty revisions slice is an
// error since there is nothing to compare against.
func (r *profileRejectRevisionChecker) checkProfileRejection(targetProfile configv1.WorkerLatencyProfileType, revisions []int32) (isRejected bool, rejectMsg string, err error) {
	if len(revisions) <= 0 {
		return false, "", fmt.Errorf("no revisions found for comparison")
	}
	// only the most recent (highest-numbered) revision is inspected
	highestRevision := revisions[0]
	for _, revision := range revisions {
		if revision > highestRevision {
			highestRevision = revision
		}
	}

	usedConfigPaths := make([][]string, len(r.latencyConfigs))
	knownProfileConfigs := make(map[configv1.WorkerLatencyProfileType]map[string]interface{})

	// build, per profile, the pruned config that profile would render (values stored
	// as single-element string slices), and collect the config paths the latency
	// configs control
	for i, latencyConfig := range r.latencyConfigs {
		for profile := range latencyConfig.ProfileConfigValues {
			profileValue := latencyConfig.ProfileConfigValues[profile]

			_, ok := knownProfileConfigs[profile]
			if !ok {
				knownProfileConfigs[profile] = make(map[string]interface{})
			}
			err = unstructured.SetNestedStringSlice(knownProfileConfigs[profile], []string{profileValue}, latencyConfig.ConfigPath...)
			if err != nil {
				return false, "", err
			}
		}
		usedConfigPaths[i] = latencyConfig.ConfigPath
	}

	// fetch the installer-rendered config map for the highest revision;
	// revisionConfigMapName is declared elsewhere in this package
	configMap, err := r.configMapLister.Get(fmt.Sprintf("%s-%d", revisionConfigMapName, highestRevision))
	if err != nil {
		return false, "", err
	}

	// getPrunedConfigFromConfigMap (declared elsewhere in this package) extracts the
	// config from the config map restricted to the used paths
	currentConfigPruned, err := getPrunedConfigFromConfigMap(configMap, usedConfigPaths)
	if err != nil {
		return false, "", err
	}

	// identify which known profile the current revision's config matches;
	// fromProfile stays the empty profile type when nothing matches
	var fromProfile configv1.WorkerLatencyProfileType
	for profile := range knownProfileConfigs {
		if reflect.DeepEqual(currentConfigPruned, knownProfileConfigs[profile]) {
			fromProfile = profile
			break
		}
	}

	for _, rejectionScenario := range r.profileRejectionScenarios {
		if fromProfile == rejectionScenario.FromProfile && targetProfile == rejectionScenario.ToProfile {
			return true, fmt.Sprintf("rejected update from %q to %q latency profile as extreme profile transition is unsupported", fromProfile, targetProfile), nil
		}
	}

	return false, "", nil
}

0 comments on commit 594ea24

Please sign in to comment.