Skip to content

Commit

Permalink
Revert "Revert "Merge pull request 107797 from tkashem/revert-107456""
Browse files Browse the repository at this point in the history
This reverts commit b0b4609.
  • Loading branch information
liggitt committed Mar 25, 2022
1 parent 5e63432 commit 363a8be
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 19 deletions.
39 changes: 21 additions & 18 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
Expand All @@ -33,6 +34,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
apitypes "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -51,7 +53,6 @@ import (
"k8s.io/utils/clock"

flowcontrol "k8s.io/api/flowcontrol/v1beta2"
flowcontrolapplyconfiguration "k8s.io/client-go/applyconfigurations/flowcontrol/v1beta2"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2"
flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta2"
)
Expand Down Expand Up @@ -434,16 +435,18 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior

// if we are going to issue an update, be sure we track every name we update so we know if we update it too often.
currResult.updatedItems.Insert(fsu.flowSchema.Name)
patchBytes, err := makeFlowSchemaConditionPatch(fsu.condition)
if err != nil {
// should never happen because these conditions are created here and well formed
panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
}
if klogV := klog.V(4); klogV.Enabled() {
klogV.Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s, diff: %s",
cfgCtlr.name, fsu.condition, fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue), cmp.Diff(fsu.oldValue, fsu.condition))
}
fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
applyOptions := metav1.ApplyOptions{FieldManager: cfgCtlr.asFieldManager, Force: true}

// the condition field in fsStatusUpdate holds the new condition we want to update.
// TODO: this will break when we have multiple conditions for a flowschema
_, err := fsIfc.ApplyStatus(context.TODO(), toFlowSchemaApplyConfiguration(fsu), applyOptions)
patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
_, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
if err != nil {
if apierrors.IsNotFound(err) {
// This object has been deleted. A notification is coming
Expand All @@ -459,18 +462,18 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior
return suggestedDelay, utilerrors.NewAggregate(errs)
}

func toFlowSchemaApplyConfiguration(fsUpdate fsStatusUpdate) *flowcontrolapplyconfiguration.FlowSchemaApplyConfiguration {
condition := flowcontrolapplyconfiguration.FlowSchemaCondition().
WithType(fsUpdate.condition.Type).
WithStatus(fsUpdate.condition.Status).
WithReason(fsUpdate.condition.Reason).
WithLastTransitionTime(fsUpdate.condition.LastTransitionTime).
WithMessage(fsUpdate.condition.Message)

return flowcontrolapplyconfiguration.FlowSchema(fsUpdate.flowSchema.Name).
WithStatus(flowcontrolapplyconfiguration.FlowSchemaStatus().
WithConditions(condition),
)
// makeFlowSchemaConditionPatch takes in a condition and returns the patch status as a json.
func makeFlowSchemaConditionPatch(condition flowcontrol.FlowSchemaCondition) ([]byte, error) {
o := struct {
Status flowcontrol.FlowSchemaStatus `json:"status"`
}{
Status: flowcontrol.FlowSchemaStatus{
Conditions: []flowcontrol.FlowSchemaCondition{
condition,
},
},
}
return json.Marshal(o)
}

// shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed.
Expand Down
88 changes: 88 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/patch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
"fmt"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
flowcontrol "k8s.io/api/flowcontrol/v1beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func Test_configController_generatePatchBytes(t *testing.T) {
now := time.Now().UTC()
tests := []struct {
name string
condition flowcontrol.FlowSchemaCondition
want []byte
}{
{
name: "check if only condition is parsed",
condition: flowcontrol.FlowSchemaCondition{
Type: flowcontrol.FlowSchemaConditionDangling,
Status: flowcontrol.ConditionTrue,
Reason: "test reason",
Message: "test none",
LastTransitionTime: metav1.NewTime(now),
},
want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test none"}]}}`, now.Format(time.RFC3339))),
},
{
name: "check when message has double quotes",
condition: flowcontrol.FlowSchemaCondition{
Type: flowcontrol.FlowSchemaConditionDangling,
Status: flowcontrol.ConditionTrue,
Reason: "test reason",
Message: `test ""none`,
LastTransitionTime: metav1.NewTime(now),
},
want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test \"\"none"}]}}`, now.Format(time.RFC3339))),
},
{
name: "check when message has a whitespace character that can be escaped",
condition: flowcontrol.FlowSchemaCondition{
Type: flowcontrol.FlowSchemaConditionDangling,
Status: flowcontrol.ConditionTrue,
Reason: "test reason",
Message: `test none`,
LastTransitionTime: metav1.NewTime(now),
},
want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test \t\tnone"}]}}`, now.Format(time.RFC3339))),
},
{
name: "check when a few fields (message & lastTransitionTime) are missing",
condition: flowcontrol.FlowSchemaCondition{
Type: flowcontrol.FlowSchemaConditionDangling,
Status: flowcontrol.ConditionTrue,
Reason: "test reason",
},
want: []byte(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":null,"reason":"test reason"}]}}`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := makeFlowSchemaConditionPatch(tt.condition)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("makeFlowSchemaConditionPatch() got = %s, want %s; diff is %s", got, tt.want, cmp.Diff(tt.want, got))
}
})
}
}
6 changes: 5 additions & 1 deletion test/integration/apiserver/apply/reset_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ var resetFieldsStatusData = map[schema.GroupVersionResource]string{
// resetFieldsStatusDefault conflicts with statusDefault
const resetFieldsStatusDefault = `{"status": {"conditions": [{"type": "MyStatus", "status":"False"}]}}`

var resetFieldsSkippedResources = map[string]struct{}{}
var resetFieldsSkippedResources = map[string]struct{}{
// TODO: flowschemas is flaking,
// possible bug in the flowschemas controller.
"flowschemas": {},
}

// noConflicts is the set of reources for which
// a conflict cannot occur.
Expand Down

0 comments on commit 363a8be

Please sign in to comment.