diff --git a/scheduler/src/cook/components.clj b/scheduler/src/cook/components.clj index c426d0ff38..eb9f0ae7d0 100644 --- a/scheduler/src/cook/components.clj +++ b/scheduler/src/cook/components.clj @@ -13,7 +13,8 @@ ;; limitations under the License. ;; (ns cook.components - (:require [clojure.core.cache :as cache] + (:require [clojure.core.async :as async] + [clojure.core.cache :as cache] [clojure.pprint :refer (pprint)] [clojure.tools.logging :as log] [compojure.core :refer (GET POST routes context)] @@ -95,13 +96,11 @@ (def mesos-scheduler {:mesos-scheduler (fnk [[:settings fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback - good-enough-fitness hostname mea-culpa-failure-limit mesos-failover-timeout mesos-framework-name - mesos-gpu-enabled mesos-leader-path mesos-master mesos-principal - mesos-role mesos-run-as-user offer-incubate-time-ms optimizer progress rebalancer server-port - task-constraints] - curator-framework exit-code-syncer-state framework-id mesos-datomic-mult mesos-leadership-atom - mesos-agent-attributes-cache pool-name->pending-jobs-atom sandbox-syncer-state - compute-clusters] + good-enough-fitness hostname mea-culpa-failure-limit mesos-leader-path mesos-run-as-user + offer-incubate-time-ms optimizer rebalancer server-port task-constraints] + compute-clusters curator-framework mesos-datomic-mult mesos-leadership-atom + mesos-agent-attributes-cache pool-name->pending-jobs-atom mesos-heartbeat-chan + trigger-chans] (if (cook.config/api-only-mode?) 
(if curator-framework (throw (ex-info "This node is configured for API-only mode, but also has a curator configured" @@ -110,48 +109,35 @@ (if curator-framework (do (log/info "Initializing mesos scheduler") - (let [make-mesos-driver-fn (partial (util/lazy-load-var 'cook.mesos/make-mesos-driver) - {:mesos-master mesos-master - :mesos-failover-timeout mesos-failover-timeout - :mesos-principal mesos-principal - :mesos-role mesos-role - :mesos-framework-name mesos-framework-name - :gpu-enabled? mesos-gpu-enabled}) - trigger-chans ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints)] - (try - (Class/forName "org.apache.mesos.Scheduler") - ((util/lazy-load-var 'cook.mesos/start-mesos-scheduler) - {:curator-framework curator-framework - :exit-code-syncer-state exit-code-syncer-state - :fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered - :fenzo-scaleback fenzo-scaleback - :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn - :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset - :fenzo-fitness-calculator fenzo-fitness-calculator - :good-enough-fitness good-enough-fitness} - :framework-id framework-id - :gpu-enabled? 
mesos-gpu-enabled - :make-mesos-driver-fn make-mesos-driver-fn - :mea-culpa-failure-limit mea-culpa-failure-limit - :mesos-datomic-conn datomic/conn - :mesos-datomic-mult mesos-datomic-mult - :mesos-leadership-atom mesos-leadership-atom - :pool-name->pending-jobs-atom pool-name->pending-jobs-atom - :mesos-run-as-user mesos-run-as-user - :agent-attributes-cache mesos-agent-attributes-cache - :offer-incubate-time-ms offer-incubate-time-ms - :optimizer-config optimizer - :progress-config progress - :rebalancer-config rebalancer - :sandbox-syncer-state sandbox-syncer-state - :server-config {:hostname hostname - :server-port server-port} - :task-constraints task-constraints - :trigger-chans trigger-chans - :zk-prefix mesos-leader-path}) - (catch ClassNotFoundException e - (log/warn e "Not loading mesos support...") - nil)))) + (try + (Class/forName "org.apache.mesos.Scheduler") + ((util/lazy-load-var 'cook.mesos/start-leader-selector) + {:curator-framework curator-framework + :fenzo-config {:fenzo-max-jobs-considered fenzo-max-jobs-considered + :fenzo-scaleback fenzo-scaleback + :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn + :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset + :fenzo-fitness-calculator fenzo-fitness-calculator + :good-enough-fitness good-enough-fitness} + :mea-culpa-failure-limit mea-culpa-failure-limit + :mesos-datomic-conn datomic/conn + :mesos-datomic-mult mesos-datomic-mult + :mesos-heartbeat-chan mesos-heartbeat-chan + :mesos-leadership-atom mesos-leadership-atom + :pool-name->pending-jobs-atom pool-name->pending-jobs-atom + :mesos-run-as-user mesos-run-as-user + :agent-attributes-cache mesos-agent-attributes-cache + :offer-incubate-time-ms offer-incubate-time-ms + :optimizer-config optimizer + :rebalancer-config rebalancer + :server-config {:hostname hostname + :server-port server-port} + :task-constraints task-constraints + :trigger-chans trigger-chans + :zk-prefix mesos-leader-path}) + (catch 
ClassNotFoundException e + (log/warn e "Not loading mesos support...") + nil))) (throw (ex-info "This node does not have a curator configured" {})))))}) (defn health-check-middleware @@ -294,10 +280,28 @@ (throw (ex-info "Framework id not configured and not in ZooKeeper" {}))) (log/info "Using framework id:" framework-id) framework-id))) - :compute-clusters (fnk [settings] - ((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings)) + :compute-clusters (fnk [exit-code-syncer-state + mesos-heartbeat-chan + sandbox-syncer-state + settings + trigger-chans] + (let [constructor (util/lazy-load-var 'cook.mesos.mesos-compute-cluster/->MesosComputeCluster) + create-mesos-compute-cluster (fn [compute-cluster-name framework-id db-id driver-atom] + (constructor compute-cluster-name + framework-id + db-id + driver-atom + sandbox-syncer-state + exit-code-syncer-state + mesos-heartbeat-chan + trigger-chans))] + ((util/lazy-load-var 'cook.mesos.mesos-compute-cluster/setup-compute-cluster-map-from-config) datomic/conn settings + create-mesos-compute-cluster))) :mesos-datomic-mult (fnk [] (first ((util/lazy-load-var 'cook.datomic/create-tx-report-mult) datomic/conn))) + ; TODO(pschorf): Remove heartbeat support + :mesos-heartbeat-chan (fnk [] + (async/chan (async/buffer 4096))) :local-zookeeper (fnk [[:settings zookeeper-server]] (when zookeeper-server (log/info "Starting local ZK server") @@ -318,6 +322,8 @@ ((util/lazy-load-var 'cook.mesos.sandbox/prepare-sandbox-publisher) framework-id datomic/conn publish-batch-size publish-interval-ms sync-interval-ms max-consecutive-sync-failure mesos-agent-query-cache))) + :trigger-chans (fnk [[:settings rebalancer progress optimizer task-constraints]] + ((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints)) :clear-uncommitted-canceler (fnk [mesos-leadership-atom] ((util/lazy-load-var 'cook.tools/clear-uncommitted-jobs-on-schedule) 
datomic/conn mesos-leadership-atom)) diff --git a/scheduler/src/cook/compute_cluster.clj b/scheduler/src/cook/compute_cluster.clj index 6b646cfc9f..eadf289321 100644 --- a/scheduler/src/cook/compute_cluster.clj +++ b/scheduler/src/cook/compute_cluster.clj @@ -16,8 +16,7 @@ (ns cook.compute-cluster (:require [clojure.tools.logging :as log] [cook.config :as config] - [datomic.api :as d] - [mesomatic.scheduler :as mesos])) + [datomic.api :as d])) (defprotocol ComputeCluster ; These methods should accept bulk data and process in batches. @@ -28,10 +27,17 @@ (db-id [this] "Get a database entity-id for this compute cluster (used for putting it into a task structure).") + (initialize-cluster [this pool->fenzo pool->offers-chan] + "Initializes the cluster. Returns a channel that will be delivered on when the cluster loses leadership. + We expect Cook to give up leadership when a compute cluster loses leadership, so leadership is not expected to be regained. + The channel result will be an exception if an error occurred, or a status message if leadership was lost normally.") + + (current-leader? [this] + "Returns true if this cook instance is currently the leader for the compute cluster") + + ; TODO - remove (get-mesos-driver-hack [this] - "Get the mesos driver. Hack; any funciton invoking this should be put within the compute-cluster implementation") - (set-mesos-driver-atom-hack! [this driver] - "Hack to overwrite the driver. Used until we fix the initialization order of compute-cluster")) + "Get the mesos driver. 
Hack; any function invoking this should be put within the compute-cluster implementation")) ; Internal method (defn write-compute-cluster diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index df6684c4df..53c24a7b76 100644 --- a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -83,44 +83,8 @@ (counters/defcounter [cook-mesos mesos mesos-leader]) -(defn make-mesos-driver - "Creates a mesos driver - - Parameters: - mesos-framework-name -- string, name to use when connecting to mesos. - Will be appended with version of cook - mesos-role -- string, (optional) The role to connect to mesos with - mesos-principal -- string, (optional) principal to connect to mesos with - gpu-enabled? -- boolean, (optional) whether cook will schedule gpu jobs - mesos-failover-timeout -- long, (optional) time in milliseconds mesos will wait - for framework to reconnect. - See http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ - and search for failover_timeout - mesos-master -- str, (optional) connection string for mesos masters - scheduler -- mesos scheduler implementation - framework-id -- str, (optional) Id of framework if it has connected to mesos before - " - ([config scheduler] - (make-mesos-driver config scheduler nil)) - ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? - mesos-failover-timeout mesos-master] :as config} - scheduler framework-id] - (apply mesomatic.scheduler/scheduler-driver - scheduler - (cond-> {:checkpoint true - :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) - :user ""} - framework-id (assoc :id {:value framework-id}) - gpu-enabled? 
(assoc :capabilities [{:type :framework-capability-gpu-resources}]) - mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) - mesos-principal (assoc :principal mesos-principal) - mesos-role (assoc :role mesos-role)) - mesos-master - (when mesos-principal - [{:principal mesos-principal}])))) - (defn make-trigger-chans - "Creates a map of of the trigger channels expected by `start-mesos-scheduler` + "Creates a map of the trigger channels expected by `start-leader-selector` Each channel receives chime triggers at particular intervals and it is possible to send additional events as desired" [rebalancer-config progress-config optimizer-config @@ -145,7 +109,7 @@ update-interval-ms (assoc :update-data-local-costs-trigger-chan (prepare-trigger-chan (time/millis update-interval-ms)))))) -(defn start-mesos-scheduler +(defn start-leader-selector "Starts a leader elector. When the process is leader, it starts the mesos scheduler and associated threads to interact with mesos. @@ -168,19 +132,18 @@ framework-id -- str, the Mesos framework id from the cook settings fenzo-config -- map, config for fenzo, See scheduler/docs/configuration.adoc for more details sandbox-syncer-state -- map, representing the sandbox syncer object" - [{:keys [curator-framework exit-code-syncer-state fenzo-config framework-id gpu-enabled? 
make-mesos-driver-fn - mea-culpa-failure-limit mesos-datomic-conn mesos-datomic-mult mesos-leadership-atom pool-name->pending-jobs-atom - mesos-run-as-user agent-attributes-cache offer-incubate-time-ms optimizer-config progress-config rebalancer-config - sandbox-syncer-state server-config task-constraints trigger-chans zk-prefix]}] + [{:keys [curator-framework fenzo-config mea-culpa-failure-limit mesos-datomic-conn mesos-datomic-mult + mesos-heartbeat-chan mesos-leadership-atom pool-name->pending-jobs-atom mesos-run-as-user agent-attributes-cache + offer-incubate-time-ms optimizer-config rebalancer-config server-config task-constraints trigger-chans + zk-prefix]}] (let [{:keys [fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback good-enough-fitness]} fenzo-config {:keys [cancelled-task-trigger-chan lingering-task-trigger-chan optimizer-trigger-chan rebalancer-trigger-chan straggler-trigger-chan]} trigger-chans {:keys [hostname server-port server-https-port]} server-config datomic-report-chan (async/chan (async/sliding-buffer 4096)) - mesos-heartbeat-chan (async/chan (async/buffer 4096)) + compute-cluster (mcc/mesos-cluster-hack) - current-driver (atom nil) rebalancer-reservation-atom (atom {}) leader-selector (LeaderSelector. 
curator-framework @@ -194,43 +157,39 @@ ;; TODO: get the framework ID and try to reregister (let [normal-exit (atom true)] (try - (let [{:keys [scheduler view-incubating-offers]} + (let [{:keys [pool-name->fenzo pool->offers-chan view-incubating-offers]} (sched/create-datomic-scheduler {:conn mesos-datomic-conn :compute-cluster compute-cluster - :exit-code-syncer-state exit-code-syncer-state :fenzo-fitness-calculator fenzo-fitness-calculator :fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-reset :fenzo-floor-iterations-before-warn fenzo-floor-iterations-before-warn :fenzo-max-jobs-considered fenzo-max-jobs-considered :fenzo-scaleback fenzo-scaleback :good-enough-fitness good-enough-fitness - :gpu-enabled? gpu-enabled? - :heartbeat-ch mesos-heartbeat-chan :mea-culpa-failure-limit mea-culpa-failure-limit :mesos-run-as-user mesos-run-as-user :agent-attributes-cache agent-attributes-cache :offer-incubate-time-ms offer-incubate-time-ms :pool-name->pending-jobs-atom pool-name->pending-jobs-atom - :progress-config progress-config :rebalancer-reservation-atom rebalancer-reservation-atom - :sandbox-syncer-state sandbox-syncer-state :task-constraints task-constraints :trigger-chans trigger-chans}) - driver (make-mesos-driver-fn scheduler framework-id)] - (mesomatic.scheduler/start! driver) - (reset! current-driver driver) - (cc/set-mesos-driver-atom-hack! compute-cluster driver) - + cluster-leadership-chan (cc/initialize-cluster compute-cluster + pool-name->fenzo + pool->offers-chan)] (cook.monitor/start-collecting-stats) ; Many of these should look at the compute-cluster of the underlying jobs, and not use driver at all. 
- (cook.scheduler.scheduler/lingering-task-killer mesos-datomic-conn driver task-constraints lingering-task-trigger-chan) - (cook.scheduler.scheduler/straggler-handler mesos-datomic-conn driver straggler-trigger-chan) - (cook.scheduler.scheduler/cancelled-task-killer mesos-datomic-conn driver cancelled-task-trigger-chan) + (cook.scheduler.scheduler/lingering-task-killer mesos-datomic-conn (cc/get-mesos-driver-hack compute-cluster) + task-constraints lingering-task-trigger-chan) + (cook.scheduler.scheduler/straggler-handler mesos-datomic-conn (cc/get-mesos-driver-hack compute-cluster) + straggler-trigger-chan) + (cook.scheduler.scheduler/cancelled-task-killer mesos-datomic-conn (cc/get-mesos-driver-hack compute-cluster) + cancelled-task-trigger-chan) (cook.mesos.heartbeat/start-heartbeat-watcher! mesos-datomic-conn mesos-heartbeat-chan) (cook.rebalancer/start-rebalancer! {:config rebalancer-config :conn mesos-datomic-conn - :driver driver + :driver (cc/get-mesos-driver-hack compute-cluster) :agent-attributes-cache agent-attributes-cache :pool-name->pending-jobs-atom pool-name->pending-jobs-atom :rebalancer-reservation-atom rebalancer-reservation-atom @@ -252,12 +211,16 @@ (counters/inc! mesos-leader) (async/tap mesos-datomic-mult datomic-report-chan) (cook.scheduler.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn) - (mesomatic.scheduler/join! driver) - (reset! current-driver nil)) + ; Curator expects takeLeadership to block until voluntarily surrendering leadership. + ; Block on cluster-leadership-chan to hold ZK leadership unless we lose mesos leadership. 
+ (let [res (async/fenzo pool->offers-chan match-trigger-chan + handle-exit-code handle-progress-message sandbox-syncer-state framework-id compute-cluster] + (let [sync-agent-sandboxes-fn #(sandbox/sync-agent-sandboxes sandbox-syncer-state framework-id %1 %2) + message-handlers {:handle-exit-code handle-exit-code + :handle-progress-message handle-progress-message}] + (mesos/scheduler + (registered + [this driver mesos-framework-id master-info] + (log/info "Registered with mesos with framework-id " mesos-framework-id) + (let [value (-> mesos-framework-id mesomatic.types/pb->data :value)] + (when (not= framework-id value) + (let [message (str "The framework-id provided by Mesos (" value ") " + "does not match the one Cook is configured with (" framework-id ")")] + (log/error message) + (throw (ex-info message {:framework-id-mesos value :framework-id-cook framework-id}))))) + (when (and gpu-enabled? (not (re-matches #"1\.\d+\.\d+" (:version master-info)))) + (binding [*out* *err*] + (println "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info))) + (log/error "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info)) + (Thread/sleep 1000) + (System/exit 1)) + ;; Use future because the thread that runs mesos/scheduler doesn't load classes correctly. for reasons. + ;; As Sophie says, you want to future proof your code. 
+ (future + (try + (sched/reconcile-jobs conn) + (sched/reconcile-tasks (d/db conn) driver pool->fenzo) + (catch Exception e + (log/error e "Reconciliation error"))))) + (reregistered + [this driver master-info] + (log/info "Reregistered with new master") + (future + (try + (sched/reconcile-jobs conn) + (sched/reconcile-tasks (d/db conn) driver pool->fenzo) + (catch Exception e + (log/error e "Reconciliation error"))))) + ;; Ignore this--we can just wait for new offers + (offer-rescinded + [this driver offer-id] + (comment "TODO: Rescind the offer in fenzo")) + (framework-message + [this driver executor-id slave-id message] + (meters/mark! handle-framework-message-rate) + (try + (let [{:strs [task-id type] :as parsed-message} (json/read-str (String. ^bytes message "UTF-8"))] + (case type + "directory" (sandbox/update-sandbox sandbox-syncer-state parsed-message) + "heartbeat" (heartbeat/notify-heartbeat heartbeat-ch executor-id slave-id parsed-message) + (sched/async-in-order-processing + task-id #(sched/handle-framework-message conn message-handlers parsed-message)))) + (catch Exception e + (log/error e "Unable to process framework message" + {:executor-id executor-id, :message message, :slave-id slave-id})))) + (disconnected + [this driver] + (log/error "Disconnected from the previous master")) + ;; We don't care about losing slaves or executors--only tasks + (slave-lost [this driver slave-id]) + (executor-lost [this driver executor-id slave-id status]) + (error + [this driver message] + (meters/mark! mesos-error) + (log/error "Got a mesos error!!!!" message)) + (resource-offers + [this driver raw-offers] + (log/debug "Got offers:" raw-offers) + (let [offers (map #(assoc % :compute-cluster compute-cluster) raw-offers) + pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers) + using-pools? (config/default-pool)] + (log/info "Offers by pool:" (pc/map-vals count pool->offers)) + (run! 
+ (fn [[pool-name offers]] + (let [offer-count (count offers)] + (if using-pools? + (if-let [offers-chan (get pool->offers-chan pool-name)] + (do + (log/info "Processing" offer-count "offer(s) for known pool" pool-name) + (sched/receive-offers offers-chan match-trigger-chan driver pool-name offers)) + (do + (log/warn "Declining" offer-count "offer(s) for non-existent pool" pool-name) + (sched/decline-offers-safe driver offers))) + (if-let [offers-chan (get pool->offers-chan "no-pool")] + (do + (log/info "Processing" offer-count "offer(s) for pool" pool-name "(not using pools)") + (sched/receive-offers offers-chan match-trigger-chan driver pool-name offers)) + (do + (log/error "Declining" offer-count "offer(s) for pool" pool-name "(missing no-pool offer chan)") + (sched/decline-offers-safe driver offers)))))) + pool->offers) + (log/debug "Finished receiving offers for all pools"))) + (status-update + [this driver status] + (meters/mark! handle-status-update-rate) + (let [task-id (-> status :task-id :value)] + (sched/async-in-order-processing + task-id #(sched/handle-status-update conn compute-cluster pool->fenzo sync-agent-sandboxes-fn status))))))) + -(defrecord MesosComputeCluster [compute-cluster-name framework-id db-id driver-atom] +(defn make-mesos-driver + "Creates a mesos driver + + Parameters: + mesos-framework-name -- string, name to use when connecting to mesos. + Will be appended with version of cook + mesos-role -- string, (optional) The role to connect to mesos with + mesos-principal -- string, (optional) principal to connect to mesos with + gpu-enabled? -- boolean, (optional) whether cook will schedule gpu jobs + mesos-failover-timeout -- long, (optional) time in milliseconds mesos will wait + for framework to reconnect. 
+ See http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ + and search for failover_timeout + mesos-master -- str, (optional) connection string for mesos masters + scheduler -- mesos scheduler implementation + framework-id -- str, (optional) Id of framework if it has connected to mesos before + " + ([config scheduler] + (make-mesos-driver config scheduler nil)) + ([{:keys [mesos-framework-name mesos-role mesos-principal gpu-enabled? + mesos-failover-timeout mesos-master] :as config} + scheduler framework-id] + (let [mesos-config (cond-> {:checkpoint true + :name (str mesos-framework-name "-" @cook.util/version "-" @cook.util/commit) + :user ""} + framework-id (assoc :id {:value framework-id}) + gpu-enabled? (assoc :capabilities [{:type :framework-capability-gpu-resources}]) + mesos-failover-timeout (assoc :failover-timeout mesos-failover-timeout) + mesos-principal (assoc :principal mesos-principal) + mesos-role (assoc :role mesos-role))] + (if mesos-principal + (mesomatic.scheduler/scheduler-driver scheduler mesos-config mesos-master {:principal mesos-principal}) + (mesomatic.scheduler/scheduler-driver scheduler mesos-config mesos-master))))) + +(defrecord MesosComputeCluster [compute-cluster-name framework-id db-id driver-atom + sandbox-syncer-state exit-code-syncer-state mesos-heartbeat-chan + trigger-chans] cc/ComputeCluster (compute-cluster-name [this] compute-cluster-name) @@ -33,8 +186,52 @@ (task/compile-mesos-messages framework-id offers task-metadata-seq))) (db-id [this] db-id) - (set-mesos-driver-atom-hack! [this driver] - (reset! driver-atom driver))) + + (current-leader? [this] + (not (nil? 
@driver-atom))) + + (initialize-cluster [this pool->fenzo pool->offers-chan] + (let [settings (:settings config/config) + ; settings exposes the GPU flag as :mesos-gpu-enabled; downstream code reads :gpu-enabled?. + mesos-config (assoc (select-keys settings [:mesos-master :mesos-failover-timeout + :mesos-principal :mesos-role + :mesos-framework-name]) + :gpu-enabled? (boolean (or (:mesos-gpu-enabled settings) + (:gpu-enabled? settings)))) + progress-config (:progress settings) + conn cook.datomic/conn + {:keys [match-trigger-chan progress-updater-trigger-chan]} trigger-chans + {:keys [batch-size]} progress-config + {:keys [progress-state-chan]} (progress/progress-update-transactor progress-updater-trigger-chan batch-size conn) + progress-aggregator-chan (progress/progress-update-aggregator progress-config progress-state-chan) + handle-progress-message (fn handle-progress-message-curried [progress-message-map] + (progress/handle-progress-message! progress-aggregator-chan progress-message-map)) + handle-exit-code (fn handle-exit-code [task-id exit-code] + (sandbox/aggregate-exit-code exit-code-syncer-state task-id exit-code)) + scheduler (create-mesos-scheduler (:gpu-enabled? mesos-config) + conn + mesos-heartbeat-chan + pool->fenzo + pool->offers-chan + match-trigger-chan + handle-exit-code + handle-progress-message + sandbox-syncer-state + framework-id + this) + _ (log/info "Initializing mesos driver with config: " mesos-config) + driver (make-mesos-driver mesos-config scheduler framework-id)] + (mesomatic.scheduler/start! driver) + (reset! driver-atom driver) + (async/thread + (try + ; scheduler/join! is a blocking call which returns or throws when the driver loses its connection to mesos. + ; Run this in an async thread which will deliver either the Status on a normal exit, or an exception if thrown. + (mesomatic.scheduler/join! driver) + (catch Exception e + e) + (finally + (reset! 
driver-atom nil))))))) ; Internal method (defn- mesos-cluster->compute-cluster-map-for-datomic @@ -65,17 +262,18 @@ (defn get-mesos-compute-cluster "Process one mesos cluster specification, returning the entity id of the corresponding compute-cluster, creating the cluster if it does not exist. Warning: Not idempotent. Only call once " - [conn {:keys [compute-cluster-name framework-id] :as mesos-cluster}] - {:pre [compute-cluster-name - framework-id]} - (let [cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)] - (when-not cluster-entity-id - (cc/write-compute-cluster conn (mesos-cluster->compute-cluster-map-for-datomic mesos-cluster))) - (->MesosComputeCluster - compute-cluster-name - framework-id - (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)) - (atom nil)))) + ([conn mesos-compute-cluster-factory mesos-cluster] + (get-mesos-compute-cluster conn mesos-compute-cluster-factory mesos-cluster nil)) + ([conn mesos-compute-cluster-factory {:keys [compute-cluster-name framework-id] :as mesos-cluster} driver] ; driver argument for unit tests + {:pre [compute-cluster-name + framework-id]} + (let [cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)] + (when-not cluster-entity-id + (cc/write-compute-cluster conn (mesos-cluster->compute-cluster-map-for-datomic mesos-cluster))) + (mesos-compute-cluster-factory compute-cluster-name + framework-id + (or cluster-entity-id (get-mesos-cluster-entity-id (d/db conn) mesos-cluster)) + (atom driver))))) (defn- get-mesos-clusters-from-config "Get all of the mesos clusters defined in the configuration. @@ -105,9 +303,9 @@ (defn setup-compute-cluster-map-from-config "Setup the cluster-map configs, linking a cluster name to the associated metadata needed to represent/process it." 
- [conn settings] + [conn settings create-mesos-compute-cluster] (let [compute-clusters (->> (get-mesos-clusters-from-config settings) - (map (partial get-mesos-compute-cluster conn)) + (map (partial get-mesos-compute-cluster conn create-mesos-compute-cluster)) (map cc/register-compute-cluster!))] (doall compute-clusters))) diff --git a/scheduler/src/cook/scheduler/scheduler.clj b/scheduler/src/cook/scheduler/scheduler.clj index 38f902d390..72cb910b68 100644 --- a/scheduler/src/cook/scheduler/scheduler.clj +++ b/scheduler/src/cook/scheduler/scheduler.clj @@ -17,10 +17,8 @@ (:require [chime :refer [chime-at chime-ch]] [clj-time.coerce :as tc] [clj-time.core :as time] - [clj-time.periodic :as periodic] [clojure.core.async :as async] [clojure.core.cache :as cache] - [clojure.data.json :as json] [clojure.edn :as edn] [clojure.string :as str] [clojure.tools.logging :as log] @@ -30,18 +28,14 @@ [cook.plugins.completion :as completion] [cook.plugins.definitions :as plugins] [cook.plugins.launch :as launch-plugin] - [cook.plugins.pool :as pool-plugin] [cook.scheduler.constraints :as constraints] [cook.scheduler.data-locality :as dl] [cook.scheduler.dru :as dru] [cook.scheduler.fenzo-utils :as fenzo] [cook.group :as group] - [cook.mesos.heartbeat :as heartbeat] [cook.pool :as pool] - [cook.progress :as progress] [cook.quota :as quota] [cook.mesos.reason :as reason] - [cook.mesos.sandbox :as sandbox] [cook.scheduler.share :as share] [cook.mesos.task :as task] [cook.tools :as util] @@ -56,8 +50,7 @@ [metrics.meters :as meters] [metrics.timers :as timers] [plumbing.core :as pc]) - (import [com.netflix.fenzo ConstraintEvaluator ConstraintEvaluator$Result - TaskAssignmentResult TaskRequest TaskScheduler TaskScheduler$Builder + (import [com.netflix.fenzo TaskAssignmentResult TaskRequest TaskScheduler TaskScheduler$Builder VirtualMachineLease VirtualMachineLease$Range VirtualMachineCurrentState] [com.netflix.fenzo.functions Action1 Func1])) @@ -100,7 +93,6 @@ (merge 
mesos-attributes cook-attributes))) (timers/deftimer [cook-mesos scheduler handle-status-update-duration]) -(meters/defmeter [cook-mesos scheduler handle-status-update-rate]) (timers/deftimer [cook-mesos scheduler handle-framework-message-duration]) (meters/defmeter [cook-mesos scheduler handle-framework-message-rate]) @@ -1421,118 +1413,16 @@ (counters/dec! in-order-queue-counter) (body-fn)))))) -(defn create-mesos-scheduler - "Creates the mesos scheduler which processes status updates asynchronously but in order of receipt." - [gpu-enabled? conn heartbeat-ch pool->fenzo pool->offers-chan match-trigger-chan - handle-exit-code handle-progress-message sandbox-syncer-state compute-cluster] - (let [configured-framework-id (cook.config/framework-id-config) - sync-agent-sandboxes-fn #(sandbox/sync-agent-sandboxes sandbox-syncer-state configured-framework-id %1 %2) - message-handlers {:handle-exit-code handle-exit-code - :handle-progress-message handle-progress-message}] - (mesos/scheduler - (registered - [this driver framework-id master-info] - (log/info "Registered with mesos with framework-id " framework-id) - (let [value (-> framework-id mesomatic.types/pb->data :value)] - (when (not= configured-framework-id value) - (let [message (str "The framework-id provided by Mesos (" value ") " - "does not match the one Cook is configured with (" configured-framework-id ")")] - (log/error message) - (throw (ex-info message {:framework-id-mesos value :framework-id-cook configured-framework-id}))))) - (when (and gpu-enabled? (not (re-matches #"1\.\d+\.\d+" (:version master-info)))) - (binding [*out* *err*] - (println "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info))) - (log/error "Cannot enable GPU support on pre-mesos 1.0. The version we found was " (:version master-info)) - (Thread/sleep 1000) - (System/exit 1)) - ;; Use future because the thread that runs mesos/scheduler doesn't load classes correctly. for reasons. 
- ;; As Sophie says, you want to future proof your code. - (future - (try - (reconcile-jobs conn) - (reconcile-tasks (db conn) driver pool->fenzo) - (catch Exception e - (log/error e "Reconciliation error"))))) - (reregistered - [this driver master-info] - (log/info "Reregistered with new master") - (future - (try - (reconcile-jobs conn) - (reconcile-tasks (db conn) driver pool->fenzo) - (catch Exception e - (log/error e "Reconciliation error"))))) - ;; Ignore this--we can just wait for new offers - (offer-rescinded - [this driver offer-id] - (comment "TODO: Rescind the offer in fenzo")) - (framework-message - [this driver executor-id slave-id message] - (meters/mark! handle-framework-message-rate) - (try - (let [{:strs [task-id type] :as parsed-message} (json/read-str (String. ^bytes message "UTF-8"))] - (case type - "directory" (sandbox/update-sandbox sandbox-syncer-state parsed-message) - "heartbeat" (heartbeat/notify-heartbeat heartbeat-ch executor-id slave-id parsed-message) - (async-in-order-processing - task-id #(handle-framework-message conn message-handlers parsed-message)))) - (catch Exception e - (log/error e "Unable to process framework message" - {:executor-id executor-id, :message message, :slave-id slave-id})))) - (disconnected - [this driver] - (log/error "Disconnected from the previous master")) - ;; We don't care about losing slaves or executors--only tasks - (slave-lost [this driver slave-id]) - (executor-lost [this driver executor-id slave-id status]) - (error - [this driver message] - (meters/mark! mesos-error) - (log/error "Got a mesos error!!!!" message)) - (resource-offers - [this driver raw-offers] - (log/debug "Got offers:" raw-offers) - (let [offers (map #(assoc % :compute-cluster compute-cluster) raw-offers) - pool->offers (group-by (fn [o] (plugins/select-pool pool-plugin/plugin o)) offers) - using-pools? (config/default-pool)] - (log/info "Offers by pool:" (pc/map-vals count pool->offers)) - (run! 
- (fn [[pool-name offers]] - (let [offer-count (count offers)] - (if using-pools? - (if-let [offers-chan (get pool->offers-chan pool-name)] - (do - (log/info "Processing" offer-count "offer(s) for known pool" pool-name) - (receive-offers offers-chan match-trigger-chan driver pool-name offers)) - (do - (log/warn "Declining" offer-count "offer(s) for non-existent pool" pool-name) - (decline-offers-safe driver offers))) - (if-let [offers-chan (get pool->offers-chan "no-pool")] - (do - (log/info "Processing" offer-count "offer(s) for pool" pool-name "(not using pools)") - (receive-offers offers-chan match-trigger-chan driver pool-name offers)) - (do - (log/error "Declining" offer-count "offer(s) for pool" pool-name "(missing no-pool offer chan)") - (decline-offers-safe driver offers)))))) - pool->offers) - (log/debug "Finished receiving offers for all pools"))) - (status-update - [this driver status] - (meters/mark! handle-status-update-rate) - (let [task-id (-> status :task-id :value)] - (async-in-order-processing - task-id #(handle-status-update conn compute-cluster pool->fenzo sync-agent-sandboxes-fn status))))))) - (defn create-datomic-scheduler - [{:keys [conn compute-cluster exit-code-syncer-state fenzo-fitness-calculator fenzo-floor-iterations-before-reset + [{:keys [conn compute-cluster fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback good-enough-fitness - gpu-enabled? heartbeat-ch mea-culpa-failure-limit mesos-run-as-user agent-attributes-cache offer-incubate-time-ms - pool-name->pending-jobs-atom progress-config rebalancer-reservation-atom sandbox-syncer-state task-constraints + mea-culpa-failure-limit mesos-run-as-user agent-attributes-cache offer-incubate-time-ms + pool-name->pending-jobs-atom rebalancer-reservation-atom task-constraints trigger-chans]}] (persist-mea-culpa-failure-limit! 
conn mea-culpa-failure-limit) - (let [{:keys [match-trigger-chan progress-updater-trigger-chan rank-trigger-chan]} trigger-chans + (let [{:keys [match-trigger-chan rank-trigger-chan]} trigger-chans pools (pool/all-pools (d/db conn)) pools' (if (-> pools count pos?) pools @@ -1552,17 +1442,9 @@ (assoc-in [:pool->offers-chan pool-name] offers-chan) (assoc-in [:pool->resources-atom pool-name] resources-atom)))) {} - pools') - {:keys [batch-size]} progress-config - {:keys [progress-state-chan]} (progress/progress-update-transactor progress-updater-trigger-chan batch-size conn) - progress-aggregator-chan (progress/progress-update-aggregator progress-config progress-state-chan) - handle-progress-message (fn handle-progress-message-curried [progress-message-map] - (progress/handle-progress-message! progress-aggregator-chan progress-message-map)) - handle-exit-code (fn handle-exit-code [task-id exit-code] - (sandbox/aggregate-exit-code exit-code-syncer-state task-id exit-code))] + pools')] (start-jobs-prioritizer! conn pool-name->pending-jobs-atom task-constraints rank-trigger-chan) - {:scheduler (create-mesos-scheduler gpu-enabled? 
conn heartbeat-ch pool-name->fenzo pool->offers-chan - match-trigger-chan handle-exit-code handle-progress-message sandbox-syncer-state - compute-cluster) + {:pool-name->fenzo pool-name->fenzo + :pool->offers-chan pool->offers-chan :view-incubating-offers (fn get-resources-atom [p] (deref (get pool->resources-atom p)))})) diff --git a/scheduler/src/cook/test/testutil.clj b/scheduler/src/cook/test/testutil.clj index df752e260f..304db2c414 100644 --- a/scheduler/src/cook/test/testutil.clj +++ b/scheduler/src/cook/test/testutil.clj @@ -38,17 +38,34 @@ (:import (java.util UUID) (org.apache.log4j ConsoleAppender Logger PatternLayout))) +(defn create-dummy-mesos-compute-cluster + [compute-cluster-name framework-id db-id driver-atom] + (let [sandbox-syncer-state nil + exit-code-syncer-state nil + mesos-heartbeat-chan nil + trigger-chans nil] + (mcc/->MesosComputeCluster compute-cluster-name + framework-id + db-id + driver-atom + sandbox-syncer-state + exit-code-syncer-state + mesos-heartbeat-chan + trigger-chans))) + (defn fake-test-compute-cluster-with-driver "Create a test compute cluster with associated driver attached to it. Returns the compute cluster." - [conn compute-cluster-name driver] - {:pre [compute-cluster-name]} - (let [existing-compute-cluster (get @cc/cluster-name->compute-cluster-atom compute-cluster-name) - compute-cluster-mesos-map {:framework-id (str compute-cluster-name "-framework") - :compute-cluster-name compute-cluster-name} - compute-cluster (mcc/get-mesos-compute-cluster conn compute-cluster-mesos-map)] - (cc/register-compute-cluster! compute-cluster) - (cc/set-mesos-driver-atom-hack! 
compute-cluster driver) - compute-cluster)) + ([conn compute-cluster-name driver] + (fake-test-compute-cluster-with-driver conn compute-cluster-name driver create-dummy-mesos-compute-cluster + (str compute-cluster-name "-framework"))) + ([conn compute-cluster-name driver mesos-compute-cluster-factory framework-id] + {:pre [compute-cluster-name]} + (let [compute-cluster-mesos-map {:framework-id framework-id + :compute-cluster-name compute-cluster-name} + compute-cluster (mcc/get-mesos-compute-cluster conn mesos-compute-cluster-factory compute-cluster-mesos-map + driver)] + (cc/register-compute-cluster! compute-cluster) + compute-cluster))) ; The name of the fake compute cluster to use. (def fake-test-compute-cluster-name "unittest-default-compute-cluster-name") diff --git a/scheduler/test/cook/test/compute_cluster.clj b/scheduler/test/cook/test/compute_cluster.clj index aae697ca5c..0707c116e5 100644 --- a/scheduler/test/cook/test/compute_cluster.clj +++ b/scheduler/test/cook/test/compute_cluster.clj @@ -35,13 +35,21 @@ (is (= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2)))) (testing "Create a cluster. Should be a new cluster" - (let [{id1a :db-id :as fetch-mesos-1a} (mcc/get-mesos-compute-cluster conn mesos-1)] + (let [{id1a :db-id :as fetch-mesos-1a} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-1)] ; This should create one cluster in the DB, but not the other. 
(is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-1))) (is (= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2))) - (let [{id2a :db-id :as fetch-mesos-2a} (mcc/get-mesos-compute-cluster conn mesos-2) - {id1b :db-id :as fetch-mesos-1b} (mcc/get-mesos-compute-cluster conn mesos-1) - {id2b :db-id :as fetch-mesos-2b} (mcc/get-mesos-compute-cluster conn mesos-2)] + (let [{id2a :db-id :as fetch-mesos-2a} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-2) + {id1b :db-id :as fetch-mesos-1b} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-1) + {id2b :db-id :as fetch-mesos-2b} (mcc/get-mesos-compute-cluster conn + testutil/create-dummy-mesos-compute-cluster + mesos-2)] ; Should see both clusters created. (is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-1))) (is (not= nil (mcc/get-mesos-cluster-entity-id (d/db conn) mesos-2))) @@ -56,7 +64,7 @@ (is (and id1b (< 0 id1b))) (is (and id2b (< 0 id2b))) - (is (= (dissoc fetch-mesos-1a :db-id :driver-atom) + (is (= (select-keys fetch-mesos-1a [:compute-cluster-name :framework-id]) {:compute-cluster-name "mesos-1" :framework-id "mesos-1a"}))))))) @@ -70,14 +78,16 @@ :framework-id "foo-1a"} {:compute-cluster-name "foo-1" :framework-id "foo-1a"}])] - (is (thrown-with-msg? IllegalArgumentException #"Multiple" (mcc/setup-compute-cluster-map-from-config conn nil))))) + (is (thrown-with-msg? 
IllegalArgumentException #"Multiple" + (mcc/setup-compute-cluster-map-from-config conn nil + testutil/create-dummy-mesos-compute-cluster))))) (testing "Install the clusters per the configuration read and make sure that they can be found" (with-redefs [mcc/get-mesos-clusters-from-config (constantly [{:compute-cluster-name "foo-3" :framework-id "foo-1a"} {:compute-cluster-name "foo-4" :framework-id "foo-1a"}])] - (mcc/setup-compute-cluster-map-from-config conn nil) + (mcc/setup-compute-cluster-map-from-config conn nil testutil/create-dummy-mesos-compute-cluster) (is (< 0 (-> "foo-3" cc/compute-cluster-name->ComputeCluster cc/db-id))) (is (< 0 (-> "foo-4" cc/compute-cluster-name->ComputeCluster cc/db-id))) (is (= nil (cc/compute-cluster-name->ComputeCluster "foo-5"))))))) diff --git a/scheduler/test/cook/test/mesos/mesos_compute_cluster.clj b/scheduler/test/cook/test/mesos/mesos_compute_cluster.clj new file mode 100644 index 0000000000..e9134ab404 --- /dev/null +++ b/scheduler/test/cook/test/mesos/mesos_compute_cluster.clj @@ -0,0 +1,118 @@ +(ns cook.test.mesos.mesos-compute-cluster + (:require [clojure.test :refer :all] + [cook.scheduler.scheduler :as sched] + [cook.mesos.mesos-compute-cluster :as mcc] + [mesomatic.types :as mtypes] + [clojure.data.json :as json] + [cook.mesos.sandbox :as sandbox] + [cook.mesos.heartbeat :as heartbeat]) + (:import (java.util.concurrent CountDownLatch TimeUnit))) + +(deftest test-in-order-status-update-processing + (let [status-store (atom {}) + latch (CountDownLatch. 11)] + (with-redefs [sched/handle-status-update + (fn [_ _ _ _ status] + (let [task-id (-> status :task-id :value str)] + (swap! 
status-store update task-id + (fn [statuses] (conj (or statuses []) + (-> status mtypes/pb->data :state))))) + (Thread/sleep (rand-int 100)) + (.countDown latch))] + (let [s (mcc/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil nil)] + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-running})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-running})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-running})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-finished})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-failed})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T4"} :state :task-starting})) + (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-failed})) + + (.await latch 4 TimeUnit/SECONDS) + + (is (= [:task-starting :task-failed] (->> "" (get @status-store) vec))) + (is (= [:task-starting :task-running :task-finished] (->> "T1" (get @status-store) vec))) + (is (= [:task-starting :task-running] (->> "T2" (get @status-store) vec))) + (is (= [:task-starting :task-running :task-failed] (->> "T3" (get @status-store) vec))) + (is (= [:task-starting] (->> "T4" (get @status-store) vec))))))) + +(deftest test-framework-message-processing-delegation + (let [framework-message-store (atom []) + heartbeat-store (atom []) + sandbox-store (atom [])] + (with-redefs [heartbeat/notify-heartbeat (fn [_ _ _ framework-message] + (swap! 
heartbeat-store conj framework-message)) + sandbox/update-sandbox (fn [_ framework-message] + (swap! sandbox-store conj framework-message)) + sched/handle-framework-message (fn [_ _ framework-message] + (swap! framework-message-store conj framework-message))] + (let [s (mcc/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil nil) + make-message (fn [message] (-> message json/write-str str (.getBytes "UTF-8")))] + + (testing "message delegation" + (let [task-id "T1" + executor-id (-> task-id mtypes/->ExecutorID mtypes/data->pb) + m1 {"task-id" task-id} + m2 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} + m3 {"exit-code" 0, "task-id" task-id} + m4 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} + m5 {"sandbox" "/path/to/a/directory", "task-id" task-id, "type" "directory"}] + + (.frameworkMessage s nil executor-id nil (make-message m1)) + (.frameworkMessage s nil executor-id nil (make-message m2)) + (.frameworkMessage s nil executor-id nil (make-message m3)) + (.frameworkMessage s nil executor-id nil (make-message m4)) + (.frameworkMessage s nil executor-id nil (make-message m5)) + + (let [latch (CountDownLatch. 1)] + (sched/async-in-order-processing task-id #(.countDown latch)) + (.await latch)) + + (is (= [m1 m3] @framework-message-store)) + (is (= [m2 m4] @heartbeat-store)) + (is (= [m5] @sandbox-store)))))))) + +(deftest test-in-order-framework-message-processing + (let [messages-store (atom {}) + latch (CountDownLatch. 11)] + (with-redefs [heartbeat/notify-heartbeat (constantly true) + sched/handle-framework-message + (fn [_ _ framework-message] + (let [{:strs [message task-id]} framework-message] + (swap! 
messages-store update (str task-id) (fn [messages] (conj (or messages []) message)))) + (Thread/sleep (rand-int 100)) + (.countDown latch))] + (let [s (mcc/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil nil) + foo 11 + bar 21 + fee 31 + fie 41 + make-message (fn [index message] + (-> {"message" message, "task-id" (str "T" index)} + json/write-str + str + (.getBytes "UTF-8")))] + + (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 foo)) + (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 foo)) + (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 foo)) + (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 bar)) + (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 bar)) + (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 foo)) + (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 bar)) + (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 fee)) + (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 fie)) + (.frameworkMessage s nil (-> "T4" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 4 foo)) + (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 fie)) + + (.await latch 4 TimeUnit/SECONDS) + + (is (= [foo fie] (->> "T0" (get @messages-store) vec))) + (is (= [foo bar fee] (->> "T1" (get @messages-store) vec))) + (is (= [foo bar] (->> "T2" (get @messages-store) vec))) + (is (= [foo bar fie] (->> "T3" (get @messages-store) vec))) + (is (= [foo] (->> "T4" (get @messages-store) vec))))))) \ No newline at end of file diff --git a/scheduler/test/cook/test/mesos/mesos_mock.clj b/scheduler/test/cook/test/mesos/mesos_mock.clj index 35d3b52829..595fdce607 100644 --- 
a/scheduler/test/cook/test/mesos/mesos_mock.clj +++ b/scheduler/test/cook/test/mesos/mesos_mock.clj @@ -672,9 +672,8 @@ hosts (for [_ (range num-hosts)] (dummy-host :mem {"*" mem} :cpus {"*" cpus} :ports {"*" ports})) offer-trigger-chan (chime-ch (util/time-seq (t/now) (t/millis 50))) - make-mesos-driver-fn (fn [scheduler _] ;; _ is framework-id + make-mesos-driver-fn (fn [config scheduler framework-id] ;; _ is framework-id (mm/mesos-mock hosts offer-trigger-chan scheduler))] - (testutil/setup-fake-test-compute-cluster mesos-datomic-conn) (with-cook-scheduler mesos-datomic-conn make-mesos-driver-fn {} (share/set-share! mesos-datomic-conn "default" nil "new cluster settings" diff --git a/scheduler/test/cook/test/scheduler/scheduler.clj b/scheduler/test/cook/test/scheduler/scheduler.clj index 32e4b7b7f7..709ec78ee6 100644 --- a/scheduler/test/cook/test/scheduler/scheduler.clj +++ b/scheduler/test/cook/test/scheduler/scheduler.clj @@ -1882,116 +1882,6 @@ (is (= (:slave-id offer-1) (:slave-id task-1))) (is (= (:slave-id offer-2) (:slave-id task-2))))))))) -(deftest test-in-order-status-update-processing - (let [status-store (atom {}) - latch (CountDownLatch. 11)] - (with-redefs [sched/handle-status-update - (fn [_ _ _ _ status] - (let [task-id (-> status :task-id :value str)] - (swap! 
status-store update task-id - (fn [statuses] (conj (or statuses []) - (-> status mtypes/pb->data :state))))) - (Thread/sleep (rand-int 100)) - (.countDown latch))] - (let [s (sched/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil)] - - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-running})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T2"} :state :task-running})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-running})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T1"} :state :task-finished})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T3"} :state :task-failed})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {:value "T4"} :state :task-starting})) - (.statusUpdate s nil (mtypes/->pb :TaskStatus {:task-id {} :state :task-failed})) - - (.await latch 4 TimeUnit/SECONDS) - - (is (= [:task-starting :task-failed] (->> "" (get @status-store) vec))) - (is (= [:task-starting :task-running :task-finished] (->> "T1" (get @status-store) vec))) - (is (= [:task-starting :task-running] (->> "T2" (get @status-store) vec))) - (is (= [:task-starting :task-running :task-failed] (->> "T3" (get @status-store) vec))) - (is (= [:task-starting] (->> "T4" (get @status-store) vec))))))) - -(deftest test-framework-message-processing-delegation - (let [framework-message-store (atom []) - heartbeat-store (atom []) - sandbox-store (atom [])] - (with-redefs [heartbeat/notify-heartbeat (fn [_ _ _ framework-message] - (swap! 
heartbeat-store conj framework-message)) - sandbox/update-sandbox (fn [_ framework-message] - (swap! sandbox-store conj framework-message)) - sched/handle-framework-message (fn [_ _ framework-message] - (swap! framework-message-store conj framework-message))] - (let [s (sched/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil) - make-message (fn [message] (-> message json/write-str str (.getBytes "UTF-8")))] - - (testing "message delegation" - (let [task-id "T1" - executor-id (-> task-id mtypes/->ExecutorID mtypes/data->pb) - m1 {"task-id" task-id} - m2 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} - m3 {"exit-code" 0, "task-id" task-id} - m4 {"task-id" task-id, "timestamp" 123456, "type" "heartbeat"} - m5 {"sandbox" "/path/to/a/directory", "task-id" task-id, "type" "directory"}] - - (.frameworkMessage s nil executor-id nil (make-message m1)) - (.frameworkMessage s nil executor-id nil (make-message m2)) - (.frameworkMessage s nil executor-id nil (make-message m3)) - (.frameworkMessage s nil executor-id nil (make-message m4)) - (.frameworkMessage s nil executor-id nil (make-message m5)) - - (let [latch (CountDownLatch. 1)] - (sched/async-in-order-processing task-id #(.countDown latch)) - (.await latch)) - - (is (= [m1 m3] @framework-message-store)) - (is (= [m2 m4] @heartbeat-store)) - (is (= [m5] @sandbox-store)))))))) - -(deftest test-in-order-framework-message-processing - (let [messages-store (atom {}) - latch (CountDownLatch. 11)] - (with-redefs [heartbeat/notify-heartbeat (constantly true) - sched/handle-framework-message - (fn [_ _ framework-message] - (let [{:strs [message task-id]} framework-message] - (swap! 
messages-store update (str task-id) (fn [messages] (conj (or messages []) message)))) - (Thread/sleep (rand-int 100)) - (.countDown latch))] - (let [s (sched/create-mesos-scheduler nil true nil nil nil nil nil nil nil nil) - foo 11 - bar 21 - fee 31 - fie 41 - make-message (fn [index message] - (-> {"message" message, "task-id" (str "T" index)} - json/write-str - str - (.getBytes "UTF-8")))] - - (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 foo)) - (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 foo)) - (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 foo)) - (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 bar)) - (.frameworkMessage s nil (-> "T2" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 2 bar)) - (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 foo)) - (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 bar)) - (.frameworkMessage s nil (-> "T1" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 1 fee)) - (.frameworkMessage s nil (-> "T3" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 3 fie)) - (.frameworkMessage s nil (-> "T4" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 4 foo)) - (.frameworkMessage s nil (-> "" mtypes/->ExecutorID mtypes/data->pb) nil (make-message 0 fie)) - - (.await latch 4 TimeUnit/SECONDS) - - (is (= [foo fie] (->> "T0" (get @messages-store) vec))) - (is (= [foo bar fee] (->> "T1" (get @messages-store) vec))) - (is (= [foo bar] (->> "T2" (get @messages-store) vec))) - (is (= [foo bar fie] (->> "T3" (get @messages-store) vec))) - (is (= [foo] (->> "T4" (get @messages-store) vec))))))) - (defn- task-id->instance-entity [db-conn task-id] (let [datomic-db (d/db db-conn)] diff --git a/scheduler/test/cook/test/zz_simulator.clj 
b/scheduler/test/cook/test/zz_simulator.clj index 1df6c46c09..3172589200 100644 --- a/scheduler/test/cook/test/zz_simulator.clj +++ b/scheduler/test/cook/test/zz_simulator.clj @@ -18,6 +18,7 @@ [clojure.walk :refer (keywordize-keys)] [com.rpl.specter :refer (transform ALL MAP-VALS MAP-KEYS select FIRST)] [cook.config :refer (executor-config, init-logger)] + [cook.datomic :as datomic] [cook.mesos :as c] [cook.mesos.mesos-mock :as mm] [cook.scheduler.share :as share] @@ -25,7 +26,8 @@ [cook.plugins.completion :as completion] [cook.test.testutil :as testutil :refer (restore-fresh-database! poll-until)] [datomic.api :as d] - [plumbing.core :refer (map-vals map-keys map-from-vals)]) + [plumbing.core :refer (map-vals map-keys map-from-vals)] + [cook.mesos.mesos-compute-cluster :as mcc]) (:import java.util.Date org.apache.curator.framework.CuratorFrameworkFactory org.apache.curator.framework.state.ConnectionStateListener @@ -111,7 +113,6 @@ :executable true :extract false :value "file:///path/to/cook/executor"}} - gpu-enabled?# (or (:gpus-enabled? 
~scheduler-config) false) progress-config# {:batch-size 100 :pending-threshold 1000 :publish-interval-ms 2000 @@ -126,33 +127,45 @@ optimizer-config# (or (:optimizer-config ~scheduler-config) {}) trigger-chans# (or (:trigger-chans ~scheduler-config) - (c/make-trigger-chans rebalancer-config# progress-config# optimizer-config# task-constraints#))] + (c/make-trigger-chans rebalancer-config# progress-config# optimizer-config# task-constraints#)) + mesos-heartbeat-chan# (async/chan 1024) + create-compute-cluster# (fn [compute-cluster-name# framework-id# db-id# driver-atom#] + (mcc/->MesosComputeCluster compute-cluster-name# + framework-id# + db-id# + driver-atom# + sandbox-syncer-state# + exit-code-syncer-state# + mesos-heartbeat-chan# + trigger-chans#))] (try (with-redefs [executor-config (constantly executor-config#) completion/plugin completion/no-op ; This initializatioon is needed so the code to validate that the ; registration responses matches the configured cook scheduler passes simulator ; and mesos-mock unit tests. (cook.scheduler, lines 1428 create-mesos-scheduler) - cook.config/framework-id-config (constantly framework-id#)] - (c/start-mesos-scheduler + cook.config/framework-id-config (constantly framework-id#) + mcc/make-mesos-driver ~make-mesos-driver-fn + datomic/conn ~conn] + (testutil/fake-test-compute-cluster-with-driver ~conn + testutil/fake-test-compute-cluster-name + nil ; no dummy driver - simulator is going to call initialize + create-compute-cluster# + framework-id#) + (c/start-leader-selector {:curator-framework curator-framework# - :exit-code-syncer-state exit-code-syncer-state# :fenzo-config fenzo-config# - :framework-id framework-id# - :gpu-enabled? 
gpu-enabled?# - :make-mesos-driver-fn ~make-mesos-driver-fn :mea-culpa-failure-limit mea-culpa-failure-limit# :mesos-datomic-conn ~conn :mesos-datomic-mult mesos-mult# + :mesos-heartbeat-chan mesos-heartbeat-chan# :mesos-leadership-atom mesos-leadership-atom# :pool-name->pending-jobs-atom pool-name->pending-jobs-atom# :mesos-run-as-user nil :agent-attributes-cache agent-attributes-cache# :offer-incubate-time-ms offer-incubate-time-ms# :optimizer-config optimizer-config# - :progress-config progress-config# :rebalancer-config rebalancer-config# - :sandbox-syncer-state sandbox-syncer-state# :server-config host-settings# :task-constraints task-constraints# :trigger-chans trigger-chans# @@ -336,7 +349,7 @@ rebalancer-trigger-chan (async/chan) optimizer-trigger-chan (async/chan) state-atom (atom {}) - make-mesos-driver-fn (fn [scheduler _] + make-mesos-driver-fn (fn [config scheduler framework-id] (mm/mesos-mock mesos-hosts offer-trigger-chan scheduler :task->runtime-ms task->runtime-ms :task->complete-status task->complete-status @@ -390,7 +403,6 @@ (DateTimeUtils/setCurrentMillisFixed simulation-time) (log/info "Starting simulation at" simulation-time) ;; launch the simulator - (testutil/setup-fake-test-compute-cluster mesos-datomic-conn) (with-cook-scheduler mesos-datomic-conn make-mesos-driver-fn