Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ResourceProvider#StreamInvoke #3424

Merged
merged 4 commits into from
Nov 5, 2019
Merged

Conversation

hausdorff
Copy link
Contributor

No description provided.

@hausdorff hausdorff force-pushed the hausdorff/streaminvoke branch 4 times, most recently from d1ef371 to 2528fda Compare October 31, 2019 02:15
@hausdorff hausdorff marked this pull request as ready for review November 1, 2019 01:31
@hausdorff
Copy link
Contributor Author

@pgavlin I'm going to mark this as ready for review because I think it's directionally correct. LMK what you think.

pkg/resource/deploy/builtins.go Outdated Show resolved Hide resolved
pkg/resource/deploy/providers/registry.go Outdated Show resolved Hide resolved
pkg/resource/deploy/source_eval.go Outdated Show resolved Hide resolved
pkg/resource/deploy/source_query.go Show resolved Hide resolved
pkg/resource/plugin/provider.go Outdated Show resolved Hide resolved
opts: InvokeOptions = {},
): AsyncIterable<any> {
if (opts.async) {
throw Error("streamInvoke does not support async mode");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like streamInvoke only supports async mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! On purpose. By definition it's an AsyncIterable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect. I don't think we need to throw in this case, then--opts.async is essentially meaningless, so we might as well ignore it entirely.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we shouldn't throw here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops missed this. will fix tonight.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I just made streamInvoke async natively.

@hausdorff
Copy link
Contributor Author

Just an update on this change—we've decided that we need to be able to cancel a StreamInvoke from the language provider. For example:

const deployments = pulumi.runtime.streamInvoke("kubernetes:kubernetes:watch", {
    group: "apps",
    version: "v1",
    kind: "Deployment",
});

for await (const { object: deployment, type } of deployments) {
    console.log(`${deployment.metadata!.name} was ${type}`);
    break;
}

deployments.close(); // <-- Signals to the resource provider to stop the invoke.

The code will be done today, but it's probably best, given that people are trying to leave so they can go to the end-of-sprint unwinder, to check this in on Monday.

@hausdorff hausdorff force-pushed the hausdorff/streaminvoke branch 3 times, most recently from 5752852 to 6e4d838 Compare November 4, 2019 21:56
@hausdorff
Copy link
Contributor Author

hausdorff commented Nov 4, 2019

I've just added the ability to "cancel" a streamInvoke from the language provider, so this is close to being ready to go. The commit messages contain more context, but in short, it's now possible to write something like:

// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 1
// update from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
    group: "apps", version: "v1", kind: "Deployment",
    break;
});
for await (const d of deployments) {
    break;
}
deployments.cancel();

The corresponding "graceful shutdown" code in the Kubernetes provider (which you can see in pulumi/pulumi-kubernetes#858) looks like the following snippet, and specifically in the last case. Something ~like this would have to be implemented in any provider that chose to support ResourceProvider#StreamInvoke.

for {
	select {
	case <-k.canceler.context.Done():
		//
		// `kubeProvider#Cancel` was called. Terminate the `StreamInvoke` RPC, free all
		// resources, and exit without error.
		//

		watch.Stop()
		return nil
	case event := <-watch.ResultChan():
		//
		// Kubernetes resource was updated. Publish resource update back to user.
		//

		resp, err := plugin.MarshalProperties(
			resource.NewPropertyMapFromMap(
				map[string]interface{}{
					"type":   event.Type,
					"object": event.Object.(*unstructured.Unstructured).Object,
				}),
			plugin.MarshalOptions{})
		if err != nil {
			return err
		}

		err = server.Send(&pulumirpc.InvokeResponse{Return: resp})
		if err != nil {
			return err
		}
	case <-server.Context().Done():
		//
		// gRPC stream was cancelled. Usually this happens in the language provider, e.g.,
		// in the call to `cancel` below. Terminate the `StreamInvoke` RPC, free all
		// resources, and exit without error.
		//
		//     const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
		//         group: "apps", version: "v1", kind: "Deployment",
		//     });
		//     deployments.cancel();
		//

		watch.Stop()
		return nil
	}
}

}

// Check properties that failed verification.
var failures []CheckFailure
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit weird--do we expect to only get these on the first message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's weird. We're streaming InvokeResponse back, but because we don't have a separate, first-class structured error channel it seems like you have to cram the error conditions into the message type—which we do both here and in Invoke. So you end up with this weird thing.

We could just take this out. It's not clear to me which is better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, I think we can leave it for now. Maybe add a note with what you described above.

The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.

This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.

Use it like this:

    // `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
    // updates from the stream, then `cancel` giving the Kubernetes provider to
    // clean up and close gracefully.
    const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
        group: "apps", version: "v1", kind: "Deployment",
        break;
    });
    deployments.cancel();
@hausdorff hausdorff merged commit c0490ec into master Nov 5, 2019
@pulumi-bot pulumi-bot deleted the hausdorff/streaminvoke branch November 5, 2019 18:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants