-
Notifications
You must be signed in to change notification settings - Fork 64
Add Fenzo for scheduling #84
Changes from 27 commits
d703bf8
f5fae89
77b59c0
b0f2e54
5031d5d
a8155fb
5b49ae9
cd870c9
879109e
0dfe74e
7caceb1
df5872b
b767bcb
ba5d82a
087f09d
ff512b6
536d9ce
e850b5c
7605a45
583c846
5476f15
d807b04
28199cd
27da88d
930d3ec
aee8074
7fa71c0
0126080
0b7accf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,21 +81,22 @@ | |
datomic-report-chan (async/chan (async/sliding-buffer 4096)) | ||
mesos-pending-jobs-atom (atom []) | ||
mesos-heartbeat-chan (async/chan (async/buffer 4096)) | ||
{:keys [scheduler view-incubating-offers view-mature-offers]} | ||
current-driver (atom nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exposing this data here feels hacky...perhaps this file should be refactored? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is current-driver used for? |
||
{:keys [scheduler view-incubating-offers]} | ||
(sched/create-datomic-scheduler | ||
mesos-datomic-conn | ||
(fn set-or-create-framework-id [framework-id] | ||
(curator/set-or-create | ||
curator-framework | ||
zk-framework-id | ||
(.getBytes framework-id "UTF-8"))) | ||
current-driver | ||
mesos-pending-jobs-atom | ||
mesos-heartbeat-chan | ||
offer-incubate-time-ms | ||
task-constraints) | ||
framework-id (when-let [bytes (curator/get-or-nil curator-framework zk-framework-id)] | ||
(String. bytes)) | ||
current-driver (atom nil) | ||
leader-selector (LeaderSelector. | ||
curator-framework | ||
zk-prefix | ||
|
@@ -136,8 +137,7 @@ | |
:driver driver | ||
:mesos-master-hosts mesos-master-hosts | ||
:pending-jobs-atom mesos-pending-jobs-atom | ||
:view-incubating-offers view-incubating-offers | ||
:view-mature-offers view-mature-offers})) | ||
:view-incubating-offers view-incubating-offers})) | ||
(counters/inc! mesos-leader) | ||
(async/tap mesos-datomic-mult datomic-report-chan) | ||
(let [kill-monitor (cook.mesos.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn current-driver)] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,7 +55,7 @@ | |
:max-retries (s/both s/Int (s/pred pos? 'pos?)) | ||
:max-runtime (s/both s/Int (s/pred pos? 'pos?)) | ||
(s/optional-key :uris) [Uri] | ||
(s/optional-key :ports) [(s/pred zero? 'zero)] ;;TODO add to docs the limited uri/port support | ||
(s/optional-key :ports) (s/pred #(not (neg? %)) 'nonnegative?) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is ports allowed to be a list? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see it is the number of ports. |
||
(s/optional-key :env) {NonEmptyString s/Str} | ||
:cpus PosDouble | ||
:mem PosDouble | ||
|
@@ -67,10 +67,8 @@ | |
[conn jobs :- [Job]] | ||
(doseq [{:keys [uuid command max-retries max-runtime priority cpus mem user name ports uris env]} jobs | ||
:let [id (d/tempid :db.part/user) | ||
ports (mapv (fn [port] | ||
;;TODO this schema might not work b/c all ports are zero | ||
[:db/add id :job/port port]) | ||
ports) | ||
ports (when (and ports (not (zero? ports))) | ||
[[:db/add id :job/port ports]]) | ||
uris (mapcat (fn [{:keys [value executable? cache? extract?]}] | ||
(let [uri-id (d/tempid :db.part/user) | ||
optional-params {:resource.uri/executable? executable? | ||
|
@@ -144,7 +142,7 @@ | |
:priority (or priority util/default-job-priority) | ||
:max-retries max_retries | ||
:max-runtime (or max_runtime Long/MAX_VALUE) | ||
:ports (or ports []) | ||
:ports (or ports 0) | ||
:cpus (double cpus) | ||
:mem (double mem)} | ||
(when uris | ||
|
@@ -239,7 +237,7 @@ | |
:status (name (:job/state job)) | ||
:uris (:uris resources) | ||
:env (util/job-ent->env job) | ||
;;TODO include ports | ||
:ports (:job/port job 0) | ||
:instances | ||
(map (fn [instance] | ||
(let [hostname (:instance/hostname instance) | ||
|
@@ -252,6 +250,9 @@ | |
end (:instance/end-time instance) | ||
base {:task_id (:instance/task-id instance) | ||
:hostname hostname | ||
;;TODO validate that these show up in API | ||
:backfilled (:instance/backfilled? instance false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't exposed in the Java API There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
:preempted (:instance/preempted? instance false) | ||
:slave_id (:instance/slave-id instance) | ||
:executor_id (:instance/executor-id instance) | ||
:status (name (:instance/status instance))} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -170,7 +170,7 @@ | |
user->sorted-running-task-ents (->> running-task-ents | ||
(group-by util/task-ent->user) | ||
(map (fn [[user task-ents]] | ||
[user (into (sorted-set-by util/same-user-task-comparator) task-ents)])) | ||
[user (into (sorted-set-by (util/same-user-task-comparator true)) task-ents)])) | ||
(into {})) | ||
task->scored-task (dru/init-task->scored-task user->sorted-running-task-ents user->dru-divisors)] | ||
(->State task->scored-task user->sorted-running-task-ents host->spare-resources user->dru-divisors))) | ||
|
@@ -195,7 +195,7 @@ | |
(reduce (fn [task-ents-by-user task-ent] | ||
(let [user (util/task-ent->user task-ent) | ||
f (if (= new-running-task-ent task-ent) | ||
(fnil conj (sorted-set-by util/same-user-task-comparator)) | ||
(fnil conj (sorted-set-by (util/same-user-task-comparator true))) | ||
disj)] | ||
(update-in task-ents-by-user [user] f task-ent))) | ||
user->sorted-running-task-ents | ||
|
@@ -220,6 +220,7 @@ | |
pending-job-ent] | ||
(let [{pending-job-mem :mem pending-job-cpus :cpus} (util/job-ent->resources pending-job-ent) | ||
pending-job-dru (compute-pending-job-dru state pending-job-ent) | ||
|
||
;; This will preserve the ordering of task->scored-task | ||
host->scored-tasks (->> task->scored-task | ||
(vals) | ||
|
@@ -302,7 +303,7 @@ | |
(try | ||
@(d/transact | ||
conn | ||
;; Make :instance/status and :instance/preempted consistent to simplify the state machine. | ||
;; Make :instance/status and :instance/preempted? consistent to simplify the state machine. | ||
;; We don't want to deal with {:instance/status :instance.stats/running, :instance/preempted? true} | ||
;; all over the places. | ||
(let [job-eid (:db/id (:job/_instance task-ent)) | ||
|
@@ -342,7 +343,6 @@ | |
(fn [now] | ||
(let [host->combined-offers | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which offers is this seeing? Pre or post fenzo? If pre, change the name since we are no longer combining offers here. |
||
(-<>> (view-incubating-offers) | ||
(sched/combine-offers) | ||
(map (fn [v] | ||
[(:hostname v) (assoc v :time-observed now)])) | ||
(into {}))] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of the newer core.async seems to require clojure 1.7, is this upgrade safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, moved to 1.7