-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[sdk/python] - Await all async tasks #6606
Conversation
81222bd
to
8cb5c67
Compare
# We will occasionally start tasks deliberately that we know will never complete. We must | ||
# cancel them before shutting down the event loop. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this actually true? I haven't found any instances where we start tasks "forever" in practice. And I don't really know why we would have a run-forever task within the pulumi_func especially.
The solution here depends on this statement being false and an assumption that all futures will resolve unless an exception is thrown.
f9373b5
to
8e2c2b4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with the caveat that I'm not very familiar with this part of the codebase.
7dd01d9
to
bc8d82d
Compare
Had a chat with @pgavlin about this PR and he brought up the fact that the "other async tasks" could themselves schedule new RPCs so I need to do a little more work here, at the very least to add tests for that particular scenario. |
One concrete example is the creation of a resource inside of an |
2618ddb
to
d8463c0
Compare
b4f5168
to
db71a74
Compare
@pgavlin I've added the "create a resource inside apply" test and fixed up the code accordingly. Should be ready for your review 🙏🏽 |
e9a7ee0
to
6147eae
Compare
Currently, only RPC futures are awaited in the python SDK. When RPCs are complete, we collect all remaining tasks and cancel them. This means that code within
.apply()
and/or un-exported outputs are not always awaited and may be canceled in-flight.This PR awaits the remaining outstanding tasks until either an exception is raised or all futures are resolved. Once
asyncio.wait
returns we cancel any pending tasks if they exist and check thedone
pile for any futures that raised an exception, re-raising if so.We then repeat the cycle if the "extra async work" has itself scheduled any RPCs, only breaking when there are no remaining RPCs at the end of the cycle.
Python part of #3991