Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't save managedFields if object is too large #87508

Merged
merged 1 commit into from Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/BUILD
Expand Up @@ -103,6 +103,8 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/wsstream:go_default_library",
"//vendor/github.com/evanphx/json-patch:go_default_library",
"//vendor/golang.org/x/net/websocket:go_default_library",
"//vendor/google.golang.org/grpc/codes:go_default_library",
"//vendor/google.golang.org/grpc/status:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/trace:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
Expand Down
30 changes: 21 additions & 9 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
Expand Up @@ -27,6 +27,7 @@ import (
"unicode/utf8"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
Expand Down Expand Up @@ -139,6 +140,16 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
}

trace.Step("About to store object in database")
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
result, err := finishRequest(timeout, func() (runtime.Object, error) {
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
Expand All @@ -150,20 +161,21 @@ func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Int
return nil, fmt.Errorf("failed to update object (Create for %v) managed fields: %v", scope.Kind, err)
}
}

admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
jennybuckley marked this conversation as resolved.
Show resolved Hide resolved
// it is safe to remove managedFields (which can be large) and try again.
if isTooLargeError(err) {
if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
accessor.SetManagedFields(nil)
result, err = requestFunc()
}
}
return result, err
})
if err != nil {
scope.err(err, w, req)
Expand Down
18 changes: 17 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/endpoints/handlers/patch.go
Expand Up @@ -581,12 +581,28 @@ func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runti

wasCreated := false
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
result, err := finishRequest(p.timeout, func() (runtime.Object, error) {
requestFunc := func() (runtime.Object, error) {
// Pass in UpdateOptions to override UpdateStrategy.AllowUpdateOnCreate
options := patchToUpdateOptions(p.options)
updateObject, created, updateErr := p.restPatcher.Update(ctx, p.name, p.updatedObjectInfo, p.createValidation, p.updateValidation, p.forceAllowCreate, options)
wasCreated = created
return updateObject, updateErr
}
result, err := finishRequest(p.timeout, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.
if isTooLargeError(err) && p.patchType != types.ApplyPatchType {
if _, accessorErr := meta.Accessor(p.restPatcher.New()); accessorErr == nil {
p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission, func(_ context.Context, obj, _ runtime.Object) (runtime.Object, error) {
accessor, _ := meta.Accessor(obj)
accessor.SetManagedFields(nil)
return obj, nil
})
result, err = requestFunc()
}
}
return result, err
})
return result, wasCreated, err
}
Expand Down
29 changes: 29 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/rest.go
Expand Up @@ -25,8 +25,12 @@ import (
"net/http"
"net/url"
goruntime "runtime"
"strings"
"time"

grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -416,3 +420,28 @@ func parseTimeout(str string) time.Duration {
func isDryRun(url *url.URL) bool {
return len(url.Query()["dryRun"]) != 0
}

type etcdError interface {
Code() grpccodes.Code
Error() string
}

type grpcError interface {
GRPCStatus() *grpcstatus.Status
}

func isTooLargeError(err error) bool {
jennybuckley marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if etcdErr, ok := err.(etcdError); ok {
if etcdErr.Code() == grpccodes.InvalidArgument && etcdErr.Error() == "etcdserver: request is too large" {
return true
}
}
if grpcErr, ok := err.(grpcError); ok {
if grpcErr.GRPCStatus().Code() == grpccodes.ResourceExhausted && strings.Contains(grpcErr.GRPCStatus().Message(), "trying to send message larger than max") {
return true
}
}
jennybuckley marked this conversation as resolved.
Show resolved Hide resolved
}
return false
}
32 changes: 26 additions & 6 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/update.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
Expand Down Expand Up @@ -124,15 +125,22 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa

userInfo, _ := request.UserFrom(ctx)
transformers := []rest.TransformFunc{}

// allows skipping managedFields update if the resulting object is too big
shouldUpdateManagedFields := true
if scope.FieldManager != nil {
transformers = append(transformers, func(_ context.Context, newObj, liveObj runtime.Object) (runtime.Object, error) {
obj, err := scope.FieldManager.Update(liveObj, newObj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
if err != nil {
return nil, fmt.Errorf("failed to update object (Update for %v) managed fields: %v", scope.Kind, err)
if shouldUpdateManagedFields {
obj, err := scope.FieldManager.Update(liveObj, newObj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
if err != nil {
return nil, fmt.Errorf("failed to update object (Update for %v) managed fields: %v", scope.Kind, err)
}
return obj, nil
}
return obj, nil
return newObj, nil
})
}

if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
transformers = append(transformers, func(ctx context.Context, newObj, oldObj runtime.Object) (runtime.Object, error) {
isNotZeroObject, err := hasUID(oldObj)
Expand All @@ -149,7 +157,6 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
}
return newObj, nil
})

}

createAuthorizerAttributes := authorizer.AttributesRecord{
Expand All @@ -167,7 +174,7 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa

trace.Step("About to store object in database")
wasCreated := false
result, err := finishRequest(timeout, func() (runtime.Object, error) {
requestFunc := func() (runtime.Object, error) {
obj, created, err := r.Update(
ctx,
name,
Expand All @@ -184,6 +191,19 @@ func UpdateResource(r rest.Updater, scope *RequestScope, admit admission.Interfa
)
wasCreated = created
return obj, err
}
result, err := finishRequest(timeout, func() (runtime.Object, error) {
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.
if isTooLargeError(err) && scope.FieldManager != nil {
jennybuckley marked this conversation as resolved.
Show resolved Hide resolved
if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
accessor.SetManagedFields(nil)
shouldUpdateManagedFields = false
result, err = requestFunc()
}
}
return result, err
})
if err != nil {
scope.err(err, w, req)
Expand Down
1 change: 1 addition & 0 deletions test/integration/apiserver/apply/BUILD
Expand Up @@ -25,6 +25,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
Expand Down