/
cron.ex
237 lines (180 loc) · 6.39 KB
/
cron.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
defmodule Oban.Plugins.Cron do
@moduledoc """
Periodically enqueue jobs through CRON based scheduling.
## Using the Plugin

Schedule various jobs using `{expr, worker}` and `{expr, worker, opts}` syntaxes:

    config :my_app, Oban,
      plugins: [
        {Oban.Plugins.Cron,
         crontab: [
           {"* * * * *", MyApp.MinuteWorker},
           {"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
           {"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
           {"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]},
           {"@daily", MyApp.AnotherDailyWorker}
         ]}
      ]
## Options

* `:crontab` — a list of cron expressions that enqueue jobs on a periodic basis. See [Periodic
  Jobs][perjob] in the Oban module docs for syntax and details.

* `:timezone` — which timezone to use when scheduling cron jobs. To use a timezone other than
  the default of "Etc/UTC" you *must* have a timezone database like [tzdata][tzdata] installed
  and configured.

[tzdata]: https://hexdocs.pm/tzdata
[perjob]: Oban.html#module-periodic-jobs
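## Instrumenting with Telemetry

The `Oban.Plugins.Cron` plugin adds the following metadata to the `[:oban, :plugin, :stop]` event:

* `:jobs` — a list of jobs that were inserted into the database
* `:error` — the unexpected result, set in place of `:jobs` when locking or inserting jobs
  does not succeed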
"""
use GenServer
alias Oban.Cron.Expression
alias Oban.{Config, Job, Query, Worker}
@typedoc "A single job option that may be attached to a crontab entry."
@type cron_opt ::
        {:args, Job.args()}
        | {:max_attempts, pos_integer()}
        | {:paused, boolean()}
        | {:priority, 0..3}
        | {:queue, atom() | binary()}
        | {:tags, Job.tags()}

@typedoc """
A crontab entry: a cron expression and a worker module, optionally followed by job options.

Both the `{expression, worker}` and `{expression, worker, opts}` forms are accepted by the
plugin, so both are part of the type.
"""
@type cron_input :: {binary(), module()} | {binary(), module(), [cron_opt()]}

@typedoc "An option accepted by `start_link/1`."
@type option ::
        {:conf, Config.t()}
        | {:name, GenServer.name()}
        | {:crontab, [cron_input()]}
        | {:timezone, Calendar.time_zone()}
defmodule State do
  @moduledoc false

  # Runtime state of the cron plugin process.
  #
  #   * conf     - configuration handed to the query and telemetry calls
  #   * name     - name the process is started under, if any
  #   * timer    - reference of the pending `:evaluate` timer
  #   * crontab  - crontab entries, parsed during init
  #   * lock_key - key passed to `Query.with_xact_lock/3` around job insertion
  #   * timezone - timezone used to compute the current time when evaluating expressions
  defstruct conf: nil,
            name: nil,
            timer: nil,
            crontab: [],
            lock_key: 1_149_979_440_242_868_001,
            timezone: "Etc/UTC"
end
@doc """
Starts the cron plugin process after validating the given options.

Raises an `ArgumentError` when the `:crontab` or `:timezone` options are invalid. The `:name`
option, when present, is forwarded to `GenServer.start_link/3`.
"""
@spec start_link([option()]) :: GenServer.on_start()
def start_link(opts) do
  # Fail fast in the caller, before any process is spawned.
  validate!(opts)

  name = Keyword.get(opts, :name)

  GenServer.start_link(__MODULE__, opts, name: name)
end
@doc false
@spec validate!(Keyword.t()) :: :ok
def validate!(opts) when is_list(opts) do
  # Every option is checked on its own; the first invalid one raises.
  Enum.each(opts, fn opt -> validate_opt!(opt) end)
end
@doc false
@spec interval_to_next_minute(Time.t()) :: pos_integer()
def interval_to_next_minute(time \\ Time.utc_now()) do
  # Jump one minute ahead, then snap back to the start of that minute.
  next_minute = %{Time.add(time, 60) | second: 0}

  # The difference is negative when the next minute wraps past midnight, so it is normalized
  # into a single day's worth of seconds before being converted to milliseconds.
  seconds = Integer.mod(Time.diff(next_minute, time), 86_400)

  :timer.seconds(seconds)
end
# Builds the initial state from the start options and parses every crontab expression, then
# hands off to `handle_continue/2` for the first evaluation.
@impl GenServer
def init(opts) do
  # NOTE(review): exits are presumably trapped so that terminate/2 runs on shutdown — confirm.
  Process.flag(:trap_exit, true)

  state = parse_crontab(struct!(State, opts))

  {:ok, state, {:continue, :start}}
end
# Runs the first evaluation right after init by reusing the `:evaluate` message handler.
@impl GenServer
def handle_continue(:start, %State{} = state), do: handle_info(:evaluate, state)
# Cancels the pending evaluation timer, when one has been scheduled, before shutting down.
@impl GenServer
def terminate(_reason, %State{timer: timer}) when is_reference(timer) do
  Process.cancel_timer(timer)

  :ok
end

def terminate(_reason, %State{}), do: :ok
# Handles the periodic `:evaluate` message: re-arms the timer first, then tries to insert jobs
# inside a `[:oban, :plugin]` telemetry span. The stop metadata carries either the inserted
# `:jobs` or the unexpected result under `:error`.
@impl GenServer
def handle_info(:evaluate, %State{} = state) do
  state = schedule_evaluate(state)

  base_meta = %{conf: state.conf, plugin: __MODULE__}

  :telemetry.span([:oban, :plugin], base_meta, fn ->
    state
    |> lock_and_insert_jobs()
    |> case do
      # A `false` result is reported as "nothing inserted".
      {:ok, false} ->
        {:ok, Map.put(base_meta, :jobs, [])}

      {:ok, jobs} when is_list(jobs) ->
        {:ok, Map.put(base_meta, :jobs, jobs)}

      unexpected ->
        {:error, Map.put(base_meta, :error, unexpected)}
    end
  end)

  {:noreply, state}
end
# Scheduling Helpers
# Arms a timer that delivers `:evaluate` to this process at the start of the next minute and
# stores its reference in the state.
defp schedule_evaluate(state) do
  delay = interval_to_next_minute()

  %{state | timer: Process.send_after(self(), :evaluate, delay)}
end
# Parsing & Validation Helpers
# Replaces every raw crontab entry with a `{parsed_expression, worker, opts}` triple; entries
# given without options get an empty option list.
defp parse_crontab(%State{crontab: entries} = state) do
  parse_entry = fn
    {expression, worker, opts} -> {Expression.parse!(expression), worker, opts}
    {expression, worker} -> {Expression.parse!(expression), worker, []}
  end

  %{state | crontab: Enum.map(entries, parse_entry)}
end
# Validates a single start option, raising `ArgumentError` for a bad `:crontab` or `:timezone`.
# Any other option is accepted as-is.
defp validate_opt!({:crontab, crontab}) when is_list(crontab) do
  Enum.each(crontab, &validate_crontab!/1)
end

defp validate_opt!({:crontab, crontab}) do
  raise ArgumentError, "expected :crontab to be a list, got: #{inspect(crontab)}"
end

defp validate_opt!({:timezone, timezone}) do
  # A timezone counts as known when the current time can be resolved for it.
  known? = is_binary(timezone) and match?({:ok, _}, DateTime.now(timezone))

  if not known? do
    raise ArgumentError, "expected :timezone to be a known timezone"
  end
end

defp validate_opt!(_opt), do: :ok
# Validates one crontab entry. The expression must parse, the worker module must be loadable
# and export `perform/1`, and the options must be a keyword list; otherwise an `ArgumentError`
# is raised.
defp validate_crontab!({expression, worker, opts}) do
  %Expression{} = Expression.parse!(expression)

  cond do
    not Code.ensure_loaded?(worker) ->
      raise ArgumentError, "#{inspect(worker)} not found or can't be loaded"

    not function_exported?(worker, :perform, 1) ->
      raise ArgumentError, "#{inspect(worker)} does not implement `perform/1` callback"

    not Keyword.keyword?(opts) ->
      raise ArgumentError,
            "#{inspect(worker)} options must be a keyword list, got: #{inspect(opts)}"

    true ->
      nil
  end
end

# Entries without options are validated as if they had an empty option list.
defp validate_crontab!({expression, worker}),
  do: validate_crontab!({expression, worker, []})

defp validate_crontab!(invalid) do
  raise ArgumentError,
        "expected crontab entry to be an {expression, worker} or " <>
          "{expression, worker, options} tuple, got: #{inspect(invalid)}"
end
# Inserting Helpers
# Inserts the jobs that are currently due from within `Query.with_xact_lock/3`, keyed by the
# state's lock key.
defp lock_and_insert_jobs(state) do
  %{conf: conf, lock_key: lock_key, crontab: crontab, timezone: timezone} = state

  Query.with_xact_lock(conf, lock_key, fn -> insert_jobs(conf, crontab, timezone) end)
end
# Builds and inserts a job for every crontab entry whose expression matches the current time
# in the given timezone. Returns the list of resulting jobs.
defp insert_jobs(conf, crontab, timezone) do
  {:ok, now} = DateTime.now(timezone)

  for {expression, worker, entry_opts} <- crontab, Expression.now?(expression, now) do
    # `:args` is passed to the worker separately; everything else is treated as job options.
    {args, job_opts} = Keyword.pop(entry_opts, :args, %{})

    merged_opts = unique_opts(worker.__opts__(), job_opts)
    new_job = worker.new(args, merged_opts)

    {:ok, inserted} = Query.fetch_or_insert_job(conf, new_job)

    inserted
  end
end
# Every cron job is made unique for 59 seconds so that a node or scheduler crash can't cause
# the same job to be enqueued twice. Cron jobs have a minimum resolution of one minute, which
# leaves a potential one second window in which a double enqueue can still happen.
#
# Worker options are merged over that default, then the crontab options over the result;
# conflicting keys are settled by `Worker.resolve_opts/3`.
defp unique_opts(worker_opts, crontab_opts) do
  defaults = [unique: [period: 59]]

  Enum.reduce([worker_opts, crontab_opts], defaults, fn overrides, merged ->
    Keyword.merge(merged, overrides, &Worker.resolve_opts/3)
  end)
end
end