/
testing.ex
592 lines (428 loc) 路 16.1 KB
/
testing.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
defmodule Oban.Testing do
@moduledoc """
This module simplifies testing workers and making assertions about enqueued jobs when testing in
`:manual` mode.
Assertions may be made on any property of a job, but you'll typically want to check by `args`,
`queue` or `worker`. If you're using namespacing through PostgreSQL schemas, also called
"prefixes" in Ecto, you should use the `prefix` option when doing assertions about enqueued
jobs during testing. By default the `prefix` option is `public`.
## Using in Tests
The most convenient way to use `Oban.Testing` is to `use` the module:
use Oban.Testing, repo: MyApp.Repo
That will define the helper functions you'll use to make assertions on the jobs that should (or
should not) be inserted in the database while testing.
Along with the `repo` you can also specify an alternate prefix to use in all assertions:
use Oban.Testing, repo: MyApp.Repo, prefix: "business"
Some example assertions:
```elixir
# Assert that a job was already enqueued
assert_enqueued worker: MyWorker, args: %{id: 1}
# Assert that a job was enqueued or will be enqueued in the next 100ms
assert_enqueued [worker: MyWorker, args: %{id: 1}], 100
# Refute that a job was already enqueued
refute_enqueued queue: "special", args: %{id: 2}
# Refute that a job was already enqueued or would be enqueued in the next 100ms
refute_enqueued [queue: "special", args: %{id: 2}], 100
# Make assertions on a list of all jobs matching some options
assert [%{args: %{"id" => 1}}] = all_enqueued(worker: MyWorker)
# Assert that no jobs are enqueued in any queues
assert [] = all_enqueued()
```
Note that the final example, using `all_enqueued/1`, returns a raw list of matching jobs and
does not make an assertion by itself. This makes it possible to test using pattern matching at
the expense of being more verbose.
## Example
Given a simple module that enqueues a job:
```elixir
defmodule MyApp.Business do
def work(args) do
args
|> Oban.Job.new(worker: MyApp.Worker, queue: :special)
|> Oban.insert!()
end
end
```
The behaviour can be exercised in your test code:
defmodule MyApp.BusinessTest do
use ExUnit.Case, async: true
use Oban.Testing, repo: MyApp.Repo
alias MyApp.Business
test "jobs are enqueued with provided arguments" do
Business.work(%{id: 1, message: "Hello!"})
assert_enqueued worker: MyApp.Worker, args: %{id: 1, message: "Hello!"}
end
end
## Matching Scheduled Jobs and Timestamps
In order to assert a job has been scheduled at a certain time, you will need to match against
the `scheduled_at` attribute of the enqueued job.
in_an_hour = DateTime.add(DateTime.utc_now(), 3600, :second)
assert_enqueued worker: MyApp.Worker, scheduled_at: in_an_hour
By default, Oban will apply a 1 second delta to all timestamp fields of jobs, so that small
deviations between the actual value and the expected one are ignored. You may configure this
delta by passing a tuple of the value and a `delta` option (in seconds) to the corresponding keyword:
assert_enqueued worker: MyApp.Worker, scheduled_at: {in_an_hour, delta: 10}
## Adding to Case Templates
To include helpers in all of your tests you can add it to your case template:
```elixir
defmodule MyApp.DataCase do
use ExUnit.CaseTemplate
using do
quote do
use Oban.Testing, repo: MyApp.Repo
import Ecto
import Ecto.Changeset
import Ecto.Query
import MyApp.DataCase
alias MyApp.Repo
end
end
end
```
"""
@moduledoc since: "0.3.0"
import ExUnit.Assertions, only: [assert: 2, refute: 2]
import Ecto.Query, only: [limit: 2, order_by: 2, select: 2, where: 2, where: 3]
alias Ecto.Changeset
alias Oban.{Config, Job, Queue.Executor, Repo, Worker}
@wait_interval 10
@doc false
defmacro __using__(opts) do
  # Both values are read while the macro expands and are embedded into the generated helpers
  # through `unquote/1`. `:repo` is mandatory, `:prefix` falls back to "public".
  repo = Keyword.fetch!(opts, :repo)
  prefix = Keyword.get(opts, :prefix, "public")

  quote do
    alias Oban.Testing

    # Delegates to `Oban.Testing.perform_job/3`, filling in the repo and prefix given to
    # `use Oban.Testing` unless the caller supplied its own.
    def perform_job(worker, args, opts \\ []) do
      merged =
        Enum.reduce([repo: unquote(repo), prefix: unquote(prefix)], opts, fn {key, value}, acc ->
          Keyword.put_new(acc, key, value)
        end)

      Testing.perform_job(worker, args, merged)
    end

    # Delegates to `Oban.Testing.all_enqueued/2` with the default prefix applied.
    def all_enqueued(opts \\ []) do
      Testing.all_enqueued(unquote(repo), Keyword.put_new(opts, :prefix, unquote(prefix)))
    end

    # Without a timeout (`:none`) the two-arity assertion runs, otherwise the polling one.
    def assert_enqueued(opts, timeout \\ :none) do
      scoped = Keyword.put_new(opts, :prefix, unquote(prefix))

      case timeout do
        :none -> Testing.assert_enqueued(unquote(repo), scoped)
        _wait -> Testing.assert_enqueued(unquote(repo), scoped, timeout)
      end
    end

    # Without a timeout (`:none`) the two-arity refutation runs, otherwise the polling one.
    def refute_enqueued(opts, timeout \\ :none) do
      scoped = Keyword.put_new(opts, :prefix, unquote(prefix))

      case timeout do
        :none -> Testing.refute_enqueued(unquote(repo), scoped)
        _wait -> Testing.refute_enqueued(unquote(repo), scoped, timeout)
      end
    end
  end
