From d703bf88bc259174e7876a2293ac4afe80373594 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Thu, 29 Oct 2015 16:28:24 -0700 Subject: [PATCH 01/27] Initial fenzo work (untested) --- scheduler/project.clj | 1 + scheduler/src/cook/mesos.clj | 8 +- scheduler/src/cook/mesos/rebalancer.clj | 3 +- scheduler/src/cook/mesos/scheduler.clj | 532 +++++++++--------------- 4 files changed, 205 insertions(+), 339 deletions(-) diff --git a/scheduler/project.clj b/scheduler/project.clj index f2d0dc38d5..35bbc52bba 100644 --- a/scheduler/project.clj +++ b/scheduler/project.clj @@ -42,6 +42,7 @@ [org.clojure/data.priority-map "0.0.5"] [swiss-arrows "1.0.0"] [riddley "0.1.10"] + [com.netflix.fenzo/fenzo-core "0.7.10"] ;;Logging [org.clojure/tools.logging "0.2.6"] diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index b2b91593e6..3a1657209b 100644 --- a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -81,7 +81,8 @@ datomic-report-chan (async/chan (async/sliding-buffer 4096)) mesos-pending-jobs-atom (atom []) mesos-heartbeat-chan (async/chan (async/buffer 4096)) - {:keys [scheduler view-incubating-offers view-mature-offers]} + current-driver (atom nil) + {:keys [scheduler view-incubating-offers]} (sched/create-datomic-scheduler mesos-datomic-conn (fn set-or-create-framework-id [framework-id] @@ -89,13 +90,13 @@ curator-framework zk-framework-id (.getBytes framework-id "UTF-8"))) + current-driver mesos-pending-jobs-atom mesos-heartbeat-chan offer-incubate-time-ms task-constraints) framework-id (when-let [bytes (curator/get-or-nil curator-framework zk-framework-id)] (String. bytes)) - current-driver (atom nil) leader-selector (LeaderSelector. curator-framework zk-prefix @@ -136,8 +137,7 @@ :driver driver :mesos-master mesos-master :pending-jobs-atom mesos-pending-jobs-atom - :view-incubating-offers view-incubating-offers - :view-mature-offers view-mature-offers})) + :view-incubating-offers view-incubating-offers})) (counters/inc! mesos-leader) (async/tap mesos-datomic-mult datomic-report-chan) (let [kill-monitor (cook.mesos.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn current-driver)] diff --git a/scheduler/src/cook/mesos/rebalancer.clj b/scheduler/src/cook/mesos/rebalancer.clj index 6d593bb051..0d43415ff9 100644 --- a/scheduler/src/cook/mesos/rebalancer.clj +++ b/scheduler/src/cook/mesos/rebalancer.clj @@ -334,7 +334,7 @@ utilization)) (defn start-rebalancer! - [{:keys [conn driver mesos-master pending-jobs-atom view-incubating-offers view-mature-offers]}] + [{:keys [conn driver mesos-master pending-jobs-atom view-incubating-offers]}] (let [rebalance-interval (time/minutes 5) observe-interval (time/seconds 5) observe-refreshness-threshold (time/seconds 30) @@ -343,7 +343,6 @@ (fn [now] (let [host->combined-offers (-<>> (view-incubating-offers) - (sched/combine-offers) (map (fn [v] [(:hostname v) (assoc v :time-observed now)])) (into {}))] diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 3c5bfa08d1..999c2abad9 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -36,7 +36,9 @@ [cook.mesos.heartbeat :as heartbeat] [cook.mesos.share :as share] [swiss.arrows :refer :all]) - (import java.util.concurrent.TimeUnit)) + (import java.util.concurrent.TimeUnit + com.netflix.fenzo.TaskScheduler + com.netflix.fenzo.VirtualMachineLease)) (defn now [] @@ -164,77 +166,80 @@ (defn handle-status-update "Takes a status update from mesos." - [conn driver status] - (future - (log/info "Mesos status is: " status) - (timers/time! - handle-status-update-duration - (try (let [db (db conn) - {:keys [task-id] task-state :state} status - [job instance prior-instance-status] (first (q '[:find ?j ?i ?status - :in $ ?task-id - :where - [?i :instance/task-id ?task-id] - [?i :instance/status ?s] - [?s :db/ident ?status] - [?j :job/instance ?i]] - db task-id)) - job-ent (d/entity db job) - retries-so-far (count (:job/instance job-ent)) - instance-status (condp contains? task-state - #{:task-staging} :instance.status/unknown - #{:task-starting - :task-running} :instance.status/running - #{:task-finished} :instance.status/success - #{:task-failed - :task-killed - :task-lost} :instance.status/failed) - prior-job-state (:job/state (d/entity db job)) - progress (when (:data status) - (:percent (read-string (String. (:data status))))) - instance-runtime (- (.getTime (now)) ; Used for reporting - (.getTime (:instance/start-time (d/entity db instance)))) - job-resources (util/job-ent->resources job-ent)] - (when (= instance-status :instance.status/success) - (handle-throughput-metrics success-throughput-metrics - job-resources - instance-runtime) - (handle-throughput-metrics complete-throughput-metrics - job-resources - instance-runtime)) - (when (= instance-status :instance.status/failed) - (handle-throughput-metrics fail-throughput-metrics - job-resources - instance-runtime) - (handle-throughput-metrics complete-throughput-metrics - job-resources - instance-runtime)) - ;; This code kills any task that "shouldn't" be running - (when (and - (or (nil? instance) ; We could know nothing about the task, meaning a DB error happened and it's a waste to finish - (= prior-job-state :job.state/completed) ; The task is attached to a failed job, possibly due to instances running on multiple hosts - (= prior-instance-status :instance.status/failed)) ; The kill-task message could've been glitched on the network - (contains? #{:task-running - :task-staging - :task-starting} - task-state)) ; killing an unknown task causes a TASK_LOST message. Break the cycle! Only kill non-terminal tasks - (log/warn "Attempting to kill task" task-id - "as instance" instance "with" prior-job-state "and" prior-instance-status - "should've been put down already") - (meters/mark! tasks-killed-in-status-update) - (mesos/kill-task driver task-id)) - (when-not (nil? instance) - ;; (println "update:" task-id task-state job instance instance-status prior-job-state) - (transact-with-retries conn - (concat - [[:instance/update-state instance instance-status]] - (when (#{:instance.status/success - :instance.status/failed} instance-status) - [[:db/add instance :instance/end-time (now)]]) - (when progress - [[:db/add instance :instance/progress progress]]))))) - (catch Exception e - (log/error e "Mesos scheduler status update error")))))) + [conn driver ^TaskScheduler fenzo status] + (log/info "Mesos status is: " status) + (timers/time! + handle-status-update-duration + (try (let [db (db conn) + {:keys [task-id] task-state :state} status + [job instance prior-instance-status] (first (q '[:find ?j ?i ?status + :in $ ?task-id + :where + [?i :instance/task-id ?task-id] + [?i :instance/status ?s] + [?s :db/ident ?status] + [?j :job/instance ?i]] + db task-id)) + job-ent (d/entity db job) + retries-so-far (count (:job/instance job-ent)) + instance-status (condp contains? task-state + #{:task-staging} :instance.status/unknown + #{:task-starting + :task-running} :instance.status/running + #{:task-finished} :instance.status/success + #{:task-failed + :task-killed + :task-lost} :instance.status/failed) + prior-job-state (:job/state (d/entity db job)) + progress (when (:data status) + (:percent (read-string (String. (:data status))))) + instance-runtime (- (.getTime (now)) ; Used for reporting + (.getTime (:instance/start-time (d/entity db instance)))) + job-resources (util/job-ent->resources job-ent)] + (when (#{:instance.status/success :instance.status/failed} instance-status) + (.. fenzo + (getTaskUnAssigner) + (call task-id (:instance/hostname instance)))) ;;TODO is this hostname already in memory? + (when (= instance-status :instance.status/success) + (handle-throughput-metrics success-throughput-metrics + job-resources + instance-runtime) + (handle-throughput-metrics complete-throughput-metrics + job-resources + instance-runtime)) + (when (= instance-status :instance.status/failed) + (handle-throughput-metrics fail-throughput-metrics + job-resources + instance-runtime) + (handle-throughput-metrics complete-throughput-metrics + job-resources + instance-runtime)) + ;; This code kills any task that "shouldn't" be running + (when (and + (or (nil? instance) ; We could know nothing about the task, meaning a DB error happened and it's a waste to finish + (= prior-job-state :job.state/completed) ; The task is attached to a failed job, possibly due to instances running on multiple hosts + (= prior-instance-status :instance.status/failed)) ; The kill-task message could've been glitched on the network + (contains? #{:task-running + :task-staging + :task-starting} + task-state)) ; killing an unknown task causes a TASK_LOST message. Break the cycle! Only kill non-terminal tasks + (log/warn "Attempting to kill task" task-id + "as instance" instance "with" prior-job-state "and" prior-instance-status + "should've been put down already") + (meters/mark! tasks-killed-in-status-update) + (mesos/kill-task driver task-id)) + (when-not (nil? instance) + ;; (println "update:" task-id task-state job instance instance-status prior-job-state) + (transact-with-retries conn + (concat + [[:instance/update-state instance instance-status]] + (when (#{:instance.status/success + :instance.status/failed} instance-status) + [[:db/add instance :instance/end-time (now)]]) + (when progress + [[:db/add instance :instance/progress progress]]))))) + (catch Exception e + (log/error e "Mesos scheduler status update error"))))) (timers/deftimer [cook-mesos scheduler tx-report-queue-processing-duration]) (meters/defmeter [cook-mesos scheduler tx-report-queue-datoms]) @@ -304,48 +309,31 @@ ;;; API for matcher ;;; =========================================================================== -(def combine-offer-key - "Function to generate the key used to group offers." - (juxt :hostname :slave-id :driver :fid)) - -(meters/defmeter [cook-mesos scheduler offers-combine]) - -(defn combine-offers - "Takes a sequence of offers and returns a new sequence of offers such that all offers with the - same slave-id, hostname, driver, fid are combined into a single offer. Offers will have the following fields - set: `:resources` (same as a single offer, but excluding ports), `:hostname`, `:slave-id`, - `:ids` (a list of the all the composite offer ids), ':driver', ':fid', ':time-receieved'. - The combine offer will use the earliest time-received of the offers." - [offers] - (for [[[hostname slave-id driver fid] offers] (group-by combine-offer-key offers) - :let [ids (mapcat #(or (:ids %) (seq [(:id %)])) offers) - cpu (apply + (map (comp :cpus :resources) offers)) - mem (apply + (map (comp :mem :resources) offers)) - ports (vec (apply concat (map (comp :ports :resources) offers))) - time-received (->> offers - (sort-by :time-received) - first - :time-received)]] - (do - (when (> (count offers) 1) - (meters/mark! offers-combine) - (log/debug "Combining offers " offers)) - {:ids ids - :resources {:cpus cpu - :mem mem - :ports ports} - :hostname hostname - :slave-id slave-id - :driver driver - :fid fid - :time-received time-received}))) - -(defn prefixes - "Returns a seq of the prefixes of a seq" - [xs] - (if (seq xs) - (reductions conj [(first xs)] (rest xs)) - [])) +(defrecord VirtualMachineLeaseAdapter [offer time] + com.netflix.fenzo.VirtualMachineLease + (cpuCores [_] (get-in offer [:resources :cpus])) + (diskMB [_] (:disk offer)) + (getAttributeMap [_] nil) ;;TODO + (getId [_] (:id offer)) + (getOffer [_] (throw (UnsupportedOperationException.))) + (getOfferedTime [_] time) + (getVMID [_] (:slave-id offer)) + (hostname [_] (:hostname offer)) + (memoryMB [_] (get-in offer [:resources :mem])) + (networkMbps [_] (throw (UnsupportedOperationException.))) + (portRanges [_] (throw (UnsupportedOperationException.)))) + +(defrecord TaskRequestAdapter [job task-info] + com.netflix.fenzo.TaskRequest + (getCPUs [_] (get-in task-info [:resources :cpus])) + (getDisk [_] 0.0) ;;TODO + (getHardConstraints [_] []) ;;TODO + (getId [_] (:name task-info)) + (getMemory [_] (get-in task-info [:resources :mem])) + (getNetworkMbps [_] (throw (UnsupportedOperationException.))) + (getPorts [_] 0) + (getSoftConstraints [_] []) + (taskGroupName [_] (str (:job/uuid job)))) (defn match-offer-to-schedule "Given an offer and a schedule, computes all the tasks should be launched as a result. @@ -354,14 +342,18 @@ the offer. Returns a list of tasks that got matched to the offer" - [schedule {{:keys [cpus mem]} :resources :as offer}] - (log/debug "Matching offer " offer " to the schedule" schedule) - (loop [[candidates & remaining-prefixes] (prefixes schedule) - prev []] - (let [required (util/sum-resources-of-jobs candidates)] - (if (and candidates (>= cpus (:cpus required)) (>= mem (:mem required))) - (recur remaining-prefixes candidates) - prev)))) + [^TaskScheduler fenzo considerable offers db fid] + (let [t (System/currentTimeMillis) + leases (mapv #(->VirtualMachineLeaseAdapter % t) offers) + ;;TODO this requests conversion could probably benefit from a 500-1000 element cache if constructing task-info is expensive + requests (mapv #(->TaskRequestAdapter % (job->task-info db fid %)) considerable) + result (.scheduleOnce requests leases) + assignments (.. result getResultMap values)] + (mapv (fn [assignment] + {:leases (.getLeasesUsed assignment) + :tasks (.getTasksAssigned assignment) + :hostname (.getHostname assignment)}) + assignments))) (meters/defmeter [cook-mesos scheduler scheduler-offer-declined]) @@ -407,16 +399,17 @@ (gauges/defgauge [cook-mesos scheduler front-of-job-queue-mem] (fn [] @front-of-job-queue-mem-atom)) (gauges/defgauge [cook-mesos scheduler front-of-job-queue-cpus] (fn [] @front-of-job-queue-cpus-atom)) -(defn handle-resource-offer! +(defn handle-resource-offers! "Gets a list of offers from mesos. Decides what to do with them all--they should all be accepted or rejected at the end of the function." - [conn driver fid pending-jobs offer] - (histograms/update! - offer-size-cpus - (get-in offer [:resources :cpus])) - (histograms/update! - offer-size-mem - (get-in offer [:resources :mem])) + [conn driver ^TaskScheduler fenzo fid pending-jobs offers] + (doseq [offer offers] + (histograms/update! + offer-size-cpus + (get-in offer [:resources :cpus])) + (histograms/update! + offer-size-mem + (get-in offer [:resources :mem]))) (timers/time! handle-resource-offer!-duration (try @@ -429,38 +422,48 @@ true (catch clojure.lang.ExceptionInfo e false)) - ;; ensure it always tries new hosts - (not-any? #(= % (:hostname offer)) + ;; ensure it always tries new hosts TODO this is broken, and must be moved to a constraint in fenzo + #_(not-any? #(= % (:hostname offer)) (->> (:job/instance job) (filter #(false? (:instance/preempted? %))) (map :instance/hostname))))) scheduler-contents) - matches (match-offer-to-schedule considerable offer) - ;; We know that matches always form a prefix of the scheduler's contents - new-scheduler-contents (remove (set matches) scheduler-contents) + matches (match-offer-to-schedule fenzo considerable offers db fid) + matched-jobs (for [match matches + task (:tasks match)] + (:job task)) + matched-job-uuids (set (mapv :job/uuid matched-jobs)) + new-scheduler-contents (remove (fn [pending-job] + (contains? matched-job-uuids (:job/uuid pending-job))) + scheduler-contents) first-considerable-resources (-> considerable first util/job-ent->resources) - match-resource-requirements (util/sum-resources-of-jobs matches)] + match-resource-requirements (util/sum-resources-of-jobs matched-jobs)] (reset! front-of-job-queue-mem-atom (or (:mem first-considerable-resources) 0)) (reset! front-of-job-queue-cpus-atom (or (:cpus first-considerable-resources) 0)) (cond - (empty? matches) (decline-offers driver (:ids offer)) + (empty? matches) nil ;; Since Fenzo will manage declining offers, we'll do nothing here (not (compare-and-set! pending-jobs scheduler-contents new-scheduler-contents)) (do (log/info "Pending job atom contention encountered") (meters/mark! pending-job-atom-contended) (recur)) :else - (let [task-infos (map (fn [job] - (let [job-id (:db/id job)] - (assoc (job->task-info - db - fid - job-id) - :slave-id (:slave-id offer) - :job-id job-id))) - matches)] - ;TODO assign the ports to actual available ones, and configure the environment variables; look at the Fenzo API to match up with it + ;;TODO this construction of task-infos is legacy-oriented + ;;since now we treat "matches" as our primary structure, + ;;we should eventually remove this construction of task-infos + ;;as it's just a helper for building the datomic transaction + (let [task-infos (for [{:keys [tasks leases]} matches + :let [offers (mapv :offer leases) + slave-id (:slave-id (first offers))] + task tasks + :let [task-info (:task-info task) + job-id (get-in task [:jbo :db/id])]] + (assoc task-info + :offers offers + :slave-id slave-id + :job-id job-id))] + ;TODO assign the ports to actual available ones; look at the Fenzo API to match up with it ;First we need to convince mesos to actually offer ports ;then, we need to allocate the ports (incremental or bulk?) copy from my book. @@ -470,7 +473,7 @@ ;; the pending-jobs atom is repopulated @(d/transact conn - (mapcat (fn [task-info] + (mapcat (fn [{:keys [offer] :as task-info}] [[:job/allowed-to-start? (:job-id task-info)] {:db/id (d/tempid :db.part/user) :job/_instance (:job-id task-info) @@ -483,25 +486,36 @@ task-info [:executor :executor-id] (:task-id task-info)) - :instance/slave-id (:slave-id offer) + :instance/slave-id (:slave-id task-info) :instance/progress 0 :instance/status :instance.status/unknown :instance/preempted? false}]) task-infos)) - (log/info "Matched offer" offer "to tasks" task-infos) + (log/info "Matched tasks" task-infos) ;; This launch-tasks MUST happen after the above transaction in ;; order to allow a transaction failure (due to failed preconditions) ;; to block the launch - (meters/mark! scheduler-offer-matched (count (:ids offer))) + (meters/mark! scheduler-offer-matched + (->> matches + (mapcat (comp :id :offer :leases)) + (distinct) + (count))) (meters/mark! matched-tasks (count task-infos)) (meters/mark! matched-tasks-cpus (:cpus match-resource-requirements)) (meters/mark! matched-tasks-mem (:mem match-resource-requirements)) - (mesos/launch-tasks - driver - (:ids offer) - (mapv #(dissoc % :job-id :num-ports) task-infos)))))) + (doseq [{:keys [tasks leases]} matches + :let [offers (mapv :offer leases) + task-infos (mapv :task-info tasks) + slave-id (:slave-id (first offers))]] + (mesos/launch-tasks + driver + (mapv :id offers) + (mapv #(dissoc % :num-ports) task-infos)) + (doseq [task tasks] + (.. fenzo + (getTaskAssigner) + (call task (get-in (first leases) [:offer :hostname]))))))))) (catch Throwable t - (decline-offers driver (:ids offer)) (meters/mark! handle-resource-offer!-errors) (log/error t "Error in match:" (ex-data t)))))) @@ -759,177 +773,39 @@ (reset! pending-jobs-atom (rank-jobs (db conn) offensive-job-filter)))))) -;;; Offer buffer Design -;;; The offer buffer is used to store mesos resource offers made to cook until the matcher is -;;; ready to use them. -;;; -;;; Reasons for doing this: -;;; <> We can combine offers between offer sets -;;; <> Provide the matcher with the best (biggest) offer of all possible offers currently ready -;;; <> Lay the foundation for additional criteria before the offer is ready for the matcher -;;; <> Centralize outstanding offers -;;; -;;; Structure: -;;; We have three components that do the heavy lifting. -;;; 1. offer-incubator: Accepts new offers from mesos and allows them to 'incubate', -;;; meaning we wait some amount of time before we consider the -;;; offer ready for the matcher. We delay the offers in the hopes -;;; of combining them with other offers from the same host. When offers -;;; are done incubating, we attempt to combine with other offers and -;;; pass the combined offers to the prioritizer. -;;; 2. offer-prioritizer: Accepts offers from the incubator that are ready and sorts them by size. -;;; Ensures the largest offer is passed to the offer-matcher when requested. -;;; 3. offer-matcher: Pulls offers from the offer-prioritizer and calls `handle-resource-offer!` -;;; -;;; Interesting design points: -;;; <> The incubator puts a timer on each offer set (call `incubate!`) and once the timer is up, -;;; it puts the ids onto a channel to notify the incubator to attempt to combine offers and -;;; pass along to the prioritizer. -;;; <> Once an offer has incubated, we take it, plus any offers that can combine with it out of -;;; the buffer and put it onto the `offer-ready-chan`. This means we can be in a position where -;;; a later timer fires and some of the offers are no longer in the buffer. We handle this by -;;; using `select-keys` as a set intersection between offers we are considering and offers that -;;; still exist in the buffer. -;;; <> To avoid race conditions, the design is fully linearized using `async/alt!`. The sole exception -;;; is the incubate timers, however the firing of the timer is used only to notify that incubating -;;; is complete. Since the work of moving the incubated offers to the next stage of the pipeline is -;;; handled linearly through the `async/alt!` block, there is no concern of a race with the timers -;;; as well. - -(timers/deftimer [cook-mesos scheduler incubator-offer-received-duration]) -(timers/deftimer [cook-mesos scheduler incubator-offer-incubated-duration]) -(histograms/defhistogram [cook-mesos scheduler scheduler-offers-received]) -(counters/defcounter [cook-mesos scheduler incubating-size]) - -(defn start-offer-incubator! - "Starts a thread to accept offers from mesos and 'incubate' them. - Offers that are ready are passed to the offer-ready-chan." - [offer-chan offer-ready-chan incubate-ms] - (let [ready-chan (async/chan) - view-offers-atom (atom []) - view-offers-fn (fn [] @view-offers-atom) - incubate! (fn incubate! [ids] - (async/go - (when ids - (async/! ready-chan ids))))] - (async/go - (try - (loop [incubating {}] ; key is offer-id, value is offer - (reset! view-offers-atom - (->> incubating - (vals))) - (recur - (async/alt! - offer-chan ([[offers driver fid]] ; New offer - (timers/start-stop-time! ; Use this in go blocks, time! doesn't play nice - incubator-offer-received-duration - (let [annotated-offers (map (fn [offer] - (assoc offer - :time-received (time/now) - :driver driver - :fid fid)) - offers) - ids (map :id offers)] - (incubate! ids) - (histograms/update! - scheduler-offers-received - (count ids)) - (log/debug "Got " (count annotated-offers) " offers") - (log/debug "Size of incubating: " (+ (count annotated-offers) (count incubating))) - (counters/inc! incubating-size (count annotated-offers)) - (apply assoc incubating (interleave ids annotated-offers))))) - ready-chan ([offer-ids] ; Offer finished incubating - ; Can have < (count offer-ids) elements if they were combine previously - (timers/start-stop-time! - incubator-offer-incubated-duration - (let [matured-offers (vals (select-keys incubating offer-ids)) - combine-offer-groups (group-by combine-offer-key (vals incubating)) - matured-combine-offers (map #(-> % - combine-offer-key - combine-offer-groups - combine-offers - first) - matured-offers) - ids-to-remove (mapcat :ids matured-combine-offers)] - (doseq [offer matured-combine-offers] - (async/>! offer-ready-chan offer)) - (log/debug (count offer-ids) " matured. " - (count ids-to-remove) " offers removed from incubating. " - (- (count incubating) (count ids-to-remove)) " offers in incubating") - (counters/dec! incubating-size (count ids-to-remove)) - (apply dissoc incubating ids-to-remove))))))) - (catch Exception e - (log/error e "In start-offer-incubator")))) - view-offers-fn)) - -(counters/defcounter [cook-mesos scheduler prioritizer-buffer-size]) - -(defn start-offer-prioritizer! - "Starts a thread to accept offers that are done incubating. Maintains a sorted - buffer of these offers and passes the biggest one to the matcher when requested." - [offer-ready-chan matcher-chan] - (let [view-offers-atom (atom []) - view-offers-fn (fn [] @view-offers-atom)] - (async/go - (try - (loop [offers []] - (reset! view-offers-atom - offers) - (recur - (if (seq offers) - (do - (log/debug (count offers) " offers in prioritizer buffer") - (async/alt! - offer-ready-chan ([offer] - (log/debug "Got new offer. Size of prioritizer buffer: " - (inc (count offers))) - (counters/inc! prioritizer-buffer-size) - (sort-offers (conj offers offer))) - [[matcher-chan (first offers)]] (do - (log/debug "Offer sent to matcher. " - (count (rest offers)) - " offers remaining") - (counters/dec! prioritizer-buffer-size) - (rest offers)))) - (let [offer (async/ Date: Mon, 2 Nov 2015 12:39:11 -0800 Subject: [PATCH 02/27] Fix typo --- scheduler/src/cook/mesos/scheduler.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 999c2abad9..ac14729b09 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -473,12 +473,12 @@ ;; the pending-jobs atom is repopulated @(d/transact conn - (mapcat (fn [{:keys [offer] :as task-info}] + (mapcat (fn [{:keys [offers] :as task-info}] [[:job/allowed-to-start? (:job-id task-info)] {:db/id (d/tempid :db.part/user) :job/_instance (:job-id task-info) :instance/task-id (:task-id task-info) - :instance/hostname (:hostname offer) + :instance/hostname (:hostname (first offers)) :instance/start-time (now) ;; NB command executor uses the task-id ;; as the executor-id From 77b59c07eb33983537aa1b98afb4615b53718713 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Thu, 12 Nov 2015 18:51:45 -0500 Subject: [PATCH 03/27] Got fenzo prototype running --- scheduler/dev-config.edn | 2 + scheduler/project.clj | 5 +- scheduler/src/cook/mesos/scheduler.clj | 282 +++++++++++++++---------- 3 files changed, 176 insertions(+), 113 deletions(-) diff --git a/scheduler/dev-config.edn b/scheduler/dev-config.edn index b02229b80c..2bbc4ed09c 100644 --- a/scheduler/dev-config.edn +++ b/scheduler/dev-config.edn @@ -21,4 +21,6 @@ :levels {"datomic.db" :warn "datomic.peer" :warn "datomic.kv-cluster" :warn + "cook.mesos.scheduler" :debug + "com.netflix.fenzo.TaskScheduler" :debug :default :info}}} diff --git a/scheduler/project.clj b/scheduler/project.clj index 35bbc52bba..3c62a0173d 100644 --- a/scheduler/project.clj +++ b/scheduler/project.clj @@ -42,7 +42,10 @@ [org.clojure/data.priority-map "0.0.5"] [swiss-arrows "1.0.0"] [riddley "0.1.10"] - [com.netflix.fenzo/fenzo-core "0.7.10"] + [com.netflix.fenzo/fenzo-core "0.8.2" + :exclusions [org.apache.mesos/mesos + org.slf4j/slf4j-api + org.slf4j/slf4j-simple]] ;;Logging [org.clojure/tools.logging "0.2.6"] diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index ac14729b09..2ea51e905e 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -32,11 +32,12 @@ [clj-time.core :as time] [clj-time.periodic :as periodic] [clj-time.coerce :as tc] - [chime :refer [chime-at]] + [chime :refer [chime-at chime-ch]] [cook.mesos.heartbeat :as heartbeat] [cook.mesos.share :as share] [swiss.arrows :refer :all]) (import java.util.concurrent.TimeUnit + com.netflix.fenzo.TaskAssignmentResult com.netflix.fenzo.TaskScheduler com.netflix.fenzo.VirtualMachineLease)) @@ -167,79 +168,82 @@ (defn handle-status-update "Takes a status update from mesos." [conn driver ^TaskScheduler fenzo status] - (log/info "Mesos status is: " status) - (timers/time! - handle-status-update-duration - (try (let [db (db conn) - {:keys [task-id] task-state :state} status - [job instance prior-instance-status] (first (q '[:find ?j ?i ?status - :in $ ?task-id - :where - [?i :instance/task-id ?task-id] - [?i :instance/status ?s] - [?s :db/ident ?status] - [?j :job/instance ?i]] - db task-id)) - job-ent (d/entity db job) - retries-so-far (count (:job/instance job-ent)) - instance-status (condp contains? task-state - #{:task-staging} :instance.status/unknown - #{:task-starting - :task-running} :instance.status/running - #{:task-finished} :instance.status/success - #{:task-failed - :task-killed - :task-lost} :instance.status/failed) - prior-job-state (:job/state (d/entity db job)) - progress (when (:data status) - (:percent (read-string (String. (:data status))))) - instance-runtime (- (.getTime (now)) ; Used for reporting - (.getTime (:instance/start-time (d/entity db instance)))) - job-resources (util/job-ent->resources job-ent)] - (when (#{:instance.status/success :instance.status/failed} instance-status) - (.. fenzo - (getTaskUnAssigner) - (call task-id (:instance/hostname instance)))) ;;TODO is this hostname already in memory? - (when (= instance-status :instance.status/success) - (handle-throughput-metrics success-throughput-metrics - job-resources - instance-runtime) - (handle-throughput-metrics complete-throughput-metrics - job-resources - instance-runtime)) - (when (= instance-status :instance.status/failed) - (handle-throughput-metrics fail-throughput-metrics - job-resources - instance-runtime) - (handle-throughput-metrics complete-throughput-metrics - job-resources - instance-runtime)) - ;; This code kills any task that "shouldn't" be running - (when (and - (or (nil? instance) ; We could know nothing about the task, meaning a DB error happened and it's a waste to finish - (= prior-job-state :job.state/completed) ; The task is attached to a failed job, possibly due to instances running on multiple hosts - (= prior-instance-status :instance.status/failed)) ; The kill-task message could've been glitched on the network - (contains? #{:task-running - :task-staging - :task-starting} - task-state)) ; killing an unknown task causes a TASK_LOST message. Break the cycle! Only kill non-terminal tasks - (log/warn "Attempting to kill task" task-id - "as instance" instance "with" prior-job-state "and" prior-instance-status - "should've been put down already") - (meters/mark! tasks-killed-in-status-update) - (mesos/kill-task driver task-id)) - (when-not (nil? instance) - ;; (println "update:" task-id task-state job instance instance-status prior-job-state) - (transact-with-retries conn - (concat - [[:instance/update-state instance instance-status]] - (when (#{:instance.status/success - :instance.status/failed} instance-status) - [[:db/add instance :instance/end-time (now)]]) - (when progress - [[:db/add instance :instance/progress progress]]))))) - (catch Exception e - (log/error e "Mesos scheduler status update error"))))) + (future + (log/info "Mesos status is: " status) + (timers/time! + handle-status-update-duration + (try (let [db (db conn) + {:keys [task-id] task-state :state} status + [job instance prior-instance-status] (first (q '[:find ?j ?i ?status + :in $ ?task-id + :where + [?i :instance/task-id ?task-id] + [?i :instance/status ?s] + [?s :db/ident ?status] + [?j :job/instance ?i]] + db task-id)) + job-ent (d/entity db job) + retries-so-far (count (:job/instance job-ent)) + instance-status (condp contains? task-state + #{:task-staging} :instance.status/unknown + #{:task-starting + :task-running} :instance.status/running + #{:task-finished} :instance.status/success + #{:task-failed + :task-killed + :task-lost} :instance.status/failed) + prior-job-state (:job/state (d/entity db job)) + progress (when (:data status) + (:percent (read-string (String. (:data status))))) + instance-ent (d/entity db instance) + instance-runtime (- (.getTime (now)) ; Used for reporting + (.getTime (:instance/start-time instance-ent))) + job-resources (util/job-ent->resources job-ent)] + (when (#{:instance.status/success :instance.status/failed} instance-status) + (log/debug "Unassigning task" task-id "from" (:instance/hostname instance-ent)) + (.. fenzo + (getTaskUnAssigner) + (call task-id (:instance/hostname instance-ent)))) ;;TODO is this hostname already in memory? + (when (= instance-status :instance.status/success) + (handle-throughput-metrics success-throughput-metrics + job-resources + instance-runtime) + (handle-throughput-metrics complete-throughput-metrics + job-resources + instance-runtime)) + (when (= instance-status :instance.status/failed) + (handle-throughput-metrics fail-throughput-metrics + job-resources + instance-runtime) + (handle-throughput-metrics complete-throughput-metrics + job-resources + instance-runtime)) + ;; This code kills any task that "shouldn't" be running + (when (and + (or (nil? instance) ; We could know nothing about the task, meaning a DB error happened and it's a waste to finish + (= prior-job-state :job.state/completed) ; The task is attached to a failed job, possibly due to instances running on multiple hosts + (= prior-instance-status :instance.status/failed)) ; The kill-task message could've been glitched on the network + (contains? #{:task-running + :task-staging + :task-starting} + task-state)) ; killing an unknown task causes a TASK_LOST message. Break the cycle! Only kill non-terminal tasks + (log/warn "Attempting to kill task" task-id + "as instance" instance "with" prior-job-state "and" prior-instance-status + "should've been put down already") + (meters/mark! tasks-killed-in-status-update) + (mesos/kill-task driver task-id)) + (when-not (nil? instance) + ;; (println "update:" task-id task-state job instance instance-status prior-job-state) + (transact-with-retries conn + (concat + [[:instance/update-state instance instance-status]] + (when (#{:instance.status/success + :instance.status/failed} instance-status) + [[:db/add instance :instance/end-time (now)]]) + (when progress + [[:db/add instance :instance/progress progress]]))))) + (catch Exception e + (log/error e "Mesos scheduler status update error")))))) (timers/deftimer [cook-mesos scheduler tx-report-queue-processing-duration]) (meters/defmeter [cook-mesos scheduler tx-report-queue-datoms]) @@ -312,26 +316,28 @@ (defrecord VirtualMachineLeaseAdapter [offer time] com.netflix.fenzo.VirtualMachineLease (cpuCores [_] (get-in offer [:resources :cpus])) - (diskMB [_] (:disk offer)) - (getAttributeMap [_] nil) ;;TODO + (diskMB [_] (get-in offer [:resources :disk])) + (getAttributeMap [_] {}) ;;TODO (getId [_] (:id offer)) (getOffer [_] (throw (UnsupportedOperationException.))) (getOfferedTime [_] time) (getVMID [_] (:slave-id offer)) (hostname [_] (:hostname offer)) (memoryMB [_] (get-in offer [:resources :mem])) - (networkMbps [_] (throw (UnsupportedOperationException.))) - (portRanges [_] (throw (UnsupportedOperationException.)))) + (networkMbps [_] 0.0) + (portRanges [_] (mapv (fn [{:keys [begin end]}] + (com.netflix.fenzo.VirtualMachineLease$Range. begin end)) + (get-in offer [:resources :ports])))) (defrecord TaskRequestAdapter [job task-info] com.netflix.fenzo.TaskRequest (getCPUs [_] (get-in task-info [:resources :cpus])) (getDisk [_] 0.0) ;;TODO (getHardConstraints [_] []) ;;TODO - (getId [_] (:name task-info)) + (getId [_] (:task-id task-info)) (getMemory [_] (get-in task-info [:resources :mem])) - (getNetworkMbps [_] (throw (UnsupportedOperationException.))) - (getPorts [_] 0) + (getNetworkMbps [_] 0.0) + (getPorts [_] (:num-ports task-info)) (getSoftConstraints [_] []) (taskGroupName [_] (str (:job/uuid job)))) @@ -343,15 +349,25 @@ Returns a list of tasks that got matched to the offer" [^TaskScheduler fenzo considerable offers db fid] + (log/debug "Matching" (count offers) "offers to" (count considerable) "jobs with fenzo") (let [t (System/currentTimeMillis) leases (mapv #(->VirtualMachineLeaseAdapter % t) offers) ;;TODO this requests conversion could probably benefit from a 500-1000 element cache if constructing task-info is expensive - requests (mapv #(->TaskRequestAdapter % (job->task-info db fid %)) considerable) - result (.scheduleOnce requests leases) + requests (mapv #(->TaskRequestAdapter % (job->task-info db fid (:db/id %))) considerable) + result (.scheduleOnce fenzo requests leases) assignments (.. result getResultMap values)] + (log/debug "Found this assigment:" result) + (when (log/enabled? :debug) + (log/debug "Task placement failure information follows:") + (doseq [failure (.. result getFailures values) + :let [_ (log/debug (str (.getConstraintFailure failure)))] + f (.getFailures failure)] + (log/debug (str f))) + (log/debug "Task placement failure information concluded.")) (mapv (fn [assignment] {:leases (.getLeasesUsed assignment) :tasks (.getTasksAssigned assignment) + ;;TODO maybe we should pull the task requests out here, too? :hostname (.getHostname assignment)}) assignments))) @@ -399,10 +415,19 @@ (gauges/defgauge [cook-mesos scheduler front-of-job-queue-mem] (fn [] @front-of-job-queue-mem-atom)) (gauges/defgauge [cook-mesos scheduler front-of-job-queue-cpus] (fn [] @front-of-job-queue-cpus-atom)) +(defn job-allowed-to-start? + "Converts the DB function :job/allowed-to-start? into a predicate" + [db job] + (try + (d/invoke db :job/allowed-to-start? db (:db/id job)) + true + (catch clojure.lang.ExceptionInfo e + false))) + (defn handle-resource-offers! "Gets a list of offers from mesos. Decides what to do with them all--they should all be accepted or rejected at the end of the function." - [conn driver ^TaskScheduler fenzo fid pending-jobs offers] + [conn driver ^TaskScheduler fenzo fid pending-jobs offers-chan offers] (doseq [offer offers] (histograms/update! offer-size-cpus @@ -410,24 +435,24 @@ (histograms/update! offer-size-mem (get-in offer [:resources :mem]))) + (log/debug "invoked handle-resource-offers!") (timers/time! handle-resource-offer!-duration (try (loop [] ;; This loop is for compare-and-set! below (let [scheduler-contents @pending-jobs db (db conn) + _ (log/debug "There are" (count scheduler-contents) "pending jobs") considerable (filter (fn [job] - (and (try - (d/invoke db :job/allowed-to-start? db (:db/id job)) - true - (catch clojure.lang.ExceptionInfo e - false)) + (job-allowed-to-start? db job) + #_(and ;; ensure it always tries new hosts TODO this is broken, and must be moved to a constraint in fenzo - #_(not-any? #(= % (:hostname offer)) + (not-any? #(= % (:hostname offer)) (->> (:job/instance job) (filter #(false? (:instance/preempted? %))) (map :instance/hostname))))) scheduler-contents) + _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs") matches (match-offer-to-schedule fenzo considerable offers db fid) matched-jobs (for [match matches task (:tasks match)] @@ -445,9 +470,14 @@ (cond (empty? matches) nil ;; Since Fenzo will manage declining offers, we'll do nothing here (not (compare-and-set! pending-jobs scheduler-contents new-scheduler-contents)) - (do (log/info "Pending job atom contention encountered") - (meters/mark! pending-job-atom-contended) - (recur)) + (let [recycle-offers (for [{:keys [leases]} matches + lease leases] + (:offer lease))] + (log/debug "Pending job atom contention encountered, recycling offers:" recycle-offers) + (async/go + (async/>! offers-chan recycle-offers)) + (meters/mark! pending-job-atom-contended) + (recur)) :else ;;TODO this construction of task-infos is legacy-oriented ;;since now we treat "matches" as our primary structure, @@ -456,9 +486,10 @@ (let [task-infos (for [{:keys [tasks leases]} matches :let [offers (mapv :offer leases) slave-id (:slave-id (first offers))] - task tasks - :let [task-info (:task-info task) - job-id (get-in task [:jbo :db/id])]] + ^TaskAssignmentResult task tasks + :let [request (.getRequest task) + task-info (:task-info request) + job-id (get-in request [:job :db/id])]] (assoc task-info :offers offers :slave-id slave-id @@ -505,20 +536,41 @@ (meters/mark! matched-tasks-mem (:mem match-resource-requirements)) (doseq [{:keys [tasks leases]} matches :let [offers (mapv :offer leases) - task-infos (mapv :task-info tasks) + task-infos (mapv (fn [^TaskAssignmentResult task] + (:task-info (.getRequest task))) + tasks) slave-id (:slave-id (first offers))]] (mesos/launch-tasks driver (mapv :id offers) - (mapv #(dissoc % :num-ports) task-infos)) - (doseq [task tasks] + (mapv #(-> % + (assoc :slave-id slave-id) + (dissoc :num-ports)) + task-infos)) + (doseq [^TaskAssignmentResult task tasks] (.. fenzo (getTaskAssigner) - (call task (get-in (first leases) [:offer :hostname]))))))))) + (call (.getRequest task) (get-in (first leases) [:offer :hostname]))))))))) (catch Throwable t (meters/mark! handle-resource-offer!-errors) (log/error t "Error in match:" (ex-data t)))))) +(defn make-offer-handler + [conn driver-atom fenzo fid-atom pending-jobs-atom] + (let [offers-chan (async/chan) + timer-chan (chime-ch (periodic/periodic-seq (time/now) (time/seconds 5)) + {:ch (async/chan (async/sliding-buffer 1))})] + (async/thread + (loop [] + (let [offers (async/alt!! + offers-chan ([offers] offers) + timer-chan ([_] []) + :priority true)] + (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom offers-chan offers) + ;;TODO make this cancelable + (recur)))) + offers-chan)) + (defn reconcile-jobs "Ensure all jobs saw their final state change" [conn] @@ -780,11 +832,13 @@ (.. (com.netflix.fenzo.TaskScheduler$Builder.) (disableShortfallEvaluation) ;; We're not using the autoscaling features (withLeaseOfferExpirySecs (max (quot offer-incubate-time-ms 1000) 1)) ;; Must convert from the ms, should be at least 1 second + (withRejectAllExpiredOffers) + (withDebugEnabled) (withLeaseRejectAction (reify com.netflix.fenzo.functions.Action1 (call [_ lease] (let [offer (:offer lease) id (:id offer)] - (log/debug "Declining offer" offer) + (log/debug "Fenzo is declining offer" offer) (if-let [driver @driver] (mesos/decline-offer driver id) (log/error "Unable to decline offer; no current driver")))))) @@ -792,12 +846,15 @@ (defn view-incubating-offers [^TaskScheduler fenzo] - (for [^com.netflix.fenzo.VirtualMachineCurrentState state (.getVmCurrentStates fenzo) - :let [lease (.getCurrAvailableResources state)]] - {:hostname (.hostname lease) - :slave-id (.getVMID lease) - :resources {:cpus (.cpuCores lease) - :mem (.memoryMB lease)}})) + (let [pending-offers (for [^com.netflix.fenzo.VirtualMachineCurrentState state (.getVmCurrentStates fenzo) + :let [lease (.getCurrAvailableResources state)] + :when lease] + {:hostname (.hostname lease) + :slave-id (.getVMID lease) + :resources {:cpus (.cpuCores lease) + :mem (.memoryMB lease)}})] + (log/debug "We have" (count pending-offers) "pending offers") + pending-offers)) (defn create-datomic-scheduler [conn set-framework-id driver-atom pending-jobs-atom heartbeat-ch offer-incubate-time-ms task-constraints] @@ -806,9 +863,8 @@ offer-ready-chan (async/chan 100) matcher-chan (async/chan) ; Don't want to buffer offers to the matcher chan. Only want when ready fenzo (make-fenzo-scheduler driver-atom offer-incubate-time-ms) - _ (start-offer-matcher! conn pending-jobs-atom matcher-chan) - _ (start-jobs-prioritizer! conn pending-jobs-atom task-constraints) - ] + offers-chan (make-offer-handler conn driver-atom fenzo fid pending-jobs-atom)] + (start-jobs-prioritizer! conn pending-jobs-atom task-constraints) {:scheduler (mesos/scheduler (registered [driver framework-id master-info] @@ -822,7 +878,7 @@ (catch Exception e (log/error e "Reconciliation error"))))) (reregistered [driver master-info] - (log/info "Reregisterd with new master") + (log/info "Reregistered with new master") (future (try (reconcile-jobs conn) @@ -842,7 +898,9 @@ (meters/mark! mesos-error) (log/error "Got a mesos error!!!!" message)) (resourceOffers [driver offers] - (handle-resource-offers! conn driver fenzo @fid pending-jobs-atom offers)) + ;;TODO handle failed puts + (log/debug "Got an offer, putting it into the offer channel:" offers) + (async/>!! offers-chan offers)) (statusUpdate [driver status] (handle-status-update conn driver fenzo status))) :view-incubating-offers (partial view-incubating-offers fenzo)})) From b0f2e54870fa856190cc088b3deaccf2e729d48e Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Fri, 13 Nov 2015 11:17:21 -0500 Subject: [PATCH 04/27] Improve fenzo robustness --- scheduler/project.clj | 6 +-- scheduler/src/cook/mesos/scheduler.clj | 53 +++++++++----------------- 2 files changed, 22 insertions(+), 37 deletions(-) diff --git a/scheduler/project.clj b/scheduler/project.clj index 3c62a0173d..b8c8836f53 100644 --- a/scheduler/project.clj +++ b/scheduler/project.clj @@ -31,10 +31,10 @@ [amalloy/ring-buffer "1.1"] [lonocloud/synthread "1.0.4"] [org.clojure/tools.namespace "0.2.4"] - [org.clojure/core.cache "0.6.3"] - [org.clojure/core.memoize "0.5.6"] + [org.clojure/core.cache "0.6.4"] + [org.clojure/core.memoize "0.5.8"] [clj-time "0.9.0"] - [org.clojure/core.async "0.1.346.0-17112a-alpha"] + [org.clojure/core.async "0.2.374"] [prismatic/schema "0.2.1" :exclusions [potemkin]] [clojure-miniprofiler "0.4.0"] diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 2ea51e905e..500d67526a 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -203,7 +203,7 @@ (log/debug "Unassigning task" task-id "from" (:instance/hostname instance-ent)) (.. fenzo (getTaskUnAssigner) - (call task-id (:instance/hostname instance-ent)))) ;;TODO is this hostname already in memory? + (call task-id (:instance/hostname instance-ent)))) (when (= instance-status :instance.status/success) (handle-throughput-metrics success-throughput-metrics job-resources @@ -329,11 +329,22 @@ (com.netflix.fenzo.VirtualMachineLease$Range. begin end)) (get-in offer [:resources :ports])))) +(defn novel-host-constraint + "This returns a Fenzo hard constraint that ensures the given job won't run on the same host again" + [job] + (reify com.netflix.fenzo.ConstraintEvaluator + (getName [_] "novel_host_constraint") + (evaluate [_ task-request target-vm task-tracker-state] + (not-any? #(= % (.getHostname target-vm)) + (->> (:job/instance job) + (filter #(false? (:instance/preempted? %))) + (map :instance/hostname)))))) + (defrecord TaskRequestAdapter [job task-info] com.netflix.fenzo.TaskRequest (getCPUs [_] (get-in task-info [:resources :cpus])) - (getDisk [_] 0.0) ;;TODO - (getHardConstraints [_] []) ;;TODO + (getDisk [_] 0.0) + (getHardConstraints [_] [(novel-host-constraint job)]) (getId [_] (:task-id task-info)) (getMemory [_] (get-in task-info [:resources :mem])) (getNetworkMbps [_] 0.0) @@ -381,26 +392,6 @@ (meters/mark! scheduler-offer-declined) (mesos/decline-offer driver id))) -(defn get-used-hosts - "Return a set of hosts that the job has already used" - [db job] - (->> (q '[:find ?host - :in $ ?j - :where - [?j :job/instance ?i] - [?i :instance/hostname ?host]] - db job) - (map (fn [[x]] x)) - (into #{}))) - -(defn sort-offers - "Sorts the offers such that the largest offers are in front" - ;; TODO: Use dru instead of mem as measure of largest - [offers] - (->> offers - (sort-by (fn [{{:keys [mem cpus]} :resources}] [mem cpus])) - reverse)) - (timers/deftimer [cook-mesos scheduler handle-resource-offer!-duration]) (meters/defmeter [cook-mesos scheduler pending-job-atom-contended]) (histograms/defhistogram [cook-mesos scheduler offer-size-mem]) @@ -444,13 +435,7 @@ db (db conn) _ (log/debug "There are" (count scheduler-contents) "pending jobs") considerable (filter (fn [job] - (job-allowed-to-start? db job) - #_(and - ;; ensure it always tries new hosts TODO this is broken, and must be moved to a constraint in fenzo - (not-any? #(= % (:hostname offer)) - (->> (:job/instance job) - (filter #(false? (:instance/preempted? %))) - (map :instance/hostname))))) + (job-allowed-to-start? db job)) scheduler-contents) _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs") matches (match-offer-to-schedule fenzo considerable offers db fid) @@ -557,8 +542,8 @@ (defn make-offer-handler [conn driver-atom fenzo fid-atom pending-jobs-atom] - (let [offers-chan (async/chan) - timer-chan (chime-ch (periodic/periodic-seq (time/now) (time/seconds 5)) + (let [offers-chan (async/chan (async/buffer 5)) + timer-chan (chime-ch (periodic/periodic-seq (time/now) (time/seconds 1)) {:ch (async/chan (async/sliding-buffer 1))})] (async/thread (loop [] @@ -898,9 +883,9 @@ (meters/mark! mesos-error) (log/error "Got a mesos error!!!!" message)) (resourceOffers [driver offers] - ;;TODO handle failed puts (log/debug "Got an offer, putting it into the offer channel:" offers) - (async/>!! offers-chan offers)) + (when-not (async/offer! offers-chan offers) + (decline-offers driver offers))) (statusUpdate [driver status] (handle-status-update conn driver fenzo status))) :view-incubating-offers (partial view-incubating-offers fenzo)})) From 5031d5d6e128fe02b6e4dc4078ed468be7063157 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Fri, 13 Nov 2015 15:32:03 -0500 Subject: [PATCH 05/27] Remove thread safety condition for incubating offers view --- scheduler/src/cook/mesos/scheduler.clj | 51 ++++++++++++++++---------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 500d67526a..fba769a8ef 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -335,10 +335,18 @@ (reify com.netflix.fenzo.ConstraintEvaluator (getName [_] "novel_host_constraint") (evaluate [_ task-request target-vm task-tracker-state] - (not-any? #(= % (.getHostname target-vm)) - (->> (:job/instance job) - (filter #(false? (:instance/preempted? %))) - (map :instance/hostname)))))) + (let [previous-hosts (->> (:job/instance job) + (filter #(false? (:instance/preempted? %))) + (mapv :instance/hostname))] + (log/info "Host constraint evaluation:" (str "Can't run on " (.getHostname target-vm) + " since we already ran on that: " previous-hosts) + "which gave the result" (not-any? #(= % (.getHostname target-vm)) + previous-hosts)) + (com.netflix.fenzo.ConstraintEvaluator$Result. + (not-any? #(= % (.getHostname target-vm)) + previous-hosts) + (str "Can't run on " (.getHostname target-vm) + " since we already ran on that: " previous-hosts)))))) (defrecord TaskRequestAdapter [job task-info] com.netflix.fenzo.TaskRequest @@ -370,7 +378,8 @@ (log/debug "Found this assigment:" result) (when (log/enabled? :debug) (log/debug "Task placement failure information follows:") - (doseq [failure (.. result getFailures values) + (doseq [failures (.. result getFailures values) + failure failures :let [_ (log/debug (str (.getConstraintFailure failure)))] f (.getFailures failure)] (log/debug (str f))) @@ -540,9 +549,22 @@ (meters/mark! handle-resource-offer!-errors) (log/error t "Error in match:" (ex-data t)))))) +(defn view-incubating-offers + [^TaskScheduler fenzo] + (let [pending-offers (for [^com.netflix.fenzo.VirtualMachineCurrentState state (.getVmCurrentStates fenzo) + :let [lease (.getCurrAvailableResources state)] + :when lease] + {:hostname (.hostname lease) + :slave-id (.getVMID lease) + :resources {:cpus (.cpuCores lease) + :mem (.memoryMB lease)}})] + (log/debug "We have" (count pending-offers) "pending offers") + pending-offers)) + (defn make-offer-handler [conn driver-atom fenzo fid-atom pending-jobs-atom] (let [offers-chan (async/chan (async/buffer 5)) + resources-atom (atom (view-incubating-offers fenzo)) timer-chan (chime-ch (periodic/periodic-seq (time/now) (time/seconds 1)) {:ch (async/chan (async/sliding-buffer 1))})] (async/thread @@ -552,9 +574,10 @@ timer-chan ([_] []) :priority true)] (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom offers-chan offers) + (reset! resources-atom (view-incubating-offers fenzo)) ;;TODO make this cancelable (recur)))) - offers-chan)) + [offers-chan resources-atom])) (defn reconcile-jobs "Ensure all jobs saw their final state change" @@ -829,18 +852,6 @@ (log/error "Unable to decline offer; no current driver")))))) (build))) -(defn view-incubating-offers - [^TaskScheduler fenzo] - (let [pending-offers (for [^com.netflix.fenzo.VirtualMachineCurrentState state (.getVmCurrentStates fenzo) - :let [lease (.getCurrAvailableResources state)] - :when lease] - {:hostname (.hostname lease) - :slave-id (.getVMID lease) - :resources {:cpus (.cpuCores lease) - :mem (.memoryMB lease)}})] - (log/debug "We have" (count pending-offers) "pending offers") - pending-offers)) - (defn create-datomic-scheduler [conn set-framework-id driver-atom pending-jobs-atom heartbeat-ch offer-incubate-time-ms task-constraints] (let [fid (atom nil) @@ -848,7 +859,7 @@ offer-ready-chan (async/chan 100) matcher-chan (async/chan) ; Don't want to buffer offers to the matcher chan. Only want when ready fenzo (make-fenzo-scheduler driver-atom offer-incubate-time-ms) - offers-chan (make-offer-handler conn driver-atom fenzo fid pending-jobs-atom)] + [offers-chan resources-atom] (make-offer-handler conn driver-atom fenzo fid pending-jobs-atom)] (start-jobs-prioritizer! conn pending-jobs-atom task-constraints) {:scheduler (mesos/scheduler @@ -888,4 +899,4 @@ (decline-offers driver offers))) (statusUpdate [driver status] (handle-status-update conn driver fenzo status))) - :view-incubating-offers (partial view-incubating-offers fenzo)})) + :view-incubating-offers (fn get-resources-atom [] @resources-atom)})) From a8155fb29d8a89209e2776ba9261106ad24e6a9c Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Mon, 16 Nov 2015 13:50:32 -0500 Subject: [PATCH 06/27] Thunkified jobs, no longer running jobs twice --- scheduler/src/cook/mesos/scheduler.clj | 52 ++++++++++++++++++++------ 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index fba769a8ef..934fa51564 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -234,6 +234,7 @@ (mesos/kill-task driver task-id)) (when-not (nil? instance) ;; (println "update:" task-id task-state job instance instance-status prior-job-state) + (log/debug "Transacting updated state for instance" instance "to status" instance-status) (transact-with-retries conn (concat [[:instance/update-state instance instance-status]] @@ -329,6 +330,7 @@ (com.netflix.fenzo.VirtualMachineLease$Range. begin end)) (get-in offer [:resources :ports])))) +;; TODO sometimes we see a job run twice... (defn novel-host-constraint "This returns a Fenzo hard constraint that ensures the given job won't run on the same host again" [job] @@ -336,10 +338,10 @@ (getName [_] "novel_host_constraint") (evaluate [_ task-request target-vm task-tracker-state] (let [previous-hosts (->> (:job/instance job) - (filter #(false? (:instance/preempted? %))) + (remove #(true? (:instance/preempted? %))) (mapv :instance/hostname))] - (log/info "Host constraint evaluation:" (str "Can't run on " (.getHostname target-vm) - " since we already ran on that: " previous-hosts) + (log/info "Host constraint evaluation for host: " (.getHostname target-vm) + "for instances " (mapv #(into {} %) (:job/instance job)) "which gave the result" (not-any? #(= % (.getHostname target-vm)) previous-hosts)) (com.netflix.fenzo.ConstraintEvaluator$Result. @@ -348,17 +350,18 @@ (str "Can't run on " (.getHostname target-vm) " since we already ran on that: " previous-hosts)))))) -(defrecord TaskRequestAdapter [job task-info] +;;TODO we should not retain the job here; could hold the entire DB ref in memory [doesn't matter; cleared every loop] +(defrecord TaskRequestAdapter [job-thunk task-info] com.netflix.fenzo.TaskRequest (getCPUs [_] (get-in task-info [:resources :cpus])) (getDisk [_] 0.0) - (getHardConstraints [_] [(novel-host-constraint job)]) + (getHardConstraints [_] [(novel-host-constraint (job-thunk))]) (getId [_] (:task-id task-info)) (getMemory [_] (get-in task-info [:resources :mem])) (getNetworkMbps [_] 0.0) (getPorts [_] (:num-ports task-info)) (getSoftConstraints [_] []) - (taskGroupName [_] (str (:job/uuid job)))) + (taskGroupName [_] (str (:job/uuid (job-thunk))))) (defn match-offer-to-schedule "Given an offer and a schedule, computes all the tasks should be launched as a result. @@ -367,12 +370,16 @@ the offer. Returns a list of tasks that got matched to the offer" - [^TaskScheduler fenzo considerable offers db fid] + [^TaskScheduler fenzo considerable offers db fid conn] (log/debug "Matching" (count offers) "offers to" (count considerable) "jobs with fenzo") (let [t (System/currentTimeMillis) leases (mapv #(->VirtualMachineLeaseAdapter % t) offers) ;;TODO this requests conversion could probably benefit from a 500-1000 element cache if constructing task-info is expensive - requests (mapv #(->TaskRequestAdapter % (job->task-info db fid (:db/id %))) considerable) + requests (mapv (fn [job] + (let [job-id (:db/id job)] + (->TaskRequestAdapter #(d/entity (d/db conn) job-id) + (job->task-info db fid (:db/id job))))) + considerable) result (.scheduleOnce fenzo requests leases) assignments (.. result getResultMap values)] (log/debug "Found this assigment:" result) @@ -424,6 +431,24 @@ (catch clojure.lang.ExceptionInfo e false))) +;;TODO LOL +#_(let [db (db (d/connect "datomic:mem://cook-jobs"))] + (doseq [[job status] (vec (q '[:find ?j ?status + :where + [?j :job/state ?s] + [?s :db/ident ?status] + ] + db))] + (println job status (q '[:find ?i ?host ?status + :in $ ?j + :where + [?j :job/instance ?i] + [?i :instance/status ?s] + [?i :instance/hostname ?host] + [?s :db/ident ?status] + ] db job)) + )) + (defn handle-resource-offers! "Gets a list of offers from mesos. Decides what to do with them all--they should all be accepted or rejected at the end of the function." @@ -447,10 +472,11 @@ (job-allowed-to-start? db job)) scheduler-contents) _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs") - matches (match-offer-to-schedule fenzo considerable offers db fid) + matches (match-offer-to-schedule fenzo considerable offers db fid conn) matched-jobs (for [match matches - task (:tasks match)] - (:job task)) + ^TaskAssignmentResult task-result (:tasks match) + :let [task-request (.getRequest task-result)]] + ((:job-thunk (doto task-request log/info)))) matched-job-uuids (set (mapv :job/uuid matched-jobs)) new-scheduler-contents (remove (fn [pending-job] (contains? matched-job-uuids (:job/uuid pending-job))) @@ -483,7 +509,7 @@ ^TaskAssignmentResult task tasks :let [request (.getRequest task) task-info (:task-info request) - job-id (get-in request [:job :db/id])]] + job-id (:db/id ((:job-thunk request)))]] (assoc task-info :offers offers :slave-id slave-id @@ -594,6 +620,7 @@ [:job/update-state j]) js)))))) +;; TODO this needs to dump into into fenzo, too (defn reconcile-tasks "Finds all non-completed tasks, and has Mesos let us know if any have changed." [db driver] @@ -755,6 +782,7 @@ A job is offensive if and only if its required memory or cpus exceeds the limits" + ;; TODO these limits should come from the largest observed host from Fenzo [{max-memory-gb :memory-gb max-cpus :cpus} offensive-jobs-ch jobs] (let [max-memory-mb (* 1024.0 max-memory-gb) categorized-jobs (group-by (fn [job] From 5b49ae9a0d75960b1c7a9f3acb8c3b4e16460183 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Mon, 16 Nov 2015 13:54:36 -0500 Subject: [PATCH 07/27] Remove thunk because it wasn't the actual bug --- scheduler/src/cook/mesos/scheduler.clj | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 934fa51564..3a6dfa10f9 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -351,17 +351,17 @@ " since we already ran on that: " previous-hosts)))))) ;;TODO we should not retain the job here; could hold the entire DB ref in memory [doesn't matter; cleared every loop] -(defrecord TaskRequestAdapter [job-thunk task-info] +(defrecord TaskRequestAdapter [job task-info] com.netflix.fenzo.TaskRequest (getCPUs [_] (get-in task-info [:resources :cpus])) (getDisk [_] 0.0) - (getHardConstraints [_] [(novel-host-constraint (job-thunk))]) + (getHardConstraints [_] [(novel-host-constraint job)]) (getId [_] (:task-id task-info)) (getMemory [_] (get-in task-info [:resources :mem])) (getNetworkMbps [_] 0.0) (getPorts [_] (:num-ports task-info)) (getSoftConstraints [_] []) - (taskGroupName [_] (str (:job/uuid (job-thunk))))) + (taskGroupName [_] (str (:job/uuid job)))) (defn match-offer-to-schedule "Given an offer and a schedule, computes all the tasks should be launched as a result. @@ -377,7 +377,7 @@ ;;TODO this requests conversion could probably benefit from a 500-1000 element cache if constructing task-info is expensive requests (mapv (fn [job] (let [job-id (:db/id job)] - (->TaskRequestAdapter #(d/entity (d/db conn) job-id) + (->TaskRequestAdapter job (job->task-info db fid (:db/id job))))) considerable) result (.scheduleOnce fenzo requests leases) @@ -476,7 +476,7 @@ matched-jobs (for [match matches ^TaskAssignmentResult task-result (:tasks match) :let [task-request (.getRequest task-result)]] - ((:job-thunk (doto task-request log/info)))) + (:job task-request)) matched-job-uuids (set (mapv :job/uuid matched-jobs)) new-scheduler-contents (remove (fn [pending-job] (contains? matched-job-uuids (:job/uuid pending-job))) @@ -509,7 +509,7 @@ ^TaskAssignmentResult task tasks :let [request (.getRequest task) task-info (:task-info request) - job-id (:db/id ((:job-thunk request)))]] + job-id (get-in request [:job :db/id])]] (assoc task-info :offers offers :slave-id slave-id From cd870c9f2320066e0e249b262a52f0a0aec2511a Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Mon, 16 Nov 2015 18:13:36 -0500 Subject: [PATCH 08/27] Add backfilling for fenzo --- scheduler/src/cook/mesos/api.clj | 1 + scheduler/src/cook/mesos/rebalancer.clj | 8 ++ scheduler/src/cook/mesos/scheduler.clj | 135 +++++++++++++++--------- scheduler/src/cook/mesos/schema.clj | 22 ++-- 4 files changed, 106 insertions(+), 60 deletions(-) diff --git a/scheduler/src/cook/mesos/api.clj b/scheduler/src/cook/mesos/api.clj index a3a2e274b8..d1406b4dc6 100644 --- a/scheduler/src/cook/mesos/api.clj +++ b/scheduler/src/cook/mesos/api.clj @@ -252,6 +252,7 @@ end (:instance/end-time instance) base {:task_id (:instance/task-id instance) :hostname hostname + :backfilled (:instance/backfilled? instance false) :slave_id (:instance/slave-id instance) :executor_id (:instance/executor-id instance) :status (name (:instance/status instance))} diff --git a/scheduler/src/cook/mesos/rebalancer.clj b/scheduler/src/cook/mesos/rebalancer.clj index 0d43415ff9..30011d0278 100644 --- a/scheduler/src/cook/mesos/rebalancer.clj +++ b/scheduler/src/cook/mesos/rebalancer.clj @@ -220,9 +220,17 @@ pending-job-ent] (let [{pending-job-mem :mem pending-job-cpus :cpus} (util/job-ent->resources pending-job-ent) pending-job-dru (compute-pending-job-dru state pending-job-ent) + ;; This will preserve the ordering of task->scored-task host->scored-tasks (->> task->scored-task + ;;TODO maybe we can just change every backfilled task here to have the worst possible DRU + ;;this should bias us towards always killing those tasks first... (vals) + ;;TODO We might need to sort again with this change + (map (fn [{:keys [task] :as scored-task}] + (if (:instance/backfilled? task) + (assoc scored-task :dru Double/MAX_VALUE) + scored-task))) (remove #(< (:dru %) safe-dru-threshold)) (filter #(> (- (:dru %) pending-job-dru) min-dru-diff)) (group-by (fn [{:keys [task]}] diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 3a6dfa10f9..e3882887ad 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -330,7 +330,6 @@ (com.netflix.fenzo.VirtualMachineLease$Range. begin end)) (get-in offer [:resources :ports])))) -;; TODO sometimes we see a job run twice... (defn novel-host-constraint "This returns a Fenzo hard constraint that ensures the given job won't run on the same host again" [job] @@ -340,17 +339,12 @@ (let [previous-hosts (->> (:job/instance job) (remove #(true? (:instance/preempted? %))) (mapv :instance/hostname))] - (log/info "Host constraint evaluation for host: " (.getHostname target-vm) - "for instances " (mapv #(into {} %) (:job/instance job)) - "which gave the result" (not-any? #(= % (.getHostname target-vm)) - previous-hosts)) (com.netflix.fenzo.ConstraintEvaluator$Result. (not-any? #(= % (.getHostname target-vm)) previous-hosts) (str "Can't run on " (.getHostname target-vm) " since we already ran on that: " previous-hosts)))))) -;;TODO we should not retain the job here; could hold the entire DB ref in memory [doesn't matter; cleared every loop] (defrecord TaskRequestAdapter [job task-info] com.netflix.fenzo.TaskRequest (getCPUs [_] (get-in task-info [:resources :cpus])) @@ -374,11 +368,9 @@ (log/debug "Matching" (count offers) "offers to" (count considerable) "jobs with fenzo") (let [t (System/currentTimeMillis) leases (mapv #(->VirtualMachineLeaseAdapter % t) offers) - ;;TODO this requests conversion could probably benefit from a 500-1000 element cache if constructing task-info is expensive requests (mapv (fn [job] (let [job-id (:db/id job)] - (->TaskRequestAdapter job - (job->task-info db fid (:db/id job))))) + (->TaskRequestAdapter job (job->task-info db fid (:db/id job))))) considerable) result (.scheduleOnce fenzo requests leases) assignments (.. result getResultMap values)] @@ -449,6 +441,44 @@ ] db job)) )) +;;TODO test this +(defn ids-of-backfilled-instances + "Returns a list of the ids of running, backfilled instances of the given job" + [job] + (->> (:job/instance job) + (filter #(and (= :instance.status/running (:instance/status %)) + (:instance/backfilled? %))) + (map :db/id))) + +;;TODO test this +(defn process-matches-for-backfill + "This computes some sets: + + fully-processed: this is a set of job uuids that should be removed from the list scheduler jobs. These are not backfilled + upgrade-backfill: this is a set of instance datomic ids that should be upgraded to non-backfill + backfill-jobs: this is a set of job uuids for jobs that were matched, but are in the backfill QoS class + " + [scheduler-contents matched-jobs] + (let [matched-job-uuids (set (mapv :job/uuid matched-jobs))] + (loop [scheduler-contents scheduler-contents + backfilling? false + fully-processed #{} + upgrade-backfill #{} + backfill-jobs #{}] + (if (seq scheduler-contents) + (let [scheduler-contents' (rest scheduler-contents) + pending-job-uuid (:job/uuid (first scheduler-contents))] + (if (contains? matched-job-uuids pending-job-uuid) + (if backfilling? + (recur scheduler-contents' backfilling? fully-processed upgrade-backfill (conj backfill-jobs pending-job-uuid)) + (recur scheduler-contents' backfilling? (conj fully-processed pending-job-uuid) upgrade-backfill backfill-jobs)) + (if-let [backfilled-ids (and (not backfilling?) (seq (ids-of-backfilled-instances pending-job)))] + (recur scheduler-contents' backfilling? fully-processed (into upgrade-backfill backfilled-ids) backfill-jobs) + (recur scheduler-contents' true fully-processed upgrade-backfill backfill-jobs)))) + {:fully-processed fully-processed + :upgrade-backfill upgrade-backfill + :backfill-jobs backfill-jobs})))) + (defn handle-resource-offers! "Gets a list of offers from mesos. Decides what to do with them all--they should all be accepted or rejected at the end of the function." @@ -477,9 +507,9 @@ ^TaskAssignmentResult task-result (:tasks match) :let [task-request (.getRequest task-result)]] (:job task-request)) - matched-job-uuids (set (mapv :job/uuid matched-jobs)) + processed-matches (process-matches-for-backfill scheduler-contents matched-jobs) new-scheduler-contents (remove (fn [pending-job] - (contains? matched-job-uuids (:job/uuid pending-job))) + (contains? (:fully-processed processed-matches) (:job/uuid pending-job))) scheduler-contents) first-considerable-resources (-> considerable first util/job-ent->resources) match-resource-requirements (util/sum-resources-of-jobs matched-jobs)] @@ -503,46 +533,43 @@ ;;since now we treat "matches" as our primary structure, ;;we should eventually remove this construction of task-infos ;;as it's just a helper for building the datomic transaction - (let [task-infos (for [{:keys [tasks leases]} matches - :let [offers (mapv :offer leases) - slave-id (:slave-id (first offers))] - ^TaskAssignmentResult task tasks - :let [request (.getRequest task) - task-info (:task-info request) - job-id (get-in request [:job :db/id])]] - (assoc task-info - :offers offers - :slave-id slave-id - :job-id job-id))] - ;TODO assign the ports to actual available ones; look at the Fenzo API to match up with it - ;First we need to convince mesos to actually offer ports - ;then, we need to allocate the ports (incremental or bulk?) copy from my book. - + (let [task-txns (for [{:keys [tasks leases]} matches + :let [offers (mapv :offer leases) + slave-id (:slave-id (first offers))] + ^TaskAssignmentResult task tasks + :let [request (.getRequest task) + task-info (:task-info request) + job-id (get-in request [:job :db/id])]] + [[:job/allowed-to-start? job-id] + {:db/id (d/tempid :db.part/user) + :job/_instance job-id + :instance/task-id (:task-id task-info) + :instance/hostname (.getHostname task) + :instance/start-time (now) + ;; NB command executor uses the task-id + ;; as the executor-id + :instance/executor-id (get-in + task-info + [:executor :executor-id] + (:task-id task-info)) + :instance/backfilled? (contains? (:backfill-jobs processed-matches) (get-in request [:job :job/uuid])) + :instance/slave-id slave-id + :instance/progress 0 + :instance/status :instance.status/unknown + :instance/preempted? false}]) + upgrade-txns (map (fn [instance-id] + [:db/add instance-id :instance/backfilled? false]) + (:upgrade-backfill processed-matches))] ;; Note that this transaction can fail if a job was scheduled ;; during a race. If that happens, then other jobs that should ;; be scheduled will not be eligible for rescheduling until ;; the pending-jobs atom is repopulated @(d/transact conn - (mapcat (fn [{:keys [offers] :as task-info}] - [[:job/allowed-to-start? (:job-id task-info)] - {:db/id (d/tempid :db.part/user) - :job/_instance (:job-id task-info) - :instance/task-id (:task-id task-info) - :instance/hostname (:hostname (first offers)) - :instance/start-time (now) - ;; NB command executor uses the task-id - ;; as the executor-id - :instance/executor-id (get-in - task-info - [:executor :executor-id] - (:task-id task-info)) - :instance/slave-id (:slave-id task-info) - :instance/progress 0 - :instance/status :instance.status/unknown - :instance/preempted? false}]) - task-infos)) - (log/info "Matched tasks" task-infos) + (vec (apply concat upgrade-txns task-txns))) + (log/info "Launching" (count task-txns) "tasks") + (log/info "Upgrading" (count (:upgrade-backfill processed-matches)) "tasks from backfilled to proper") + (log/debug "Matched tasks" task-txns) ;; This launch-tasks MUST happen after the above transaction in ;; order to allow a transaction failure (due to failed preconditions) ;; to block the launch @@ -551,7 +578,7 @@ (mapcat (comp :id :offer :leases)) (distinct) (count))) - (meters/mark! matched-tasks (count task-infos)) + (meters/mark! matched-tasks (count task-txns)) (meters/mark! matched-tasks-cpus (:cpus match-resource-requirements)) (meters/mark! matched-tasks-mem (:mem match-resource-requirements)) (doseq [{:keys [tasks leases]} matches @@ -560,6 +587,7 @@ (:task-info (.getRequest task))) tasks) slave-id (:slave-id (first offers))]] + ;TODO if the ports are in the offer, then Fenzo is identifying the ports with a list .getAssignedPorts on the TaskAssignmentResult (mesos/launch-tasks driver (mapv :id offers) @@ -600,7 +628,8 @@ timer-chan ([_] []) :priority true)] (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom offers-chan offers) - (reset! resources-atom (view-incubating-offers fenzo)) + (when (seq offers) + (reset! resources-atom (view-incubating-offers fenzo))) ;;TODO make this cancelable (recur)))) [offers-chan resources-atom])) @@ -623,7 +652,7 @@ ;; TODO this needs to dump into into fenzo, too (defn reconcile-tasks "Finds all non-completed tasks, and has Mesos let us know if any have changed." - [db driver] + [db driver fenzo] (let [running-tasks (q '[:find ?task-id ?status ?slave-id :in $ [?status ...] :where @@ -648,8 +677,9 @@ (timers/deftimer [cook-mesos scheduler reconciler-duration]) +;; TODO this should be running and enabled (defn reconciler - [conn driver & {:keys [interval] + [conn driver fenzo & {:keys [interval] :or {interval (* 30 60 1000)}}] (log/info "Starting reconciler. Interval millis:" interval) (chime-at (periodic/periodic-seq (time/now) (time/millis interval)) @@ -657,7 +687,7 @@ (timers/time! reconciler-duration (reconcile-jobs conn) - (reconcile-tasks (db conn) driver))))) + (reconcile-tasks (db conn) driver fenzo))))) (defn get-lingering-tasks "Return a list of lingering tasks. @@ -783,6 +813,7 @@ A job is offensive if and only if its required memory or cpus exceeds the limits" ;; TODO these limits should come from the largest observed host from Fenzo + ;; .getResourceStatus on TaskScheduler will give a map of hosts to resources; we can compute the max over those [{max-memory-gb :memory-gb max-cpus :cpus} offensive-jobs-ch jobs] (let [max-memory-mb (* 1024.0 max-memory-gb) categorized-jobs (group-by (fn [job] @@ -898,7 +929,7 @@ (future (try (reconcile-jobs conn) - (reconcile-tasks (db conn) driver) + (reconcile-tasks (db conn) driver fenzo) (catch Exception e (log/error e "Reconciliation error"))))) (reregistered [driver master-info] @@ -906,7 +937,7 @@ (future (try (reconcile-jobs conn) - (reconcile-tasks (db conn) driver) + (reconcile-tasks (db conn) driver fenzo) (catch Exception e (log/error e "Reconciliation error"))))) ;; Ignore this--we can just wait for new offers diff --git a/scheduler/src/cook/mesos/schema.clj b/scheduler/src/cook/mesos/schema.clj index ec8b79ff2e..636345f7ed 100644 --- a/scheduler/src/cook/mesos/schema.clj +++ b/scheduler/src/cook/mesos/schema.clj @@ -157,6 +157,12 @@ :db/valueType :db.type/long :db/cardinality :db.cardinality/one :db.install/_attribute :db.part/db} + {:db/id (d/tempid :db.part/db) + :db/ident :instance/backfilled? + :db/doc "If this is true, then this instance doesn't count towards making a job running, and it should be preempted first. It's okay to upgrade an instance to be non-backfilled after a while." + :db/valueType :db.type/boolean + :db/cardinality :db.cardinality/one + :db.install/_attribute :db.part/db} {:db/id (d/tempid :db.part/db) :db/ident :instance/hostname :db/valueType :db.type/string @@ -310,20 +316,20 @@ - task succeeded => job completed - task failed, no other tasks, retries exceeded => job completed - task failed, no other tasks, retries remaining => job waiting - - task failed, other tasks running => job running" + - task failed, other tasks running => job running + + Note that backfilled running instances are treated as if they don't exist." :db/fn #db/fn {:lang "clojure" :params [db j] :requires [[metatransaction.core :as mt]] :code (let [db (mt/filter-committed db) job (d/entity db j) - instance-states (mapv first (q '[:find ?state ?i - :in $ ?j - :where - [?j :job/instance ?i] - [?i :instance/status ?s] - [?s :db/ident ?state]] - db j)) + instance-states (mapcat (fn [instance] + (when-not (and (= :instance.status/running (:instance/status instance)) + (:instance/backfilled? instance)) + [(:instance/status instance)])) + (:job/instance job)) any-success? (some #{:instance.status/success} instance-states) any-running? (some #{:instance.status/running} instance-states) all-failed? (every? #{:instance.status/failed} instance-states) From 879109e571eb066281fd181cd0b1ec00c2498f7e Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Tue, 17 Nov 2015 11:31:51 -0500 Subject: [PATCH 09/27] Exponential backoff of considerable size in queue --- scheduler/src/cook/mesos/scheduler.clj | 72 +++++++++++++++----------- 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index e3882887ad..02c63bd2d9 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -316,15 +316,15 @@ (defrecord VirtualMachineLeaseAdapter [offer time] com.netflix.fenzo.VirtualMachineLease - (cpuCores [_] (get-in offer [:resources :cpus])) - (diskMB [_] (get-in offer [:resources :disk])) + (cpuCores [_] (get-in offer [:resources :cpus] 0.0)) + (diskMB [_] (get-in offer [:resources :disk] 0.0)) (getAttributeMap [_] {}) ;;TODO (getId [_] (:id offer)) (getOffer [_] (throw (UnsupportedOperationException.))) (getOfferedTime [_] time) (getVMID [_] (:slave-id offer)) (hostname [_] (:hostname offer)) - (memoryMB [_] (get-in offer [:resources :mem])) + (memoryMB [_] (get-in offer [:resources :mem] 0.0)) (networkMbps [_] 0.0) (portRanges [_] (mapv (fn [{:keys [begin end]}] (com.netflix.fenzo.VirtualMachineLease$Range. begin end)) @@ -458,38 +458,44 @@ upgrade-backfill: this is a set of instance datomic ids that should be upgraded to non-backfill backfill-jobs: this is a set of job uuids for jobs that were matched, but are in the backfill QoS class " - [scheduler-contents matched-jobs] - (let [matched-job-uuids (set (mapv :job/uuid matched-jobs))] + [scheduler-contents head-of-considerable matched-jobs] + (let [matched-job-uuids (set (mapv :job/uuid matched-jobs)) + head-uuid (:job/uuid head-of-considerable)] (loop [scheduler-contents scheduler-contents backfilling? false + matched-head? false fully-processed #{} upgrade-backfill #{} backfill-jobs #{}] (if (seq scheduler-contents) (let [scheduler-contents' (rest scheduler-contents) - pending-job-uuid (:job/uuid (first scheduler-contents))] + pending-job (first scheduler-contents) + pending-job-uuid (:job/uuid pending-job) + pending=head? (= pending-job-uuid head-uuid)] (if (contains? matched-job-uuids pending-job-uuid) (if backfilling? - (recur scheduler-contents' backfilling? fully-processed upgrade-backfill (conj backfill-jobs pending-job-uuid)) - (recur scheduler-contents' backfilling? (conj fully-processed pending-job-uuid) upgrade-backfill backfill-jobs)) + (recur scheduler-contents' true matched-head? fully-processed upgrade-backfill (conj backfill-jobs pending-job-uuid)) + (recur scheduler-contents' false (or pending=head? matched-head?) (conj fully-processed pending-job-uuid) upgrade-backfill backfill-jobs)) (if-let [backfilled-ids (and (not backfilling?) (seq (ids-of-backfilled-instances pending-job)))] - (recur scheduler-contents' backfilling? fully-processed (into upgrade-backfill backfilled-ids) backfill-jobs) - (recur scheduler-contents' true fully-processed upgrade-backfill backfill-jobs)))) + (recur scheduler-contents' false (or pending=head? matched-head?) fully-processed (into upgrade-backfill backfilled-ids) backfill-jobs) + (recur scheduler-contents' true matched-head? fully-processed upgrade-backfill backfill-jobs)))) {:fully-processed fully-processed :upgrade-backfill upgrade-backfill - :backfill-jobs backfill-jobs})))) + :backfill-jobs backfill-jobs + :matched-head? matched-head?})))) +;;TODO need to ensure that we exponentially converge to head of line blocking when we repeating come up with non-head solutions (defn handle-resource-offers! "Gets a list of offers from mesos. Decides what to do with them all--they should all be accepted or rejected at the end of the function." - [conn driver ^TaskScheduler fenzo fid pending-jobs offers-chan offers] + [conn driver ^TaskScheduler fenzo fid pending-jobs num-considerable offers-chan offers] (doseq [offer offers] (histograms/update! offer-size-cpus - (get-in offer [:resources :cpus])) + (get-in offer [:resources :cpus] 0)) (histograms/update! offer-size-mem - (get-in offer [:resources :mem]))) + (get-in offer [:resources :mem] 0))) (log/debug "invoked handle-resource-offers!") (timers/time! handle-resource-offer!-duration @@ -498,16 +504,17 @@ (let [scheduler-contents @pending-jobs db (db conn) _ (log/debug "There are" (count scheduler-contents) "pending jobs") - considerable (filter (fn [job] - (job-allowed-to-start? db job)) - scheduler-contents) - _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs") + considerable (->> scheduler-contents + (filter (fn [job] + (job-allowed-to-start? db job))) + (take num-considerable)) + _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs (limited to " num-considerable " due to backdown)") matches (match-offer-to-schedule fenzo considerable offers db fid conn) matched-jobs (for [match matches ^TaskAssignmentResult task-result (:tasks match) :let [task-request (.getRequest task-result)]] (:job task-request)) - processed-matches (process-matches-for-backfill scheduler-contents matched-jobs) + processed-matches (process-matches-for-backfill scheduler-contents (first considerable) matched-jobs) new-scheduler-contents (remove (fn [pending-job] (contains? (:fully-processed processed-matches) (:job/uuid pending-job))) scheduler-contents) @@ -518,11 +525,13 @@ (reset! front-of-job-queue-cpus-atom (or (:cpus first-considerable-resources) 0)) (cond - (empty? matches) nil ;; Since Fenzo will manage declining offers, we'll do nothing here + (empty? matches) (if (seq considerable) + nil ;; This case means we failed to match anything. We need to start head of line blocking now + true) ;; This case means there was nothing to match--don't penalize the scheduler for the next burst of submitted jobs (not (compare-and-set! pending-jobs scheduler-contents new-scheduler-contents)) - (let [recycle-offers (for [{:keys [leases]} matches - lease leases] - (:offer lease))] + (let [recycle-offers (doall (for [{:keys [leases]} matches + lease leases] + (:offer lease)))] (log/debug "Pending job atom contention encountered, recycling offers:" recycle-offers) (async/go (async/>! offers-chan recycle-offers)) @@ -598,7 +607,8 @@ (doseq [^TaskAssignmentResult task tasks] (.. fenzo (getTaskAssigner) - (call (.getRequest task) (get-in (first leases) [:offer :hostname]))))))))) + (call (.getRequest task) (get-in (first leases) [:offer :hostname]))))) + (:matched-head? processed-matches))))) (catch Throwable t (meters/mark! handle-resource-offer!-errors) (log/error t "Error in match:" (ex-data t)))))) @@ -620,18 +630,22 @@ (let [offers-chan (async/chan (async/buffer 5)) resources-atom (atom (view-incubating-offers fenzo)) timer-chan (chime-ch (periodic/periodic-seq (time/now) (time/seconds 1)) - {:ch (async/chan (async/sliding-buffer 1))})] + {:ch (async/chan (async/sliding-buffer 1))}) + max-considerable 1000 + ] (async/thread - (loop [] + (loop [num-considerable max-considerable] (let [offers (async/alt!! offers-chan ([offers] offers) timer-chan ([_] []) - :priority true)] - (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom offers-chan offers) + :priority true) + matched-head? (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom num-considerable offers-chan offers)] (when (seq offers) (reset! resources-atom (view-incubating-offers fenzo))) ;;TODO make this cancelable - (recur)))) + (recur (if matched-head? + max-considerable + (max 1 (quot num-considerable 2))))))) [offers-chan resources-atom])) (defn reconcile-jobs From 0dfe74e8d53817798b0a93af2276d47436c95f0d Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Tue, 17 Nov 2015 11:47:05 -0500 Subject: [PATCH 10/27] Try to sync fenzo and make a bit more robust --- scheduler/src/cook/mesos/scheduler.clj | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 02c63bd2d9..148c8da3d7 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -631,8 +631,7 @@ resources-atom (atom (view-incubating-offers fenzo)) timer-chan (chime-ch (periodic/periodic-seq (time/now) (time/seconds 1)) {:ch (async/chan (async/sliding-buffer 1))}) - max-considerable 1000 - ] + max-considerable 1000] (async/thread (loop [num-considerable max-considerable] (let [offers (async/alt!! @@ -645,7 +644,7 @@ ;;TODO make this cancelable (recur (if matched-head? max-considerable - (max 1 (quot num-considerable 2))))))) + (max 1 (long (* 0.95 num-considerable)))))))) ;; With max=1000 and 1 iter/sec, this will take 88 seconds to reach 1 [offers-chan resources-atom])) (defn reconcile-jobs @@ -663,10 +662,11 @@ [:job/update-state j]) js)))))) -;; TODO this needs to dump into into fenzo, too +;; TODO test that this fenzo recovery system actually works +;; TODO this may need a lock on the fenzo for the taskAssigner (defn reconcile-tasks "Finds all non-completed tasks, and has Mesos let us know if any have changed." - [db driver fenzo] + [db driver fid fenzo] (let [running-tasks (q '[:find ?task-id ?status ?slave-id :in $ [?status ...] :where @@ -680,6 +680,13 @@ :instance.status/running :task-running}] (when (seq running-tasks) (log/info "Preparing to reconcile" (count running-tasks) "tasks") + (doseq [[task-id] running-tasks + :let [task-ent (d/entity db task-id) + hostname (:instance/hostname task-ent) + job (:job/_instance task-ent)]] + (.. fenzo + (getTaskAssigner) + (call (->TaskRequestAdapter job (job->task-info db fid (:db/id job))) hostname))) (doseq [ts (partition-all 50 running-tasks)] (log/info "Reconciling" (count ts) "tasks, including task" (first ts)) (mesos/reconcile-tasks driver (mapv (fn [[task-id status slave-id]] @@ -693,7 +700,7 @@ ;; TODO this should be running and enabled (defn reconciler - [conn driver fenzo & {:keys [interval] + [conn driver fid fenzo & {:keys [interval] :or {interval (* 30 60 1000)}}] (log/info "Starting reconciler. Interval millis:" interval) (chime-at (periodic/periodic-seq (time/now) (time/millis interval)) @@ -701,7 +708,7 @@ (timers/time! reconciler-duration (reconcile-jobs conn) - (reconcile-tasks (db conn) driver fenzo))))) + (reconcile-tasks (db conn) driver fid fenzo))))) (defn get-lingering-tasks "Return a list of lingering tasks. @@ -943,7 +950,7 @@ (future (try (reconcile-jobs conn) - (reconcile-tasks (db conn) driver fenzo) + (reconcile-tasks (db conn) driver @fid fenzo) (catch Exception e (log/error e "Reconciliation error"))))) (reregistered [driver master-info] @@ -951,7 +958,7 @@ (future (try (reconcile-jobs conn) - (reconcile-tasks (db conn) driver fenzo) + (reconcile-tasks (db conn) driver @fid fenzo) (catch Exception e (log/error e "Reconciliation error"))))) ;; Ignore this--we can just wait for new offers From 7caceb1c8925289c6315448220e654758aba5910 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Wed, 18 Nov 2015 16:48:42 -0500 Subject: [PATCH 11/27] More comments --- scheduler/src/cook/mesos/scheduler.clj | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 148c8da3d7..dedac460bd 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -200,6 +200,8 @@ (.getTime (:instance/start-time instance-ent))) job-resources (util/job-ent->resources job-ent)] (when (#{:instance.status/success :instance.status/failed} instance-status) + ;;TODO what if these messages came through a reconciliation & there's no task-id in fenzo already? + ;;Maybe we need to suppress such an exception (log/debug "Unassigning task" task-id "from" (:instance/hostname instance-ent)) (.. fenzo (getTaskUnAssigner) From df5872bdd845aa63ae04f2a977217b10a105d7bc Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Wed, 18 Nov 2015 16:57:20 -0500 Subject: [PATCH 12/27] Fix priority update logic for preemptor --- scheduler/src/cook/mesos/rebalancer.clj | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/scheduler/src/cook/mesos/rebalancer.clj b/scheduler/src/cook/mesos/rebalancer.clj index 30011d0278..d3a430ebf4 100644 --- a/scheduler/src/cook/mesos/rebalancer.clj +++ b/scheduler/src/cook/mesos/rebalancer.clj @@ -218,6 +218,10 @@ [{:keys [task->scored-task host->spare-resources] :as state} {:keys [min-dru-diff safe-dru-threshold] :as params} pending-job-ent] + ;;We need to rely on this being a priority map TODO check that this is true + (when-not (instance? clojure.data.priority_map.PersistentPriorityMap task->scored-task) + (log/fatal "Implementation detail failed; needed priority, got" (class task->scored-task)) + (throw (ex-info "Implementation detail failed" {}))) (let [{pending-job-mem :mem pending-job-cpus :cpus} (util/job-ent->resources pending-job-ent) pending-job-dru (compute-pending-job-dru state pending-job-ent) @@ -225,12 +229,12 @@ host->scored-tasks (->> task->scored-task ;;TODO maybe we can just change every backfilled task here to have the worst possible DRU ;;this should bias us towards always killing those tasks first... + (reduce-kv (fn [m k {:keys [task] :as scored-task}] + (if (:instance/backfilled? task) + (assoc m k (assoc scored-task :dru Double/MAX_VALUE)) + m)) + task->scored-task) (vals) - ;;TODO We might need to sort again with this change - (map (fn [{:keys [task] :as scored-task}] - (if (:instance/backfilled? task) - (assoc scored-task :dru Double/MAX_VALUE) - scored-task))) (remove #(< (:dru %) safe-dru-threshold)) (filter #(> (- (:dru %) pending-job-dru) min-dru-diff)) (group-by (fn [{:keys [task]}] From b767bcb17109051acd2dc96c5a0cb74f6599f804 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Wed, 18 Nov 2015 17:08:06 -0500 Subject: [PATCH 13/27] Remove test for removed function prefixes --- scheduler/test/cook/test/mesos/scheduler.clj | 3 --- 1 file changed, 3 deletions(-) diff --git a/scheduler/test/cook/test/mesos/scheduler.clj b/scheduler/test/cook/test/mesos/scheduler.clj index 346f3fead5..6d198f5293 100644 --- a/scheduler/test/cook/test/mesos/scheduler.clj +++ b/scheduler/test/cook/test/mesos/scheduler.clj @@ -178,9 +178,6 @@ (is (= 1.0 (:cpus resources))) (is (= 1000.0 (:mem resources))))) -(deftest test-prefixes - (is (= [[1] [1 2] [1 2 3] [1 2 3 4] [1 2 3 4 5]] (sched/prefixes [1 2 3 4 5])))) - (deftest test-match-offer-to-schedule (let [schedule (map #(d/entity (db c) %) [j1 j2 j3 j4])] ; all 1gb 1 cpu (testing "Consume no schedule cases" From ba5d82a1ec83be7b30fc3ead572d1d76eca9b1b9 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Thu, 19 Nov 2015 16:27:54 -0500 Subject: [PATCH 14/27] Fix broken tests --- scheduler/src/cook/mesos/scheduler.clj | 4 +- scheduler/test/cook/test/mesos/scheduler.clj | 43 +++++++++++++------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index dedac460bd..e926a4411f 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -366,7 +366,7 @@ the offer. Returns a list of tasks that got matched to the offer" - [^TaskScheduler fenzo considerable offers db fid conn] + [^TaskScheduler fenzo considerable offers db fid] (log/debug "Matching" (count offers) "offers to" (count considerable) "jobs with fenzo") (let [t (System/currentTimeMillis) leases (mapv #(->VirtualMachineLeaseAdapter % t) offers) @@ -511,7 +511,7 @@ (job-allowed-to-start? db job))) (take num-considerable)) _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs (limited to " num-considerable " due to backdown)") - matches (match-offer-to-schedule fenzo considerable offers db fid conn) + matches (match-offer-to-schedule fenzo considerable offers db fid) matched-jobs (for [match matches ^TaskAssignmentResult task-result (:tasks match) :let [task-request (.getRequest task-result)]] diff --git a/scheduler/test/cook/test/mesos/scheduler.clj b/scheduler/test/cook/test/mesos/scheduler.clj index 6d198f5293..2fd36d5ca0 100644 --- a/scheduler/test/cook/test/mesos/scheduler.clj +++ b/scheduler/test/cook/test/mesos/scheduler.clj @@ -179,20 +179,35 @@ (is (= 1000.0 (:mem resources))))) (deftest test-match-offer-to-schedule - (let [schedule (map #(d/entity (db c) %) [j1 j2 j3 j4])] ; all 1gb 1 cpu - (testing "Consume no schedule cases" - (is (= [] (sched/match-offer-to-schedule [] {:resources {:cpus 0 :mem 0}}))) - (is (= [] (sched/match-offer-to-schedule [] {:resources {:cpus 2 :mem 2000}}))) - (is (= [] (sched/match-offer-to-schedule schedule {:resources {:cpus 0 :mem 0}}))) - (is (= [] (sched/match-offer-to-schedule schedule {:resources {:cpus 0.5 :mem 100}}))) - (is (= [] (sched/match-offer-to-schedule schedule {:resources {:cpus 0.5 :mem 1000}}))) - (is (= [] (sched/match-offer-to-schedule schedule {:resources {:cpus 1 :mem 500}})))) - (testing "Consume Partial schedule cases" - (is (= (take 1 schedule) (sched/match-offer-to-schedule schedule {:resources {:cpus 1 :mem 1000}}))) - (is (= (take 1 schedule) (sched/match-offer-to-schedule schedule {:resources {:cpus 1.5 :mem 1500}})))) - (testing "Consume full schedule cases" - (is (= schedule (sched/match-offer-to-schedule schedule {:resources {:cpus 4 :mem 4000}}))) - (is (= schedule (sched/match-offer-to-schedule schedule {:resources {:cpus 5 :mem 5000}})))))) + (let [schedule (map #(d/entity (db c) %) [j1 j2 j3 j4]) ; all 1gb 1 cpu + offer-maker (fn [cpus mem] + [{:resources {:cpus (double cpus) :mem (double mem)} + :id (str "id-" (java.util.UUID/randomUUID)) + :slave-id (str "slave-" (java.util.UUID/randomUUID)) + :hostname (str "host-" (java.util.UUID/randomUUID))}]) + fid (str "framework-id-" (java.util.UUID/randomUUID)) + fenzo-maker #(sched/make-fenzo-scheduler nil 100000)] ; The params are for offer declining, which should never happen + (testing "Consume no schedule cases" + (are [schedule offers] (= [] (sched/match-offer-to-schedule (fenzo-maker) schedule offers (db c) fid)) + [] (offer-maker 0 0) + [] (offer-maker 2 2000) + schedule (offer-maker 0 0) + schedule (offer-maker 0.5 100) + schedule (offer-maker 0.5 1000) + schedule (offer-maker 1 500))) + (testing "Consume Partial schedule cases" + ;; We're looking for one task to get assigned + (are [offers] (= 1 (count (mapcat :tasks (sched/match-offer-to-schedule + (fenzo-maker) schedule offers (db c) fid)))) + (offer-maker 1 1000) + (offer-maker 1.5 1500))) + (testing "Consume full schedule cases" + ;; We're looking for the entire schedule to get assigned + (are [offers] (= (count schedule) + (count (mapcat :tasks (sched/match-offer-to-schedule + (fenzo-maker) schedule offers (db c) fid)))) + (offer-maker 4 4000) + (offer-maker 5 5000))))) (deftest test-get-user->used-resources (let [uri "datomic:mem://test-get-used-resources" From 087f09d697da359ccdb2306d50e1003df1c88348 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Thu, 19 Nov 2015 16:28:30 -0500 Subject: [PATCH 15/27] Remove excess debugging --- scheduler/dev-config.edn | 2 -- 1 file changed, 2 deletions(-) diff --git a/scheduler/dev-config.edn b/scheduler/dev-config.edn index 2bbc4ed09c..b02229b80c 100644 --- a/scheduler/dev-config.edn +++ b/scheduler/dev-config.edn @@ -21,6 +21,4 @@ :levels {"datomic.db" :warn "datomic.peer" :warn "datomic.kv-cluster" :warn - "cook.mesos.scheduler" :debug - "com.netflix.fenzo.TaskScheduler" :debug :default :info}}} From ff512b67b93d35cbe369b7d6147f1ee659b5f929 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Fri, 20 Nov 2015 11:18:50 -0500 Subject: [PATCH 16/27] Continued cleanup and improvements --- scheduler/src/cook/mesos/api.clj | 2 ++ scheduler/src/cook/mesos/rebalancer.clj | 2 +- scheduler/src/cook/mesos/scheduler.clj | 29 ++++---------------- scheduler/test/cook/test/mesos/scheduler.clj | 17 ++++++++++++ 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/scheduler/src/cook/mesos/api.clj b/scheduler/src/cook/mesos/api.clj index d1406b4dc6..d8b8760801 100644 --- a/scheduler/src/cook/mesos/api.clj +++ b/scheduler/src/cook/mesos/api.clj @@ -252,7 +252,9 @@ end (:instance/end-time instance) base {:task_id (:instance/task-id instance) :hostname hostname + ;;TODO validate that these show up in API :backfilled (:instance/backfilled? instance false) + :preempted (:instance/preempted? instance false) :slave_id (:instance/slave-id instance) :executor_id (:instance/executor-id instance) :status (name (:instance/status instance))} diff --git a/scheduler/src/cook/mesos/rebalancer.clj b/scheduler/src/cook/mesos/rebalancer.clj index d3a430ebf4..3f9352dc4f 100644 --- a/scheduler/src/cook/mesos/rebalancer.clj +++ b/scheduler/src/cook/mesos/rebalancer.clj @@ -314,7 +314,7 @@ (try @(d/transact conn - ;; Make :instance/status and :instance/preempted consistent to simplify the state machine. + ;; Make :instance/status and :instance/preempted? consistent to simplify the state machine. ;; We don't want to deal with {:instance/status :instance.stats/running, :instance/preempted? true} ;; all over the places. (let [job-eid (:db/id (:job/_instance task-ent)) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index e926a4411f..b4759bf9db 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -375,12 +375,13 @@ (->TaskRequestAdapter job (job->task-info db fid (:db/id job))))) considerable) result (.scheduleOnce fenzo requests leases) + failure-results (.. result getFailures values) assignments (.. result getResultMap values)] - (log/debug "Found this assigment:" result) - (when (log/enabled? :debug) + (log/info "Found this assigment:" result) + (when (and (seq failure-results) (log/enabled? :debug)) (log/debug "Task placement failure information follows:") - (doseq [failures (.. result getFailures values) - failure failures + (doseq [failure-result failure-results + failure failure-result :let [_ (log/debug (str (.getConstraintFailure failure)))] f (.getFailures failure)] (log/debug (str f))) @@ -388,7 +389,6 @@ (mapv (fn [assignment] {:leases (.getLeasesUsed assignment) :tasks (.getTasksAssigned assignment) - ;;TODO maybe we should pull the task requests out here, too? :hostname (.getHostname assignment)}) assignments))) @@ -425,25 +425,6 @@ (catch clojure.lang.ExceptionInfo e false))) -;;TODO LOL -#_(let [db (db (d/connect "datomic:mem://cook-jobs"))] - (doseq [[job status] (vec (q '[:find ?j ?status - :where - [?j :job/state ?s] - [?s :db/ident ?status] - ] - db))] - (println job status (q '[:find ?i ?host ?status - :in $ ?j - :where - [?j :job/instance ?i] - [?i :instance/status ?s] - [?i :instance/hostname ?host] - [?s :db/ident ?status] - ] db job)) - )) - -;;TODO test this (defn ids-of-backfilled-instances "Returns a list of the ids of running, backfilled instances of the given job" [job] diff --git a/scheduler/test/cook/test/mesos/scheduler.clj b/scheduler/test/cook/test/mesos/scheduler.clj index 2fd36d5ca0..924768c677 100644 --- a/scheduler/test/cook/test/mesos/scheduler.clj +++ b/scheduler/test/cook/test/mesos/scheduler.clj @@ -209,6 +209,23 @@ (offer-maker 4 4000) (offer-maker 5 5000))))) +(deftest test-ids-of-backfilled-instances + (let [uri "datomic:mem://test-ids-of-backfilled-instances" + conn (restore-fresh-database! uri) + jid (d/tempid :db.part/user) + tid1 (d/tempid :db.part/user) + tid2 (d/tempid :db.part/user) + {:keys [tempids db-after]} @(d/transact conn [[:db/add jid :job/instance tid1] + [:db/add jid :job/instance tid2] + [:db/add tid1 :instance/status :instance.status/running] + [:db/add tid2 :instance/status :instance.status/running] + [:db/add tid1 :instance/backfilled? false] + [:db/add tid2 :instance/backfilled? true]]) + jid' (d/resolve-tempid db-after tempids jid) + tid2' (d/resolve-tempid db-after tempids tid2) + backfilled-ids (sched/ids-of-backfilled-instances (d/entity db-after jid'))] + (is (= [tid2'] backfilled-ids)))) + (deftest test-get-user->used-resources (let [uri "datomic:mem://test-get-used-resources" conn (restore-fresh-database! uri) From 536d9ce38f01abe25d700d70d1f1435845d94ab7 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Fri, 20 Nov 2015 12:12:00 -0500 Subject: [PATCH 17/27] Add support for ports with Fenzo --- scheduler/src/cook/mesos/api.clj | 12 +- scheduler/src/cook/mesos/scheduler.clj | 265 +++++++++++++------------ scheduler/src/cook/mesos/schema.clj | 8 +- 3 files changed, 152 insertions(+), 133 deletions(-) diff --git a/scheduler/src/cook/mesos/api.clj b/scheduler/src/cook/mesos/api.clj index d8b8760801..ea2c69ba28 100644 --- a/scheduler/src/cook/mesos/api.clj +++ b/scheduler/src/cook/mesos/api.clj @@ -55,7 +55,7 @@ :max-retries (s/both s/Int (s/pred pos? 'pos?)) :max-runtime (s/both s/Int (s/pred pos? 'pos?)) (s/optional-key :uris) [Uri] - (s/optional-key :ports) [(s/pred zero? 'zero)] ;;TODO add to docs the limited uri/port support + (s/optional-key :ports) (s/pred pos? 'pos?) (s/optional-key :env) {NonEmptyString s/Str} :cpus PosDouble :mem PosDouble @@ -67,10 +67,8 @@ [conn jobs :- [Job]] (doseq [{:keys [uuid command max-retries max-runtime priority cpus mem user name ports uris env]} jobs :let [id (d/tempid :db.part/user) - ports (mapv (fn [port] - ;;TODO this schema might not work b/c all ports are zero - [:db/add id :job/port port]) - ports) + ports (when (and ports (not (zero? ports))) + [[:db/add id :job/port ports]]) uris (mapcat (fn [{:keys [value executable? cache? extract?]}] (let [uri-id (d/tempid :db.part/user) optional-params {:resource.uri/executable? executable? @@ -144,7 +142,7 @@ :priority (or priority util/default-job-priority) :max-retries max_retries :max-runtime (or max_runtime Long/MAX_VALUE) - :ports (or ports []) + :ports (or ports 0) :cpus (double cpus) :mem (double mem)} (when uris @@ -239,7 +237,7 @@ :status (name (:job/state job)) :uris (:uris resources) :env (util/job-ent->env job) - ;;TODO include ports + :ports (:job/port job 0) :instances (map (fn [instance] (let [hostname (:instance/hostname instance) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index b4759bf9db..2ce26ee3aa 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -95,7 +95,7 @@ ;; If the there is no value for key :job/name, the following name will contain a substring "null". {:name (format "%s_%s_%s" (:job/name job-ent "cookjob") (:job/user job-ent) task-id) :task-id task-id - :num-ports (count (:ports resources)) + :num-ports (:ports resources) :resources (select-keys resources [:mem :cpus]) ;;TODO this data is a race-condition :data (.getBytes @@ -480,121 +480,131 @@ offer-size-mem (get-in offer [:resources :mem] 0))) (log/debug "invoked handle-resource-offers!") - (timers/time! - handle-resource-offer!-duration - (try - (loop [] ;; This loop is for compare-and-set! below - (let [scheduler-contents @pending-jobs - db (db conn) - _ (log/debug "There are" (count scheduler-contents) "pending jobs") - considerable (->> scheduler-contents - (filter (fn [job] - (job-allowed-to-start? db job))) - (take num-considerable)) - _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs (limited to " num-considerable " due to backdown)") - matches (match-offer-to-schedule fenzo considerable offers db fid) - matched-jobs (for [match matches - ^TaskAssignmentResult task-result (:tasks match) - :let [task-request (.getRequest task-result)]] - (:job task-request)) - processed-matches (process-matches-for-backfill scheduler-contents (first considerable) matched-jobs) - new-scheduler-contents (remove (fn [pending-job] - (contains? (:fully-processed processed-matches) (:job/uuid pending-job))) - scheduler-contents) - first-considerable-resources (-> considerable first util/job-ent->resources) - match-resource-requirements (util/sum-resources-of-jobs matched-jobs)] - (reset! front-of-job-queue-mem-atom - (or (:mem first-considerable-resources) 0)) - (reset! front-of-job-queue-cpus-atom - (or (:cpus first-considerable-resources) 0)) - (cond - (empty? matches) (if (seq considerable) - nil ;; This case means we failed to match anything. We need to start head of line blocking now - true) ;; This case means there was nothing to match--don't penalize the scheduler for the next burst of submitted jobs - (not (compare-and-set! pending-jobs scheduler-contents new-scheduler-contents)) - (let [recycle-offers (doall (for [{:keys [leases]} matches - lease leases] - (:offer lease)))] - (log/debug "Pending job atom contention encountered, recycling offers:" recycle-offers) - (async/go - (async/>! offers-chan recycle-offers)) - (meters/mark! pending-job-atom-contended) - (recur)) - :else - ;;TODO this construction of task-infos is legacy-oriented - ;;since now we treat "matches" as our primary structure, - ;;we should eventually remove this construction of task-infos - ;;as it's just a helper for building the datomic transaction - (let [task-txns (for [{:keys [tasks leases]} matches - :let [offers (mapv :offer leases) - slave-id (:slave-id (first offers))] - ^TaskAssignmentResult task tasks - :let [request (.getRequest task) - task-info (:task-info request) - job-id (get-in request [:job :db/id])]] - [[:job/allowed-to-start? job-id] - {:db/id (d/tempid :db.part/user) - :job/_instance job-id - :instance/task-id (:task-id task-info) - :instance/hostname (.getHostname task) - :instance/start-time (now) - ;; NB command executor uses the task-id - ;; as the executor-id - :instance/executor-id (get-in - task-info - [:executor :executor-id] - (:task-id task-info)) - :instance/backfilled? (contains? (:backfill-jobs processed-matches) (get-in request [:job :job/uuid])) - :instance/slave-id slave-id - :instance/progress 0 - :instance/status :instance.status/unknown - :instance/preempted? false}]) - upgrade-txns (map (fn [instance-id] - [:db/add instance-id :instance/backfilled? false]) - (:upgrade-backfill processed-matches))] - ;; Note that this transaction can fail if a job was scheduled - ;; during a race. If that happens, then other jobs that should - ;; be scheduled will not be eligible for rescheduling until - ;; the pending-jobs atom is repopulated - @(d/transact - conn - (vec (apply concat upgrade-txns task-txns))) - (log/info "Launching" (count task-txns) "tasks") - (log/info "Upgrading" (count (:upgrade-backfill processed-matches)) "tasks from backfilled to proper") - (log/debug "Matched tasks" task-txns) - ;; This launch-tasks MUST happen after the above transaction in - ;; order to allow a transaction failure (due to failed preconditions) - ;; to block the launch - (meters/mark! scheduler-offer-matched - (->> matches - (mapcat (comp :id :offer :leases)) - (distinct) - (count))) - (meters/mark! matched-tasks (count task-txns)) - (meters/mark! matched-tasks-cpus (:cpus match-resource-requirements)) - (meters/mark! matched-tasks-mem (:mem match-resource-requirements)) - (doseq [{:keys [tasks leases]} matches - :let [offers (mapv :offer leases) - task-infos (mapv (fn [^TaskAssignmentResult task] - (:task-info (.getRequest task))) - tasks) - slave-id (:slave-id (first offers))]] - ;TODO if the ports are in the offer, then Fenzo is identifying the ports with a list .getAssignedPorts on the TaskAssignmentResult - (mesos/launch-tasks - driver - (mapv :id offers) - (mapv #(-> % - (assoc :slave-id slave-id) - (dissoc :num-ports)) - task-infos)) - (doseq [^TaskAssignmentResult task tasks] - (.. fenzo - (getTaskAssigner) - (call (.getRequest task) (get-in (first leases) [:offer :hostname]))))) - (:matched-head? processed-matches))))) + (let [offer-stash (atom nil)] ;; This is a way to ensure we never lose offers fenzo assigned if an errors occures in the middle of processing + (timers/time! + handle-resource-offer!-duration + (try + (loop [] ;; This loop is for compare-and-set! below + (let [scheduler-contents @pending-jobs + db (db conn) + _ (log/debug "There are" (count scheduler-contents) "pending jobs") + considerable (->> scheduler-contents + (filter (fn [job] + (job-allowed-to-start? db job))) + (take num-considerable)) + _ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs (limited to " num-considerable " due to backdown)") + matches (match-offer-to-schedule fenzo considerable offers db fid) + _ (reset! offer-stash (doall (for [{:keys [leases]} matches + lease leases] + (:offer lease)))) + matched-jobs (for [match matches + ^TaskAssignmentResult task-result (:tasks match) + :let [task-request (.getRequest task-result)]] + (:job task-request)) + processed-matches (process-matches-for-backfill scheduler-contents (first considerable) matched-jobs) + new-scheduler-contents (remove (fn [pending-job] + (contains? (:fully-processed processed-matches) (:job/uuid pending-job))) + scheduler-contents) + first-considerable-resources (-> considerable first util/job-ent->resources) + match-resource-requirements (util/sum-resources-of-jobs matched-jobs)] + (reset! front-of-job-queue-mem-atom + (or (:mem first-considerable-resources) 0)) + (reset! front-of-job-queue-cpus-atom + (or (:cpus first-considerable-resources) 0)) + (cond + (empty? matches) (if (seq considerable) + nil ;; This case means we failed to match anything. We need to start head of line blocking now + true) ;; This case means there was nothing to match--don't penalize the scheduler for the next burst of submitted jobs + (not (compare-and-set! pending-jobs scheduler-contents new-scheduler-contents)) + (do + (log/debug "Pending job atom contention encountered, recycling offers:" @offer-stash) + (async/go + (async/>! offers-chan @offer-stash)) + (meters/mark! pending-job-atom-contended) + (recur)) + :else + (let [task-txns (for [{:keys [tasks leases]} matches + :let [offers (mapv :offer leases) + slave-id (:slave-id (first offers))] + ^TaskAssignmentResult task tasks + :let [request (.getRequest task) + task-info (:task-info request) + job-id (get-in request [:job :db/id])]] + [[:job/allowed-to-start? job-id] + {:db/id (d/tempid :db.part/user) + :job/_instance job-id + :instance/task-id (:task-id task-info) + :instance/hostname (.getHostname task) + :instance/start-time (now) + ;; NB command executor uses the task-id + ;; as the executor-id + :instance/executor-id (get-in + task-info + [:executor :executor-id] + (:task-id task-info)) + :instance/backfilled? (contains? (:backfill-jobs processed-matches) (get-in request [:job :job/uuid])) + :instance/slave-id slave-id + :instance/progress 0 + :instance/status :instance.status/unknown + :instance/preempted? false}]) + upgrade-txns (map (fn [instance-id] + [:db/add instance-id :instance/backfilled? false]) + (:upgrade-backfill processed-matches))] + ;; Note that this transaction can fail if a job was scheduled + ;; during a race. If that happens, then other jobs that should + ;; be scheduled will not be eligible for rescheduling until + ;; the pending-jobs atom is repopulated + @(d/transact + conn + (vec (apply concat upgrade-txns task-txns))) + (log/info "Launching" (count task-txns) "tasks") + (log/info "Upgrading" (count (:upgrade-backfill processed-matches)) "tasks from backfilled to proper") + (log/debug "Matched tasks" task-txns) + ;; This launch-tasks MUST happen after the above transaction in + ;; order to allow a transaction failure (due to failed preconditions) + ;; to block the launch + (meters/mark! scheduler-offer-matched + (->> matches + (mapcat (comp :id :offer :leases)) + (distinct) + (count))) + (meters/mark! matched-tasks (count task-txns)) + (meters/mark! matched-tasks-cpus (:cpus match-resource-requirements)) + (meters/mark! matched-tasks-mem (:mem match-resource-requirements)) + (doseq [{:keys [tasks leases]} matches + :let [offers (mapv :offer leases) + task-infos (mapv (fn process-results [^TaskAssignmentResult task] + (reduce + (fn add-ports-to-task-info [task-info [index port]] + (log/debug "task-info" task-info [index port]) + (-> task-info + (update-in [:resources :ports] + (fnil conj []) + {:begin port :end port}) + (assoc-in [:command :environment (str "PORT" index)] + (str port)))) + (:task-info (.getRequest task)) + (map-indexed (fn [index port] [index port]) + (.getAssignedPorts task)))) + tasks) + slave-id (:slave-id (first offers))]] + (mesos/launch-tasks + driver + (mapv :id offers) + (mapv #(-> % + (assoc :slave-id slave-id) + (dissoc :num-ports)) + task-infos)) + (doseq [^TaskAssignmentResult task tasks] + (.. fenzo + (getTaskAssigner) + (call (.getRequest task) (get-in (first leases) [:offer :hostname]))))) + (:matched-head? processed-matches))))) (catch Throwable t (meters/mark! handle-resource-offer!-errors) - (log/error t "Error in match:" (ex-data t)))))) + (async/go + (async/>! offers-chan @offer-stash)) + (log/error t "Error in match:" (ex-data t))))))) (defn view-incubating-offers [^TaskScheduler fenzo] @@ -617,17 +627,22 @@ max-considerable 1000] (async/thread (loop [num-considerable max-considerable] - (let [offers (async/alt!! - offers-chan ([offers] offers) - timer-chan ([_] []) - :priority true) - matched-head? (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom num-considerable offers-chan offers)] - (when (seq offers) - (reset! resources-atom (view-incubating-offers fenzo))) - ;;TODO make this cancelable - (recur (if matched-head? - max-considerable - (max 1 (long (* 0.95 num-considerable)))))))) ;; With max=1000 and 1 iter/sec, this will take 88 seconds to reach 1 + ;;TODO make this cancelable + (recur + (try + (let [offers (async/alt!! + offers-chan ([offers] offers) + timer-chan ([_] []) + :priority true) + matched-head? (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom num-considerable offers-chan offers)] + (when (seq offers) + (reset! resources-atom (view-incubating-offers fenzo))) + (if matched-head? + max-considerable + (max 1 (long (* 0.95 num-considerable))))) ;; With max=1000 and 1 iter/sec, this will take 88 seconds to reach 1 + (catch Exception e + (log/error e "Offer handler encountered exception; continuing") + max-considerable))))) [offers-chan resources-atom])) (defn reconcile-jobs diff --git a/scheduler/src/cook/mesos/schema.clj b/scheduler/src/cook/mesos/schema.clj index 636345f7ed..e81705b65e 100644 --- a/scheduler/src/cook/mesos/schema.clj +++ b/scheduler/src/cook/mesos/schema.clj @@ -227,6 +227,12 @@ :db/index true :db.alter/_attribute :db.part/db}]) +(def migration-port-is-a-count + "This was written on 10-20-2015" + [{:db/id :job/port + :db/cardinality :db.cardinality/one + :db.alter/_attribute :db.part/db}]) + (def rebalancer-configs [{:db/id (d/tempid :db.part/user) :db/ident :rebalancer/config} @@ -409,4 +415,4 @@ []))}}]) (def work-item-schema - [schema-attributes state-enums rebalancer-configs migration-add-index-to-job-state db-fns]) + [schema-attributes state-enums rebalancer-configs migration-add-index-to-job-state migration-port-is-a-count db-fns]) From 7605a4568011e86b0b2da7931bd14e849c481a2b Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Fri, 20 Nov 2015 15:52:08 -0500 Subject: [PATCH 18/27] Catch exceptions if task unassignemnt fails --- scheduler/src/cook/mesos/scheduler.clj | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 2ce26ee3aa..186fc6273f 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -200,12 +200,13 @@ (.getTime (:instance/start-time instance-ent))) job-resources (util/job-ent->resources job-ent)] (when (#{:instance.status/success :instance.status/failed} instance-status) - ;;TODO what if these messages came through a reconciliation & there's no task-id in fenzo already? - ;;Maybe we need to suppress such an exception (log/debug "Unassigning task" task-id "from" (:instance/hostname instance-ent)) - (.. fenzo - (getTaskUnAssigner) - (call task-id (:instance/hostname instance-ent)))) + (try + (.. fenzo + (getTaskUnAssigner) + (call task-id (:instance/hostname instance-ent))) + (catch Exception e + (log/error e "Failed to unassign task" task-id "from" (:instance/hostname instance-ent))))) (when (= instance-status :instance.status/success) (handle-throughput-metrics success-throughput-metrics job-resources From 583c84610847b3c6de5a6dac4d7a2954ad560338 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Fri, 20 Nov 2015 16:10:40 -0500 Subject: [PATCH 19/27] Test backfill filler --- scheduler/src/cook/mesos/scheduler.clj | 2 - scheduler/test/cook/test/mesos/scheduler.clj | 63 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index 186fc6273f..b441d9b651 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -434,7 +434,6 @@ (:instance/backfilled? %))) (map :db/id))) -;;TODO test this (defn process-matches-for-backfill "This computes some sets: @@ -467,7 +466,6 @@ :upgrade-backfill upgrade-backfill :backfill-jobs backfill-jobs :matched-head? matched-head?})))) -;;TODO need to ensure that we exponentially converge to head of line blocking when we repeating come up with non-head solutions (defn handle-resource-offers! "Gets a list of offers from mesos. Decides what to do with them all--they should all diff --git a/scheduler/test/cook/test/mesos/scheduler.clj b/scheduler/test/cook/test/mesos/scheduler.clj index 924768c677..704c0c8482 100644 --- a/scheduler/test/cook/test/mesos/scheduler.clj +++ b/scheduler/test/cook/test/mesos/scheduler.clj @@ -226,6 +226,69 @@ backfilled-ids (sched/ids-of-backfilled-instances (d/entity db-after jid'))] (is (= [tid2'] backfilled-ids)))) +(deftest test-process-matches-for-backfill + (letfn [(mock-job [& instances] ;;instances is a seq of booleans which denote the running instances that could be true or false + {:job/uuid (java.util.UUID/randomUUID) + :job/instance (for [backfill? instances] + {:instance/status :instance.status/running + :instance/backfilled? backfill? + :db/id (java.util.UUID/randomUUID)})})] + (let [j1 (mock-job) + j2 (mock-job true) + j3 (mock-job false true true) + j4 (mock-job) + j5 (mock-job true) + j6 (mock-job false false) + j7 (mock-job false)] + (testing "Match nothing" + (let [result (sched/process-matches-for-backfill [j1 j4] j1 [])] + (is (zero? (count (:fully-processed result)))) + (is (zero? (count (:upgrade-backfill result)))) + (is (zero? (count (:backfill-jobs result)))) + (is (not (:matched-head? result))))) + (testing "Match everything basic" + (let [result (sched/process-matches-for-backfill [j1 j4] j1 [j1 j4])] + (is (= 2 (count (:fully-processed result)))) + (is (zero? (count (:upgrade-backfill result)))) + (is (zero? (count (:backfill-jobs result)))) + (is (:matched-head? result)))) + (testing "Don't match head" + (let [result (sched/process-matches-for-backfill [j1 j4] j1 [j4])] + (is (zero? (count (:fully-processed result)))) + (is (zero? (count (:upgrade-backfill result)))) + (is (= 1 (count (:backfill-jobs result)))) + (is (not (:matched-head? result))))) + (testing "Match the tail, but the head isn't considerable" ;;TODO is this even correct? + (let [result (sched/process-matches-for-backfill [j1 j4] j4 [j4])] + (is (zero? (count (:fully-processed result)))) + (is (zero? (count (:upgrade-backfill result)))) + (is (= 1 (count (:backfill-jobs result)))) + (is (not (:matched-head? result))))) + (testing "Match the tail, and the head was backfilled" + (let [result (sched/process-matches-for-backfill [j2 j1 j4] j1 [j1 j4])] + (is (= 2 (count (:fully-processed result)))) + (is (= 1 (count (:upgrade-backfill result)))) + (is (zero? (count (:backfill-jobs result)))) + (is (:matched-head? result)))) + (testing "Match the tail, and the head was backfilled (multiple backfilled mixed in)" + (let [result (sched/process-matches-for-backfill [j2 j1 j3 j4] j1 [j1 j4])] + (is (= 2 (count (:fully-processed result)))) + (is (= 3 (count (:upgrade-backfill result)))) + (is (zero? (count (:backfill-jobs result)))) + (is (:matched-head? result)))) + (testing "Fail to match the head, but backfill several jobs" + (let [result (sched/process-matches-for-backfill [j1 j2 j3 j4 j5 j6 j7] j1 [j4 j6 j7])] + (is (zero? (count (:fully-processed result)))) + (is (zero? (count (:upgrade-backfill result)))) + (is (= 3 (count (:backfill-jobs result)))) + (is (not (:matched-head? result))))) + (testing "Fail to match the head, but backfill several jobs" + (let [result (sched/process-matches-for-backfill [j1 j2 j3 j4 j5 j6 j7] j1 [j4 j6 j7])] + (is (zero? (count (:fully-processed result)))) + (is (zero? (count (:upgrade-backfill result)))) + (is (= 3 (count (:backfill-jobs result)))) + (is (not (:matched-head? result)))))))) + (deftest test-get-user->used-resources (let [uri "datomic:mem://test-get-used-resources" conn (restore-fresh-database! uri) From 5476f1585f93f653b9e44125ae6fd24209974530 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Mon, 23 Nov 2015 15:56:57 -0500 Subject: [PATCH 20/27] Fix api tests --- scheduler/src/cook/mesos/api.clj | 2 +- scheduler/test/cook/test/mesos/api.clj | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/scheduler/src/cook/mesos/api.clj b/scheduler/src/cook/mesos/api.clj index ea2c69ba28..56fb11d14e 100644 --- a/scheduler/src/cook/mesos/api.clj +++ b/scheduler/src/cook/mesos/api.clj @@ -55,7 +55,7 @@ :max-retries (s/both s/Int (s/pred pos? 'pos?)) :max-runtime (s/both s/Int (s/pred pos? 'pos?)) (s/optional-key :uris) [Uri] - (s/optional-key :ports) (s/pred pos? 'pos?) + (s/optional-key :ports) (s/pred #(not (neg? %)) 'nonnegative?) (s/optional-key :env) {NonEmptyString s/Str} :cpus PosDouble :mem PosDouble diff --git a/scheduler/test/cook/test/mesos/api.clj b/scheduler/test/cook/test/mesos/api.clj index 3e48eae102..c82e9142f7 100644 --- a/scheduler/test/cook/test/mesos/api.clj +++ b/scheduler/test/cook/test/mesos/api.clj @@ -34,11 +34,10 @@ The first list is the correct list, which may be missing keys. The second list can be fully filled in by Cook with defaults." [gold-standard new-data] - (let [resp-uris (map (fn [gold copy] - (select-keys copy (keys gold))) - gold-standard - new-data)] - (= gold-standard resp-uris))) + (let [paired-uris (group-by #(or (get % :value) (get % "value")) (concat gold-standard new-data))] + (doseq [[orig post :as uris] (vals paired-uris)] + (is (= 2 (count uris)) "There should be only 1 original and 1 roundtripped value for each URI") + (is (= orig (select-keys post (keys orig))) "The orig and new value should be equal, excluding defaults")))) (deftest handler-db-roundtrip (let [conn (restore-fresh-database! "datomic:mem://mesos-api-test") @@ -57,6 +56,7 @@ "priority" 66 "max_retries" 100 "max_runtime" 1000000 + "ports" 2 "uris" uris "env" env "cpus" 2.0 @@ -76,7 +76,7 @@ (is (= (util/job-ent->env ent) env)) (is (= (:cpus resources) 2.0)) (is (= (:mem resources) 2048.0)) - (is (compare-uris (map kw-keys uris) (:uris resources)))) + (compare-uris (map kw-keys uris) (:uris resources))) (let [resp (h {:request-method :get :scheme :http :uri "/rawscheduler" @@ -90,7 +90,7 @@ uris (get trimmed-body "uris"))] (is (= (dissoc job "uris") (dissoc trimmed-body "uris"))) - (is (compare-uris uris (get trimmed-body "uris")))) + (compare-uris uris (get trimmed-body "uris"))) (is (<= 400 (:status (h {:request-method :delete :scheme :http From d807b04723383dfce0a7e15582bd2f4d95f95c02 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Mon, 23 Nov 2015 16:05:47 -0500 Subject: [PATCH 21/27] Ensure we propagate the proper default port count as zero --- scheduler/src/cook/mesos/util.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/src/cook/mesos/util.clj b/scheduler/src/cook/mesos/util.clj index cfffada81c..3f8ed75054 100644 --- a/scheduler/src/cook/mesos/util.clj +++ b/scheduler/src/cook/mesos/util.clj @@ -50,7 +50,7 @@ :executable (:resource.uri/executable? r false) :value (:resource.uri/value r) :extract (:resource.uri/extract? r false)})))) - {:ports (:job/port job-ent)} + {:ports (:job/port job-ent 0)} (:job/resource job-ent))) (defn sum-resources-of-jobs From 28199cdb4a136f702b710acd4f5c07a3875b9e29 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Mon, 23 Nov 2015 17:18:34 -0500 Subject: [PATCH 22/27] Alternative impl for backfill --- scheduler/src/cook/mesos/rebalancer.clj | 15 ++------------- scheduler/src/cook/mesos/scheduler.clj | 2 +- scheduler/src/cook/mesos/util.clj | 17 ++++++++++++----- scheduler/test/cook/test/mesos/rebalancer.clj | 16 ++++++++-------- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/scheduler/src/cook/mesos/rebalancer.clj b/scheduler/src/cook/mesos/rebalancer.clj index 3f9352dc4f..fc39c8d69d 100644 --- a/scheduler/src/cook/mesos/rebalancer.clj +++ b/scheduler/src/cook/mesos/rebalancer.clj @@ -170,7 +170,7 @@ user->sorted-running-task-ents (->> running-task-ents (group-by util/task-ent->user) (map (fn [[user task-ents]] - [user (into (sorted-set-by util/same-user-task-comparator) task-ents)])) + [user (into (sorted-set-by (util/same-user-task-comparator true)) task-ents)])) (into {})) task->scored-task (dru/init-task->scored-task user->sorted-running-task-ents user->dru-divisors)] (->State task->scored-task user->sorted-running-task-ents host->spare-resources user->dru-divisors))) @@ -195,7 +195,7 @@ (reduce (fn [task-ents-by-user task-ent] (let [user (util/task-ent->user task-ent) f (if (= new-running-task-ent task-ent) - (fnil conj (sorted-set-by util/same-user-task-comparator)) + (fnil conj (sorted-set-by (util/same-user-task-comparator true))) disj)] (update-in task-ents-by-user [user] f task-ent))) user->sorted-running-task-ents @@ -218,22 +218,11 @@ [{:keys [task->scored-task host->spare-resources] :as state} {:keys [min-dru-diff safe-dru-threshold] :as params} pending-job-ent] - ;;We need to rely on this being a priority map TODO check that this is true - (when-not (instance? clojure.data.priority_map.PersistentPriorityMap task->scored-task) - (log/fatal "Implementation detail failed; needed priority, got" (class task->scored-task)) - (throw (ex-info "Implementation detail failed" {}))) (let [{pending-job-mem :mem pending-job-cpus :cpus} (util/job-ent->resources pending-job-ent) pending-job-dru (compute-pending-job-dru state pending-job-ent) ;; This will preserve the ordering of task->scored-task host->scored-tasks (->> task->scored-task - ;;TODO maybe we can just change every backfilled task here to have the worst possible DRU - ;;this should bias us towards always killing those tasks first... - (reduce-kv (fn [m k {:keys [task] :as scored-task}] - (if (:instance/backfilled? task) - (assoc m k (assoc scored-task :dru Double/MAX_VALUE)) - m)) - task->scored-task) (vals) (remove #(< (:dru %) safe-dru-threshold)) (filter #(> (- (:dru %) pending-job-dru) min-dru-diff)) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index b441d9b651..a5d5694901 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -810,7 +810,7 @@ jobs (-<>> (concat running-task-ents pending-task-ents) (group-by util/task-ent->user) (map (fn [[user task-ents]] - [user (into (sorted-set-by util/same-user-task-comparator) task-ents)])) + [user (into (sorted-set-by (util/same-user-task-comparator false)) task-ents)])) (into {}) (dru/init-task->scored-task <> user->dru-divisors) (filter (fn [[task scored-task]] diff --git a/scheduler/src/cook/mesos/util.clj b/scheduler/src/cook/mesos/util.clj index 3f8ed75054..1f5e1b3296 100644 --- a/scheduler/src/cook/mesos/util.clj +++ b/scheduler/src/cook/mesos/util.clj @@ -108,15 +108,22 @@ (def ^:const default-job-priority 50) (defn same-user-task-comparator - "Comparator to order same user's tasks" - [task1 task2] - (letfn [(task->feature-vector [task] + "Comparator to order same user's tasks + + If consider-backfilled-jobs? is true, then we treat jobs which were backfilled as being the lowest priority" + [consider-backfilled-jobs?] + (letfn [(task->feature-vector + [task] ;; Last two elements are aribitary tie breakers. ;; Use :db/id because they guarantee uniqueness for different entities ;; (:db/id task) is not sufficient because synthetic task entities don't have :db/id ;; This assumes there are at most one synthetic task for a job, otherwise uniqueness invariant will break - [(- (:job/priority (:job/_instance task) default-job-priority)) + [(if (and consider-backfilled-jobs? (:instance/backfilled? task)) + Integer/MAX_VALUE + (- (:job/priority (:job/_instance task) default-job-priority))) (:instance/start-time task (java.util.Date. Long/MAX_VALUE)) (:db/id task) (:db/id (:job/_instance task))])] - (compare (task->feature-vector task1) (task->feature-vector task2)))) + (fn same-user-task-comparator-inner + [task1 task2] + (compare (task->feature-vector task1) (task->feature-vector task2))))) diff --git a/scheduler/test/cook/test/mesos/rebalancer.clj b/scheduler/test/cook/test/mesos/rebalancer.clj index 94beeaecfc..a7a0ebaaa9 100644 --- a/scheduler/test/cook/test/mesos/rebalancer.clj +++ b/scheduler/test/cook/test/mesos/rebalancer.clj @@ -386,8 +386,8 @@ (let [task-ent9 {:job/_instance job-ent9 :instance/hostname "hostB" :instance/status :instance.status/running} - user->sorted-running-task-ents' {"ljin" (into (sorted-set-by util/same-user-task-comparator) [task-ent1 task-ent2 task-ent3 task-ent4]) - "wzhao" (into (sorted-set-by util/same-user-task-comparator) [task-ent5 task-ent7 task-ent9])} + user->sorted-running-task-ents' {"ljin" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent1 task-ent2 task-ent3 task-ent4]) + "wzhao" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent5 task-ent7 task-ent9])} host->spare-resources' {"hostA" {:mem 50.0 :cpus 50.0} "hostB" {:mem 5.0 :cpus 5.0}}] (let [{task->scored-task'' :task->scored-task user->sorted-running-task-ents'' :user->sorted-running-task-ents @@ -411,9 +411,9 @@ (let [task-ent10 {:job/_instance job-ent10 :instance/hostname "hostA" :instance/status :instance.status/running} - user->sorted-running-task-ents' {"ljin" (into (sorted-set-by util/same-user-task-comparator) [task-ent1 task-ent3 task-ent4]) - "wzhao" (into (sorted-set-by util/same-user-task-comparator) [task-ent5 task-ent6 task-ent8]) - "sunil" (into (sorted-set-by util/same-user-task-comparator) [task-ent10])} + user->sorted-running-task-ents' {"ljin" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent1 task-ent3 task-ent4]) + "wzhao" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent5 task-ent6 task-ent8]) + "sunil" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent10])} host->spare-resources' {"hostA" {:mem 50.0 :cpus 50.0}}] (let [{task->scored-task'' :task->scored-task user->sorted-running-task-ents'' :user->sorted-running-task-ents @@ -436,9 +436,9 @@ (let [task-ent12 {:job/_instance job-ent12 :instance/hostname "hostA" :instance/status :instance.status/running} - user->sorted-running-task-ents' {"ljin" (into (sorted-set-by util/same-user-task-comparator) [task-ent1 task-ent2 task-ent3 task-ent4]) - "wzhao" (into (sorted-set-by util/same-user-task-comparator) [task-ent5 task-ent6 task-ent7 task-ent8]) - "sunil" (into (sorted-set-by util/same-user-task-comparator) [task-ent12])} + user->sorted-running-task-ents' {"ljin" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent1 task-ent2 task-ent3 task-ent4]) + "wzhao" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent5 task-ent6 task-ent7 task-ent8]) + "sunil" (into (sorted-set-by (util/same-user-task-comparator true)) [task-ent12])} host->spare-resources' {"hostA" {:mem 10.0 :cpus 10.0}}] (let [{task->scored-task'' :task->scored-task From 27da88d1a7872810a7342d4ec8847f580b0bd305 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Mon, 23 Nov 2015 17:43:36 -0500 Subject: [PATCH 23/27] Add really important bit for backfill DRU penalty back in --- scheduler/src/cook/mesos/rebalancer.clj | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scheduler/src/cook/mesos/rebalancer.clj b/scheduler/src/cook/mesos/rebalancer.clj index fc39c8d69d..254c59a7cb 100644 --- a/scheduler/src/cook/mesos/rebalancer.clj +++ b/scheduler/src/cook/mesos/rebalancer.clj @@ -224,6 +224,10 @@ ;; This will preserve the ordering of task->scored-task host->scored-tasks (->> task->scored-task (vals) + (map (fn [{:keys [task] :as scored-task}] + (if (:instance/backfilled? task) + (assoc scored-task :dru Double/MAX_VALUE) + scored-task))) (remove #(< (:dru %) safe-dru-threshold)) (filter #(> (- (:dru %) pending-job-dru) min-dru-diff)) (group-by (fn [{:keys [task]}] From 930d3ec0eb93d1ac49279730210fb1339e4848ff Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Tue, 24 Nov 2015 14:32:36 -0500 Subject: [PATCH 24/27] Remove global preemption of backfilled tasks --- scheduler/src/cook/mesos/rebalancer.clj | 4 ---- 1 file changed, 4 deletions(-) diff --git a/scheduler/src/cook/mesos/rebalancer.clj b/scheduler/src/cook/mesos/rebalancer.clj index 254c59a7cb..fc39c8d69d 100644 --- a/scheduler/src/cook/mesos/rebalancer.clj +++ b/scheduler/src/cook/mesos/rebalancer.clj @@ -224,10 +224,6 @@ ;; This will preserve the ordering of task->scored-task host->scored-tasks (->> task->scored-task (vals) - (map (fn [{:keys [task] :as scored-task}] - (if (:instance/backfilled? task) - (assoc scored-task :dru Double/MAX_VALUE) - scored-task))) (remove #(< (:dru %) safe-dru-threshold)) (filter #(> (- (:dru %) pending-job-dru) min-dru-diff)) (group-by (fn [{:keys [task]}] From aee80743f5924e9a5c7276ebec98f8fe16dd15b2 Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Thu, 3 Dec 2015 17:15:33 -0500 Subject: [PATCH 25/27] Add test for backfill job upgrades --- scheduler/test/cook/test/mesos/scheduler.clj | 33 ++++++++++++++++++++ scheduler/test/cook/test/mesos/schema.clj | 6 ++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/scheduler/test/cook/test/mesos/scheduler.clj b/scheduler/test/cook/test/mesos/scheduler.clj index 704c0c8482..df3d67da09 100644 --- a/scheduler/test/cook/test/mesos/scheduler.clj +++ b/scheduler/test/cook/test/mesos/scheduler.clj @@ -426,5 +426,38 @@ (is (= [task-id-2] (sched/get-lingering-tasks test-db now 120)))) ) +(deftest test-upgrade-backfill-flow + ;; We need to create a DB and connect the tx monitor queue to it + ;; Then we'll put in some jobs + ;; Then pretend to launch them as backfilled + ;; Check here that they have the right properties (pending, not ready to run) + ;; Then we'll upgrade the jobs + ;; Check here that they have the right properties (running, not ready to run) + ;; Then shut it down + (let [uri "datomic:mem://test-backfill-upgrade" + conn (restore-fresh-database! uri) + check-count-of-pending-and-runnable-jobs + (fn check-count-of-pending-and-runnable-jobs [expected-pending expected-runnable msg] + (let [db (db conn) + pending-jobs (sched/rank-jobs db identity) + runnable-jobs (filter (fn [job] (sched/job-allowed-to-start? db job)) pending-jobs)] + (is (= expected-pending (count pending-jobs)) (str "Didn't match pending job count in " msg)) + (is (= expected-runnable (count runnable-jobs)) (str "Didn't match runnable job count in " msg)))) + job (create-dummy-job conn + :user "tsram" + :job-state :job.state/waiting + :max-runtime Long/MAX_VALUE) + _ (check-count-of-pending-and-runnable-jobs 1 1 "job created") + instance (create-dummy-instance conn job + :job-state :job.state/waiting + :instance-status :instance.status/running + :backfilled? true)] + (check-count-of-pending-and-runnable-jobs 1 0 "job backfilled without update") + @(d/transact conn [[:job/update-state job]]) + (check-count-of-pending-and-runnable-jobs 1 0 "job backfilled") + @(d/transact conn [[:db/add instance :instance/backfilled? false]]) + @(d/transact conn [[:job/update-state job]]) + (check-count-of-pending-and-runnable-jobs 0 0 "job promoted"))) + (comment (run-tests)) diff --git a/scheduler/test/cook/test/mesos/schema.clj b/scheduler/test/cook/test/mesos/schema.clj index d499dbc744..51a40f7cd6 100644 --- a/scheduler/test/cook/test/mesos/schema.clj +++ b/scheduler/test/cook/test/mesos/schema.clj @@ -71,13 +71,14 @@ (defn create-dummy-instance "Return the entity id for the created instance." - [conn job & {:keys [job-state instance-status start-time hostname task-id progress] + [conn job & {:keys [job-state instance-status start-time hostname task-id progress backfilled?] :or {job-state :job.state/running instance-status :instance.status/unknown start-time (java.util.Date.) hostname "localhost" task-id (str (str (java.util.UUID/randomUUID))) - progress 0}}] + backfilled? false + progress 0} :as cfg}] (let [id (d/tempid :db.part/user) val @(d/transact conn [{:db/id job :job/state job-state} @@ -85,6 +86,7 @@ :job/_instance job :instance/hostname hostname :instance/progress progress + :instance/backfilled? backfilled? :instance/status instance-status :instance/start-time start-time :instance/task-id task-id}])] From 0126080476a2add2d883c474ec69717709df0f6b Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Wed, 9 Dec 2015 10:31:19 -0500 Subject: [PATCH 26/27] updates for Li's comments --- scheduler/project.clj | 2 +- scheduler/src/cook/mesos/api.clj | 1 - scheduler/src/cook/mesos/scheduler.clj | 2 +- scheduler/src/cook/mesos/util.clj | 5 ++--- scheduler/test/cook/test/mesos/scheduler.clj | 1 + 5 files changed, 5 insertions(+), 6 deletions(-) diff --git a/scheduler/project.clj b/scheduler/project.clj index b8c8836f53..0f95dd4d31 100644 --- a/scheduler/project.clj +++ b/scheduler/project.clj @@ -16,7 +16,7 @@ (defproject cook "1.0.1-SNAPSHOT" :description "This launches jobs on a Mesos cluster with fair sharing and preemption" :license {:name "Apache License, Version 2.0"} - :dependencies [[org.clojure/clojure "1.6.0"] + :dependencies [[org.clojure/clojure "1.7.0"] ;;Data marshalling [org.clojure/data.codec "0.1.0"] diff --git a/scheduler/src/cook/mesos/api.clj b/scheduler/src/cook/mesos/api.clj index 56fb11d14e..ff7cb1f23e 100644 --- a/scheduler/src/cook/mesos/api.clj +++ b/scheduler/src/cook/mesos/api.clj @@ -250,7 +250,6 @@ end (:instance/end-time instance) base {:task_id (:instance/task-id instance) :hostname hostname - ;;TODO validate that these show up in API :backfilled (:instance/backfilled? instance false) :preempted (:instance/preempted? instance false) :slave_id (:instance/slave-id instance) diff --git a/scheduler/src/cook/mesos/scheduler.clj b/scheduler/src/cook/mesos/scheduler.clj index a5d5694901..624d291fb1 100644 --- a/scheduler/src/cook/mesos/scheduler.clj +++ b/scheduler/src/cook/mesos/scheduler.clj @@ -626,7 +626,7 @@ max-considerable 1000] (async/thread (loop [num-considerable max-considerable] - ;;TODO make this cancelable + ;;TODO make this cancelable (if we want to be able to restart the server w/o restarting the JVM) (recur (try (let [offers (async/alt!! diff --git a/scheduler/src/cook/mesos/util.clj b/scheduler/src/cook/mesos/util.clj index 1f5e1b3296..b5fe9e6eaf 100644 --- a/scheduler/src/cook/mesos/util.clj +++ b/scheduler/src/cook/mesos/util.clj @@ -118,9 +118,8 @@ ;; Use :db/id because they guarantee uniqueness for different entities ;; (:db/id task) is not sufficient because synthetic task entities don't have :db/id ;; This assumes there are at most one synthetic task for a job, otherwise uniqueness invariant will break - [(if (and consider-backfilled-jobs? (:instance/backfilled? task)) - Integer/MAX_VALUE - (- (:job/priority (:job/_instance task) default-job-priority))) + [(and consider-backfilled-jobs? (:instance/backfilled? task)) ; true sorts higher than false + (- (:job/priority (:job/_instance task) default-job-priority)) (:instance/start-time task (java.util.Date. Long/MAX_VALUE)) (:db/id task) (:db/id (:job/_instance task))])] diff --git a/scheduler/test/cook/test/mesos/scheduler.clj b/scheduler/test/cook/test/mesos/scheduler.clj index df3d67da09..7c28782802 100644 --- a/scheduler/test/cook/test/mesos/scheduler.clj +++ b/scheduler/test/cook/test/mesos/scheduler.clj @@ -434,6 +434,7 @@ ;; Then we'll upgrade the jobs ;; Check here that they have the right properties (running, not ready to run) ;; Then shut it down + (let [uri "datomic:mem://test-backfill-upgrade" conn (restore-fresh-database! uri) check-count-of-pending-and-runnable-jobs From 0b7accf3da92bf518a56ea73f9155e0b7c113a8e Mon Sep 17 00:00:00 2001 From: David Greenberg Date: Wed, 9 Dec 2015 16:16:45 -0500 Subject: [PATCH 27/27] Actually, true doesn't sort higher than false --- scheduler/src/cook/mesos/util.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/src/cook/mesos/util.clj b/scheduler/src/cook/mesos/util.clj index b5fe9e6eaf..c2f818d464 100644 --- a/scheduler/src/cook/mesos/util.clj +++ b/scheduler/src/cook/mesos/util.clj @@ -118,7 +118,7 @@ ;; Use :db/id because they guarantee uniqueness for different entities ;; (:db/id task) is not sufficient because synthetic task entities don't have :db/id ;; This assumes there are at most one synthetic task for a job, otherwise uniqueness invariant will break - [(and consider-backfilled-jobs? (:instance/backfilled? task)) ; true sorts higher than false + [(if (and consider-backfilled-jobs? (:instance/backfilled? task)) 1 0) (- (:job/priority (:job/_instance task) default-job-priority)) (:instance/start-time task (java.util.Date. Long/MAX_VALUE)) (:db/id task)