Skip to content
Browse files

Support multiple kubernetes compute clusters (#1290)

  • Loading branch information...
scrosby authored and dposada committed Nov 8, 2019
1 parent 34f0adb commit ca405b85ee8f21bc666f352da3f96ffd44b092f8
@@ -326,28 +326,29 @@ def test_mea_culpa_retries(self):

def test_compute_cluster(self):
settings_dict = util.settings(self.cook_url)
if util.using_kubernetes():
expected_compute_cluster_type = 'kubernetes'
elif util.using_mesos():
expected_compute_cluster_type = 'mesos'
else:'Unable to determine compute cluster type')
expected_compute_cluster = settings_dict['compute-clusters'][0]['config']['compute-cluster-name']
expected_mesos_framework = settings_dict['compute-clusters'][0]['config'].get('framework-id', None)

job_uuid, resp = util.submit_job(self.cook_url)

self.assertEqual(resp.status_code, 201, msg=resp.content)

instance = util.wait_for_instance(self.cook_url, job_uuid)
message = repr(instance)

self.assertIsNotNone(instance['compute-cluster'], message)
self.assertEqual(expected_compute_cluster_type, instance['compute-cluster']['type'], message)
self.assertEqual(expected_compute_cluster, instance['compute-cluster']['name'], message)
if expected_mesos_framework is not None:

instance_compute_cluster_name = instance['compute-cluster']['name']
instance_compute_cluster_type = instance['compute-cluster']['type']
self.assertIn(instance_compute_cluster_type, ['mesos', 'kubernetes'], message)
filtered_compute_clusters = [compute_cluster for compute_cluster in settings_dict['compute-clusters']
if compute_cluster['config']['compute-cluster-name'] == instance_compute_cluster_name]
self.assertEqual(1, len(filtered_compute_clusters), "Unable to find " + instance_compute_cluster_name + " in compute clusters")
found_compute_cluster = filtered_compute_clusters[0]

self.assertIsNotNone(found_compute_cluster, message + str(settings_dict['compute-clusters']))

self.assertEqual(util.get_compute_cluster_type(found_compute_cluster), instance_compute_cluster_type, message)
if found_compute_cluster['factory-fn'] == 'cook.mesos.mesos-compute-cluster/factory-fn':
expected_mesos_framework = found_compute_cluster['config'].get('framework-id', None)
self.assertEqual(expected_mesos_framework, instance['compute-cluster']['mesos']['framework-id'],
@@ -1332,6 +1332,7 @@ def get_kubernetes_nodes():'Retrieved kubernetes nodes: {node_json}')
return node_json['items']

def kubernetes_node_pool(nodename):
node = [node for node in get_kubernetes_nodes() if node['metadata']['name'] == nodename]
@@ -1364,6 +1365,17 @@ def max_kubernetes_node_cpus():
for n in nodes])

def get_compute_cluster_type(compute_cluster_dictionary):
if compute_cluster_dictionary is None:
raise Exception("compute-cluster-dictionary is None. Cannot determine the type")
elif compute_cluster_dictionary['factory-fn'] == 'cook.mesos.mesos-compute-cluster/factory-fn':
return 'mesos'
elif compute_cluster_dictionary['factory-fn'] == 'cook.kubernetes.compute-cluster/factory-fn':
return 'kubernetes'
raise Exception(
"compute-cluster-dictionary is " + repr(compute_cluster_dictionary) + " Cannot determine the type")

def task_constraint_cpus(cook_url):
"""Returns the max cpus that can be submitted to the cluster"""
task_constraint_cpus = settings(cook_url)['task-constraints']['cpus']
export DATA_LOCAL_ENDPOINT="http://localhost:35847/retrieve-costs"
export COOK_K8S_CONFIG_FILE=~/.kube/config

