Skip to content

Commit

Permalink
Changes to latency-profile-observer,controller
Browse files Browse the repository at this point in the history
- use a slice for suppress fns in NewLatencyProfileObserver
- add new suppressor fn: NewSuppressConfigUpdateForExtremeProfilesFunc
- add checkRejectFn in controller to update status when rejection occurs

Signed-off-by: Swarup Ghosh <swghosh@redhat.com>
  • Loading branch information
swghosh committed May 27, 2022
1 parent c6dce2a commit 675a3a8
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 50 deletions.
49 changes: 30 additions & 19 deletions pkg/operator/configobserver/node/observe_latency_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@ type LatencyConfigProfileTuple struct {
ProfileConfigValues map[configv1.WorkerLatencyProfileType]string
}

// LatencyProfileRejectionScenario is used to describe a scenario from and to a profile type when
// updates should be rejected in favour of cluster stability
type LatencyProfileRejectionScenario struct {
FromProfile configv1.WorkerLatencyProfileType
ToProfile configv1.WorkerLatencyProfileType
}

type ShouldSuppressConfigUpdatesFunc func() (bool, error)

type latencyProfileObserver struct {
latencyConfigs []LatencyConfigProfileTuple
shouldSuppressConfigUpdatesFunc func() (bool, error)
latencyConfigs []LatencyConfigProfileTuple
shouldSuppressConfigUpdatesFuncs []ShouldSuppressConfigUpdatesFunc
}

