Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pympipool/interfaces/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ def __init__(self):
self._future_queue = queue.Queue()
self._process = None

@property
def future_queue(self):
    """Read-only access to the executor's internal task queue.

    Returns:
        queue.Queue: the queue that submitted task dictionaries are placed on.
    """
    return self._future_queue

def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.

Expand Down
21 changes: 8 additions & 13 deletions pympipool/interfaces/fluxbroker.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import os
import queue
import socket
from socket import gethostname
import sys
from time import sleep

from pympipool.shared.broker import (
_get_future_done,
_execute_task_dict,
get_future_done,
execute_task_dict,
)
from pympipool.interfaces.base import ExecutorBase
from pympipool.shared.thread import RaisingThread
from pympipool.shared.taskexecutor import (
cloudpickle_register,
_execute_parallel_tasks_loop,
execute_parallel_tasks_loop,
)
from pympipool.shared.connections import FluxPythonInterface
from pympipool.shared.communication import SocketInterface
Expand All @@ -29,13 +29,8 @@ class SingleTaskExecutor(ExecutorBase):
Args:
cores (int): defines the number of MPI ranks to use for each function call
gpus_per_task (int): number of GPUs per MPI rank - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec
enable_slurm_backend (bool): enable the SLURM queueing system as backend - defaults to False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
queue_adapter (pysqa.queueadapter.QueueAdapter): generalized interface to various queuing systems
queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter

Simple example:
```
Expand Down Expand Up @@ -149,7 +144,7 @@ def execute_parallel_tasks(
gpus_per_core=gpus_per_task,
executor=executor,
)
_execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)
execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)


def interface_bootup(
Expand All @@ -161,7 +156,7 @@ def interface_bootup(
):
command_lst += [
"--host",
socket.gethostname(),
gethostname(),
]
connections = FluxPythonInterface(
cwd=cwd,
Expand Down Expand Up @@ -203,7 +198,7 @@ def executor_broker(
except queue.Empty:
sleep(sleep_interval)
else:
if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
future_queue.task_done()
else:
future_queue.task_done()
Expand All @@ -219,7 +214,7 @@ def _get_executor_list(
executor=None,
):
return {
_get_future_done(): SingleTaskExecutor(
get_future_done(): SingleTaskExecutor(
cores=cores_per_worker,
gpus_per_task=int(gpus_per_worker / cores_per_worker),
init_function=init_function,
Expand Down
4 changes: 2 additions & 2 deletions pympipool/legacy/shared/backend.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from tqdm import tqdm

from pympipool.shared.backend import call_funct, _update_default_dict_from_arguments
from pympipool.shared.backend import call_funct, update_default_dict_from_arguments


def map_funct(executor, funct, lst, chunksize=1, cores_per_task=1, map_flag=True):
Expand Down Expand Up @@ -42,7 +42,7 @@ def parse_arguments(argument_lst):
Returns:
dict: dictionary with the parsed arguments and their corresponding values
"""
return _update_default_dict_from_arguments(
return update_default_dict_from_arguments(
argument_lst=argument_lst,
argument_dict={
"total_cores": "--cores-total",
Expand Down
4 changes: 2 additions & 2 deletions pympipool/shared/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def parse_arguments(argument_lst):
Returns:
dict: dictionary with the parsed arguments and their corresponding values
"""
return _update_default_dict_from_arguments(
return update_default_dict_from_arguments(
argument_lst=argument_lst,
argument_dict={
"zmqport": "--zmqport",
Expand All @@ -50,7 +50,7 @@ def parse_arguments(argument_lst):
)


def _update_default_dict_from_arguments(argument_lst, argument_dict, default_dict):
def update_default_dict_from_arguments(argument_lst, argument_dict, default_dict):
default_dict.update(
{
k: argument_lst[argument_lst.index(v) + 1]
Expand Down
10 changes: 5 additions & 5 deletions pympipool/shared/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ def executor_broker(
except queue.Empty:
sleep(sleep_interval)
else:
if _execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
if execute_task_dict(task_dict=task_dict, meta_future_lst=meta_future_lst):
future_queue.task_done()
else:
future_queue.task_done()
break


def _execute_task_dict(task_dict, meta_future_lst):
def execute_task_dict(task_dict, meta_future_lst):
if "fn" in task_dict.keys():
meta_future = next(as_completed(meta_future_lst.keys()))
executor = meta_future_lst.pop(meta_future)
executor._future_queue.put(task_dict)
executor.future_queue.put(task_dict)
meta_future_lst[task_dict["future"]] = executor
return True
elif "shutdown" in task_dict.keys() and task_dict["shutdown"]:
Expand All @@ -72,7 +72,7 @@ def _get_executor_list(
queue_adapter_kwargs=None,
):
return {
_get_future_done(): Executor(
get_future_done(): Executor(
cores=cores_per_worker,
gpus_per_task=int(gpus_per_worker / cores_per_worker),
oversubscribe=oversubscribe,
Expand All @@ -87,7 +87,7 @@ def _get_executor_list(
}


def _get_future_done():
def get_future_done():
    """Create and return a Future that is already completed with the value True."""
    completed = Future()
    completed.set_result(True)
    return completed
4 changes: 2 additions & 2 deletions pympipool/shared/communication.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import cloudpickle
import socket
from socket import gethostname
import zmq

from pympipool.shared.connections import get_connection_interface
Expand Down Expand Up @@ -111,7 +111,7 @@ def interface_bootup(
if enable_flux_backend or enable_slurm_backend or queue_adapter is not None:
command_lst += [
"--host",
socket.gethostname(),
gethostname(),
]
connections = get_connection_interface(
cwd=cwd,
Expand Down
4 changes: 2 additions & 2 deletions pympipool/shared/taskexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ def execute_parallel_tasks(
queue_adapter=queue_adapter,
queue_adapter_kwargs=queue_adapter_kwargs,
)
_execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)
execute_parallel_tasks_loop(interface=interface, future_queue=future_queue)


def _execute_parallel_tasks_loop(interface, future_queue):
def execute_parallel_tasks_loop(interface, future_queue):
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
Expand Down
12 changes: 6 additions & 6 deletions tests/test_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import unittest
from pympipool.shared.broker import (
executor_broker,
_execute_task_dict,
_get_future_done,
execute_task_dict,
get_future_done,
_get_executor_list,
)

Expand All @@ -24,7 +24,7 @@ def mpi_funct(i):

class TestFutureCreation(unittest.TestCase):
def test_get_future_done(self):
f = _get_future_done()
f = get_future_done()
self.assertTrue(isinstance(f, Future))
self.assertTrue(f.done())

Expand All @@ -43,21 +43,21 @@ def test_meta_executor_future(self):
def test_execute_task_dict(self):
meta_future_lst = _get_executor_list(max_workers=1)
f = Future()
self.assertTrue(_execute_task_dict(
self.assertTrue(execute_task_dict(
task_dict={"fn": calc, "args": (1,), "kwargs": {}, "future": f},
meta_future_lst=meta_future_lst
))
self.assertEqual(f.result(), 1)
self.assertTrue(f.done())
self.assertFalse(_execute_task_dict(
self.assertFalse(execute_task_dict(
task_dict={"shutdown": True, "wait": True},
meta_future_lst=meta_future_lst
))

def test_execute_task_dict_error(self):
meta_future_lst = _get_executor_list(max_workers=1)
with self.assertRaises(ValueError):
_execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst)
execute_task_dict(task_dict={}, meta_future_lst=meta_future_lst)
list(meta_future_lst.values())[0].shutdown(wait=True)

def test_executor_broker(self):
Expand Down