-
-
Notifications
You must be signed in to change notification settings - Fork 148
/
supervisor.ex
112 lines (99 loc) · 3.54 KB
/
supervisor.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
defmodule Quantum.Supervisor do
@moduledoc false
use Supervisor
# Starts the quantum supervisor.
@spec start_link(GenServer.server(), Keyword.t()) :: GenServer.on_start()
def start_link(quantum, opts) do
name = Keyword.take(opts, [:name])
Supervisor.start_link(__MODULE__, {quantum, opts}, name)
end
@impl Supervisor
def init({scheduler, opts}) do
%{
storage: storage,
scheduler: ^scheduler,
task_supervisor_name: task_supervisor_name,
storage_name: storage_name,
task_registry_name: task_registry_name,
clock_broadcaster_name: clock_broadcaster_name,
job_broadcaster_name: job_broadcaster_name,
execution_broadcaster_name: execution_broadcaster_name,
node_selector_broadcaster_name: node_selector_broadcaster_name,
executor_supervisor_name: executor_supervisor_name
} =
opts =
opts
|> scheduler.config
|> quantum_init(scheduler)
|> Map.new()
task_supervisor_opts = [name: task_supervisor_name]
storage_opts =
opts
|> Map.get(:storage_opts, [])
|> Keyword.put(:scheduler, scheduler)
|> Keyword.put(:name, storage_name)
task_registry_opts = %Quantum.TaskRegistry.StartOpts{name: task_registry_name}
clock_broadcaster_opts =
struct!(
Quantum.ClockBroadcaster.StartOpts,
opts
|> Map.take([:debug_logging, :storage, :scheduler])
|> Map.put(:name, clock_broadcaster_name)
|> Map.put(:start_time, NaiveDateTime.utc_now())
)
job_broadcaster_opts =
struct!(
Quantum.JobBroadcaster.StartOpts,
opts
|> Map.take([:jobs, :storage, :scheduler, :debug_logging])
|> Map.put(:name, job_broadcaster_name)
)
execution_broadcaster_opts =
struct!(
Quantum.ExecutionBroadcaster.StartOpts,
opts
|> Map.take([
:storage,
:scheduler,
:debug_logging
])
|> Map.put(:job_broadcaster_reference, job_broadcaster_name)
|> Map.put(:clock_broadcaster_reference, clock_broadcaster_name)
|> Map.put(:name, execution_broadcaster_name)
)
node_selector_broadcaster_opts = %Quantum.NodeSelectorBroadcaster.StartOpts{
execution_broadcaster_reference: execution_broadcaster_name,
task_supervisor_reference: task_supervisor_name,
name: node_selector_broadcaster_name
}
executor_supervisor_opts =
struct!(
Quantum.ExecutorSupervisor.StartOpts,
opts
|> Map.take([:debug_logging])
|> Map.put(:node_selector_broadcaster_reference, node_selector_broadcaster_name)
|> Map.put(:task_supervisor_reference, task_supervisor_name)
|> Map.put(:task_registry_reference, task_registry_name)
|> Map.put(:name, executor_supervisor_name)
|> Map.put(:scheduler, scheduler)
)
Supervisor.init(
[
{Task.Supervisor, task_supervisor_opts},
{storage, storage_opts},
{Quantum.ClockBroadcaster, clock_broadcaster_opts},
{Quantum.TaskRegistry, task_registry_opts},
{Quantum.JobBroadcaster, job_broadcaster_opts},
{Quantum.ExecutionBroadcaster, execution_broadcaster_opts},
{Quantum.NodeSelectorBroadcaster, node_selector_broadcaster_opts},
{Quantum.ExecutorSupervisor, executor_supervisor_opts}
],
strategy: :rest_for_one
)
end
# Run Optional Callback in Quantum Scheduler Implementation
@spec quantum_init(Keyword.t(), atom) :: Keyword.t()
defp quantum_init(config, scheduler) do
scheduler.init(config)
end
end