Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #43871 #44173

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ func StrategicMergePatch(original, patch []byte, dataStruct interface{}) ([]byte
// StrategicMergePatch applies a strategic merge patch. The original and patch documents
// must be JSONMap. A patch can be created from an original and modified document by
// calling CreateTwoWayMergeMapPatch.
// Warning: the original and patch JSONMap objects are mutated by this function and should not be reused.
func StrategicMergeMapPatch(original, patch JSONMap, dataStruct interface{}) (JSONMap, error) {
t, err := getTagStructType(dataStruct)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,21 @@ func strategicPatchObject(
patchJS []byte,
objToUpdate runtime.Object,
versionedObj runtime.Object,
) (originalObjMap map[string]interface{}, patchMap map[string]interface{}, retErr error) {
originalObjMap = make(map[string]interface{})
) error {
originalObjMap := make(map[string]interface{})
if err := unstructured.DefaultConverter.ToUnstructured(originalObject, &originalObjMap); err != nil {
return nil, nil, err
return err
}

patchMap = make(map[string]interface{})
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return nil, nil, err
return err
}

if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil {
return nil, nil, err
return err
}
return
return nil
}

// applyPatchToObject applies a strategic merge patch of <patchMap> to
Expand Down
66 changes: 47 additions & 19 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Reque
// may be used to deserialize an options object to pass to the getter.
type getterFunc func(ctx request.Context, name string, req *restful.Request) (runtime.Object, error)

// maxRetryWhenPatchConflicts is the maximum number of conflicts retry during a patch operation before returning failure
const maxRetryWhenPatchConflicts = 5
// MaxRetryWhenPatchConflicts is the maximum number of conflicts retry during a patch operation before returning failure
const MaxRetryWhenPatchConflicts = 5

// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
Expand Down Expand Up @@ -570,7 +570,7 @@ func patchResource(
originalObjJS []byte
originalPatchedObjJS []byte
originalObjMap map[string]interface{}
originalPatchMap map[string]interface{}
getOriginalPatchMap func() (map[string]interface{}, error)
lastConflictErr error
originalResourceVersion string
)
Expand Down Expand Up @@ -610,6 +610,26 @@ func patchResource(
return nil, err
}
originalObjJS, originalPatchedObjJS = originalJS, patchedJS

// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
var originalPatchBytes []byte
getOriginalPatchMap = func() (map[string]interface{}, error) {
if originalPatchBytes == nil {
// Compute once
originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, err
}
}
// Return a fresh map every time
originalPatchMap := make(map[string]interface{})
if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil {
return nil, err
}
return originalPatchMap, nil
}

case types.StrategicMergePatchType:
// Since the patch is applied on versioned objects, we need to convert the
// current object to versioned representation first.
Expand All @@ -621,8 +641,12 @@ func patchResource(
if err != nil {
return nil, err
}
originalMap, patchMap, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
if err != nil {
// Capture the original object map and patch for possible retries.
originalMap := make(map[string]interface{})
if err := unstructured.DefaultConverter.ToUnstructured(currentVersionedObject, &originalMap); err != nil {
return nil, err
}
if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil {
return nil, err
}
// Convert the object back to unversioned.
Expand All @@ -632,8 +656,17 @@ func patchResource(
return nil, err
}
objToUpdate = unversionedObjToUpdate
// Store unstructured representations for possible retries.
originalObjMap, originalPatchMap = originalMap, patchMap
// Store unstructured representation for possible retries.
originalObjMap = originalMap
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
getOriginalPatchMap = func() (map[string]interface{}, error) {
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return nil, err
}
return patchMap, nil
}
}
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
return nil, err
Expand Down Expand Up @@ -669,17 +702,6 @@ func patchResource(
return nil, err
}
} else {
if originalPatchMap == nil {
// Compute original patch, if we already didn't do this in previous retries.
originalPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, err
}
originalPatchMap = make(map[string]interface{})
if err := json.Unmarshal(originalPatch, &originalPatchMap); err != nil {
return nil, err
}
}
// Compute current patch.
currentObjJS, err := runtime.Encode(codec, currentObject)
if err != nil {
Expand All @@ -695,6 +717,12 @@ func patchResource(
}
}