end
@doc """
Construct a job and execute it with a worker module.

This reduces boilerplate when constructing jobs for unit tests and checks for common pitfalls.
For example, it automatically converts `args` to string keys before calling `perform/1`,
ensuring that perform clauses aren't erroneously trying to match atom keys.

The helper makes the following assertions:

  * That the worker implements the `Oban.Worker` behaviour
  * That the options provided build a valid job
  * That the return is valid, e.g. `:ok`, `{:ok, value}`, `{:error, value}` etc.

If all of the assertions pass then the function returns the result of `perform/1` for you to
make additional assertions on.

## Options

The `:log`, `:prefix` and `:repo` options are used to build the configuration the job is
executed with. All remaining options are passed to the worker's `new/2` to build the job, with
`:attempt` defaulting to `1`.

## Examples

Successfully execute a job with some string arguments:

    assert :ok = perform_job(MyWorker, %{"id" => 1})

Successfully execute a job and assert that it returns an error tuple:

    assert {:error, _} = perform_job(MyWorker, %{"bad" => "arg"})

Execute a job with the args keys automatically stringified:

    assert :ok = perform_job(MyWorker, %{id: 1})

Exercise custom attempt handling within a worker by passing options:

    assert :ok = perform_job(MyWorker, %{}, attempt: 42)

Cause a test failure because the provided worker isn't real:

    assert :ok = perform_job(Vorker, %{"id" => 1})
"""
@doc since: "2.0.0"
@spec perform_job(
        worker :: Worker.t(),
        args :: term(),
        opts :: [
          Job.option()
          | {:log, false | Logger.level()}
          | {:prefix, binary()}
          | {:repo, module()}
        ]
      ) :: Worker.result()
def perform_job(worker, args, opts) when is_atom(worker) do
  # Configuration options are separated from the options that describe the job itself.
  {conf_opts, opts} = Keyword.split(opts, [:log, :prefix, :repo])

  opts = Keyword.put_new(opts, :attempt, 1)

  assert_valid_worker(worker)

  # Round-tripping the args through JSON yields the string-keyed shape `perform/1` receives.
  changeset =
    args
    |> worker.new(opts)
    |> Changeset.update_change(:args, &json_encode_decode/1)

  assert_valid_changeset(changeset)

  result =
    conf_opts
    |> Keyword.put(:testing, :inline)
    |> Config.new()
    |> Executor.new(create_job(changeset), safe: false)
    |> Executor.call()
    |> Map.fetch!(:result)

  assert_valid_result(result)

  result
end
@doc """
Fetch every enqueued job that matches the given options.

A job is returned only when it matches all of the provided options. Results are ordered by
descending id, so the most recently enqueued job comes first. The `:prefix` option selects the
schema to query rather than acting as a filter.

## Examples

Assert based on only _some_ of a job's args:

    assert [%{args: %{"id" => 1}}] = all_enqueued(worker: MyWorker)

Assert that exactly one job was inserted for a queue:

    assert [%Oban.Job{}] = all_enqueued(queue: :alpha)

Assert that there aren't any jobs enqueued for any queues or workers:

    assert [] = all_enqueued()
"""
@doc since: "0.6.0"
@spec all_enqueued(repo :: module(), opts :: Keyword.t()) :: [Job.t()]
def all_enqueued(repo, opts) when is_list(opts) do
  {conf, filters} = extract_conf(repo, opts)
  matching = base_query(filters)

  Repo.all(conf, matching)
