Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Add support for ports with Fenzo
Browse files Browse the repository at this point in the history
  • Loading branch information
dgrnbrg committed Nov 20, 2015
1 parent ff512b6 commit 5b62224
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 133 deletions.
12 changes: 5 additions & 7 deletions scheduler/src/cook/mesos/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
:max-retries (s/both s/Int (s/pred pos? 'pos?))
:max-runtime (s/both s/Int (s/pred pos? 'pos?))
(s/optional-key :uris) [Uri]
(s/optional-key :ports) [(s/pred zero? 'zero)] ;;TODO add to docs the limited uri/port support
(s/optional-key :ports) (s/pred pos? 'pos?)
(s/optional-key :env) {NonEmptyString s/Str}
:cpus PosDouble
:mem PosDouble
Expand All @@ -67,10 +67,8 @@
[conn jobs :- [Job]]
(doseq [{:keys [uuid command max-retries max-runtime priority cpus mem user name ports uris env]} jobs
:let [id (d/tempid :db.part/user)
ports (mapv (fn [port]
;;TODO this schema might not work b/c all ports are zero
[:db/add id :job/port port])
ports)
ports (when (and ports (not (zero? ports)))
[[:db/add id :job/port ports]])
uris (mapcat (fn [{:keys [value executable? cache? extract?]}]
(let [uri-id (d/tempid :db.part/user)
optional-params {:resource.uri/executable? executable?
Expand Down Expand Up @@ -144,7 +142,7 @@
:priority (or priority util/default-job-priority)
:max-retries max_retries
:max-runtime (or max_runtime Long/MAX_VALUE)
:ports (or ports [])
:ports (or ports 0)
:cpus (double cpus)
:mem (double mem)}
(when uris
Expand Down Expand Up @@ -239,7 +237,7 @@
:status (name (:job/state job))
:uris (:uris resources)
:env (util/job-ent->env job)
;;TODO include ports
:ports (:job/port job 0)
:instances
(map (fn [instance]
(let [hostname (:instance/hostname instance)
Expand Down
266 changes: 141 additions & 125 deletions scheduler/src/cook/mesos/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
;; If there is no value for key :job/name, the following name will contain the substring "null".
{:name (format "%s_%s_%s" (:job/name job-ent "cookjob") (:job/user job-ent) task-id)
:task-id task-id
:num-ports (count (:ports resources))
:num-ports (:ports resources)
:resources (select-keys resources [:mem :cpus])
;;TODO this data is a race-condition
:data (.getBytes
Expand Down Expand Up @@ -480,121 +480,132 @@
offer-size-mem
(get-in offer [:resources :mem] 0)))
(log/debug "invoked handle-resource-offers!")
(timers/time!
handle-resource-offer!-duration
(try
(loop [] ;; This loop is for compare-and-set! below
(let [scheduler-contents @pending-jobs
db (db conn)
_ (log/debug "There are" (count scheduler-contents) "pending jobs")
considerable (->> scheduler-contents
(filter (fn [job]
(job-allowed-to-start? db job)))
(take num-considerable))
_ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs (limited to " num-considerable " due to backdown)")
matches (match-offer-to-schedule fenzo considerable offers db fid)
matched-jobs (for [match matches
^TaskAssignmentResult task-result (:tasks match)
:let [task-request (.getRequest task-result)]]
(:job task-request))
processed-matches (process-matches-for-backfill scheduler-contents (first considerable) matched-jobs)
new-scheduler-contents (remove (fn [pending-job]
(contains? (:fully-processed processed-matches) (:job/uuid pending-job)))
scheduler-contents)
first-considerable-resources (-> considerable first util/job-ent->resources)
match-resource-requirements (util/sum-resources-of-jobs matched-jobs)]
(reset! front-of-job-queue-mem-atom
(or (:mem first-considerable-resources) 0))
(reset! front-of-job-queue-cpus-atom
(or (:cpus first-considerable-resources) 0))
(cond
(empty? matches) (if (seq considerable)
nil ;; This case means we failed to match anything. We need to start head of line blocking now
true) ;; This case means there was nothing to match--don't penalize the scheduler for the next burst of submitted jobs
(not (compare-and-set! pending-jobs scheduler-contents new-scheduler-contents))
(let [recycle-offers (doall (for [{:keys [leases]} matches
lease leases]
(:offer lease)))]
(log/debug "Pending job atom contention encountered, recycling offers:" recycle-offers)
(async/go
(async/>! offers-chan recycle-offers))
(meters/mark! pending-job-atom-contended)
(recur))
:else
;;TODO this construction of task-infos is legacy-oriented
;;since now we treat "matches" as our primary structure,
;;we should eventually remove this construction of task-infos
;;as it's just a helper for building the datomic transaction
(let [task-txns (for [{:keys [tasks leases]} matches
:let [offers (mapv :offer leases)
slave-id (:slave-id (first offers))]
^TaskAssignmentResult task tasks
:let [request (.getRequest task)
task-info (:task-info request)
job-id (get-in request [:job :db/id])]]
[[:job/allowed-to-start? job-id]
{:db/id (d/tempid :db.part/user)
:job/_instance job-id
:instance/task-id (:task-id task-info)
:instance/hostname (.getHostname task)
:instance/start-time (now)
;; NB command executor uses the task-id
;; as the executor-id
:instance/executor-id (get-in
task-info
[:executor :executor-id]
(:task-id task-info))
:instance/backfilled? (contains? (:backfill-jobs processed-matches) (get-in request [:job :job/uuid]))
:instance/slave-id slave-id
:instance/progress 0
:instance/status :instance.status/unknown
:instance/preempted? false}])
upgrade-txns (map (fn [instance-id]
[:db/add instance-id :instance/backfilled? false])
(:upgrade-backfill processed-matches))]
;; Note that this transaction can fail if a job was scheduled
;; during a race. If that happens, then other jobs that should
;; be scheduled will not be eligible for rescheduling until
;; the pending-jobs atom is repopulated
@(d/transact
conn
(vec (apply concat upgrade-txns task-txns)))
(log/info "Launching" (count task-txns) "tasks")
(log/info "Upgrading" (count (:upgrade-backfill processed-matches)) "tasks from backfilled to proper")
(log/debug "Matched tasks" task-txns)
;; This launch-tasks MUST happen after the above transaction in
;; order to allow a transaction failure (due to failed preconditions)
;; to block the launch
(meters/mark! scheduler-offer-matched
(->> matches
(mapcat (comp :id :offer :leases))
(distinct)
(count)))
(meters/mark! matched-tasks (count task-txns))
(meters/mark! matched-tasks-cpus (:cpus match-resource-requirements))
(meters/mark! matched-tasks-mem (:mem match-resource-requirements))
(doseq [{:keys [tasks leases]} matches
:let [offers (mapv :offer leases)
task-infos (mapv (fn [^TaskAssignmentResult task]
(:task-info (.getRequest task)))
tasks)
slave-id (:slave-id (first offers))]]
;TODO if the ports are in the offer, then Fenzo is identifying the ports with a list .getAssignedPorts on the TaskAssignmentResult
(mesos/launch-tasks
driver
(mapv :id offers)
(mapv #(-> %
(assoc :slave-id slave-id)
(dissoc :num-ports))
task-infos))
(doseq [^TaskAssignmentResult task tasks]
(.. fenzo
(getTaskAssigner)
(call (.getRequest task) (get-in (first leases) [:offer :hostname])))))
(:matched-head? processed-matches)))))
(let [offer-stash (atom nil)] ;; This is a way to ensure we never lose offers fenzo assigned if an error occurs in the middle of processing
(timers/time!
handle-resource-offer!-duration
(try
(loop [] ;; This loop is for compare-and-set! below
(let [scheduler-contents @pending-jobs
db (db conn)
_ (log/debug "There are" (count scheduler-contents) "pending jobs")
considerable (->> scheduler-contents
(filter (fn [job]
(job-allowed-to-start? db job)))
(take num-considerable))
_ (log/debug "We'll consider scheduling" (count considerable) "of those pending jobs (limited to " num-considerable " due to backdown)")
matches (match-offer-to-schedule fenzo considerable offers db fid)
_ (reset! offer-stash (doall (for [{:keys [leases]} matches
lease leases]
(:offer lease))))
matched-jobs (for [match matches
^TaskAssignmentResult task-result (:tasks match)
:let [task-request (.getRequest task-result)]]
(:job task-request))
processed-matches (process-matches-for-backfill scheduler-contents (first considerable) matched-jobs)
new-scheduler-contents (remove (fn [pending-job]
(contains? (:fully-processed processed-matches) (:job/uuid pending-job)))
scheduler-contents)
first-considerable-resources (-> considerable first util/job-ent->resources)
match-resource-requirements (util/sum-resources-of-jobs matched-jobs)]
(reset! front-of-job-queue-mem-atom
(or (:mem first-considerable-resources) 0))
(reset! front-of-job-queue-cpus-atom
(or (:cpus first-considerable-resources) 0))
(cond
(empty? matches) (if (seq considerable)
nil ;; This case means we failed to match anything. We need to start head of line blocking now
true) ;; This case means there was nothing to match--don't penalize the scheduler for the next burst of submitted jobs
(not (compare-and-set! pending-jobs scheduler-contents new-scheduler-contents))
(do
(log/debug "Pending job atom contention encountered, recycling offers:" @offer-stash)
(async/go
(async/>! offers-chan @offer-stash))
(meters/mark! pending-job-atom-contended)
(recur))
:else
(let [task-txns (for [{:keys [tasks leases]} matches
:let [offers (mapv :offer leases)
slave-id (:slave-id (first offers))]
^TaskAssignmentResult task tasks
:let [request (.getRequest task)
task-info (:task-info request)
job-id (get-in request [:job :db/id])]]
[[:job/allowed-to-start? job-id]
{:db/id (d/tempid :db.part/user)
:job/_instance job-id
:instance/task-id (:task-id task-info)
:instance/hostname (.getHostname task)
:instance/start-time (now)
;; NB command executor uses the task-id
;; as the executor-id
:instance/executor-id (get-in
task-info
[:executor :executor-id]
(:task-id task-info))
:instance/backfilled? (contains? (:backfill-jobs processed-matches) (get-in request [:job :job/uuid]))
:instance/slave-id slave-id
:instance/progress 0
:instance/status :instance.status/unknown
:instance/preempted? false}])
upgrade-txns (map (fn [instance-id]
[:db/add instance-id :instance/backfilled? false])
(:upgrade-backfill processed-matches))]
;; Note that this transaction can fail if a job was scheduled
;; during a race. If that happens, then other jobs that should
;; be scheduled will not be eligible for rescheduling until
;; the pending-jobs atom is repopulated
@(d/transact
conn
(vec (apply concat upgrade-txns task-txns)))
(log/info "Launching" (count task-txns) "tasks")
(log/info "Upgrading" (count (:upgrade-backfill processed-matches)) "tasks from backfilled to proper")
(log/debug "Matched tasks" task-txns)
;; This launch-tasks MUST happen after the above transaction in
;; order to allow a transaction failure (due to failed preconditions)
;; to block the launch
(meters/mark! scheduler-offer-matched
(->> matches
(mapcat (comp :id :offer :leases))
(distinct)
(count)))
(meters/mark! matched-tasks (count task-txns))
(meters/mark! matched-tasks-cpus (:cpus match-resource-requirements))
(meters/mark! matched-tasks-mem (:mem match-resource-requirements))
(doseq [{:keys [tasks leases]} matches
:let [offers (mapv :offer leases)
task-infos (mapv (fn process-results [^TaskAssignmentResult task]
(reduce
(fn add-ports-to-task-info [task-info [index port]]
(log/debug "task-info" task-info [index port])
(-> task-info
(update-in [:resources :ports]
(fnil conj [])
{:begin port :end port})
(assoc-in [:command :environment (str "PORT" index)]
(str port))))
(:task-info (.getRequest task))
(map-indexed (fn [index port] [index port])
(.getAssignedPorts task))))
tasks)
slave-id (:slave-id (first offers))]]
;TODO if the ports are in the offer, then Fenzo is identifying the ports with a list .getAssignedPorts on the TaskAssignmentResult
(mesos/launch-tasks
driver
(mapv :id offers)
(mapv #(-> %
(assoc :slave-id slave-id)
(dissoc :num-ports))
task-infos))
(doseq [^TaskAssignmentResult task tasks]
(.. fenzo
(getTaskAssigner)
(call (.getRequest task) (get-in (first leases) [:offer :hostname])))))
(:matched-head? processed-matches)))))
(catch Throwable t
(meters/mark! handle-resource-offer!-errors)
(log/error t "Error in match:" (ex-data t))))))
(async/go
(async/>! offers-chan @offer-stash))
(log/error t "Error in match:" (ex-data t)))))))

(defn view-incubating-offers
[^TaskScheduler fenzo]
Expand All @@ -617,17 +628,22 @@
max-considerable 1000]
(async/thread
(loop [num-considerable max-considerable]
(let [offers (async/alt!!
offers-chan ([offers] offers)
timer-chan ([_] [])
:priority true)
matched-head? (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom num-considerable offers-chan offers)]
(when (seq offers)
(reset! resources-atom (view-incubating-offers fenzo)))
;;TODO make this cancelable
(recur (if matched-head?
max-considerable
(max 1 (long (* 0.95 num-considerable)))))))) ;; With max=1000 and 1 iter/sec, this will take 88 seconds to reach 1
;;TODO make this cancelable
(recur
(try
(let [offers (async/alt!!
offers-chan ([offers] offers)
timer-chan ([_] [])
:priority true)
matched-head? (handle-resource-offers! conn @driver-atom fenzo @fid-atom pending-jobs-atom num-considerable offers-chan offers)]
(when (seq offers)
(reset! resources-atom (view-incubating-offers fenzo)))
(if matched-head?
max-considerable
(max 1 (long (* 0.95 num-considerable))))) ;; With max=1000 and 1 iter/sec, this will take 88 seconds to reach 1
(catch Exception e
(log/error e "Offer handler encountered exception; continuing")
max-considerable)))))
[offers-chan resources-atom]))

(defn reconcile-jobs
Expand Down
Loading

0 comments on commit 5b62224

Please sign in to comment.