Skip to content

Commit

Permalink
cli: move repeated functions into StepRunner (#902)
Browse files Browse the repository at this point in the history
Moves repeated logic found in host runners (`runner_host.py`) into parent StepRunner class (`runner.py`).
  • Loading branch information
edeediong committed Jul 30, 2020
1 parent 906bc30 commit cea0b62
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 296 deletions.
2 changes: 1 addition & 1 deletion src/popper/commands/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"-e",
"--engine",
help="Specify container engine used to execute workflow steps.",
type=click.Choice(["docker", "singularity", "vagrant"]),
type=click.Choice(["docker", "singularity"]),
)
@click.option(
"-r",
Expand Down
91 changes: 89 additions & 2 deletions src/popper/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, config):
self._is_resman_module_loaded = False

def _load_resman_module(self):
"""dynamically load resource manager module"""
"""dynamically load resource manager module."""
resman_mod_name = f"popper.runner_{self._config.resman_name}"
resman_spec = importlib.util.find_spec(resman_mod_name)
if not resman_spec:
Expand Down Expand Up @@ -166,7 +166,7 @@ def run(self, wf):
log.info("Workflow finished successfully.")

def _step_runner(self, engine_name, step):
"""Factory of singleton runners"""
"""Factory of singleton runners."""
if not self._is_resman_module_loaded:
self._load_resman_module()

Expand Down Expand Up @@ -230,6 +230,93 @@ def _prepare_environment(self, step, env={}):
)
return step_env

def _get_build_info(self, step):

"""Parses the `uses` attribute and returns build information needed.
Args:
step(dict): dict with step data
Returns:
(str, str, str, str): bool (build), image, tag, Dockerfile
"""
build = True
img = None
build_ctx_path = None
img_full = None
tag = None

if "docker://" in step.uses:
img_full = step.uses
img = step.uses.replace("docker://", "")
if ":" in img:
(img, tag) = img.split(":")
else:
tag = "latest"
build = False

elif "./" in step.uses:
img_full = f'{pu.sanitized_name(step.id, "step")}'
img = img_full.lower()
tag = self._config.git_sha_short if self._config.git_sha_short else "na"
build_ctx_path = os.path.join(self._config.workspace_dir, step.uses)

else:
_, service, user, repo, step_dir, version = scm.parse(step.uses)
wf_cache_dir = os.path.join(self._config.cache_dir, self._config.wid)
repo_dir = os.path.join(wf_cache_dir, service, user, repo)
img_full = f"{user}/{repo}".lower()
img = img_full
tag = version
build_ctx_path = os.path.join(repo_dir, step_dir)

return (build, img_full, img, tag, build_ctx_path)

def _update_with_engine_config(self, container_args):

"""Given container arguments, it extends it so it includes options
obtained from the popper.config.Config.engine_opts property.
"""
update_with = self._config.engine_opts
if not update_with:
return

if container_args.get("volumes"):
container_args["volumes"] = [
*container_args["volumes"],
*update_with.get("volumes", list()),
]
if container_args.get("bind"):
container_args["bind"] = [
*container_args["bind"],
*update_with.get("bind", list()),
]

for k, v in update_with.get("environment", dict()).items():
container_args["environment"].update({k: v})

for k, v in update_with.items():
if k not in container_args.keys():
container_args[k] = update_with[k]

def _get_container_kwargs(self, step, img, name):
args = {
"image": img,
"command": list(step.args),
"name": name,
"volumes": [f"{self._config.workspace_dir}:/workspace:Z",],
"working_dir": step.dir if step.dir else "/workspace",
"environment": self._prepare_environment(step),
"entrypoint": step.runs if step.runs else None,
"detach": not self._config.pty,
"tty": self._config.pty,
"stdin_open": self._config.pty,
}

self._update_with_engine_config(args)
args.update(step.options)
log.debug(f"container args: {pu.prettystr(args)}\n")

return args

def stop_running_tasks(self):
raise NotImplementedError("Needs implementation in derived classes.")

Expand Down
128 changes: 11 additions & 117 deletions src/popper/runner_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,42 +172,8 @@ def stop_running_tasks(self):
log.info(f"Stopping container {c.name}")
c.stop()

def _get_build_info(self, step):
"""Parses the `uses` attribute and returns build information needed.
Args:
step(dict): dict with step data
Returns:
(str, str, str, str): bool (build), image, tag, Dockerfile
"""
build = True
img = None
build_ctx_path = None

if "docker://" in step.uses:
img = step.uses.replace("docker://", "")
if ":" in img:
(img, tag) = img.split(":")
else:
tag = "latest"
build = False
elif "./" in step.uses:
img = pu.sanitized_name(step.id, "step").lower()
tag = self._config.git_sha_short if self._config.git_sha_short else "na"
build_ctx_path = os.path.join(self._config.workspace_dir, step.uses)
else:
_, service, user, repo, step_dir, version = scm.parse(step.uses)
wf_cache_dir = os.path.join(self._config.cache_dir, self._config.wid)
repo_dir = os.path.join(wf_cache_dir, service, user, repo)
img = f"{user}/{repo}".lower()
tag = version
build_ctx_path = os.path.join(repo_dir, step_dir)

return (build, img, tag, build_ctx_path)

def _create_container(self, cid, step):
build, img, tag, build_ctx_path = self._get_build_info(step)
build, _, img, tag, build_ctx_path = self._get_build_info(step)

