Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimistic-locking on diff #71156

Merged
merged 2 commits into from
Nov 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 35 additions & 1 deletion pkg/kubectl/cmd/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apply

import (
"encoding/json"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -436,6 +437,7 @@ func (o *ApplyOptions) Run() error {
GracePeriod: o.DeleteOptions.GracePeriod,
ServerDryRun: o.ServerDryRun,
OpenapiSchema: openapiSchema,
Retries: maxPatchRetry,
}

patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
Expand Down Expand Up @@ -699,6 +701,12 @@ type Patcher struct {
GracePeriod int
ServerDryRun bool

// If set, forces the patch against a specific resourceVersion
ResourceVersion *string

// Number of retries to make if the patch fails with conflict
Retries int

OpenapiSchema openapi.Resources
}

Expand Down Expand Up @@ -741,6 +749,22 @@ func (v *DryRunVerifier) HasSupport(gvk schema.GroupVersionKind) error {
return nil
}

func addResourceVersion(patch []byte, rv string) ([]byte, error) {
var patchMap map[string]interface{}
err := json.Unmarshal(patch, &patchMap)
if err != nil {
return nil, err
}
u := unstructured.Unstructured{Object: patchMap}
a, err := meta.Accessor(&u)
if err != nil {
return nil, err
}
a.SetResourceVersion(rv)

return json.Marshal(patchMap)
}

func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
// Serialize the current configuration of the object from the server.
current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
Expand Down Expand Up @@ -812,6 +836,13 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names
return patch, obj, nil
}

if p.ResourceVersion != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't patchSimple will fail O(retries) times if p.ResourceVersion != nil and you get a conflict? Should you do this in Patch() before the first patchSimple and not enter the retry loop if p.ResourceVersion != nil?

patch, err = addResourceVersion(patch, *p.ResourceVersion)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr("Failed to insert resourceVersion in patch", source, err)
}
}

options := metav1.UpdateOptions{}
if p.ServerDryRun {
options.DryRun = []string{metav1.DryRunAll}
Expand All @@ -824,7 +855,10 @@ func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, source, names
func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
var getErr error
patchBytes, patchObject, err := p.patchSimple(current, modified, source, namespace, name, errOut)
for i := 1; i <= maxPatchRetry && errors.IsConflict(err); i++ {
if p.Retries == 0 {
p.Retries = maxPatchRetry
}
for i := 1; i <= p.Retries && errors.IsConflict(err); i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use RetryOnConflict

func RetryOnConflict(backoff wait.Backoff, fn func() error) error {

if i > triesBeforeBackOff {
p.BackOff.Sleep(backOffPeriod)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubectl/cmd/diff/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ go_library(
"//pkg/kubectl/util/i18n:go_default_library",
"//pkg/kubectl/util/templates:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library",
"//vendor/github.com/jonboulle/clockwork:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
Expand Down
72 changes: 54 additions & 18 deletions pkg/kubectl/cmd/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"github.com/jonboulle/clockwork"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericclioptions/resource"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/apply"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
Expand Down Expand Up @@ -60,6 +62,9 @@ var (
cat service.yaml | kubectl diff -f -`))
)

// Number of times we try to diff before giving-up
const maxRetries = 4

type DiffOptions struct {
FilenameOptions resource.FilenameOptions
}
Expand Down Expand Up @@ -228,6 +233,7 @@ type InfoObject struct {
Info *resource.Info
Encoder runtime.Encoder
OpenAPI openapi.Resources
Force bool
}

var _ Object = &InfoObject{}
Expand All @@ -251,6 +257,16 @@ func (obj InfoObject) Merged() (runtime.Object, error) {
)
}

var resourceVersion *string
if !obj.Force {
accessor, err := meta.Accessor(obj.Info.Object)
if err != nil {
return nil, err
}
str := accessor.GetResourceVersion()
resourceVersion = &str
}

modified, err := kubectl.GetModifiedConfiguration(obj.LocalObj, false, unstructured.UnstructuredJSONScheme)
if err != nil {
return nil, err
Expand All @@ -259,12 +275,13 @@ func (obj InfoObject) Merged() (runtime.Object, error) {
// This is using the patcher from apply, to keep the same behavior.
// We plan on replacing this with server-side apply when it becomes available.
patcher := &apply.Patcher{
Mapping: obj.Info.Mapping,
Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping),
Overwrite: true,
BackOff: clockwork.NewRealClock(),
ServerDryRun: true,
OpenapiSchema: obj.OpenAPI,
Mapping: obj.Info.Mapping,
Helper: resource.NewHelper(obj.Info.Client, obj.Info.Mapping),
Overwrite: true,
BackOff: clockwork.NewRealClock(),
ServerDryRun: true,
OpenapiSchema: obj.OpenAPI,
ResourceVersion: resourceVersion,
}

_, result, err := patcher.Patch(obj.Info.Object, modified, obj.Info.Source, obj.Info.Namespace, obj.Info.Name, nil)
Expand Down Expand Up @@ -319,6 +336,10 @@ func (d *Differ) TearDown() {
d.To.Dir.Delete() // Ignore error
}

func isConflict(err error) bool {
return err != nil && errors.IsConflict(err)
}

// RunDiff uses the factory to parse file arguments, find the version to
// diff, and find each Info object for each files, and runs against the
// differ.
Expand Down Expand Up @@ -376,21 +397,36 @@ func RunDiff(f cmdutil.Factory, diff *DiffProgram, options *DiffOptions) error {
}

local := info.Object.DeepCopyObject()
if err := info.Get(); err != nil {
if !errors.IsNotFound(err) {
return err
for i := 1; i <= maxRetries; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use RetryOnConflict? force should work fine.

if err = info.Get(); err != nil {
if !errors.IsNotFound(err) {
return err
}
info.Object = nil
}
info.Object = nil
}

obj := InfoObject{
LocalObj: local,
Info: info,
Encoder: scheme.DefaultJSONEncoder(),
OpenAPI: schema,
}
force := i == maxRetries
if force {
klog.Warningf(
"Object (%v: %v) keeps changing, diffing without lock",
info.Object.GetObjectKind().GroupVersionKind(),
info.Name,
)
}
obj := InfoObject{
LocalObj: local,
Info: info,
Encoder: scheme.DefaultJSONEncoder(),
OpenAPI: schema,
Force: force,
}

return differ.Diff(obj, printer)
err = differ.Diff(obj, printer)
if !isConflict(err) {
break
}
}
return err
})
if err != nil {
return err
Expand Down