Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Limit number of running and waiting jobs loaded by check-queue-positi…
Browse files Browse the repository at this point in the history
…on (#1042)
  • Loading branch information
pschorf authored and shamsimam committed Nov 30, 2018
1 parent 90007c0 commit cc00ed4
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 38 deletions.
6 changes: 3 additions & 3 deletions integration/tests/cook/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,9 +1620,9 @@ def test_unscheduled_jobs(self):
def check_unscheduled_reason():
jobs, _ = util.unscheduled_jobs(self.cook_url, job_uuid_1, job_uuid_2)
self.logger.info(f'Unscheduled jobs: {jobs}')
pattern = re.compile('^You have [0-9]+ other jobs ahead in the queue.$')
self.assertTrue(any([pattern.match(reason['reason']) for reason in jobs[0]['reasons']]))
self.assertTrue(any([pattern.match(reason['reason']) for reason in jobs[1]['reasons']]))
pattern = re.compile('^You have (at least )?[0-9]+ other jobs ahead in the queue.$')
self.assertTrue(any([pattern.match(reason['reason']) for reason in jobs[0]['reasons']]), jobs[0]['reasons'])
self.assertTrue(any([pattern.match(reason['reason']) for reason in jobs[1]['reasons']]), jobs[1]['reasons'])
self.assertEqual(job_uuid_1, jobs[0]['uuid'])
self.assertEqual(job_uuid_2, jobs[1]['uuid'])

Expand Down
83 changes: 49 additions & 34 deletions scheduler/src/cook/mesos/unscheduled.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
;; limitations under the License.
;;
(ns cook.mesos.unscheduled
(:require [datomic.api :as d :refer (q)]
(:require [clj-time.coerce :as tc]
[clj-time.core :as t]
[cook.mesos.scheduler :as scheduler]
[cook.mesos.quota :as quota]
[cook.mesos.share :as share]
[cook.mesos.util :as util]
[cook.rate-limit :as ratelimit]
[clojure.edn :as edn]))
[clojure.edn :as edn]
[datomic.api :as d :refer (q)]))

(defn check-exhausted-retries
[db job]
Expand Down Expand Up @@ -54,13 +56,13 @@
This function can be used for different types of limts (quota or share);
the function to read the user's limit as well as the error message on
exceeding the limit are parameters."
[read-limit-fn err-msg db job]
[read-limit-fn err-msg db job running-jobs]
(when (= (:job/state job) :job.state/waiting)
(let [user (:job/user job)
pool-name (-> job :job/pool :pool/name)
ways (how-job-would-exceed-resource-limits
(read-limit-fn db user pool-name)
(util/jobs-by-user-and-state db user :job.state/running pool-name)
running-jobs
job)]
(when (seq ways)
[err-msg ways]))))
Expand Down Expand Up @@ -104,35 +106,44 @@
fenzo-failures-for-user)}]
["The job is now under investigation. Check back in a minute for more details!" {}]))

(defn- get-jobs-by-user-and-state
"Returns the first `limit` jobs for the given `user` in `state` in `pool-name`,
submitted within `days-to-look-back`"
[db user state limit pool-name days-to-look-back]
(let [end (tc/to-date (t/now))
start (tc/to-date (t/minus (t/now) (t/days days-to-look-back)))]
(util/get-jobs-by-user-and-states db user [state] start end
limit (constantly true)
true pool-name)))

