Adds per-pool job scheduling (#870)

dposada authored and pschorf committed Aug 28, 2018
1 parent b508400 commit d8e48273c0690a3f106f996affb865663bceafe1
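
With this change the scheduler queue is maintained per pool: the /queue response is keyed by pool name (falling back to 'no-pool' when no default pool is configured) instead of a single 'normal' key, and jobs can be submitted against a specific pool. A minimal sketch of the new behavior, assuming the integration-test helpers in tests/cook/util.py (retrieve_cook_url, init_cook_session, default_pool, submit_job, query_queue, wait_until) behave as they are exercised in the tests below:

# Sketch only: per-pool submission and queue inspection via the test helpers
# used in this commit; the flow mirrors test_queue_endpoint below.
from tests.cook import util

cook_url = util.retrieve_cook_url()
util.init_cook_session(cook_url)
pool = util.default_pool(cook_url) or 'no-pool'  # key used by /queue when no default pool is configured

# Submit a job; the pool argument is optional on submit.
job_uuid, resp = util.submit_job(cook_url, command='sleep 600', pool=util.default_pool(cook_url))
assert resp.status_code == 201, resp.content

# The queue endpoint now groups pending jobs by pool name rather than under 'normal'.
resp = util.wait_until(lambda: util.query_queue(cook_url),
                       lambda r: any(j['job/uuid'] == job_uuid for j in r.json().get(pool, [])))
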
@@ -1,3 +1,3 @@
[tool:pytest]
addopts = -n10 -v --timeout-method=thread
addopts = -n10 -v --timeout-method=thread --maxfail=5
timeout = 1200
@@ -4,7 +4,7 @@
EXECUTOR_UNREGISTERED = 6002
CMD_NON_ZERO_EXIT = 99003


# Named constants for unscheduled job reason strings from cook or fenzo.
UNDER_INVESTIGATION = 'The job is now under investigation. Check back in a minute for more details!'
COULD_NOT_PLACE_JOB = 'The job couldn\'t be placed on any available hosts.'
JOB_WOULD_EXCEED_QUOTA = 'The job would cause you to exceed resource quotas.'
@@ -52,7 +52,7 @@ def test_basic_submit(self):
self.assertEqual(resp.status_code, 201, msg=resp.content)
self.assertEqual(resp.content, str.encode(f"submitted jobs {job_uuid}"))
job = util.wait_for_job(self.cook_url, job_uuid, 'completed')
self.assertIn('success', (i['status'] for i in job['instances']))
self.assertIn('success', [i['status'] for i in job['instances']], json.dumps(job, indent=2))
self.assertEqual(False, job['disable_mea_culpa_retries'])
self.assertTrue(len(util.wait_for_output_url(self.cook_url, job_uuid)['output_url']) > 0)

@@ -1468,14 +1468,18 @@ def test_queue_endpoint(self):
uuids, resp = util.submit_jobs(self.cook_url, job_spec, clones=100, groups=[group])
self.assertEqual(201, resp.status_code, resp.content)
try:
default_pool = util.default_pool(self.cook_url)
pool = default_pool or 'no-pool'
self.logger.info(f'Checking the queue endpoint for pool {pool}')

def query_queue():
return util.query_queue(self.cook_url)

def queue_predicate(resp):
return any([job['job/uuid'] in uuids for job in resp.json()['normal']])
return any([job['job/uuid'] in uuids for job in resp.json()[pool]])

resp = util.wait_until(query_queue, queue_predicate)
job = [job for job in resp.json()['normal'] if job['job/uuid'] in uuids][0]
job = [job for job in resp.json()[pool] if job['job/uuid'] in uuids][0]
job_group = job['group/_job'][0]
self.assertEqual(200, resp.status_code, resp.content)
self.assertTrue('group/_job' in job.keys())
@@ -1620,8 +1624,7 @@ def test_unscheduled_jobs_partial(self):
self.assertEqual(job_uuid_1, jobs[0]['uuid'])

def test_unique_host_constraint(self):
state = util.get_mesos_state(self.mesos_url)
num_hosts = len(state['slaves'])
num_hosts = util.num_hosts_to_consider(self.cook_url, self.mesos_url)
group = {'uuid': str(uuid.uuid4()),
'host-placement': {'type': 'unique'}}
job_spec = {'group': group['uuid'], 'command': 'sleep 600'}
@@ -1674,8 +1677,7 @@ def check_unique_constraint(response):
util.kill_jobs(self.cook_url, uuids)

def test_balanced_host_constraint_cannot_place(self):
state = util.get_mesos_state(self.mesos_url)
num_hosts = len(state['slaves'])
num_hosts = util.num_hosts_to_consider(self.cook_url, self.mesos_url)
if num_hosts > 10:
# Skip this test on large clusters
self.logger.info(f"Skipping test due to cluster size of {num_hosts} greater than 10")
@@ -1722,8 +1724,7 @@ def query_unscheduled():
util.kill_jobs(self.cook_url, uuids)

def test_balanced_host_constraint_can_place(self):
state = util.get_mesos_state(self.mesos_url)
num_hosts = len(state['slaves'])
num_hosts = util.num_hosts_to_consider(self.cook_url, self.mesos_url)
minimum_hosts = min(10, num_hosts)
group = {'uuid': str(uuid.uuid4()),
'host-placement': {'type': 'balanced',
@@ -1774,20 +1775,26 @@ def test_attribute_equals_hostname_constraint(self):
uuids, resp = util.submit_jobs(self.cook_url, jobs, groups=[group])
self.assertEqual(201, resp.status_code, resp.content)
try:
reasons = {
# We expect the reason to be either our attribute-equals constraint:
"Host had a different attribute than other jobs in the group.",
# Or, if there are no other offers, we simply don't have enough cpus:
"Not enough cpus available."
}

def query():
unscheduled_jobs, _ = util.unscheduled_jobs(self.cook_url, *[j['uuid'] for j in jobs])
self.logger.info(f"unscheduled_jobs response: {unscheduled_jobs}")
self.logger.info(f"unscheduled_jobs response: {json.dumps(unscheduled_jobs, indent=2)}")
no_hosts = [reason for job in unscheduled_jobs for reason in job['reasons']
if reason['reason'] == "The job couldn't be placed on any available hosts."]
for no_hosts_reason in no_hosts:
for sub_reason in no_hosts_reason['data']['reasons']:
if sub_reason['reason'] == "Host had a different attribute than other jobs in the group.":
if sub_reason['reason'] in reasons:
return sub_reason
return None

reason = util.wait_until(query, lambda r: r is not None)
self.assertEqual(reason['reason'],
"Host had a different attribute than other jobs in the group.")
self.assertIn(reason['reason'], reasons)
finally:
util.kill_jobs(self.cook_url, uuids)

@@ -2350,22 +2357,30 @@ def test_pool_specific_quota_check_on_submit(self):
pool_name = pool['name']
self.logger.info(f'Testing quota check for pool {pool_name}')
quota = util.get_limit(self.cook_url, 'quota', user, pool_name).json()
cpus_over_quota = quota['cpus'] + 0.1
mem_over_quota = quota['mem'] + 1
self.assertLessEqual(cpus_over_quota, task_constraint_cpus)
self.assertLessEqual(mem_over_quota, task_constraint_mem)

# cpus
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, cpus=0.1)
self.assertEqual(201, resp.status_code, msg=resp.content)
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, cpus=cpus_over_quota)
self.assertEqual(422, resp.status_code, msg=resp.content)
cpus_over_quota = quota['cpus'] + 0.1
if cpus_over_quota > task_constraint_cpus:
self.logger.info(f'Unable to check CPU quota on {pool_name} because the quota ({quota["cpus"]}) is '
f'higher than the task constraint ({task_constraint_cpus})')
else:
self.assertLessEqual(cpus_over_quota, task_constraint_cpus)
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, cpus=0.1)
self.assertEqual(201, resp.status_code, msg=resp.content)
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, cpus=cpus_over_quota)
self.assertEqual(422, resp.status_code, msg=resp.content)

# mem
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, mem=32)
self.assertEqual(201, resp.status_code, msg=resp.content)
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, mem=mem_over_quota)
self.assertEqual(422, resp.status_code, msg=resp.content)
mem_over_quota = quota['mem'] + 1
if mem_over_quota > task_constraint_mem:
self.logger.info(f'Unable to check mem quota on {pool_name} because the quota ({quota["mem"]}) is '
f'higher than the task constraint ({task_constraint_mem})')
else:
self.assertLessEqual(mem_over_quota, task_constraint_mem)
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, mem=32)
self.assertEqual(201, resp.status_code, msg=resp.content)
job_uuid, resp = util.submit_job(self.cook_url, pool=pool_name, mem=mem_over_quota)
self.assertEqual(422, resp.status_code, msg=resp.content)

def test_decrease_retries_below_attempts(self):
uuid, resp = util.submit_job(self.cook_url, command='exit 1', max_retries=2)
@@ -31,12 +31,15 @@ def test_get_queue(self):
try:
slave_queue = util.session.get('%s/queue' % self.slave_url, allow_redirects=False)
self.assertEqual(307, slave_queue.status_code)
default_pool = util.default_pool(self.master_url)
pool = default_pool or 'no-pool'
self.logger.info(f'Checking the queue endpoint for pool {pool}')

@retry(stop_max_delay=30000, wait_fixed=1000) # Need to wait for a rank cycle
def check_queue():
master_queue = util.session.get(slave_queue.headers['Location'])
self.assertEqual(200, master_queue.status_code, master_queue.content)
self.assertTrue(any([job['job/uuid'] in uuids for job in master_queue.json()['normal']]))
self.assertTrue(any([job['job/uuid'] in uuids for job in master_queue.json()[pool]]))

check_queue()
finally:
@@ -0,0 +1,86 @@
import logging
import unittest

import pytest
from retrying import retry

from tests.cook import util, reasons


@pytest.mark.multi_user
@unittest.skipUnless(util.multi_user_tests_enabled(), 'Requires using multi-user configuration '
'(e.g., BasicAuth) for Cook Scheduler')
@pytest.mark.timeout(util.DEFAULT_TEST_TIMEOUT_SECS) # individual test timeout
class PoolsCookTest(util.CookTest):

@classmethod
def setUpClass(cls):
cls.cook_url = util.retrieve_cook_url()
util.init_cook_session(cls.cook_url)

def setUp(self):
self.cook_url = type(self).cook_url
self.mesos_url = util.retrieve_mesos_url()
self.logger = logging.getLogger(__name__)
self.user_factory = util.UserFactory(self)

@unittest.skipUnless(util.are_pools_enabled(), 'Pools are not enabled on the cluster')
def test_pool_scheduling(self):
admin = self.user_factory.admin()
user = self.user_factory.new_user()
pools, _ = util.active_pools(self.cook_url)
all_job_uuids = []
try:
default_pool = util.default_pool(self.cook_url)
self.assertLess(1, len(pools))
self.assertIsNotNone(default_pool)

cpus = 0.1
with admin:
for pool in pools:
# Lower the user's cpu quota on this pool
pool_name = pool['name']
quota_multiplier = 1 if pool_name == default_pool else 2
util.set_limit(self.cook_url, 'quota', user.name, cpus=cpus * quota_multiplier, pool=pool_name)

with user:
for pool in pools:
pool_name = pool['name']

# Submit a job that fills the user's quota on this pool
quota = util.get_limit(self.cook_url, 'quota', user.name, pool_name).json()
quota_cpus = quota['cpus']
filling_job_uuid, _ = util.submit_job(self.cook_url, cpus=quota_cpus,
command='sleep 600', pool=pool_name)
all_job_uuids.append(filling_job_uuid)
instance = util.wait_for_running_instance(self.cook_url, filling_job_uuid)
slave_pool = util.slave_pool(self.mesos_url, instance['hostname'])
self.assertEqual(pool_name, slave_pool)

# Submit a job that should not get scheduled
job_uuid, _ = util.submit_job(self.cook_url, cpus=cpus, command='ls', pool=pool_name)
all_job_uuids.append(job_uuid)
job = util.load_job(self.cook_url, job_uuid)
self.assertEqual('waiting', job['status'])

# Assert that the unscheduled reason and data are correct
@retry(stop_max_delay=60000, wait_fixed=5000)
def check_unscheduled_reason():
jobs, _ = util.unscheduled_jobs(self.cook_url, job_uuid)
self.logger.info(f'Unscheduled jobs: {jobs}')
self.assertEqual(job_uuid, jobs[0]['uuid'])
job_reasons = jobs[0]['reasons']
# Check the spot-in-queue reason
reason = next(r for r in job_reasons if r['reason'] == 'You have 1 other jobs ahead in the '
'queue.')
self.assertEqual({'jobs': [filling_job_uuid]}, reason['data'])
# Check the exceeding-quota reason
reason = next(r for r in job_reasons if r['reason'] == reasons.JOB_WOULD_EXCEED_QUOTA)
self.assertEqual({'cpus': {'limit': quota_cpus, 'usage': quota_cpus + cpus}}, reason['data'])

check_unscheduled_reason()
finally:
with admin:
util.kill_jobs(self.cook_url, all_job_uuids, assert_response=False)
for pool in pools:
util.reset_limit(self.cook_url, 'quota', user.name, reason=self.current_name(), pool=pool['name'])
@@ -1135,10 +1135,19 @@ def is_cook_executor_in_use():
def slave_cpus(mesos_url, hostname):
"""Returns the cpus of the specified Mesos agent"""
slaves = get_mesos_state(mesos_url)['slaves']
slave_cpus = next(s['resources']['cpus'] for s in slaves if s['hostname'] == hostname)
# Here we need to use unreserved_resources because Mesos might only
# send offers for the unreserved (role = "*") portions of the agents.
slave_cpus = next(s['unreserved_resources']['cpus'] for s in slaves if s['hostname'] == hostname)
return slave_cpus


def slave_pool(mesos_url, hostname):
"""Returns the pool of the specified Mesos agent, or None if the agent doesn't have the attribute"""
slaves = get_mesos_state(mesos_url)['slaves']
pool = next(s.get('attributes', {}).get('cook-pool', None) for s in slaves if s['hostname'] == hostname)
return pool
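
For illustration, a pruned and hypothetical agent entry of the kind these helpers read from the Mesos /state endpoint; the field names match the ones used above (resources, unreserved_resources, attributes/cook-pool), but the values are invented:

# Hypothetical agent entry from Mesos /state (values invented), showing why
# slave_cpus reads unreserved_resources (Mesos might only send offers for the
# unreserved, role "*" portion) and where slave_pool finds the cook-pool attribute.
example_slave = {
    'hostname': 'agent-1.example.com',
    'resources': {'cpus': 8.0, 'mem': 16384.0},            # total resources on the agent
    'unreserved_resources': {'cpus': 4.0, 'mem': 8192.0},  # unreserved (role "*") portion
    'attributes': {'cook-pool': 'default-pool'},           # pool attribute, may be absent
}
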


def max_slave_cpus(mesos_url):
"""Returns the max cpus of all current Mesos agents"""
slaves = get_mesos_state(mesos_url)['slaves']
@@ -1185,3 +1194,26 @@ def is_preemption_enabled():
def current_milli_time():
"""Returns the current epoch time in milliseconds"""
return int(round(time.time() * 1000))


@functools.lru_cache()
def are_pools_enabled():
"""Returns true if there are at least 2 active pools on the cluster"""
cook_url = retrieve_cook_url()
init_cook_session(cook_url)
_wait_for_cook(cook_url)
return len(active_pools(cook_url)[0]) > 1


def num_hosts_to_consider(cook_url, mesos_url):
"""
Returns the number of hosts in the default pool, or the
total number of hosts if the cluster is not using pools
"""
state = get_mesos_state(mesos_url)
slaves = state['slaves']
pool = default_pool(cook_url)
slaves = [s for s in slaves if s['attributes']['cook-pool'] == pool] if pool else slaves
num_hosts = len(slaves)
logging.info(f'There are {num_hosts} hosts in the default pool')
return num_hosts
@@ -80,6 +80,7 @@ cd ${TRAVIS_BUILD_DIR}/travis
$(./minimesos info | grep MINIMESOS)
export COOK_ZOOKEEPER="${MINIMESOS_ZOOKEEPER_IP}:2181"
export MINIMESOS_ZOOKEEPER=${MINIMESOS_ZOOKEEPER%;}
export MINIMESOS_MASTER=${MINIMESOS_MASTER%;}

SCHEDULER_DIR=${TRAVIS_BUILD_DIR}/scheduler
./datomic-free-0.9.5394/bin/transactor ${SCHEDULER_DIR}/datomic/datomic_transactor.properties &
@@ -149,6 +150,8 @@ cd ${PROJECT_DIR}
export COOK_MULTI_CLUSTER=
export COOK_MASTER_SLAVE=
export COOK_SLAVE_URL=http://localhost:12323
export COOK_MESOS_LEADER_URL=${MINIMESOS_MASTER}
echo "Using Mesos leader URL: ${COOK_MESOS_LEADER_URL}"
pytest -n4 -v --color=no --timeout-method=thread --boxed -m "not serial" || test_failures=true
pytest -n0 -v --color=no --timeout-method=thread --boxed -m "serial" || test_failures=true

@@ -50,6 +50,7 @@
:levels {"datomic.db" :warn
"datomic.peer" :warn
"datomic.kv-cluster" :warn
"cook.mesos.fenzo-utils" :debug
"cook.mesos.rebalancer" :debug
"cook.mesos.scheduler" :debug
:default :info}}
@@ -25,6 +25,9 @@
:levels {"datomic.db" :warn
"datomic.kv-cluster" :warn
"datomic.peer" :warn
"cook.mesos.fenzo-utils" :debug
"cook.mesos.rebalancer" :debug
"cook.mesos.scheduler" :debug
:default :info}}
:metrics {:jmx true
:user-metrics-interval-seconds 60}
@@ -12,7 +12,8 @@
@(d/transact conn [{:db/id (d/tempid :db.part/user)
:pool/name name
:pool/purpose "This is a pool for testing purposes"
:pool/state state}]))
:pool/state state
:pool/dru-mode :pool.dru-mode/default}]))

(defn pools
[db]