Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Cook hooks implementation and demo plugin. (#1064)
Browse files Browse the repository at this point in the history
  • Loading branch information
scrosby authored and dposada committed Feb 4, 2019
1 parent 8d720af commit d09c449
Show file tree
Hide file tree
Showing 23 changed files with 943 additions and 31 deletions.
1 change: 1 addition & 0 deletions integration/tests/cook/reasons.py
Expand Up @@ -11,3 +11,4 @@
JOB_WOULD_EXCEED_QUOTA = 'The job would cause you to exceed resource quotas.'
JOB_IS_RUNNING_NOW = 'The job is running now.'
JOB_LAUNCH_RATE_LIMIT = 'You are currently rate limited on how many jobs you launch per minute.'
PLUGIN_IS_BLOCKING = 'The launch filter plugin is blocking the job launch.'
48 changes: 48 additions & 0 deletions integration/tests/cook/test_basic.py
Expand Up @@ -2558,6 +2558,54 @@ def test_pools_in_default_limit_response(self):
resp = util.get_limit(self.cook_url, limit, user)
self.assertFalse('pools' in resp.json())

def test_submit_plugin(self):
if not util.demo_plugin_is_configured(self.cook_url):
self.skipTest("Requires demo plugin to be configured")
job_uuids = []
try:
# Should succeed, demo-plugin accepts jobs by default.
job_uuid1, resp = util.submit_job(self.cook_url)
job_uuids.append(job_uuid1)
self.assertEqual(resp.status_code, 201, msg=resp.content)
self.assertEqual(resp.content, str.encode(f"submitted jobs {job_uuid1}"))
job = util.wait_for_job_in_statuses(self.cook_url, job_uuid1, ['completed', 'running'])

# This should now fail to submit (demo plugin looks at job name)
job_uuid2, resp = util.submit_job(self.cook_url, name='plugin_test.submit_fail')
job_uuids.append(job_uuid2)
self.assertEqual(resp.status_code, 400, msg=resp.content)
self.assertTrue(b"Message1- Fail to submit" in resp.content, msg=resp.content)
finally:
util.kill_jobs(self.cook_url, [job_uuids], assert_response=False)

def test_launch_plugin(self):
if not util.demo_plugin_is_configured(self.cook_url):
self.skipTest("Requires demo plugin to be configured")
job_uuid = None
try:
# Tell demo plugin server to defer launching jobs (special logic matches based on job name)
job_uuid, resp = util.submit_job(self.cook_url, name='plugin_test.launch_defer')
self.assertEqual(resp.status_code, 201, msg=resp.content)
self.assertEqual(resp.content, str.encode(f"submitted jobs {job_uuid}"))

# Validate job is still waiting and unscheduled.
def query_unscheduled():
resp = util.unscheduled_jobs(self.cook_url, job_uuid)[0][0]
self.logger.info(f"unscheduled_jobs response: {resp}")
return any([r['reason'] == reasons.PLUGIN_IS_BLOCKING
for r in resp['reasons']])

util.wait_until(query_unscheduled, lambda r: r, 30000)
job = util.load_job(self.cook_url, job_uuid)
details = f"Job details: {json.dumps(job, sort_keys=True)}"
self.assertEquals(job['status'], 'waiting', details)

# Wait a bit and the demo plugin will mark it as launchable.
# So, see if it is now running or completed.
job = util.wait_for_job_in_statuses(self.cook_url, job_uuid, ['completed', 'running'])
finally:
util.kill_jobs(self.cook_url, [job_uuid], assert_response=False)


@unittest.skipIf(os.getenv('COOK_TEST_SKIP_RECONCILE') is not None,
'Requires not setting the COOK_TEST_SKIP_RECONCILE environment variable')
Expand Down
25 changes: 20 additions & 5 deletions integration/tests/cook/util.py
Expand Up @@ -664,23 +664,29 @@ def group_detail_query(cook_url, group_uuid, assert_response=True):

def wait_for_job(cook_url, job_id, status, max_wait_ms=DEFAULT_TIMEOUT_MS):
"""Wait for the given job's status to change to the specified value."""
return wait_for_jobs(cook_url, [job_id], status, max_wait_ms)[0]

return wait_for_jobs_in_statuses(cook_url, [job_id], [status], max_wait_ms)[0]

def wait_for_jobs(cook_url, job_ids, status, max_wait_ms=DEFAULT_TIMEOUT_MS):
return wait_for_jobs_in_statuses(cook_url, job_ids, [status], max_wait_ms)

def wait_for_job_in_statuses(cook_url, job_id, statuses, max_wait_ms=DEFAULT_TIMEOUT_MS):
"""Wait for the given job's status to change to one of the specified statuses."""
return wait_for_jobs_in_statuses(cook_url, [job_id], statuses, max_wait_ms)[0]

def wait_for_jobs_in_statuses(cook_url, job_ids, statuses, max_wait_ms=DEFAULT_TIMEOUT_MS):
"""Wait for the given job's status to change to one of the specified statuses."""
def query():
return query_jobs(cook_url, True, uuid=job_ids)

def predicate(resp):
jobs = resp.json()
for job in jobs:
logger.info(f"Job {job['uuid']} has status {job['status']}, expecting {status}.")
return all([job['status'] == status for job in jobs])
logger.info(f"Job {job['uuid']} has status {job['status']}, expecting {statuses}.")
return all([job['status'] in statuses for job in jobs])

response = wait_until(query, predicate, max_wait_ms=max_wait_ms, wait_interval_ms=DEFAULT_WAIT_INTERVAL_MS * 2)
return response.json()


def wait_for_exit_code(cook_url, job_id, max_wait_ms=DEFAULT_TIMEOUT_MS):
"""
Wait for the given job's exit_code field to appear.
Expand Down Expand Up @@ -1261,6 +1267,15 @@ def should_expect_sandbox_directory_for_job(job):
def data_local_service_is_set():
return os.getenv('DATA_LOCAL_SERVICE', None) is not None

def demo_plugin_is_configured(cook_url):
settings_dict = settings(cook_url)
# Because we always create plugin configuration in config.clj, the first keys always exist.
# The actual factory-fn keys are not set unless the user specifies them.
if settings_dict['plugins']['job-submission-validator'].get('factory-fn') != "cook.demo-plugin/submission-factory":
return False
if settings_dict['plugins']['job-launch-filter'].get('factory-fn') != "cook.demo-plugin/launch-factory":
return False
return True

@functools.lru_cache()
def _fenzo_fitness_calculator():
Expand Down
6 changes: 6 additions & 0 deletions integration/travis/scheduler_travis_config.edn
Expand Up @@ -44,6 +44,12 @@
:enforce? true
:tokens-replenished-per-minute 600}
:user-limit-per-m 1000000}
:plugins {:job-submission-validator {:batch-timeout-seconds 40
:factory-fn "cook.demo-plugin/submission-factory"}
:job-launch-filter {:age-out-last-seen-deadline-minutes 10
:age-out-first-seen-deadline-minutes 600
:age-out-seen-count 10
:factory-fn "cook.demo-plugin/launch-factory"}}
:rebalancer {:dru-scale 1
:interval-seconds 30
:max-preemption 500.0
Expand Down
7 changes: 7 additions & 0 deletions scheduler/bin/run-docker.sh
Expand Up @@ -47,11 +47,13 @@ esac
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
NAME=cook-scheduler-${COOK_PORT}

echo "About to: Clean up existing image"
if [ "$(docker ps -aq -f name=${NAME})" ]; then
# Cleanup
docker stop ${NAME}
fi

echo "About to: Check minimesos information"
$(${DIR}/../../travis/minimesos info | grep MINIMESOS)
EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 0 ]
Expand Down Expand Up @@ -81,6 +83,7 @@ case "$COOK_EXECUTOR" in
exit 1
esac

echo "About to: Setup and check docker networking"
if [ -z "$(docker network ls -q -f name=cook_nw)" ];
then
# Using a separate network allows us to access hosts by name (cook-scheduler-12321)
Expand All @@ -101,6 +104,7 @@ else
COOK_ZOOKEEPER_LOCAL=true
fi

echo "About to: Setup data local service"
DATA_LOCAL_IP=$(docker inspect data-local | jq -r '.[].NetworkSettings.IPAddress // empty')
if [[ -z "${DATA_LOCAL_IP}" ]];
then
Expand Down Expand Up @@ -158,8 +162,11 @@ docker create \
-v ${DIR}/../log:/opt/cook/log \
cook-scheduler:latest ${COOK_CONFIG:-}

echo "About to: Connect cook networking"
docker network connect bridge ${NAME}
docker network connect cook_nw ${NAME}

echo "About to: 'docker start ${NAME}'"
docker start ${NAME}

echo "Attaching to container..."
Expand Down
6 changes: 6 additions & 0 deletions scheduler/config.edn
Expand Up @@ -24,6 +24,12 @@
:executable false
:extract true
:value #config/env "COOK_EXECUTOR"}}
:plugins {:job-submission-validator {:batch-timeout-seconds 40
:factory-fn "cook.demo-plugin/submission-factory"}
:job-launch-filter {:age-out-last-seen-deadline-minutes 10
:age-out-first-seen-deadline-minutes 10000
:age-out-seen-count 10
:factory-fn "cook.demo-plugin/launch-factory"}}
:hostname #config/env "COOK_HOSTNAME"
:log {:file #config/env "COOK_LOG_FILE"
:levels {"datomic.db" :warn
Expand Down
11 changes: 11 additions & 0 deletions scheduler/docs/configuration.adoc
Expand Up @@ -77,6 +77,17 @@ If need it to bind to another port, you can specify that with the `:local-port`
`:admins`::
The value of this key is a set of usernames who should be considered administrators when using the `configfile-admins-auth` authorization-fn.

`:plugins`:: Cook has two extension points that let plugins reject jobs at job submission time as well as accept or defer the launching of jobs via `:job-launch-filter` and `:job-submission-validator`.

* `:job-launch-filter` configures the entrypoint for filtering job launches. It has several keys:
** `:factory-fn` contains a string with a namespace-qualified path of a function for creating either a `JobLaunchFilter`. The factory function can fetch the current configuration out of the config defstate in `cook.config/config`
** `:age-out-first-seen-deadline-minutes` controls how we get rid of jobs whose launch is perpetually deferred by a plugin. When a job 'ages out', we force it to launch now, to keep the queue from being cluttered with always deferred jobs. The clock for aging out starts when a job is near enough to the front of the queue to be eligible to run, not when it is added to the queue. A job is eligible for aging out when it was first seen in the scheudler queue at least this long ago. If you want jobs to sit longer in the queue than the default 10 hours before being aged out, increase this number.
** `:age-out-last-seen-deadline-minutes` A job is eligible for aging out when it has been seen in the launch queue at least this recently.
** `:age-out-seen-count` We must have attempted to schedule the job at least this many times before we age it out.
* `:job-submission-validator` configures the entrypoint for filtering job submissions. It has several keys:
** `:factory-fn` contains a string with a namespace-qualified path of a function for creating a `JobSubmissionValidator`. The factory function can fetch the current configuration out of the config `defstate` in `cook.config/config`
** `:batch-timeout-seconds`: This includes a critical timeout value. We check job launches synchronously, so the plugin has to respond fast. In particular, we must complete all of the launch checks before the HTTP timeout. To do this we implement another timeout, defaulting to 40 seconds. Once we cross this soft timeout, a default accept/reject policy is implemented for submitted jobs, one that cannot look at jobs individually.

`:rate-limit`::
Configure rate limits for the scheduler. `rate-limit` is a map with three possible keys: `:user-limit-per-m` , `:job-submission`, `:expire-minutes`. These keys control rate limits.

Expand Down
55 changes: 51 additions & 4 deletions scheduler/src/cook/cache.clj
@@ -1,8 +1,41 @@
;;
;; Copyright (c) Two Sigma Open Source, LLC
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
;;
(ns cook.cache
(:require [clj-time.core :as t])
(:import (com.google.common.cache Cache)))


(defn get-if-present
"Generic cache. Caches under a key (extracted from the item with extract-key-fn. Uses miss-fn to fill
any misses. Caches only positive hits where both functions return non-nil"
[^Cache cache extract-key-fn item]
(if-let [key (extract-key-fn item)]
(.getIfPresent cache key)))

(defn put-cache!
"Generic cache. Caches under a key (extracted from the item with extract-key-fn. Uses miss-fn to fill
any misses. Caches only positive hits where both functions return non-nil"
[^Cache cache extract-key-fn item new-result]
(if-let [key (extract-key-fn item)]
(do
; Only cache non-nil
(when new-result
(.put cache key new-result))
new-result)))

(defn lookup-cache!
"Generic cache. Caches under a key (extracted from the item with extract-key-fn. Uses miss-fn to fill
any misses. Caches only positive hits where both functions return non-nil"
Expand All @@ -17,18 +50,32 @@
new-result))
(miss-fn item)))

(defn- expire-key-helper
"Helper function for expiring a key explicitly."
[cache key]
(when-let [result (.getIfPresent cache key)]
(let [{:keys [cache-expires-at]} result]
(when (and cache-expires-at (t/after? (t/now) cache-expires-at)) (.invalidate cache key)))))

(defn expire-key!
"Generic cache. Caches under a key (extracted from the item with extract-key-fn). Uses miss-fn to fill
any misses. Caches only positive hits where both functions return non-nil. Also handles expiration if the value
is a map with the key :cache-expires-at."
[^Cache cache extract-key-fn item]
(if-let [key (extract-key-fn item)]
(locking cache ; TODO: Consider lock striping based on hashcode of the key to allow concurrent loads.
; If it has a timed expiration, expire it.
(expire-key-helper cache key))))

(defn lookup-cache-with-expiration!
"Generic cache. Caches under a key (extracted from the item with extract-key-fn. Uses miss-fn to fill
any misses. Caches only positive hits where both functions return non-nil. Also handles expiration if the value
is a map with the key :cache-expires-at."
[^Cache cache extract-key-fn miss-fn item]
(if-let [key (extract-key-fn item)]
(locking cache ; TOOD: Consider lock striping based on hashcode of the key to allow concurrent loads.
(locking cache ; TODO: Consider lock striping based on hashcode of the key to allow concurrent loads.
; If it has a timed expiration, expire it.
(when-let [result (.getIfPresent cache key)]
(let [{:keys [cache-expires-at]} result]
(when (and cache-expires-at (t/after? (t/now) cache-expires-at)) (.invalidate cache key))))
(expire-key-helper cache key)
; And then fetch it.
(if-let [result (.getIfPresent cache key)]
result ; we got a hit.
Expand Down
6 changes: 5 additions & 1 deletion scheduler/src/cook/components.clj
Expand Up @@ -24,9 +24,13 @@
[cook.cors :as cors]
[cook.curator :as curator]
[cook.datomic :as datomic]
; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.submission namespace.
[cook.plugins.submission]
; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.launch namespace.
[cook.plugins.launch]
[cook.impersonation :refer (impersonation-authorized-wrapper)]
[cook.mesos.pool :as pool]
; This explicit require is needed so that mount can see the defstate defined in this namespace.
; This explicit require is needed so that mount can see the defstate defined in the cook.rate-limit namespace.
; cook.rate-limit and everything else under cook.mesos.api is normally hidden from mount's defstate because
; cook.mesos.api is loaded via util/lazy-load-var, not via 'ns :require'
[cook.rate-limit]
Expand Down
33 changes: 32 additions & 1 deletion scheduler/src/cook/config.clj
Expand Up @@ -406,7 +406,18 @@
:cost-endpoint (get fitness-calculator :cost-endpoint nil)
:data-locality-weight (get fitness-calculator :data-locality-weight 0.95)
:launch-wait-seconds (get fitness-calculator :launch-wait-seconds 60)
:update-interval-ms (get fitness-calculator :update-interval-ms nil)}))}))
:update-interval-ms (get fitness-calculator :update-interval-ms nil)}))
:plugins (fnk [[:config {plugins {}}]]
(let [{:keys [job-launch-filter job-submission-validator]} plugins]
{:job-launch-filter
(merge
{:age-out-last-seen-deadline-minutes 10
:age-out-first-seen-deadline-minutes 600
:age-out-seen-count 10}
job-launch-filter)
:job-submission-validator
(merge {:batch-timeout-seconds 40}
job-submission-validator)}))}))

(defn read-config
"Given a config file path, reads the config and returns the map"
Expand Down Expand Up @@ -484,3 +495,23 @@
"Returns the fitness calculator specified by fitness-calculator, or the default if nil"
[fitness-calculator]
(config-string->fitness-calculator (or fitness-calculator default-fitness-calculator)))

(defn batch-timeout-seconds-config
"Used by job submission plugin."
[]
(-> config :settings :plugins :job-submission-validator :batch-timeout-seconds t/seconds))

(defn age-out-last-seen-deadline-minutes-config
"Used by job launch plugin."
[]
(-> config :settings :plugins :job-launch-filter :age-out-last-seen-deadline-minutes t/minutes))

(defn age-out-first-seen-deadline-minutes-config
"Used by job launch plugin."
[]
(-> config :settings :plugins :job-launch-filter :age-out-first-seen-deadline-minutes t/minutes))

(defn age-out-seen-count-config
"Used by job launch plugin."
[]
(-> config :settings :plugins :job-launch-filter :age-out-seen-count))
57 changes: 57 additions & 0 deletions scheduler/src/cook/demo_plugin.clj
@@ -0,0 +1,57 @@
;;
;; Copyright (c) Two Sigma Open Source, LLC
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
;;
(ns cook.demo-plugin
(:require [clj-time.core :as t]
[clojure.string :as str]
[clojure.tools.logging :as log]
[cook.plugins.definitions :as chd]))

(def uuid-seen-counts (atom {}))

(defn- generate-result
[result message]
{:status result :message message :cache-expires-at (-> 1 t/seconds t/from-now)})

; Demo validation plugin, designed to match with the integration tests.
(defrecord DemoValidateSubmission []
chd/JobSubmissionValidator
(chd/check-job-submission
[this {:keys [name] :as job-map}]
(if (and name (str/starts-with? name "plugin_test.submit_fail"))
(generate-result :rejected "Message1- Fail to submit")
(generate-result :accepted "Message2"))))

(defrecord DemoFilterLaunch []
chd/JobLaunchFilter
(chd/check-job-launch
[this {:keys [:job/name :job/uuid] :as job-map}]
(let [newdict (swap! uuid-seen-counts update-in [uuid] (fnil inc 0))
seen (get newdict uuid)]
(if (and name
(str/starts-with? name "plugin_test.launch_defer")
(<= seen 3))
(generate-result :deferred "Message3")
(generate-result :accepted "Message4")))))

(defn launch-factory
"Factory method for the launch-plugin to be used in config.edn"
[]
(->DemoFilterLaunch))

(defn submission-factory
"Factory method for the submission plugin to be used in config.edn"
[]
(->DemoValidateSubmission))

0 comments on commit d09c449

Please sign in to comment.