Permalink
Browse files

Outgoing/global launch limit (#1038)

  • Loading branch information...
scrosby authored and pschorf committed Dec 11, 2018
1 parent d392c26 commit 44fc9b3cd4c2fa7be2d179267dde045f366e94a0
@@ -60,7 +60,7 @@ matrix:
services: docker
install: sudo ./travis/install_mesos.sh
before_script: cd integration && ./travis/prepare_integration.sh
script: ./travis/run_integration.sh --pools=off --auth=http-basic --job-launch-rate-limit=on
script: ./travis/run_integration_ratelimit.sh

- name: 'Cook Scheduler Simulator tests'
services: docker
@@ -263,7 +263,7 @@ def test_rate_limit_launching_jobs(self):
pytest.skip("Can't test job launch rate limit without launch rate limit set.")

# Allow an environmental variable override.
name = os.getenv('COOK_LAUNCH_RATE_LIMIT_NAME')
name = os.getenv('COOK_LAUNCH_RATE_LIMIT_USER_NAME')
if name is not None:
user = self.user_factory.user_class(name)
else:
@@ -323,6 +323,63 @@ def is_rate_limit_triggered(_):
finally:
util.kill_jobs(self.cook_url, job_uuids)

# Note that subsequent runs of this test under the same user can fail if sufficient time has not
# passed; the subsequent run will have used up the rate limit quota and it will need time to recharge.
def test_global_rate_limit_launching_jobs(self):
settings = util.settings(self.cook_url)
if settings['rate-limit']['global-job-launch'] is None:
pytest.skip("Can't test job launch rate limit without launch rate limit set.")

# Allow an environmental variable override.
name = os.getenv('COOK_LAUNCH_RATE_LIMIT_USER_NAME')
if name is not None:
user = self.user_factory.user_class(name)
else:
user = self.user_factory.new_user()

if not settings['rate-limit']['global-job-launch']['enforce?']:
pytest.skip("Enforcing must be on for test to run")
bucket_size = settings['rate-limit']['global-job-launch']['bucket-size']
token_rate = settings['rate-limit']['global-job-launch']['tokens-replenished-per-minute']
# In some environments, e.g., minimesos, we can only launch so many concurrent jobs.
if token_rate < 5 or token_rate > 20:
pytest.skip(
"Global job launch rate limit test is only validated to reliably work correctly with certain token rates.")
if bucket_size < 10 or bucket_size > 20:
pytest.skip(
"Global job launch rate limit test is only validated to reliably work correctly with certain token bucket sizes.")
with user:
job_uuids = []
try:
jobspec = {"command": "sleep 240", 'cpus': 0.03, 'mem': 32}

self.logger.info(f'Submitting initial batch of {bucket_size-1} jobs')
initial_uuids, initial_response = util.submit_jobs(self.cook_url, jobspec, bucket_size - 1)
job_uuids.extend(initial_uuids)
self.assertEqual(201, initial_response.status_code, msg=initial_response.content)

def submit_jobs():
self.logger.info(f'Submitting subsequent batch of {bucket_size-1} jobs')
subsequent_uuids, subsequent_response = util.submit_jobs(self.cook_url, jobspec, bucket_size - 1)
job_uuids.extend(subsequent_uuids)
self.assertEqual(201, subsequent_response.status_code, msg=subsequent_response.content)

def is_rate_limit_triggered(_):
jobs1 = util.query_jobs(self.cook_url, True, uuid=job_uuids).json()
running_jobs = [j for j in jobs1 if j['status'] == 'running']
waiting_jobs = [j for j in jobs1 if j['status'] == 'waiting']
self.logger.debug(f'There are {len(waiting_jobs)} waiting jobs')
return len(waiting_jobs) > 0 and len(running_jobs) >= bucket_size

util.wait_until(submit_jobs, is_rate_limit_triggered,120000,5000)
jobs2 = util.query_jobs(self.cook_url, True, uuid=job_uuids).json()
running_jobs = [j for j in jobs2 if j['status'] == 'running']
self.assertGreaterEqual(len(running_jobs), bucket_size)
self.assertLessEqual(len(running_jobs), bucket_size+4)
finally:
util.kill_jobs(self.cook_url, job_uuids)


def trigger_preemption(self, pool):
"""
Triggers preemption on the provided pool (which can be None) by doing the following:
@@ -13,7 +13,6 @@ COOK_AUTH=one-user
COOK_EXECUTOR=mesos
COOK_POOLS=on
CONFIG_FILE=scheduler_travis_config.edn
JOB_LAUNCH_RATE_LIMIT=off

while (( $# > 0 )); do
case "$1" in
@@ -29,10 +28,6 @@ while (( $# > 0 )); do
COOK_POOLS="${1#--pools=}"
shift
;;
--job-launch-rate-limit=*)
JOB_LAUNCH_RATE_LIMIT="${1#--job-launch-rate-limit=}"
shift
;;
*)
echo "Unrecognized option: $1"
exit 1
@@ -112,24 +107,6 @@ case "$COOK_POOLS" in
exit 1
esac

case "$JOB_LAUNCH_RATE_LIMIT" in
on)
# Note: Carefully chosen for test_rate_limit_launching_jobs unit test.
export JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE=10
export JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE=5
echo "Job launch rate limit turned on"
;;
off)
# Note: Wide enough that we're unlikely to hit these in testing.
export JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE=10000
export JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE=10000
echo "Job launch rate limit turned off"
;;
*)
echo "Unrecognized job-launch-rate-limit toggle (should be on/off): $JOB_LAUNCH_RATE_LIMIT"
exit 1
esac

pip install flask
export DATA_LOCAL_PORT=35847
export DATA_LOCAL_SERVICE="http://localhost:${DATA_LOCAL_PORT}"
@@ -187,12 +164,8 @@ export COOK_SLAVE_URL=http://localhost:12323
export COOK_MESOS_LEADER_URL=${MINIMESOS_MASTER}
{
echo "Using Mesos leader URL: ${COOK_MESOS_LEADER_URL}"
if [ "$JOB_LAUNCH_RATE_LIMIT" = off ]; then
pytest -n4 -v --color=no --timeout-method=thread --boxed -m "not serial" || test_failures=true
pytest -n0 -v --color=no --timeout-method=thread --boxed -m "serial" || test_failures=true
else
pytest -n0 -v --color=no --timeout-method=thread --boxed -m multi_user tests/cook/test_multi_user.py -k test_rate_limit_launching_jobs || test_failures=true
fi
pytest -n4 -v --color=no --timeout-method=thread --boxed -m "not serial" || test_failures=true
pytest -n0 -v --color=no --timeout-method=thread --boxed -m "serial" || test_failures=true
} &> >(tee ./log/pytest.log)


@@ -0,0 +1,100 @@
#!/bin/bash

# Usage: ./run_integration [OPTIONS...]

set -ev

export PROJECT_DIR=`pwd`

CONFIG_FILE=scheduler_travis_config.edn

function wait_for_cook {
COOK_PORT=${1:-12321}
while ! curl -s localhost:${COOK_PORT} >/dev/null;
do
echo "$(date +%H:%M:%S) Cook is not listening on ${COOK_PORT} yet"
sleep 2.0
done
echo "$(date +%H:%M:%S) Connected to Cook on ${COOK_PORT}!"
curl -s localhost:${COOK_PORT}/info
echo
}
export -f wait_for_cook

# Start minimesos
cd ${TRAVIS_BUILD_DIR}/travis
./minimesos up
$(./minimesos info | grep MINIMESOS)
export COOK_ZOOKEEPER="${MINIMESOS_ZOOKEEPER_IP}:2181"
export MINIMESOS_ZOOKEEPER=${MINIMESOS_ZOOKEEPER%;}
export MINIMESOS_MASTER=${MINIMESOS_MASTER%;}

SCHEDULER_DIR=${TRAVIS_BUILD_DIR}/scheduler
COOK_DATOMIC_URI_1=datomic:mem://cook-jobs

# Generate SSL certificate
COOK_KEYSTORE_PATH=${SCHEDULER_DIR}/cook.p12
keytool -genkeypair -keystore ${COOK_KEYSTORE_PATH} -storetype PKCS12 -storepass cookstore -dname "CN=cook, OU=Cook Developers, O=Two Sigma Investments, L=New York, ST=New York, C=US" -keyalg RSA -keysize 2048
export COOK_KEYSTORE_PATH=${COOK_KEYSTORE_PATH}

mkdir ${SCHEDULER_DIR}/log

cd ${SCHEDULER_DIR}

# Start two cook schedulers.
export COOK_HTTP_BASIC_AUTH=true
export COOK_EXECUTOR_COMMAND=""
## We launch two instances, with different configurations for the different unit tests.
## on travis, ports on 172.17.0.1 are bindable from the host OS, and are also
## available for processes inside minimesos containers to connect to
# Start one cook listening on port 12321, this will be the master of the "cook-framework-1" framework
export GLOBAL_JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE=10000
export GLOBAL_JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE=10000
export JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE=10
export JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE=5
LIBPROCESS_IP=172.17.0.1 COOK_DATOMIC="${COOK_DATOMIC_URI_1}" COOK_PORT=12321 COOK_SSL_PORT=12322 COOK_COOKEEPER_LOCAL=true COOK_COOKEEPER_LOCAL_PORT=5291 COOK_FRAMEWORK_ID=cook-framework-1 COOK_LOGFILE="log/cook-12321.log" COOK_DEFAULT_POOL=${DEFAULT_POOL} lein run ${PROJECT_DIR}/travis/${CONFIG_FILE} &
# Start a second cook listening on port 22321, this will be the master of the "cook-framework-2" framework
export JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE=10000
export JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE=10000
export GLOBAL_JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE=10
export GLOBAL_JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE=5
LIBPROCESS_IP=172.17.0.1 COOK_DATOMIC="${COOK_DATOMIC_URI_1}" COOK_PORT=22321 COOK_SSL_PORT=22322 COOK_ZOOKEEPER_LOCAL=true COOK_ZOOKEEPER_LOCAL_PORT=4291 COOK_FRAMEWORK_ID=cook-framework-2 COOK_LOGFILE="log/cook-22321.log" lein run ${PROJECT_DIR}/travis/${CONFIG_FILE} &

# Wait for the cooks to be listening
timeout 180s bash -c "wait_for_cook 12321" || curl_error=true
if [ "$curl_error" = true ]; then
echo "$(date +%H:%M:%S) Timed out waiting for cook to start listening"
${TRAVIS_BUILD_DIR}/travis/upload_logs.sh
exit 1
fi

timeout 180s bash -c "wait_for_cook 22321" || curl_error=true
if [ "$curl_error" = true ]; then
echo "$(date +%H:%M:%S) Timed out waiting for cook to start listening"
${TRAVIS_BUILD_DIR}/travis/upload_logs.sh
exit 1
fi

# Ensure the Cook Scheduler CLI is available
command -v cs

# Run the integration tests
cd ${PROJECT_DIR}
export COOK_MESOS_LEADER_URL=${MINIMESOS_MASTER}
{
echo "Using Mesos leader URL: ${COOK_MESOS_LEADER_URL}"
export COOK_SCHEDULER_URL=http://localhost:12321
pytest -n0 -v --color=no --timeout-method=thread --boxed -m multi_user tests/cook/test_multi_user.py -k test_rate_limit_launching_jobs || test_failures=true


export COOK_SCHEDULER_URL=http://localhost:22321
pytest -n0 -v --color=no --timeout-method=thread --boxed -m multi_user tests/cook/test_multi_user.py -k test_global_rate_limit_launching_jobs || test_failures=true
} &> >(tee ./log/pytest.log)


# If there were failures, then we should save the logs
if [ "$test_failures" = true ]; then
echo "Uploading logs..."
${TRAVIS_BUILD_DIR}/travis/upload_logs.sh
exit 1
fi
@@ -34,6 +34,9 @@
:rate-limit {:expire-minutes 120 ; Expire unused rate limit entries after 2 hours.
; Keep these job-launch and job-submission values as they are for integration tests. Making them smaller can cause
; spurious failures, and making them larger will cause the rate-limit integration test to skip itself.
:global-job-launch {:bucket-size #config/env-int-default ["GLOBAL_JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE" 10000]
:enforce? true
:tokens-replenished-per-minute #config/env-int-default ["GLOBAL_JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE" 10000]}
:job-launch {:bucket-size #config/env-int-default ["JOB_LAUNCH_RATE_LIMIT_BUCKET_SIZE" 10000]
:enforce? true
:tokens-replenished-per-minute #config/env-int-default ["JOB_LAUNCH_RATE_LIMIT_REPLENISHED_PER_MINUTE" 10000]}
@@ -48,9 +48,12 @@
:keystore-path #config/env "COOK_KEYSTORE_PATH"
:keystore-type "pkcs12"
:keystore-pass "cookstore"}
:rate-limit {:expire-minutes 120 ; Expire unused rate limit entries after 2 hours.
:rate-limit {:expire-minutes 1200 ; Expire unused rate limit entries after 20 hours.
; Keep these job-launch and job-submission values as they are for integration tests. Making them smaller can cause
; spurious failures, and making them larger will cause test_rate_limit_launching_jobs to skip itself.
:global-job-launch {:bucket-size 10000
:enforce? true
:tokens-replenished-per-minute 5000}
:job-launch {:bucket-size 10000
:enforce? true
:tokens-replenished-per-minute 5000}
@@ -204,10 +204,11 @@
((util/lazy-load-var 'cook.impersonation/create-impersonation-middleware) impersonators)
{:json-value "config-impersonation"})))
:rate-limit (fnk [[:config {rate-limit nil}]]
(let [{:keys [expire-minutes user-limit-per-m job-submission job-launch]
(let [{:keys [expire-minutes user-limit-per-m global-job-launch job-submission job-launch]
:or {expire-minutes 120
user-limit-per-m 600}} rate-limit]
{:expire-minutes expire-minutes
:global-job-launch global-job-launch
:job-submission job-submission
:job-launch job-launch
:user-limit (->UserRateLimit :user-limit user-limit-per-m (t/minutes 1))}))
@@ -23,6 +23,7 @@
[cook.mesos.data-locality :as dl]
[cook.mesos.group :as group]
[cook.mesos.util :as util]
[cook.rate-limit :as ratelimit]
[swiss.arrows :refer :all])
(:import com.netflix.fenzo.VirtualMachineLease
java.util.Date))
@@ -234,6 +235,21 @@
(when (< 0 max-expected-runtime)
(->estimated-completion-constraint expected-end-time host-lifetime-mins))))))

(defn build-launch-max-tasks-constraint
"This returns a Fenzo hard constraint that ensures that we don't match more than a given number of tasks per cycle."
[]
(let [enforcing? (ratelimit/enforce? ratelimit/global-job-launch-rate-limiter)
max-tasks (ratelimit/get-token-count! ratelimit/global-job-launch-rate-limiter ratelimit/global-job-launch-rate-limiter-key)]
(if enforcing?
(reify com.netflix.fenzo.ConstraintEvaluator
(getName [_] "launch_max_tasks")
(evaluate [_ _ _ task-tracker-state]
(let [num-assigned (-> task-tracker-state .getAllCurrentlyAssignedTasks .size)]
(com.netflix.fenzo.ConstraintEvaluator$Result.
(< num-assigned max-tasks)
(str "Hit the global rate limit")))))
nil)))

(def job-constraint-constructors [build-novel-host-constraint build-gpu-host-constraint build-user-defined-constraint build-estimated-completion-constraint build-data-locality-constraint])

(defn fenzoize-job-constraint
@@ -260,10 +276,12 @@
(defn make-fenzo-job-constraints
"Returns a sequence of all the constraints for 'job', in Fenzo-compatible format."
[job]
(->> job-constraint-constructors
(map (fn [constructor] (constructor job)))
(remove nil?)
(map fenzoize-job-constraint)))
(let [launch-max-tasks-constraint (build-launch-max-tasks-constraint)]
(cond-> (->> job-constraint-constructors
(map (fn [constructor] (constructor job)))
(remove nil?)
(map fenzoize-job-constraint))
launch-max-tasks-constraint (conj (build-launch-max-tasks-constraint)))))

(defn build-rebalancer-reservation-constraint
"Constructs a rebalancer-reservation-constraint"
@@ -707,6 +707,7 @@
matches)
(throw e))))
(log/info "Launching" (count task-txns) "tasks")
(ratelimit/spend! ratelimit/global-job-launch-rate-limiter ratelimit/global-job-launch-rate-limiter-key (count task-txns))
(log/debug "Matched tasks" task-txns)
;; This launch-tasks MUST happen after the above transaction in
;; order to allow a transaction failure (due to failed preconditions)
@@ -55,3 +55,20 @@

(mount/defstate job-launch-rate-limiter
:start (create-job-launch-rate-limiter config))

(defn create-global-job-launch-rate-limiter
"From the configuration map, extract the keys that setup the job-launch rate limiter and return
the constructed object. If the configuration map is not found, the AllowAllRateLimiter is returned."
[config]
(let [{:keys [settings]} config
{:keys [rate-limit]} settings
{:keys [expire-minutes global-job-launch]} rate-limit]
(if (seq global-job-launch)
(let [{:keys [bucket-size enforce? tokens-replenished-per-minute]} global-job-launch]
(rtg/make-token-bucket-filter bucket-size tokens-replenished-per-minute expire-minutes enforce?))
AllowAllRateLimiter)))

(mount/defstate global-job-launch-rate-limiter
:start (create-global-job-launch-rate-limiter config))

(def global-job-launch-rate-limiter-key "*DEF*")
@@ -54,6 +54,7 @@


(deftest test-gpu-constraint
(cook.test.testutil/setup)
(let [framework-id #mesomatic.types.FrameworkID{:value "my-framework-id"}
gpu-offer #mesomatic.types.Offer{:id #mesomatic.types.OfferID {:value "my-offer-id"}
:framework-id framework-id
@@ -92,6 +92,7 @@


(deftest test-record-placement-failures
(cook.test.testutil/setup)
(let [uri "datomic:mem://test-record-placement-failures"
conn (restore-fresh-database! uri)
job-id (create-dummy-job conn :under-investigation true)
Oops, something went wrong.

0 comments on commit 44fc9b3

Please sign in to comment.