// Get a fresh copy of the original strategic patch each time through, since applying it mutates the map
originalPatchMap, err := getOriginalPatchMap()
if err != nil {
return nil, err
}

hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap)
if err != nil {
return nil, err
Expand Down Expand Up @@ -742,7 +770,7 @@ func patchResource(

return finishRequest(timeout, func() (runtime.Object, error) {
updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo)
for i := 0; i < maxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ {
for i := 0; i < MaxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ {
lastConflictErr = updateErr
updateObject, _, updateErr = patcher.Update(ctx, name, updatedObjectInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestPatchAnonymousField(t *testing.T) {
}

actual := &testPatchType{}
_, _, err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{})
err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestNumberConversion(t *testing.T) {

patchJS := []byte(`{"spec":{"ports":[{"port":80,"nodePort":31789}]}}`)

_, _, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
if err != nil {
t.Fatal(err)
}
Expand Down
116 changes: 116 additions & 0 deletions test/integration/apiserver/patch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// +build integration,!no-etcd

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver

import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/pborman/uuid"

"reflect"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/test/integration/framework"
)

// Tests that the apiserver retries non-overlapping conflicts on patches
func TestPatchConflicts(t *testing.T) {
s, clientSet := setup(t)
defer s.Close()

ns := framework.CreateTestingNamespace("status-code", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)

// Create the object we're going to conflict on
clientSet.Core().Secrets(ns.Name).Create(&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
// Populate annotations so the strategic patch descends, compares, and notices the $patch directive
Annotations: map[string]string{"initial": "value"},
},
})
client := clientSet.Core().RESTClient()

successes := int32(0)

// Run a lot of simultaneous patch operations to exercise internal API server retry of patch application.
// Internally, a patch API call retries up to MaxRetryWhenPatchConflicts times if the resource version of the object has changed.
// If the resource version of the object changed between attempts, that means another one of our patch requests succeeded.
// That means if we run 2*MaxRetryWhenPatchConflicts patch attempts, we should see at least MaxRetryWhenPatchConflicts succeed.
wg := sync.WaitGroup{}
for i := 0; i < (2 * handlers.MaxRetryWhenPatchConflicts); i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
annotationName := fmt.Sprintf("annotation-%d", i)
labelName := fmt.Sprintf("label-%d", i)
value := uuid.NewRandom().String()

obj, err := client.Patch(types.StrategicMergePatchType).
Namespace(ns.Name).
Resource("secrets").
Name("test").
Body([]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}, "annotations":{"$patch":"replace","%s":"%s"}}}`, labelName, value, annotationName, value))).
Do().
Get()

if errors.IsConflict(err) {
t.Logf("tolerated conflict error patching %s: %v", "secrets", err)
return
}
if err != nil {
t.Errorf("error patching %s: %v", "secrets", err)
return
}

accessor, err := meta.Accessor(obj)
if err != nil {
t.Errorf("error getting object from %s: %v", "secrets", err)
return
}
// make sure the label we wanted was effective
if accessor.GetLabels()[labelName] != value {
t.Errorf("patch of %s was ineffective, expected %s=%s, got labels %#v", "secrets", labelName, value, accessor.GetLabels())
return
}
// make sure the patch directive didn't get lost, and that the entire annotation map was replaced
if !reflect.DeepEqual(accessor.GetAnnotations(), map[string]string{annotationName: value}) {
t.Errorf("patch of %s with $patch directive was ineffective, didn't replace entire annotations map: %#v", "secrets", accessor.GetAnnotations())
}

atomic.AddInt32(&successes, 1)
}(i)
}
wg.Wait()

if successes < handlers.MaxRetryWhenPatchConflicts {
t.Errorf("Expected at least %d successful patches for %s, got %d", handlers.MaxRetryWhenPatchConflicts, "secrets", successes)
} else {
t.Logf("Got %d successful patches for %s", successes, "secrets")
}

}