From 47ccf7466f530858623b41bb10839d1760d56ba5 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Fri, 31 May 2019 10:32:59 -0500 Subject: [PATCH 01/11] wip --- scheduler/src/cook/components.clj | 89 ++++---- scheduler/src/cook/compute_cluster.clj | 229 ++++++++++++++++++++- scheduler/src/cook/mesos.clj | 75 ++----- scheduler/src/cook/scheduler/scheduler.clj | 156 +------------- scheduler/test/cook/test/zz_simulator.clj | 2 +- 5 files changed, 305 insertions(+), 246 deletions(-) diff --git a/scheduler/src/cook/components.clj b/scheduler/src/cook/components.clj index c426d0ff38..b7b9a73e00 100644 --- a/scheduler/src/cook/components.clj +++ b/scheduler/src/cook/components.clj @@ -24,21 +24,21 @@ [cook.rest.cors :as cors] [cook.curator :as curator] [cook.datomic :as datomic] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.completion namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.completion namespace. [cook.plugins.completion] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.submission namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.submission namespace. [cook.plugins.submission] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.file namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.file namespace. [cook.plugins.file] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.launch namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.launch namespace. [cook.plugins.launch] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.pool namespace. 
+ ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.pool namespace. [cook.plugins.pool] [cook.rest.impersonation :refer (impersonation-authorized-wrapper)] [cook.pool :as pool] - ; This explicit require is needed so that mount can see the defstate defined in the cook.rate-limit namespace. - ; cook.rate-limit and everything else under cook.rest.api is normally hidden from mount's defstate because - ; cook.rest.api is loaded via util/lazy-load-var, not via 'ns :require' + ; This explicit require is needed so that mount can see the defstate defined in the cook.rate-limit namespace. + ; cook.rate-limit and everything else under cook.rest.api is normally hidden from mount's defstate because + ; cook.rest.api is loaded via util/lazy-load-var, not via 'ns :require' [cook.rate-limit] [cook.util :as util] [datomic.api :as d] @@ -51,7 +51,8 @@ [ring.middleware.params :refer (wrap-params)] [ring.middleware.stacktrace :refer (wrap-stacktrace)] [ring.util.mime-type] - [ring.util.response :refer (response)]) + [ring.util.response :refer (response)] + [clojure.core.async :as async]) (:import clojure.core.async.impl.channels.ManyToManyChannel java.io.IOException java.security.Principal @@ -111,44 +112,46 @@ (do (log/info "Initializing mesos scheduler") (let [make-mesos-driver-fn (partial (util/lazy-load-var 'cook.mesos/make-mesos-driver) - {:mesos-master mesos-master + {:mesos-master mesos-master :mesos-failover-timeout mesos-failover-timeout - :mesos-principal mesos-principal - :mesos-role mesos-role - :mesos-framework-name mesos-framework-name - :gpu-enabled? mesos-gpu-enabled}) - trigger-chans ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints)] + :mesos-principal mesos-principal + :mesos-role mesos-role + :mesos-framework-name mesos-framework-name + :gpu-enabled? 
mesos-gpu-enabled}) + trigger-chans ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints) + mesos-heartbeat-chan (async/chan (async/buffer 4096))] (try (Class/forName "org.apache.mesos.Scheduler") - ((util/lazy-load-var 'cook.mesos/start-mesos-scheduler) - {:curator-framework curator-framework - :exit-code-syncer-state exit-code-syncer-state - :fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered - :fenzo-scaleback fenzo-scaleback - :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn - :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset - :fenzo-fitness-calculator fenzo-fitness-calculator - :good-enough-fitness good-enough-fitness} - :framework-id framework-id - :gpu-enabled? mesos-gpu-enabled - :make-mesos-driver-fn make-mesos-driver-fn - :mea-culpa-failure-limit mea-culpa-failure-limit - :mesos-datomic-conn datomic/conn - :mesos-datomic-mult mesos-datomic-mult - :mesos-leadership-atom mesos-leadership-atom + ((util/lazy-load-var 'cook.mesos/start-leader-selector) + {:curator-framework curator-framework + :exit-code-syncer-state exit-code-syncer-state + :fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered + :fenzo-scaleback fenzo-scaleback + :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn + :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset + :fenzo-fitness-calculator fenzo-fitness-calculator + :good-enough-fitness good-enough-fitness} + :framework-id framework-id + :gpu-enabled? 
mesos-gpu-enabled + :make-mesos-driver-fn make-mesos-driver-fn + :mea-culpa-failure-limit mea-culpa-failure-limit + :mesos-datomic-conn datomic/conn + :mesos-datomic-mult mesos-datomic-mult + :mesos-heartbeat-chan mesos-heartbeat-chan + :mesos-leadership-atom mesos-leadership-atom :pool-name->pending-jobs-atom pool-name->pending-jobs-atom - :mesos-run-as-user mesos-run-as-user - :agent-attributes-cache mesos-agent-attributes-cache - :offer-incubate-time-ms offer-incubate-time-ms - :optimizer-config optimizer - :progress-config progress - :rebalancer-config rebalancer - :sandbox-syncer-state sandbox-syncer-state - :server-config {:hostname hostname - :server-port server-port} - :task-constraints task-constraints - :trigger-chans trigger-chans - :zk-prefix mesos-leader-path}) + :mesos-run-as-user mesos-run-as-user + :agent-attributes-cache mesos-agent-attributes-cache + :offer-incubate-time-ms offer-incubate-time-ms + :optimizer-config optimizer + :progress-config progress + :rebalancer-config rebalancer + :sandbox-syncer-state sandbox-syncer-state + :server-config {:hostname hostname + :server-port server-port} + :task-constraints task-constraints + :trigger-chans trigger-chans + :zk-prefix mesos-leader-path}) (catch ClassNotFoundException e (log/warn e "Not loading mesos support...") nil)))) diff --git a/scheduler/src/cook/compute_cluster.clj b/scheduler/src/cook/compute_cluster.clj index 6b646cfc9f..432dd20416 100644 --- a/scheduler/src/cook/compute_cluster.clj +++ b/scheduler/src/cook/compute_cluster.clj @@ -16,8 +16,19 @@ (ns cook.compute-cluster (:require [clojure.tools.logging :as log] [cook.config :as config] + [cook.datomic] [datomic.api :as d] - [mesomatic.scheduler :as mesos])) + [clojure.core.async :as async] + [cook.mesos.sandbox :as sandbox] + [cook.plugins.pool :as pool-plugin] + [mesomatic.scheduler :as mesos] + [clojure.data.json :as json] + [cook.mesos.heartbeat :as heartbeat] + [cook.plugins.definitions :as plugins] + [metrics.meters :as meters] + 
[plumbing.core :as pc] + [metrics.histograms :as histograms] + [metrics.counters :as counters])) (defprotocol ComputeCluster ; These methods should accept bulk data and process in batches. @@ -28,11 +39,227 @@ (db-id [this] "Get a database entity-id for this compute cluster (used for putting it into a task structure).") + (initialize-cluster [this pool->fenzo pool->offers-chan]) + (get-mesos-driver-hack [this] "Get the mesos driver. Hack; any funciton invoking this should be put within the compute-cluster implementation") (set-mesos-driver-atom-hack! [this driver] "Hack to overwrite the driver. Used until we fix the initialization order of compute-cluster")) +(meters/defmeter [cook-mesos scheduler scheduler-offer-declined]) + +(defn decline-offers + "declines a collection of offer ids" + [driver offer-ids] + (log/debug "Declining offers:" offer-ids) + (doseq [id offer-ids] + (meters/mark! scheduler-offer-declined) + (mesos/decline-offer driver id))) + +(defn decline-offers-safe + "Declines a collection of offers, catching exceptions" + [driver offers] + (try + (decline-offers driver (map :id offers)) + (catch Exception e + (log/error e "Unable to decline offers!")))) + +(defn metric-title + [metric-name pool] + ["cook-mesos" "scheduler" metric-name (str "pool-" pool)]) + +(counters/defcounter [cook-mesos scheduler offer-chan-depth]) +(meters/defmeter [cook-mesos scheduler mesos-error]) +(meters/defmeter [cook-mesos scheduler offer-chan-full-error]) + +(defn receive-offers + [offers-chan match-trigger-chan driver pool-name offers] + (doseq [offer offers] + (histograms/update! (histograms/histogram (metric-title "offer-size-cpus" pool-name)) (get-in offer [:resources :cpus] 0)) + (histograms/update! (histograms/histogram (metric-title "offer-size-mem" pool-name)) (get-in offer [:resources :mem] 0))) + (if (async/offer! offers-chan offers) + (do + (counters/inc! offer-chan-depth) + (async/offer! 
match-trigger-chan :trigger)) ; :trigger is arbitrary, the value is ignored + (do (log/warn "Offer chan is full. Are we not handling offers fast enough?") + (meters/mark! offer-chan-full-error) + (future + (decline-offers-safe driver offers))))) + +(meters/defmeter [cook-mesos scheduler handle-framework-message-rate]) + +(defn create-mesos-scheduler + "Creates the mesos scheduler which processes status updates asynchronously but in order of receipt." + [gpu-enabled? conn heartbeat-ch pool->fenzo pool->offers-chan match-trigger-chan sandbox-syncer-state compute-cluster + reconcile-jobs-fn reconcile-tasks-fn handle-framework-message-fn handle-status-update-fn] + (let [configured-framework-id (cook.config/framework-id-config)] + (mesos/scheduler + (registered + [this driver framework-id master-info] + (log/info "Registered with mesos with framework-id " framework-id) + (let [value (-> framework-id mesomatic.types/pb->data :value)] + (when (not= configured-framework-id value) + (let [message (str "The framework-id provided by Mesos (" value ") " + "does not match the one Cook is configured with (" configured-framework-id ")")] + (log/error message) + (throw (ex-info message {:framework-id-mesos value :framework-id-cook configured-framework-id}))))) + (when (and gpu-enabled? (not (re-matches #"1\.\d+\.\d+" (:version master-info)))) + (binding [*out* *err*] + (println "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info))) + (log/error "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info)) + (Thread/sleep 1000) + (System/exit 1)) + ;; Use future because the thread that runs mesos/scheduler doesn't load classes correctly. for reasons. + ;; As Sophie says, you want to future proof your code. 
+ (future + (try + (reconcile-jobs-fn conn) + (reconcile-tasks-fn (d/db conn) driver pool->fenzo) + (catch Exception e + (log/error e "Reconciliation error"))))) + (reregistered + [this driver master-info] + (log/info "Reregistered with new master") + (future + (try + (reconcile-jobs-fn conn) + (reconcile-tasks-fn (d/db conn) driver pool->fenzo) + (catch Exception e + (log/error e "Reconciliation error"))))) + ;; Ignore this--we can just wait for new offers + (offer-rescinded + [this driver offer-id] + (comment "TODO: Rescind the offer in fenzo")) + (framework-message + [this driver executor-id slave-id message] + (meters/mark! handle-framework-message-rate) + (try + (let [{:strs [task-id type] :as parsed-message} (json/read-str (String. ^bytes message "UTF-8"))] + (case type + "directory" (sandbox/update-sandbox sandbox-syncer-state parsed-message) + "heartbeat" (heartbeat/notify-heartbeat heartbeat-ch executor-id slave-id parsed-message) + (handle-framework-message-fn parsed-message))) + (catch Exception e + (log/error e "Unable to process framework message" + {:executor-id executor-id, :message message, :slave-id slave-id})))) + (disconnected + [this driver] + (log/error "Disconnected from the previous master")) + ;; We don't care about losing slaves or executors--only tasks + (slave-lost [this driver slave-id]) + (executor-lost [this driver executor-id slave-id status]) + (error + [this driver message] + (meters/mark! mesos-error) + (log/error "Got a mesos error!!!!" message)) + (resource-offers + [this driver raw-offers] + (log/debug "Got offers:" raw-offers) + (let [offers (map #(assoc % :compute-cluster compute-cluster) raw-offers) + pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers) + using-pools? (config/default-pool)] + (log/info "Offers by pool:" (pc/map-vals count pool->offers)) + (run! + (fn [[pool-name offers]] + (let [offer-count (count offers)] + (if using-pools? 
+ (if-let [offers-chan (get pool->offers-chan pool-name)] + (do + (log/info "Processing" offer-count "offer(s) for known pool" pool-name) + (receive-offers offers-chan match-trigger-chan driver pool-name offers)) + (do + (log/warn "Declining" offer-count "offer(s) for non-existent pool" pool-name) + (decline-offers-safe driver offers))) + (if-let [offers-chan (get pool->offers-chan "no-pool")] + (do + (log/info "Processing" offer-count "offer(s) for pool" pool-name "(not using pools)") + (receive-offers offers-chan match-trigger-chan driver pool-name offers)) + (do + (log/error "Declining" offer-count "offer(s) for pool" pool-name "(missing no-pool offer chan)") + (decline-offers-safe driver offers)))))) + pool->offers) + (log/debug "Finished receiving offers for all pools"))) + (status-update + [this driver status] + (handle-status-update-fn status))))) + +(defn make-mesos-driver + "Creates a mesos driver + + Parameters: + mesos-framework-name -- string, name to use when connecting to mesos. + Will be appended with version of cook + mesos-role -- string, (optional) The role to connect to mesos with + mesos-principal -- string, (optional) principal to connect to mesos with + gpu-enabled? -- boolean, (optional) whether cook will schedule gpu jobs + mesos-failover-timeout -- long, (optional) time in milliseconds mesos will wait + for framework to reconnect. + See http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ + and search for failover_timeout + mesos-master -- str, (optional) connection string for mesos masters + scheduler -- mesos scheduler implementation + framework-id -- str, (optional) Id of framework if it has connected to mesos before + " + ([config scheduler] + (make-mesos-driver config scheduler nil)) + ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? 
+ mesos-failover-timeout mesos-master] :as config} + scheduler framework-id] + (apply mesomatic.scheduler/scheduler-driver + scheduler + (cond-> {:checkpoint true + :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) + :user ""} + framework-id (assoc :id {:value framework-id}) + gpu-enabled? (assoc :capabilities [{:type :framework-capability-gpu-resources}]) + mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) + mesos-principal (assoc :principal mesos-principal) + mesos-role (assoc :role mesos-role)) + mesos-master + (when mesos-principal + [{:principal mesos-principal}])))) + +(defrecord MesosComputeCluster [compute-cluster-name framework-id db-id driver-atom + sandbox-syncer-state reconcile-jobs-fn reconcile-tasks-fn handle-framework-message-fn + handle-status-update-fn mesos-heartbeat-chan] + ComputeCluster + (compute-cluster-name [this] + compute-cluster-name) + (get-mesos-driver-hack [this] + @driver-atom) + (db-id [this] + db-id) + (get-mesos-framework-id-hack [this] + framework-id) + (set-mesos-driver-atom-hack! [this driver] + (reset! driver-atom driver)) + (initialize-cluster [this pool->fenzo pool->offers-chan] + (let [mesos-config (select-keys config/config [:mesos-master + :mesos-failover-timeout + :mesos-principal + :mesos-role + :mesos-framework-name + :gpu-enabled?]) + trigger-chans {} + {:keys [match-trigger-chan]} trigger-chans + scheduler (create-mesos-scheduler (:gpu-enabled? mesos-config) + cook.datomic/conn + mesos-heartbeat-chan + pool->fenzo + pool->offers-chan + match-trigger-chan + sandbox-syncer-state + this + reconcile-jobs-fn + reconcile-tasks-fn + handle-framework-message-fn + handle-status-update-fn) + framework-id (config/framework-id-config) + _ (log/info "Initializing mesos driver with config: " mesos-config) + driver (make-mesos-driver mesos-config scheduler framework-id)] + (mesomatic.scheduler/start! driver) + (set-mesos-driver-atom-hack! 
this driver)))) + ; Internal method (defn write-compute-cluster "Create a missing compute-cluster for one that's not yet in the database." diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index df6684c4df..12bf523366 100644 --- a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -83,44 +83,8 @@ (counters/defcounter [cook-mesos mesos mesos-leader]) -(defn make-mesos-driver - "Creates a mesos driver - - Parameters: - mesos-framework-name -- string, name to use when connecting to mesos. - Will be appended with version of cook - mesos-role -- string, (optional) The role to connect to mesos with - mesos-principal -- string, (optional) principal to connect to mesos with - gpu-enabled? -- boolean, (optional) whether cook will schedule gpu jobs - mesos-failover-timeout -- long, (optional) time in milliseconds mesos will wait - for framework to reconnect. - See http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ - and search for failover_timeout - mesos-master -- str, (optional) connection string for mesos masters - scheduler -- mesos scheduler implementation - framework-id -- str, (optional) Id of framework if it has connected to mesos before - " - ([config scheduler] - (make-mesos-driver config scheduler nil)) - ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? - mesos-failover-timeout mesos-master] :as config} - scheduler framework-id] - (apply mesomatic.scheduler/scheduler-driver - scheduler - (cond-> {:checkpoint true - :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) - :user ""} - framework-id (assoc :id {:value framework-id}) - gpu-enabled? 
(assoc :capabilities [{:type :framework-capability-gpu-resources}]) - mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) - mesos-principal (assoc :principal mesos-principal) - mesos-role (assoc :role mesos-role)) - mesos-master - (when mesos-principal - [{:principal mesos-principal}])))) - (defn make-trigger-chans - "Creates a map of of the trigger channels expected by `start-mesos-scheduler` + "Creates a map of of the trigger channels expected by `start-leader-selector` Each channel receives chime triggers at particular intervals and it is possible to send additional events as desired" [rebalancer-config progress-config optimizer-config @@ -145,7 +109,7 @@ update-interval-ms (assoc :update-data-local-costs-trigger-chan (prepare-trigger-chan (time/millis update-interval-ms)))))) -(defn start-mesos-scheduler +(defn start-leader-selector "Starts a leader elector. When the process is leader, it starts the mesos scheduler and associated threads to interact with mesos. @@ -171,16 +135,15 @@ [{:keys [curator-framework exit-code-syncer-state fenzo-config framework-id gpu-enabled? 
make-mesos-driver-fn mea-culpa-failure-limit mesos-datomic-conn mesos-datomic-mult mesos-leadership-atom pool-name->pending-jobs-atom mesos-run-as-user agent-attributes-cache offer-incubate-time-ms optimizer-config progress-config rebalancer-config - sandbox-syncer-state server-config task-constraints trigger-chans zk-prefix]}] + sandbox-syncer-state server-config task-constraints trigger-chans zk-prefix mesos-heartbeat-chan]}] (let [{:keys [fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback good-enough-fitness]} fenzo-config {:keys [cancelled-task-trigger-chan lingering-task-trigger-chan optimizer-trigger-chan rebalancer-trigger-chan straggler-trigger-chan]} trigger-chans {:keys [hostname server-port server-https-port]} server-config datomic-report-chan (async/chan (async/sliding-buffer 4096)) - mesos-heartbeat-chan (async/chan (async/buffer 4096)) + compute-cluster (mcc/mesos-cluster-hack) - current-driver (atom nil) rebalancer-reservation-atom (atom {}) leader-selector (LeaderSelector. curator-framework @@ -194,7 +157,7 @@ ;; TODO: get the framework ID and try to reregister (let [normal-exit (atom true)] (try - (let [{:keys [scheduler view-incubating-offers]} + (let [{:keys [pool-name->fenzo pool->offers-chan view-incubating-offers]} (sched/create-datomic-scheduler {:conn mesos-datomic-conn :compute-cluster compute-cluster @@ -206,7 +169,6 @@ :fenzo-scaleback fenzo-scaleback :good-enough-fitness good-enough-fitness :gpu-enabled? gpu-enabled? - :heartbeat-ch mesos-heartbeat-chan :mea-culpa-failure-limit mea-culpa-failure-limit :mesos-run-as-user mesos-run-as-user :agent-attributes-cache agent-attributes-cache @@ -217,20 +179,21 @@ :sandbox-syncer-state sandbox-syncer-state :task-constraints task-constraints :trigger-chans trigger-chans}) - driver (make-mesos-driver-fn scheduler framework-id)] - (mesomatic.scheduler/start! driver) - (reset! 
current-driver driver) - (cc/set-mesos-driver-atom-hack! compute-cluster driver) - + cluster-leadership-promise (cc/initialize-cluster compute-cluster + pool-name->fenzo + pool->offers-chan)] (cook.monitor/start-collecting-stats) ; Many of these should look at the compute-cluster of the underlying jobs, and not use driver at all. - (cook.scheduler.scheduler/lingering-task-killer mesos-datomic-conn driver task-constraints lingering-task-trigger-chan) - (cook.scheduler.scheduler/straggler-handler mesos-datomic-conn driver straggler-trigger-chan) - (cook.scheduler.scheduler/cancelled-task-killer mesos-datomic-conn driver cancelled-task-trigger-chan) + (cook.scheduler.scheduler/lingering-task-killer mesos-datomic-conn (cc/get-mesos-driver-hack compute-cluster) + task-constraints lingering-task-trigger-chan) + (cook.scheduler.scheduler/straggler-handler mesos-datomic-conn (cc/get-mesos-driver-hack compute-cluster) + straggler-trigger-chan) + (cook.scheduler.scheduler/cancelled-task-killer mesos-datomic-conn (cc/get-mesos-driver-hack compute-cluster) + cancelled-task-trigger-chan) (cook.mesos.heartbeat/start-heartbeat-watcher! mesos-datomic-conn mesos-heartbeat-chan) (cook.rebalancer/start-rebalancer! {:config rebalancer-config :conn mesos-datomic-conn - :driver driver + :driver (cc/get-mesos-driver-hack compute-cluster) :agent-attributes-cache agent-attributes-cache :pool-name->pending-jobs-atom pool-name->pending-jobs-atom :rebalancer-reservation-atom rebalancer-reservation-atom @@ -252,10 +215,10 @@ (counters/inc! mesos-leader) (async/tap mesos-datomic-mult datomic-report-chan) (cook.scheduler.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn) - (mesomatic.scheduler/join! driver) - (reset! current-driver nil)) + @cluster-leadership-promise + (cc/set-mesos-driver-atom-hack! compute-cluster nil)) (catch Throwable e - (log/error e "Lost mesos leadership due to exception") + (log/error e "Lost leadership due to exception") (reset! 
normal-exit false)) (finally (counters/dec! mesos-leader) @@ -270,7 +233,7 @@ ;; ZK connection (when (#{ConnectionState/LOST ConnectionState/SUSPENDED} newState) (reset! mesos-leadership-atom false) - (when @current-driver + (when (cc/get-mesos-driver-hack compute-cluster) (counters/dec! mesos-leader) ;; Better to fail over and rely on start up code we trust then rely on rarely run code ;; to make sure we yield leadership correctly (and fully) diff --git a/scheduler/src/cook/scheduler/scheduler.clj b/scheduler/src/cook/scheduler/scheduler.clj index 38f902d390..d7d1b95246 100644 --- a/scheduler/src/cook/scheduler/scheduler.clj +++ b/scheduler/src/cook/scheduler/scheduler.clj @@ -102,7 +102,6 @@ (timers/deftimer [cook-mesos scheduler handle-status-update-duration]) (meters/defmeter [cook-mesos scheduler handle-status-update-rate]) (timers/deftimer [cook-mesos scheduler handle-framework-message-duration]) -(meters/defmeter [cook-mesos scheduler handle-framework-message-rate]) (timers/deftimer [cook-mesos scheduler generate-user-usage-map-duration]) @@ -215,6 +214,7 @@ "Takes a status update from mesos." [conn compute-cluster pool->fenzo sync-agent-sandboxes-fn status] (log/info "Mesos status is:" status) + (meters/mark! handle-status-update-rate) (timers/time! handle-status-update-duration (try (let [db (db conn) @@ -553,6 +553,7 @@ (meters/defmeter [cook-mesos scheduler scheduler-offer-declined]) +; TODO (pschorf): Delete (defn decline-offers "declines a collection of offer ids" [driver offer-ids] @@ -1333,9 +1334,6 @@ (reset! pool-name->pending-jobs-atom (rank-jobs (d/db conn) offensive-job-filter)))))) -(meters/defmeter [cook-mesos scheduler mesos-error]) -(meters/defmeter [cook-mesos scheduler offer-chan-full-error]) - (defn make-fenzo-scheduler [compute-cluster offer-incubate-time-ms fitness-calculator good-enough-fitness] (.. (TaskScheduler$Builder.) 
@@ -1375,28 +1373,6 @@ @(d/transact conn [{:db/id :scheduler/config :scheduler.config/mea-culpa-failure-limit limits}]))) -(defn decline-offers-safe - "Declines a collection of offers, catching exceptions" - [driver offers] - (try - (decline-offers driver (map :id offers)) - (catch Exception e - (log/error e "Unable to decline offers!")))) - -(defn receive-offers - [offers-chan match-trigger-chan driver pool-name offers] - (doseq [offer offers] - (histograms/update! (histograms/histogram (metric-title "offer-size-cpus" pool-name)) (get-in offer [:resources :cpus] 0)) - (histograms/update! (histograms/histogram (metric-title "offer-size-mem" pool-name)) (get-in offer [:resources :mem] 0))) - (if (async/offer! offers-chan offers) - (do - (counters/inc! offer-chan-depth) - (async/offer! match-trigger-chan :trigger)) ; :trigger is arbitrary, the value is ignored - (do (log/warn "Offer chan is full. Are we not handling offers fast enough?") - (meters/mark! offer-chan-full-error) - (future - (decline-offers-safe driver offers))))) - (let [in-order-queue-counter (counters/counter ["cook-mesos" "scheduler" "in-order-queue-size"]) in-order-queue-timer (timers/timer ["cook-mesos" "scheduler" "in-order-queue-delay-duration"]) parallelism 19 ; a prime number to potentially help make the distribution uniform @@ -1421,118 +1397,16 @@ (counters/dec! in-order-queue-counter) (body-fn)))))) -(defn create-mesos-scheduler - "Creates the mesos scheduler which processes status updates asynchronously but in order of receipt." - [gpu-enabled? 
conn heartbeat-ch pool->fenzo pool->offers-chan match-trigger-chan - handle-exit-code handle-progress-message sandbox-syncer-state compute-cluster] - (let [configured-framework-id (cook.config/framework-id-config) - sync-agent-sandboxes-fn #(sandbox/sync-agent-sandboxes sandbox-syncer-state configured-framework-id %1 %2) - message-handlers {:handle-exit-code handle-exit-code - :handle-progress-message handle-progress-message}] - (mesos/scheduler - (registered - [this driver framework-id master-info] - (log/info "Registered with mesos with framework-id " framework-id) - (let [value (-> framework-id mesomatic.types/pb->data :value)] - (when (not= configured-framework-id value) - (let [message (str "The framework-id provided by Mesos (" value ") " - "does not match the one Cook is configured with (" configured-framework-id ")")] - (log/error message) - (throw (ex-info message {:framework-id-mesos value :framework-id-cook configured-framework-id}))))) - (when (and gpu-enabled? (not (re-matches #"1\.\d+\.\d+" (:version master-info)))) - (binding [*out* *err*] - (println "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info))) - (log/error "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info)) - (Thread/sleep 1000) - (System/exit 1)) - ;; Use future because the thread that runs mesos/scheduler doesn't load classes correctly. for reasons. - ;; As Sophie says, you want to future proof your code. 
- (future - (try - (reconcile-jobs conn) - (reconcile-tasks (db conn) driver pool->fenzo) - (catch Exception e - (log/error e "Reconciliation error"))))) - (reregistered - [this driver master-info] - (log/info "Reregistered with new master") - (future - (try - (reconcile-jobs conn) - (reconcile-tasks (db conn) driver pool->fenzo) - (catch Exception e - (log/error e "Reconciliation error"))))) - ;; Ignore this--we can just wait for new offers - (offer-rescinded - [this driver offer-id] - (comment "TODO: Rescind the offer in fenzo")) - (framework-message - [this driver executor-id slave-id message] - (meters/mark! handle-framework-message-rate) - (try - (let [{:strs [task-id type] :as parsed-message} (json/read-str (String. ^bytes message "UTF-8"))] - (case type - "directory" (sandbox/update-sandbox sandbox-syncer-state parsed-message) - "heartbeat" (heartbeat/notify-heartbeat heartbeat-ch executor-id slave-id parsed-message) - (async-in-order-processing - task-id #(handle-framework-message conn message-handlers parsed-message)))) - (catch Exception e - (log/error e "Unable to process framework message" - {:executor-id executor-id, :message message, :slave-id slave-id})))) - (disconnected - [this driver] - (log/error "Disconnected from the previous master")) - ;; We don't care about losing slaves or executors--only tasks - (slave-lost [this driver slave-id]) - (executor-lost [this driver executor-id slave-id status]) - (error - [this driver message] - (meters/mark! mesos-error) - (log/error "Got a mesos error!!!!" message)) - (resource-offers - [this driver raw-offers] - (log/debug "Got offers:" raw-offers) - (let [offers (map #(assoc % :compute-cluster compute-cluster) raw-offers) - pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers) - using-pools? (config/default-pool)] - (log/info "Offers by pool:" (pc/map-vals count pool->offers)) - (run! - (fn [[pool-name offers]] - (let [offer-count (count offers)] - (if using-pools? 
- (if-let [offers-chan (get pool->offers-chan pool-name)] - (do - (log/info "Processing" offer-count "offer(s) for known pool" pool-name) - (receive-offers offers-chan match-trigger-chan driver pool-name offers)) - (do - (log/warn "Declining" offer-count "offer(s) for non-existent pool" pool-name) - (decline-offers-safe driver offers))) - (if-let [offers-chan (get pool->offers-chan "no-pool")] - (do - (log/info "Processing" offer-count "offer(s) for pool" pool-name "(not using pools)") - (receive-offers offers-chan match-trigger-chan driver pool-name offers)) - (do - (log/error "Declining" offer-count "offer(s) for pool" pool-name "(missing no-pool offer chan)") - (decline-offers-safe driver offers)))))) - pool->offers) - (log/debug "Finished receiving offers for all pools"))) - (status-update - [this driver status] - (meters/mark! handle-status-update-rate) - (let [task-id (-> status :task-id :value)] - (async-in-order-processing - task-id #(handle-status-update conn compute-cluster pool->fenzo sync-agent-sandboxes-fn status))))))) - (defn create-datomic-scheduler - [{:keys [conn compute-cluster exit-code-syncer-state fenzo-fitness-calculator fenzo-floor-iterations-before-reset + [{:keys [conn compute-cluster fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback good-enough-fitness - gpu-enabled? heartbeat-ch mea-culpa-failure-limit mesos-run-as-user agent-attributes-cache offer-incubate-time-ms - pool-name->pending-jobs-atom progress-config rebalancer-reservation-atom sandbox-syncer-state task-constraints + mea-culpa-failure-limit mesos-run-as-user agent-attributes-cache offer-incubate-time-ms + pool-name->pending-jobs-atom rebalancer-reservation-atom task-constraints trigger-chans]}] (persist-mea-culpa-failure-limit! 
conn mea-culpa-failure-limit) - (let [{:keys [match-trigger-chan progress-updater-trigger-chan rank-trigger-chan]} trigger-chans + (let [{:keys [match-trigger-chan rank-trigger-chan]} trigger-chans pools (pool/all-pools (d/db conn)) pools' (if (-> pools count pos?) pools @@ -1552,17 +1426,9 @@ (assoc-in [:pool->offers-chan pool-name] offers-chan) (assoc-in [:pool->resources-atom pool-name] resources-atom)))) {} - pools') - {:keys [batch-size]} progress-config - {:keys [progress-state-chan]} (progress/progress-update-transactor progress-updater-trigger-chan batch-size conn) - progress-aggregator-chan (progress/progress-update-aggregator progress-config progress-state-chan) - handle-progress-message (fn handle-progress-message-curried [progress-message-map] - (progress/handle-progress-message! progress-aggregator-chan progress-message-map)) - handle-exit-code (fn handle-exit-code [task-id exit-code] - (sandbox/aggregate-exit-code exit-code-syncer-state task-id exit-code))] + pools')] (start-jobs-prioritizer! conn pool-name->pending-jobs-atom task-constraints rank-trigger-chan) - {:scheduler (create-mesos-scheduler gpu-enabled? conn heartbeat-ch pool-name->fenzo pool->offers-chan - match-trigger-chan handle-exit-code handle-progress-message sandbox-syncer-state - compute-cluster) - :view-incubating-offers (fn get-resources-atom [p] - (deref (get pool->resources-atom p)))})) + {:view-incubating-offers (fn get-resources-atom [p] + (deref (get pool->resources-atom p))) + :pool->offers-chan pool->offers-chan + :pool-name->fenzo pool-name->fenzo})) diff --git a/scheduler/test/cook/test/zz_simulator.clj b/scheduler/test/cook/test/zz_simulator.clj index 1df6c46c09..e9273887b4 100644 --- a/scheduler/test/cook/test/zz_simulator.clj +++ b/scheduler/test/cook/test/zz_simulator.clj @@ -134,7 +134,7 @@ ; registration responses matches the configured cook scheduler passes simulator ; and mesos-mock unit tests. 
(cook.scheduler, lines 1428 create-mesos-scheduler) cook.config/framework-id-config (constantly framework-id#)] - (c/start-mesos-scheduler + (c/start-leader-selector {:curator-framework curator-framework# :exit-code-syncer-state exit-code-syncer-state# :fenzo-config fenzo-config# From b5ebb0641d006b6f64e2f647ce179aa67e48726b Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Fri, 31 May 2019 12:13:58 -0500 Subject: [PATCH 02/11] Most integration tests passing --- scheduler/src/cook/components.clj | 38 +-- scheduler/src/cook/compute_cluster.clj | 214 ----------------- scheduler/src/cook/mesos.clj | 4 +- .../src/cook/mesos/mesos_compute_cluster.clj | 217 +++++++++++++++++- scheduler/src/cook/progress.clj | 1 + scheduler/src/cook/scheduler/scheduler.clj | 28 ++- .../test/cook/test/scheduler/scheduler.clj | 110 --------- 7 files changed, 260 insertions(+), 352 deletions(-) diff --git a/scheduler/src/cook/components.clj b/scheduler/src/cook/components.clj index b7b9a73e00..3906f95721 100644 --- a/scheduler/src/cook/components.clj +++ b/scheduler/src/cook/components.clj @@ -101,8 +101,8 @@ mesos-role mesos-run-as-user offer-incubate-time-ms optimizer progress rebalancer server-port task-constraints] curator-framework exit-code-syncer-state framework-id mesos-datomic-mult mesos-leadership-atom - mesos-agent-attributes-cache pool-name->pending-jobs-atom sandbox-syncer-state - compute-clusters] + mesos-agent-attributes-cache pool-name->pending-jobs-atom sandbox-syncer-state mesos-heartbeat-chan + compute-clusters trigger-chans] (if (cook.config/api-only-mode?) 
(if curator-framework (throw (ex-info "This node is configured for API-only mode, but also has a curator configured" @@ -111,15 +111,7 @@ (if curator-framework (do (log/info "Initializing mesos scheduler") - (let [make-mesos-driver-fn (partial (util/lazy-load-var 'cook.mesos/make-mesos-driver) - {:mesos-master mesos-master - :mesos-failover-timeout mesos-failover-timeout - :mesos-principal mesos-principal - :mesos-role mesos-role - :mesos-framework-name mesos-framework-name - :gpu-enabled? mesos-gpu-enabled}) - trigger-chans ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints) - mesos-heartbeat-chan (async/chan (async/buffer 4096))] + (let [] (try (Class/forName "org.apache.mesos.Scheduler") ((util/lazy-load-var 'cook.mesos/start-leader-selector) @@ -133,7 +125,6 @@ :good-enough-fitness good-enough-fitness} :framework-id framework-id :gpu-enabled? mesos-gpu-enabled - :make-mesos-driver-fn make-mesos-driver-fn :mea-culpa-failure-limit mea-culpa-failure-limit :mesos-datomic-conn datomic/conn :mesos-datomic-mult mesos-datomic-mult @@ -297,10 +288,27 @@ (throw (ex-info "Framework id not configured and not in ZooKeeper" {}))) (log/info "Using framework id:" framework-id) framework-id))) - :compute-clusters (fnk [settings] - ((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings)) + :compute-clusters (fnk [exit-code-syncer-state + mesos-heartbeat-chan + sandbox-syncer-state + settings + trigger-chans] + (let [constructor (util/lazy-load-var 'cook.mesos.mesos-compute-cluster/->MesosComputeCluster) + create-mesos-compute-cluster (fn [compute-cluster-name framework-id db-id driver-atom] + (constructor compute-cluster-name + framework-id + db-id + driver-atom + sandbox-syncer-state + exit-code-syncer-state + mesos-heartbeat-chan + trigger-chans))] + ((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings + 
create-mesos-compute-cluster))) :mesos-datomic-mult (fnk [] (first ((util/lazy-load-var 'cook.datomic/create-tx-report-mult) datomic/conn))) + :mesos-heartbeat-chan (fnk [] + (async/chan (async/buffer 4096))) :local-zookeeper (fnk [[:settings zookeeper-server]] (when zookeeper-server (log/info "Starting local ZK server") @@ -321,6 +329,8 @@ ((util/lazy-load-var 'cook.mesos.sandbox/prepare-sandbox-publisher) framework-id datomic/conn publish-batch-size publish-interval-ms sync-interval-ms max-consecutive-sync-failure mesos-agent-query-cache))) + :trigger-chans (fnk [[:settings rebalancer progress optimizer task-constraints]] + ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints)) :clear-uncommitted-canceler (fnk [mesos-leadership-atom] ((util/lazy-load-var 'cook.tools/clear-uncommitted-jobs-on-schedule) datomic/conn mesos-leadership-atom)) diff --git a/scheduler/src/cook/compute_cluster.clj b/scheduler/src/cook/compute_cluster.clj index 432dd20416..aa781a9cc9 100644 --- a/scheduler/src/cook/compute_cluster.clj +++ b/scheduler/src/cook/compute_cluster.clj @@ -46,220 +46,6 @@ (set-mesos-driver-atom-hack! [this driver] "Hack to overwrite the driver. Used until we fix the initialization order of compute-cluster")) -(meters/defmeter [cook-mesos scheduler scheduler-offer-declined]) - -(defn decline-offers - "declines a collection of offer ids" - [driver offer-ids] - (log/debug "Declining offers:" offer-ids) - (doseq [id offer-ids] - (meters/mark! 
scheduler-offer-declined) - (mesos/decline-offer driver id))) - -(defn decline-offers-safe - "Declines a collection of offers, catching exceptions" - [driver offers] - (try - (decline-offers driver (map :id offers)) - (catch Exception e - (log/error e "Unable to decline offers!")))) - -(defn metric-title - [metric-name pool] - ["cook-mesos" "scheduler" metric-name (str "pool-" pool)]) - -(counters/defcounter [cook-mesos scheduler offer-chan-depth]) -(meters/defmeter [cook-mesos scheduler mesos-error]) -(meters/defmeter [cook-mesos scheduler offer-chan-full-error]) - -(defn receive-offers - [offers-chan match-trigger-chan driver pool-name offers] - (doseq [offer offers] - (histograms/update! (histograms/histogram (metric-title "offer-size-cpus" pool-name)) (get-in offer [:resources :cpus] 0)) - (histograms/update! (histograms/histogram (metric-title "offer-size-mem" pool-name)) (get-in offer [:resources :mem] 0))) - (if (async/offer! offers-chan offers) - (do - (counters/inc! offer-chan-depth) - (async/offer! match-trigger-chan :trigger)) ; :trigger is arbitrary, the value is ignored - (do (log/warn "Offer chan is full. Are we not handling offers fast enough?") - (meters/mark! offer-chan-full-error) - (future - (decline-offers-safe driver offers))))) - -(meters/defmeter [cook-mesos scheduler handle-framework-message-rate]) - -(defn create-mesos-scheduler - "Creates the mesos scheduler which processes status updates asynchronously but in order of receipt." - [gpu-enabled? 
conn heartbeat-ch pool->fenzo pool->offers-chan match-trigger-chan sandbox-syncer-state compute-cluster - reconcile-jobs-fn reconcile-tasks-fn handle-framework-message-fn handle-status-update-fn] - (let [configured-framework-id (cook.config/framework-id-config)] - (mesos/scheduler - (registered - [this driver framework-id master-info] - (log/info "Registered with mesos with framework-id " framework-id) - (let [value (-> framework-id mesomatic.types/pb->data :value)] - (when (not= configured-framework-id value) - (let [message (str "The framework-id provided by Mesos (" value ") " - "does not match the one Cook is configured with (" configured-framework-id ")")] - (log/error message) - (throw (ex-info message {:framework-id-mesos value :framework-id-cook configured-framework-id}))))) - (when (and gpu-enabled? (not (re-matches #"1\.\d+\.\d+" (:version master-info)))) - (binding [*out* *err*] - (println "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info))) - (log/error "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info)) - (Thread/sleep 1000) - (System/exit 1)) - ;; Use future because the thread that runs mesos/scheduler doesn't load classes correctly. for reasons. - ;; As Sophie says, you want to future proof your code. - (future - (try - (reconcile-jobs-fn conn) - (reconcile-tasks-fn (d/db conn) driver pool->fenzo) - (catch Exception e - (log/error e "Reconciliation error"))))) - (reregistered - [this driver master-info] - (log/info "Reregistered with new master") - (future - (try - (reconcile-jobs-fn conn) - (reconcile-tasks-fn (d/db conn) driver pool->fenzo) - (catch Exception e - (log/error e "Reconciliation error"))))) - ;; Ignore this--we can just wait for new offers - (offer-rescinded - [this driver offer-id] - (comment "TODO: Rescind the offer in fenzo")) - (framework-message - [this driver executor-id slave-id message] - (meters/mark! 
handle-framework-message-rate) - (try - (let [{:strs [task-id type] :as parsed-message} (json/read-str (String. ^bytes message "UTF-8"))] - (case type - "directory" (sandbox/update-sandbox sandbox-syncer-state parsed-message) - "heartbeat" (heartbeat/notify-heartbeat heartbeat-ch executor-id slave-id parsed-message) - (handle-framework-message-fn parsed-message))) - (catch Exception e - (log/error e "Unable to process framework message" - {:executor-id executor-id, :message message, :slave-id slave-id})))) - (disconnected - [this driver] - (log/error "Disconnected from the previous master")) - ;; We don't care about losing slaves or executors--only tasks - (slave-lost [this driver slave-id]) - (executor-lost [this driver executor-id slave-id status]) - (error - [this driver message] - (meters/mark! mesos-error) - (log/error "Got a mesos error!!!!" message)) - (resource-offers - [this driver raw-offers] - (log/debug "Got offers:" raw-offers) - (let [offers (map #(assoc % :compute-cluster compute-cluster) raw-offers) - pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers) - using-pools? (config/default-pool)] - (log/info "Offers by pool:" (pc/map-vals count pool->offers)) - (run! - (fn [[pool-name offers]] - (let [offer-count (count offers)] - (if using-pools? 
- (if-let [offers-chan (get pool->offers-chan pool-name)] - (do - (log/info "Processing" offer-count "offer(s) for known pool" pool-name) - (receive-offers offers-chan match-trigger-chan driver pool-name offers)) - (do - (log/warn "Declining" offer-count "offer(s) for non-existent pool" pool-name) - (decline-offers-safe driver offers))) - (if-let [offers-chan (get pool->offers-chan "no-pool")] - (do - (log/info "Processing" offer-count "offer(s) for pool" pool-name "(not using pools)") - (receive-offers offers-chan match-trigger-chan driver pool-name offers)) - (do - (log/error "Declining" offer-count "offer(s) for pool" pool-name "(missing no-pool offer chan)") - (decline-offers-safe driver offers)))))) - pool->offers) - (log/debug "Finished receiving offers for all pools"))) - (status-update - [this driver status] - (handle-status-update-fn status))))) - -(defn make-mesos-driver - "Creates a mesos driver - - Parameters: - mesos-framework-name -- string, name to use when connecting to mesos. - Will be appended with version of cook - mesos-role -- string, (optional) The role to connect to mesos with - mesos-principal -- string, (optional) principal to connect to mesos with - gpu-enabled? -- boolean, (optional) whether cook will schedule gpu jobs - mesos-failover-timeout -- long, (optional) time in milliseconds mesos will wait - for framework to reconnect. - See http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ - and search for failover_timeout - mesos-master -- str, (optional) connection string for mesos masters - scheduler -- mesos scheduler implementation - framework-id -- str, (optional) Id of framework if it has connected to mesos before - " - ([config scheduler] - (make-mesos-driver config scheduler nil)) - ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? 
- mesos-failover-timeout mesos-master] :as config} - scheduler framework-id] - (apply mesomatic.scheduler/scheduler-driver - scheduler - (cond-> {:checkpoint true - :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) - :user ""} - framework-id (assoc :id {:value framework-id}) - gpu-enabled? (assoc :capabilities [{:type :framework-capability-gpu-resources}]) - mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) - mesos-principal (assoc :principal mesos-principal) - mesos-role (assoc :role mesos-role)) - mesos-master - (when mesos-principal - [{:principal mesos-principal}])))) - -(defrecord MesosComputeCluster [compute-cluster-name framework-id db-id driver-atom - sandbox-syncer-state reconcile-jobs-fn reconcile-tasks-fn handle-framework-message-fn - handle-status-update-fn mesos-heartbeat-chan] - ComputeCluster - (compute-cluster-name [this] - compute-cluster-name) - (get-mesos-driver-hack [this] - @driver-atom) - (db-id [this] - db-id) - (get-mesos-framework-id-hack [this] - framework-id) - (set-mesos-driver-atom-hack! [this driver] - (reset! driver-atom driver)) - (initialize-cluster [this pool->fenzo pool->offers-chan] - (let [mesos-config (select-keys config/config [:mesos-master - :mesos-failover-timeout - :mesos-principal - :mesos-role - :mesos-framework-name - :gpu-enabled?]) - trigger-chans {} - {:keys [match-trigger-chan]} trigger-chans - scheduler (create-mesos-scheduler (:gpu-enabled? mesos-config) - cook.datomic/conn - mesos-heartbeat-chan - pool->fenzo - pool->offers-chan - match-trigger-chan - sandbox-syncer-state - this - reconcile-jobs-fn - reconcile-tasks-fn - handle-framework-message-fn - handle-status-update-fn) - framework-id (config/framework-id-config) - _ (log/info "Initializing mesos driver with config: " mesos-config) - driver (make-mesos-driver mesos-config scheduler framework-id)] - (mesomatic.scheduler/start! driver) - (set-mesos-driver-atom-hack! 
this driver)))) - ; Internal method (defn write-compute-cluster "Create a missing compute-cluster for one that's not yet in the database." diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index 12bf523366..2abf6b570c 100644 --- a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -215,7 +215,9 @@ (counters/inc! mesos-leader) (async/tap mesos-datomic-mult datomic-report-chan) (cook.scheduler.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn) - @cluster-leadership-promise + (let [res (async/fenzo pool->offers-chan match-trigger-chan + handle-exit-code handle-progress-message sandbox-syncer-state compute-cluster] + (let [configured-framework-id (cook.config/framework-id-config) + sync-agent-sandboxes-fn #(sandbox/sync-agent-sandboxes sandbox-syncer-state configured-framework-id %1 %2) + message-handlers {:handle-exit-code handle-exit-code + :handle-progress-message handle-progress-message}] + (mesos/scheduler + (registered + [this driver framework-id master-info] + (log/info "Registered with mesos with framework-id " framework-id) + (let [value (-> framework-id mesomatic.types/pb->data :value)] + (when (not= configured-framework-id value) + (let [message (str "The framework-id provided by Mesos (" value ") " + "does not match the one Cook is configured with (" configured-framework-id ")")] + (log/error message) + (throw (ex-info message {:framework-id-mesos value :framework-id-cook configured-framework-id}))))) + (when (and gpu-enabled? (not (re-matches #"1\.\d+\.\d+" (:version master-info)))) + (binding [*out* *err*] + (println "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info))) + (log/error "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info)) + (Thread/sleep 1000) + (System/exit 1)) + ;; Use future because the thread that runs mesos/scheduler doesn't load classes correctly. for reasons. 
+ ;; As Sophie says, you want to future proof your code. + (future + (try + (sched/reconcile-jobs conn) + (sched/reconcile-tasks (d/db conn) driver pool->fenzo) + (catch Exception e + (log/error e "Reconciliation error"))))) + (reregistered + [this driver master-info] + (log/info "Reregistered with new master") + (future + (try + (sched/reconcile-jobs conn) + (sched/reconcile-tasks (d/db conn) driver pool->fenzo) + (catch Exception e + (log/error e "Reconciliation error"))))) + ;; Ignore this--we can just wait for new offers + (offer-rescinded + [this driver offer-id] + (comment "TODO: Rescind the offer in fenzo")) + (framework-message + [this driver executor-id slave-id message] + (meters/mark! handle-framework-message-rate) + (try + (let [{:strs [task-id type] :as parsed-message} (json/read-str (String. ^bytes message "UTF-8"))] + (case type + "directory" (sandbox/update-sandbox sandbox-syncer-state parsed-message) + "heartbeat" (heartbeat/notify-heartbeat heartbeat-ch executor-id slave-id parsed-message) + (sched/async-in-order-processing + task-id #(sched/handle-framework-message conn message-handlers parsed-message)))) + (catch Exception e + (log/error e "Unable to process framework message" + {:executor-id executor-id, :message message, :slave-id slave-id})))) + (disconnected + [this driver] + (log/error "Disconnected from the previous master")) + ;; We don't care about losing slaves or executors--only tasks + (slave-lost [this driver slave-id]) + (executor-lost [this driver executor-id slave-id status]) + (error + [this driver message] + (meters/mark! mesos-error) + (log/error "Got a mesos error!!!!" message)) + (resource-offers + [this driver raw-offers] + (log/debug "Got offers:" raw-offers) + (let [offers (map #(assoc % :compute-cluster compute-cluster) raw-offers) + pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers) + using-pools? 
(config/default-pool)] + (log/info "Offers by pool:" (pc/map-vals count pool->offers)) + (run! + (fn [[pool-name offers]] + (let [offer-count (count offers)] + (if using-pools? + (if-let [offers-chan (get pool->offers-chan pool-name)] + (do + (log/info "Processing" offer-count "offer(s) for known pool" pool-name) + (sched/receive-offers offers-chan match-trigger-chan driver pool-name offers)) + (do + (log/warn "Declining" offer-count "offer(s) for non-existent pool" pool-name) + (sched/decline-offers-safe driver offers))) + (if-let [offers-chan (get pool->offers-chan "no-pool")] + (do + (log/info "Processing" offer-count "offer(s) for pool" pool-name "(not using pools)") + (sched/receive-offers offers-chan match-trigger-chan driver pool-name offers)) + (do + (log/error "Declining" offer-count "offer(s) for pool" pool-name "(missing no-pool offer chan)") + (sched/decline-offers-safe driver offers)))))) + pool->offers) + (log/debug "Finished receiving offers for all pools"))) + (status-update + [this driver status] + (meters/mark! handle-status-update-rate) + (let [task-id (-> status :task-id :value)] + (sched/async-in-order-processing + task-id #(sched/handle-status-update conn compute-cluster pool->fenzo sync-agent-sandboxes-fn status))))))) + + +(defn make-mesos-driver + "Creates a mesos driver + + Parameters: + mesos-framework-name -- string, name to use when connecting to mesos. + Will be appended with version of cook + mesos-role -- string, (optional) The role to connect to mesos with + mesos-principal -- string, (optional) principal to connect to mesos with + gpu-enabled? -- boolean, (optional) whether cook will schedule gpu jobs + mesos-failover-timeout -- long, (optional) time in milliseconds mesos will wait + for framework to reconnect. 
+ See http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ + and search for failover_timeout + mesos-master -- str, (optional) connection string for mesos masters + scheduler -- mesos scheduler implementation + framework-id -- str, (optional) Id of framework if it has connected to mesos before + " + ([config scheduler] + (make-mesos-driver config scheduler nil)) + ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? + mesos-failover-timeout mesos-master] :as config} + scheduler framework-id] + (apply mesomatic.scheduler/scheduler-driver + scheduler + (cond-> {:checkpoint true + :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) + :user ""} + framework-id (assoc :id {:value framework-id}) + gpu-enabled? (assoc :capabilities [{:type :framework-capability-gpu-resources}]) + mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) + mesos-principal (assoc :principal mesos-principal) + mesos-role (assoc :role mesos-role)) + mesos-master + (when mesos-principal + [{:principal mesos-principal}])))) + +(defrecord MesosComputeCluster [compute-cluster-name framework-id db-id driver-atom + sandbox-syncer-state exit-code-syncer-state mesos-heartbeat-chan + trigger-chans] cc/ComputeCluster (compute-cluster-name [this] compute-cluster-name) @@ -34,7 +192,45 @@ (db-id [this] db-id) (set-mesos-driver-atom-hack! [this driver] - (reset! driver-atom driver))) + (reset! 
driver-atom driver)) + (initialize-cluster [this pool->fenzo pool->offers-chan] + (let [settings (:settings config/config) + mesos-config (select-keys settings [:mesos-master + :mesos-failover-timeout + :mesos-principal + :mesos-role + :mesos-framework-name + :gpu-enabled?]) + progress-config (:progress settings) + conn cook.datomic/conn + {:keys [match-trigger-chan progress-updater-trigger-chan]} trigger-chans + {:keys [batch-size]} progress-config + {:keys [progress-state-chan]} (progress/progress-update-transactor progress-updater-trigger-chan batch-size conn) + progress-aggregator-chan (progress/progress-update-aggregator progress-config progress-state-chan) + handle-progress-message (fn handle-progress-message-curried [progress-message-map] + (progress/handle-progress-message! progress-aggregator-chan progress-message-map)) + handle-exit-code (fn handle-exit-code [task-id exit-code] + (sandbox/aggregate-exit-code exit-code-syncer-state task-id exit-code)) + scheduler (create-mesos-scheduler (:gpu-enabled? mesos-config) + conn + mesos-heartbeat-chan + pool->fenzo + pool->offers-chan + match-trigger-chan + handle-exit-code + handle-progress-message + sandbox-syncer-state + this) + framework-id (config/framework-id-config) + _ (log/info "Initializing mesos driver with config: " mesos-config) + driver (make-mesos-driver mesos-config scheduler framework-id)] + (mesomatic.scheduler/start! driver) + (reset! driver-atom driver) + (async/thread + (try + (mesomatic.scheduler/join! driver) + (catch Exception e + e)))))) ; Internal method (defn- mesos-cluster->compute-cluster-map-for-datomic @@ -65,17 +261,16 @@ (defn get-mesos-compute-cluster "Process one mesos cluster specification, returning the entity id of the corresponding compute-cluster, creating the cluster if it does not exist. Warning: Not idempotent. 
Only call once " - [conn {:keys [compute-cluster-name framework-id] :as mesos-cluster}] + [conn create-mesos-compute-cluster {:keys [compute-cluster-name framework-id] :as mesos-cluster}] {:pre [compute-cluster-name framework-id]} (let [cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)] (when-not cluster-entity-id (cc/write-compute-cluster conn (mesos-cluster->compute-cluster-map-for-datomic mesos-cluster))) - (->MesosComputeCluster - compute-cluster-name - framework-id - (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)) - (atom nil)))) + (create-mesos-compute-cluster compute-cluster-name + framework-id + (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)) + (atom nil)))) (defn- get-mesos-clusters-from-config "Get all of the mesos clusters defined in the configuration. @@ -105,9 +300,9 @@ (defn setup-compute-cluster-map-from-config "Setup the cluster-map configs, linking a cluster name to the associated metadata needed to represent/process it." 
- [conn settings] + [conn settings create-mesos-compute-cluster] (let [compute-clusters (->> (get-mesos-clusters-from-config settings) - (map (partial get-mesos-compute-cluster conn)) + (map (partial get-mesos-compute-cluster conn create-mesos-compute-cluster)) (map cc/register-compute-cluster!))] (doall compute-clusters))) diff --git a/scheduler/src/cook/progress.clj b/scheduler/src/cook/progress.clj index 071277650b..60cb8a53c9 100644 --- a/scheduler/src/cook/progress.clj +++ b/scheduler/src/cook/progress.clj @@ -78,6 +78,7 @@ Note: the wrapper chan is used due to our use of `util/xform-pipe`" [{:keys [pending-threshold publish-interval-ms sequence-cache-threshold]} progress-state-chan] + (log/info "In aggregator: " pending-threshold publish-interval-ms sequence-cache-threshold progress-state-chan) (log/info "Starting progress update aggregator") (let [progress-aggregator-chan (async/chan (async/sliding-buffer pending-threshold)) sequence-cache-store (-> {} diff --git a/scheduler/src/cook/scheduler/scheduler.clj b/scheduler/src/cook/scheduler/scheduler.clj index d7d1b95246..2d1457e88f 100644 --- a/scheduler/src/cook/scheduler/scheduler.clj +++ b/scheduler/src/cook/scheduler/scheduler.clj @@ -100,8 +100,8 @@ (merge mesos-attributes cook-attributes))) (timers/deftimer [cook-mesos scheduler handle-status-update-duration]) -(meters/defmeter [cook-mesos scheduler handle-status-update-rate]) (timers/deftimer [cook-mesos scheduler handle-framework-message-duration]) +(meters/defmeter [cook-mesos scheduler handle-framework-message-rate]) (timers/deftimer [cook-mesos scheduler generate-user-usage-map-duration]) @@ -214,7 +214,6 @@ "Takes a status update from mesos." [conn compute-cluster pool->fenzo sync-agent-sandboxes-fn status] (log/info "Mesos status is:" status) - (meters/mark! handle-status-update-rate) (timers/time! handle-status-update-duration (try (let [db (db conn) @@ -1334,6 +1333,9 @@ (reset! 
pool-name->pending-jobs-atom (rank-jobs (d/db conn) offensive-job-filter)))))) +(meters/defmeter [cook-mesos scheduler mesos-error]) +(meters/defmeter [cook-mesos scheduler offer-chan-full-error]) + (defn make-fenzo-scheduler [compute-cluster offer-incubate-time-ms fitness-calculator good-enough-fitness] (.. (TaskScheduler$Builder.) @@ -1373,6 +1375,28 @@ @(d/transact conn [{:db/id :scheduler/config :scheduler.config/mea-culpa-failure-limit limits}]))) +(defn decline-offers-safe + "Declines a collection of offers, catching exceptions" + [driver offers] + (try + (decline-offers driver (map :id offers)) + (catch Exception e + (log/error e "Unable to decline offers!")))) + +(defn receive-offers + [offers-chan match-trigger-chan driver pool-name offers] + (doseq [offer offers] + (histograms/update! (histograms/histogram (metric-title "offer-size-cpus" pool-name)) (get-in offer [:resources :cpus] 0)) + (histograms/update! (histograms/histogram (metric-title "offer-size-mem" pool-name)) (get-in offer [:resources :mem] 0))) + (if (async/offer! offers-chan offers) + (do + (counters/inc! offer-chan-depth) + (async/offer! match-trigger-chan :trigger)) ; :trigger is arbitrary, the value is ignored + (do (log/warn "Offer chan is full. Are we not handling offers fast enough?") + (meters/mark! 
offer-chan-full-error) + (future + (decline-offers-safe driver offers))))) + (let [in-order-queue-counter (counters/counter ["cook-mesos" "scheduler" "in-order-queue-size"]) in-order-queue-timer (timers/timer ["cook-mesos" "scheduler" "in-order-queue-delay-duration"]) parallelism 19 ; a prime number to potentially help make the distribution uniform diff --git a/scheduler/test/cook/test/scheduler/scheduler.clj b/scheduler/test/cook/test/scheduler/scheduler.clj index 32e4b7b7f7..709ec78ee6 100644 --- a/scheduler/test/cook/test/scheduler/scheduler.clj +++ b/scheduler/test/cook/test/scheduler/scheduler.clj @@ -1882,116 +1882,6 @@ (is (= (:slave-id offer-1) (:slave-id task-1))) (is (= (:slave-id offer-2) (:slave-id task-2))))))))) -(deftest test-in-order-status-update-processing - (let [status-store (atom {}) - latch (CountDownLatch. 11)] - (with-redefs [sched/handle-status-update - (fn [_ _ _ _ status] - (let [task-id (-> status :task-id :value str)] - (swap! status-store update task-id - (fn [statuses] (conj (or statuses []) - (-> status mtypes/pb->data :state))))) - (Thread/sleep (rand-int 100)) - (.countDown latch))] - (let [s (sched/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil)] - - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-running})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-running})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-running})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-finished})) - (.statusUpdate s 
nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-failed})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T4"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-failed})) - - (.await latch 4 TimeUnit/SECONDS) - - (is (= [:task-starting :task-failed] (->> "" (get @status-store) vec))) - (is (= [:task-starting :task-running :task-finished] (->> "T1" (get @status-store) vec))) - (is (= [:task-starting :task-running] (->> "T2" (get @status-store) vec))) - (is (= [:task-starting :task-running :task-failed] (->> "T3" (get @status-store) vec))) - (is (= [:task-starting] (->> "T4" (get @status-store) vec))))))) - -(deftest test-framework-message-processing-delegation - (let [framework-message-store (atom []) - heartbeat-store (atom []) - sandbox-store (atom [])] - (with-redefs [heartbeat/notify-heartbeat (fn [_ _ _ framework-message] - (swap! heartbeat-store conj framework-message)) - sandbox/update-sandbox (fn [_ framework-message] - (swap! sandbox-store conj framework-message)) - sched/handle-framework-message (fn [_ _ framework-message] - (swap! 
framework-message-store conj framework-message))] - (let [s (sched/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil) - make-message (fn [message] (-> message json/write-str str (.getBytes "UTF-8")))] - - (testing "message delegation" - (let [task-id "T1" - executor-id (-> task-id mtypes/->ExecutorID mtypes/data->pb) - m1 {"task-id" task-id} - m2 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} - m3 {"exit-code" 0, "task-id" task-id} - m4 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} - m5 {"sandbox" "/path/to/a/directory", "task-id" task-id, "type" "directory"}] - - (.frameworkMessage s nil executor-id nil (make-message m1)) - (.frameworkMessage s nil executor-id nil (make-message m2)) - (.frameworkMessage s nil executor-id nil (make-message m3)) - (.frameworkMessage s nil executor-id nil (make-message m4)) - (.frameworkMessage s nil executor-id nil (make-message m5)) - - (let [latch (CountDownLatch. 1)] - (sched/async-in-order-processing task-id #(.countDown latch)) - (.await latch)) - - (is (= [m1 m3] @framework-message-store)) - (is (= [m2 m4] @heartbeat-store)) - (is (= [m5] @sandbox-store)))))))) - -(deftest test-in-order-framework-message-processing - (let [messages-store (atom {}) - latch (CountDownLatch. 11)] - (with-redefs [heartbeat/notify-heartbeat (constantly true) - sched/handle-framework-message - (fn [_ _ framework-message] - (let [{:strs [message task-id]} framework-message] - (swap! 
messages-store update (str task-id) (fn [messages] (conj (or messages []) message)))) - (Thread/sleep (rand-int 100)) - (.countDown latch))] - (let [s (sched/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil) - foo 11 - bar 21 - fee 31 - fie 41 - make-message (fn [index message] - (-> {"message" message, "task-id" (str "T" index)} - json/write-str - str - (.getBytes "UTF-8")))] - - (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 foo)) - (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 foo)) - (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 foo)) - (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 bar)) - (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 bar)) - (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 foo)) - (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 bar)) - (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 fee)) - (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 fie)) - (.frameworkMessage s nil (-> "T4" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 4 foo)) - (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 fie)) - - (.await latch 4 TimeUnit/SECONDS) - - (is (= [foo fie] (->> "T0" (get @messages-store) vec))) - (is (= [foo bar fee] (->> "T1" (get @messages-store) vec))) - (is (= [foo bar] (->> "T2" (get @messages-store) vec))) - (is (= [foo bar fie] (->> "T3" (get @messages-store) vec))) - (is (= [foo] (->> "T4" (get @messages-store) vec))))))) - (defn- task-id->instance-entity [db-conn task-id] (let [datomic-db (d/db db-conn)] From 6b884eb73be61f894450c3d09c49f44bc908ce66 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide 
Date: Fri, 31 May 2019 12:22:55 -0500 Subject: [PATCH 03/11] working on some unit tests --- scheduler/src/cook/test/testutil.clj | 17 ++++++++++++++--- scheduler/test/cook/test/compute_cluster.clj | 18 ++++++++++++------ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/scheduler/src/cook/test/testutil.clj b/scheduler/src/cook/test/testutil.clj index df752e260f..41efd3681d 100644 --- a/scheduler/src/cook/test/testutil.clj +++ b/scheduler/src/cook/test/testutil.clj @@ -38,14 +38,25 @@ (:import (java.util UUID) (org.apache.log4j ConsoleAppender Logger PatternLayout))) +(defn create-dummy-mesos-compute-cluster + [compute-cluster-name framework-id db-id driver-atom] + (mcc/->MesosComputeCluster compute-cluster-name + framework-id + db-id + driver-atom + nil ; sandbox-syncer-state + nil ; exit-code-syncer-state + nil ; mesos-heartbeat-chan + nil ; trigger-chans + )) + (defn fake-test-compute-cluster-with-driver "Create a test compute cluster with associated driver attached to it. Returns the compute cluster." [conn compute-cluster-name driver] {:pre [compute-cluster-name]} - (let [existing-compute-cluster (get @cc/cluster-name->compute-cluster-atom compute-cluster-name) - compute-cluster-mesos-map {:framework-id (str compute-cluster-name "-framework") + (let [compute-cluster-mesos-map {:framework-id (str compute-cluster-name "-framework") :compute-cluster-name compute-cluster-name} - compute-cluster (mcc/get-mesos-compute-cluster conn compute-cluster-mesos-map)] + compute-cluster (mcc/get-mesos-compute-cluster conn create-dummy-mesos-compute-cluster compute-cluster-mesos-map)] (cc/register-compute-cluster! compute-cluster) (cc/set-mesos-driver-atom-hack! 
compute-cluster driver) compute-cluster)) diff --git a/scheduler/test/cook/test/compute_cluster.clj b/scheduler/test/cook/test/compute_cluster.clj index aae697ca5c..5efde407a3 100644 --- a/scheduler/test/cook/test/compute_cluster.clj +++ b/scheduler/test/cook/test/compute_cluster.clj @@ -35,13 +35,17 @@ (is (= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2)))) (testing "Create a cluster. Should be a new cluster" - (let [{id1a :db-id :as fetch-mesos-1a} (mcc/get-mesos-compute-cluster conn mesos-1)] + (let [{id1a :db-id :as fetch-mesos-1a} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster + conn mesos-1)] ; This should create one cluster in the DB, but not the other. (is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-1))) (is (= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2))) - (let [{id2a :db-id :as fetch-mesos-2a} (mcc/get-mesos-compute-cluster conn mesos-2) - {id1b :db-id :as fetch-mesos-1b} (mcc/get-mesos-compute-cluster conn mesos-1) - {id2b :db-id :as fetch-mesos-2b} (mcc/get-mesos-compute-cluster conn mesos-2)] + (let [{id2a :db-id :as fetch-mesos-2a} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster + conn mesos-2) + {id1b :db-id :as fetch-mesos-1b} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster + conn mesos-1) + {id2b :db-id :as fetch-mesos-2b} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster + conn mesos-2)] ; Should see both clusters created. (is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-1))) (is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2))) @@ -70,14 +74,16 @@ :framework-id "foo-1a"} {:compute-cluster-name "foo-1" :framework-id "foo-1a"}])] - (is (thrown-with-msg? IllegalArgumentException #"Multiple" (mcc/setup-compute-cluster-map-from-config conn nil))))) + (is (thrown-with-msg? 
IllegalArgumentException #"Multiple" + (mcc/setup-compute-cluster-map-from-config conn nil + testutil/create-dummy-mesos-compute-cluster))))) (testing "Install the clusters per the configuration read and make sure that they can be found" (with-redefs [mcc/get-mesos-clusters-from-config (constantly [{:compute-cluster-name "foo-3" :framework-id "foo-1a"} {:compute-cluster-name "foo-4" :framework-id "foo-1a"}])] - (mcc/setup-compute-cluster-map-from-config conn nil) + (mcc/setup-compute-cluster-map-from-config conn nil testutil/create-dummy-mesos-compute-cluster) (is (< 0 (-> "foo-3" cc/compute-cluster-name->ComputeCluster cc/db-id))) (is (< 0 (-> "foo-4" cc/compute-cluster-name->ComputeCluster cc/db-id))) (is (= nil (cc/compute-cluster-name->ComputeCluster "foo-5"))))))) From 3a18836b16a64beddb66bd2749cad31429bd4d67 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Fri, 31 May 2019 13:15:41 -0500 Subject: [PATCH 04/11] All unit test passing --- scheduler/src/cook/test/testutil.clj | 18 ++++++------ scheduler/test/cook/test/compute_cluster.clj | 22 +++++++++------ scheduler/test/cook/test/mesos/mesos_mock.clj | 3 +- scheduler/test/cook/test/zz_simulator.clj | 28 +++++++++++++++---- 4 files changed, 47 insertions(+), 24 deletions(-) diff --git a/scheduler/src/cook/test/testutil.clj b/scheduler/src/cook/test/testutil.clj index 41efd3681d..6d34d71ae2 100644 --- a/scheduler/src/cook/test/testutil.clj +++ b/scheduler/src/cook/test/testutil.clj @@ -52,14 +52,16 @@ (defn fake-test-compute-cluster-with-driver "Create a test compute cluster with associated driver attached to it. Returns the compute cluster." - [conn compute-cluster-name driver] - {:pre [compute-cluster-name]} - (let [compute-cluster-mesos-map {:framework-id (str compute-cluster-name "-framework") - :compute-cluster-name compute-cluster-name} - compute-cluster (mcc/get-mesos-compute-cluster conn create-dummy-mesos-compute-cluster compute-cluster-mesos-map)] - (cc/register-compute-cluster! 
compute-cluster) - (cc/set-mesos-driver-atom-hack! compute-cluster driver) - compute-cluster)) + ([conn compute-cluster-name driver] + (fake-test-compute-cluster-with-driver conn compute-cluster-name driver create-dummy-mesos-compute-cluster)) + ([conn compute-cluster-name driver mesos-compute-cluster-factory] + {:pre [compute-cluster-name]} + (let [compute-cluster-mesos-map {:framework-id (str compute-cluster-name "-framework") + :compute-cluster-name compute-cluster-name} + compute-cluster (mcc/get-mesos-compute-cluster conn mesos-compute-cluster-factory compute-cluster-mesos-map)] + (cc/register-compute-cluster! compute-cluster) + (cc/set-mesos-driver-atom-hack! compute-cluster driver) + compute-cluster))) ; The name of the fake compute cluster to use. (def fake-test-compute-cluster-name "unittest-default-compute-cluster-name") diff --git a/scheduler/test/cook/test/compute_cluster.clj b/scheduler/test/cook/test/compute_cluster.clj index 5efde407a3..0707c116e5 100644 --- a/scheduler/test/cook/test/compute_cluster.clj +++ b/scheduler/test/cook/test/compute_cluster.clj @@ -35,17 +35,21 @@ (is (= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2)))) (testing "Create a cluster. Should be a new cluster" - (let [{id1a :db-id :as fetch-mesos-1a} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster - conn mesos-1)] + (let [{id1a :db-id :as fetch-mesos-1a} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-1)] ; This should create one cluster in the DB, but not the other. 
(is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-1))) (is (= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2))) - (let [{id2a :db-id :as fetch-mesos-2a} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster - conn mesos-2) - {id1b :db-id :as fetch-mesos-1b} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster - conn mesos-1) - {id2b :db-id :as fetch-mesos-2b} (mcc/get-mesos-compute-cluster testutil/create-dummy-mesos-compute-cluster - conn mesos-2)] + (let [{id2a :db-id :as fetch-mesos-2a} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-2) + {id1b :db-id :as fetch-mesos-1b} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-1) + {id2b :db-id :as fetch-mesos-2b} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-2)] ; Should see both clusters created. (is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-1))) (is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2))) @@ -60,7 +64,7 @@ (is (and id1b (< 0 id1b))) (is (and id2b (< 0 id2b))) - (is (= (dissoc fetch-mesos-1a :db-id :driver-atom) + (is (= (select-keys fetch-mesos-1a [:compute-cluster-name :framework-id]) {:compute-cluster-name "mesos-1" :framework-id "mesos-1a"}))))))) diff --git a/scheduler/test/cook/test/mesos/mesos_mock.clj b/scheduler/test/cook/test/mesos/mesos_mock.clj index 35d3b52829..595fdce607 100644 --- a/scheduler/test/cook/test/mesos/mesos_mock.clj +++ b/scheduler/test/cook/test/mesos/mesos_mock.clj @@ -672,9 +672,8 @@ hosts (for [_ (range num-hosts)] (dummy-host :mem {"*" mem} :cpus {"*" cpus} :ports {"*" ports})) offer-trigger-chan (chime-ch (util/time-seq (t/now) (t/millis 50))) - make-mesos-driver-fn (fn [scheduler _] ;; _ is framework-id + make-mesos-driver-fn (fn [config scheduler framework-id] ;; _ is framework-id (mm/mesos-mock hosts offer-trigger-chan scheduler))] - 
(testutil/setup-fake-test-compute-cluster mesos-datomic-conn) (with-cook-scheduler mesos-datomic-conn make-mesos-driver-fn {} (share/set-share! mesos-datomic-conn "default" nil "new cluster settings" diff --git a/scheduler/test/cook/test/zz_simulator.clj b/scheduler/test/cook/test/zz_simulator.clj index e9273887b4..ea125bd5f0 100644 --- a/scheduler/test/cook/test/zz_simulator.clj +++ b/scheduler/test/cook/test/zz_simulator.clj @@ -18,6 +18,7 @@ [clojure.walk :refer (keywordize-keys)] [com.rpl.specter :refer (transform ALL MAP-VALS MAP-KEYS select FIRST)] [cook.config :refer (executor-config, init-logger)] + [cook.datomic :as datomic] [cook.mesos :as c] [cook.mesos.mesos-mock :as mm] [cook.scheduler.share :as share] @@ -25,7 +26,8 @@ [cook.plugins.completion :as completion] [cook.test.testutil :as testutil :refer (restore-fresh-database! poll-until)] [datomic.api :as d] - [plumbing.core :refer (map-vals map-keys map-from-vals)]) + [plumbing.core :refer (map-vals map-keys map-from-vals)] + [cook.mesos.mesos-compute-cluster :as mcc]) (:import java.util.Date org.apache.curator.framework.CuratorFrameworkFactory org.apache.curator.framework.state.ConnectionStateListener @@ -126,14 +128,30 @@ optimizer-config# (or (:optimizer-config ~scheduler-config) {}) trigger-chans# (or (:trigger-chans ~scheduler-config) - (c/make-trigger-chans rebalancer-config# progress-config# optimizer-config# task-constraints#))] + (c/make-trigger-chans rebalancer-config# progress-config# optimizer-config# task-constraints#)) + mesos-heartbeat-chan# (async/chan 1024) + create-compute-cluster# (fn [compute-cluster-name# framework-id# db-id# driver-atom#] + (mcc/->MesosComputeCluster compute-cluster-name# + framework-id# + db-id# + driver-atom# + sandbox-syncer-state# + exit-code-syncer-state# + mesos-heartbeat-chan# + trigger-chans#))] (try (with-redefs [executor-config (constantly executor-config#) completion/plugin completion/no-op ; This initializatioon is needed so the code to validate that 
the ; registration responses matches the configured cook scheduler passes simulator ; and mesos-mock unit tests. (cook.scheduler, lines 1428 create-mesos-scheduler) - cook.config/framework-id-config (constantly framework-id#)] + cook.config/framework-id-config (constantly framework-id#) + mcc/make-mesos-driver ~make-mesos-driver-fn + datomic/conn ~conn] + (testutil/fake-test-compute-cluster-with-driver ~conn + testutil/fake-test-compute-cluster-name + nil ; no dummy driver - simulator is going to call initialize + create-compute-cluster#) (c/start-leader-selector {:curator-framework curator-framework# :exit-code-syncer-state exit-code-syncer-state# @@ -144,6 +162,7 @@ :mea-culpa-failure-limit mea-culpa-failure-limit# :mesos-datomic-conn ~conn :mesos-datomic-mult mesos-mult# + :mesos-heartbeat-chan mesos-heartbeat-chan# :mesos-leadership-atom mesos-leadership-atom# :pool-name->pending-jobs-atom pool-name->pending-jobs-atom# :mesos-run-as-user nil @@ -336,7 +355,7 @@ rebalancer-trigger-chan (async/chan) optimizer-trigger-chan (async/chan) state-atom (atom {}) - make-mesos-driver-fn (fn [scheduler _] + make-mesos-driver-fn (fn [config scheduler framework-id] (mm/mesos-mock mesos-hosts offer-trigger-chan scheduler :task->runtime-ms task->runtime-ms :task->complete-status task->complete-status @@ -390,7 +409,6 @@ (DateTimeUtils/setCurrentMillisFixed simulation-time) (log/info "Starting simulation at" simulation-time) ;; launch the simulator - (testutil/setup-fake-test-compute-cluster mesos-datomic-conn) (with-cook-scheduler mesos-datomic-conn make-mesos-driver-fn From 67b2d98c7c590a6b08335bbdb4a6113ea441d4c1 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Fri, 31 May 2019 13:25:04 -0500 Subject: [PATCH 05/11] cleaning up unused args --- scheduler/src/cook/mesos.clj | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index 2abf6b570c..af1b43ab53 100644 --- 
a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -132,10 +132,10 @@ framework-id -- str, the Mesos framework id from the cook settings fenzo-config -- map, config for fenzo, See scheduler/docs/configuration.adoc for more details sandbox-syncer-state -- map, representing the sandbox syncer object" - [{:keys [curator-framework exit-code-syncer-state fenzo-config framework-id gpu-enabled? make-mesos-driver-fn - mea-culpa-failure-limit mesos-datomic-conn mesos-datomic-mult mesos-leadership-atom pool-name->pending-jobs-atom - mesos-run-as-user agent-attributes-cache offer-incubate-time-ms optimizer-config progress-config rebalancer-config - sandbox-syncer-state server-config task-constraints trigger-chans zk-prefix mesos-heartbeat-chan]}] + [{:keys [curator-framework fenzo-config mea-culpa-failure-limit mesos-datomic-conn mesos-datomic-mult + mesos-heartbeat-chan mesos-leadership-atom pool-name->pending-jobs-atom mesos-run-as-user agent-attributes-cache + offer-incubate-time-ms optimizer-config rebalancer-config server-config task-constraints trigger-chans + zk-prefix]}] (let [{:keys [fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback good-enough-fitness]} fenzo-config {:keys [cancelled-task-trigger-chan lingering-task-trigger-chan optimizer-trigger-chan @@ -161,22 +161,18 @@ (sched/create-datomic-scheduler {:conn mesos-datomic-conn :compute-cluster compute-cluster - :exit-code-syncer-state exit-code-syncer-state :fenzo-fitness-calculator fenzo-fitness-calculator :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn :fenzo-max-jobs-considered fenzo-max-jobs-considered :fenzo-scaleback fenzo-scaleback :good-enough-fitness good-enough-fitness - :gpu-enabled? gpu-enabled? 
:mea-culpa-failure-limit mea-culpa-failure-limit :mesos-run-as-user mesos-run-as-user :agent-attributes-cache agent-attributes-cache :offer-incubate-time-ms offer-incubate-time-ms :pool-name->pending-jobs-atom pool-name->pending-jobs-atom - :progress-config progress-config :rebalancer-reservation-atom rebalancer-reservation-atom - :sandbox-syncer-state sandbox-syncer-state :task-constraints task-constraints :trigger-chans trigger-chans}) cluster-leadership-promise (cc/initialize-cluster compute-cluster From 0c6ade18e37e9eae9073dc34a855ade0164260a6 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Fri, 31 May 2019 14:14:37 -0500 Subject: [PATCH 06/11] Clean up some formatting --- scheduler/src/cook/components.clj | 98 +++++++++---------- scheduler/src/cook/compute_cluster.clj | 20 +--- scheduler/src/cook/mesos.clj | 3 +- .../src/cook/mesos/mesos_compute_cluster.clj | 58 +++++------ scheduler/src/cook/progress.clj | 1 - scheduler/src/cook/scheduler/scheduler.clj | 10 +- scheduler/src/cook/test/testutil.clj | 4 +- scheduler/test/cook/test/zz_simulator.clj | 7 -- 8 files changed, 82 insertions(+), 119 deletions(-) diff --git a/scheduler/src/cook/components.clj b/scheduler/src/cook/components.clj index 3906f95721..141e053582 100644 --- a/scheduler/src/cook/components.clj +++ b/scheduler/src/cook/components.clj @@ -13,7 +13,8 @@ ;; limitations under the License. ;; (ns cook.components - (:require [clojure.core.cache :as cache] + (:require [clojure.core.async :as async] + [clojure.core.cache :as cache] [clojure.pprint :refer (pprint)] [clojure.tools.logging :as log] [compojure.core :refer (GET POST routes context)] @@ -24,21 +25,21 @@ [cook.rest.cors :as cors] [cook.curator :as curator] [cook.datomic :as datomic] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.completion namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.completion namespace. 
[cook.plugins.completion] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.submission namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.submission namespace. [cook.plugins.submission] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.file namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.file namespace. [cook.plugins.file] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.launch namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.launch namespace. [cook.plugins.launch] - ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.pool namespace. + ; This explicit require is needed so that mount can see the defstate defined in the cook.plugins.pool namespace. [cook.plugins.pool] [cook.rest.impersonation :refer (impersonation-authorized-wrapper)] [cook.pool :as pool] - ; This explicit require is needed so that mount can see the defstate defined in the cook.rate-limit namespace. - ; cook.rate-limit and everything else under cook.rest.api is normally hidden from mount's defstate because - ; cook.rest.api is loaded via util/lazy-load-var, not via 'ns :require' + ; This explicit require is needed so that mount can see the defstate defined in the cook.rate-limit namespace. 
+ ; cook.rate-limit and everything else under cook.rest.api is normally hidden from mount's defstate because + ; cook.rest.api is loaded via util/lazy-load-var, not via 'ns :require' [cook.rate-limit] [cook.util :as util] [datomic.api :as d] @@ -51,8 +52,7 @@ [ring.middleware.params :refer (wrap-params)] [ring.middleware.stacktrace :refer (wrap-stacktrace)] [ring.util.mime-type] - [ring.util.response :refer (response)] - [clojure.core.async :as async]) + [ring.util.response :refer (response)]) (:import clojure.core.async.impl.channels.ManyToManyChannel java.io.IOException java.security.Principal @@ -96,13 +96,11 @@ (def mesos-scheduler {:mesos-scheduler (fnk [[:settings fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback - good-enough-fitness hostname mea-culpa-failure-limit mesos-failover-timeout mesos-framework-name - mesos-gpu-enabled mesos-leader-path mesos-master mesos-principal - mesos-role mesos-run-as-user offer-incubate-time-ms optimizer progress rebalancer server-port - task-constraints] - curator-framework exit-code-syncer-state framework-id mesos-datomic-mult mesos-leadership-atom - mesos-agent-attributes-cache pool-name->pending-jobs-atom sandbox-syncer-state mesos-heartbeat-chan - compute-clusters trigger-chans] + good-enough-fitness hostname mea-culpa-failure-limit mesos-leader-path mesos-run-as-user + offer-incubate-time-ms optimizer rebalancer server-port task-constraints] + curator-framework mesos-datomic-mult mesos-leadership-atom + mesos-agent-attributes-cache pool-name->pending-jobs-atom mesos-heartbeat-chan + trigger-chans] (if (cook.config/api-only-mode?) 
(if curator-framework (throw (ex-info "This node is configured for API-only mode, but also has a curator configured" @@ -111,41 +109,35 @@ (if curator-framework (do (log/info "Initializing mesos scheduler") - (let [] - (try - (Class/forName "org.apache.mesos.Scheduler") - ((util/lazy-load-var 'cook.mesos/start-leader-selector) - {:curator-framework curator-framework - :exit-code-syncer-state exit-code-syncer-state - :fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered - :fenzo-scaleback fenzo-scaleback - :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn - :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset - :fenzo-fitness-calculator fenzo-fitness-calculator - :good-enough-fitness good-enough-fitness} - :framework-id framework-id - :gpu-enabled? mesos-gpu-enabled - :mea-culpa-failure-limit mea-culpa-failure-limit - :mesos-datomic-conn datomic/conn - :mesos-datomic-mult mesos-datomic-mult - :mesos-heartbeat-chan mesos-heartbeat-chan - :mesos-leadership-atom mesos-leadership-atom - :pool-name->pending-jobs-atom pool-name->pending-jobs-atom - :mesos-run-as-user mesos-run-as-user - :agent-attributes-cache mesos-agent-attributes-cache - :offer-incubate-time-ms offer-incubate-time-ms - :optimizer-config optimizer - :progress-config progress - :rebalancer-config rebalancer - :sandbox-syncer-state sandbox-syncer-state - :server-config {:hostname hostname - :server-port server-port} - :task-constraints task-constraints - :trigger-chans trigger-chans - :zk-prefix mesos-leader-path}) - (catch ClassNotFoundException e - (log/warn e "Not loading mesos support...") - nil)))) + (try + (Class/forName "org.apache.mesos.Scheduler") + ((util/lazy-load-var 'cook.mesos/start-leader-selector) + {:curator-framework curator-framework + :fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered + :fenzo-scaleback fenzo-scaleback + :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn + 
:fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset + :fenzo-fitness-calculator fenzo-fitness-calculator + :good-enough-fitness good-enough-fitness} + :mea-culpa-failure-limit mea-culpa-failure-limit + :mesos-datomic-conn datomic/conn + :mesos-datomic-mult mesos-datomic-mult + :mesos-heartbeat-chan mesos-heartbeat-chan + :mesos-leadership-atom mesos-leadership-atom + :pool-name->pending-jobs-atom pool-name->pending-jobs-atom + :mesos-run-as-user mesos-run-as-user + :agent-attributes-cache mesos-agent-attributes-cache + :offer-incubate-time-ms offer-incubate-time-ms + :optimizer-config optimizer + :rebalancer-config rebalancer + :server-config {:hostname hostname + :server-port server-port} + :task-constraints task-constraints + :trigger-chans trigger-chans + :zk-prefix mesos-leader-path}) + (catch ClassNotFoundException e + (log/warn e "Not loading mesos support...") + nil))) (throw (ex-info "This node does not have a curator configured" {})))))}) (defn health-check-middleware diff --git a/scheduler/src/cook/compute_cluster.clj b/scheduler/src/cook/compute_cluster.clj index aa781a9cc9..4c45ce41b4 100644 --- a/scheduler/src/cook/compute_cluster.clj +++ b/scheduler/src/cook/compute_cluster.clj @@ -17,18 +17,7 @@ (:require [clojure.tools.logging :as log] [cook.config :as config] [cook.datomic] - [datomic.api :as d] - [clojure.core.async :as async] - [cook.mesos.sandbox :as sandbox] - [cook.plugins.pool :as pool-plugin] - [mesomatic.scheduler :as mesos] - [clojure.data.json :as json] - [cook.mesos.heartbeat :as heartbeat] - [cook.plugins.definitions :as plugins] - [metrics.meters :as meters] - [plumbing.core :as pc] - [metrics.histograms :as histograms] - [metrics.counters :as counters])) + [datomic.api :as d])) (defprotocol ComputeCluster ; These methods should accept bulk data and process in batches. 
@@ -39,12 +28,11 @@ (db-id [this] "Get a database entity-id for this compute cluster (used for putting it into a task structure).") - (initialize-cluster [this pool->fenzo pool->offers-chan]) + (initialize-cluster [this pool->fenzo pool->offers-chan] + "Initializes the cluster. Returns a channel that will be delivered on when the cluster loses leadership.") (get-mesos-driver-hack [this] - "Get the mesos driver. Hack; any funciton invoking this should be put within the compute-cluster implementation") - (set-mesos-driver-atom-hack! [this driver] - "Hack to overwrite the driver. Used until we fix the initialization order of compute-cluster")) + "Get the mesos driver. Hack; any funciton invoking this should be put within the compute-cluster implementation")) ; Internal method (defn write-compute-cluster diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index af1b43ab53..35087844a2 100644 --- a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -213,8 +213,7 @@ (cook.scheduler.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn) (let [res (async/fenzo pool->offers-chan] (let [settings (:settings config/config) mesos-config (select-keys settings [:mesos-master - :mesos-failover-timeout - :mesos-principal - :mesos-role - :mesos-framework-name - :gpu-enabled?]) + :mesos-failover-timeout + :mesos-principal + :mesos-role + :mesos-framework-name + :gpu-enabled?]) progress-config (:progress settings) conn cook.datomic/conn {:keys [match-trigger-chan progress-updater-trigger-chan]} trigger-chans @@ -230,7 +226,9 @@ (try (mesomatic.scheduler/join! driver) (catch Exception e - e)))))) + e) + (finally + (reset! driver-atom nil))))))) ; Internal method (defn- mesos-cluster->compute-cluster-map-for-datomic @@ -261,16 +259,18 @@ (defn get-mesos-compute-cluster "Process one mesos cluster specification, returning the entity id of the corresponding compute-cluster, creating the cluster if it does not exist. 
Warning: Not idempotent. Only call once " - [conn create-mesos-compute-cluster {:keys [compute-cluster-name framework-id] :as mesos-cluster}] - {:pre [compute-cluster-name - framework-id]} - (let [cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)] - (when-not cluster-entity-id - (cc/write-compute-cluster conn (mesos-cluster->compute-cluster-map-for-datomic mesos-cluster))) - (create-mesos-compute-cluster compute-cluster-name - framework-id - (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)) - (atom nil)))) + ([conn create-mesos-compute-cluster mesos-cluster] + (get-mesos-compute-cluster conn create-mesos-compute-cluster mesos-cluster nil)) + ([conn create-mesos-compute-cluster {:keys [compute-cluster-name framework-id] :as mesos-cluster} driver] + {:pre [compute-cluster-name + framework-id]} + (let [cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)] + (when-not cluster-entity-id + (cc/write-compute-cluster conn (mesos-cluster->compute-cluster-map-for-datomic mesos-cluster))) + (create-mesos-compute-cluster compute-cluster-name + framework-id + (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)) + (atom driver))))) (defn- get-mesos-clusters-from-config "Get all of the mesos clusters defined in the configuration. 
diff --git a/scheduler/src/cook/progress.clj b/scheduler/src/cook/progress.clj index 60cb8a53c9..071277650b 100644 --- a/scheduler/src/cook/progress.clj +++ b/scheduler/src/cook/progress.clj @@ -78,7 +78,6 @@ Note: the wrapper chan is used due to our use of `util/xform-pipe`" [{:keys [pending-threshold publish-interval-ms sequence-cache-threshold]} progress-state-chan] - (log/info "In aggregator: " pending-threshold publish-interval-ms sequence-cache-threshold progress-state-chan) (log/info "Starting progress update aggregator") (let [progress-aggregator-chan (async/chan (async/sliding-buffer pending-threshold)) sequence-cache-store (-> {} diff --git a/scheduler/src/cook/scheduler/scheduler.clj b/scheduler/src/cook/scheduler/scheduler.clj index 2d1457e88f..4ddf14728b 100644 --- a/scheduler/src/cook/scheduler/scheduler.clj +++ b/scheduler/src/cook/scheduler/scheduler.clj @@ -17,10 +17,8 @@ (:require [chime :refer [chime-at chime-ch]] [clj-time.coerce :as tc] [clj-time.core :as time] - [clj-time.periodic :as periodic] [clojure.core.async :as async] [clojure.core.cache :as cache] - [clojure.data.json :as json] [clojure.edn :as edn] [clojure.string :as str] [clojure.tools.logging :as log] @@ -30,18 +28,14 @@ [cook.plugins.completion :as completion] [cook.plugins.definitions :as plugins] [cook.plugins.launch :as launch-plugin] - [cook.plugins.pool :as pool-plugin] [cook.scheduler.constraints :as constraints] [cook.scheduler.data-locality :as dl] [cook.scheduler.dru :as dru] [cook.scheduler.fenzo-utils :as fenzo] [cook.group :as group] - [cook.mesos.heartbeat :as heartbeat] [cook.pool :as pool] - [cook.progress :as progress] [cook.quota :as quota] [cook.mesos.reason :as reason] - [cook.mesos.sandbox :as sandbox] [cook.scheduler.share :as share] [cook.mesos.task :as task] [cook.tools :as util] @@ -56,8 +50,7 @@ [metrics.meters :as meters] [metrics.timers :as timers] [plumbing.core :as pc]) - (import [com.netflix.fenzo ConstraintEvaluator ConstraintEvaluator$Result - 
TaskAssignmentResult TaskRequest TaskScheduler TaskScheduler$Builder + (import [com.netflix.fenzo TaskAssignmentResult TaskRequest TaskScheduler TaskScheduler$Builder VirtualMachineLease VirtualMachineLease$Range VirtualMachineCurrentState] [com.netflix.fenzo.functions Action1 Func1])) @@ -552,7 +545,6 @@ (meters/defmeter [cook-mesos scheduler scheduler-offer-declined]) -; TODO (pschorf): Delete (defn decline-offers "declines a collection of offer ids" [driver offer-ids] diff --git a/scheduler/src/cook/test/testutil.clj b/scheduler/src/cook/test/testutil.clj index 6d34d71ae2..c2057da319 100644 --- a/scheduler/src/cook/test/testutil.clj +++ b/scheduler/src/cook/test/testutil.clj @@ -58,9 +58,9 @@ {:pre [compute-cluster-name]} (let [compute-cluster-mesos-map {:framework-id (str compute-cluster-name "-framework") :compute-cluster-name compute-cluster-name} - compute-cluster (mcc/get-mesos-compute-cluster conn mesos-compute-cluster-factory compute-cluster-mesos-map)] + compute-cluster (mcc/get-mesos-compute-cluster conn mesos-compute-cluster-factory compute-cluster-mesos-map + driver)] (cc/register-compute-cluster! compute-cluster) - (cc/set-mesos-driver-atom-hack! compute-cluster driver) compute-cluster))) ; The name of the fake compute cluster to use. diff --git a/scheduler/test/cook/test/zz_simulator.clj b/scheduler/test/cook/test/zz_simulator.clj index ea125bd5f0..a410031da5 100644 --- a/scheduler/test/cook/test/zz_simulator.clj +++ b/scheduler/test/cook/test/zz_simulator.clj @@ -113,7 +113,6 @@ :executable true :extract false :value "file:///path/to/cook/executor"}} - gpu-enabled?# (or (:gpus-enabled? ~scheduler-config) false) progress-config# {:batch-size 100 :pending-threshold 1000 :publish-interval-ms 2000 @@ -154,11 +153,7 @@ create-compute-cluster#) (c/start-leader-selector {:curator-framework curator-framework# - :exit-code-syncer-state exit-code-syncer-state# :fenzo-config fenzo-config# - :framework-id framework-id# - :gpu-enabled? 
gpu-enabled?# - :make-mesos-driver-fn ~make-mesos-driver-fn :mea-culpa-failure-limit mea-culpa-failure-limit# :mesos-datomic-conn ~conn :mesos-datomic-mult mesos-mult# @@ -169,9 +164,7 @@ :agent-attributes-cache agent-attributes-cache# :offer-incubate-time-ms offer-incubate-time-ms# :optimizer-config optimizer-config# - :progress-config progress-config# :rebalancer-config rebalancer-config# - :sandbox-syncer-state sandbox-syncer-state# :server-config host-settings# :task-constraints task-constraints# :trigger-chans trigger-chans# From 1e3380188249f484839eb5e775ec9b5fba9c7146 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Fri, 31 May 2019 14:40:13 -0500 Subject: [PATCH 07/11] fix components --- scheduler/config.edn | 2 +- scheduler/src/cook/components.clj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/config.edn b/scheduler/config.edn index 5883b4b203..d6bafb9031 100644 --- a/scheduler/config.edn +++ b/scheduler/config.edn @@ -48,7 +48,7 @@ :master #config/env "MESOS_MASTER"} :nrepl {:enabled? true :port #config/env-int "COOK_NREPL_PORT"} - :pools {:default "gamma"} + ; :pools {:default "gamma"} :port #config/env-int "COOK_PORT" :ssl {:port #config/env-int "COOK_SSL_PORT" :keystore-path #config/env "COOK_KEYSTORE_PATH" diff --git a/scheduler/src/cook/components.clj b/scheduler/src/cook/components.clj index 141e053582..40cba5c75c 100644 --- a/scheduler/src/cook/components.clj +++ b/scheduler/src/cook/components.clj @@ -98,7 +98,7 @@ fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback good-enough-fitness hostname mea-culpa-failure-limit mesos-leader-path mesos-run-as-user offer-incubate-time-ms optimizer rebalancer server-port task-constraints] - curator-framework mesos-datomic-mult mesos-leadership-atom + compute-clusters curator-framework mesos-datomic-mult mesos-leadership-atom mesos-agent-attributes-cache pool-name->pending-jobs-atom mesos-heartbeat-chan trigger-chans] (if (cook.config/api-only-mode?) 
From 116d508d7240cb44e96373164e4c9ba22319a63f Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Tue, 11 Jun 2019 09:54:03 -0500 Subject: [PATCH 08/11] Feedback for code review --- scheduler/config.edn | 2 +- scheduler/src/cook/compute_cluster.clj | 3 + scheduler/src/cook/mesos.clj | 11 +- .../src/cook/mesos/mesos_compute_cluster.clj | 31 ++--- scheduler/src/cook/test/testutil.clj | 7 +- .../cook/test/mesos/mesos_compute_cluster.clj | 118 ++++++++++++++++++ scheduler/test/cook/test/zz_simulator.clj | 3 +- 7 files changed, 152 insertions(+), 23 deletions(-) create mode 100644 scheduler/test/cook/test/mesos/mesos_compute_cluster.clj diff --git a/scheduler/config.edn b/scheduler/config.edn index d6bafb9031..5883b4b203 100644 --- a/scheduler/config.edn +++ b/scheduler/config.edn @@ -48,7 +48,7 @@ :master #config/env "MESOS_MASTER"} :nrepl {:enabled? true :port #config/env-int "COOK_NREPL_PORT"} - ; :pools {:default "gamma"} + :pools {:default "gamma"} :port #config/env-int "COOK_PORT" :ssl {:port #config/env-int "COOK_SSL_PORT" :keystore-path #config/env "COOK_KEYSTORE_PATH" diff --git a/scheduler/src/cook/compute_cluster.clj b/scheduler/src/cook/compute_cluster.clj index 4c45ce41b4..2aa8b0baa9 100644 --- a/scheduler/src/cook/compute_cluster.clj +++ b/scheduler/src/cook/compute_cluster.clj @@ -31,6 +31,9 @@ (initialize-cluster [this pool->fenzo pool->offers-chan] "Initializes the cluster. Returns a channel that will be delivered on when the cluster loses leadership.") + (current-leader? [this] + "Returns true if this cook instance is currently the leader for the compute cluster") + (get-mesos-driver-hack [this] "Get the mesos driver. 
Hack; any funciton invoking this should be put within the compute-cluster implementation")) diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index 35087844a2..53c24a7b76 100644 --- a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -134,7 +134,7 @@ sandbox-syncer-state -- map, representing the sandbox syncer object" [{:keys [curator-framework fenzo-config mea-culpa-failure-limit mesos-datomic-conn mesos-datomic-mult mesos-heartbeat-chan mesos-leadership-atom pool-name->pending-jobs-atom mesos-run-as-user agent-attributes-cache - offer-incubate-time-ms optimizer-config rebalancer-config server-config task-constraints trigger-chans + offer-incubate-time-ms optimizer-config rebalancer-config server-config task-constraints trigger-chans zk-prefix]}] (let [{:keys [fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback good-enough-fitness]} fenzo-config @@ -175,7 +175,7 @@ :rebalancer-reservation-atom rebalancer-reservation-atom :task-constraints task-constraints :trigger-chans trigger-chans}) - cluster-leadership-promise (cc/initialize-cluster compute-cluster + cluster-leadership-chan (cc/initialize-cluster compute-cluster pool-name->fenzo pool->offers-chan)] (cook.monitor/start-collecting-stats) @@ -211,13 +211,16 @@ (counters/inc! 
mesos-leader) (async/tap mesos-datomic-mult datomic-report-chan) (cook.scheduler.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn) - (let [res (async/fenzo pool->offers-chan match-trigger-chan - handle-exit-code handle-progress-message sandbox-syncer-state compute-cluster] - (let [configured-framework-id (cook.config/framework-id-config) - sync-agent-sandboxes-fn #(sandbox/sync-agent-sandboxes sandbox-syncer-state configured-framework-id %1 %2) + handle-exit-code handle-progress-message sandbox-syncer-state framework-id compute-cluster] + (let [sync-agent-sandboxes-fn #(sandbox/sync-agent-sandboxes sandbox-syncer-state framework-id %1 %2) message-handlers {:handle-exit-code handle-exit-code :handle-progress-message handle-progress-message}] (mesos/scheduler (registered - [this driver framework-id master-info] - (log/info "Registered with mesos with framework-id " framework-id) - (let [value (-> framework-id mesomatic.types/pb->data :value)] - (when (not= configured-framework-id value) + [this driver mesos-framework-id master-info] + (log/info "Registered with mesos with framework-id " mesos-framework-id) + (let [value (-> mesos-framework-id mesomatic.types/pb->data :value)] + (when (not= framework-id value) (let [message (str "The framework-id provided by Mesos (" value ") " - "does not match the one Cook is configured with (" configured-framework-id ")")] + "does not match the one Cook is configured with (" framework-id ")")] (log/error message) - (throw (ex-info message {:framework-id-mesos value :framework-id-cook configured-framework-id}))))) + (throw (ex-info message {:framework-id-mesos value :framework-id-cook framework-id}))))) (when (and gpu-enabled? (not (re-matches #"1\.\d+\.\d+" (:version master-info)))) (binding [*out* *err*] (println "Cannot enable GPU support on pre-mesos 1.0. 
The version we found was " (:version master-info))) @@ -189,6 +188,10 @@ (task/compile-mesos-messages framework-id offers task-metadata-seq))) (db-id [this] db-id) + + (current-leader? [this] + (not (nil? @driver-atom))) + (initialize-cluster [this pool->fenzo pool->offers-chan] (let [settings (:settings config/config) mesos-config (select-keys settings [:mesos-master @@ -216,8 +219,8 @@ handle-exit-code handle-progress-message sandbox-syncer-state + framework-id this) - framework-id (config/framework-id-config) _ (log/info "Initializing mesos driver with config: " mesos-config) driver (make-mesos-driver mesos-config scheduler framework-id)] (mesomatic.scheduler/start! driver) @@ -259,15 +262,15 @@ (defn get-mesos-compute-cluster "Process one mesos cluster specification, returning the entity id of the corresponding compute-cluster, creating the cluster if it does not exist. Warning: Not idempotent. Only call once " - ([conn create-mesos-compute-cluster mesos-cluster] - (get-mesos-compute-cluster conn create-mesos-compute-cluster mesos-cluster nil)) - ([conn create-mesos-compute-cluster {:keys [compute-cluster-name framework-id] :as mesos-cluster} driver] + ([conn mesos-compute-cluster-factory mesos-cluster] + (get-mesos-compute-cluster conn mesos-compute-cluster-factory mesos-cluster nil)) + ([conn mesos-compute-cluster-factory {:keys [compute-cluster-name framework-id] :as mesos-cluster} driver] ; driver argument for unit tests {:pre [compute-cluster-name framework-id]} (let [cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)] (when-not cluster-entity-id (cc/write-compute-cluster conn (mesos-cluster->compute-cluster-map-for-datomic mesos-cluster))) - (create-mesos-compute-cluster compute-cluster-name + (mesos-compute-cluster-factory compute-cluster-name framework-id (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)) (atom driver))))) diff --git a/scheduler/src/cook/test/testutil.clj 
b/scheduler/src/cook/test/testutil.clj index c2057da319..5539aff157 100644 --- a/scheduler/src/cook/test/testutil.clj +++ b/scheduler/src/cook/test/testutil.clj @@ -53,10 +53,11 @@ (defn fake-test-compute-cluster-with-driver "Create a test compute cluster with associated driver attached to it. Returns the compute cluster." ([conn compute-cluster-name driver] - (fake-test-compute-cluster-with-driver conn compute-cluster-name driver create-dummy-mesos-compute-cluster)) - ([conn compute-cluster-name driver mesos-compute-cluster-factory] + (fake-test-compute-cluster-with-driver conn compute-cluster-name driver create-dummy-mesos-compute-cluster + (str compute-cluster-name "-framework"))) + ([conn compute-cluster-name driver mesos-compute-cluster-factory framework-id] {:pre [compute-cluster-name]} - (let [compute-cluster-mesos-map {:framework-id (str compute-cluster-name "-framework") + (let [compute-cluster-mesos-map {:framework-id framework-id :compute-cluster-name compute-cluster-name} compute-cluster (mcc/get-mesos-compute-cluster conn mesos-compute-cluster-factory compute-cluster-mesos-map driver)] diff --git a/scheduler/test/cook/test/mesos/mesos_compute_cluster.clj b/scheduler/test/cook/test/mesos/mesos_compute_cluster.clj new file mode 100644 index 0000000000..e9134ab404 --- /dev/null +++ b/scheduler/test/cook/test/mesos/mesos_compute_cluster.clj @@ -0,0 +1,118 @@ +(ns cook.test.mesos.mesos-compute-cluster + (:require [clojure.test :refer :all] + [cook.scheduler.scheduler :as sched] + [cook.mesos.mesos-compute-cluster :as mcc] + [mesomatic.types :as mtypes] + [clojure.data.json :as json] + [cook.mesos.sandbox :as sandbox] + [cook.mesos.heartbeat :as heartbeat]) + (:import (java.util.concurrent CountDownLatch TimeUnit))) + +(deftest test-in-order-status-update-processing + (let [status-store (atom {}) + latch (CountDownLatch. 11)] + (with-redefs [sched/handle-status-update + (fn [_ _ _ _ status] + (let [task-id (-> status :task-id :value str)] + (swap! 
status-store update task-id + (fn [statuses] (conj (or statuses []) + (-> status mtypes/pb->data :state))))) + (Thread/sleep (rand-int 100)) + (.countDown latch))] + (let [s (mcc/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil nil)] + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-running})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-running})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-running})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-finished})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-failed})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T4"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-failed})) + + (.await latch 4 TimeUnit/SECONDS) + + (is (= [:task-starting :task-failed] (->> "" (get @status-store) vec))) + (is (= [:task-starting :task-running :task-finished] (->> "T1" (get @status-store) vec))) + (is (= [:task-starting :task-running] (->> "T2" (get @status-store) vec))) + (is (= [:task-starting :task-running :task-failed] (->> "T3" (get @status-store) vec))) + (is (= [:task-starting] (->> "T4" (get @status-store) vec))))))) + +(deftest test-framework-message-processing-delegation + (let [framework-message-store (atom []) + heartbeat-store (atom []) + sandbox-store (atom [])] + (with-redefs [heartbeat/notify-heartbeat (fn [_ _ _ framework-message] + (swap! 
heartbeat-store conj framework-message)) + sandbox/update-sandbox (fn [_ framework-message] + (swap! sandbox-store conj framework-message)) + sched/handle-framework-message (fn [_ _ framework-message] + (swap! framework-message-store conj framework-message))] + (let [s (mcc/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil nil) + make-message (fn [message] (-> message json/write-str str (.getBytes "UTF-8")))] + + (testing "message delegation" + (let [task-id "T1" + executor-id (-> task-id mtypes/->ExecutorID mtypes/data->pb) + m1 {"task-id" task-id} + m2 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} + m3 {"exit-code" 0, "task-id" task-id} + m4 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} + m5 {"sandbox" "/path/to/a/directory", "task-id" task-id, "type" "directory"}] + + (.frameworkMessage s nil executor-id nil (make-message m1)) + (.frameworkMessage s nil executor-id nil (make-message m2)) + (.frameworkMessage s nil executor-id nil (make-message m3)) + (.frameworkMessage s nil executor-id nil (make-message m4)) + (.frameworkMessage s nil executor-id nil (make-message m5)) + + (let [latch (CountDownLatch. 1)] + (sched/async-in-order-processing task-id #(.countDown latch)) + (.await latch)) + + (is (= [m1 m3] @framework-message-store)) + (is (= [m2 m4] @heartbeat-store)) + (is (= [m5] @sandbox-store)))))))) + +(deftest test-in-order-framework-message-processing + (let [messages-store (atom {}) + latch (CountDownLatch. 11)] + (with-redefs [heartbeat/notify-heartbeat (constantly true) + sched/handle-framework-message + (fn [_ _ framework-message] + (let [{:strs [message task-id]} framework-message] + (swap! 
messages-store update (str task-id) (fn [messages] (conj (or messages []) message)))) + (Thread/sleep (rand-int 100)) + (.countDown latch))] + (let [s (mcc/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil nil) + foo 11 + bar 21 + fee 31 + fie 41 + make-message (fn [index message] + (-> {"message" message, "task-id" (str "T" index)} + json/write-str + str + (.getBytes "UTF-8")))] + + (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 foo)) + (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 foo)) + (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 foo)) + (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 bar)) + (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 bar)) + (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 foo)) + (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 bar)) + (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 fee)) + (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 fie)) + (.frameworkMessage s nil (-> "T4" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 4 foo)) + (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 fie)) + + (.await latch 4 TimeUnit/SECONDS) + + (is (= [foo fie] (->> "T0" (get @messages-store) vec))) + (is (= [foo bar fee] (->> "T1" (get @messages-store) vec))) + (is (= [foo bar] (->> "T2" (get @messages-store) vec))) + (is (= [foo bar fie] (->> "T3" (get @messages-store) vec))) + (is (= [foo] (->> "T4" (get @messages-store) vec))))))) \ No newline at end of file diff --git a/scheduler/test/cook/test/zz_simulator.clj b/scheduler/test/cook/test/zz_simulator.clj index a410031da5..3172589200 100644 --- 
a/scheduler/test/cook/test/zz_simulator.clj +++ b/scheduler/test/cook/test/zz_simulator.clj @@ -150,7 +150,8 @@ (testutil/fake-test-compute-cluster-with-driver ~conn testutil/fake-test-compute-cluster-name nil ; no dummy driver - simulator is going to call initialize - create-compute-cluster#) + create-compute-cluster# + framework-id#) (c/start-leader-selector {:curator-framework curator-framework# :fenzo-config fenzo-config# From bc6d0abcfec97061d181a2514b3d90c85c257da4 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Tue, 11 Jun 2019 15:14:44 -0500 Subject: [PATCH 09/11] Add a comment in mesos_compute_cluster --- scheduler/src/cook/mesos/mesos_compute_cluster.clj | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scheduler/src/cook/mesos/mesos_compute_cluster.clj b/scheduler/src/cook/mesos/mesos_compute_cluster.clj index 96573c470b..6d1e0ccfef 100644 --- a/scheduler/src/cook/mesos/mesos_compute_cluster.clj +++ b/scheduler/src/cook/mesos/mesos_compute_cluster.clj @@ -227,6 +227,8 @@ (reset! driver-atom driver) (async/thread (try + ; scheduler/join! is a blocking call which returns or throws when the driver loses its connection to mesos. + ; Run this in an async thread which will deliver either the Status on a normal exit, or an exception if thrown. (mesomatic.scheduler/join! 
driver) (catch Exception e e) From 280cb54b37425b5672a6b2f58b176b154f658cf2 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Wed, 12 Jun 2019 10:43:05 -0500 Subject: [PATCH 10/11] Feedback for code review --- scheduler/src/cook/components.clj | 1 + scheduler/src/cook/compute_cluster.clj | 8 +++--- .../src/cook/mesos/mesos_compute_cluster.clj | 26 +++++++++---------- scheduler/src/cook/scheduler/scheduler.clj | 6 ++--- scheduler/src/cook/test/testutil.clj | 21 ++++++++------- 5 files changed, 34 insertions(+), 28 deletions(-) diff --git a/scheduler/src/cook/components.clj b/scheduler/src/cook/components.clj index 40cba5c75c..eb9f0ae7d0 100644 --- a/scheduler/src/cook/components.clj +++ b/scheduler/src/cook/components.clj @@ -299,6 +299,7 @@ create-mesos-compute-cluster))) :mesos-datomic-mult (fnk [] (first ((util/lazy-load-var 'cook.datomic/create-tx-report-mult) datomic/conn))) + ; TODO(pschorf): Remove heartbeat support :mesos-heartbeat-chan (fnk [] (async/chan (async/buffer 4096))) :local-zookeeper (fnk [[:settings zookeeper-server]] diff --git a/scheduler/src/cook/compute_cluster.clj b/scheduler/src/cook/compute_cluster.clj index 2aa8b0baa9..eadf289321 100644 --- a/scheduler/src/cook/compute_cluster.clj +++ b/scheduler/src/cook/compute_cluster.clj @@ -16,7 +16,6 @@ (ns cook.compute-cluster (:require [clojure.tools.logging :as log] [cook.config :as config] - [cook.datomic] [datomic.api :as d])) (defprotocol ComputeCluster @@ -29,13 +28,16 @@ "Get a database entity-id for this compute cluster (used for putting it into a task structure).") (initialize-cluster [this pool->fenzo pool->offers-chan] - "Initializes the cluster. Returns a channel that will be delivered on when the cluster loses leadership.") + "Initializes the cluster. Returns a channel that will be delivered on when the cluster loses leadership. + We expect Cook to give up leadership when a compute cluster loses leadership, so leadership is not expected to be regained. 
+ The channel result will be an exception if an error occurred, or a status message if leadership was lost normally.") (current-leader? [this] "Returns true if this cook instance is currently the leader for the compute cluster") + ; TODO - remove (get-mesos-driver-hack [this] - "Get the mesos driver. Hack; any funciton invoking this should be put within the compute-cluster implementation")) + "Get the mesos driver. Hack; any function invoking this should be put within the compute-cluster implementation")) ; Internal method (defn write-compute-cluster diff --git a/scheduler/src/cook/mesos/mesos_compute_cluster.clj b/scheduler/src/cook/mesos/mesos_compute_cluster.clj index 6d1e0ccfef..e6e7478168 100644 --- a/scheduler/src/cook/mesos/mesos_compute_cluster.clj +++ b/scheduler/src/cook/mesos/mesos_compute_cluster.clj @@ -160,19 +160,19 @@ ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? mesos-failover-timeout mesos-master] :as config} scheduler framework-id] - (apply mesomatic.scheduler/scheduler-driver - scheduler - (cond-> {:checkpoint true - :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) - :user ""} - framework-id (assoc :id {:value framework-id}) - gpu-enabled? (assoc :capabilities [{:type :framework-capability-gpu-resources}]) - mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) - mesos-principal (assoc :principal mesos-principal) - mesos-role (assoc :role mesos-role)) - mesos-master - (when mesos-principal - [{:principal mesos-principal}])))) + (mesomatic.scheduler/scheduler-driver + scheduler + (cond-> {:checkpoint true + :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) + :user ""} + framework-id (assoc :id {:value framework-id}) + gpu-enabled? 
(assoc :capabilities [{:type :framework-capability-gpu-resources}]) + mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) + mesos-principal (assoc :principal mesos-principal) + mesos-role (assoc :role mesos-role)) + mesos-master + (when mesos-principal + [{:principal mesos-principal}])))) (defrecord MesosComputeCluster [compute-cluster-name framework-id db-id driver-atom sandbox-syncer-state exit-code-syncer-state mesos-heartbeat-chan diff --git a/scheduler/src/cook/scheduler/scheduler.clj b/scheduler/src/cook/scheduler/scheduler.clj index 4ddf14728b..72cb910b68 100644 --- a/scheduler/src/cook/scheduler/scheduler.clj +++ b/scheduler/src/cook/scheduler/scheduler.clj @@ -1444,7 +1444,7 @@ {} pools')] (start-jobs-prioritizer! conn pool-name->pending-jobs-atom task-constraints rank-trigger-chan) - {:view-incubating-offers (fn get-resources-atom [p] - (deref (get pool->resources-atom p))) + {:pool-name->fenzo pool-name->fenzo :pool->offers-chan pool->offers-chan - :pool-name->fenzo pool-name->fenzo})) + :view-incubating-offers (fn get-resources-atom [p] + (deref (get pool->resources-atom p)))})) diff --git a/scheduler/src/cook/test/testutil.clj b/scheduler/src/cook/test/testutil.clj index 5539aff157..304db2c414 100644 --- a/scheduler/src/cook/test/testutil.clj +++ b/scheduler/src/cook/test/testutil.clj @@ -40,15 +40,18 @@ (defn create-dummy-mesos-compute-cluster [compute-cluster-name framework-id db-id driver-atom] - (mcc/->MesosComputeCluster compute-cluster-name - framework-id - db-id - driver-atom - nil ; sandbox-syncer-state - nil ; exit-code-syncer-state - nil ; mesos-heartbeat-chan - nil ; trigger-chans - )) + (let [sandbox-syncer-state nil + exit-code-syncer-state nil + mesos-heartbeat-chan nil + trigger-chans nil] + (mcc/->MesosComputeCluster compute-cluster-name + framework-id + db-id + driver-atom + sandbox-syncer-state + exit-code-syncer-state + mesos-heartbeat-chan + trigger-chans))) (defn fake-test-compute-cluster-with-driver "Create a 
test compute cluster with associated driver attached to it. Returns the compute cluster." From ef7b76649254fa2ea813a170a3ceca6930b952a0 Mon Sep 17 00:00:00 2001 From: Paul Schorfheide Date: Wed, 12 Jun 2019 12:07:26 -0500 Subject: [PATCH 11/11] Fix apply call --- .../src/cook/mesos/mesos_compute_cluster.clj | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/scheduler/src/cook/mesos/mesos_compute_cluster.clj b/scheduler/src/cook/mesos/mesos_compute_cluster.clj index e6e7478168..922c3e3309 100644 --- a/scheduler/src/cook/mesos/mesos_compute_cluster.clj +++ b/scheduler/src/cook/mesos/mesos_compute_cluster.clj @@ -160,19 +160,17 @@ ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? mesos-failover-timeout mesos-master] :as config} scheduler framework-id] - (mesomatic.scheduler/scheduler-driver - scheduler - (cond-> {:checkpoint true - :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) - :user ""} - framework-id (assoc :id {:value framework-id}) - gpu-enabled? (assoc :capabilities [{:type :framework-capability-gpu-resources}]) - mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) - mesos-principal (assoc :principal mesos-principal) - mesos-role (assoc :role mesos-role)) - mesos-master - (when mesos-principal - [{:principal mesos-principal}])))) + (let [mesos-config (cond-> {:checkpoint true + :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) + :user ""} + framework-id (assoc :id {:value framework-id}) + gpu-enabled? 
(assoc :capabilities [{:type :framework-capability-gpu-resources}]) + mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) + mesos-principal (assoc :principal mesos-principal) + mesos-role (assoc :role mesos-role))] + (if mesos-principal + (mesomatic.scheduler/scheduler-driver scheduler mesos-config mesos-master {:principal mesos-principal}) + (mesomatic.scheduler/scheduler-driver scheduler mesos-config mesos-master))))) (defrecord MesosComputeCluster [compute-cluster-name framework-id db-id driver-atom sandbox-syncer-state exit-code-syncer-state mesos-heartbeat-chan