(defn check-queue-position
"IFF the job is not first in the user's queue, returns
[\"You have x other jobs ahead in the queue\", {:jobs [other job uuids]]}]"
[conn job]
[conn job running-jobs waiting-jobs]
(let [db (d/db conn)
user (:job/user job)
job-uuid (:job/uuid job)
pool-name (-> job :job/pool :pool/name)
running-tasks (map
(fn [j] (->> j
:job/instance
(filter util/instance-running?)
last))
(util/jobs-by-user-and-state db user :job.state/running pool-name))
pending-tasks (->> (util/jobs-by-user-and-state db user :job.state/waiting pool-name)
(filter (fn [job] (-> job :job/commit-latch :commit-latch/committed?)))
running-tasks (->> running-jobs
(map (fn [j] (->> j
:job/instance
(filter util/instance-running?)
last))))
pending-tasks (->> waiting-jobs
(map util/create-task-ent))
all-tasks (into running-tasks pending-tasks)
sorted-tasks (vec (sort (util/same-user-task-comparator) all-tasks))
queue-pos (first
(keep-indexed
(fn [i instance]
(when (= (-> instance :job/_instance :job/uuid) job-uuid) i))
sorted-tasks))
tasks-ahead (subvec sorted-tasks 0 queue-pos)]
queue-pos (or (first
(keep-indexed
(fn [i instance]
(when (= (-> instance :job/_instance :job/uuid) job-uuid) i))
sorted-tasks))
(count all-tasks))
tasks-ahead (subvec sorted-tasks 0 (min queue-pos 10))
message (if (= (count all-tasks) queue-pos)
(str "You have at least " queue-pos " other jobs ahead in the queue.")
(str "You have " queue-pos " other jobs ahead in the queue."))]
(when (seq tasks-ahead)
[(str "You have " queue-pos " other jobs ahead in the queue.")
[message
{:jobs (->> tasks-ahead
(take 10)
(mapv #(-> % :job/_instance :job/uuid str)))}])))

(defn- check-launch-rate-limit
Expand Down Expand Up @@ -160,14 +171,18 @@
(case (:job/state job)
:job.state/running [["The job is running now." {}]]
:job.state/completed [["The job already completed." {}]]
(filter some?
[(check-exhausted-retries db job)
(check-exceeds-limit quota/get-quota
"The job would cause you to exceed resource quotas."
db job)
(check-exceeds-limit share/get-share
"The job would cause you to exceed resource shares."
db job)
(check-launch-rate-limit job)
(check-queue-position conn job)
(check-fenzo-placement conn job)]))))
(let [user (:job/user job)
pool-name (-> job :job/pool :pool/name)
running-jobs (get-jobs-by-user-and-state db user "running" 1000000 pool-name 30)
waiting-jobs (get-jobs-by-user-and-state db user "waiting" 100 pool-name 7)]
(filter some?
[(check-exhausted-retries db job)
(check-exceeds-limit quota/get-quota
"The job would cause you to exceed resource quotas."
db job running-jobs)
(check-exceeds-limit share/get-share
"The job would cause you to exceed resource shares."
db job running-jobs)
(check-launch-rate-limit job)
(check-queue-position conn job running-jobs waiting-jobs)
(check-fenzo-placement conn job)])))))
35 changes: 34 additions & 1 deletion scheduler/test/cook/test/mesos/unscheduled.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
[cook.rate-limit :as rate-limit]
[cook.test.testutil :refer (create-dummy-instance
create-dummy-job
restore-fresh-database!)]
restore-fresh-database!
setup)]
[cook.mesos.quota :as quota]))

(defn resource-map->list
Expand Down Expand Up @@ -72,6 +73,7 @@
{:reason "Job already ran on this host." :host_count 3}]}]))))

(deftest test-reasons
(setup)
(let [conn (restore-fresh-database! datomic-uri)
_ (quota/set-quota! conn "mforsyth" nil "test-reasons" :count 2)
running-job-id1 (-> (create-dummy-job conn :user "mforsyth"
Expand Down Expand Up @@ -197,3 +199,34 @@
(is (= (nth reasons 3)
["The job is now under investigation. Check back in a minute for more details!"
{}]))))))


(deftest test-check-queue-position
(let [conn (restore-fresh-database! "datomic:mem://test-check-queue-position")
waiting-job-ids (doall (for [x (range 0 500)]
(create-dummy-job conn :user "mforsyth"
:ncpus 1.0 :memory 3.0
:job-state :job.state/waiting)))
waiting-jobs (map #(d/entity (d/db conn) %) waiting-job-ids)
running-job-ids (doall (for [x (range 0 50)]
(let [job-id (create-dummy-job conn :user "mforsyth"
:ncpus 1.0 :memory 3.0
:job-state :job.state/running)]
(create-dummy-instance conn job-id
:instance-status :instance.status/running)
job-id)))
running-jobs (map #(d/entity (d/db conn) %) running-job-ids)
first-running-uuid (str (:job/uuid (first running-jobs)))]

(let [reason (u/check-queue-position conn (second running-jobs) running-jobs (take 100 waiting-jobs))]
(is (= "You have 1 other jobs ahead in the queue."))
(is (= [first-running-uuid] (-> reason second :jobs))))
(let [reason (u/check-queue-position conn (first waiting-jobs) running-jobs (take 100 waiting-jobs))]
(is (= (str "You have " (count running-jobs) " other jobs ahead in the queue.")
(first reason)))
(is (= first-running-uuid (-> reason second :jobs first))))

(let [reason (u/check-queue-position conn (last waiting-jobs) running-jobs (take 100 waiting-jobs))]
(is (= "You have at least 150 other jobs ahead in the queue."
(first reason)))
(is (= first-running-uuid (-> reason second :jobs first))))))

0 comments on commit cc00ed4

Please sign in to comment.