From 0fdfe27fa8f7fc84cc263785bfe38fc702554c9a Mon Sep 17 00:00:00 2001 From: Julien BORDELLIER Date: Tue, 7 Apr 2020 20:51:38 +0200 Subject: [PATCH] AWS Batch runner support runtime-configurable Docker images. To support runtime-configurable Docker image in AWS Batch, the creation of a Batch Job also created a Job Definition with the configured image. Fixes #57 --- nextstrain/cli/runner/aws_batch/__init__.py | 9 ++++++ nextstrain/cli/runner/aws_batch/jobs.py | 36 ++++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/nextstrain/cli/runner/aws_batch/__init__.py b/nextstrain/cli/runner/aws_batch/__init__.py index 6f85cbe4..4096c22c 100644 --- a/nextstrain/cli/runner/aws_batch/__init__.py +++ b/nextstrain/cli/runner/aws_batch/__init__.py @@ -14,6 +14,7 @@ from ...types import RunnerTestResults, Tuple from ...util import colored, resolve_path, warn from ... import config +from .. import docker from . import jobs, logs, s3 @@ -80,6 +81,13 @@ def register_arguments(parser) -> None: type = int, default = DEFAULT_MEMORY) + development.add_argument( + "--aws-batch-image", + dest = "aws_batch_image", + help = "Docker image to use for the AWS Job Definition", + metavar = "", + default = docker.DEFAULT_IMAGE) + def run(opts, argv, working_volume = None, extra_env = {}) -> int: local_workdir = resolve_path(working_volume.src) @@ -123,6 +131,7 @@ def run(opts, argv, working_volume = None, extra_env = {}) -> int: try: job = jobs.submit( name = run_id, + image = opts.aws_batch_image, queue = opts.job_queue, definition = opts.job_definition, cpus = opts.cpus, diff --git a/nextstrain/cli/runner/aws_batch/jobs.py b/nextstrain/cli/runner/aws_batch/jobs.py index 9754b89b..b107258e 100644 --- a/nextstrain/cli/runner/aws_batch/jobs.py +++ b/nextstrain/cli/runner/aws_batch/jobs.py @@ -2,9 +2,11 @@ Job handling for AWS Batch. """ +import re from time import time from typing import Callable, Generator, Iterable, Mapping, List, Optional from ... import hostenv, aws +from ...util import warn from . import logs, s3 @@ -133,8 +135,20 @@ def stop(self) -> None: def stopped(self) -> bool: return self.status_reason == self.STOP_REASON +def job_definition_name(definition_name, docker_image): + """ + Format the AWS Batch Job Definition name according to API restriction. + + Returns a string. + """ + docker_image_tag = docker_image.split(':') + name = re.sub('[^0-9a-zA-Z-]+', '-', definition_name) + image = re.sub('[^0-9a-zA-Z-]+', '-', docker_image_tag[0]) + tag = re.sub('[^0-9a-zA-Z-]+', '-', docker_image_tag[1]) + return "%s_%s_%s" % (name, image, tag) def submit(name: str, + image: str, queue: str, definition: str, cpus: Optional[int], @@ -149,10 +163,30 @@ def submit(name: str, """ batch = aws.client_with_default_region("batch") + definition_name = job_definition_name(definition, image) + try: + batch.register_job_definition( + jobDefinitionName = definition_name, + type = "container", + containerProperties = { + "image": image, + "command": [], + }, + retryStrategy = { + "attempts": 1, + }, + timeout = { + "attemptDurationSeconds": 14400, + }, + ) + except Exception as error: + warn(error) + raise Exception("Creation of job definition (%s) failed" % definition_name) + submission = batch.submit_job( jobName = name, jobQueue = queue, - jobDefinition = definition, + jobDefinition = definition_name, containerOverrides = { "environment": [ {