Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix original object mutation on patch retry #43871

Merged
merged 1 commit into from
Apr 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func handleUnmarshal(j []byte) (map[string]interface{}, error) {
// StrategicMergePatch applies a strategic merge patch. The original and patch documents
// must be JSONMap. A patch can be created from an original and modified document by
// calling CreateTwoWayMergeMapPatch.
// Warning: the original and patch JSONMap objects are mutated by this function and should not be reused.
func StrategicMergeMapPatch(original, patch JSONMap, dataStruct interface{}) (JSONMap, error) {
t, err := getTagStructType(dataStruct)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,21 @@ func strategicPatchObject(
patchJS []byte,
objToUpdate runtime.Object,
versionedObj runtime.Object,
) (originalObjMap map[string]interface{}, patchMap map[string]interface{}, retErr error) {
originalObjMap = make(map[string]interface{})
) error {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just removed the return values here since they are unsuitable for use after having used them to apply the patch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - that sounds good.

originalObjMap := make(map[string]interface{})
if err := unstructured.DefaultConverter.ToUnstructured(originalObject, &originalObjMap); err != nil {
return nil, nil, err
return err
}

patchMap = make(map[string]interface{})
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return nil, nil, err
return err
}

if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil {
return nil, nil, err
return err
}
return
return nil
}

// applyPatchToObject applies a strategic merge patch of <patchMap> to
Expand Down
60 changes: 44 additions & 16 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func patchResource(
originalObjJS []byte
originalPatchedObjJS []byte
originalObjMap map[string]interface{}
originalPatchMap map[string]interface{}
getOriginalPatchMap func() (map[string]interface{}, error)
lastConflictErr error
originalResourceVersion string
)
Expand Down Expand Up @@ -610,6 +610,26 @@ func patchResource(
return nil, err
}
originalObjJS, originalPatchedObjJS = originalJS, patchedJS

// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
var originalPatchBytes []byte
getOriginalPatchMap = func() (map[string]interface{}, error) {
if originalPatchBytes == nil {
// Compute once
originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, err
}
}
// Return a fresh map every time
originalPatchMap := make(map[string]interface{})
if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil {
return nil, err
}
return originalPatchMap, nil
}

case types.StrategicMergePatchType:
// Since the patch is applied on versioned objects, we need to convert the
// current object to versioned representation first.
Expand All @@ -621,8 +641,12 @@ func patchResource(
if err != nil {
return nil, err
}
originalMap, patchMap, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
if err != nil {
// Capture the original object map and patch for possible retries.
originalMap := make(map[string]interface{})
if err := unstructured.DefaultConverter.ToUnstructured(currentVersionedObject, &originalMap); err != nil {
return nil, err
}
if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil {
return nil, err
}
// Convert the object back to unversioned.
Expand All @@ -632,8 +656,17 @@ func patchResource(
return nil, err
}
objToUpdate = unversionedObjToUpdate
// Store unstructured representations for possible retries.
originalObjMap, originalPatchMap = originalMap, patchMap
// Store unstructured representation for possible retries.
originalObjMap = originalMap
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
getOriginalPatchMap = func() (map[string]interface{}, error) {
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return nil, err
}
return patchMap, nil
}
}
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
return nil, err
Expand Down Expand Up @@ -669,17 +702,6 @@ func patchResource(
return nil, err
}
} else {
if originalPatchMap == nil {
// Compute original patch, if we already didn't do this in previous retries.
originalPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, err
}
originalPatchMap = make(map[string]interface{})
if err := json.Unmarshal(originalPatch, &originalPatchMap); err != nil {
return nil, err
}
}
// Compute current patch.
currentObjJS, err := runtime.Encode(codec, currentObject)
if err != nil {
Expand All @@ -695,6 +717,12 @@ func patchResource(
}
}

// Get a fresh copy of the original strategic patch each time through, since applying it mutates the map
originalPatchMap, err := getOriginalPatchMap()
if err != nil {
return nil, err
}

hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestPatchAnonymousField(t *testing.T) {
}

