Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci_support/environment-mpich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ dependencies:
- mpich
- numpy =1.23.5
- mpi4py =3.1.4
- pympipool =0.5.6
- pympipool =0.6.0
- ase =3.22.1
- scipy =1.10.1
2 changes: 1 addition & 1 deletion .ci_support/environment-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ dependencies:
- openmpi
- numpy =1.23.5
- mpi4py =3.1.4
- pympipool =0.5.6
- pympipool =0.6.0
- ase =3.22.1
- scipy =1.10.1
35 changes: 19 additions & 16 deletions pylammpsmpi/mpi/lmpmpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import sys
from lammps import lammps
from pympipool import (
connect_to_socket_interface,
send_result,
close_connection,
receive_instruction,
interface_connect,
interface_send,
interface_shutdown,
interface_receive,
)

__author__ = "Sarath Menon, Jan Janssen"
Expand Down Expand Up @@ -463,37 +463,40 @@ def _gather_data_from_all_processors(data):


def _run_lammps_mpi(argument_lst):
index_selected = argument_lst.index("--zmqport")
port_selected = argument_lst[index_selected + 1]
if "--host" in argument_lst:
index_selected = argument_lst.index("--host")
host = argument_lst[index_selected + 1]
else:
host = "localhost"
argument_red_lst = argument_lst[:index_selected]
if MPI.COMM_WORLD.rank == 0:
port_selected = argument_lst[argument_lst.index("--zmqport") + 1]
if "--host" in argument_lst:
host = argument_lst[argument_lst.index("--host") + 1]
else:
host = "localhost"
context, socket = connect_to_socket_interface(host=host, port=port_selected)
context, socket = interface_connect(host=host, port=port_selected)
else:
context, socket = None, None
# Lammps executable
args = ["-screen", "none"]
if len(argument_lst) > 3:
args.extend(argument_lst[3:])
if len(argument_red_lst) > 1:
args.extend(argument_red_lst[1:])
job = lammps(cmdargs=args)
while True:
if MPI.COMM_WORLD.rank == 0:
input_dict = receive_instruction(socket=socket)
input_dict = interface_receive(socket=socket)
else:
input_dict = None
input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0)
if "shutdown" in input_dict.keys() and input_dict["shutdown"]:
job.close()
if MPI.COMM_WORLD.rank == 0:
send_result(socket=socket, result_dict={"result": True})
close_connection(socket=socket, context=context)
interface_send(socket=socket, result_dict={"result": True})
interface_shutdown(socket=socket, context=context)
break
output = select_cmd(input_dict["command"])(
job=job, funct_args=input_dict["args"]
)
if MPI.COMM_WORLD.rank == 0 and output is not None:
send_result(socket=socket, result_dict={"result": output})
interface_send(socket=socket, result_dict={"result": output})


if __name__ == "__main__":
Expand Down
66 changes: 17 additions & 49 deletions pylammpsmpi/wrapper/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import socket
from concurrent.futures import Future
from queue import Queue
from pympipool import RaisingThread, SocketInterface, cancel_items_in_queue
from pympipool import RaisingThread, cancel_items_in_queue, interface_bootup


__author__ = "Sarath Menon, Jan Janssen"
Expand All @@ -21,67 +21,35 @@
__date__ = "Feb 28, 2020"


def _initialize_socket(
interface,
cmdargs,
cwd,
cores,
oversubscribe=False,
enable_flux_backend=False,
enable_slurm_backend=False,
):
port_selected = interface.bind_to_random_port()
executable = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py"
)
if enable_flux_backend:
cmds = ["flux", "run"]
elif enable_slurm_backend:
cmds = ["srun"]
else:
cmds = ["mpiexec"]
if oversubscribe:
cmds += ["--oversubscribe"]
cmds += [
"-n",
str(cores),
"python",
executable,
"--zmqport",
str(port_selected),
]
if enable_flux_backend or enable_slurm_backend:
cmds += [
"--host",
socket.gethostname(),
]
if cmdargs is not None:
cmds.extend(cmdargs)
interface.bootup(command_lst=cmds, cwd=cwd)
return interface


def execute_async(
future_queue,
cmdargs,
cores,
cmdargs=None,
cores=1,
oversubscribe=False,
enable_flux_backend=False,
enable_slurm_backend=False,
cwd=None,
queue_adapter=None,
queue_adapter_kwargs=None,
):
interface = _initialize_socket(
interface=SocketInterface(
queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs
),
cmdargs=cmdargs,
executable = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py"
)
if cmdargs is not None:
command_lst = ["python", executable] + cmdargs
else:
command_lst = ["python", executable]
interface = interface_bootup(
command_lst=command_lst,
cwd=cwd,
cores=cores,
gpus_per_core=0,
oversubscribe=oversubscribe,
enable_flux_backend=enable_flux_backend,
enable_slurm_backend=enable_slurm_backend,
oversubscribe=oversubscribe,
queue_adapter=queue_adapter,
queue_type=None,
queue_adapter_kwargs=queue_adapter_kwargs,
)
while True:
task_dict = future_queue.get()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
keywords='lammps, mpi4py',
packages=find_packages(exclude=["*tests*"]),
install_requires=[
"mpi4py==3.1.4", "pympipool==0.5.6", "numpy==1.23.5"
"mpi4py==3.1.4", "pympipool==0.6.0", "numpy==1.23.5"
],
extras_require={
"ase": ["ase==3.22.1", "scipy==1.10.1"],
Expand Down