Skip to content
Permalink
Browse files

Thread compute cluster through Fenzo (#1136)

  • Loading branch information...
scrosby authored and pschorf committed May 13, 2019
1 parent e9bcc30 commit 35520687c2d0101c169e97f567cd7d849760ba04
@@ -15,6 +15,7 @@
;;
(ns cook.compute-cluster
(:require [clojure.tools.logging :as log]
[cook.config :as config]
[datomic.api :as d]))

(defn- write-compute-cluster
@@ -62,6 +63,20 @@
:mesos-framework-id framework-id
:db-id (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster))}))

(defn get-default-cluster-name-for-legacy
  "Returns the compute cluster name to use for legacy jobs when generating
  their compute-cluster. The name is read from config/config at
  :settings -> :mesos-compute-cluster-name.

  The postcondition rejects a nil (or false) result, so when assertions are
  enabled a missing configuration value fails here rather than being
  returned to the caller."
  []
  {:post [%]} ; Result must be truthy; never returns nil.
  (get-in config/config [:settings :mesos-compute-cluster-name]))

; A hack to provide the mesos cluster name until the code is refactored to support multiple clusters. In the long term
; this will probably be replaced by a function from driver->cluster-id, or the cluster name will be propagated through
; function arguments and closed over.
(defn get-mesos-cluster-name-hack
  "Returns the mesos compute cluster name found in config/config at
  :settings -> :mesos-compute-cluster-name.

  The postcondition rejects a nil (or false) result, so when assertions are
  enabled a missing configuration value fails here instead of propagating."
  []
  {:post [%]} ; Result must be truthy; never returns nil.
  (let [settings (:settings config/config)]
    (:mesos-compute-cluster-name settings)))

(defn get-mesos-clusters-from-config
"Get all of the mesos clusters defined in the configuration.
In config.edn, we put all of the mesos keys under one toplevel dictionary.
@@ -92,6 +107,7 @@
"Given a cluster name, return the db-id we should use to refer to that compute cluster
when we put it within a task structure."
[cluster-name]
{:post [%]} ; Never returns nil.
(let [{:keys [db-id]} (get @cluster-name->cluster-dict-atom cluster-name)]
; All clusters referenced by name must have been installed in the db previously.
(when-not db-id (throw (IllegalStateException. (str "Was asked to lookup db-id for " cluster-name " and got nil"))))
@@ -111,14 +127,3 @@
(run! (fn [compute-cluster]
(log/info "Setting up compute cluster: " compute-cluster)) compute-clusters)
(reset! cluster-name->cluster-dict-atom (reduce reduce-fn {} compute-clusters))))

; TODO: Until we thread the compute-cluster-name through fenzo, we'll need this hack to remember
; the cluster name so that we can find the compute cluster when generating the fenzo result.
(defn cluster-name-hack
  "A stopgap that currently returns the default cluster name: the key of the
  first entry in cluster-name->cluster-dict-atom, or nil when that map is
  empty. Which entry counts as first is whatever order the map's seq yields.

  Used, e.g., when processing fenzo responses. In the future it will still be
  needed to fill in compute cluster objects in legacy task entities."
  []
  ; ffirst on a map takes the first map entry and then its key, which is the
  ; same value as (first (keys m)).
  (ffirst @cluster-name->cluster-dict-atom))
@@ -960,7 +960,7 @@
[db entity]
(if entity
(compute-cluster-entity->map entity)
(->> (cc/cluster-name-hack) ; Get the default cluster.
(->> (cc/get-default-cluster-name-for-legacy) ; Get the default cluster.
cc/cluster-name->db-id
(d/entity db)
(fetch-compute-cluster-map db))))
@@ -653,7 +653,9 @@
[matches]
(for [{:keys [leases task-metadata-seq]} matches
:let [offers (mapv :offer leases)
slave-id (-> offers first :slave-id :value)]
first-offer (-> offers first)
slave-id (-> first-offer :slave-id :value)
compute-cluster-name (-> first-offer :compute-cluster-name)]
{:keys [executor hostname ports-assigned task-id task-request]} task-metadata-seq
:let [job-ref [:job/uuid (get-in task-request [:job :job/uuid])]]]
[[:job/allowed-to-start? job-ref]
@@ -673,7 +675,7 @@
:instance/start-time (now)
:instance/status :instance.status/unknown
:instance/task-id task-id
:instance/compute-cluster (cc/cluster-name->db-id (cc/cluster-name-hack))}]))
:instance/compute-cluster (cc/cluster-name->db-id compute-cluster-name)}]))

