Skip to content

Commit

Permalink
Do not await during .get or import operations (#2373)
Browse files Browse the repository at this point in the history
Changes to the provider logic invalidated a check in the await Read method that was supposed to return the cluster state without await checks in the case of a .get or import. As a result, await logic was always executed against resources during a Read operation. This could lead to deadlocks and timeouts in cases where the await logic causes a cyclic dependency.

This change fixes the check so that the provider does not perform await logic on resources during a .get or import operation.
  • Loading branch information
lblackstone committed Apr 28, 2023
1 parent fa89097 commit 13189ac
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 48 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Unreleased

- Do not await during .get or import operations (https://github.com/pulumi/pulumi-kubernetes/pull/2373)

## 3.25.0 (April 11, 2023)
- Update Kubernetes to v1.27.0

Expand Down
10 changes: 5 additions & 5 deletions provider/pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ type CreateConfig struct {

type ReadConfig struct {
ProviderConfig
Inputs *unstructured.Unstructured
Name string
Inputs *unstructured.Unstructured
Name string
ReadFromCluster bool
}

type UpdateConfig struct {
Expand Down Expand Up @@ -299,9 +300,8 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
outputs, err := client.Get(c.Context, c.Name, metav1.GetOptions{})
if err != nil {
return nil, err
} else if c.Inputs == nil || len(c.Inputs.Object) == 0 {
// No inputs means that we do not manage the resource, i.e., it's a call to
// `CustomResource#get`. Simply return the object.
} else if c.ReadFromCluster {
// If the resource is read from a .get or an import, simply return the resource state from the cluster.
return outputs, nil
}

Expand Down
14 changes: 7 additions & 7 deletions provider/pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,7 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
req.GetId())
}

freshImport := false
readFromCluster := false
oldInputs, oldLive := parseCheckpointObject(oldState)
if oldInputs.GroupVersionKind().Empty() {
if oldLive.GroupVersionKind().Empty() {
Expand All @@ -1959,7 +1959,7 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
return nil, err
}
oldInputs.SetGroupVersionKind(gvk)
freshImport = true
readFromCluster = true
} else {
oldInputs.SetGroupVersionKind(oldLive.GroupVersionKind())
}
Expand Down Expand Up @@ -2017,8 +2017,9 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn),
Resources: resources,
},
Inputs: oldInputs,
Name: name,
Inputs: oldInputs,
ReadFromCluster: readFromCluster,
Name: name,
}
liveObj, readErr := await.Read(config)
if readErr != nil {
Expand Down Expand Up @@ -2055,9 +2056,8 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p
// Attempt to parse the inputs for this object. If parsing was unsuccessful, retain the old inputs.
liveInputs := parseLiveInputs(liveObj, oldInputs)

if freshImport {
// If no previous inputs were known, this is a fresh import. In which case we want to populate
// the inputs from the live state for the resource by referring to the input properties for the resource.
if readFromCluster {
// If no previous inputs were known, populate the inputs from the live cluster state for the resource.
pkgSpec := pulumischema.PackageSpec{}
if err := json.Unmarshal(k.pulumiSchema, &pkgSpec); err != nil {
return nil, err
Expand Down
32 changes: 28 additions & 4 deletions tests/sdk/nodejs/get/step1/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2019, Pulumi Corporation.
// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -20,16 +20,40 @@ const namespace = new k8s.core.v1.Namespace("test-namespace");
// `get`s the Kubernetes API service.
//

const svc = k8s.core.v1.Service.get("kube-api", "default/kubernetes");
export const svc = k8s.core.v1.Service.get("kube-api", "default/kubernetes");

// This will fail with a TypeError if the status was not populated (i.e. the .get isn't working)
export const loadBalancer = svc.status.loadBalancer;

//
// Create a Service resource with skipAwait that would fail to initialize due to await logic.
// get should return the resource state without gating on await logic.
//

export const awaitSvc = new k8s.core.v1.Service("svc", {
metadata: {
name: "test",
namespace: namespace.metadata.name,
annotations: {
"pulumi.com/skipAwait": "true",
},
},
spec: {
type: k8s.types.enums.core.v1.ServiceSpecType.ClusterIP,
ports: [{
name: "http",
port: 8080,
targetPort: 80,
}],
selector: { app: "nginx" }, // selector doesn't match Pods, so await logic would fail
}
});

//
// Create a CustomResourceDefinition, a CustomResource, and then `.get` it.
//

const ct = new k8s.apiextensions.v1.CustomResourceDefinition("crontab", {
export const ct = new k8s.apiextensions.v1.CustomResourceDefinition("crontab", {
metadata: { name: "crontabs.stable.example.com" },
spec: {
group: "stable.example.com",
Expand Down Expand Up @@ -71,7 +95,7 @@ const ct = new k8s.apiextensions.v1.CustomResourceDefinition("crontab", {
}
});

new k8s.apiextensions.CustomResource(
export const cr = new k8s.apiextensions.CustomResource(
"my-new-cron-object",
{
apiVersion: "stable.example.com/v1",
Expand Down
46 changes: 41 additions & 5 deletions tests/sdk/nodejs/get/step2/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2019, Pulumi Corporation.
// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -21,16 +21,52 @@ const namespace = new k8s.core.v1.Namespace("test-namespace");
// `get`s the Kubernetes API service.
//

const svc = k8s.core.v1.Service.get("kube-api", "default/kubernetes");
export const svc = k8s.core.v1.Service.get("kube-api", "default/kubernetes");

// This will fail with a TypeError if the status was not populated (i.e. the .get isn't working)
export const loadBalancer = svc.status.loadBalancer;

//
// Create a Service resource with skipAwait that would fail to initialize due to await logic.
// get should return the resource state without gating on await logic.
//

export const awaitSvc = new k8s.core.v1.Service("svc", {
metadata: {
name: "test",
namespace: namespace.metadata.name,
annotations: {
"pulumi.com/skipAwait": "true",
},
},
spec: {
type: k8s.types.enums.core.v1.ServiceSpecType.ClusterIP,
ports: [{
name: "http",
port: 8080,
targetPort: 80,
}],
selector: { app: "nginx" }, // selector doesn't match Pods, so await logic would fail
}
});

export const awaitSvcGet = k8s.core.v1.Service.get("await", pulumi.interpolate `${namespace.metadata.name}/test`);

// Create a ConfigMap using the clusterIP to validate that the .get worked.
export const cm = new k8s.core.v1.ConfigMap("svc-test", {
metadata: {
namespace: namespace.metadata.name,
},
data: {
key: awaitSvc.spec.clusterIP,
}
});

//
// Create a CustomResourceDefinition, a CustomResource, and then `.get` it.
//

const ct = new k8s.apiextensions.v1.CustomResourceDefinition("crontab", {
export const ct = new k8s.apiextensions.v1.CustomResourceDefinition("crontab", {
metadata: { name: "crontabs.stable.example.com" },
spec: {
group: "stable.example.com",
Expand Down Expand Up @@ -72,7 +108,7 @@ const ct = new k8s.apiextensions.v1.CustomResourceDefinition("crontab", {
}
});

new k8s.apiextensions.CustomResource(
export const cr = new k8s.apiextensions.CustomResource(
"my-new-cron-object",
{
apiVersion: "stable.example.com/v1",
Expand All @@ -86,7 +122,7 @@ new k8s.apiextensions.CustomResource(
{ dependsOn: ct }
);

k8s.apiextensions.CustomResource.get("my-new-cron-object-get", {
export const crGet = k8s.apiextensions.CustomResource.get("my-new-cron-object-get", {
apiVersion: "stable.example.com/v1",
kind: "CronTab",
id: pulumi.interpolate `${namespace.metadata.name}/my-new-cron-object`,
Expand Down
102 changes: 75 additions & 27 deletions tests/sdk/nodejs/nodejs_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2022, Pulumi Corporation.
// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -474,59 +474,107 @@ func TestGet(t *testing.T) {
ExpectRefreshChanges: true,
ExtraRuntimeValidation: func(t *testing.T, stackInfo integration.RuntimeValidationStackInfo) {
assert.NotNil(t, stackInfo.Deployment)
assert.Equal(t, 6, len(stackInfo.Deployment.Resources))

tests.SortResourcesByURN(stackInfo)
assert.Equal(t, 7, len(stackInfo.Deployment.Resources))

stackRes := stackInfo.Deployment.Resources[5]
assert.Equal(t, resource.RootStackType, stackRes.URN.Type())
//
// Assert we can use .get to retrieve the kube-api Service.
//

provRes := stackInfo.Deployment.Resources[4]
assert.True(t, providers.IsProviderType(provRes.URN.Type()))
service := stackInfo.Outputs["svc"].(map[string]interface{})
svcURN, _ := openapi.Pluck(service, "urn")
assert.Containsf(t, svcURN, "kube-api", "urn missing expected name")
svcName, _ := openapi.Pluck(service, "metadata", "name")
assert.Equalf(t, "kubernetes", svcName, "unexpected service name")

//
// Assert we can use .get to retrieve the kube-api Service.
// Assert that the uninitialized Service exists
//

service := stackInfo.Deployment.Resources[2]
assert.Equal(t, "kube-api", string(service.URN.Name()))
step1Name, _ := openapi.Pluck(service.Outputs, "metadata", "name")
assert.Equal(t, "kubernetes", step1Name.(string))
awaitSvc := stackInfo.Outputs["awaitSvc"].(map[string]interface{})
awaitSvcName, _ := openapi.Pluck(awaitSvc, "metadata", "name")
assert.Equalf(t, "test", awaitSvcName, "unexpected service name")
awaitSvcAnnotation, ok := openapi.Pluck(awaitSvc, "metadata", "annotations", "pulumi.com/skipAwait")
assert.Truef(t, ok, "failed to find skipAwait annotation")
assert.Equalf(t, "true", awaitSvcAnnotation, "expected annotation to be true")

//
// Assert that CRD and CR exist
//

crd := stackInfo.Deployment.Resources[0]
assert.Equal(t, "crontab", string(crd.URN.Name()))
crd := stackInfo.Outputs["ct"].(map[string]interface{})
crdURN, _ := openapi.Pluck(crd, "urn")
assert.Containsf(t, crdURN, "crontab", "urn missing expected name")

ct1 := stackInfo.Deployment.Resources[3]
assert.Equal(t, "my-new-cron-object", string(ct1.URN.Name()))
cr := stackInfo.Outputs["cr"].(map[string]interface{})
crURN, _ := openapi.Pluck(cr, "urn")
assert.Containsf(t, crURN, "my-new-cron-object", "urn missing expected name")
},
EditDirs: []integration.EditDir{
{
Dir: filepath.Join("get", "step2"),
Additive: true,
ExtraRuntimeValidation: func(t *testing.T, stackInfo integration.RuntimeValidationStackInfo) {
assert.NotNil(t, stackInfo.Deployment)
assert.Equal(t, 7, len(stackInfo.Deployment.Resources))
assert.Equal(t, 10, len(stackInfo.Deployment.Resources))

tests.SortResourcesByURN(stackInfo)
//
// Assert we can use .get to retrieve the kube-api Service.
//

stackRes := stackInfo.Deployment.Resources[6]
assert.Equal(t, resource.RootStackType, stackRes.URN.Type())
service := stackInfo.Outputs["svc"].(map[string]interface{})
svcURN, _ := openapi.Pluck(service, "urn")
assert.Containsf(t, svcURN, "kube-api", "urn missing expected name")
svcName, _ := openapi.Pluck(service, "metadata", "name")
assert.Equalf(t, "kubernetes", svcName, "unexpected service name")

provRes := stackInfo.Deployment.Resources[5]
assert.True(t, providers.IsProviderType(provRes.URN.Type()))
//
// Assert that the uninitialized Service exists
//

awaitSvc := stackInfo.Outputs["awaitSvc"].(map[string]interface{})
awaitSvcName, _ := openapi.Pluck(awaitSvc, "metadata", "name")
assert.Equalf(t, "test", awaitSvcName, "unexpected service name")
awaitSvcAnnotation, ok := openapi.Pluck(awaitSvc, "metadata", "annotations", "pulumi.com/skipAwait")
assert.Truef(t, ok, "failed to find skipAwait annotation")
assert.Equalf(t, "true", awaitSvcAnnotation, "expected annotation to be true")

//
// Assert we can use .get to retrieve a Service that would fail await logic.
//

awaitSvcGet := stackInfo.Outputs["awaitSvcGet"].(map[string]interface{})
awaitSvcGetURN, _ := openapi.Pluck(awaitSvcGet, "urn")
assert.Containsf(t, awaitSvcGetURN, "await", "urn missing expected name")

//
// Assert we can use an output from a Service that would fail await logic.
//

cm := stackInfo.Outputs["cm"].(map[string]interface{})
cmURN, _ := openapi.Pluck(cm, "urn")
assert.Containsf(t, cmURN, "svc-test", "urn missing expected name")
clusterIP, _ := openapi.Pluck(cm, "data", "key")
assert.NotEmptyf(t, clusterIP, "clusterIP should be set")

//
// Assert that CRD and CR exist
//

crd := stackInfo.Outputs["ct"].(map[string]interface{})
crdURN, _ := openapi.Pluck(crd, "urn")
assert.Containsf(t, crdURN, "crontab", "urn missing expected name")

cr := stackInfo.Outputs["cr"].(map[string]interface{})
crURN, _ := openapi.Pluck(cr, "urn")
assert.Containsf(t, crURN, "my-new-cron-object", "urn missing expected name")

//
// Assert we can use .get to retrieve CRDs.
//

ct2 := stackInfo.Deployment.Resources[4]
assert.Equal(t, "my-new-cron-object-get", string(ct2.URN.Name()))
image, _ := openapi.Pluck(ct2.Outputs, "spec", "image")
assert.Equal(t, "my-awesome-cron-image", image.(string))
crGet := stackInfo.Outputs["crGet"].(map[string]interface{})
crGetURN, _ := openapi.Pluck(crGet, "urn")
assert.Containsf(t, crGetURN, "my-new-cron-object-get", "urn missing expected name")
},
},
},
Expand Down

0 comments on commit 13189ac

Please sign in to comment.