end
@doc """
Assert that a job with particular options has been enqueued.

Only values for the provided arguments will be checked. For example, an assertion made on
`worker: "MyWorker"` will match _any_ jobs for that worker, regardless of the queue or args.

On failure the message lists the currently enqueued jobs, restricted to the fields that were
used in the assertion.
"""
@doc since: "0.3.0"
@spec assert_enqueued(repo :: module(), opts :: Keyword.t()) :: true
def assert_enqueued(repo, [_ | _] = opts) do
  # Look the job up first and only build the failure message when nothing matched: rendering
  # it runs a second query that loads every enqueued job, which is wasted work whenever the
  # assertion passes.
  if get_job(repo, opts) do
    true
  else
    error_message = """
    Expected a job matching:
    #{inspect_opts(opts)}
    to be enqueued in the #{inspect(opts[:prefix])} schema. Instead found:
    #{inspect(available_jobs(repo, opts), pretty: true)}
    """

    assert false, error_message
  end
end
@doc """
Assert that a matching job already is, or becomes, enqueued before the timeout elapses.

The timeout is given in milliseconds and must be greater than zero. See `assert_enqueued/2`
for details on how options are matched.

## Examples

Assert that a job will be enqueued in the next 100ms:

    assert_enqueued [worker: MyWorker], 100
"""
@doc since: "1.2.0"
@spec assert_enqueued(repo :: module(), opts :: Keyword.t(), timeout :: pos_integer()) :: true
def assert_enqueued(repo, [_ | _] = opts, timeout) when timeout > 0 do
  criteria = inspect_opts(opts)
  schema = inspect(opts[:prefix])

  failure = """
  Expected a job matching:
  #{criteria}
  to be enqueued in the #{schema} schema within #{timeout}ms
  """

  repo
  |> wait_for_job(opts, timeout)
  |> assert(failure)
end
@doc """
Refute that any job matching the given options has been enqueued.

See `assert_enqueued/2` for details on how options are matched.
"""
@doc since: "0.3.0"
@spec refute_enqueued(repo :: module(), opts :: Keyword.t()) :: false
def refute_enqueued(repo, [_ | _] = opts) do
  criteria = inspect_opts(opts)
  schema = inspect(opts[:prefix])

  failure = """
  Expected no jobs matching:
  #{criteria}
  to be enqueued in the #{schema} schema
  """

  repo
  |> get_job(opts)
  |> refute(failure)
end
@doc """
Refute that a matching job is, or becomes, enqueued before the timeout elapses.

The timeout is given in milliseconds and the minimum refute timeout is 10ms.

See `assert_enqueued/2` for details on how options are matched.

## Examples

Refute that a job will be enqueued in the next 100ms:

    refute_enqueued [worker: MyWorker], 100
"""
@doc since: "1.2.0"
@spec refute_enqueued(repo :: module(), opts :: Keyword.t(), timeout :: pos_integer()) :: false
def refute_enqueued(repo, [_ | _] = opts, timeout) when timeout >= 10 do
  criteria = inspect_opts(opts)
  schema = inspect(opts[:prefix])

  failure = """
  Expected no jobs matching:
  #{criteria}
  to be enqueued in the #{schema} schema within #{timeout}ms
  """

  repo
  |> wait_for_job(opts, timeout)
  |> refute(failure)