echo "Starting cook..."
lein run config-k8s.edn
@@ -20,9 +20,13 @@
; (like test_default_container_volumes) will fail without this set.
:run-as-user "root"}
:compute-clusters [{:factory-fn cook.kubernetes.compute-cluster/factory-fn
:config {:compute-cluster-name "minikube"
;; Location of the kubernetes config file, e.g. $HOME/.kube/config
:config-file #config/env "COOK_K8S_CONFIG_FILE"}}]
:config {:compute-cluster-name "gke-1"
;; Location of the kubernetes config file. Hardcoded to the location specified by bin/make-gke-test-cluster
:config-file "../scheduler/.cook_kubeconfig_1"}}
{:factory-fn cook.kubernetes.compute-cluster/factory-fn
:config {:compute-cluster-name "gke-2"
;; Location of the kubernetes config file. Hardcoded to the location specified by bin/make-gke-test-cluster
:config-file "../scheduler/.cook_kubeconfig_2"}}]
:cors-origins ["https?://"]
:data-local {:fitness-calculator {:cache-ttl-ms 60000
:cost-endpoint #config/env "DATA_LOCAL_ENDPOINT"
@@ -79,13 +79,13 @@

(def raw-scheduler-routes
{:scheduler (fnk [mesos mesos-leadership-atom pool-name->pending-jobs-atom settings]
{:scheduler (fnk [mesos leadership-atom pool-name->pending-jobs-atom settings]
((util/lazy-load-var '
(fn [] @pool-name->pending-jobs-atom)
(get-in mesos [:mesos-scheduler :leader-selector])
:view (fnk [scheduler]

@@ -100,7 +100,7 @@
fenzo-floor-iterations-before-warn fenzo-max-jobs-considered fenzo-scaleback
good-enough-fitness hostname mea-culpa-failure-limit mesos-leader-path mesos-run-as-user
offer-incubate-time-ms optimizer rebalancer server-port task-constraints]
compute-clusters curator-framework mesos-datomic-mult mesos-leadership-atom
compute-clusters curator-framework mesos-datomic-mult leadership-atom
mesos-agent-attributes-cache pool-name->pending-jobs-atom mesos-heartbeat-chan
(if (cook.config/api-only-mode?)
@@ -125,7 +125,7 @@
:mesos-datomic-conn datomic/conn
:mesos-datomic-mult mesos-datomic-mult
:mesos-heartbeat-chan mesos-heartbeat-chan
:mesos-leadership-atom mesos-leadership-atom
:leadership-atom leadership-atom
:pool-name->pending-jobs-atom pool-name->pending-jobs-atom
:mesos-run-as-user mesos-run-as-user
:agent-attributes-cache mesos-agent-attributes-cache
@@ -144,11 +144,11 @@

(defn health-check-middleware
"This adds /debug to return 200 OK"
[h mesos-leadership-atom leader-reports-unhealthy]
[h leadership-atom leader-reports-unhealthy]
(fn healthcheck [req]
(if (and (= (:uri req) "/debug")
(= (:request-method req) :get))
{:status (if (and leader-reports-unhealthy @mesos-leadership-atom)
{:status (if (and leader-reports-unhealthy @leadership-atom)
:headers {}
@@ -246,7 +246,7 @@
:http-server (fnk [[:settings cors-origins server-port authorization-middleware impersonation-middleware
leader-reports-unhealthy server-https-port server-keystore-path server-keystore-type
server-keystore-pass [:rate-limit user-limit]]
[:route view] mesos-leadership-atom]
[:route view] leadership-atom]
(log/info "Launching http server")
(let [rate-limit-storage (storage/local-storage)
jetty ((util/lazy-load-var 'qbits.jet.server/run-jetty)
@@ -264,7 +264,7 @@
(cors/cors-middleware cors-origins)
(health-check-middleware mesos-leadership-atom leader-reports-unhealthy)
(health-check-middleware leadership-atom leader-reports-unhealthy)
:join? false
@@ -311,10 +311,10 @@
datomic/conn publish-batch-size publish-interval-ms))
:trigger-chans (fnk [[:settings rebalancer progress optimizer task-constraints]]
((util/lazy-load-var 'cook.mesos/make-trigger-chans) rebalancer progress optimizer task-constraints))
:clear-uncommitted-canceler (fnk [mesos-leadership-atom]
:clear-uncommitted-canceler (fnk [leadership-atom]
((util/lazy-load-var '
datomic/conn mesos-leadership-atom))
:mesos-leadership-atom (fnk [] (atom false))
datomic/conn leadership-atom))
:leadership-atom (fnk [] (atom false))
:pool-name->pending-jobs-atom (fnk [] (atom {}))
:mesos-agent-attributes-cache (fnk [[:settings {agent-attributes-cache nil}]]
(when agent-attributes-cache
@@ -32,9 +32,6 @@
We expect Cook to give up leadership when a compute cluster loses leadership, so leadership is not expected to be regained.
The channel result will be an exception if an error occurred, or a status message if leadership was lost normally.")

(current-leader? [this]
"Returns true if this cook instance is currently the leader for the compute cluster")

(kill-task [this task-id]
"Kill the task with the given task id")

@@ -210,9 +210,6 @@
; We keep leadership indefinitely in kubernetes.
(async/chan 1))

(current-leader? [this]

(pending-offers [this pool-name]
(let [nodes @current-nodes-atom
pods @all-pods-atom
@@ -133,7 +133,7 @@
fenzo-config -- map, config for fenzo, See scheduler/docs/configuration.adoc for more details
sandbox-syncer-state -- map, representing the sandbox syncer object"
[{:keys [curator-framework fenzo-config mea-culpa-failure-limit mesos-datomic-conn mesos-datomic-mult
mesos-heartbeat-chan mesos-leadership-atom pool-name->pending-jobs-atom mesos-run-as-user agent-attributes-cache
mesos-heartbeat-chan leadership-atom pool-name->pending-jobs-atom mesos-run-as-user agent-attributes-cache
offer-incubate-time-ms optimizer-config rebalancer-config server-config task-constraints trigger-chans
(let [{:keys [fenzo-fitness-calculator fenzo-floor-iterations-before-reset fenzo-floor-iterations-before-warn
@@ -142,8 +142,7 @@
rebalancer-trigger-chan straggler-trigger-chan]} trigger-chans
{:keys [hostname server-port server-https-port]} server-config
datomic-report-chan (async/chan (async/sliding-buffer 4096))

compute-cluster (cc/get-default-cluster-for-legacy)
cluster-name->compute-cluster @cc/cluster-name->compute-cluster-atom
rebalancer-reservation-atom (atom {})
_ (log/info "Using path" zk-prefix "for leader selection")
leader-selector (LeaderSelector.
@@ -154,7 +153,7 @@
(reify LeaderSelectorListener
(takeLeadership [_ client]
(log/warn "Taking leadership")
(reset! mesos-leadership-atom true)
(reset! leadership-atom true)
;; TODO: get the framework ID and try to reregister
(let [normal-exit (atom true)]
@@ -177,9 +176,14 @@
:task-constraints task-constraints
:trigger-chans trigger-chans})
running-tasks-ents ( (d/db mesos-datomic-conn))
cluster-leadership-chan (cc/initialize-cluster compute-cluster
cluster-connected-chans (->> cluster-name->compute-cluster
(map #(cc/initialize-cluster %
; Note: This doall has a critical side effect of actually initializing
; all of the clusters.
; Many of these should look at the compute-cluster of the underlying jobs, and not use driver at all.
(cook.scheduler.scheduler/lingering-task-killer mesos-datomic-conn
@@ -214,15 +218,23 @@
(async/tap mesos-datomic-mult datomic-report-chan)
(cook.scheduler.scheduler/monitor-tx-report-queue datomic-report-chan mesos-datomic-conn)
; Curator expects takeLeadership to block until voluntarily surrendering leadership.
; Block on cluster-leadership-chan to hold ZK leadership unless we lose mesos leadership.
(let [res (async/<!! cluster-leadership-chan)]
; We need to block here until we're willing to give up leadership.
; We block until any of the cluster-connected-chans unblock. This happens when the compute cluster
; loses connectivity to the backend. For now, we want to treat mesos as special. When we lose our mesos
; driver connection to the backend, we want cook to suicide. If we lose any of our kubernetes connections
; we ignore it and work with the remaining cluster.
; WARNING: This code is very misleading. It looks like we'll suicide if ANY of the clusters lose leadership.
; However, the kubernetes compute clusters never put anything on their chan, so this is the equivalent of only looking at mesos.
; We didn't want to implement the special case for mesos.
(let [res (async/<!! (async/merge cluster-connected-chans))]
(when (instance? Throwable res)
(throw res))))
(catch Throwable e
(log/error e "Lost leadership due to exception")
(reset! normal-exit false))
(reset! mesos-leadership-atom false)
(counters/dec! mesos-leader)
(when @normal-exit
(log/warn "Lost leadership naturally"))
@@ -234,8 +246,7 @@
;; We will give up our leadership whenever it seems that we lost
;; ZK connection
(when (#{ConnectionState/LOST ConnectionState/SUSPENDED} newState)
(reset! mesos-leadership-atom false)
(when (cc/current-leader? compute-cluster)
(when @leadership-atom
(counters/dec! mesos-leader)
;; Better to fail over and rely on start up code we trust then rely on rarely run code
;; to make sure we yield leadership correctly (and fully)
@@ -235,9 +235,6 @@
(db-id [this]

(current-leader? [this]
(not (nil? @driver-atom)))

(initialize-cluster [this pool->fenzo _]
(log/info "Initializing Mesos compute cluster" compute-cluster-name)
(let [settings (:settings config/config)
@@ -1942,7 +1942,7 @@

(timers/deftimer [cook-scheduler handler queue-endpoint])
(defn waiting-jobs
[conn mesos-pending-jobs-fn is-authorized-fn mesos-leadership-atom leader-selector]
[conn mesos-pending-jobs-fn is-authorized-fn leadership-atom leader-selector]
:available-media-types ["application/json"]
:allowed-methods [:get]
@@ -1963,10 +1963,10 @@
(log/info user " has failed auth")
[false {::error "Unauthorized"}]))))
:exists? (fn [_] [@mesos-leadership-atom {}])
:exists? (fn [_] [@leadership-atom {}])
:existed? (fn [_] [true {}])
:moved-temporarily? (fn [_]
(if @mesos-leadership-atom
(if @leadership-atom
[false {}]
[true {:location (str (leader-url leader-selector) "/queue")}]))
:handle-forbidden (fn [ctx]
@@ -2903,7 +2903,7 @@
gpu-enabled? :mesos-gpu-enabled
:as settings}
@@ -3165,7 +3165,7 @@
:handler (data-local-update-time-handler conn)}}))

(ANY "/queue" []
(waiting-jobs conn mesos-pending-jobs-fn is-authorized-fn mesos-leadership-atom leader-selector))
(waiting-jobs conn mesos-pending-jobs-fn is-authorized-fn leadership-atom leader-selector))
(ANY "/running" []
(running-jobs conn is-authorized-fn))
(ANY "/list" []

0 comments on commit ca405b8

Please sign in to comment.
You can’t perform that action at this time.