Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Refactor mesos scheduler initialization (#1152)
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Schorfheide authored and shamsimam committed Jun 12, 2019
1 parent 6304a44 commit 6c6597f
Show file tree
Hide file tree
Showing 11 changed files with 507 additions and 406 deletions.
110 changes: 58 additions & 52 deletions scheduler/src/cook/components.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
;; limitations under the License. ;; limitations under the License.
;; ;;
(ns cook.components (ns cook.components
(:require [clojure.core.cache :as cache] (:require [clojure.core.async :as async]
[clojure.core.cache :as cache]
[clojure.pprint :refer (pprint)] [clojure.pprint :refer (pprint)]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[compojure.core :refer (GET POST routes context)] [compojure.core :refer (GET POST routes context)]
Expand Down Expand Up @@ -95,13 +96,11 @@
(def mesos-scheduler (def mesos-scheduler
{:mesos-scheduler (fnk [[:settings fenzo-fitness-calculator fenzo-floor-iterations-before-reset {:mesos-scheduler (fnk [[:settings fenzo-fitness-calculator fenzo-floor-iterations-before-reset
fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback
good-enough-fitness hostname mea-culpa-failure-limit mesos-failover-timeout mesos-framework-name good-enough-fitness hostname mea-culpa-failure-limit mesos-leader-path mesos-run-as-user
mesos-gpu-enabled mesos-leader-path mesos-master mesos-principal offer-incubate-time-ms optimizer rebalancer server-port task-constraints]
mesos-role mesos-run-as-user offer-incubate-time-ms optimizer progress rebalancer server-port compute-clusters curator-framework mesos-datomic-mult mesos-leadership-atom
task-constraints] mesos-agent-attributes-cache pool-name->pending-jobs-atom mesos-heartbeat-chan
curator-framework exit-code-syncer-state framework-id mesos-datomic-mult mesos-leadership-atom trigger-chans]
mesos-agent-attributes-cache pool-name->pending-jobs-atom sandbox-syncer-state
compute-clusters]
(if (cook.config/api-only-mode?) (if (cook.config/api-only-mode?)
(if curator-framework (if curator-framework
(throw (ex-info "This node is configured for API-only mode, but also has a curator configured" (throw (ex-info "This node is configured for API-only mode, but also has a curator configured"
Expand All @@ -110,48 +109,35 @@
(if curator-framework (if curator-framework
(do (do
(log/info "Initializing mesos scheduler") (log/info "Initializing mesos scheduler")
(let [make-mesos-driver-fn (partial (util/lazy-load-var 'cook.mesos/make-mesos-driver) (try
{:mesos-master mesos-master (Class/forName "org.apache.mesos.Scheduler")
:mesos-failover-timeout mesos-failover-timeout ((util/lazy-load-var 'cook.mesos/start-leader-selector)
:mesos-principal mesos-principal {:curator-framework curator-framework
:mesos-role mesos-role :fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered
:mesos-framework-name mesos-framework-name :fenzo-scaleback fenzo-scaleback
:gpu-enabled? mesos-gpu-enabled}) :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn
trigger-chans ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints)] :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset
(try :fenzo-fitness-calculator fenzo-fitness-calculator
(Class/forName "org.apache.mesos.Scheduler") :good-enough-fitness good-enough-fitness}
((util/lazy-load-var 'cook.mesos/start-mesos-scheduler) :mea-culpa-failure-limit mea-culpa-failure-limit
{:curator-framework curator-framework :mesos-datomic-conn datomic/conn
:exit-code-syncer-state exit-code-syncer-state :mesos-datomic-mult mesos-datomic-mult
:fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered :mesos-heartbeat-chan mesos-heartbeat-chan
:fenzo-scaleback fenzo-scaleback :mesos-leadership-atom mesos-leadership-atom
:fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn :pool-name->pending-jobs-atom pool-name->pending-jobs-atom
:fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset :mesos-run-as-user mesos-run-as-user
:fenzo-fitness-calculator fenzo-fitness-calculator :agent-attributes-cache mesos-agent-attributes-cache
:good-enough-fitness good-enough-fitness} :offer-incubate-time-ms offer-incubate-time-ms
:framework-id framework-id :optimizer-config optimizer
:gpu-enabled? mesos-gpu-enabled :rebalancer-config rebalancer
:make-mesos-driver-fn make-mesos-driver-fn :server-config {:hostname hostname
:mea-culpa-failure-limit mea-culpa-failure-limit :server-port server-port}
:mesos-datomic-conn datomic/conn :task-constraints task-constraints
:mesos-datomic-mult mesos-datomic-mult :trigger-chans trigger-chans
:mesos-leadership-atom mesos-leadership-atom :zk-prefix mesos-leader-path})
:pool-name->pending-jobs-atom pool-name->pending-jobs-atom (catch ClassNotFoundException e
:mesos-run-as-user mesos-run-as-user (log/warn e "Not loading mesos support...")
:agent-attributes-cache mesos-agent-attributes-cache nil)))
:offer-incubate-time-ms offer-incubate-time-ms
:optimizer-config optimizer
:progress-config progress
:rebalancer-config rebalancer
:sandbox-syncer-state sandbox-syncer-state
:server-config {:hostname hostname
:server-port server-port}
:task-constraints task-constraints
:trigger-chans trigger-chans
:zk-prefix mesos-leader-path})
(catch ClassNotFoundException e
(log/warn e "Not loading mesos support...")
nil))))
(throw (ex-info "This node does not have a curator configured" {})))))}) (throw (ex-info "This node does not have a curator configured" {})))))})


(defn health-check-middleware (defn health-check-middleware
Expand Down Expand Up @@ -294,10 +280,28 @@
(throw (ex-info "Framework id not configured and not in ZooKeeper" {}))) (throw (ex-info "Framework id not configured and not in ZooKeeper" {})))
(log/info "Using framework id:" framework-id) (log/info "Using framework id:" framework-id)
framework-id))) framework-id)))
:compute-clusters (fnk [settings] :compute-clusters (fnk [exit-code-syncer-state
((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings)) mesos-heartbeat-chan
sandbox-syncer-state
settings
trigger-chans]
(let [constructor (util/lazy-load-var 'cook.mesos.mesos-compute-cluster/->MesosComputeCluster)
create-mesos-compute-cluster (fn [compute-cluster-name framework-id db-id driver-atom]
(constructor compute-cluster-name
framework-id
db-id
driver-atom
sandbox-syncer-state
exit-code-syncer-state
mesos-heartbeat-chan
trigger-chans))]
((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings
create-mesos-compute-cluster)))
:mesos-datomic-mult (fnk [] :mesos-datomic-mult (fnk []
(first ((util/lazy-load-var 'cook.datomic/create-tx-report-mult) datomic/conn))) (first ((util/lazy-load-var 'cook.datomic/create-tx-report-mult) datomic/conn)))
; TODO(pschorf): Remove hearbeat support
:mesos-heartbeat-chan (fnk []
(async/chan (async/buffer 4096)))
:local-zookeeper (fnk [[:settings zookeeper-server]] :local-zookeeper (fnk [[:settings zookeeper-server]]
(when zookeeper-server (when zookeeper-server
(log/info "Starting local ZK server") (log/info "Starting local ZK server")
Expand All @@ -318,6 +322,8 @@
((util/lazy-load-var 'cook.mesos.sandbox/prepare-sandbox-publisher) ((util/lazy-load-var 'cook.mesos.sandbox/prepare-sandbox-publisher)
framework-id datomic/conn publish-batch-size publish-interval-ms sync-interval-ms framework-id datomic/conn publish-batch-size publish-interval-ms sync-interval-ms
max-consecutive-sync-failure mesos-agent-query-cache))) max-consecutive-sync-failure mesos-agent-query-cache)))
:trigger-chans (fnk [[:settings rebalancer progress optimizer task-constraints]]
((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints))
:clear-uncommitted-canceler (fnk [mesos-leadership-atom] :clear-uncommitted-canceler (fnk [mesos-leadership-atom]
((util/lazy-load-var 'cook.tools/clear-uncommitted-jobs-on-schedule) ((util/lazy-load-var 'cook.tools/clear-uncommitted-jobs-on-schedule)
datomic/conn mesos-leadership-atom)) datomic/conn mesos-leadership-atom))
Expand Down
16 changes: 11 additions & 5 deletions scheduler/src/cook/compute_cluster.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
(ns cook.compute-cluster (ns cook.compute-cluster
(:require [clojure.tools.logging :as log] (:require [clojure.tools.logging :as log]
[cook.config :as config] [cook.config :as config]
[datomic.api :as d] [datomic.api :as d]))
[mesomatic.scheduler :as mesos]))


(defprotocol ComputeCluster (defprotocol ComputeCluster
; These methods should accept bulk data and process in batches. ; These methods should accept bulk data and process in batches.
Expand All @@ -28,10 +27,17 @@
(db-id [this] (db-id [this]
"Get a database entity-id for this compute cluster (used for putting it into a task structure).") "Get a database entity-id for this compute cluster (used for putting it into a task structure).")


(initialize-cluster [this pool->fenzo pool->offers-chan]
"Initializes the cluster. Returns a channel that will be delivered on when the cluster loses leadership.
We expect Cook to give up leadership when a compute cluster loses leadership, so leadership is not expected to be regained.
The channel result will be an exception if an error occurred, or a status message if leadership was lost normally.")

(current-leader? [this]
"Returns true if this cook instance is currently the leader for the compute cluster")

; TODO - remove
(get-mesos-driver-hack [this] (get-mesos-driver-hack [this]
"Get the mesos driver. Hack; any funciton invoking this should be put within the compute-cluster implementation") "Get the mesos driver. Hack; any function invoking this should be put within the compute-cluster implementation"))
(set-mesos-driver-atom-hack! [this driver]
"Hack to overwrite the driver. Used until we fix the initialization order of compute-cluster"))


; Internal method ; Internal method
(defn write-compute-cluster (defn write-compute-cluster
Expand Down
Loading

0 comments on commit 6c6597f

Please sign in to comment.