Skip to content

Commit

Permalink
Improve check for existing resource GVK (#2418)
Browse files Browse the repository at this point in the history
Part of the provider's preview and update logic involves checking whether a given GroupVersionKind (GVK) is registered with the Kubernetes cluster. A previous version of this check was implemented as an anonymous function in the provider's `Check` method. This implementation used a set to cache results of the check, but the cache was not saved between invocations, so it was adding unneccesary overhead to the check.

This change reimplements the GVK check as a reusable method, and simplifies the checking logic. We can implement a cache later if needed, but this should already be an improvement over the previous implementation.
  • Loading branch information
lblackstone committed May 18, 2023
1 parent 668552f commit 26878ce
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- Fix Helm release creation when the name of the chart conflicts with the name of a folder in the current working directory (https://github.com/pulumi/pulumi-kubernetes/pull/2410)
- Remove imperative authentication and authorization resources: TokenRequest, TokenReview, LocalSubjectAccessReview,
SelfSubjectReview, SelfSubjectAccessReview, SelfSubjectRulesReview, and SubjectAccessReview (https://github.com/pulumi/pulumi-kubernetes/pull/2413)
- Improve check for existing resource GVK (https://github.com/pulumi/pulumi-kubernetes/pull/2418)

## 3.27.1 (May 11, 2023)

Expand Down
55 changes: 24 additions & 31 deletions provider/pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,12 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"helm.sh/helm/v3/pkg/helmpath"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
k8sresource "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -1242,26 +1241,6 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (
"`enableServerSideApply` Provider config")
}

// Utilities for determining whether a resource's GVK exists.
gvkExists := func(gvk schema.GroupVersionKind) bool {
knownGVKs := sets.NewString()
if knownGVKs.Has(gvk.String()) {
return true
}
gv := gvk.GroupVersion()
rls, err := k.clientSet.DiscoveryClientCached.ServerResourcesForGroupVersion(gv.String())
if err != nil {
if !errors.IsNotFound(err) {
logger.V(3).Infof("ServerResourcesForGroupVersion(%q) returned unexpected error %v", gv, err)
}
return false
}
for _, rl := range rls.APIResources {
knownGVKs.Insert(gv.WithKind(rl.Kind).String())
}
return knownGVKs.Has(gvk.String())
}

label := fmt.Sprintf("%s.Check(%s)", k.label(), urn)
logger.V(9).Infof("%s executing", label)

Expand Down Expand Up @@ -1391,10 +1370,10 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) (
// Validate the object according to the OpenAPI schema for its GVK.
err = openapi.ValidateAgainstSchema(resources, newInputs)
if err != nil {
resourceNotFound := errors.IsNotFound(err) ||
resourceNotFound := apierrors.IsNotFound(err) ||
strings.Contains(err.Error(), "is not supported by the server")
k8sAPIUnreachable := strings.Contains(err.Error(), "connection refused")
if resourceNotFound && gvkExists(gvk) {
if resourceNotFound && k.gvkExists(newInputs) {
failures = append(failures, &pulumirpc.CheckFailure{
Reason: fmt.Sprintf(" Found API Group, but it did not contain a schema for %q", gvk),
})
Expand Down Expand Up @@ -1837,7 +1816,7 @@ func (k *kubeProvider) Create(
failedPreview = true
}

if k.enableDryRun && errors.IsAlreadyExists(awaitErr) {
if k.enableDryRun && apierrors.IsAlreadyExists(awaitErr) {
failedPreview = true
}

Expand Down Expand Up @@ -1894,7 +1873,7 @@ func (k *kubeProvider) Create(

// Invalidate the client cache if this was a CRD. This will require subsequent CR creations to
// refresh the cache, at which point the CRD definition will be present, so that it doesn't fail
// with an `errors.IsNotFound`.
// with an `apierrors.IsNotFound`.
if clients.IsCRD(newInputs) {
k.clientSet.RESTMapper.Reset()
k.invalidateResources()
Expand Down Expand Up @@ -2054,7 +2033,7 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
return deleteResponse, nil
}

statusErr, ok := readErr.(*errors.StatusError)
statusErr, ok := readErr.(*apierrors.StatusError)
if ok && statusErr.ErrStatus.Code == 404 {
// If it's a 404 error, this resource was probably deleted.
return deleteResponse, nil
Expand Down Expand Up @@ -2478,7 +2457,7 @@ func (k *kubeProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest)
return &pbempty.Empty{}, nil
}
if isPatchURN(urn) && await.IsDeleteRequiredFieldErr(awaitErr) {
if cause, ok := errors.StatusCause(awaitErr, metav1.CauseTypeFieldValueRequired); ok {
if cause, ok := apierrors.StatusCause(awaitErr, metav1.CauseTypeFieldValueRequired); ok {
awaitErr = fmt.Errorf(
"this Patch resource is currently managing a required field, so it can't be deleted "+
"directly. Either set the `retainOnDelete` resource option, or transfer ownership of the "+
Expand Down Expand Up @@ -2618,7 +2597,7 @@ func (k *kubeProvider) serverSidePatch(oldInputs, newInputs *unstructured.Unstru
var newObject *unstructured.Unstructured
_, err = client.Get(k.canceler.context, newInputs.GetName(), metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
case apierrors.IsNotFound(err):
newObject, err = client.Create(k.canceler.context, newInputs, metav1.CreateOptions{
DryRun: []string{metav1.DryRunAll},
})
Expand Down Expand Up @@ -2747,7 +2726,7 @@ func (k *kubeProvider) supportsDryRun(gvk schema.GroupVersionKind) bool {
}

func (k *kubeProvider) isDryRunDisabledError(err error) bool {
se, isStatusError := err.(*errors.StatusError)
se, isStatusError := err.(*apierrors.StatusError)
if !isStatusError {
return false
}
Expand Down Expand Up @@ -2793,7 +2772,7 @@ func (k *kubeProvider) tryServerSidePatch(
if k.isDryRunDisabledError(err) {
return nil, nil, false, err
}
if se, isStatusError := err.(*errors.StatusError); isStatusError {
if se, isStatusError := err.(*apierrors.StatusError); isStatusError {
if se.Status().Code == http.StatusUnprocessableEntity &&
strings.Contains(se.ErrStatus.Message, "field is immutable") {
// This error occurs if the resource field is immutable.
Expand Down Expand Up @@ -2874,6 +2853,20 @@ func (k *kubeProvider) fieldManagerName(
return fieldManager
}

// gvkExists attempts to load a REST mapping for the given resource and returns true on success. Since this operation
// will fail if the GVK has not been registered with the apiserver, it can be used to indirectly check if the resource
// may be an unregistered CustomResource.
func (k *kubeProvider) gvkExists(obj *unstructured.Unstructured) bool {
gvk := obj.GroupVersionKind()
if _, err := k.clientSet.RESTMapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if !meta.IsNoMatchError(err) {
logger.V(3).Infof("RESTMapping(%q) returned unexpected error %v", gvk, err)
}
return false
}
return true
}

func mapReplStripSecrets(v resource.PropertyValue) (interface{}, bool) {
if v.IsSecret() {
return v.SecretValue().Element.MapRepl(nil, mapReplStripSecrets), true
Expand Down

0 comments on commit 26878ce

Please sign in to comment.