Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions executorlib/interactive/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,23 @@ def __init__(
else:
self._generate_dependency_graph = True

@property
def info(self) -> Optional[dict]:
"""
Get the information about the executor.

Returns:
Optional[dict]: Information about the executor.
"""
if isinstance(self._future_queue, queue.Queue):
f: Future = Future()
self._future_queue.queue.insert(
0, {"internal": True, "task": "info", "future": f}
)
return f.result()
else:
return None

def submit( # type: ignore
self,
fn: Callable[..., Any],
Expand Down Expand Up @@ -168,6 +185,11 @@ def _execute_tasks_with_dependencies(
future_queue.task_done()
future_queue.join()
break
if ( # shutdown the executor
task_dict is not None and "internal" in task_dict and task_dict["internal"]
):
if task_dict["task"] == "info":
task_dict["future"].set_result(executor.info)
Comment on lines +188 to +192
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix incorrect comment and add error handling.

The comment is incorrect (says "shutdown the executor" but handles internal tasks). Also, add error handling for accessing executor.info.

-        if (  # shutdown the executor
+        if (  # handle internal tasks
             task_dict is not None and "internal" in task_dict and task_dict["internal"]
         ):
             if task_dict["task"] == "info":
-                task_dict["future"].set_result(executor.info)
+                try:
+                    task_dict["future"].set_result(executor.info)
+                except Exception as e:
+                    task_dict["future"].set_exception(e)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if ( # shutdown the executor
task_dict is not None and "internal" in task_dict and task_dict["internal"]
):
if task_dict["task"] == "info":
task_dict["future"].set_result(executor.info)
if ( # handle internal tasks
task_dict is not None and "internal" in task_dict and task_dict["internal"]
):
if task_dict["task"] == "info":
try:
task_dict["future"].set_result(executor.info)
except Exception as e:
task_dict["future"].set_exception(e)

elif ( # handle function submitted to the executor
task_dict is not None and "fn" in task_dict and "future" in task_dict
):
Expand Down
34 changes: 34 additions & 0 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from executorlib.interfaces.single import create_single_node_executor
from executorlib.interactive.dependency import _execute_tasks_with_dependencies
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.interactive.spawner import MpiExecSpawner


try:
Expand Down Expand Up @@ -327,3 +328,36 @@ def test_block_allocation_true_two_workers_loop(self):
parameter=lst,
)
lst.result()


class TestInfo(unittest.TestCase):
"""Test cases for the info property of SingleNodeExecutor."""

def setUp(self):
"""Set up the expected info dictionary."""
self.expected_info = {
'cores': 1,
'cwd': None,
'openmpi_oversubscribe': False,
'cache_directory': None,
'hostname_localhost': None,
'spawner': MpiExecSpawner,
'max_cores': None,
'max_workers': None,
}

def test_info_disable_dependencies_true(self):
"""Test info property with dependencies disabled."""
with SingleNodeExecutor(disable_dependencies=True) as exe:
self.assertEqual(exe.info, self.expected_info)

def test_info_disable_dependencies_false(self):
"""Test info property with dependencies enabled."""
with SingleNodeExecutor(disable_dependencies=False) as exe:
self.assertEqual(exe.info, self.expected_info)

def test_info_error_handling(self):
"""Test info property error handling when executor is not running."""
exe = SingleNodeExecutor()
exe.shutdown(wait=True)
self.assertIsNone(exe.info)
Loading