This repository has been archived by the owner on Jan 29, 2022. It is now read-only.
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[#812] Add
DockerCLIOperator
that runs containers using Docker CLI
Airflow's DockerOperator has an issue with long-running tasks, where it doesn't detect their exit status, keeping the tasks stuck in a "running" state even after they're finished (see https://issues.apache.org/jira/browse/AIRFLOW-1131). This commit implements the DockerCLIOperator that instead of using the Docker API, as the DockerOperator does, uses the Docker CLI executable. I have tested locally and it seems to work around the issues we've been having. However, we'll only be sure after testing in production. This commit also changes the EUCTR processor task to use the new DockerCLIOperator, so we can try it. If it does work, we'll change the helpers to use DockerCLIOperator instead of Airflow's DockerOperator. opentrials/opentrials#812
- Loading branch information
1 parent
ec44992
commit 8023130
Showing
5 changed files
with
272 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import airflow.models | ||
import airflow.exceptions | ||
import airflow.hooks.base_hook | ||
from airflow.utils.decorators import apply_defaults | ||
|
||
import logging | ||
import os | ||
import shlex | ||
import signal | ||
import subprocess | ||
|
||
|
||
class MyDockerOperator(airflow.models.BaseOperator): | ||
'''Executes a command on a Docker comtainer. | ||
This uses bash to execute Docker commands instead of using the Docker API | ||
to try to work around issue | ||
https://issues.apache.org/jira/browse/AIRFLOW-1131 | ||
:param image: Docker image from which to create the container. | ||
:type image: str | ||
:param command: Command to run (templated) | ||
:type command: str | ||
:param environment: Environment variables to set in the container. | ||
:type environment: dict | ||
:param force_pull: Pull the docker image on every run (default: False). | ||
:type force_pull: bool | ||
''' | ||
template_fields = ('command',) | ||
|
||
@apply_defaults | ||
def __init__( | ||
self, | ||
image, | ||
command, | ||
environment=None, | ||
force_pull=False, | ||
*args, | ||
**kwargs | ||
): | ||
super(MyDockerOperator, self).__init__(*args, **kwargs) | ||
self.image = image | ||
self.command = command | ||
self.environment = environment or {} | ||
self.force_pull = force_pull | ||
self._process = None | ||
|
||
def execute(self, context): | ||
if self.force_pull: | ||
self._pull_image() | ||
|
||
docker_run_command = self._get_docker_run_command() | ||
return self._run_command(docker_run_command, self.environment) | ||
|
||
def on_kill(self): | ||
if self._process: | ||
logging.info('Sending SIGTERM signal to process group') | ||
os.killpg(os.getpgid(self._process.pid), signal.SIGTERM) | ||
|
||
def _pull_image(self): | ||
pull_command = 'docker pull {image}'.format(image=self.image) | ||
return self._run_command(pull_command) | ||
|
||
def _get_docker_run_command(self): | ||
env_params = [ | ||
'--env "{key}=${key}"'.format(key=key) | ||
for key in self.environment.keys() | ||
] | ||
|
||
docker_command = [ | ||
'docker', | ||
'run', | ||
'--rm', | ||
] + env_params + [ | ||
self.image, | ||
self.command, | ||
] | ||
|
||
return ' '.join(docker_command) | ||
|
||
def _run_command(self, command, env=None): | ||
logging.info('Running command "{}"'.format(shlex.split(command))) | ||
self._process = subprocess.Popen( | ||
shlex.split(command), | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
env=env, | ||
preexec_fn=os.setsid | ||
) | ||
process = self._process | ||
|
||
line = '' | ||
for line in iter(process.stdout.readline, b''): | ||
line = line.decode('utf-8').strip() | ||
logging.info(line) | ||
process.wait() | ||
logging.info('Command exited with ' | ||
'return code {0}'.format(process.returncode)) | ||
|
||
if process.returncode != 0: | ||
msg = 'Bash command "{command}" failed with exit code "{exitcode}"'.format( | ||
command=command, | ||
exitcode=process.returncode | ||
) | ||
raise airflow.exceptions.AirflowException(msg) | ||
|
||
return process.returncode |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
try: | ||
import unittest.mock as mock | ||
except ImportError: | ||
import mock | ||
import collections | ||
import io | ||
import shlex | ||
import pytest | ||
import airflow.exceptions | ||
from dags.operators.my_docker_operator import MyDockerOperator | ||
|
||
|
||
class TestMyDockerOperator(object): | ||
def test_its_created_successfully(self): | ||
operator = MyDockerOperator( | ||
task_id='task_id', | ||
image='docker/image:latest', | ||
command='true', | ||
environment={}, | ||
force_pull=True | ||
) | ||
assert operator | ||
assert operator.task_id == 'task_id' | ||
|
||
@mock.patch('subprocess.Popen', autospec=True) | ||
@mock.patch('logging.info', autospec=True) | ||
def test_run_command(self, logging_info_mock, popen_mock): | ||
command = 'command' | ||
command_output = [ | ||
u'first line', | ||
u'second line', | ||
] | ||
env = { | ||
'foo': 'bar', | ||
} | ||
process_mock = mock.Mock() | ||
process_mock.stdout = io.StringIO(u'\n'.join(command_output)) | ||
process_mock.returncode = 0 | ||
popen_mock.return_value = process_mock | ||
|
||
operator = MyDockerOperator( | ||
task_id='task_id', | ||
image='docker/image:latest', | ||
command=command, | ||
) | ||
|
||
exit_code = operator._run_command(command, env) | ||
|
||
popen_mock.assert_called_with( | ||
shlex.split(command), | ||
env=env, | ||
stdout=mock.ANY, | ||
stderr=mock.ANY, | ||
preexec_fn=mock.ANY | ||
) | ||
logging_info_mock.assert_has_calls([ | ||
mock.call(line) for line in command_output | ||
]) | ||
assert exit_code == process_mock.returncode | ||
|
||
@mock.patch('subprocess.Popen', autospec=True) | ||
def test_run_command_raises_airflowexception_if_command_failed(self, popen_mock): | ||
process_mock = mock.Mock() | ||
process_mock.stdout = io.StringIO(u'') | ||
process_mock.returncode = 1 | ||
popen_mock.return_value = process_mock | ||
command = 'inexistent_command' | ||
|
||
operator = MyDockerOperator( | ||
task_id='task_id', | ||
image='docker/image:latest', | ||
command=command, | ||
) | ||
|
||
with pytest.raises(airflow.exceptions.AirflowException): | ||
operator._run_command(command) | ||
|
||
def test_get_docker_run_command_works_without_environment(self): | ||
operator = MyDockerOperator( | ||
task_id='task_id', | ||
image='docker/image:latest', | ||
command='command', | ||
) | ||
|
||
docker_command = operator._get_docker_run_command() | ||
|
||
assert docker_command == 'docker run --rm {image} {command}'.format( | ||
image=operator.image, | ||
command=operator.command | ||
) | ||
|
||
def test_get_docker_run_command_works_with_environment(self): | ||
environment = collections.OrderedDict([ | ||
('foo', 'bar baz'), | ||
('bar', 'foo bar baz'), | ||
]) | ||
operator = MyDockerOperator( | ||
task_id='task_id', | ||
image='docker/image:latest', | ||
command='command', | ||
environment=environment | ||
) | ||
|
||
docker_command = operator._get_docker_run_command() | ||
|
||
assert docker_command == 'docker run --rm --env "foo=$foo" --env "bar=$bar" {image} {command}'.format( | ||
image=operator.image, | ||
command=operator.command | ||
) | ||
|
||
@mock.patch('dags.operators.my_docker_operator.MyDockerOperator._run_command') | ||
def test_pull_image_runs_the_correct_command(self, run_command_mock): | ||
operator = MyDockerOperator( | ||
task_id='task_id', | ||
image='alpine:latest', | ||
command='true' | ||
) | ||
|
||
exit_code = operator._pull_image() | ||
|
||
run_command_mock.assert_called_with('docker pull {}'.format(operator.image)) | ||
assert exit_code == run_command_mock() |