This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
common_task_scheduler.clj
47 lines (38 loc) · 1.68 KB
/
common_task_scheduler.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
(ns onyx.scheduling.common-task-scheduler
(:require [clojure.core.async :refer [chan go >! <! close! >!!]]
[clojure.set :refer [union difference map-invert]]
[clojure.data :refer [diff]]
[com.stuartsierra.component :as component]
[onyx.log.commands.common :as common]
[onyx.extensions :as extensions]
[taoensso.timbre]))
(defn preallocated-grouped-task? [replica job task]
(and (= :kill (get-in replica [:flux-policies job task]))
(> (count (get-in replica [:allocations job task])) 0)))
(defmulti task-distribute-peer-count
(fn [replica job-id n]
(get-in replica [:task-schedulers job-id])))
(defmulti task-constraints
(fn [replica jobs task-capacities peer->vm task->node no-op-node job-id]
(get-in replica [:task-schedulers job-id])))
(defmulti assign-capacity-constraint?
(fn [replica job-id]
(get-in replica [:task-schedulers job-id])))
(defmulti choose-downstream-peers
(fn [replica job-id peer-config this-peer downstream-peers]
(get-in replica [:task-schedulers job-id])))
(defmulti choose-acker
(fn [replica job-id peer-config this-peer candidates]
(get-in replica [:task-schedulers job-id])))
(defmethod task-distribute-peer-count :default
[replica job n]
(throw (ex-info (format "Task scheduler %s not recognized" (get-in replica [:task-schedulers job]))
{:task-scheduler (get-in replica [:task-schedulers job])
:replica replica
:job job})))
(defmethod task-constraints :default
[replica jobs task-capacities peer->vm task->node no-op-node job-id]
[])
(defmethod assign-capacity-constraint? :default
[replica job-id]
true)