New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exhaust generator operators when executing directly #3803
Conversation
@@ -362,30 +367,8 @@ async def _execute_operator(self, doc, log=False, run_link=None): | |||
set_progress=self.set_progress, | |||
) | |||
|
|||
# if a validation error happened during preparation, | |||
# only an ExecutionResult with an error is returned. | |||
# Raise it so the delegated operation is marked as a failure. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the previous implementation, validation errors would result in set_failed()
being called without set_running()
ever being called, which seemed weird to me? Now all operations will have a running time logged before possibly failing.
|
||
if log: | ||
logger.info("Running operator %s", operator_uri) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a duplicate log
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## develop #3803 +/- ##
========================================
Coverage 15.57% 15.57%
========================================
Files 645 645
Lines 74120 74120
Branches 990 990
========================================
Hits 11542 11542
Misses 62578 62578
Flags with carried forward coverage won't be shown. Click here to find out more.
☔ View full report in Codecov by Sentry. |
if not exhaust: | ||
return result | ||
|
||
if inspect.isgenerator(result): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a fan of swallowing the messages the operator is sending via the generator. We should probably allow for the callee to pass in a function that processes these. Otherwise there is no path to handling messages when calling execute_operator()
directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although in this could be the behavior of exhaust
which would be the default. I'm thinking mostlyu about adding execute_operator(message_reader=MyMessageReader())
or similar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds potentially useful to me 👍
This PR just unifies how execute_operator()
work with generators with how delegated operator execution works (which already "swallows" generators)
Fixes a bug where
foo.execute_operator()
would not exhaust generator operators. While fixing this, took the opportunity to remove code duplication, since the same logic was already in-place when executing delegated operations.