From 60a51483fcecf462b069b8dee3d86e01882c967d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 08:30:23 -0600 Subject: [PATCH 1/6] Use new pympipool interface --- .ci_support/environment-mpich.yml | 2 +- .ci_support/environment-openmpi.yml | 2 +- pylammpsmpi/mpi/lmpmpi.py | 35 ++++++++++++++++------------- pylammpsmpi/wrapper/concurrent.py | 27 ++++++++++++++-------- setup.py | 2 +- 5 files changed, 40 insertions(+), 28 deletions(-) diff --git a/.ci_support/environment-mpich.yml b/.ci_support/environment-mpich.yml index 20aa1dd5..2a792171 100644 --- a/.ci_support/environment-mpich.yml +++ b/.ci_support/environment-mpich.yml @@ -8,4 +8,4 @@ dependencies: - mpich - numpy =1.23.5 - mpi4py =3.1.4 - - pympipool =0.5.5 \ No newline at end of file + - pympipool =0.6.0 \ No newline at end of file diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml index a55ca12d..e0b70b37 100644 --- a/.ci_support/environment-openmpi.yml +++ b/.ci_support/environment-openmpi.yml @@ -8,4 +8,4 @@ dependencies: - openmpi - numpy =1.23.5 - mpi4py =3.1.4 - - pympipool =0.5.5 \ No newline at end of file + - pympipool =0.6.0 \ No newline at end of file diff --git a/pylammpsmpi/mpi/lmpmpi.py b/pylammpsmpi/mpi/lmpmpi.py index 80c6ebd2..febe431a 100644 --- a/pylammpsmpi/mpi/lmpmpi.py +++ b/pylammpsmpi/mpi/lmpmpi.py @@ -8,10 +8,10 @@ import sys from lammps import lammps from pympipool import ( - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_send, + interface_shutdown, + interface_receive, ) __author__ = "Sarath Menon, Jan Janssen" @@ -463,37 +463,40 @@ def _gather_data_from_all_processors(data): def _run_lammps_mpi(argument_lst): + index_selected = argument_lst.index("--zmqport") + port_selected = argument_lst[index_selected + 1] + if "--host" in argument_lst: + index_selected = argument_lst.index("--host") + host = argument_lst[index_selected + 1] + else: + host = "localhost" + argument_red_lst = argument_lst[:index_selected - 1] if MPI.COMM_WORLD.rank == 0: - port_selected = argument_lst[argument_lst.index("--zmqport") + 1] - if "--host" in argument_lst: - host = argument_lst[argument_lst.index("--host") + 1] - else: - host = "localhost" - context, socket = connect_to_socket_interface(host=host, port=port_selected) + context, socket = interface_connect(host=host, port=port_selected) else: context, socket = None, None # Lammps executable args = ["-screen", "none"] - if len(argument_lst) > 3: - args.extend(argument_lst[3:]) + if len(argument_red_lst) > 1: + args.extend(argument_red_lst[1:]) job = lammps(cmdargs=args) while True: if MPI.COMM_WORLD.rank == 0: - input_dict = receive_instruction(socket=socket) + input_dict = interface_receive(socket=socket) else: input_dict = None input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0) if "shutdown" in input_dict.keys() and input_dict["shutdown"]: job.close() if MPI.COMM_WORLD.rank == 0: - send_result(socket=socket, result_dict={"result": True}) - close_connection(socket=socket, context=context) + interface_send(socket=socket, result_dict={"result": True}) + interface_shutdown(socket=socket, context=context) break output = select_cmd(input_dict["command"])( job=job, funct_args=input_dict["args"] ) if MPI.COMM_WORLD.rank == 0 and output is not None: - send_result(socket=socket, result_dict={"result": output}) + interface_send(socket=socket, result_dict={"result": output}) if __name__ == "__main__": diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index fbaaec61..a68ce460 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -6,7 +6,7 @@ import socket from concurrent.futures import Future from queue import Queue -from pympipool import RaisingThread, SocketInterface, cancel_items_in_queue +from pympipool import RaisingThread, cancel_items_in_queue, interface_bootup __author__ = "Sarath Menon, Jan Janssen" @@ -55,23 +55,32 @@ def _initialize_socket( def execute_async( future_queue, - cmdargs, - cores, + cmdargs=None, + cores=1, oversubscribe=False, enable_flux_backend=False, cwd=None, queue_adapter=None, queue_adapter_kwargs=None, ): - interface = _initialize_socket( - interface=SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ), - cmdargs=cmdargs, + executable = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py" + ) + if cmdargs is not None: + command_lst = ["python", executable, cmdargs] + else: + command_lst = ["python", executable] + interface = interface_bootup( + command_lst=command_lst, cwd=cwd, cores=cores, - enable_flux_backend=enable_flux_backend, + gpus_per_core=0, oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_slurm_backend=False, + queue_adapter=queue_adapter, + queue_type=None, + queue_adapter_kwargs=queue_adapter_kwargs, ) while True: task_dict = future_queue.get() diff --git a/setup.py b/setup.py index 88b92a50..800cdcdb 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ keywords='lammps, mpi4py', packages=find_packages(exclude=["*tests*"]), install_requires=[ - "mpi4py==3.1.4", "pympipool==0.5.5", "numpy==1.23.5" + "mpi4py==3.1.4", "pympipool==0.6.0", "numpy==1.23.5" ], cmdclass=versioneer.get_cmdclass(), ) From 9a62a27452a10687b5537634e08c1eebeb0259f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 12:24:04 -0600 Subject: [PATCH 2/6] black formatting --- pylammpsmpi/mpi/lmpmpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylammpsmpi/mpi/lmpmpi.py b/pylammpsmpi/mpi/lmpmpi.py index febe431a..0b27d0cf 100644 --- a/pylammpsmpi/mpi/lmpmpi.py +++ b/pylammpsmpi/mpi/lmpmpi.py @@ -470,7 +470,7 @@ def _run_lammps_mpi(argument_lst): host = argument_lst[index_selected + 1] else: host = "localhost" - argument_red_lst = argument_lst[:index_selected - 1] + argument_red_lst = argument_lst[: index_selected - 1] if MPI.COMM_WORLD.rank == 0: context, socket = interface_connect(host=host, port=port_selected) else: From 28ef850dedb70e989d4b8e8cd2e5f95be0507ffd Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 30 Jul 2023 12:28:28 -0600 Subject: [PATCH 3/6] Update concurrent.py --- pylammpsmpi/wrapper/concurrent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index 03aa795c..ae141996 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -76,7 +76,7 @@ def execute_async( os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py" ) if cmdargs is not None: - command_lst = ["python", executable, cmdargs] + command_lst = ["python", executable] + cmdargs else: command_lst = ["python", executable] interface = interface_bootup( From db4805e2e2cc846639bec7c1f89f4dd0e9b70d51 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 30 Jul 2023 12:29:48 -0600 Subject: [PATCH 4/6] Update concurrent.py --- pylammpsmpi/wrapper/concurrent.py | 40 ------------------------------- 1 file changed, 40 deletions(-) diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index ae141996..c2e702e1 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -21,46 +21,6 @@ __date__ = "Feb 28, 2020" -def _initialize_socket( - interface, - cmdargs, - cwd, - cores, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, -): - port_selected = interface.bind_to_random_port() - executable = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py" - ) - if enable_flux_backend: - cmds = ["flux", "run"] - elif enable_slurm_backend: - cmds = ["srun"] - else: - cmds = ["mpiexec"] - if oversubscribe: - cmds += ["--oversubscribe"] - cmds += [ - "-n", - str(cores), - "python", - executable, - "--zmqport", - str(port_selected), - ] - if enable_flux_backend or enable_slurm_backend: - cmds += [ - "--host", - socket.gethostname(), - ] - if cmdargs is not None: - cmds.extend(cmdargs) - interface.bootup(command_lst=cmds, cwd=cwd) - return interface - - def execute_async( future_queue, cmdargs=None, From fe8d650976527d35d8c50fe8046c623512f188e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 13:03:57 -0600 Subject: [PATCH 5/6] bug fix --- pylammpsmpi/mpi/lmpmpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylammpsmpi/mpi/lmpmpi.py b/pylammpsmpi/mpi/lmpmpi.py index 0b27d0cf..fd17acfa 100644 --- a/pylammpsmpi/mpi/lmpmpi.py +++ b/pylammpsmpi/mpi/lmpmpi.py @@ -470,7 +470,7 @@ def _run_lammps_mpi(argument_lst): host = argument_lst[index_selected + 1] else: host = "localhost" - argument_red_lst = argument_lst[: index_selected - 1] + argument_red_lst = argument_lst[: index_selected] if MPI.COMM_WORLD.rank == 0: context, socket = interface_connect(host=host, port=port_selected) else: From 2f1487df0ad08c2f6bfd48e22209d0075dd427c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sun, 30 Jul 2023 13:06:26 -0600 Subject: [PATCH 6/6] black formatting --- pylammpsmpi/mpi/lmpmpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pylammpsmpi/mpi/lmpmpi.py b/pylammpsmpi/mpi/lmpmpi.py index fd17acfa..138e968a 100644 --- a/pylammpsmpi/mpi/lmpmpi.py +++ b/pylammpsmpi/mpi/lmpmpi.py @@ -470,7 +470,7 @@ def _run_lammps_mpi(argument_lst): host = argument_lst[index_selected + 1] else: host = "localhost" - argument_red_lst = argument_lst[: index_selected] + argument_red_lst = argument_lst[:index_selected] if MPI.COMM_WORLD.rank == 0: context, socket = interface_connect(host=host, port=port_selected) else: