Skip to content

Commit

Permalink
Python: Use async invokes to avoid hangs and stalls (#2863)
Browse files Browse the repository at this point in the history
pulumi 3.109.0 introduces a new `invoke_async` function
(pulumi/pulumi#15602) that allows calling
invokes asynchronously. This change updates the Kubernetes Python SDK's
`yaml`, `helm`, and `kustomize` components to use `invoke_async` to
avoid hangs and stalls in resource registrations which severely limits
parallelism.

It'd also be great to add some new tests (to complement existing tests),
but I don't want to block on getting a fix out to customers:
1. A test similar to the repro of the hang in
pulumi/pulumi#15462. It's difficult to repro
this without mocking, though.
2. Some kind of benchmark that demonstrates this improves performance.

Fixes pulumi/pulumi#15462
Fixes pulumi/pulumi#15591

(cherry picked from commit e174e29)
  • Loading branch information
justinvp authored and rquitales committed Mar 13, 2024
1 parent 0b393b1 commit 823f51e
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Unreleased

- Use async invokes to avoid hangs/stalls in Python `helm`, `kustomize`, and `yaml` components (https://github.com/pulumi/pulumi-kubernetes/pull/2863)

## 4.9.0 (March 4, 2024)

- Fix SSA ignoreChanges by enhancing field manager path comparisons (https://github.com/pulumi/pulumi-kubernetes/pull/2828)
Expand Down
2 changes: 1 addition & 1 deletion provider/cmd/pulumi-resource-kubernetes/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@
},
"readme": "The Kubernetes provider package offers support for all Kubernetes resources and their properties.\nResources are exposed as types from modules based on Kubernetes API groups such as 'apps', 'core',\n'rbac', and 'storage', among many others. Additionally, support for deploying Helm charts ('helm')\nand YAML files ('yaml') is available in this package. Using this package allows you to\nprogrammatically declare instances of any Kubernetes resources and any supported resource version\nusing infrastructure as code, which Pulumi then uses to drive the Kubernetes API.\n\nIf this is your first time using this package, these two resources may be helpful:\n\n* [Kubernetes Getting Started Guide](https://www.pulumi.com/docs/quickstart/kubernetes/): Get up and running quickly.\n* [Kubernetes Pulumi Setup Documentation](https://www.pulumi.com/docs/quickstart/kubernetes/configure/): How to configure Pulumi\n for use with your Kubernetes cluster.\n",
"requires": {
"pulumi": ">=3.25.0,<4.0.0",
"pulumi": ">=3.109.0,<4.0.0",
"requests": ">=2.21,<3.0"
},
"usesIOClasses": true
Expand Down
8 changes: 4 additions & 4 deletions provider/pkg/gen/python-templates/helm/v3/helm.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,8 @@ def _parse_chart(all_config: Tuple[Union[ChartOpts, LocalChartOpts], pulumi.Reso
if config.skip_await:
transformations.append(_skip_await)

def invoke_helm_template(opts):
inv = pulumi.runtime.invoke('kubernetes:helm:template', {'jsonOpts': opts}, invoke_opts)
return (inv.value or {}).get('result', [])
objects = json_opts.apply(invoke_helm_template)
async def invoke_helm_template_async(opts):
inv = await pulumi.runtime.invoke_async('kubernetes:helm:template', {'jsonOpts': opts}, invoke_opts)
return (inv or {}).get('result', [])
objects = json_opts.apply(invoke_helm_template_async)
return objects.apply(lambda x: _parse_yaml_document(x, opts, transformations))
16 changes: 9 additions & 7 deletions provider/pkg/gen/python-templates/kustomize/kustomize.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ def omit_resource(obj, opts):

child_opts = _get_child_options(self, opts)
invoke_opts = _get_invoke_options(child_opts)

__ret__ = pulumi.runtime.invoke(
'kubernetes:kustomize:directory', {'directory': directory}, invoke_opts)

# Handle the cases when the provider is not fully configured:
# https://github.com/pulumi/pulumi/blob/v3.60.1/sdk/go/common/resource/plugin/provider_plugin.go#L1364-L1367
result = (__ret__.value or {}).get('result', [])
async def invoke_kustomize_directory_async():
inv = await pulumi.runtime.invoke_async(
'kubernetes:kustomize:directory', {'directory': directory}, invoke_opts)
# Handle the cases when the provider is not fully configured:
# https://github.com/pulumi/pulumi/blob/v3.60.1/sdk/go/common/resource/plugin/provider_plugin.go#L1364-L1367
return (inv or {}).get('result', [])

result = pulumi.Output.from_input(invoke_kustomize_directory_async())

# Note: Unlike NodeJS, Python requires that we "pull" on our futures in order to get them scheduled for
# execution. In order to do this, we leverage the engine's RegisterResourceOutputs to wait for the
# resolution of all resources that this YAML document created.
self.resources = _parse_yaml_document(result, child_opts, transformations, resource_prefix)
self.resources = result.apply(lambda x: _parse_yaml_document(x, child_opts, transformations, resource_prefix))
self.register_outputs({"resources": self.resources})

def translate_output_property(self, prop: str) -> str:
Expand Down
14 changes: 7 additions & 7 deletions provider/pkg/gen/python-templates/yaml/yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ class ConfigGroup(pulumi.ComponentResource):

for text in yaml:
invoke_opts = _get_invoke_options(child_opts)
__ret__ = invoke_yaml_decode(text, invoke_opts)
resources = _parse_yaml_document(__ret__, child_opts, transformations, resource_prefix)
decoded = pulumi.Output.from_input(_invoke_yaml_decode_async(text, invoke_opts))
resources = decoded.apply(lambda x: _parse_yaml_document(x, child_opts, transformations, resource_prefix))
# Add any new YAML resources to the ConfigGroup's resources
self.resources = pulumi.Output.all(resources, self.resources).apply(lambda x: {**x[0], **x[1]})

Expand Down Expand Up @@ -337,12 +337,12 @@ class ConfigFile(pulumi.ComponentResource):
transformations.append(_skip_await)

invoke_opts = _get_invoke_options(child_opts)
__ret__ = invoke_yaml_decode(text, invoke_opts)
decoded = pulumi.Output.from_input(_invoke_yaml_decode_async(text, invoke_opts))

# Note: Unlike NodeJS, Python requires that we "pull" on our futures in order to get them scheduled for
# execution. In order to do this, we leverage the engine's RegisterResourceOutputs to wait for the
# resolution of all resources that this YAML document created.
self.resources = _parse_yaml_document(__ret__, child_opts, transformations, resource_prefix)
self.resources = decoded.apply(lambda x: _parse_yaml_document(x, child_opts, transformations, resource_prefix))
self.register_outputs({"resources": self.resources})

def translate_output_property(self, prop: str) -> str:
Expand Down Expand Up @@ -526,6 +526,6 @@ def _parse_yaml_object(
lambda x: (f"{gvk}:{x}",
CustomResource(f"{x}", api_version, kind, spec, metadata, opts)))]

def invoke_yaml_decode(text, invoke_opts):
inv = pulumi.runtime.invoke('kubernetes:yaml:decode', {'text': text}, invoke_opts)
return (inv.value or {}).get('result', [])
async def _invoke_yaml_decode_async(text, invoke_opts):
inv = await pulumi.runtime.invoke_async('kubernetes:yaml:decode', {'text': text}, invoke_opts)
return (inv or {}).get('result', [])
2 changes: 1 addition & 1 deletion provider/pkg/gen/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ Use the navigation below to see detailed documentation for each of the supported
})
pkg.Language["python"] = rawMessage(map[string]any{
"requires": map[string]string{
"pulumi": ">=3.25.0,<4.0.0",
"pulumi": ">=3.109.0,<4.0.0",
"requests": ">=2.21,<3.0",
},
"pyproject": map[string]bool{
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/pulumi_kubernetes/helm/v3/helm.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,8 @@ def _parse_chart(all_config: Tuple[Union[ChartOpts, LocalChartOpts], pulumi.Reso
if config.skip_await:
transformations.append(_skip_await)

def invoke_helm_template(opts):
inv = pulumi.runtime.invoke('kubernetes:helm:template', {'jsonOpts': opts}, invoke_opts)
return (inv.value or {}).get('result', [])
objects = json_opts.apply(invoke_helm_template)
async def invoke_helm_template_async(opts):
inv = await pulumi.runtime.invoke_async('kubernetes:helm:template', {'jsonOpts': opts}, invoke_opts)
return (inv or {}).get('result', [])
objects = json_opts.apply(invoke_helm_template_async)
return objects.apply(lambda x: _parse_yaml_document(x, opts, transformations))
16 changes: 9 additions & 7 deletions sdk/python/pulumi_kubernetes/kustomize/kustomize.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ def omit_resource(obj, opts):

child_opts = _get_child_options(self, opts)
invoke_opts = _get_invoke_options(child_opts)

__ret__ = pulumi.runtime.invoke(
'kubernetes:kustomize:directory', {'directory': directory}, invoke_opts)

# Handle the cases when the provider is not fully configured:
# https://github.com/pulumi/pulumi/blob/v3.60.1/sdk/go/common/resource/plugin/provider_plugin.go#L1364-L1367
result = (__ret__.value or {}).get('result', [])
async def invoke_kustomize_directory_async():
inv = await pulumi.runtime.invoke_async(
'kubernetes:kustomize:directory', {'directory': directory}, invoke_opts)
# Handle the cases when the provider is not fully configured:
# https://github.com/pulumi/pulumi/blob/v3.60.1/sdk/go/common/resource/plugin/provider_plugin.go#L1364-L1367
return (inv or {}).get('result', [])

result = pulumi.Output.from_input(invoke_kustomize_directory_async())

# Note: Unlike NodeJS, Python requires that we "pull" on our futures in order to get them scheduled for
# execution. In order to do this, we leverage the engine's RegisterResourceOutputs to wait for the
# resolution of all resources that this YAML document created.
self.resources = _parse_yaml_document(result, child_opts, transformations, resource_prefix)
self.resources = result.apply(lambda x: _parse_yaml_document(x, child_opts, transformations, resource_prefix))
self.register_outputs({"resources": self.resources})

def translate_output_property(self, prop: str) -> str:
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/pulumi_kubernetes/yaml/yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ def omit_resource(obj, opts):

for text in yaml:
invoke_opts = _get_invoke_options(child_opts)
__ret__ = invoke_yaml_decode(text, invoke_opts)
resources = _parse_yaml_document(__ret__, child_opts, transformations, resource_prefix)
decoded = pulumi.Output.from_input(_invoke_yaml_decode_async(text, invoke_opts))
resources = decoded.apply(lambda x: _parse_yaml_document(x, child_opts, transformations, resource_prefix))
# Add any new YAML resources to the ConfigGroup's resources
self.resources = pulumi.Output.all(resources, self.resources).apply(lambda x: {**x[0], **x[1]})

Expand Down Expand Up @@ -337,12 +337,12 @@ def omit_resource(obj, opts):
transformations.append(_skip_await)

invoke_opts = _get_invoke_options(child_opts)
__ret__ = invoke_yaml_decode(text, invoke_opts)
decoded = pulumi.Output.from_input(_invoke_yaml_decode_async(text, invoke_opts))

# Note: Unlike NodeJS, Python requires that we "pull" on our futures in order to get them scheduled for
# execution. In order to do this, we leverage the engine's RegisterResourceOutputs to wait for the
# resolution of all resources that this YAML document created.
self.resources = _parse_yaml_document(__ret__, child_opts, transformations, resource_prefix)
self.resources = decoded.apply(lambda x: _parse_yaml_document(x, child_opts, transformations, resource_prefix))
self.register_outputs({"resources": self.resources})

def translate_output_property(self, prop: str) -> str:
Expand Down Expand Up @@ -2006,6 +2006,6 @@ def _parse_yaml_object(
lambda x: (f"{gvk}:{x}",
CustomResource(f"{x}", api_version, kind, spec, metadata, opts)))]

def invoke_yaml_decode(text, invoke_opts):
inv = pulumi.runtime.invoke('kubernetes:yaml:decode', {'text': text}, invoke_opts)
return (inv.value or {}).get('result', [])
async def _invoke_yaml_decode_async(text, invoke_opts):
inv = await pulumi.runtime.invoke_async('kubernetes:yaml:decode', {'text': text}, invoke_opts)
return (inv or {}).get('result', [])
2 changes: 1 addition & 1 deletion sdk/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "pulumi_kubernetes"
description = "A Pulumi package for creating and managing Kubernetes resources."
dependencies = ["parver>=0.2.1", "pulumi>=3.25.0,<4.0.0", "requests>=2.21,<3.0", "semver>=2.8.1"]
dependencies = ["parver>=0.2.1", "pulumi>=3.109.0,<4.0.0", "requests>=2.21,<3.0", "semver>=2.8.1"]
keywords = ["pulumi", "kubernetes", "category/cloud", "kind/native"]
readme = "README.md"
requires-python = ">=3.8"
Expand Down

0 comments on commit 823f51e

Please sign in to comment.