// NewLatencyProfileObserver is used to create ObserveConfigFunc that can be used with an configobservation controller to trigger
// changes to different arg val pairs in observedConfig.* fields and update them on the basis of current worker latency profile.
// ShouldSuppressConfigUpdatesFunc is used to pass a function that returns a boolean and config updates by the observer function are
// only passed iff the bool value is false, it is helpful to gate the config updates in case a pre-req condition is not satisfied.
func NewLatencyProfileObserver(latencyConfigs []LatencyConfigProfileTuple, shouldSuppressConfigUpdatesFunc func() (bool, error)) configobserver.ObserveConfigFunc {
func NewLatencyProfileObserver(latencyConfigs []LatencyConfigProfileTuple, shouldSuppressConfigUpdatesFuncs []ShouldSuppressConfigUpdatesFunc) configobserver.ObserveConfigFunc {
ret := latencyProfileObserver{
latencyConfigs: latencyConfigs,
shouldSuppressConfigUpdatesFunc: shouldSuppressConfigUpdatesFunc,
latencyConfigs: latencyConfigs,
shouldSuppressConfigUpdatesFuncs: shouldSuppressConfigUpdatesFuncs,
}
return ret.observeLatencyProfile
}
Expand All @@ -53,6 +62,22 @@ func (l *latencyProfileObserver) observeLatencyProfile(
ret = configobserver.Pruned(ret, configPaths...)
}()

// if condition is not satisfied, return existing config back without any changes
// eg. shouldSuppressConfigUpdatesFunc is to check if rollouts are in progressing state
// then reject the update and do not set any newly observed values till the last rollout is complete
for _, shouldSupressConfigUpdatesFn := range l.shouldSuppressConfigUpdatesFuncs {
suppress, err := shouldSupressConfigUpdatesFn()
if err != nil {
errs = append(errs, err)
return existingConfig, errs
}
if suppress {
// log that latency profile couldn't be updated due to conditional
klog.Warningf("latency profile config observer did not update observed config due to unsatisified pre-requisite")
return existingConfig, errs
}
}

listers := genericListers.(NodeLister)
configNode, err := listers.NodeLister().Get("cluster")
// we got an error so without the node object we are not able to determine worker latency profile
Expand All @@ -64,20 +89,6 @@ func (l *latencyProfileObserver) observeLatencyProfile(
return existingConfig, errs
}

// if condition is not satisfied, return existing config back without any changes
// eg. shouldSuppressConfigUpdatesFunc is to check if rollouts are in progressing state
// then reject the update and do not set any newly observed values till the last rollout is complete
suppress, err := l.shouldSuppressConfigUpdatesFunc()
if err != nil {
errs = append(errs, err)
return existingConfig, errs
}
if suppress {
// log that latency profile couldn't be updated due to conditional
klog.Warningf("latency profile config observer did not update observed config due to unsatisified pre-requisite")
return existingConfig, errs
}

if configNode.Spec.WorkerLatencyProfile == "" {
// in case worker latency profile is not set on cluster
return existingConfig, errs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,8 @@ func multiScenarioLatencyProfilesTest(t *testing.T, observeFn configobserver.Obs
}
}

func alwaysFalse() (bool, error) {
return false, nil
}

func TestCreateLatencyProfileObserverKAS(t *testing.T) {
kasoObserveLatencyProfile := NewLatencyProfileObserver(kasLatencyConfigs, alwaysFalse)
kasoObserveLatencyProfile := NewLatencyProfileObserver(kasLatencyConfigs, nil)

scenarios := []workerLatencyProfileTestScenario{
// scenario 1: empty worker latency profile
Expand Down Expand Up @@ -138,7 +134,7 @@ func TestCreateLatencyProfileObserverKAS(t *testing.T) {
}

func TestCreateLatencyProfileObserverKCM(t *testing.T) {
kcmoObserveLatencyProfile := NewLatencyProfileObserver(kcmLatencyConfigs, alwaysFalse)
kcmoObserveLatencyProfile := NewLatencyProfileObserver(kcmLatencyConfigs, nil)

scenarios := []workerLatencyProfileTestScenario{
// scenario 1: empty worker latency profile
Expand Down
118 changes: 116 additions & 2 deletions pkg/operator/configobserver/node/suppress_config_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ import (
"fmt"
"reflect"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kyaml "k8s.io/apimachinery/pkg/util/yaml"
listersv1 "k8s.io/client-go/listers/core/v1"

configv1 "github.com/openshift/api/config/v1"
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
"github.com/openshift/library-go/pkg/operator/configobserver"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
"github.com/openshift/library-go/pkg/operator/v1helpers"
Expand All @@ -25,7 +30,7 @@ type revisionDiffProfileSuppressor struct {
usedConfigPaths [][]string
}

// NewSuppressConfigUpdateUntilSameProfileFunc is used to create a conditional func (shouldSuppressConfigUpdatesFunc)
// NewSuppressConfigUpdateUntilSameProfileFunc is used to create a conditional func (ShouldSuppressConfigUpdatesFunc)
// that can be used by the latency profile config observer to determine if a new update to observedConfig should be rolled out or not.
// It uses a config map lister and status obtained from a static pod operator client to check if all active revisions on the cluster have
// common values for the required latency config paths or not. Config maps generated by installer controller
Expand All @@ -34,7 +39,7 @@ func NewSuppressConfigUpdateUntilSameProfileFunc(
operatorClient v1helpers.StaticPodOperatorClient,
configMapLister listersv1.ConfigMapNamespaceLister,
latencyConfigs []LatencyConfigProfileTuple,
) (f func() (bool, error)) {
) ShouldSuppressConfigUpdatesFunc {

usedConfigPaths := make([][]string, len(latencyConfigs))
for i, latencyConfig := range latencyConfigs {
Expand Down Expand Up @@ -110,3 +115,112 @@ func (s *revisionDiffProfileSuppressor) shouldSuppressConfigUpdates() (suppress
// suppress=false, when all config values are identical
return false, nil
}

type rejectExtremeProfilesSupressor struct {
operatorClient v1helpers.StaticPodOperatorClient
configNodeLister configlistersv1.NodeLister
latencyConfigs []LatencyConfigProfileTuple
usedConfigPaths [][]string
rejectionScenarios []LatencyProfileRejectionScenario
}

// NewSuppressConfigUpdateForExtremeProfilesFunc is used to create a conditional func (ShouldSuppressConfigUpdatesFunc)
// that can be used by the latency profile config observer to determine if a new update to observedConfig should be rolled out or not.
// It checks the current value of observedConfig from operator's state (static pod operator resource), tries to determine the latency profile
// that was applied by the config observer and compares the same against the desired profile set on the cluster. In case, the update from
// the active profile to the desired profile is an extreme scenario i.e. one of the scenarios specified in rejectionScenarios,
// the suppressor suppresses the update.
func NewSuppressConfigUpdateForExtremeProfilesFunc(
operatorClient v1helpers.StaticPodOperatorClient,
configNodeLister configlistersv1.NodeLister,
latencyConfigs []LatencyConfigProfileTuple,
rejectionScenarios []LatencyProfileRejectionScenario,
) ShouldSuppressConfigUpdatesFunc {

usedConfigPaths := make([][]string, len(latencyConfigs))
for i, latencyConfig := range latencyConfigs {
usedConfigPaths[i] = latencyConfig.ConfigPath
}

ret := rejectExtremeProfilesSupressor{
operatorClient: operatorClient,
configNodeLister: configNodeLister,
latencyConfigs: latencyConfigs,
usedConfigPaths: usedConfigPaths,
rejectionScenarios: rejectionScenarios,
}
return ret.shouldSuppressConfigUpdates
}

func (s *rejectExtremeProfilesSupressor) shouldSuppressConfigUpdates() (bool, error) {
// get current state of the config
operatorSpec, _, _, err := s.operatorClient.GetStaticPodOperatorState()
if err != nil {
return false, err
}

// raw bytes of observed config in JSON format
observedConfigRaw, err := kyaml.ToJSON(operatorSpec.OperatorSpec.ObservedConfig.Raw)
if err != nil {
return false, err
}

// decode the JSON
var observedConfig map[string]interface{}
err = json.Unmarshal(observedConfigRaw, &observedConfig)
if err != nil {
return false, err
}
// prune the obtained config with only config paths that we monitor for latency profiles
observedConfigPruned := configobserver.Pruned(observedConfig, s.usedConfigPaths...)

knownProfileConfigs := make(map[configv1.WorkerLatencyProfileType]map[string]interface{})
// traverse along each latency config to construct a config arg val map for each known profile
for _, latencyConfig := range s.latencyConfigs {
for profile := range latencyConfig.ProfileConfigValues {
profileValue := latencyConfig.ProfileConfigValues[profile]

_, ok := knownProfileConfigs[profile]
if !ok {
knownProfileConfigs[profile] = make(map[string]interface{})
}
err = unstructured.SetNestedStringSlice(knownProfileConfigs[profile], []string{profileValue}, latencyConfig.ConfigPath...)
if err != nil {
return false, err
}
}
}

// determine which profile was in the obtained config
var fromProfile configv1.WorkerLatencyProfileType
for profile := range knownProfileConfigs {
if reflect.DeepEqual(observedConfigPruned, knownProfileConfigs[profile]) {
fromProfile = profile
break
}
}

// get desired profile from config node object
configNode, err := s.configNodeLister.Get("cluster")
if err != nil {
if !apierrors.IsNotFound(err) {
return false, err
}
// suppress=False
return false, nil
}

// desired profile
toProfile := configNode.Spec.WorkerLatencyProfile

// check from/to profile if it is one of the rejection scenarios,
// suppress=true, else suppress=false
for _, rejectionScenario := range s.rejectionScenarios {
if fromProfile == rejectionScenario.FromProfile && toProfile == rejectionScenario.ToProfile {
// suppress=True
return true, nil
}
}
// suppress=False
return false, nil
}
48 changes: 37 additions & 11 deletions pkg/operator/latencyprofilecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ import (

const (
// set of reasons used for updating status
reasonLatencyProfileUpdated = "ProfileUpdated"
reasonLatencyProfileUpdateTriggered = "ProfileUpdateTriggered"
reasonLatencyProfileEmpty = "ProfileEmpty"
reasonLatencyProfileUpdated = "ProfileUpdated"
reasonLatencyProfileUpdateTriggered = "ProfileUpdateTriggered"
reasonLatencyProfileEmpty = "ProfileEmpty"
reasonLatencyProfileUpdateProhibited = "ProfileUpdateProhibited"

// prefix used with status types
workerLatencyProfileProgressing = "WorkerLatencyProfileProgressing"
workerLatencyProfileComplete = "WorkerLatencyProfileComplete"
)

type MatchProfileRevisionConfigsFunc func(profile configv1.WorkerLatencyProfileType, revisions []int32) (match bool, err error)
type CheckProfileRejectionFunc func(
desiredProfile configv1.WorkerLatencyProfileType,
currentRevisions []int32,
) (isRejected bool, rejectMsg string, err error)

// LatencyProfileController either instantly via the informers
// or periodically via resync, lists the config/v1/node object
Expand All @@ -40,26 +45,29 @@ type MatchProfileRevisionConfigsFunc func(profile configv1.WorkerLatencyProfileT
// because of an "unknown latency profile" error.

type LatencyProfileController struct {
operatorClient v1helpers.StaticPodOperatorClient
targetNamespace string
configNodeLister listerv1.NodeLister
matchRevisionsFn MatchProfileRevisionConfigsFunc
operatorClient v1helpers.StaticPodOperatorClient
targetNamespace string
configNodeLister listerv1.NodeLister
checkProfileRejectionFn CheckProfileRejectionFunc
matchRevisionsFn MatchProfileRevisionConfigsFunc
}

func NewLatencyProfileController(
operatorClient v1helpers.StaticPodOperatorClient,
targetNamespace string,
checkProfileRejectionFn CheckProfileRejectionFunc,
matchRevisionsFn MatchProfileRevisionConfigsFunc,
nodeInformer configv1informers.NodeInformer,
kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces,
eventRecorder events.Recorder,
) factory.Controller {

ret := &LatencyProfileController{
operatorClient: operatorClient,
targetNamespace: targetNamespace,
matchRevisionsFn: matchRevisionsFn,
configNodeLister: nodeInformer.Lister(),
operatorClient: operatorClient,
targetNamespace: targetNamespace,
checkProfileRejectionFn: checkProfileRejectionFn,
matchRevisionsFn: matchRevisionsFn,
configNodeLister: nodeInformer.Lister(),
}

return factory.New().WithInformers(
Expand Down Expand Up @@ -114,6 +122,24 @@ func (c *LatencyProfileController) sync(ctx context.Context, syncCtx factory.Syn
}
}

// In case a checkProfileRejection func was used, status will be updated accordingly
if c.checkProfileRejectionFn != nil {
isRejected, rejectMsg, err := c.checkProfileRejectionFn(configNodeObj.Spec.WorkerLatencyProfile, uniqueRevisions)
if err != nil {
return err
}
// if profile transition is rejected, set status with rejection message
if isRejected {
_, err = c.updateStatus(
ctx,
false, true, // not progressing, complete=True
reasonLatencyProfileUpdateProhibited,
rejectMsg,
)
return err
}
}

// For each revision, check that the configmap for that revision have correct arg val pairs or not
revisionsHaveSynced, err := c.matchRevisionsFn(configNodeObj.Spec.WorkerLatencyProfile, uniqueRevisions)
if err != nil {
Expand Down

0 comments on commit 675a3a8

Please sign in to comment.