(defn launch-matched-tasks!
"Updates the state of matched tasks in the database and then launches them."
@@ -1392,7 +1394,7 @@
(defn create-mesos-scheduler
"Creates the mesos scheduler which processes status updates asynchronously but in order of receipt."
[gpu-enabled? conn heartbeat-ch pool->fenzo pool->offers-chan match-trigger-chan
handle-exit-code handle-progress-message sandbox-syncer-state]
handle-exit-code handle-progress-message sandbox-syncer-state compute-cluster-name]
(let [configured-framework-id (cook.config/framework-id-config)
sync-agent-sandboxes-fn #(sandbox/sync-agent-sandboxes sandbox-syncer-state configured-framework-id %1 %2)
message-handlers {:handle-exit-code handle-exit-code
@@ -1458,9 +1460,10 @@
(meters/mark! mesos-error)
(log/error "Got a mesos error!!!!" message))
(resource-offers
[this driver offers]
(log/debug "Got offers:" offers)
(let [pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers)
[this driver raw-offers]
(log/debug "Got offers:" raw-offers)
(let [offers (map #(assoc % :compute-cluster-name compute-cluster-name) raw-offers)
pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers)
using-pools? (config/default-pool)]
(log/info "Offers by pool:" (pc/map-vals count pool->offers))
(run!
@@ -1529,6 +1532,7 @@
(sandbox/aggregate-exit-code exit-code-syncer-state task-id exit-code))]
(start-jobs-prioritizer! conn pool-name->pending-jobs-atom task-constraints rank-trigger-chan)
{:scheduler (create-mesos-scheduler gpu-enabled? conn heartbeat-ch pool-name->fenzo pool->offers-chan
match-trigger-chan handle-exit-code handle-progress-message sandbox-syncer-state)
match-trigger-chan handle-exit-code handle-progress-message sandbox-syncer-state
(cc/get-mesos-cluster-name-hack))
:view-incubating-offers (fn get-resources-atom [p]
(deref (get pool->resources-atom p)))}))
@@ -53,7 +53,7 @@
(let [minimal-config {:authorization {:one-user ""}
:database {:datomic-uri ""}
:log {}
:mesos {:leader-path "", :master ""}
:mesos {:leader-path "", :master "" :compute-cluster-name "compute-cluster-default-compute-cluster-name"}
:metrics {}
:nrepl {}
:port 80
@@ -1466,6 +1466,7 @@
(is (= 100 (progress-from-api))))))

(deftest test-fetch-instance-map
(testutil/setup)
(let [conn (restore-fresh-database! "datomic:mem://test-fetch-instance-map")]
(testutil/setup-fake-test-compute-cluster conn)
(let [job-entity-id (create-dummy-job conn :user "test-user" :job-state :job.state/completed)
@@ -1483,11 +1484,11 @@
; Track whether we invoke this function to fetch the default. We shouldn't use this unless
; we're filling in because the entity lacks a compute cluster.
fetched-default-cluster-atom (atom false)
tmp-cluster-name-hack cc/cluster-name-hack]
(with-redefs [cc/cluster-name-hack
tmp-default-cluster-name-for-legacy cc/get-default-cluster-name-for-legacy]
(with-redefs [cc/get-default-cluster-name-for-legacy
(fn []
(reset! fetched-default-cluster-atom true)
(tmp-cluster-name-hack))]
(tmp-default-cluster-name-for-legacy))]

(reset! fetched-default-cluster-atom false)
(testing "basic-instance-without-sandbox"
Oops, something went wrong.

0 comments on commit 3552068

Please sign in to comment.
You can’t perform that action at this time.