diff --git a/.ci_support/environment-mpich.yml b/.ci_support/environment-mpich.yml index 28b8e12a..b5798ac2 100644 --- a/.ci_support/environment-mpich.yml +++ b/.ci_support/environment-mpich.yml @@ -8,6 +8,6 @@ dependencies: - mpich - numpy =1.23.5 - mpi4py =3.1.4 - - pympipool =0.5.6 + - pympipool =0.6.0 - ase =3.22.1 - scipy =1.10.1 \ No newline at end of file diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml index 74ee56a8..37834233 100644 --- a/.ci_support/environment-openmpi.yml +++ b/.ci_support/environment-openmpi.yml @@ -8,6 +8,6 @@ dependencies: - openmpi - numpy =1.23.5 - mpi4py =3.1.4 - - pympipool =0.5.6 + - pympipool =0.6.0 - ase =3.22.1 - scipy =1.10.1 \ No newline at end of file diff --git a/pylammpsmpi/mpi/lmpmpi.py b/pylammpsmpi/mpi/lmpmpi.py index 80c6ebd2..138e968a 100644 --- a/pylammpsmpi/mpi/lmpmpi.py +++ b/pylammpsmpi/mpi/lmpmpi.py @@ -8,10 +8,10 @@ import sys from lammps import lammps from pympipool import ( - connect_to_socket_interface, - send_result, - close_connection, - receive_instruction, + interface_connect, + interface_send, + interface_shutdown, + interface_receive, ) __author__ = "Sarath Menon, Jan Janssen" @@ -463,37 +463,40 @@ def _gather_data_from_all_processors(data): def _run_lammps_mpi(argument_lst): + index_selected = argument_lst.index("--zmqport") + port_selected = argument_lst[index_selected + 1] + if "--host" in argument_lst: + index_selected = argument_lst.index("--host") + host = argument_lst[index_selected + 1] + else: + host = "localhost" + argument_red_lst = argument_lst[:index_selected] if MPI.COMM_WORLD.rank == 0: - port_selected = argument_lst[argument_lst.index("--zmqport") + 1] - if "--host" in argument_lst: - host = argument_lst[argument_lst.index("--host") + 1] - else: - host = "localhost" - context, socket = connect_to_socket_interface(host=host, port=port_selected) + context, socket = interface_connect(host=host, port=port_selected) else: context, socket = None, None # Lammps executable args = ["-screen", "none"] - if len(argument_lst) > 3: - args.extend(argument_lst[3:]) + if len(argument_red_lst) > 1: + args.extend(argument_red_lst[1:]) job = lammps(cmdargs=args) while True: if MPI.COMM_WORLD.rank == 0: - input_dict = receive_instruction(socket=socket) + input_dict = interface_receive(socket=socket) else: input_dict = None input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0) if "shutdown" in input_dict.keys() and input_dict["shutdown"]: job.close() if MPI.COMM_WORLD.rank == 0: - send_result(socket=socket, result_dict={"result": True}) - close_connection(socket=socket, context=context) + interface_send(socket=socket, result_dict={"result": True}) + interface_shutdown(socket=socket, context=context) break output = select_cmd(input_dict["command"])( job=job, funct_args=input_dict["args"] ) if MPI.COMM_WORLD.rank == 0 and output is not None: - send_result(socket=socket, result_dict={"result": output}) + interface_send(socket=socket, result_dict={"result": output}) if __name__ == "__main__": diff --git a/pylammpsmpi/wrapper/concurrent.py b/pylammpsmpi/wrapper/concurrent.py index 83b394ad..c2e702e1 100644 --- a/pylammpsmpi/wrapper/concurrent.py +++ b/pylammpsmpi/wrapper/concurrent.py @@ -6,7 +6,7 @@ import socket from concurrent.futures import Future from queue import Queue -from pympipool import RaisingThread, SocketInterface, cancel_items_in_queue +from pympipool import RaisingThread, cancel_items_in_queue, interface_bootup __author__ = "Sarath Menon, Jan Janssen" @@ -21,50 +21,10 @@ __date__ = "Feb 28, 2020" -def _initialize_socket( - interface, - cmdargs, - cwd, - cores, - oversubscribe=False, - enable_flux_backend=False, - enable_slurm_backend=False, -): - port_selected = interface.bind_to_random_port() - executable = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py" - ) - if enable_flux_backend: - cmds = ["flux", "run"] - elif enable_slurm_backend: - cmds = ["srun"] - else: - cmds = ["mpiexec"] - if oversubscribe: - cmds += ["--oversubscribe"] - cmds += [ - "-n", - str(cores), - "python", - executable, - "--zmqport", - str(port_selected), - ] - if enable_flux_backend or enable_slurm_backend: - cmds += [ - "--host", - socket.gethostname(), - ] - if cmdargs is not None: - cmds.extend(cmdargs) - interface.bootup(command_lst=cmds, cwd=cwd) - return interface - - def execute_async( future_queue, - cmdargs, - cores, + cmdargs=None, + cores=1, oversubscribe=False, enable_flux_backend=False, enable_slurm_backend=False, @@ -72,16 +32,24 @@ def execute_async( queue_adapter=None, queue_adapter_kwargs=None, ): - interface = _initialize_socket( - interface=SocketInterface( - queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs - ), - cmdargs=cmdargs, + executable = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "../mpi", "lmpmpi.py" + ) + if cmdargs is not None: + command_lst = ["python", executable] + cmdargs + else: + command_lst = ["python", executable] + interface = interface_bootup( + command_lst=command_lst, cwd=cwd, cores=cores, + gpus_per_core=0, + oversubscribe=oversubscribe, enable_flux_backend=enable_flux_backend, enable_slurm_backend=enable_slurm_backend, - oversubscribe=oversubscribe, + queue_adapter=queue_adapter, + queue_type=None, + queue_adapter_kwargs=queue_adapter_kwargs, ) while True: task_dict = future_queue.get() diff --git a/setup.py b/setup.py index b2578458..5069f283 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,7 @@ keywords='lammps, mpi4py', packages=find_packages(exclude=["*tests*"]), install_requires=[ - "mpi4py==3.1.4", "pympipool==0.5.6", "numpy==1.23.5" + "mpi4py==3.1.4", "pympipool==0.6.0", "numpy==1.23.5" ], extras_require={ "ase": ["ase==3.22.1", "scipy==1.10.1"],