end
@doc """
Change the testing mode within the context of a function.

Only `:manual` and `:inline` mode are supported, as `:disabled` implies that supervised queues
and plugins are running and this function won't start any processes.

The mode is stored in the calling process's dictionary for the duration of `fun`. When `fun`
returns or raises, whatever was stored before the call is restored, so calls may be nested.

## Examples

Switch to `:manual` mode when an Oban instance is configured for `:inline` testing:

    Oban.Testing.with_testing_mode(:manual, fn ->
      Oban.insert(MyWorker.new(%{id: 123}))

      assert_enqueued worker: MyWorker, args: %{id: 123}
    end)

Vice versa, switch to `:inline` mode:

    Oban.Testing.with_testing_mode(:inline, fn ->
      {:ok, %Job{state: "completed"}} = Oban.insert(MyWorker.new(%{id: 123}))
    end)
"""
@doc since: "2.12.0"
@spec with_testing_mode(:inline | :manual, (() -> any())) :: any()
def with_testing_mode(mode, fun) when mode in [:manual, :inline] and is_function(fun, 0) do
  engine =
    case mode do
      :manual -> Oban.Queue.BasicEngine
      :inline -> Oban.Queue.InlineEngine
    end

  # `Process.put/2` returns the value it replaced (or `nil`), which lets a nested call put the
  # enclosing mode back instead of wiping it out.
  previous = Process.put(:oban_engine, engine)

  try do
    fun.()
  after
    case previous do
      nil -> Process.delete(:oban_engine)
      _engine -> Process.put(:oban_engine, previous)
    end
  end
end
# Assert Helpers
# Renders the assertion options as a pretty-printed map for failure messages. The `:prefix`
# entry is left out because the messages report the schema separately.
defp inspect_opts(opts) do
  visible = opts |> Map.new() |> Map.delete(:prefix)

  inspect(visible, pretty: true)
end
# Perform Helpers
# Fails the test unless `worker` is a loadable module that declares the `Oban.Worker`
# behaviour.
defp assert_valid_worker(worker) do
  usable? = Code.ensure_loaded?(worker) and implements_worker?(worker)

  failure = """
  Expected worker to be a module that implements the Oban.Worker behaviour, got:
  #{inspect(worker)}
  """

  assert usable?, failure
end
# Checks whether any `:behaviour` attribute of the compiled module names `Oban.Worker`.
defp implements_worker?(worker) do
  Enum.any?(worker.__info__(:attributes), fn
    {:behaviour, declared} -> Oban.Worker in List.flatten([declared])
    _other -> false
  end)
end
# Fails the test when the job changeset is invalid, listing its validation errors.
defp assert_valid_changeset(changeset) do
  valid? = changeset.valid?
  details = traverse_errors(changeset)

  failure = """
  Expected args and opts to build a valid job, got validation errors:
  #{details}
  """

  assert valid?, failure
end
# Formats the changeset's validation errors as one "field: message, message" line per field,
# with `%{key}` placeholders in each message replaced by the matching error option.
defp traverse_errors(changeset) do
  # Error options can hold values without a `String.Chars` implementation (for example the
  # `{:array, :string}` type reported by a failed cast), so fall back to `inspect/1` for them.
  stringify = fn value ->
    if String.Chars.impl_for(value), do: to_string(value), else: inspect(value)
  end

  traverser = fn {message, opts} ->
    Enum.reduce(opts, message, fn {key, value}, acc ->
      # The function form converts a value only when its placeholder occurs in the message,
      # so options that are never interpolated can't make the formatting crash.
      String.replace(acc, "%{#{key}}", fn _placeholder -> stringify.(value) end)
    end)
  end

  changeset
  |> Changeset.traverse_errors(traverser)
  |> Enum.map_join("\n", fn {key, messages} ->
    "#{key}: #{messages |> List.wrap() |> Enum.join(", ")}"
  end)
end
# Round-trips the value through JSON with `Jason.encode!/1` followed by `Jason.decode!/1`.
defp json_encode_decode(map) do
  encoded = Jason.encode!(map)

  Jason.decode!(encoded)
end
# Fails the test unless `result` has one of the accepted return shapes. A snooze is accepted
# only when its duration is an integer.
defp assert_valid_result(result) do
  valid? =
    case result do
      :ok -> true
      {:ok, _value} -> true
      {:error, _value} -> true
      {:discard, _value} -> true
      {:snooze, snooze} when is_integer(snooze) -> true
      :discard -> true
      _ -> false
    end

  assert valid?, """
  Expected result to be one of
  - `:ok`
  - `:discard`
  - `{:ok, value}`
  - `{:error, reason}`
  - `{:discard, reason}`
  - `{:snooze, duration}`
  Instead received:
  #{inspect(result, pretty: true)}
  """
end
# Enqueued Helpers
# Looks up the first enqueued job matching `opts`, selecting only its id. The result is `nil`
# when nothing matches, which is what the callers branch on.
defp get_job(repo, opts) do
  {conf, filters} = extract_conf(repo, opts)

  lookup =
    filters
    |> base_query()
    |> limit(1)
    |> select([:id])

  Repo.one(conf, lookup)