if build:
log.info(f"[{step.id}] docker build {img}:{tag} {build_ctx_path}")
Expand All @@ -233,6 +199,10 @@ def _create_container(self, cid, step):

container_args = self._get_container_kwargs(step, f"{img}:{tag}", cid)

if "volumes" not in container_args:
container_args["volumes"] = []
container_args["volumes"].insert(1, "/var/run/docker.sock:/var/run/docker.sock")

log.debug(f"Container args: {container_args}")

msg = f"[{step.id}] docker create name={cid}"
Expand All @@ -247,29 +217,6 @@ def _create_container(self, cid, step):

return container

def _get_container_kwargs(self, step, img, name):
args = {
"image": img,
"command": list(step.args),
"name": name,
"volumes": [
f"{self._config.workspace_dir}:/workspace",
"/var/run/docker.sock:/var/run/docker.sock",
],
"working_dir": step.dir if step.dir else "/workspace",
"environment": self._prepare_environment(step),
"entrypoint": step.runs if step.runs else None,
"detach": not self._config.pty,
"tty": self._config.pty,
"stdin_open": self._config.pty,
}

self._update_with_engine_config(args)
args.update(step.options)
log.debug(f"container args: {pu.prettystr(args)}\n")

return args

def _find_container(self, cid):
"""Check whether the container exists."""
containers = self._d.containers.list(all=True, filters={"name": cid})
Expand All @@ -281,25 +228,6 @@ def _find_container(self, cid):

return None

def _update_with_engine_config(self, container_args):
"""Given container arguments, it extends it so it includes options
obtained from the popper.config.Config.engine_opts property.
"""
update_with = self._config.engine_opts
if not update_with:
return

container_args["volumes"] = [
*container_args["volumes"],
*update_with.get("volumes", list()),
]
for k, v in update_with.get("environment", dict()).items():
container_args["environment"].update({k: v})

for k, v in update_with.items():
if k not in container_args.keys():
container_args[k] = update_with[k]


class SingularityRunner(StepRunner):
"""Runs steps in singularity on the local machine."""
Expand Down Expand Up @@ -390,51 +318,12 @@ def _build_from_recipe(self, build_ctx_path, build_dest, cid):
os.chdir(pwd)
SingularityRunner.lock.release()

def _get_build_info(self, step):
build = True
img = None
build_ctx_path = None

if (
"docker://" in step.uses
or "shub://" in step.uses
or "library://" in step.uses
):
img = step.uses
build = False

elif "./" in step.uses:
img = f'{pu.sanitized_name(step.id, "step")}'
build_ctx_path = os.path.join(self._config.workspace_dir, step.uses)
else:
_, service, user, repo, step_dir, version = scm.parse(step.uses)
wf_cache_dir = os.path.join(self._config.cache_dir, self._config.wid)
repo_dir = os.path.join(wf_cache_dir, service, user, repo)
img = f"{user}/{repo}".lower()
build_ctx_path = os.path.join(repo_dir, step_dir)

return (build, img, build_ctx_path)

def _setup_singularity_cache(self):
self._singularity_cache = os.path.join(
self._config.cache_dir, "singularity", self._config.wid
)
os.makedirs(self._singularity_cache, exist_ok=True)

def _update_with_engine_config(self, container_args):
update_with = self._config.engine_opts
if not update_with:
return

container_args["bind"] = [
*container_args["bind"],
*update_with.get("bind", list()),
]

for k, v in update_with.items():
if k not in container_args.keys():
container_args[k] = update_with[k]

def _get_container_options(self):
container_args = {
"userns": True,
Expand All @@ -458,7 +347,12 @@ def _get_container_options(self):
return options

def _create_container(self, step, cid):
build, image, build_ctx_path = self._get_build_info(step)
build, image, _, _, build_ctx_path = self._get_build_info(step)

if "shub://" in step.uses or "library://" in step.uses:
build = False
image = step.uses
build_ctx_path = None

if build:
log.info(f"[{step.id}] singularity build {cid} {build_ctx_path}")
Expand Down
14 changes: 12 additions & 2 deletions src/popper/runner_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def run(self, step):
cid = pu.sanitized_name(step.id, self._config.wid)
cmd = []

build, img, tag, build_ctx_path = self._get_build_info(step)
build, _, img, tag, build_ctx_path = self._get_build_info(step)

cmd.append(f"docker rm -f {cid} || true")

Expand All @@ -126,6 +126,11 @@ def run(self, step):

def _create_cmd(self, step, img, cid):
container_args = self._get_container_kwargs(step, img, cid)

if "volumes" not in container_args:
container_args["volumes"] = []
container_args["volumes"].insert(1, "/var/run/docker.sock:/var/run/docker.sock")

container_args.pop("detach")
cmd = ["docker create"]
cmd.append(f"--name {container_args.pop('name')}")
Expand Down Expand Up @@ -171,7 +176,12 @@ def run(self, step):
cid = pu.sanitized_name(step.id, self._config.wid) + ".sif"
self._container = os.path.join(self._singularity_cache, cid)

build, img, build_ctx_path = self._get_build_info(step)
build, img, _, _, build_ctx_path = self._get_build_info(step)

if "shub://" in step.uses or "library://" in step.uses:
build = False
img = step.uses
build_ctx_path = None

HostRunner._exec_cmd(["rm", "-rf", self._container])

Expand Down

0 comments on commit cea0b62

Please sign in to comment.