Skip to content
Browse files

Refactor fileserver into sidecar with progress reporting (#1404)

  • Loading branch information
shamsimam committed Mar 4, 2020
1 parent e50d770 commit c9b7791a5489ca830480fcc4f6ff287dd205cba3
@@ -34,3 +34,4 @@ target
@@ -561,6 +561,15 @@ def submit_jobs(cook_url, job_specs, clones=1, pool=None, headers=None, log_requ
job_specs = [job_specs] * clones
caller = get_caller()

def io_redirect(spec):
if using_kubernetes() and using_kubernetes_default_shell():
# Capture stdout and stderr as files in the sandbox directory
# (i.e., mimick the default mesos behavior on k8s).
# Assuming custom shell commands handle this logic internally.
spec['command'] = f'( {spec["command"]} ) >stdout 2>stderr'
logging.debug(f'Rewrote job {spec["uuid"]} command to capture stdout and stderr.')
return spec

def full_spec(spec):
if 'name' not in spec:
spec['name'] = DEFAULT_JOB_NAME_PREFIX + caller
@@ -569,9 +578,9 @@ def full_spec(spec):
if "name" in spec and spec["name"] is None:
del spec["name"]
return spec
return dict(**spec)

jobs = [full_spec(j) for j in job_specs]
jobs = [io_redirect(full_spec(j)) for j in job_specs]
request_body = {'jobs': jobs}
default_pool = default_submit_pool()
@@ -1262,11 +1271,8 @@ def retrieve_progress_file_env(cook_url):
"""Retrieves the environment variable used by the cook executor to lookup the progress file."""
cook_settings = settings(cook_url)
if using_kubernetes():
return default_env_value
env_value = get_in(cook_settings, 'executor', 'environment', 'EXECUTOR_PROGRESS_OUTPUT_FILE_ENV')
return env_value or default_env_value
env_value = get_in(cook_settings, 'executor', 'environment', 'EXECUTOR_PROGRESS_OUTPUT_FILE_ENV')
return env_value or default_env_value

def get_instance_stats(cook_url, **kwargs):
@@ -1353,10 +1359,22 @@ def is_cook_executor_in_use():
def is_job_progress_supported():
"""Returns true if the current job execution environment supports progress reporting"""
# Mesos supports progress reporting only with Cook Executor,
# but our progress reporter sidecar is always enabled on Kubernetes.
# TODO: Update predicate below once progress sidecar is enabled in k8s (#1404)
return is_cook_executor_in_use() # or using_kubernetes()
# Mesos supports progress reporting only with Cook Executor.
# Our progress reporter sidecar is always enabled on Kubernetes,
# but we only enable the tests if the executor config is also present
# (otherwise some tests fail due to missing environment variables, etc).
return is_cook_executor_in_use() or (using_kubernetes() and _cook_executor_config())

def using_kubernetes_default_shell():
"""Returns true if Kuberentes scheduler is configured with our default command.
Not that this predicate *does not* check whether the Kubernetes scheduler is in use."""
cook_url = retrieve_cook_url()
cook_settings = settings(cook_url)
k8s_custom_shell = get_in(cook_settings, 'kubernetes', 'custom-shell')
default_shell = ['/bin/sh', '-c']
return k8s_custom_shell == default_shell

def slave_cpus(mesos_url, hostname):
@@ -49,7 +49,9 @@
:disallowed-var-names #{"BADVAR"}
:init-container {:command ["/bin/sh" "-c" "echo sample init container running"]
:image "byrnedo/alpine-curl:latest"}
:sidecar {:command ["cook-sidecar"]
:sidecar {;; Note that Cook automatically appends the :port given below
;; as the last argument in this :command vector.
:command ["cook-sidecar" "--file-server-port"]
:image "twosigma/cook-sidecar:latest"
:port 23906
:resource-requirements {:cpu-request 0.1
@@ -265,6 +265,17 @@
:hostname (fnk [[:config {hostname (.getCanonicalHostName (InetAddress/getLocalHost))}]]
:cluster-dns-name (fnk [[:config {cluster-dns-name nil}]]
:scheduler-rest-url (fnk [cluster-dns-name hostname server-https-port server-port]
(let [[scheme port] (if server-https-port
["https" server-https-port]
["http" server-port])
host (or cluster-dns-name
(do (log/warn "Missing cluster-dns-name in config."
"REST callbacks will use the master's hostname.")
(str scheme "://" host ":" port)))
:leader-reports-unhealthy (fnk [[:config {mesos nil}]]
(when mesos
(or (:leader-reports-unhealthy mesos) false)))
@@ -552,6 +563,12 @@
(get-in config [:settings :kubernetes]))

(defn scheduler-rest-url
"Get the URL for REST calls back to the Cook Scheduler API.
Used by Kubernetes pod sidecar to send messages back to Cook."
(get-in config [:settings :scheduler-rest-url]))

(defn task-constraints
(get-in config [:settings :task-constraints]))
@@ -5,6 +5,7 @@
[cook.kubernetes.metrics :as metrics]
[datomic.api :as d]
[cook.config :as config]
[cook.task :as task]
[ :as util]
[metrics.meters :as meters]
[metrics.timers :as timers]
@@ -448,17 +449,13 @@
(.setOperator "Exists")
(.setEffect "PreferNoSchedule")))

(defn param-env-vars
(defn build-params-env
"Make environment vars map from docker parameters"
(->> params
(filter (fn [{:keys [key]}]
(= key "env")))
(map (fn [{:keys [value]}]
(let [[env-var-name env-var-value] (str/split value #"=")
env (V1EnvVar.)]
(.setName env (str env-var-name))
(.setValue env (str env-var-value))
(pc/for-map [{:keys [key value]} params
:when (= "env" key)
:let [[var-key var-val] (str/split value #"=")]]
var-key var-val))

(defn make-security-context
[params user]
@@ -485,13 +482,22 @@
(:value workdir-param)
(:default-workdir (config/kubernetes)))))

(defn filter-env-vars
(let [{:keys [disallowed-var-names]} (config/kubernetes)]
(->> env-vars
(filter (fn [^V1EnvVar var]
(not (contains? disallowed-var-names (.getName var)))))
(into []))))
(defn- make-env
"Make a kubernetes environment variable"
[var-name var-value]
(doto (V1EnvVar.)
(.setName (str var-name))
(.setValue (str var-value))))

(defn make-filtered-env-vars
"Create a Kubernetes API compatible var list from an environment vars map,
with variables filtered based on disallowed-var-names in the Kubernetes config."
(let [{:keys [disallowed-var-names]} (config/kubernetes)
disallowed-var? #(contains? disallowed-var-names (key %))]
(->> env
(remove disallowed-var?)
(mapv #(apply make-env %)))))

(defn- add-as-decimals
"Takes two doubles and adds them as decimals to avoid floating point error. Kubernetes will not be able to launch a
@@ -508,20 +514,11 @@
:mem (add-as-decimals mem memory-request))

(defn- make-env
"Make a kubernetes environment variable"
[key value]
(.. (V1EnvVarBuilder.)
(withName key)
(withValue value)

(defn- add-node-selector
"Adds a node selector with the given key and val to the given pod spec"
[^V1PodSpec pod-spec key val]
(.setNodeSelector pod-spec (-> pod-spec
(or {})
(assoc key val))))

(defn ^V1Pod task-metadata->pod
@@ -538,16 +535,12 @@
{:strs [mem cpus]} scalar-requests
{:keys [docker volumes]} container
{:keys [image parameters]} docker
{:keys [job/progress-output-file job/progress-regex-string]} job
{:keys [environment]} command
pod (V1Pod.)
pod-spec (V1PodSpec.)
metadata (V1ObjectMeta.)
container (V1Container.)
env (map (fn [[k v]]
(let [env (V1EnvVar.)]
(.setName env (str k))
(.setValue env (str v))
(:environment command))
resources (V1ResourceRequirements.)
labels (assoc pod-labels cook-pod-label compute-cluster-name)
pool-name (some-> job :job/pool :pool/name)
@@ -566,9 +559,20 @@
sidecar-workdir "/mnt/sidecar"
sidecar-workdir-volume (when use-cook-sidecar? (make-empty-volume "cook-sidecar-workdir-volume"))
sidecar-workdir-volume-mount-fn (partial make-volume-mount sidecar-workdir-volume sidecar-workdir)
workdir-env-vars [(make-env "HOME" workdir)
(make-env "MESOS_SANDBOX" workdir)
(make-env "SIDECAR_WORKDIR" sidecar-workdir)]]
workdir-env {"HOME" workdir
"SIDECAR_WORKDIR" sidecar-workdir}
params-env (build-params-env parameters)
progress-env (task/build-executor-environment job)
main-env-base (merge environment params-env progress-env workdir-env)
progress-file-var (get main-env-base task/progress-meta-env-name task/default-progress-env-name)
progress-file-path (get main-env-base progress-file-var)
main-env (cond-> main-env-base
;; Add a default progress file path to the environment when missing,
;; preserving compatibility with Meosos + Cook Executor.
(not progress-file-path)
(assoc progress-file-var (str task-id ".progress")))
main-env-vars (make-filtered-env-vars main-env)]

; metadata
(.setName metadata (str task-id))
@@ -578,11 +582,7 @@
; container
(.setName container cook-container-name-for-job)
(.setCommand container (conj (or (when use-cook-init? custom-shell) ["/bin/sh" "-c"]) (:value command)))
(.setEnv container (-> []
(into env)
(into (param-env-vars parameters))
(into workdir-env-vars)
(.setEnv container main-env-vars)
(.setImage container image)

(.putRequestsItem resources "memory" (double->quantity (* memory-multiplier mem)))
@@ -628,9 +628,11 @@
(.setWorkingDir container sidecar-workdir)
(.setPorts container [(.containerPort (V1ContainerPort.) (int port))])

(.setEnv container [(make-env "COOK_WORKDIR" workdir)])
(.setEnv container (conj main-env-vars
(make-env "COOK_SCHEDULER_REST_URL" (config/scheduler-rest-url))
(make-env "COOK_WORKDIR" workdir)))

(.setPort http-get-action (IntOrString. port))
(.setPort http-get-action (-> port int IntOrString.))
(.setPath http-get-action "readiness-probe")
(.setHttpGet readiness-probe http-get-action)
(.setReadinessProbe container readiness-probe)
@@ -19,6 +19,7 @@
[ :as log]
[cook.compute-cluster :as cc]
[cook.config :as config]
[cook.task :as task]
[ :as util]
[mesomatic.types :as mtypes]
[plumbing.core :refer (map-vals)])
@@ -62,21 +63,6 @@
(or (= :executor/cook (:job/executor job-ent))
(cook-executor-candidate? job-ent))))

(defn build-executor-environment
"Build the environment for the cook executor."
(let [{:keys [default-progress-regex-string environment log-level max-message-length progress-sample-interval-ms]}
progress-output-file (:job/progress-output-file job-ent)]
(cond-> (assoc environment
"EXECUTOR_MAX_MESSAGE_LENGTH" max-message-length
"PROGRESS_REGEX_STRING" (:job/progress-regex-string job-ent default-progress-regex-string)
"PROGRESS_SAMPLE_INTERVAL_MS" progress-sample-interval-ms)
"EXECUTOR_PROGRESS_OUTPUT_FILE_NAME" progress-output-file))))

(defn merge-container-defaults
"Takes a job container specification and applies any defaults from the config.
Currently only supports volumes."
@@ -146,7 +132,7 @@
(:cpus resources) (assoc "COOK_JOB_CPUS" (-> resources :cpus str))
(:gpus resources) (assoc "COOK_JOB_GPUS" (-> resources :gpus str))
(:mem resources) (assoc "COOK_JOB_MEM_MB" (-> resources :mem str))
cook-executor? (merge (build-executor-environment job-ent)))
cook-executor? (merge (task/build-executor-environment job-ent)))
labels (util/job-ent->label job-ent)
command {:environment environment
:uris (cond-> (:uris resources [])
@@ -16,6 +16,7 @@
(ns cook.task
[cook.compute-cluster :as cc]
[cook.config :as config]
[datomic.api :as d]))

(defn task-entity-id->task-id
@@ -46,3 +47,35 @@
(some-> task-ent

(def progress-meta-env-name
"Meta environment variable name for declaring the environment variable
that stores the path of the progress output file."

(def progress-meta-env-value
"Default name of the the environment variable stores the path of the progress output file,
used as the default value for the meta-env-var described above."

(def default-progress-env-name
"Default name of the the environment variable stores the path of the progress output file,
only used when the meta-env-var is not set. Must correspond to the default file name in the executor."

(defn build-executor-environment
"Build the environment for the job's executor and/or progress monitor."
(let [{:keys [default-progress-regex-string environment log-level max-message-length
progress-sample-interval-ms] :as executor-config} (config/executor-config)
progress-output-file (:job/progress-output-file job-ent)]
(cond-> environment
(seq executor-config)
"EXECUTOR_MAX_MESSAGE_LENGTH" max-message-length
"PROGRESS_REGEX_STRING" (:job/progress-regex-string job-ent default-progress-regex-string)
"PROGRESS_SAMPLE_INTERVAL_MS" progress-sample-interval-ms)
(assoc progress-meta-env-name progress-meta-env-value
progress-meta-env-value progress-output-file))))
@@ -90,12 +90,14 @@
(is (not (nil? cook-workdir-volume)))
(is (not (nil? (.getEmptyDir cook-workdir-volume)))))

(let [^V1Container container (-> pod .getSpec .getContainers first)]
(let [^V1Container container (-> pod .getSpec .getContainers first)
container-env (.getEnv container)]
(is (= "required-cook-job-container" (.getName container)))
(is (= ["/bin/sh" "-c" "foo && bar"] (.getCommand container)))
(is (= "alpine:latest" (.getImage container)))
(is (not (nil? container)))
(is (= 4 (count (.getEnv container))))
(->> container-env (map #(.getName %)) sort)))
(is (= "/mnt/sandbox" (.getWorkingDir container)))
(let [cook-workdir-mount (->> container

0 comments on commit c9b7791

Please sign in to comment.
You can’t perform that action at this time.