Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Refactor mesos scheduler initialization #1152

Merged
merged 11 commits into from Jun 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 58 additions & 52 deletions scheduler/src/cook/components.clj
Expand Up @@ -13,7 +13,8 @@
;; limitations under the License.
;;
(ns cook.components
(:require [clojure.core.cache :as cache]
(:require [clojure.core.async :as async]
[clojure.core.cache :as cache]
[clojure.pprint :refer (pprint)]
[clojure.tools.logging :as log]
[compojure.core :refer (GET POST routes context)]
Expand Down Expand Up @@ -95,13 +96,11 @@
(def mesos-scheduler
{:mesos-scheduler (fnk [[:settings fenzo-fitness-calculator fenzo-floor-iterations-before-reset
fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback
good-enough-fitness hostname mea-culpa-failure-limit mesos-failover-timeout mesos-framework-name
mesos-gpu-enabled mesos-leader-path mesos-master mesos-principal
mesos-role mesos-run-as-user offer-incubate-time-ms optimizer progress rebalancer server-port
task-constraints]
curator-framework exit-code-syncer-state framework-id mesos-datomic-mult mesos-leadership-atom
mesos-agent-attributes-cache pool-name->pending-jobs-atom sandbox-syncer-state
compute-clusters]
good-enough-fitness hostname mea-culpa-failure-limit mesos-leader-path mesos-run-as-user
offer-incubate-time-ms optimizer rebalancer server-port task-constraints]
compute-clusters curator-framework mesos-datomic-mult mesos-leadership-atom
mesos-agent-attributes-cache pool-name->pending-jobs-atom mesos-heartbeat-chan
trigger-chans]
(if (cook.config/api-only-mode?)
(if curator-framework
(throw (ex-info "This node is configured for API-only mode, but also has a curator configured"
Expand All @@ -110,48 +109,35 @@
(if curator-framework
(do
(log/info "Initializing mesos scheduler")
(let [make-mesos-driver-fn (partial (util/lazy-load-var 'cook.mesos/make-mesos-driver)
{:mesos-master mesos-master
:mesos-failover-timeout mesos-failover-timeout
:mesos-principal mesos-principal
:mesos-role mesos-role
:mesos-framework-name mesos-framework-name
:gpu-enabled? mesos-gpu-enabled})
trigger-chans ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints)]
(try
(Class/forName "org.apache.mesos.Scheduler")
((util/lazy-load-var 'cook.mesos/start-mesos-scheduler)
{:curator-framework curator-framework
:exit-code-syncer-state exit-code-syncer-state
:fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered
:fenzo-scaleback fenzo-scaleback
:fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn
:fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset
:fenzo-fitness-calculator fenzo-fitness-calculator
:good-enough-fitness good-enough-fitness}
:framework-id framework-id
:gpu-enabled? mesos-gpu-enabled
:make-mesos-driver-fn make-mesos-driver-fn
:mea-culpa-failure-limit mea-culpa-failure-limit
:mesos-datomic-conn datomic/conn
:mesos-datomic-mult mesos-datomic-mult
:mesos-leadership-atom mesos-leadership-atom
:pool-name->pending-jobs-atom pool-name->pending-jobs-atom
:mesos-run-as-user mesos-run-as-user
:agent-attributes-cache mesos-agent-attributes-cache
:offer-incubate-time-ms offer-incubate-time-ms
:optimizer-config optimizer
:progress-config progress
:rebalancer-config rebalancer
:sandbox-syncer-state sandbox-syncer-state
:server-config {:hostname hostname
:server-port server-port}
:task-constraints task-constraints
:trigger-chans trigger-chans
:zk-prefix mesos-leader-path})
(catch ClassNotFoundException e
(log/warn e "Not loading mesos support...")
nil))))
(try
(Class/forName "org.apache.mesos.Scheduler")
((util/lazy-load-var 'cook.mesos/start-leader-selector)
{:curator-framework curator-framework
:fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered
:fenzo-scaleback fenzo-scaleback
:fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn
:fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset
:fenzo-fitness-calculator fenzo-fitness-calculator
:good-enough-fitness good-enough-fitness}
:mea-culpa-failure-limit mea-culpa-failure-limit
:mesos-datomic-conn datomic/conn
:mesos-datomic-mult mesos-datomic-mult
:mesos-heartbeat-chan mesos-heartbeat-chan
:mesos-leadership-atom mesos-leadership-atom
:pool-name->pending-jobs-atom pool-name->pending-jobs-atom
:mesos-run-as-user mesos-run-as-user
:agent-attributes-cache mesos-agent-attributes-cache
:offer-incubate-time-ms offer-incubate-time-ms
:optimizer-config optimizer
:rebalancer-config rebalancer
:server-config {:hostname hostname
:server-port server-port}
:task-constraints task-constraints
:trigger-chans trigger-chans
:zk-prefix mesos-leader-path})
(catch ClassNotFoundException e
(log/warn e "Not loading mesos support...")
nil)))
(throw (ex-info "This node does not have a curator configured" {})))))})

(defn health-check-middleware
Expand Down Expand Up @@ -294,10 +280,28 @@
(throw (ex-info "Framework id not configured and not in ZooKeeper" {})))
(log/info "Using framework id:" framework-id)
framework-id)))
:compute-clusters (fnk [settings]
((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings))
:compute-clusters (fnk [exit-code-syncer-state
mesos-heartbeat-chan
sandbox-syncer-state
settings
trigger-chans]
(let [constructor (util/lazy-load-var 'cook.mesos.mesos-compute-cluster/->MesosComputeCluster)
create-mesos-compute-cluster (fn [compute-cluster-name framework-id db-id driver-atom]
(constructor compute-cluster-name
framework-id
db-id
driver-atom
sandbox-syncer-state
exit-code-syncer-state
mesos-heartbeat-chan
pschorf marked this conversation as resolved.
Show resolved Hide resolved
trigger-chans))]
((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings
create-mesos-compute-cluster)))
:mesos-datomic-mult (fnk []
(first ((util/lazy-load-var 'cook.datomic/create-tx-report-mult) datomic/conn)))
; TODO(pschorf): Remove hearbeat support
:mesos-heartbeat-chan (fnk []
shamsimam marked this conversation as resolved.
Show resolved Hide resolved
(async/chan (async/buffer 4096)))
:local-zookeeper (fnk [[:settings zookeeper-server]]
(when zookeeper-server
(log/info "Starting local ZK server")
Expand All @@ -318,6 +322,8 @@
((util/lazy-load-var 'cook.mesos.sandbox/prepare-sandbox-publisher)
framework-id datomic/conn publish-batch-size publish-interval-ms sync-interval-ms
max-consecutive-sync-failure mesos-agent-query-cache)))
:trigger-chans (fnk [[:settings rebalancer progress optimizer task-constraints]]
((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints))
:clear-uncommitted-canceler (fnk [mesos-leadership-atom]
((util/lazy-load-var 'cook.tools/clear-uncommitted-jobs-on-schedule)
datomic/conn mesos-leadership-atom))
Expand Down
16 changes: 11 additions & 5 deletions scheduler/src/cook/compute_cluster.clj
Expand Up @@ -16,8 +16,7 @@
(ns cook.compute-cluster
(:require [clojure.tools.logging :as log]
[cook.config :as config]
[datomic.api :as d]
[mesomatic.scheduler :as mesos]))
[datomic.api :as d]))

(defprotocol ComputeCluster
; These methods should accept bulk data and process in batches.
Expand All @@ -28,10 +27,17 @@
(db-id [this]
"Get a database entity-id for this compute cluster (used for putting it into a task structure).")

(initialize-cluster [this pool->fenzo pool->offers-chan]
"Initializes the cluster. Returns a channel that will be delivered on when the cluster loses leadership.
We expect Cook to give up leadership when a compute cluster loses leadership, so leadership is not expected to be regained.
The channel result will be an exception if an error occurred, or a status message if leadership was lost normally.")

(current-leader? [this]
"Returns true if this cook instance is currently the leader for the compute cluster")

; TODO - remove
(get-mesos-driver-hack [this]
"Get the mesos driver. Hack; any funciton invoking this should be put within the compute-cluster implementation")
(set-mesos-driver-atom-hack! [this driver]
pschorf marked this conversation as resolved.
Show resolved Hide resolved
"Hack to overwrite the driver. Used until we fix the initialization order of compute-cluster"))
"Get the mesos driver. Hack; any function invoking this should be put within the compute-cluster implementation"))

; Internal method
(defn write-compute-cluster
Expand Down