Skip to content

Commit eb5ca4f

Browse files
committed
Refactor conditional stages and add typespecs
1 parent 949c525 commit eb5ca4f

File tree

12 files changed

+222
-95
lines changed

12 files changed

+222
-95
lines changed

.formatter.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
check: 2,
1111
link: 1,
1212
link: 2,
13-
skip: 1,
13+
skip: 2,
1414
instrument: 2,
1515
instrument: 3,
1616
send: 2
@@ -26,7 +26,7 @@
2626
check: 2,
2727
link: 1,
2828
link: 2,
29-
skip: 1,
29+
skip: 2,
3030
instrument: 2,
3131
instrument: 3,
3232
send: 2,

README.md

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ example above you need to add `use Opus.Pipeline` to turn a module into
6969
a pipeline. A pipeline module is a composition of stages executed in
7070
sequence.
7171

72-
7372
## Stages
7473

7574
There are a few different types of stages for different use-cases.
@@ -116,6 +115,50 @@ This stage is to link with another Opus.Pipeline module. It calls
116115
`call/1` for the provided module. If the module is not an
117116
`Opus.Pipeline` it is ignored.
118117

118+
#### Skip
119+
120+
The `skip` macro can be used for linked pipelines.
121+
A linked pipeline may act as a true bypass, based on a condition,
122+
expressed as either `:if` or `:unless`. When skipped, none of the stages
123+
are executed and it returns the input, to be used by any next stages of
124+
the caller pipeline. A very common use-case is illustrated in the following example:
125+
126+
127+
```elixir
128+
defmodule RetrieveCustomerInformation do
129+
use Opus.Pipeline
130+
131+
check :valid_query?
132+
link FetchFromCache, if: :cacheable?
133+
link FetchFromDatabase, if: :db_backed?
134+
step :serialize
135+
end
136+
```
137+
138+
With `skip` it can be written as:
139+
140+
```elixir
141+
defmodule RetrieveCustomerInformation do
142+
use Opus.Pipeline
143+
144+
check :valid_query?
145+
link FetchFromCache
146+
link FetchFromDatabase
147+
step :serialize
148+
end
149+
```
150+
151+
A linked pipeline becomes:
152+
153+
```elixir
154+
defmodule FetchFromCache do
155+
use Opus.Pipeline
156+
157+
skip :assert_suitable, if: :cacheable?
158+
step :retrive_from_cache
159+
end
160+
```
161+
119162
### Available options
120163

121164
The behaviour of each stage can be configured with any of the available
@@ -235,7 +278,7 @@ defmodule CustomInstrumentation do
235278
def instrument(:pipeline_started, %{pipeline: ArithmeticPipeline}, %{input: input}) do
236279
# publish the metrics to specific backend
237280
end
238-
281+
239282
def instrument(:before_stage, %{stage: %{pipeline: pipeline}}, %{input: input}) do
240283
# publish the metrics to specific backend
241284
end
@@ -254,6 +297,11 @@ end
254297

255298
You may choose to provide some common options to all the stages of a pipeline.
256299

300+
* `:raise`: A list of exceptions to not rescue. When set to `true`, Opus
301+
does not handle any exceptions. Defaults to `false` which converts all exceptions
302+
to `{:error, %Opus.PipelineError{}}` values halting the pipeline.
303+
* `:instrument?`: A boolean which defaults to `true`. Set to `false` to
304+
skip instrumentation for a module.
257305