end
# Polls for a matching job every `@wait_interval` milliseconds until one is found or the
# timeout budget is used up. Returns the job, or `nil` when none showed up.
defp wait_for_job(repo, opts, timeout) when timeout > 0 do
  case get_job(repo, opts) do
    nil ->
      Process.sleep(@wait_interval)

      wait_for_job(repo, opts, timeout - @wait_interval)

    job ->
      job
  end
end

# This clause is only reached after a sleep, so look one last time: otherwise a job inserted
# during the final interval would never be observed.
defp wait_for_job(repo, opts, _timeout), do: get_job(repo, opts)
# Loads every enqueued job, ignoring the filters, reduced to the fields named in `opts`. Used
# to show what was found when an assertion fails.
defp available_jobs(repo, opts) do
  {conf, filters} = extract_conf(repo, opts)
  columns = Keyword.keys(filters)

  snapshot =
    []
    |> base_query()
    |> select(^columns)

  for job <- Repo.all(conf, snapshot), do: Map.take(job, columns)
end
# Builds the query shared by all helpers: jobs in the "available" or "scheduled" state that
# satisfy every normalized filter, ordered by descending id.
defp base_query(opts) do
  conditions = normalize_fields(opts)
  pending = where(Job, [job], job.state in ["available", "scheduled"])

  pending
  |> apply_where_clauses(conditions)
  |> order_by(desc: :id)
end
# Splits `:prefix` off the options and builds a config from it together with the repo.
# Returns the config along with the remaining options.
defp extract_conf(repo, opts) do
  {prefix_opts, remaining} = Keyword.split(opts, [:prefix])
  conf = Config.new(Keyword.put(prefix_opts, :repo, repo))

  {conf, remaining}
end
# Reducer for `Enum.map_reduce/3`: unwraps a `{value, field_opts}` tuple into the plain value
# and collects the field options under the same key. Other values pass through unchanged.
defp extract_field_opts({name, payload}, collected) do
  case payload do
    {actual, modifiers} -> {{name, actual}, [{name, modifiers} | collected]}
    plain -> {{name, plain}, collected}
  end
end
# Turns the assertion options into `{field, value, field_opts}` triples. The values are run
# through `Job.new/2` and `Changeset.apply_changes/1`, and only the fields the caller named
# are kept.
defp normalize_fields(opts) do
  {fields, modifiers} = Enum.map_reduce(opts, [], &extract_field_opts/2)
  requested = Keyword.keys(fields)

  built =
    fields
    |> Keyword.get(:args, %{})
    |> Job.new(fields)
    |> Changeset.apply_changes()

  for {name, value} <- built |> Map.from_struct() |> Map.take(requested) do
    {name, value, Keyword.get(modifiers, name, [])}
  end
end
@timestamp_fields ~W(attempted_at completed_at inserted_at scheduled_at)a
@timestamp_default_delta_seconds 1
# Adds one `where` clause per `{field, value, field_opts}` triple, in list order. Timestamp
# fields match a window of +/- `:delta` seconds around the value (default
# `@timestamp_default_delta_seconds`); every other field is compared for equality.
defp apply_where_clauses(query, conditions) do
  Enum.reduce(conditions, query, fn
    {name, timestamp, modifiers}, scoped when name in @timestamp_fields ->
      tolerance = Keyword.get(modifiers, :delta, @timestamp_default_delta_seconds)
      earliest = DateTime.add(timestamp, -tolerance, :second)
      latest = DateTime.add(timestamp, tolerance, :second)

      where(scoped, [j], fragment("? BETWEEN ? AND ?", field(j, ^name), ^earliest, ^latest))

    {name, expected, _modifiers}, scoped ->
      where(scoped, ^[{name, expected}])
  end)
end
# Applies the changeset as an `:insert` action after defaulting `attempted_at` and
# `scheduled_at` to the current time. Raises when the changeset is invalid.
defp create_job(changeset) do
  stamped =
    Enum.reduce([:attempted_at, :scheduled_at], changeset, fn field, acc ->
      default_to_now(acc, field)
    end)

  Changeset.apply_action!(stamped, :insert)
end
# Keeps an explicit change for `field` when there is one and otherwise sets the field to the
# current UTC time.
defp default_to_now(changeset, field) do
  timestamp =
    case Changeset.get_change(changeset, field) do
      missing when missing in [nil, false] -> DateTime.utc_now()
      present -> present
    end

  Changeset.put_change(changeset, field, timestamp)
end
end