First check
I used the GitHub search to find a similar request and didn't find it.
I searched the Prefect documentation for this feature.
Describe the current behavior
The .map interface of a prefect task is useful for submitting a series of operations across an iterable of arguments for execution. The return of this map method is a list of PrefectFutures from the individual .submit results.
If there is a secondary prefect task that has no direct data dependencies, we can use its wait_for argument to tell the orchestrator not to start it until all mapped inputs to the first task have completed. However, there is no way to use the map interface while specifying that each mapped run only has to wait for the corresponding future from task a to finish.
If task b has no data relation to task a, but task a has side-effects that task b has to be aware of, it is necessary to wait for task a to finish appropriately. As an example, I have a pipeline that uses (almost exclusively) the .map interface. While I am processing the data set (a nested series of tables on disk called a Measurement Set, a format HPCs hate) I can not operate on said data. But I want to zip/tar it as soon as possible to avoid file-quota related issues. I can either modify the expected return and input types between functions to pass around this extra information, or I can keep the functions as I would otherwise write them and leverage the dependency management of prefect.
To put it in code:
futures = task_a.map("Break this string".split())
for some_number, future in zip([1, 2, 3], futures):
    task_b.submit(some_number, wait_for=[future])
Although the above works, it has a rather unattractive mixture of prefect methods. And I suspect that when the inputs to the mapped function are sufficiently large there are performance penalties from interacting with the API once per submission.
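For illustration only, the per-index dependency shape that loop builds can be sketched with the standard library alone (no prefect); task_a and task_b here are hypothetical stand-ins, not the real tasks:

```python
# Standard-library sketch of the per-index dependency shape: each
# downstream call waits only on its own upstream future.
from concurrent.futures import ThreadPoolExecutor

def task_a(word):
    return len(word)  # stand-in for a side-effecting mapped task

def task_b(number, wait_for):
    for f in wait_for:  # mirror wait_for=[future]: block on the upstream future
        f.result()
    return number * 2

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(task_a, w) for w in "Break this string".split()]
    results = [
        pool.submit(task_b, n, [f]).result()
        for n, f in zip([1, 2, 3], futures)
    ]

print(results)  # [2, 4, 6]
```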
Describe the proposed behavior
Have an operation similar to unmapped that works for wait_for.
futures = task_a.map("Break this string".split())
task_b.submit([1,2,3], wait_for=unmapped(futures))
This would also make it easier to build up more complex sets of wait_for dependencies without having to do any extra indexing or logic within a for loop.
futures = task_a.map("Break this string".split())
more_futures = new_task.map("Some other string".split())
task_b.submit([1,2,3], wait_for=(unmapped(futures), unmapped(more_futures)))
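One possible semantics for this (an assumption on my part, not existing prefect behavior) is that the orchestrator would simply flatten any unmapped-wrapped collections in wait_for into one flat list of upstream futures. A minimal sketch, with a stand-in unmapped class and strings as placeholder futures:

```python
# Hypothetical sketch of a wait_for normalizer; `unmapped` here is a
# stand-in marker class, not prefect's own implementation.
class unmapped:
    def __init__(self, value):
        self.value = value

def flatten_wait_for(wait_for):
    """Expand unmapped wrappers and tuples/lists into a flat future list."""
    items = wait_for if isinstance(wait_for, (list, tuple)) else [wait_for]
    flat = []
    for item in items:
        if isinstance(item, unmapped):
            flat.extend(item.value)  # pass the whole collection through
        else:
            flat.append(item)
    return flat

futures = ["f0", "f1", "f2"]       # placeholder futures
more_futures = ["g0", "g1"]
combined = flatten_wait_for((unmapped(futures), unmapped(more_futures)))
print(combined)  # ['f0', 'f1', 'f2', 'g0', 'g1']
```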
Example Use
No response
Additional context
I attempt to use the .map interface as much as possible in my pipelines. It makes the code much more readable, and avoids unnecessary layers of indentation.
I find myself in this situation a fair amount:
telescope_mss = glob("*.ms")
image_data = image_ms_data.map(ms=telescope_mss)
zipped = zip_data.map(ms=telescope_mss, wait_for=image_data)
In the above situation I can not zip the measurement sets (those nasty radio-telescope formats of folders within folders, with many small files and many big files that HPCs absolutely hate) until the imaging is finished.
Although I could pass the image_data futures into the input of zip_data, it would mean I either have to modify the return of image_ms_data (which is the image, not the data) and update what zip_data expects, or add a dummy input argument to create the dependency that way. This is a little smelly to me, as realistically the zip_data function should be completely agnostic to what it accepts for zipping.
This is just one example, but this type of use case pops up fairly often for me. As I try to convince the powers that be that we have a workable, maintainable solution, I'd love to keep my usage of prefect as consistent and as readable as possible.
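The dummy-argument workaround mentioned above would look roughly like this (a hypothetical sketch with plain functions standing in for the tasks; the _imaging_result parameter is invented purely to show the smell):

```python
# Sketch of the dummy-argument workaround: zip_data gains an argument it
# never uses, purely to create an artificial data dependency on imaging.
def image_ms_data(ms):
    return f"image-of-{ms}"  # stand-in for the real imaging task

def zip_data(ms, _imaging_result=None):
    # _imaging_result is ignored; it exists only to force ordering.
    return f"{ms}.zip"

telescope_mss = ["a.ms", "b.ms"]
image_data = [image_ms_data(ms) for ms in telescope_mss]
zipped = [zip_data(ms, img) for ms, img in zip(telescope_mss, image_data)]
print(zipped)  # ['a.ms.zip', 'b.ms.zip']
```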
Thanks for the enhancement request @tjgalvin! If you're chaining .map calls, would you expect this new functionality to wait for upstream completion to be index-based (i.e. mapped execution 0 waits for upstream future 0, mapped execution 1 waits for upstream future 1, etc.)? Or are there other criteria that you'd expect to be able to use when waiting for upstream completion in a .map call?