-
Notifications
You must be signed in to change notification settings - Fork 64
Add Fenzo for scheduling #84
Changes from 17 commits
d703bf8
f5fae89
77b59c0
b0f2e54
5031d5d
a8155fb
5b49ae9
cd870c9
879109e
0dfe74e
7caceb1
df5872b
b767bcb
ba5d82a
087f09d
ff512b6
536d9ce
e850b5c
7605a45
583c846
5476f15
d807b04
28199cd
27da88d
930d3ec
aee8074
7fa71c0
0126080
0b7accf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,21 +81,22 @@ | |
datomic-report-chan (async/chan (async/sliding-buffer 4096)) | ||
mesos-pending-jobs-atom (atom []) | ||
mesos-heartbeat-chan (async/chan (async/buffer 4096)) | ||
{:keys [scheduler view-incubating-offers view-mature-offers]} | ||
current-driver (atom nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exposing this data here feels hacky...perhaps this file should be refactored? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is current-driver used for? |
||
{:keys [scheduler view-incubating-offers]} | ||
(sched/create-datomic-scheduler | ||
mesos-datomic-conn | ||
(fn set-or-create-framework-id [framework-id] | ||
(curator/set-or-create | ||
curator-framework | ||
zk-framework-id | ||
(.getBytes framework-id "UTF-8"))) | ||
current-driver | ||
mesos-pending-jobs-atom | ||
mesos-heartbeat-chan | ||
offer-incubate-time-ms | ||
task-constraints) | ||
framework-id (when-let [bytes (curator/get-or-nil curator-framework zk-framework-id)] | ||
(String. bytes)) | ||
current-driver (atom nil) | ||
leader-selector (LeaderSelector. | ||
curator-framework | ||
zk-prefix | ||
|
@@ -136,8 +137,7 @@ | |
:driver driver | ||
:mesos-master mesos-master | ||
:pending-jobs-atom mesos-pending-jobs-atom | ||
:view-incubating-offers view-incubating-offers | ||
:view-mature-offers view-mature-offers})) | ||
:view-incubating-offers view-incubating-offers})) | ||
(counters/inc! mesos-leader) | ||
(async/tap mesos-datomic-mult datomic-report-chan) | ||
(let [kill-monitor (cook.mesos.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn current-driver)] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,7 +55,7 @@ | |
:max-retries (s/both s/Int (s/pred pos? 'pos?)) | ||
:max-runtime (s/both s/Int (s/pred pos? 'pos?)) | ||
(s/optional-key :uris) [Uri] | ||
(s/optional-key :ports) [(s/pred zero? 'zero)] ;;TODO add to docs the limited uri/port support | ||
(s/optional-key :ports) (s/pred pos? 'pos?) | ||
(s/optional-key :env) {NonEmptyString s/Str} | ||
:cpus PosDouble | ||
:mem PosDouble | ||
|
@@ -67,10 +67,8 @@ | |
[conn jobs :- [Job]] | ||
(doseq [{:keys [uuid command max-retries max-runtime priority cpus mem user name ports uris env]} jobs | ||
:let [id (d/tempid :db.part/user) | ||
ports (mapv (fn [port] | ||
;;TODO this schema might not work b/c all ports are zero | ||
[:db/add id :job/port port]) | ||
ports) | ||
ports (when (and ports (not (zero? ports))) | ||
[[:db/add id :job/port ports]]) | ||
uris (mapcat (fn [{:keys [value executable? cache? extract?]}] | ||
(let [uri-id (d/tempid :db.part/user) | ||
optional-params {:resource.uri/executable? executable? | ||
|
@@ -144,7 +142,7 @@ | |
:priority (or priority util/default-job-priority) | ||
:max-retries max_retries | ||
:max-runtime (or max_runtime Long/MAX_VALUE) | ||
:ports (or ports []) | ||
:ports (or ports 0) | ||
:cpus (double cpus) | ||
:mem (double mem)} | ||
(when uris | ||
|
@@ -239,7 +237,7 @@ | |
:status (name (:job/state job)) | ||
:uris (:uris resources) | ||
:env (util/job-ent->env job) | ||
;;TODO include ports | ||
:ports (:job/port job 0) | ||
:instances | ||
(map (fn [instance] | ||
(let [hostname (:instance/hostname instance) | ||
|
@@ -252,6 +250,9 @@ | |
end (:instance/end-time instance) | ||
base {:task_id (:instance/task-id instance) | ||
:hostname hostname | ||
;;TODO validate that these show up in API | ||
:backfilled (:instance/backfilled? instance false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't exposed in the Java API There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
:preempted (:instance/preempted? instance false) | ||
:slave_id (:instance/slave-id instance) | ||
:executor_id (:instance/executor-id instance) | ||
:status (name (:instance/status instance))} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -218,10 +218,22 @@ | |
[{:keys [task->scored-task host->spare-resources] :as state} | ||
{:keys [min-dru-diff safe-dru-threshold] :as params} | ||
pending-job-ent] | ||
;;We need to rely on this being a priority map TODO check that this is true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is true. It has to be a priority map |
||
(when-not (instance? clojure.data.priority_map.PersistentPriorityMap task->scored-task) | ||
(log/fatal "Implementation detail failed; needed priority, got" (class task->scored-task)) | ||
(throw (ex-info "Implementation detail failed" {}))) | ||
(let [{pending-job-mem :mem pending-job-cpus :cpus} (util/job-ent->resources pending-job-ent) | ||
pending-job-dru (compute-pending-job-dru state pending-job-ent) | ||
|
||
;; This will preserve the ordering of task->scored-task | ||
host->scored-tasks (->> task->scored-task | ||
;;TODO maybe we can just change every backfilled task here to have the worst possible DRU | ||
;;this should bias us towards always killing those tasks first... | ||
(reduce-kv (fn [m k {:keys [task] :as scored-task}] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this actually work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @icexelloss Could you take a look and confirm this code does what I'm asserting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we change the per-user sorting function to take backfilled tasks into account? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is that defined? Which one? I had some trouble figuring out which sorting functions were used for which code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feel free to put it in another file. Util might not be the best place :( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't make the change there, since that code is used in many places, and we only want to penalize backfilled jobs during preemption, not during normal scheduling. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
I would prefer to have the two orders (preemption order and scheduling order) defined in two separate functions, then revisit each use of the old sorting function, replace it with one of the two, and double-check the assumptions and invariants. We can sit together and do this if you want. |
||
(if (:instance/backfilled? task) | ||
(assoc m k (assoc scored-task :dru Double/MAX_VALUE)) | ||
m)) | ||
task->scored-task) | ||
(vals) | ||
(remove #(< (:dru %) safe-dru-threshold)) | ||
(filter #(> (- (:dru %) pending-job-dru) min-dru-diff)) | ||
|
@@ -302,7 +314,7 @@ | |
(try | ||
@(d/transact | ||
conn | ||
;; Make :instance/status and :instance/preempted consistent to simplify the state machine. | ||
;; Make :instance/status and :instance/preempted? consistent to simplify the state machine. | ||
;; We don't want to deal with {:instance/status :instance.stats/running, :instance/preempted? true} | ||
;; all over the places. | ||
(let [job-eid (:db/id (:job/_instance task-ent)) | ||
|
@@ -334,7 +346,7 @@ | |
utilization)) | ||
|
||
(defn start-rebalancer! | ||
[{:keys [conn driver mesos-master pending-jobs-atom view-incubating-offers view-mature-offers]}] | ||
[{:keys [conn driver mesos-master pending-jobs-atom view-incubating-offers]}] | ||
(let [rebalance-interval (time/minutes 5) | ||
observe-interval (time/seconds 5) | ||
observe-refreshness-threshold (time/seconds 30) | ||
|
@@ -343,7 +355,6 @@ | |
(fn [now] | ||
(let [host->combined-offers | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which offers is this seeing? Pre or post fenzo? If pre, change the name since we are no longer combining offers here. |
||
(-<>> (view-incubating-offers) | ||
(sched/combine-offers) | ||
(map (fn [v] | ||
[(:hostname v) (assoc v :time-observed now)])) | ||
(into {}))] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of the newer core.async versions seem to require Clojure 1.7; is this upgrade safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, moved to 1.7