Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pympipool/legacy/shared/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import queue
import sys
import time

from pympipool.shared.communication import interface_bootup
Expand Down Expand Up @@ -57,12 +58,12 @@ def get_pool_command(cores_total, ranks_per_task=1):
os.path.join(__file__, "..", "..", "backend", "mpipool.py")
)
if ranks_per_task == 1:
command_lst = ["python", "-m", "mpi4py.futures", executable]
command_lst = [sys.executable, "-m", "mpi4py.futures", executable]
cores = cores_total
else:
# Running MPI parallel tasks within the map() requires mpi4py to use mpi spawn:
# https://github.com/mpi4py/mpi4py/issues/324
command_lst = ["python", executable]
command_lst = [sys.executable, executable]
cores = 1
command_lst += [
"--cores-per-task",
Expand Down
3 changes: 2 additions & 1 deletion pympipool/shared/taskexecutor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import inspect
import os
import queue
import sys

import cloudpickle

Expand Down Expand Up @@ -71,7 +72,7 @@ def execute_parallel_tasks(
queue_adapter_kwargs (dict/None): keyword arguments for the submit_job() function of the queue adapter
"""
command_lst = [
"python",
sys.executable,
os.path.abspath(os.path.join(__file__, "..", "..", "backend", "mpiexec.py")),
]
interface = interface_bootup(
Expand Down
3 changes: 2 additions & 1 deletion tests/test_interface.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys

import numpy as np
import unittest
Expand All @@ -24,7 +25,7 @@ def test_interface(self):
)
)
interface.bootup(command_lst=[
"python",
sys.executable,
os.path.abspath(os.path.join(__file__, "..", "..", "pympipool", "backend", "mpiexec.py")),
"--zmqport",
str(interface.bind_to_random_port()),
Expand Down
13 changes: 7 additions & 6 deletions tests/test_parse.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
import unittest
from pympipool.shared.backend import parse_arguments
from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface, SlurmSubprocessInterface
Expand All @@ -14,13 +15,13 @@ def test_command_local(self):
'mpiexec',
'-n', '2',
'--oversubscribe',
'python', '/',
sys.executable, '/',
'--zmqport', result_dict['zmqport']
]
interface = MpiExecInterface(cwd=None, cores=2, gpus_per_core=0, oversubscribe=True)
self.assertEqual(
command_lst,
interface.generate_command(command_lst=['python', '/', '--zmqport', result_dict['zmqport']])
interface.generate_command(command_lst=[sys.executable, '/', '--zmqport', result_dict['zmqport']])
)
self.assertEqual(result_dict, parse_arguments(command_lst))

Expand All @@ -33,14 +34,14 @@ def test_command_flux(self):
'flux', 'run', '-n', '2',
"--cwd=" + os.path.abspath("."),
'--gpus-per-task=1',
'python', '/',
sys.executable, '/',
'--host', result_dict['host'],
'--zmqport', result_dict['zmqport']
]
interface = FluxCmdInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=False)
self.assertEqual(
command_lst,
interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']])
interface.generate_command(command_lst=[sys.executable, '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']])
)
self.assertEqual(result_dict, parse_arguments(command_lst))

Expand All @@ -59,13 +60,13 @@ def test_command_slurm(self):
"-D", os.path.abspath("."),
'--gpus-per-task=1',
'--oversubscribe',
'python', '/',
sys.executable, '/',
'--host', result_dict['host'],
'--zmqport', result_dict['zmqport']
]
interface = SlurmSubprocessInterface(cwd=os.path.abspath("."), cores=2, gpus_per_core=1, oversubscribe=True)
self.assertEqual(
command_lst,
interface.generate_command(command_lst=['python', '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']])
interface.generate_command(command_lst=[sys.executable, '/', '--host', result_dict['host'], '--zmqport', result_dict['zmqport']])
)
self.assertEqual(result_dict, parse_arguments(command_lst))
10 changes: 6 additions & 4 deletions tests/test_parse_legacy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import unittest
import os
import sys

from pympipool.legacy.shared.backend import parse_arguments
from pympipool.shared.connections import MpiExecInterface, FluxCmdInterface

Expand All @@ -16,7 +18,7 @@ def test_command_local(self):
'mpiexec',
'-n', result_dict['total_cores'],
'--oversubscribe',
'python', '-m', 'mpi4py.futures', '/',
sys.executable, '-m', 'mpi4py.futures', '/',
'--zmqport', result_dict['zmqport'],
'--cores-per-task', result_dict['cores_per_task'],
'--cores-total', result_dict['total_cores']
Expand All @@ -31,7 +33,7 @@ def test_command_local(self):
command_lst,
interface.generate_command(
command_lst=[
'python', '-m', 'mpi4py.futures', '/',
sys.executable, '-m', 'mpi4py.futures', '/',
'--zmqport', result_dict['zmqport'],
'--cores-per-task', '1', '--cores-total', '2'
]
Expand All @@ -49,7 +51,7 @@ def test_command_flux(self):
command_lst = [
'flux', 'run', '-n', '1',
"--cwd=" + os.path.abspath("."),
'python', '/',
sys.executable, '/',
'--host', result_dict['host'],
'--zmqport', result_dict['zmqport'],
'--cores-per-task', result_dict['cores_per_task'],
Expand All @@ -65,7 +67,7 @@ def test_command_flux(self):
command_lst,
interface.generate_command(
command_lst=[
'python', '/', '--host', result_dict['host'],
sys.executable, '/', '--host', result_dict['host'],
'--zmqport', result_dict['zmqport'],
'--cores-per-task', '2', '--cores-total', '2'
]
Expand Down