Skip to content

Commit

Permalink
slurm runner refactored (#912)
Browse files Browse the repository at this point in the history
* update MPI implementation for slurm runner
* updates slurm runner tests
* remove docker and podman runners from slurm runner
  • Loading branch information
JayjeetAtGithub committed Sep 3, 2020
1 parent 422ee6d commit cedb257
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 489 deletions.
13 changes: 10 additions & 3 deletions docs/sections/cn_workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ Popper workflows can run on [HPC](https://en.wikipedia.org/wiki/HPC) (Multi-Node
using [Slurm](https://slurm.schedmd.com/overview.html) as the underlying resource manager to distribute the execution of a step to
several nodes. You can get started with running Popper workflows through Slurm by following the example below.

**NOTE:** Set the `POPPER_CACHE_DIR` environment variable to `/path/to/shared/.cache` while running a workflow on multiple nodes.

Let's consider a workflow `sample.yml` like the one shown below.
```yaml
steps:
Expand All @@ -490,14 +492,16 @@ steps:
args: ["ls", "-l"]
```

To run all the steps of the workflow through slurm resource manager,
To run all the steps of the workflow through SLURM resource manager,
use the `--resource-manager` or `-r` option of the `popper run` subcommand to specify the resource manager.

```bash
popper run -f sample.yml -r slurm
```

To have more finer control on which steps to run through slurm resource manager,
This runs the workflow on a single compute node in the cluster, which is also the default behavior when no specific configuration is provided.

To have finer control over which steps run through the SLURM resource manager,
the specifications can be provided through the config file as shown below.

We create a config file called `config.yml` with the following contents.
Expand All @@ -522,7 +526,10 @@ Now, we execute `popper run` with this config file as follows:
popper run -f sample.yml -c config.yml
```

This runs the step `one` locally in the host and step `two` through slurm on 2 nodes.
This runs the step `one` locally in the host and step `two` through SLURM on any 2 compute nodes.
If `singularity` is used as the container engine, then by default the steps would run using MPI
as SLURM jobs. This behaviour can be overridden by passing `mpi: false` in the configuration of the
step for which MPI is not required.

#### Host

Expand Down
271 changes: 98 additions & 173 deletions src/popper/runner_slurm.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import os
import time
import signal
import socket
import threading

from popper import utils as pu
from popper.cli import log as log
from popper.runner_host import HostRunner
from popper.runner_host import DockerRunner as HostDockerRunner
from popper.runner_host import SingularityRunner as HostSingularityRunner
from popper.runner_host import PodmanRunner as HostPodmanRunner


class SlurmRunner(HostRunner):
Expand Down Expand Up @@ -44,46 +43,93 @@ def _stop_out_stream(self):
log.warning("Tail process was stopped by some other process.")
self._out_stream_thread.join()

def _submit_batch_job(self, cmd, step):
def _set_config_vars(self, step):
    """Cache the step's SLURM resource options on the runner instance.

    Reads the per-step entry of ``self._config.resman_opts`` and sets
    ``self._nodes``, ``self._nodelist``, ``self._ntasks`` and
    ``self._ntasks_per_node``, falling back to a single-node default
    (1 node, 1 task per node, ntasks == nodes) when an option is unset.
    """
    # Look the step's option dict up once instead of once per attribute.
    opts = self._config.resman_opts.get(step.id, {})
    self._nodes = opts.get("nodes", 1)
    self._nodelist = opts.get("nodelist", None)
    # default: one task per requested node
    self._ntasks = opts.get("ntasks", self._nodes)
    self._ntasks_per_node = opts.get("ntasks-per-node", 1)

def _get_resman_kwargs(self, step):
    """Convert the step's extra resman options into CLI flag tokens.

    The options that are handled explicitly by the runner ("nodes",
    "nodelist", "ntasks", "ntasks-per-node") are skipped; every other
    key/value pair is turned into a flag via ``pu.key_value_to_flag``
    and split into individual argv tokens.
    """
    reserved = ["nodes", "nodelist", "ntasks", "ntasks-per-node"]
    extra_flags = []
    for key, value in self._config.resman_opts.get(step.id, {}).items():
        if key in reserved:
            continue
        flag = pu.key_value_to_flag(key, value)
        if flag:
            extra_flags.extend(flag.split())

    return extra_flags

def _exec_srun(self, cmd, step, **kwargs):
    """Execute ``cmd`` through ``srun`` with the step's resource options.

    Builds the srun invocation from the cached config vars (nodes,
    ntasks, ntasks-per-node, optional nodelist) plus any extra resman
    flags, and returns the exit code of the command (0 on dry runs).
    """
    self._set_config_vars(step)

    srun_cmd = ["srun"]
    srun_cmd += ["--nodes", str(self._nodes)]
    srun_cmd += ["--ntasks", str(self._ntasks)]
    srun_cmd += ["--ntasks-per-node", str(self._ntasks_per_node)]

    # nodelist is optional; only pass it when configured
    if self._nodelist:
        srun_cmd += ["--nodelist", self._nodelist]

    srun_cmd += self._get_resman_kwargs(step)
    srun_cmd += list(cmd)

    log.debug(f"Command: {srun_cmd}")

    # on a dry run, report success without executing anything
    if self._config.dry_run:
        return 0

    _, ecode, _ = HostRunner._exec_cmd(srun_cmd, **kwargs)
    return ecode

def _exec_mpi(self, cmd, step, **kwargs):
    """Run ``cmd`` as an MPI job via ``sbatch --wait``.

    Writes a batch script ``<job>.sh`` (in the current working
    directory) with ``#SBATCH`` resource directives derived from the
    step's resman options, wraps ``cmd`` with ``mpirun``, submits it,
    and streams the job's output file while waiting.

    Returns the exit code of the sbatch invocation (0 on dry runs).
    """
    self._set_config_vars(step)
    job_name = pu.sanitized_name(step.id, self._config.wid)
    mpi_cmd = ["mpirun", f"{' '.join(cmd)}"]

    # job script and output file live in the current working directory
    job_script = f"{job_name}.sh"
    out_file = f"{job_name}.out"

    # create/truncate the output file so the tail process can follow it
    with open(out_file, "w"):
        pass

    with open(job_script, "w") as f:
        f.write("#!/bin/bash\n")
        f.write(f"#SBATCH --job-name={job_name}\n")
        f.write(f"#SBATCH --output={out_file}\n")
        f.write(f"#SBATCH --nodes={self._nodes}\n")
        f.write(f"#SBATCH --ntasks={self._ntasks}\n")
        f.write(f"#SBATCH --ntasks-per-node={self._ntasks_per_node}\n")
        if self._nodelist:
            f.write(f"#SBATCH --nodelist={self._nodelist}\n")
        f.write(" ".join(mpi_cmd))

    sbatch_cmd = [
        "sbatch",
        "--wait",
    ]
    # pass through any non-default resman options as CLI flags
    sbatch_cmd.extend(self._get_resman_kwargs(step))
    sbatch_cmd.append(job_script)

    log.debug(f"Command: {sbatch_cmd}")

    if self._config.dry_run:
        return 0

    # track the job so stop_running_tasks() can scancel it
    self._spawned_jobs.add(job_name)

    # start a tail (background) process on the output file
    self._start_out_stream(out_file)

    # submit the job and wait
    _, ecode, _ = HostRunner._exec_cmd(sbatch_cmd, **kwargs)

    # kill the tail process
    self._stop_out_stream()

    self._spawned_jobs.remove(job_name)

    return ecode
Expand All @@ -96,135 +142,6 @@ def stop_running_tasks(self):
log.warning(f"Failed to cancel the job {job_name}.")


class DockerRunner(SlurmRunner, HostDockerRunner):
    """Runs workflow steps in docker containers, submitted through SLURM.

    Docker is driven purely through CLI command strings placed in a
    batch job, so no docker python client is initialized.
    """

    def __init__(self, **kw):
        # skip client initialization; all docker interaction is via CLI
        super(DockerRunner, self).__init__(init_docker_client=False, **kw)

    def __exit__(self, exc_type, exc, traceback):
        # no client connection was opened, so nothing to tear down
        pass

    def run(self, step):
        """Execute the given step via slurm in the docker engine.

        Assembles a list of docker CLI commands (cleanup, build or pull,
        create, start) and submits them as one batch job; returns the
        job's exit code.
        """
        cid = pu.sanitized_name(step.id, self._config.wid)
        cmd = []

        build, _, img, tag, build_ctx_path = self._get_build_info(step)

        # remove any leftover container with the same name; `|| true`
        # keeps the script going when no such container exists
        cmd.append(f"docker rm -f {cid} || true")

        if build:
            cmd.append(f"docker build -t {img}:{tag} {build_ctx_path}")
        elif not self._config.skip_pull and not step.skip_pull:
            cmd.append(f"docker pull {img}:{tag}")

        cmd.append(self._create_cmd(step, f"{img}:{tag}", cid))
        cmd.append(f"docker start --attach {cid}")

        # track the container so stop_running_tasks() can act on it
        self._spawned_containers.add(cid)
        # NOTE(review): _submit_batch_job is not defined in the visible
        # portion of SlurmRunner — confirm it exists in the full file
        ecode = self._submit_batch_job(cmd, step)
        self._spawned_containers.remove(cid)
        return ecode

    def _create_cmd(self, step, img, cid):
        """Build the `docker create ...` command string for the step."""
        container_args = self._get_container_kwargs(step, img, cid)

        if "volumes" not in container_args:
            container_args["volumes"] = []
        # mount the host docker socket into the container
        # NOTE(review): inserted at index 1 — presumably to keep an
        # existing first mount in front; confirm against callers
        container_args["volumes"].insert(1, "/var/run/docker.sock:/var/run/docker.sock")

        # `detach` is a client-API kwarg, not a CLI flag
        container_args.pop("detach")
        cmd = ["docker create"]
        cmd.append(f"--name {container_args.pop('name')}")
        cmd.append(f"--workdir {container_args.pop('working_dir')}")

        entrypoint = container_args.pop("entrypoint", None)
        if entrypoint:
            cmd.append(f"--entrypoint {' '.join(entrypoint)}")

        # append volume and environment flags
        for vol in container_args.pop("volumes"):
            cmd.append(f"-v {vol}")
        for env_key, env_val in container_args.pop("environment").items():
            cmd.append(f"-e {env_key}={env_val}")

        command = container_args.pop("command")
        image = container_args.pop("image")

        # anything else is treated as a flag
        for k, v in container_args.items():
            cmd.append(pu.key_value_to_flag(k, v))

        # append the image and the commands
        cmd.append(image)

        if command:
            cmd.append(" ".join(command))

        return " ".join(cmd)


class PodmanRunner(SlurmRunner, HostPodmanRunner):
    """Runs workflow steps in podman containers, submitted through SLURM.

    Podman is driven purely through CLI command strings placed in a
    batch job, so no podman client is initialized.
    """

    def __init__(self, **kw):
        # skip client initialization; all podman interaction is via CLI
        super(PodmanRunner, self).__init__(init_podman_client=False, **kw)

    def __exit__(self, exc_type, exc, traceback):
        # no client connection was opened, so nothing to tear down
        pass

    def run(self, step):
        """Execute the given step via slurm in the podman engine.

        Assembles a list of podman CLI commands (cleanup, build or pull,
        create, start) and submits them as one batch job; returns the
        job's exit code.
        """
        cid = pu.sanitized_name(step.id, self._config.wid)
        cmd = []

        build, _, img, tag, build_ctx_path = self._get_build_info(step)

        # remove any leftover container with the same name; `|| true`
        # keeps the script going when no such container exists
        cmd.append(f"podman rm -f {cid} || true")

        if build:
            cmd.append(f"podman build -t {img}:{tag} {build_ctx_path}")
        elif not self._config.skip_pull and not step.skip_pull:
            cmd.append(f"podman pull {img}:{tag}")

        cmd.append(self._create_cmd(step, f"{img}:{tag}", cid))
        cmd.append(f"podman start --attach {cid}")

        # track the container so stop_running_tasks() can act on it
        self._spawned_containers.add(cid)
        # NOTE(review): _submit_batch_job is not defined in the visible
        # portion of SlurmRunner — confirm it exists in the full file
        ecode = self._submit_batch_job(cmd, step)
        self._spawned_containers.remove(cid)
        return ecode

    def _create_cmd(self, step, img, cid):
        """Build the `podman create ...` command string for the step."""
        container_args = self._get_container_kwargs(step, img, cid)
        # `detach` is a client-API kwarg, not a CLI flag
        container_args.pop("detach")
        cmd = ["podman create"]
        cmd.append(f"--name {container_args.pop('name')}")
        cmd.append(f"--workdir {container_args.pop('working_dir')}")

        entrypoint = container_args.pop("entrypoint", None)
        if entrypoint:
            cmd.append(f"--entrypoint {' '.join(entrypoint)}")

        # append volume and environment flags
        # NOTE(review): unlike the docker variant, no default is set for
        # "volumes" — this raises KeyError if _get_container_kwargs ever
        # omits the key; confirm that it always provides it
        for vol in container_args.pop("volumes"):
            cmd.append(f"-v {vol}")
        for env_key, env_val in container_args.pop("environment").items():
            cmd.append(f"-e {env_key}={env_val}")

        command = container_args.pop("command")
        image = container_args.pop("image")

        # anything else is treated as a flag
        for k, v in container_args.items():
            cmd.append(pu.key_value_to_flag(k, v))

        # append the image and the commands
        cmd.append(image)

        if command:
            cmd.append(" ".join(command))

        return " ".join(cmd)


class SingularityRunner(SlurmRunner, HostSingularityRunner):
    """Runs workflow steps in singularity containers through SLURM.

    Singularity is invoked via the CLI (through srun/sbatch), so no
    spython client is initialized.
    """

    def __init__(self, **kw):
        # skip spython client setup; singularity is driven via the CLI
        super(SingularityRunner, self).__init__(init_spython_client=False, **kw)
Expand All @@ -246,22 +163,30 @@ def run(self, step):
img = step.uses
build_ctx_path = None

HostRunner._exec_cmd(["rm", "-rf", self._container])

if not self._config.dry_run:
if build:
recipefile = self._get_recipe_file(build_ctx_path, cid)
HostRunner._exec_cmd(
["singularity", "build", "--fakeroot", self._container, recipefile],
cwd=build_ctx_path,
)
else:
HostRunner._exec_cmd(["singularity", "pull", self._container, img])
self._exec_srun(["rm", "-rf", self._container], step)

cmd = [self._create_cmd(step, cid)]
if build:
recipefile = self._get_recipe_file(build_ctx_path, cid)
log.info(f"[{step.id}] srun singularity build {self._container}")
self._exec_srun(
["singularity", "build", "--fakeroot", self._container, recipefile,],
step,
cwd=os.path.dirname(recipefile),
)
else:
log.info(f"[{step.id}] srun singularity pull {self._container}")
self._exec_srun(["singularity", "pull", self._container, img], step)

cmd = self._create_cmd(step, cid)
self._spawned_containers.add(cid)
ecode = self._submit_batch_job(cmd, step)

if self._config.resman_opts.get(step.id, {}).get("mpi", True):
log.info(f'[{step.id}] sbatch {" ".join(cmd)}')
ecode = self._exec_mpi(cmd, step)
else:
log.info(f'[{step.id}] srun {" ".join(cmd)}')
ecode = self._exec_srun(cmd, step, logging=True)

self._spawned_containers.remove(cid)
return ecode

Expand All @@ -272,15 +197,15 @@ def _create_cmd(self, step, cid):

if step.runs:
commands = step.runs
cmd = ["singularity exec"]
cmd = ["singularity", "exec"]
else:
commands = step.args
cmd = ["singularity run"]
cmd = ["singularity", "run"]

options = self._get_container_options()

cmd.append(" ".join(options))
cmd.append(self._container)
cmd.append(" ".join(commands))
cmd.extend(options)
cmd.extend([self._container])
cmd.extend(commands)

return " ".join(cmd)
return cmd

0 comments on commit cedb257

Please sign in to comment.