diff --git a/pympipool/share/serial.py b/pympipool/share/serial.py index aeff635b..a17d91a5 100644 --- a/pympipool/share/serial.py +++ b/pympipool/share/serial.py @@ -78,6 +78,43 @@ def get_parallel_subprocess_command( return command_lst +def execute_parallel_tasks_loop(interface, future_queue): + while True: + task_dict = future_queue.get() + if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + interface.shutdown(wait=task_dict["wait"]) + break + elif "fn" in task_dict.keys() and "future" in task_dict.keys(): + f = task_dict.pop("future") + if f.set_running_or_notify_cancel(): + f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) + elif "fn" in task_dict.keys() and "init" in task_dict.keys(): + interface.send_dict(input_dict=task_dict) + + +def execute_serial_tasks_loop(interface, future_queue, future_dict, sleep_interval=0.1): + while True: + try: + task_dict = future_queue.get_nowait() + except queue.Empty: + pass + else: + if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + done_dict = interface.shutdown(wait=task_dict["wait"]) + if isinstance(done_dict, dict): + for k, v in done_dict.items(): + if k in future_dict.keys() and not future_dict[k].cancelled(): + future_dict.pop(k).set_result(v) + break + elif "fn" in task_dict.keys() and "future" in task_dict.keys(): + f = task_dict.pop("future") + future_hash = interface.send_and_receive_dict(input_dict=task_dict) + future_dict[future_hash] = f + update_future_dict( + interface=interface, future_dict=future_dict, sleep_interval=sleep_interval + ) + + def execute_parallel_tasks( future_queue, cores, oversubscribe=False, enable_flux_backend=False, cwd=None ): @@ -93,17 +130,7 @@ def execute_parallel_tasks( ), cwd=cwd, ) - while True: - task_dict = future_queue.get() - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) - break - elif "fn" in task_dict.keys() and "future" in task_dict.keys(): - f = task_dict.pop("future") - if f.set_running_or_notify_cancel(): - f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) - elif "fn" in task_dict.keys() and "init" in task_dict.keys(): - interface.send_dict(input_dict=task_dict) + execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) def execute_serial_tasks( @@ -127,26 +154,12 @@ def execute_serial_tasks( ), cwd=cwd, ) - while True: - try: - task_dict = future_queue.get_nowait() - except queue.Empty: - pass - else: - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - done_dict = interface.shutdown(wait=task_dict["wait"]) - if isinstance(done_dict, dict): - for k, v in done_dict.items(): - if k in future_dict.keys() and not future_dict[k].cancelled(): - future_dict.pop(k).set_result(v) - break - elif "fn" in task_dict.keys() and "future" in task_dict.keys(): - f = task_dict.pop("future") - future_hash = interface.send_and_receive_dict(input_dict=task_dict) - future_dict[future_hash] = f - update_future_dict( - interface=interface, future_dict=future_dict, sleep_interval=sleep_interval - ) + execute_serial_tasks_loop( + interface=interface, + future_queue=future_queue, + future_dict=future_dict, + sleep_interval=sleep_interval, + ) def update_future_dict(interface, future_dict, sleep_interval=0.1):