[#812] Add DockerCLIOperator that runs containers using Docker CLI
Airflow's DockerOperator has an issue with long-running tasks: it fails to
detect their exit status, leaving the tasks stuck in a "running" state even
after they have finished (see
https://issues.apache.org/jira/browse/AIRFLOW-1131). This commit implements
DockerCLIOperator, which runs containers through the Docker CLI executable
instead of the Docker API that DockerOperator uses.

I have tested it locally and it seems to work around the issues we've been
having, but we'll only know for sure after testing in production. This commit
also changes the EUCTR processor task to use the new DockerCLIOperator so we
can try it out. If it works, we'll change the helpers to use DockerCLIOperator
instead of Airflow's DockerOperator.

opentrials/opentrials#812
vitorbaptista committed May 17, 2017
1 parent ec44992 commit 2ecfee3
Showing 6 changed files with 280 additions and 2 deletions.
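
Before the file-by-file changes, here is the core of the workaround as a
minimal sketch (assumptions: a local docker binary on the PATH and the public
alpine:latest image; DockerCLIOperator below wraps this same idea with
logging, environment forwarding and Airflow integration):

import shlex
import subprocess

# Run a container via the docker CLI and read the exit status directly
# from the child process, instead of polling the Docker API.
process = subprocess.Popen(
    shlex.split('docker run --rm alpine:latest echo hello'),
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
for line in iter(process.stdout.readline, b''):
    print(line.decode('utf-8').strip())
process.wait()
print('exit code:', process.returncode)  # detected reliably, unlike AIRFLOW-1131
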
16 changes: 15 additions & 1 deletion Dockerfile
@@ -12,7 +12,21 @@ RUN apt-get update -yqq && \
     sudo \
     python-pip \
     postgresql-client \
-    git
+    git \
+    # Dependencies needed to install docker-ce
+    apt-transport-https \
+    ca-certificates \
+    curl \
+    gnupg2 \
+    software-properties-common
+RUN curl -fsSL https://download.docker.com/linux/debian/gpg | \
+    apt-key add -
+RUN add-apt-repository -y \
+    "deb [arch=amd64] https://download.docker.com/linux/debian \
+    $(lsb_release -cs) \
+    stable"
+RUN apt-get update -yqq && \
+    apt-get install -yqq docker-ce
 
 ADD requirements.txt /
 RUN pip uninstall airflow -y && \
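
The layer above only installs the Docker client; at runtime it still needs a
Docker daemon to talk to, which in a setup like this is presumably the host's
daemon exposed to the container (for example via a mounted
/var/run/docker.sock; the deployment side is not part of this commit). A quick
sanity check from inside the container:

import subprocess

# Prints the daemon's version if the client can reach a Docker daemon;
# raises CalledProcessError otherwise.
print(subprocess.check_output(
    ['docker', 'version', '--format', '{{.Server.Version}}']
).decode('utf-8').strip())
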
2 changes: 1 addition & 1 deletion dags/euctr.py
@@ -28,7 +28,7 @@
     command='make start euctr 2001-01-01'
 )
 
-processor_task = helpers.create_processor_task(
+processor_task = helpers.create_processor_task_using_bash(
     name='euctr',
     dag=dag
 )
107 changes: 107 additions & 0 deletions dags/operators/docker_cli_operator.py
@@ -0,0 +1,107 @@
import airflow.models
import airflow.exceptions
from airflow.utils.decorators import apply_defaults

import logging
import os
import shlex
import signal
import subprocess


class DockerCLIOperator(airflow.models.BaseOperator):
    '''Executes a command in a Docker container.

    This shells out to the docker CLI executable instead of using the Docker
    API, to try to work around issue
    https://issues.apache.org/jira/browse/AIRFLOW-1131

    :param image: Docker image from which to create the container.
    :type image: str
    :param command: Command to run (templated).
    :type command: str
    :param environment: Environment variables to set in the container.
    :type environment: dict
    :param force_pull: Pull the docker image on every run (default: False).
    :type force_pull: bool
    '''
    template_fields = ('command',)

    @apply_defaults
    def __init__(
        self,
        image,
        command,
        environment=None,
        force_pull=False,
        *args,
        **kwargs
    ):
        super(DockerCLIOperator, self).__init__(*args, **kwargs)
        self.image = image
        self.command = command
        self.environment = environment or {}
        self.force_pull = force_pull
        self._process = None

    def execute(self, context):
        if self.force_pull:
            self._pull_image()

        docker_run_command = self._get_docker_run_command()
        return self._run_command(docker_run_command, self.environment)

    def on_kill(self):
        if self._process:
            logging.info('Sending SIGTERM signal to process group')
            os.killpg(os.getpgid(self._process.pid), signal.SIGTERM)

    def _pull_image(self):
        pull_command = 'docker pull {image}'.format(image=self.image)
        return self._run_command(pull_command)

    def _get_docker_run_command(self):
        # Pass only the variable names to "--env", without values: docker
        # then forwards each variable from the client process' environment
        # (which _run_command sets via Popen's "env" argument), and the
        # values never show up in the logged command line. Note that the
        # previous '--env "{key}=${key}"' form would set the literal string
        # "$key" in the container, since no shell expands the command.
        env_params = [
            '--env {key}'.format(key=key)
            for key in self.environment.keys()
        ]

        docker_command = [
            'docker',
            'run',
            '--rm',
        ] + env_params + [
            self.image,
            self.command,
        ]

        return ' '.join(docker_command)

    def _run_command(self, command, env=None):
        logging.info('Running command "{}"'.format(command))
        self._process = subprocess.Popen(
            shlex.split(command),
            stdout=subprocess.PIPE,
            # Merge stderr into stdout so the reading loop below drains both
            # streams and the child can't block on a full stderr pipe.
            stderr=subprocess.STDOUT,
            env=env,
            preexec_fn=os.setsid
        )
        process = self._process

        for line in iter(process.stdout.readline, b''):
            logging.info(line.decode('utf-8').strip())
        process.wait()
        logging.info('Command exited with '
                     'return code {0}'.format(process.returncode))

        if process.returncode != 0:
            msg = 'Command "{command}" failed with exit code "{exitcode}"'.format(
                command=command,
                exitcode=process.returncode
            )
            raise airflow.exceptions.AirflowException(msg)

        return process.returncode
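
Worth noting in _run_command and on_kill: preexec_fn=os.setsid starts the
child in its own session, so on_kill can signal the entire process group and
take down docker run together with any processes it spawned. The pattern in
isolation (a sketch, with sleep standing in for a long-running container):

import os
import signal
import subprocess
import time

# Start the child in a new session so it leads its own process group.
proc = subprocess.Popen(['sleep', '60'], preexec_fn=os.setsid)
time.sleep(1)
# Signal the group led by the child, not just the immediate child.
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
proc.wait()
print(proc.returncode)  # -15, i.e. terminated by SIGTERM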
8 changes: 8 additions & 0 deletions dags/smoke_test.py
@@ -3,6 +3,7 @@
 from datetime import datetime, timedelta
 from airflow.models import DAG
 from airflow.operators.docker_operator import DockerOperator
+from operators.docker_cli_operator import DockerCLIOperator
 
 
 default_args = {
@@ -27,3 +28,10 @@
     api_version=os.environ.get('DOCKER_API_VERSION', '1.23'),
     command='sleep 5'
 )
+
+docker_cli_sleep_task = DockerCLIOperator(
+    task_id='docker_cli_sleep',
+    dag=dag,
+    image='alpine:latest',
+    command='sleep 5'
+)
27 changes: 27 additions & 0 deletions dags/utils/helpers.py
@@ -4,6 +4,8 @@
 import airflow.models
 import os
 
+from operators.docker_cli_operator import DockerCLIOperator
+
 
 def get_postgres_uri(name):
     conn = airflow.hooks.base_hook.BaseHook.get_connection(name)
@@ -67,6 +69,31 @@ def create_processor_task(name, dag, command=None, environment=None):
     )
 
 
+def create_processor_task_using_bash(name, dag, command=None, environment=None):
+    # FIXME: This is a temporary function to try out DockerCLIOperator. If it
+    # works fine, we should remove it and use DockerCLIOperator in "_create_task()".
+    default_command = 'make start {}'.format(name)
+    env = {
+        'SENTRY_DSN': airflow.models.Variable.get('PROCESSOR_SENTRY_DSN'),
+        'WAREHOUSE_URL': get_postgres_uri('warehouse_db'),
+        'DATABASE_URL': get_postgres_uri('api_db'),
+        'EXPLORER_URL': get_postgres_uri('explorer_db'),
+        'PYTHON_ENV': airflow.models.Variable.get('ENV'),
+        'LOGGING_URL': airflow.models.Variable.get('LOGGING_URL'),
+        'DOWNLOAD_DELAY': airflow.models.Variable.get('DOWNLOAD_DELAY'),
+    }
+    env.update(environment or {})
+
+    return DockerCLIOperator(
+        task_id='processor_{}'.format(name),
+        dag=dag,
+        image='opentrials/processors:latest',
+        command=command or default_command,
+        environment=env,
+        force_pull=True,
+    )
+
+
 def _create_task(task_id, dag, image, command, environment):
     env = {
         'WAREHOUSE_URL': get_postgres_uri('warehouse_db'),
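
For reference, wiring a processor through the temporary helper looks like the
sketch below (the import path is assumed from dags/euctr.py, dag is assumed
to exist, and the actual environment values come from Airflow Variables and
Connections):

from utils import helpers

# Ultimately executes something along the lines of:
#   docker run --rm --env SENTRY_DSN --env WAREHOUSE_URL ... \
#       opentrials/processors:latest make start euctr
processor_task = helpers.create_processor_task_using_bash(name='euctr', dag=dag)
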
122 changes: 122 additions & 0 deletions tests/dags/operators/test_docker_cli_operator.py
@@ -0,0 +1,122 @@
try:
    import unittest.mock as mock
except ImportError:
    import mock
import collections
import io
import shlex
import pytest
import airflow.exceptions
from dags.operators.docker_cli_operator import DockerCLIOperator


class TestDockerCLIOperator(object):
    def test_its_created_successfully(self):
        operator = DockerCLIOperator(
            task_id='task_id',
            image='docker/image:latest',
            command='true',
            environment={},
            force_pull=True
        )
        assert operator
        assert operator.task_id == 'task_id'

    @mock.patch('subprocess.Popen', autospec=True)
    @mock.patch('logging.info', autospec=True)
    def test_run_command(self, logging_info_mock, popen_mock):
        command = 'command'
        command_output = [
            u'first line',
            u'second line',
        ]
        env = {
            'foo': 'bar',
        }
        process_mock = mock.Mock()
        # The operator reads bytes from stdout, so back the mock with BytesIO
        # (io.StringIO would never match the b'' sentinel on Python 3).
        process_mock.stdout = io.BytesIO(u'\n'.join(command_output).encode('utf-8'))
        process_mock.returncode = 0
        popen_mock.return_value = process_mock

        operator = DockerCLIOperator(
            task_id='task_id',
            image='docker/image:latest',
            command=command,
        )

        exit_code = operator._run_command(command, env)

        popen_mock.assert_called_with(
            shlex.split(command),
            env=env,
            stdout=mock.ANY,
            stderr=mock.ANY,
            preexec_fn=mock.ANY
        )
        logging_info_mock.assert_has_calls([
            mock.call(line) for line in command_output
        ])
        assert exit_code == process_mock.returncode

    @mock.patch('subprocess.Popen', autospec=True)
    def test_run_command_raises_airflowexception_if_command_failed(self, popen_mock):
        process_mock = mock.Mock()
        process_mock.stdout = io.BytesIO(b'')
        process_mock.returncode = 1
        popen_mock.return_value = process_mock
        command = 'inexistent_command'

        operator = DockerCLIOperator(
            task_id='task_id',
            image='docker/image:latest',
            command=command,
        )

        with pytest.raises(airflow.exceptions.AirflowException):
            operator._run_command(command)

    def test_get_docker_run_command_works_without_environment(self):
        operator = DockerCLIOperator(
            task_id='task_id',
            image='docker/image:latest',
            command='command',
        )

        docker_command = operator._get_docker_run_command()

        assert docker_command == 'docker run --rm {image} {command}'.format(
            image=operator.image,
            command=operator.command
        )

    def test_get_docker_run_command_works_with_environment(self):
        environment = collections.OrderedDict([
            ('foo', 'bar baz'),
            ('bar', 'foo bar baz'),
        ])
        operator = DockerCLIOperator(
            task_id='task_id',
            image='docker/image:latest',
            command='command',
            environment=environment
        )

        docker_command = operator._get_docker_run_command()

        # Only the variable names appear; the values are forwarded from the
        # client process environment and must not leak into the command line.
        assert docker_command == 'docker run --rm --env foo --env bar {image} {command}'.format(
            image=operator.image,
            command=operator.command
        )

    @mock.patch('dags.operators.docker_cli_operator.DockerCLIOperator._run_command')
    def test_pull_image_runs_the_correct_command(self, run_command_mock):
        operator = DockerCLIOperator(
            task_id='task_id',
            image='alpine:latest',
            command='true'
        )

        exit_code = operator._pull_image()

        run_command_mock.assert_called_with('docker pull {}'.format(operator.image))
        assert exit_code == run_command_mock.return_value