actual := &testPatchType{}
_, _, err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{})
err := strategicPatchObject(codec, defaulter, original, []byte(patch), actual, &testPatchType{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestNumberConversion(t *testing.T) {

patchJS := []byte(`{"spec":{"ports":[{"port":80,"nodePort":31789}]}}`)

_, _, err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj)
if err != nil {
t.Fatal(err)
}
Expand Down
116 changes: 116 additions & 0 deletions test/integration/apiserver/patch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// +build integration,!no-etcd

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver

import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/pborman/uuid"

"reflect"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/test/integration/framework"
)

// Tests that the apiserver retries non-overlapping conflicts on patches
func TestPatchConflicts(t *testing.T) {
s, clientSet := setup(t)
defer s.Close()

ns := framework.CreateTestingNamespace("status-code", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)

// Create the object we're going to conflict on
clientSet.Core().Secrets(ns.Name).Create(&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
// Populate annotations so the strategic patch descends, compares, and notices the $patch directive
Annotations: map[string]string{"initial": "value"},
},
})
client := clientSet.Core().RESTClient()

successes := int32(0)

// Run a lot of simultaneous patch operations to exercise internal API server retry of patch application.
// Internally, a patch API call retries up to MaxRetryWhenPatchConflicts times if the resource version of the object has changed.
// If the resource version of the object changed between attempts, that means another one of our patch requests succeeded.
// That means if we run 2*MaxRetryWhenPatchConflicts patch attempts, we should see at least MaxRetryWhenPatchConflicts succeed.
wg := sync.WaitGroup{}
for i := 0; i < (2 * handlers.MaxRetryWhenPatchConflicts); i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
annotationName := fmt.Sprintf("annotation-%d", i)
labelName := fmt.Sprintf("label-%d", i)
value := uuid.NewRandom().String()

obj, err := client.Patch(types.StrategicMergePatchType).
Namespace(ns.Name).
Resource("secrets").
Name("test").
Body([]byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}, "annotations":{"$patch":"replace","%s":"%s"}}}`, labelName, value, annotationName, value))).
Do().
Get()

if errors.IsConflict(err) {
t.Logf("tolerated conflict error patching %s: %v", "secrets", err)
return
}
if err != nil {
t.Errorf("error patching %s: %v", "secrets", err)
return
}

accessor, err := meta.Accessor(obj)
if err != nil {
t.Errorf("error getting object from %s: %v", "secrets", err)
return
}
// make sure the label we wanted was effective
if accessor.GetLabels()[labelName] != value {
t.Errorf("patch of %s was ineffective, expected %s=%s, got labels %#v", "secrets", labelName, value, accessor.GetLabels())
return
}
// make sure the patch directive didn't get lost, and that the entire annotation map was replaced
if !reflect.DeepEqual(accessor.GetAnnotations(), map[string]string{annotationName: value}) {
t.Errorf("patch of %s with $patch directive was ineffective, didn't replace entire annotations map: %#v", "secrets", accessor.GetAnnotations())
}

atomic.AddInt32(&successes, 1)
}(i)
}
wg.Wait()

if successes < handlers.MaxRetryWhenPatchConflicts {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I don't understand this. We have 2N concurrent goroutines trying to do a patch. Why at least N of them has to succeed? Can you explain it (and add comment about it)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/kubernetes/kubernetes/pull/43902/files#diff-a98175d31a994ac0386ef984acfe13f5R111

@liggitt wants to copy that into his patch. Took some time to get the reasoning there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW - I just looked into the other PR, and I think I like the approach from #43902 better. So maybe we should proceed with that one and close this one?
@liggitt @sttts - thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rename the function here as well. That one doesn't address the patch map mutation issue

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. But the test is still passing with the other PR, which means it's not really testing everything, right?

Anyway - I'm fine with proceeding with this one too - just want to understand it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test is still passing with the other PR

the test in the other PR doesn't include the patch directives... the test in this PR would fail against the fix in the other PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - sorry for confusion then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you then please add a comment (similar than the one from the other PR) and I will lgtm this PR.

t.Errorf("Expected at least %d successful patches for %s, got %d", handlers.MaxRetryWhenPatchConflicts, "secrets", successes)
} else {
t.Logf("Got %d successful patches for %s", successes, "secrets")
}

}