Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track ownership of scale subresource #98377

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions pkg/registry/apps/deployment/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/util/dryrun"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/apps"
appsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
appsv1beta2 "k8s.io/kubernetes/pkg/apis/apps/v1beta2"
Expand All @@ -54,6 +57,18 @@ type DeploymentStorage struct {
Rollback *RollbackREST
}

// ReplicasPathMappings returns the mappings between each group version and a replicas path
func ReplicasPathMappings() fieldmanager.ResourcePathMappings {
return replicasPathInDeployment
}

// maps a group version to the replicas path in a deployment object
var replicasPathInDeployment = fieldmanager.ResourcePathMappings{
schema.GroupVersion{Group: "apps", Version: "v1beta1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
schema.GroupVersion{Group: "apps", Version: "v1beta2"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
schema.GroupVersion{Group: "apps", Version: "v1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
}

// NewStorage returns new instance of DeploymentStorage.
func NewStorage(optsGetter generic.RESTOptionsGetter) (DeploymentStorage, error) {
deploymentRest, deploymentStatusRest, deploymentRollbackRest, err := NewREST(optsGetter)
Expand Down Expand Up @@ -337,6 +352,7 @@ func scaleFromDeployment(deployment *apps.Deployment) (*autoscaling.Scale, error
if err != nil {
return nil, err
}

return &autoscaling.Scale{
// TODO: Create a variant of ObjectMeta type that only contains the fields below.
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -376,11 +392,32 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, errors.NewNotFound(apps.Resource("deployments/scale"), i.name)
}

groupVersion := schema.GroupVersion{Group: "apps", Version: "v1"}
if requestInfo, found := genericapirequest.RequestInfoFrom(ctx); found {
nodo marked this conversation as resolved.
Show resolved Hide resolved
requestGroupVersion := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
if _, ok := replicasPathInDeployment[requestGroupVersion.String()]; ok {
groupVersion = requestGroupVersion
} else {
klog.Fatal("Unrecognized group/version in request info %q", requestGroupVersion.String())
}
}

managedFieldsHandler := fieldmanager.NewScaleHandler(
deployment.ManagedFields,
groupVersion,
replicasPathInDeployment,
)

// deployment -> old scale
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of constructing a scale object from the existing deployment, I expected us to construct a deployment object from the new scale object and the existing deployment object, and apply or update that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't the patch path have to work in terms of the request object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more or less what's happening. The problem is that in order to get the new scale object we need the "old scale" object from the deployment and run it through UpdatedObject. UpdatedObject will update the managed fields of the scale object and conflict if needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I am objecting to a large part of the existing codebase here. Antoine attempted to convince me that I have to hold my nose for this. I'll look again.

oldScale, err := scaleFromDeployment(deployment)
if err != nil {
return nil, err
}
scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields

// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
Expand Down Expand Up @@ -412,5 +449,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// move replicas/resourceVersion fields to object and return
deployment.Spec.Replicas = scale.Spec.Replicas
deployment.ResourceVersion = scale.ResourceVersion

updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
deployment.ManagedFields = updatedEntries

return deployment, nil
}
43 changes: 43 additions & 0 deletions pkg/registry/apps/replicaset/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/apps"
appsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
appsv1beta2 "k8s.io/kubernetes/pkg/apis/apps/v1beta2"
Expand All @@ -50,6 +53,17 @@ type ReplicaSetStorage struct {
Scale *ScaleREST
}

// ReplicasPathMappings returns the mappings between each group version and a replicas path
func ReplicasPathMappings() fieldmanager.ResourcePathMappings {
return replicasPathInReplicaSet
}

// maps a group version to the replicas path in a replicaset object
var replicasPathInReplicaSet = fieldmanager.ResourcePathMappings{
schema.GroupVersion{Group: "apps", Version: "v1beta2"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
schema.GroupVersion{Group: "apps", Version: "v1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
}

// NewStorage returns new instance of ReplicaSetStorage.
func NewStorage(optsGetter generic.RESTOptionsGetter) (ReplicaSetStorage, error) {
replicaSetRest, replicaSetStatusRest, err := NewREST(optsGetter)
Expand Down Expand Up @@ -279,12 +293,34 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, errors.NewNotFound(apps.Resource("replicasets/scale"), i.name)
}

groupVersion := schema.GroupVersion{Group: "apps", Version: "v1"}
if requestInfo, found := genericapirequest.RequestInfoFrom(ctx); found {
requestGroupVersion := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
if _, ok := replicasPathInReplicaSet[requestGroupVersion.String()]; ok {
groupVersion = requestGroupVersion
} else {
klog.Fatal("Unrecognized group/version in request info %q", requestGroupVersion.String())
}
}

managedFieldsHandler := fieldmanager.NewScaleHandler(
replicaset.ManagedFields,
groupVersion,
replicasPathInReplicaSet,
)

// replicaset -> old scale
oldScale, err := scaleFromReplicaSet(replicaset)
if err != nil {
return nil, err
}

scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields

// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
if err != nil {
Expand Down Expand Up @@ -315,5 +351,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// move replicas/resourceVersion fields to object and return
replicaset.Spec.Replicas = scale.Spec.Replicas
replicaset.ResourceVersion = scale.ResourceVersion

updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
replicaset.ManagedFields = updatedEntries

return replicaset, nil
}
43 changes: 43 additions & 0 deletions pkg/registry/apps/statefulset/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/apps"
appsv1beta1 "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
appsv1beta2 "k8s.io/kubernetes/pkg/apis/apps/v1beta2"
Expand All @@ -47,6 +50,18 @@ type StatefulSetStorage struct {
Scale *ScaleREST
}

// ReplicasPathMappings returns the mappings between each group version and a replicas path
func ReplicasPathMappings() fieldmanager.ResourcePathMappings {
return replicasPathInStatefulSet
}

// maps a group version to the replicas path in a statefulset object
var replicasPathInStatefulSet = fieldmanager.ResourcePathMappings{
schema.GroupVersion{Group: "apps", Version: "v1beta1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
schema.GroupVersion{Group: "apps", Version: "v1beta2"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
schema.GroupVersion{Group: "apps", Version: "v1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
}

// NewStorage returns new instance of StatefulSetStorage.
func NewStorage(optsGetter generic.RESTOptionsGetter) (StatefulSetStorage, error) {
statefulSetRest, statefulSetStatusRest, err := NewREST(optsGetter)
Expand Down Expand Up @@ -265,11 +280,32 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, errors.NewNotFound(apps.Resource("statefulsets/scale"), i.name)
}

groupVersion := schema.GroupVersion{Group: "apps", Version: "v1"}
if requestInfo, found := genericapirequest.RequestInfoFrom(ctx); found {
requestGroupVersion := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
if _, ok := replicasPathInStatefulSet[requestGroupVersion.String()]; ok {
groupVersion = requestGroupVersion
} else {
klog.Fatal("Unrecognized group/version in request info %q", requestGroupVersion.String())
}
}

managedFieldsHandler := fieldmanager.NewScaleHandler(
statefulset.ManagedFields,
groupVersion,
replicasPathInStatefulSet,
)

// statefulset -> old scale
oldScale, err := scaleFromStatefulSet(statefulset)
if err != nil {
return nil, err
}
scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields

// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
Expand Down Expand Up @@ -301,5 +337,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// move replicas/resourceVersion fields to object and return
statefulset.Spec.Replicas = scale.Spec.Replicas
statefulset.ResourceVersion = scale.ResourceVersion

updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
statefulset.ManagedFields = updatedEntries

return statefulset, nil
}
41 changes: 41 additions & 0 deletions pkg/registry/core/replicationcontroller/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/autoscaling"
autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
"k8s.io/kubernetes/pkg/apis/autoscaling/validation"
Expand All @@ -49,6 +52,16 @@ type ControllerStorage struct {
Scale *ScaleREST
}

// ReplicasPathMappings returns the mappings between each group version and a replicas path
func ReplicasPathMappings() fieldmanager.ResourcePathMappings {
return replicasPathInReplicationController
}

// maps a group version to the replicas path in a deployment object
var replicasPathInReplicationController = fieldmanager.ResourcePathMappings{
schema.GroupVersion{Group: "", Version: "v1"}.String(): fieldpath.MakePathOrDie("spec", "replicas"),
}

func NewStorage(optsGetter generic.RESTOptionsGetter) (ControllerStorage, error) {
controllerREST, statusREST, err := NewREST(optsGetter)
if err != nil {
Expand Down Expand Up @@ -239,8 +252,29 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
return nil, errors.NewNotFound(api.Resource("replicationcontrollers/scale"), i.name)
}

groupVersion := schema.GroupVersion{Group: "", Version: "v1"}
if requestInfo, found := genericapirequest.RequestInfoFrom(ctx); found {
requestGroupVersion := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
if _, ok := replicasPathInReplicationController[requestGroupVersion.String()]; ok {
groupVersion = requestGroupVersion
} else {
klog.Fatal("Unrecognized group/version in request info %q", requestGroupVersion.String())
}
}

managedFieldsHandler := fieldmanager.NewScaleHandler(
replicationcontroller.ManagedFields,
groupVersion,
replicasPathInReplicationController,
)

// replicationcontroller -> old scale
oldScale := scaleFromRC(replicationcontroller)
scaleManagedFields, err := managedFieldsHandler.ToSubresource()
if err != nil {
return nil, err
}
oldScale.ManagedFields = scaleManagedFields

// old scale -> new scale
newScaleObj, err := i.reqObjInfo.UpdatedObject(ctx, oldScale)
Expand Down Expand Up @@ -272,5 +306,12 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
// move replicas/resourceVersion fields to object and return
replicationcontroller.Spec.Replicas = scale.Spec.Replicas
replicationcontroller.ResourceVersion = scale.ResourceVersion

updatedEntries, err := managedFieldsHandler.ToParent(scale.ManagedFields)
if err != nil {
return nil, err
}
replicationcontroller.ManagedFields = updatedEntries

return replicationcontroller, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ func (c *crConverter) ConvertToVersion(in runtime.Object, target runtime.GroupVe
// TODO: should this be a typed error?
return nil, fmt.Errorf("%v is unstructured and is not suitable for converting to %q", fromGVK.String(), target)
}
// Special-case typed scale conversion if this custom resource supports a scale endpoint
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

if c.convertScale {
if _, isInScale := in.(*autoscalingv1.Scale); isInScale {
return typedscheme.Scheme.ConvertToVersion(in, target)
}
}

if !c.validVersions[toGVK.GroupVersion()] {
return nil, fmt.Errorf("request to convert CR to an invalid group/version: %s", toGVK.GroupVersion().String())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,28 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
if err != nil {
return nil, err
}

// Create replicasPathInCustomResource
replicasPathInCustomResource := fieldmanager.ResourcePathMappings{}
for _, v := range crd.Spec.Versions {
subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, v.Name)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("the server could not properly serve the CR subresources")
}
if subresources == nil || subresources.Scale == nil {
nodo marked this conversation as resolved.
Show resolved Hide resolved
replicasPathInCustomResource[schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}.String()] = nil
continue
}
path := fieldpath.Path{}
splitReplicasPath := strings.Split(strings.TrimPrefix(subresources.Scale.SpecReplicasPath, "."), ".")
for _, element := range splitReplicasPath {
s := element
path = append(path, fieldpath.PathElement{FieldName: &s})
}
replicasPathInCustomResource[schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}.String()] = path
}

for _, v := range crd.Spec.Versions {
// In addition to Unstructured objects (Custom Resources), we also may sometimes need to
// decode unversioned Options objects, so we delegate to parameterScheme for such types.
Expand Down Expand Up @@ -803,6 +825,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
},
crd.Status.AcceptedNames.Categories,
table,
replicasPathInCustomResource,
)

selfLinkPrefix := ""
Expand Down Expand Up @@ -892,8 +915,19 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
SelfLinkPathPrefix: selfLinkPrefix,
SelfLinkPathSuffix: "/scale",
}
// TODO(issues.k8s.io/82046): We can't effectively track ownership on scale requests yet.
scaleScope.FieldManager = nil

if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) && subresources != nil && subresources.Scale != nil {
scaleScope, err = scopeWithFieldManager(
typeConverter,
scaleScope,
nil,
"scale",
)
if err != nil {
return nil, err
}
}

scaleScopes[v.Name] = &scaleScope

// override status subresource values
Expand Down