258306
```elixir
259307
defmodule ArithmeticPipeline do

lib/opus/pipeline.ex

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ defmodule Opus.Pipeline do
1212
step :double, with: & &1 * 2
1313
end
1414
15-
The pipeline can be run calling a `call/1` function which is defined by using Opus.Pipeline.
15+
The pipeline can be run calling a `call/1` function which is defined by `Opus.Pipeline`.
1616
Pipelines are intended to have a single parameter and always return a tagged tuple `{:ok, value} | {:error, error}`.
1717
A stage returning `{:error, error}` halts the pipeline. The error value is an `Opus.PipelineError` struct which
18-
contains useful information to detect where was the error caused and why.
18+
contains useful information to detect where the error was caused and why.
1919
2020
## Exception Handling
2121
@@ -40,6 +40,8 @@ defmodule Opus.Pipeline do
4040
CreateUserPipeline.call(params, except: :send_notification)
4141
```
4242
"""
43+
@type opts :: [only: [atom], except: [atom]]
44+
@type result :: {:ok, any} | {:error, Opus.PipelineError.t()}
4345

4446
defmacro __using__(opts) do
4547
quote location: :keep do
@@ -63,6 +65,8 @@ defmodule Opus.Pipeline do
6365
@doc false
6466
def pipeline?, do: true
6567

68+
@doc "The entrypoint function of the pipeline"
69+
@spec call(any, Opus.Pipeline.opts()) :: Opus.Pipeline.result()
6670
def call(input, opts \\ %{}) do
6771
instrument? = Pipeline._opus_opts()[:instrument?]
6872

@@ -97,6 +101,7 @@ defmodule Opus.Pipeline do
97101
end
98102
end
99103

104+
@doc false
100105
def _opus_opts, do: @opus_opts
101106

102107
defp run_instrumented({type, name, opts} = stage, %{time: acc_time, input: input}) do
@@ -112,9 +117,15 @@ defmodule Opus.Pipeline do
112117
)
113118

114119
case instrumented_return do
120+
{status, %{time: time, input: :pipeline_skipped}} ->
121+
{status, %{time: acc_time + time, input: input}}
122+
115123
{status, %{time: time, input: new_input}} ->
116124
{status, %{time: acc_time + time, input: new_input}}
117125

126+
{status, :pipeline_skipped} ->
127+
{status, %{time: acc_time + 0, input: input}}
128+
118129
{status, new_input} ->
119130
{status, %{time: acc_time + 0, input: new_input}}
120131
end

lib/opus/pipeline/registration.ex

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,38 @@ defmodule Opus.Pipeline.Registration do
4545

4646
def maybe_define_callbacks(stage_id, name, opts) do
4747
[
48-
define_callback(:if, stage_id, name, Access.get(opts, :if)),
49-
define_callback(:unless, stage_id, name, Access.get(opts, :unless)),
48+
define_callback(:conditional, stage_id, name, ensure_valid_conditional!(opts)),
5049
define_callback(:with, stage_id, name, Access.get(opts, :with)),
5150
define_callback(:retry_backoff, stage_id, name, Access.get(opts, :retry_backoff))
5251
]
5352
end
5453

54+
def ensure_valid_conditional!(opts) do
55+
if_cond = Access.get(opts, :if)
56+
unless_cond = Access.get(opts, :unless)
57+
58+
if if_cond && unless_cond do
59+
raise CompileError,
60+
file: __ENV__.file,
61+
line: __ENV__.line,
62+
description:
63+
"Invalid stage conditional. For each stage you may define either an :if or an :unless option. Not both"
64+
end
65+
66+
if_cond || unless_cond
67+
end
68+
5569
def normalize_opts(opts, id, callbacks) do
5670
callback_types = for %{stage_id: ^id, type: type} <- callbacks, do: type
57-
Keyword.merge(opts, for(t <- callback_types, do: {t, :anonymous}))
71+
72+
for {k, v} <- opts, into: [] do
73+
anonymous? = k in callback_types || (k in [:if, :unless] && :conditional in callback_types)
74+
callback = if anonymous?, do: :anonymous, else: v
75+
76+
case k do
77+
term when term in [:if, :unless] -> {:conditional, {term, callback}}
78+
_ -> {k, callback}
79+
end
80+
end
5881
end
5982
end

lib/opus/pipeline/stage.ex

Lines changed: 60 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,69 +15,68 @@ defmodule Opus.Pipeline.Stage do
1515

1616
alias Opus.{Safe, PipelineError}
1717

18-
def maybe_run({module, type, name, %{if: :anonymous, stage_id: id} = opts}, input) do
19-
callback = (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :if end)).name
20-
maybe_run({module, type, name, %{opts | if: {module, callback, [input]}}}, input)
21-
end
22-
23-
def maybe_run({module, type, name, %{if: fun} = opts} = _stage, input)
24-
when is_atom(fun),
25-
do: maybe_run({module, type, name, %{opts | if: {module, fun, [input]}}}, input)
26-
27-
def maybe_run({module, :skip, name, %{if: {_m, _f, _a} = condition}}, input) do
28-
case Safe.apply(condition) do
29-
true ->
30-
module.instrument(:pipeline_skipped, %{stage: %{pipeline: module, name: name}}, %{
31-
stage: name,
32-
input: input
33-
})
34-
35-
# Stop the pipeline execution
36-
:pipeline_skipped
18+
@doc false
19+
def maybe_run(
20+
{module, type, name, %{conditional: {cond_type, :anonymous}, stage_id: id} = opts},
21+
input
22+
) do
23+
callback =
24+
(module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :conditional end)).name
3725

38-
_ ->
39-
nil
40-
end
26+
maybe_run(
27+
{module, type, name, %{opts | conditional: {cond_type, {module, callback, [input]}}}},
28+
input
29+
)
4130
end
4231

43-
def maybe_run({module, _type, name, %{if: {_m, _f, _a} = condition} = opts} = stage, input) do
44-
case Safe.apply(condition) do
45-
true ->
46-
with_retries({module, opts}, fn -> do_run(stage, input) end)
47-
48-
_ ->
49-
module.instrument(:stage_skipped, %{stage: %{pipeline: module, name: name}}, %{
50-
stage: name,
51-
input: input
52-
})
32+
def maybe_run(
33+
{module, :skip, name, %{conditional: {cond_type, {_m, _f, _a} = condition}}},
34+
input
35+
) do
36+
if eval_condition(cond_type, condition) do
37+
module.instrument(:pipeline_skipped, %{stage: %{pipeline: module, name: name}}, %{
38+
stage: name,
39+
input: input
40+
})
5341

54-
# Ignore this stage
55-
:stage_skipped
42+
# Stop the pipeline execution
43+
:pipeline_skipped
44+
else
45+
nil
5646
end
5747
end
5848

59-
def maybe_run({module, type, name, %{unless: :anonymous, stage_id: id} = opts}, input) do
60-
callback = (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :unless end)).name
61-
maybe_run({module, type, name, %{opts | unless: {module, callback, [input]}}}, input)
49+
def maybe_run(
50+
{
51+
module,
52+
type,
53+
name,
54+
%{conditional: {cond_type, fun}} = opts
55+
} = _stage,
56+
input
57+
)
58+
when is_atom(fun) do
59+
maybe_run(
60+
{module, type, name, %{opts | conditional: {cond_type, {module, fun, [input]}}}},
61+
input
62+
)
6263
end
6364

64-
def maybe_run({module, type, name, %{unless: fun} = opts} = _stage, input)
65-
when is_atom(fun),
66-
do: maybe_run({module, type, name, %{opts | unless: {module, fun, [input]}}}, input)
67-
68-
def maybe_run({module, _type, name, %{unless: {_m, _f, _a} = condition} = opts} = stage, input) do
69-
case Safe.apply(condition) do
70-
false ->
71-
with_retries({module, opts}, fn -> do_run(stage, input) end)
72-
73-
_ ->
74-
module.instrument(:stage_skipped, %{stage: %{pipeline: module, name: name}}, %{
75-
stage: name,
76-
input: input
77-
})
65+
def maybe_run(
66+
{module, _type, name, %{conditional: {cond_type, {_m, _f, _a} = condition}} = opts} =
67+
stage,
68+
input
69+
) do
70+
if eval_condition(cond_type, condition) do
71+
with_retries({module, opts}, fn -> do_run(stage, input) end)
72+
else
73+
module.instrument(:stage_skipped, %{stage: %{pipeline: module, name: name}}, %{
74+
stage: name,
75+
input: input
76+
})
7877

79-
# Ignore this stage
80-
:stage_skipped
78+
# Ignore this stage
79+
:stage_skipped
8180
end
8281
end
8382

@@ -201,4 +200,12 @@ defmodule Opus.Pipeline.Stage do
201200

202201
defp do_run({module, type, name, %{} = opts}, input),
203202
do: do_run({module, type, name, Map.merge(opts, %{with: {module, name, [input]}})}, input)
203+
204+
defp eval_condition(cond_type, condition) do
205+
case {cond_type, Safe.apply(condition)} do
206+
{:if, true} -> true
207+
{:unless, false} -> true
208+
_ -> false
209+
end
210+
end
204211
end

lib/opus/pipeline/stage/skip.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ defmodule Opus.Pipeline.Stage.Skip do
1111
defmodule CreateUserPipeline do
1212
use Opus.Pipeline
1313
14-
skip if: :user_exists?
14+
skip :prevent_duplicates, if: :user_exists?
1515
step :persist_user
1616
end
1717
```
@@ -24,9 +24,9 @@ defmodule Opus.Pipeline.Stage.Skip do
2424

2525
@behaviour Stage
2626

27-
def run({module, type, [if: func], opts}, input) do
28-
case Stage.maybe_run({module, type, nil, opts |> put_in([:if], func)}, input) do
29-
:pipeline_skipped -> {:halt, :skipped}
27+
def run(stage, input) do
28+
case stage |> Stage.maybe_run(input) do
29+
:pipeline_skipped -> {:halt, :pipeline_skipped}
3030
_ -> {:cont, input}
3131
end
3232
end

lib/opus/pipeline_error.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@ defmodule Opus.PipelineError do
33
Error struct capturing useful information to detect where an error was caused and why.
44
"""
55

