Skip to content
Permalink
Browse files

Allow running cook executor in docker containers (#1121)

  • Loading branch information...
pschorf authored and shamsimam committed May 8, 2019
1 parent 81add76 commit f73db629b9a414fa3640a8a38e0f3951d6bea1de
@@ -47,6 +47,12 @@ matrix:
before_script: cd integration && ./travis/prepare_integration.sh
script: ./travis/run_integration.sh --executor=cook

- name: 'Cook Scheduler integration tests with Cook Executor and Docker'
services: docker
install: sudo ./travis/install_mesos.sh
before_script: cd integration && ./travis/prepare_integration.sh
script: ./travis/run_integration.sh --executor=cook --image=python:3.5

- name: 'Cook Scheduler integration tests with no pools and with HTTP Basic Auth'
services: docker
install: sudo ./travis/install_mesos.sh
@@ -68,10 +74,6 @@ matrix:
before_script: cd simulator && ./travis/prepare_simulation.sh
script: ./travis/run_simulation.sh

- name: 'Cook Scheduler benchmark tests'
before_script: cd scheduler && ./travis/setup.sh
script: lein with-profile +test test :benchmark

- name: 'Cook Executor tests'
before_script: cd executor && ./travis/setup.sh
script: ./travis/run_tests.sh
@@ -425,6 +425,10 @@ def registered(self, driver, executor_info, framework_info, agent_info):

env = os.environ
if 'EXECUTOR_TEST_EXIT' in env:
# When running in docker, if the container exits too quickly it's logged in mesos as container launch failed
# instead of mesos executor terminated. This sleep ensures that we have the correct reason code for our
# integration tests.
time.sleep(5)
exit_code = int(env['EXECUTOR_TEST_EXIT'])
logging.warn('Exiting with code {} from EXECUTOR_TEST_EXIT environment variable'.
format(exit_code))
@@ -115,7 +115,7 @@ def test_docker_fields(self):
self.assertEqual(2, len(docker['parameters']))
self.assertEqual('bar', next(p['value'] for p in docker['parameters'] if p['key'] == 'foo'))
self.assertEqual('qux', next(p['value'] for p in docker['parameters'] if p['key'] == 'baz'))
self.assertEqual(4, len(volumes))
self.assertLessEqual(4, len(volumes))
self.assertIn({'host-path': '/var/lib/abc'}, volumes)
self.assertIn({'mode': 'RW',
'host-path': '/var/lib/def'}, volumes)
@@ -304,8 +304,8 @@ def test_progress_update_submit(self):
job_executor_type = util.get_job_executor_type(self.cook_url)
progress_file_env = util.retrieve_progress_file_env(self.cook_url)

line = util.progress_line(self.cook_url, 25, f'Twenty-five percent in ${{{progress_file_env}}}')
command = f'echo "{line}" >> ${{{progress_file_env}}}; sleep 1; exit 0'
line = util.progress_line(self.cook_url, 25, f'Twenty-five percent in ${{{progress_file_env}}}', True)
command = f'{line}; sleep 1; exit 0'
job_uuid, resp = util.submit_job(self.cook_url, command=command,
env={progress_file_env: 'progress.txt'},
executor=job_executor_type, max_runtime=60000)
@@ -357,6 +357,37 @@ def test_configurable_progress_update_submit(self):

@unittest.skipUnless(util.is_cook_executor_in_use(), 'Test assumes the Cook Executor is in use')
def test_multiple_progress_updates_submit(self):
    """Run a job that emits five progress updates and check which one is finally reported.

    The updates are produced through util.progress_line(..., True). Two of the
    five carry an empty percent value; the test expects the job to succeed with
    exit code 0 and to report progress 75 / 'Seventy-five percent'.
    """
    executor_type = util.get_job_executor_type(self.cook_url)

    # (percent, message) pairs in emission order; '' marks a deliberately malformed percent.
    updates = [(25, 'Twenty-five percent'),
               (50, 'Fifty percent'),
               ('', 'Sixty percent invalid format'),
               (75, 'Seventy-five percent'),
               ('', 'Eighty percent invalid format')]
    steps = [util.progress_line(self.cook_url, percent, text, True) for percent, text in updates]
    steps.append('echo "Done" && sleep 10 && exit 0')
    # Every step is separated from the next by a two second pause.
    command = ' && sleep 2 && '.join(steps)

    job_uuid, resp = util.submit_job(self.cook_url, command=command,
                                     executor=executor_type, max_runtime=60000)
    self.assertEqual(201, resp.status_code, msg=resp.content)

    job = util.wait_for_job(self.cook_url, job_uuid, 'completed')
    instances = job['instances']
    self.assertEqual(1, len(instances))
    self.assertEqual('success', instances[0]['status'], json.dumps(instances[0], sort_keys=True))

    instance = util.wait_for_sandbox_directory(self.cook_url, job_uuid)
    details = json.dumps(instance, sort_keys=True)
    self.assertIsNotNone(instance['output_url'], details)
    self.assertIsNotNone(instance['sandbox_directory'], details)
    self.assertEqual('cook', instance['executor'])

    # Give the progress publisher one full interval before reading the final state.
    util.sleep_for_publish_interval(self.cook_url)
    instance = util.wait_for_exit_code(self.cook_url, job_uuid)
    details = json.dumps(instance, sort_keys=True)
    self.assertEqual(0, instance['exit_code'], details)
    self.assertEqual(75, instance['progress'], details)
    self.assertEqual('Seventy-five percent', instance['progress_message'], details)

@unittest.skipUnless(util.is_cook_executor_in_use() and not (util.docker_tests_enabled() and util.continuous_integration()),
'Test assumes the Cook Executor is in use. Fails on travis with docker')
def test_multiple_progress_updates_submit_stdout(self):
job_executor_type = util.get_job_executor_type(self.cook_url)
line_1 = util.progress_line(self.cook_url, 25, 'Twenty-five percent')
line_2 = util.progress_line(self.cook_url, 50, 'Fifty percent')
@@ -390,10 +421,10 @@ def test_multiple_rapid_progress_updates_submit(self):
job_executor_type = util.get_job_executor_type(self.cook_url)

def progress_string(a):
return util.progress_line(self.cook_url, a, f'{a}%')
return util.progress_line(self.cook_url, a, f'{a}%', True)

items = list(range(1, 100, 4)) + list(range(99, 40, -4)) + list(range(40, 81, 2))
command = ''.join([f'echo "{progress_string(a)}" && ' for a in items]) + 'echo "Done" && exit 0'
command = ''.join([f'{progress_string(a)} && ' for a in items]) + 'echo "Done" && sleep 10 && exit 0'
job_uuid, resp = util.submit_job(self.cook_url, command=command, executor=job_executor_type, max_runtime=60000)
self.assertEqual(201, resp.status_code, msg=resp.content)
job = util.wait_for_job(self.cook_url, job_uuid, 'completed')
@@ -1551,9 +1582,9 @@ def queue_predicate(resp):
finally:
util.kill_jobs(self.cook_url, uuids)

@pytest.mark.docker
@unittest.skipUnless(util.docker_tests_enabled(), "Requires a test docker image")
def test_basic_docker_job(self):
image = util.docker_image() or 'alpine:latest'
image = util.docker_image()
self.logger.debug(f'Using docker image {image}')
job_uuid, resp = util.submit_job(
self.cook_url,
@@ -1564,7 +1595,6 @@ def test_basic_docker_job(self):
job = util.wait_for_job(self.cook_url, job_uuid, 'completed')
self.assertEqual('success', job['instances'][0]['status'])

@pytest.mark.docker
@unittest.skipUnless(util.has_docker_service(), "Requires `docker inspect`")
def test_docker_port_mapping(self):
job_uuid, resp = util.submit_job(self.cook_url,
@@ -2770,7 +2800,7 @@ def get_debug_status_code():

@unittest.skipUnless(util.supports_mesos_containerizer_images(), "Requires support for docker images in mesos containerizer")
def test_mesos_containerizer_image_support(self):
job_uuid, resp = util.submit_job(self.cook_url, container={'type': 'mesos', 'mesos': {'image': 'alpine'}})
job_uuid, resp = util.submit_job(self.cook_url, executor='mesos', container={'type': 'mesos', 'mesos': {'image': 'alpine'}})
try:
self.assertEqual(201, resp.status_code, resp.text)
instance = util.wait_for_instance(self.cook_url, job_uuid, status='success')
@@ -2812,4 +2842,3 @@ def test_cook_executor_reset_vars(self):
util.wait_for_instance(self.cook_url, uuid, status='success')
finally:
util.kill_jobs(self.cook_url, job_uuids, assert_response=False)

@@ -2184,7 +2184,7 @@ def preprocess_notebook():
def test_submit_docker(self):
docker_image = util.docker_image()
self.assertIsNotNone(docker_image)
cp, uuids = cli.submit('sleep 600', self.cook_url, submit_flags=f'--docker-image {docker_image}')
cp, uuids = cli.submit('sleep 600', self.cook_url, submit_flags=f'--docker-image {docker_image} --executor mesos')
self.assertEqual(0, cp.returncode, cp.stderr)
try:
job = util.load_job(self.cook_url, uuids[0])
@@ -409,6 +409,14 @@ def minimal_job(**kwargs):
}
}
job.update(kwargs)
if is_cook_executor_in_use() and 'container' in job:
if 'volumes' not in job['container']:
job['container']['volumes'] = []
config = settings(retrieve_cook_url())
executor_path = config['executor']['command']
executor_dir = os.path.dirname(os.path.dirname(executor_path))
job['container']['volumes'].append({'host-path': executor_dir,
'container-path': executor_dir})
return job


@@ -910,21 +918,27 @@ def sleep_for_publish_interval(cook_url):
time.sleep(wait_publish_interval_ms / 1000.0)


def progress_line(cook_url, percent, message):
def progress_line(cook_url, percent, message, write_to_file=False):
"""Simple text replacement on the progress regex string: the numeric capture group becomes percent and the trailing message capture group becomes the message."""
cook_settings = settings(cook_url)
regex_string = get_in(cook_settings, 'executor', 'default-progress-regex-string')

if not regex_string:
regex_string = 'progress:\s+([0-9]*\.?[0-9]+)($|\s+.*)'
if '([0-9]*\.?[0-9]+)' not in regex_string:
raise Exception(f'([0-9]*\.?[0-9]+) not present in {regex_string} regex string')
if '($|\s+.*)' not in regex_string:
raise Exception(f'($|\s+.*) not present in {regex_string} regex string')
return (regex_string
.replace('([0-9]*\.?[0-9]+)', str(percent))
.replace('($|\s+.*)', str(f' {message}'))
.replace('\s+', ' ')
.replace('\\', ''))
progress_string = (regex_string
.replace('([0-9]*\.?[0-9]+)', str(percent))
.replace('($|\s+.*)', str(f' {message}'))
.replace('\s+', ' ')
.replace('\\', ''))
if write_to_file:
progress_env = retrieve_progress_file_env(cook_url)
return f'echo "{progress_string}" >> ${{{progress_env}}}'
else:
return progress_string


def group_submit_kill_retry(cook_url, retry_failed_jobs_only):
@@ -1149,9 +1163,9 @@ def _cook_executor_config():


def is_cook_executor_in_use():
"""Returns true if the cook executor is configured and COOK_TEST_DOCKER_IMAGE is not set"""
"""Returns true if the cook executor is configured"""
is_cook_executor_configured = is_not_blank(get_in(_cook_executor_config(), 'command'))
return is_cook_executor_configured and docker_image() is None
return is_cook_executor_configured


def slave_cpus(mesos_url, hostname):
@@ -13,6 +13,7 @@ COOK_AUTH=one-user
COOK_EXECUTOR=mesos
COOK_POOLS=on
CONFIG_FILE=scheduler_travis_config.edn
COOK_TEST_DOCKER_IMAGE=""

while (( $# > 0 )); do
case "$1" in
@@ -28,6 +29,10 @@ while (( $# > 0 )); do
COOK_POOLS="${1#--pools=}"
shift
;;
--image=*)
COOK_TEST_DOCKER_IMAGE="${1#--image=}"
shift
;;
*)
echo "Unrecognized option: $1"
exit 1
@@ -52,6 +57,7 @@ case "$COOK_EXECUTOR" in
COOK_EXECUTOR_COMMAND="${TRAVIS_BUILD_DIR}/travis/cook-executor-local/cook-executor-local"
# Build cook-executor
${TRAVIS_BUILD_DIR}/travis/build_cook_executor.sh
# Run with docker
;;
mesos)
COOK_EXECUTOR_COMMAND=""
@@ -124,6 +130,9 @@ lein exec -p datomic/data/seed_running_jobs.clj ${COOK_DATOMIC_URI_1}
## on travis, ports on 172.17.0.1 are bindable from the host OS, and are also
## available for processes inside minimesos containers to connect to
export COOK_EXECUTOR_COMMAND=${COOK_EXECUTOR_COMMAND}
if [[ ! -z "${COOK_TEST_DOCKER_IMAGE}" ]]; then
export COOK_TEST_DOCKER_IMAGE=${COOK_TEST_DOCKER_IMAGE}
fi
# Start one cook listening on port 12321, this will be the master of the "cook-framework-1" framework
LIBPROCESS_IP=172.17.0.1 COOK_DATOMIC="${COOK_DATOMIC_URI_1}" COOK_PORT=12321 COOK_SSL_PORT=12322 COOK_FRAMEWORK_ID=cook-framework-1 COOK_LOGFILE="log/cook-12321.log" COOK_DEFAULT_POOL=${DEFAULT_POOL} lein run ${PROJECT_DIR}/travis/${CONFIG_FILE} &
# Start a second cook listening on port 22321, this will be the master of the "cook-framework-2" framework
@@ -0,0 +1,5 @@
#!/bin/bash
#
# Submits one job to a Cook scheduler's /rawscheduler endpoint. The job requests
# the "cook" executor inside a DOCKER container (HOST networking) and sets
# EXECUTOR_TEST_EXIT=1 in the job environment.
#
# Optional environment overrides (the defaults reproduce the original request):
#   COOK_URL                base URL of the scheduler       (http://localhost:12321)
#   COOK_TEST_DOCKER_IMAGE  docker image for the container  (python:3.5)
#   COOK_EXECUTOR_DIR       host directory mounted at the same path inside the
#                           container; presumably the cook executor build
#                           output -- confirm for your checkout
#                           (/Users/paul/src/Cook/executor/dist)

COOK_URL="${COOK_URL:-http://localhost:12321}"
COOK_TEST_DOCKER_IMAGE="${COOK_TEST_DOCKER_IMAGE:-python:3.5}"
COOK_EXECUTOR_DIR="${COOK_EXECUTOR_DIR:-/Users/paul/src/Cook/executor/dist}"

UUID=$(uuidgen)

curl -XPOST -H"Content-Type: application/json" "${COOK_URL}/rawscheduler" -d"{\"jobs\": [{\"uuid\": \"$UUID\", \"env\": {\"EXECUTOR_TEST_EXIT\": \"1\"}, \"executor\": \"cook\", \"mem\": 128, \"cpus\": 1, \"command\": \"echo progress: 50 test_progress && exit 0\", \"max_retries\": 1, \"container\": {\"type\": \"DOCKER\", \"docker\": {\"image\": \"${COOK_TEST_DOCKER_IMAGE}\", \"network\": \"HOST\", \"force-pull-image\": false}, \"volumes\": [{\"container-path\": \"${COOK_EXECUTOR_DIR}\", \"host-path\": \"${COOK_EXECUTOR_DIR}\"}]}}]}"
@@ -84,8 +84,7 @@
;; If the custom-executor attr isn't set, we default to using a custom
;; executor in order to support jobs submitted before we added this field
custom-executor? (use-custom-executor? job-ent)
cook-executor? (and (not container) ;;TODO support cook-executor in containers
(use-cook-executor? job-ent))
cook-executor? (use-cook-executor? job-ent)
group-uuid (util/job-ent->group-uuid job-ent)
environment (cond-> (assoc (util/job-ent->env job-ent)
"COOK_INSTANCE_UUID" task-id
@@ -104,6 +103,7 @@
:value (if cook-executor? (:command (config/executor-config)) (:job/command job-ent))}
;; executor-key configure whether this is a command or custom executor
executor-key (cond
(and container cook-executor?) :container-cook-executor
(and container (not custom-executor?)) :container-command-executor
(and container custom-executor?) :container-executor
custom-executor? :custom-executor
@@ -113,6 +113,7 @@
executor (case executor-key
:command-executor :executor/mesos
:container-command-executor :executor/mesos
:container-cook-executor :executor/cook
:cook-executor :executor/cook
:executor/custom)
data (.getBytes
@@ -378,6 +379,11 @@
(assoc :executor (assoc executor :name cook-executor-name
:source cook-executor-source))

(= executor-key :container-cook-executor)
(assoc :executor (assoc executor :name cook-executor-name
:source cook-executor-source
:container container))

(= executor-key :custom-executor)
(assoc :executor (assoc executor :name custom-executor-name)))))

Oops, something went wrong.

0 comments on commit f73db62

Please sign in to comment.
You can’t perform that action at this time.