Conditional results in pipeline? #31

Open
jasonbarbee opened this issue Sep 29, 2022 · 4 comments

@jasonbarbee

Is there a way for an if/unless step to modify the pipeline data?

I've found myself using pipelines similar to a with statement sometimes, and I would like some results to flow all the way to the end - even the conditional checks.
But I can't pass along the results of the conditional checks: they only return true/false and don't mutate the pipeline.

An example would be collecting the results of a pipeline that must complete successfully and never fully error out. I need to know that some steps were skipped, along with the task/log result explaining why. I've seen instrumentation, but they are asynchronous. I need to feed the direct response back and reply immediately with all the conditional events that were skipped in the pipeline.

Any thoughts on this?

@zorbash
Owner

zorbash commented Oct 3, 2022

> I've seen instrumentation, but they are asynchronous

Instrumentation is not asynchronous, see for example https://github.com/zorbash/opus/blob/master/lib/opus/pipeline.ex#L118

> Is there a way for an if/unless step to modify the pipeline data?

Do you mean something like:

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :add_one, with: &(&1 + 1)
  step :enhance, if: &(&1 > 42)

  def enhance(n), do: %{number: n, metadata: "the number was greater than 42"}
end

The library supports conditional stages, so you can modify the pipeline data.
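
To see both branches of the conditional stage, the snippet above can be turned into a small script. This is only a sketch: it assumes Opus can be fetched from Hex with Mix.install (the `>= 0.0.0` requirement is a placeholder, pin a real version in practice), the module name ConditionalDemo is made up for illustration, and it relies on a skipped stage passing its input through unchanged.

```elixir
# Assumption: Opus is available from Hex; the version requirement is a placeholder.
Mix.install([{:opus, ">= 0.0.0"}])

defmodule ConditionalDemo do
  use Opus.Pipeline

  step :add_one, with: &(&1 + 1)
  # Runs only when the condition holds; its return value becomes the new pipeline data.
  step :enhance, if: &(&1 > 42)

  def enhance(n), do: %{number: n, metadata: "the number was greater than 42"}
end

# 42 becomes 43, passes the condition, and should come back wrapped in a map.
IO.inspect(ConditionalDemo.call(42))
# 1 becomes 2, fails the condition, and should come back as a plain number.
IO.inspect(ConditionalDemo.call(1))
```

If those assumptions hold, the first call returns the enhanced map inside an `{:ok, _}` tuple and the second returns `{:ok, 2}`, because `enhance` never runs.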

@jasonbarbee
Author

jasonbarbee commented Oct 3, 2022

Ah. Actually you are right about instrumentation. I think that explains why, when my pipeline instrumentation tried to access data that didn't happen to exist, it raised an error and stopped the pipeline. It might be better if instrumentation were treated more like a tee:, or wrapped so that it cannot stop the pipeline when the code inside raises an error.
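
One way to get that behaviour without any change to the library is to wrap the body of an instrumentation callback yourself. The sketch below is plain Elixir and makes no claim about Opus internals: SafeInstrument is a hypothetical helper, and the shape of the `info` map in the usage line is an assumption made for illustration.

```elixir
defmodule SafeInstrument do
  @moduledoc "Hypothetical helper, not part of Opus: runs a callback and swallows any failure."
  require Logger

  # Calls `fun.(info)` and always returns :ok, logging failures instead of propagating them.
  def run(fun, info) when is_function(fun, 1) do
    fun.(info)
    :ok
  rescue
    error ->
      Logger.warning("instrumentation failed: " <> Exception.message(error))
      :ok
  catch
    kind, reason ->
      Logger.warning("instrumentation failed: " <> inspect({kind, reason}))
      :ok
  end
end

# The callback reads a key the data does not have, which raises a KeyError;
# the wrapper logs it and still returns :ok.
:ok = SafeInstrument.run(fn %{input: input} -> IO.puts(input.missing_key) end, %{input: %{number: 1}})
```

Calling `SafeInstrument.run/2` from inside an instrumentation callback keeps a bug in the logging code from halting the pipeline it is observing, at the cost of hiding that bug behind a log line.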

Back to metadata. Let's take the example from the repo's guide and modify it a little. I tend to use a Map in the pipeline for complex flows.


defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :add_one
  step :randomize, if: :lucky_number?

  # Entry point: takes the initial value and creates a map for the duration of the pipeline
  def add_one(value) do
    %{number: value + 1}
  end

  # Steps mutate the pipeline data
  def randomize(pipeline) do
    new_value = pipeline.number * :rand.uniform(10)
    put_in(pipeline, [:number], new_value)
  end

  # Your conditionals normally return a simple true/false, but I would like to be able to
  # return data into the pipeline data stream, so I'm sketching an example of how this might work.
  # If a conditional did not want to mutate the pipeline, it could just return a boolean for compatibility.
  def lucky_number?(pipeline) do
    if pipeline.number == 41 do
      meta = %{
        lucky: true,
        metadata: "Lucky number."
      }

      {true, Map.merge(pipeline, meta)}
    else
      meta = %{
        lucky: false,
        metadata: "Sorry - No luck here."
      }

      {false, Map.merge(pipeline, meta)}
    end
  end
end

ArithmeticPipeline.call(41)
# {:ok, %{number: 42, lucky: true, metadata: "Lucky number."}}
ArithmeticPipeline.call(4)
# {:ok, %{number: 32, lucky: false, metadata: "Sorry - No luck here."}}

I hope this example makes sense. It would be awesome to be able to capture data from the conditional checks.

@zorbash
Owner

zorbash commented Oct 3, 2022

Why do you want the condition function to transform the pipeline context?

In your example:

def lucky_number?(pipeline) do
  if pipeline.number == 41 do
    meta = %{
      lucky: true,
      metadata: "Lucky number."
    }

    {true, Map.merge(pipeline, meta)}
  else
    meta = %{
      lucky: false,
      metadata: "Sorry - No luck here."
    }

    {false, Map.merge(pipeline, meta)}
  end
end

ArithmeticPipeline.call(41)
# {:ok, %{number: 42, lucky: true, metadata: "Lucky number."}}
ArithmeticPipeline.call(4)
# {:ok, %{number: 32, lucky: false, metadata: "Sorry - No luck here."}}

You can achieve the same result with:

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :add_one
  step :classify
  step :add_metadata
  step :randomize, if: :lucky_number?

  # Entry point: takes the initial value and creates a map for the duration of the pipeline
  def add_one(value) do
    %{number: value + 1}
  end

  # Steps mutate the pipeline data
  def randomize(pipeline) do
    new_value = pipeline.number * :rand.uniform(10)
    put_in(pipeline, [:number], new_value)
  end

  # Record the classification in the context so later stages can match on it
  def classify(%{number: 41} = context), do: put_in(context, [:lucky?], true)
  def classify(%{number: _} = context), do: put_in(context, [:lucky?], false)

  # The condition stays a plain boolean check
  def lucky_number?(%{lucky?: true}), do: true
  def lucky_number?(_), do: false

  def add_metadata(%{lucky?: true} = context) do
    put_in(context, [:meta], %{
      lucky: true,
      metadata: "Lucky number."
    })
  end

  def add_metadata(%{lucky?: false} = context) do
    put_in(context, [:meta], %{
      lucky: false,
      metadata: "Sorry - No luck here"
    })
  end
end

In many cases you don't even need to make a stage conditional and you can pattern-match instead.
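
As a rough illustration of that last point, a stage function can carry its own condition in its function heads, with a catch-all clause that passes the data through. The module name PatternMatchedStage is hypothetical and the functions are plain Elixir; in a pipeline, `randomize` would be declared as an ordinary `step` with no `if:` option.

```elixir
defmodule PatternMatchedStage do
  # Hypothetical stage function: matches only the lucky number and transforms the data.
  def randomize(%{number: 41} = context) do
    %{context | number: context.number * :rand.uniform(10)}
  end

  # Any other input passes through untouched, which mirrors a skipped conditional stage.
  def randomize(context), do: context
end

IO.inspect(PatternMatchedStage.randomize(%{number: 5}))
```

The catch-all clause matters: without it, any input other than `%{number: 41}` would raise a FunctionClauseError instead of being passed along.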

@jasonbarbee
Author

jasonbarbee commented Oct 3, 2022

I would like the conditional logic features (today if/unless/skip) to be able to capture or mutate data within the context/pipeline. I might inject context-specific data about why something was skipped, or why it didn't pass a check. This is metadata, or warnings raised by checks along the way, in long synchronous pipelines that must complete. Having it in the payload would explain, at the end, what was skipped and why.

You could do the above. I've done it, and it works. I love pattern matching, but when the pipeline acts as the controller that orchestrates everything, I see the power of the Opus library in keeping the control logic at the top, which keeps long flows readable and manageable. I feel that pushing the logic into deeper pattern matching reduces the overall readability of the orchestration, because you have to explore multiple function clauses to follow it.

Setting aside plain conditionals like if/unless, can I track the outcome of a "skip:" step using an alternative like the code above (inline in the data, not through instrumentation)? That would also be valuable to me, but I don't see how to record that event in the pipeline data.
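
One possible shape for recording skips inline, sketched in plain Elixir: an unconditional stage decides for itself whether to do its work, and when it does not, it appends the stage name and a reason to the context. SkipLog, LuckySteps, and the `:skipped` key are all hypothetical names chosen for this sketch; none of them are Opus features.

```elixir
defmodule SkipLog do
  # Hypothetical helper: runs `fun` when `condition` holds, otherwise records
  # the stage name and a reason under :skipped in the context map.
  def run_or_record(context, name, condition, reason, fun) do
    if condition.(context) do
      fun.(context)
    else
      Map.update(context, :skipped, [{name, reason}], fn skipped ->
        skipped ++ [{name, reason}]
      end)
    end
  end
end

defmodule LuckySteps do
  # A plain function that could be declared as an unconditional `step` in a pipeline.
  def maybe_randomize(context) do
    SkipLog.run_or_record(
      context,
      :randomize,
      fn ctx -> ctx.number == 41 end,
      "not a lucky number",
      fn ctx -> %{ctx | number: ctx.number * :rand.uniform(10)} end
    )
  end
end

IO.inspect(LuckySteps.maybe_randomize(%{number: 5}))
```

Because the skip record travels in the pipeline data, the final `{:ok, context}` result carries the list of skipped stages and their reasons, at the cost of moving the condition out of the stage declaration and into the stage function.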

Maybe I just need to adopt the above and have a few more functions and steps like maybe_this, and use more pattern matching. It's just a question and an idea, and I'm grateful for the library and your helpful replies and clear examples of alternatives.
