Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dynamic Task #106

Merged
merged 3 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions mara_pipelines/pipelines.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy
import pathlib
import re
from typing import Optional, Dict, Set, List, Tuple, Union
from typing import Optional, Dict, Set, List, Tuple, Union, Callable

from . import config

Expand Down Expand Up @@ -81,24 +81,53 @@ def html_doc_items(self) -> List[Tuple[str, str]]:


class Task(Node):
def __init__(self, id: str, description: str, commands: Optional[List[Command]] = None, max_retries: Optional[int] = None) -> None:
def __init__(self, id: str, description: str, commands: Optional[Union[Callable, List[Command]]] = None, max_retries: Optional[int] = None) -> None:
super().__init__(id, description)
self.commands = []
self.max_retries = max_retries

for command in commands or []:
self.add_command(command)

def add_command(self, command: Command, prepend=False):
if callable(commands):
self._commands = None
self.__dynamic_commands_generator_func = commands
else:
self._commands = []
self._add_commands(commands or [])

@property
def is_dynamic_commands(self) -> bool:
"""if the command list is generated dynamically via a function"""
return self.__dynamic_commands_generator_func is not None

def _assert_is_not_dynamic(self):
if self.is_dynamic_commands:
raise Exception('You cannot use add_command when the task is constructed with a callable commands function.')

@property
def commands(self) -> List:
if self._commands is None:
self._commands = []
# execute the callable command function and cache the result
for command in self.__dynamic_commands_generator_func() or []:
self._add_command(command)
return self._commands

def _add_command(self, command: Command, prepend=False):
if prepend:
self.commands.insert(0, command)
self._commands.insert(0, command)
else:
self.commands.append(command)
self._commands.append(command)
command.parent = self

def add_commands(self, commands: List[Command]):
def _add_commands(self, commands: List[Command]):
for command in commands:
self.add_command(command)
self._add_command(command)

def add_command(self, command: Command, prepend=False):
self._assert_is_not_dynamic()
self._add_command(command, prepend=prepend)

def add_commands(self, commands: List[Command]):
self._assert_is_not_dynamic()
self._add_commands(commands)

def run(self):
for command in self.commands:
Expand Down
4 changes: 4 additions & 0 deletions mara_pipelines/ui/node_page.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ def __(pipeline: pipelines.Pipeline):
def __(task: pipelines.Task):
if not acl.current_user_has_permission(views.acl_resource):
return bootstrap.card(header_left='Commands', body=acl.inline_permission_denied_message())
elif task.is_dynamic_commands:
return bootstrap.card(
header_left='Commands',
body=f"""<span style="font-style:italic;color:#aaa"><span class="fa fa-lightbulb"> </span> {"... are defined dynamically during execution"}</span>""")
else:
commands_card = bootstrap.card(
header_left='Commands',
Expand Down
64 changes: 64 additions & 0 deletions tests/test_pipeline_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pytest
from typing import List

from mara_pipelines.pipelines import Task
from mara_pipelines.commands.python import RunFunction

class _PythonFuncTestResult:
has_run = False


def test_run_task():
"""
A simple test executing a task.
"""
test_result = _PythonFuncTestResult()

def python_test_function(result: _PythonFuncTestResult):
result.has_run = True # noqa: F841

assert not test_result.has_run

task = Task(
id='run_task',
description="Unit test test_run_task",
commands=[RunFunction(function=python_test_function, args=[test_result])])

assert not test_result.has_run

task.run()

assert test_result.has_run


def test_run_task_dynamic_commands():
"""
A simple test executing a task with callable commands
"""
import mara_pipelines.ui.node_page

test_result = _PythonFuncTestResult()

def python_test_function(result: _PythonFuncTestResult):
result.has_run = True # noqa: F841

def generate_command_list() -> List:
yield RunFunction(function=lambda t: python_test_function(t), args=[test_result])

assert not test_result.has_run

task = Task(
id='run_task_dynamic_commands',
description="Unit test test_run_task_dynamic_commands",
commands=generate_command_list)

assert not test_result.has_run

content = mara_pipelines.ui.node_page.node_content(task)
assert content

assert not test_result.has_run

task.run()

assert test_result.has_run