6+
@type t :: %__MODULE__{
7+
error: struct,
8+
input: any,
9+
pipeline: module,
10+
stage: atom,
11+
stacktrace: [{module, atom, non_neg_integer, keyword}]
12+
}
13+
614
defexception [:error, :pipeline, :stage, :input, :stacktrace]
715

816
def message(%{error: error, pipeline: pipeline, stage: stage, input: input, stacktrace: trace}) do

mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ defmodule Opus.Mixfile do
4141
{:retry, "~> 0.8"},
4242
{:credo, "~> 0.8.10", only: [:dev, :test], runtime: false},
4343
{:ex_doc, "~> 0.18.0", only: :dev, runtime: false},
44+
{:dialyxir, "~> 1.0.0-rc.3", only: [:dev, :test], runtime: false},
4445
{:excoveralls, "~> 0.8", only: :test}
4546
]
4647
end

mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
33
"certifi": {:hex, :certifi, "2.5.1", "867ce347f7c7d78563450a18a6a28a8090331e77fa02380b4a21962a65d36ee5", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
44
"credo": {:hex, :credo, "0.8.10", "261862bb7363247762e1063713bb85df2bbd84af8d8610d1272cd9c1943bba63", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}], "hexpm"},
5+
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.6", "78e97d9c0ff1b5521dd68041193891aebebce52fc3b93463c0a6806874557d7d", [:mix], [{:erlex, "~> 0.2.1", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm"},
56
"earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm"},
7+
"erlex": {:hex, :erlex, "0.2.1", "cee02918660807cbba9a7229cae9b42d1c6143b768c781fa6cee1eaf03ad860b", [:mix], [], "hexpm"},
68
"ex_doc": {:hex, :ex_doc, "0.18.4", "4406b8891cecf1352f49975c6d554e62e4341ceb41b9338949077b0d4a97b949", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
79
"excoveralls": {:hex, :excoveralls, "0.10.5", "7c912c4ec0715a6013647d835c87cde8154855b9b84e256bc7a63858d5f284e3", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
810
"hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},

test/opus/pipeline/stage/option/if_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Opus.Pipeline.Option.IfTest do
22
use ExUnit.Case
33

4-
describe "as atom - when the condition is truth" do
4+
describe "as atom - when the condition is true" do
55
defmodule PipelineWithTruthyAtomIf do
66
use Opus.Pipeline
77

0 commit comments

Comments
 (0)