
Setting up the environment prior to task execution #18

Closed
spitz-dan-l opened this issue Jan 26, 2022 · 3 comments

Comments

@spitz-dan-l
Contributor

Hey there! I have been continuing to play around with redun and I continue to be impressed.

While I am a big fan of redun's simple model of a task as a pure input-output function, I find that in practice it is sometimes useful to set up the environment right before a task executes.

A simple example: say I want the environment variable MY_VAR to be set to the value MY_VALUE in all tasks that run.

Currently, it seems like the only ways to do this are:

  1. (Local executor) - Set MY_VAR prior to starting redun.
  2. (Batch executor) - Build docker images with MY_VAR set and reference those images.
  3. (works everywhere) Modify all tasks to explicitly set MY_VAR themselves before doing anything else.

All of the above become more difficult when the desired value of MY_VAR depends on something that can change across runs, e.g. a --setup option.
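For concreteness, option 3 means boilerplate like the following in every task (a sketch with made-up names, not real redun code):

```python
import os

# Hypothetical sketch of option 3: each task sets MY_VAR itself before
# doing any real work, so the setup line is repeated in every task body.
def my_task():
    os.environ["MY_VAR"] = "MY_VALUE"  # setup boilerplate, repeated per task
    # ... the actual task logic would run here, with MY_VAR available ...
    return os.environ["MY_VAR"]
```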

I recognize that any generic facility for running code prior to task execution can be misused, and may weaken redun's ability to perfectly track provenance, as well as complicate its execution model. So I understand if it is not in the cards. Just wanted to mention it. I also wonder if there's a functional-programming-inspired approach that could work, a la monads.

@mattrasmus
Collaborator

Hi @spitz-dan-l, great question again.

We do support adding a wrapper around a task for setup- and teardown-like activities. The feature is called "task decorators", which are like Python's decorators but for tasks instead of functions.

redun/redun/task.py

Lines 751 to 764 in e007df4

# Assumed imports for running this excerpt standalone:
from typing import Any, Callable

from redun.task import Func, Task, wraps_task

def doubled_task() -> Callable[[Func], Task[Func]]:
# The name of this inner function is used to create the nested namespace,
# so idiomatically, use the same name as the decorator with a leading underscore.
@wraps_task()
def _doubled_task(inner_task: Task) -> Callable[[Task[Func]], Func]:
# The behavior when the task is run. Note we have both the task and the
# runtime args.
def do_doubling(*task_args, **task_kwargs) -> Any:
return 2 * inner_task.func(*task_args, **task_kwargs)
return do_doubling
return _doubled_task

The key thing is wraps_task, which is the task equivalent of functools.wraps for normal decorators.

In the example provided we do post-processing, but you could do preprocessing if needed. Calling task.func(...) calls the original function synchronously, so you can use that to avoid starting a separate task (which may be in another thread/process/batch job/etc).
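The setup/teardown shape of this pattern can be sketched in plain Python, using functools.wraps as a stand-in for redun's wraps_task (names like with_env are illustrative, not part of redun):

```python
import functools
import os

def with_env(var: str, value: str):
    """Illustrative setup/teardown wrapper: set an environment variable
    before the wrapped function runs, and restore the prior state after."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old = os.environ.get(var)
            os.environ[var] = value            # setup before the call
            try:
                return func(*args, **kwargs)   # call the original synchronously
            finally:                           # teardown: restore prior state
                if old is None:
                    os.environ.pop(var, None)
                else:
                    os.environ[var] = old
        return wrapper
    return decorator

@with_env("MY_VAR", "MY_VALUE")
def my_task():
    return os.environ["MY_VAR"]
```

In redun the same structure lives inside a wraps_task-decorated inner function, with inner_task.func(...) playing the role of the wrapped call.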

@spitz-dan-l
Contributor Author

spitz-dan-l commented Feb 4, 2022

Thanks @mattrasmus !

This seems like the perfect solution! However, I'm noticing some inconsistency in the behavior: when using the local-process or awsbatch executor, task wrappers only work when used as decorators. Here is a case where using one dynamically behaves differently under the local-thread executor vs. the local-process or batch executors:

# Taken from the docstring for wraps_task
def wrapper_with_args(wrapper_arg: int):
    @wraps_task(wrapper_hash_includes=[wrapper_arg])
    def _wrapper_with_args(inner_task: Task):
        def do_wrapper_with_args(*task_args, **task_kwargs):
            return wrapper_arg * inner_task.func(*task_args, **task_kwargs)
        return do_wrapper_with_args
    return _wrapper_with_args

# Using as a decorator works as expected for all executors
@wrapper_with_args(2)
def foo():
    return 5

# However, if we try to use it inside another task, the behavior will not be as expected
@task()
def bar():
    return 5

@task()
def main():
    return wrapper_with_args(2)(bar)()

With the local executor in threaded mode, this outputs 10 (as expected). But in process mode, or with the batch executor, it outputs 5, seemingly ignoring the wrapper.

@mattrasmus
Collaborator

Yes, using the task decorator dynamically like that is not supported; we can document that better. The reason is that redun requires tasks to be registered at import time (other design choices currently require this). Decorators technically create a new outer task, so it's important for that creation to happen during import, which is what you get with the usual decorator @ style. Your dynamic case hides the outer-task creation inside a task, which leads to undefined behavior.

The details are low level, but basically when we first get inside a batch job, we need to repopulate the TaskRegistry. We do that by re-importing the workflow script, which typically causes all necessary tasks to be registered.
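The import-time constraint can be illustrated with a toy registry (hypothetical names, not redun's actual internals): a task defined at module level lands in the registry on every import, while a task created dynamically inside a function only exists in the process that happened to run that function.

```python
# Toy illustration of import-time task registration (not redun's real code).
TASK_REGISTRY: dict = {}

def task(func):
    """Register func at definition time, roughly as redun's @task does."""
    TASK_REGISTRY[func.__name__] = func
    return func

@task                      # runs at import: 'bar' is always registered
def bar():
    return 5

def main():
    # A task created here is registered only in *this* process. A batch
    # worker that re-imports the module never executes this function body,
    # so it would never see 'dynamic' in its registry.
    @task
    def dynamic():
        return 10
    return dynamic()
```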

Let me know if you have any more questions.
