Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import time
from asyncio.exceptions import CancelledError
from concurrent.futures import Future
from concurrent.futures import Future, TimeoutError
from time import sleep
from typing import Any, Callable, Optional, Union

Expand Down Expand Up @@ -672,15 +672,13 @@ def _execute_task_with_cache(
future_queue.task_done()


def _get_exception_lst(future_lst: list) -> list:
def get_exception(future_obj: Future) -> bool:
try:
excp = future_obj.exception(timeout=10**-10)
return excp is not None and not isinstance(excp, CancelledError)
except TimeoutError:
return False
def _get_exception_lst(future_lst: list[Future]) -> list:
return [f.exception() for f in future_lst if _get_exception(future_obj=f)]

if sys.version_info[0] >= 3 and sys.version_info[1] >= 11:
return [f.exception() for f in future_lst if get_exception(future_obj=f)]
else:
return []

def _get_exception(future_obj: Future) -> bool:
try:
excp = future_obj.exception(timeout=10**-10)
return excp is not None and not isinstance(excp, CancelledError)
except TimeoutError:
return False
24 changes: 0 additions & 24 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ def test_dependency_steps(self):
self.assertTrue(fs2.done())
q.put({"shutdown": True, "wait": True})

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_dependency_steps_error(self):
cloudpickle_register(ind=1)
fs1 = Future()
Expand Down Expand Up @@ -164,10 +160,6 @@ def test_dependency_steps_error(self):
fs2.result()
q.put({"shutdown": True, "wait": True})

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_dependency_steps_error_before(self):
cloudpickle_register(ind=1)
fs1 = Future()
Expand Down Expand Up @@ -282,10 +274,6 @@ def test_block_allocation_true_two_workers(self):
cloudpickle_register(ind=1)
_ = exe.submit(raise_error, parameter=0)

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_false_one_worker_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe:
Expand All @@ -298,10 +286,6 @@ def test_block_allocation_false_one_worker_loop(self):
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_true_one_worker_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe:
Expand All @@ -314,10 +298,6 @@ def test_block_allocation_true_one_worker_loop(self):
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_false_two_workers_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe:
Expand All @@ -330,10 +310,6 @@ def test_block_allocation_false_two_workers_loop(self):
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_true_two_workers_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
Expand Down
Loading