Skip to content

Commit

Permalink
Replace conditional fiber yield in step with pluggable pipe strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
maetl committed Jun 30, 2016
1 parent d6b1f2b commit 2c5226c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
15 changes: 9 additions & 6 deletions lib/mementus/pipeline/step.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

module Mementus
module Pipeline
class Pipe
# Basic passthrough.
def call(element)
Fiber.yield(element)
end
end

# Represents a step in a pipeline chain.
#
# New steps are constructed from a `source` enumerable (usually the previous
Expand All @@ -21,14 +28,10 @@ module Pipeline
# @param source [Enumerable]
# @param pipe [#call]
class Step
def initialize(source, pipe=nil)
def initialize(source, pipe=Pipe.new)
@context = Fiber.new do
source.each do |element|
if pipe
pipe.call(element)
else
Fiber.yield(element)
end
pipe.call(element)
end
end
end
Expand Down
20 changes: 17 additions & 3 deletions spec/pipeline/step_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,24 @@
expect(step.all).to eq([:a, :b, :c])
end

it 'processes output values based on the given pipe' do
pipe = -> (value) { Fiber.yield(value.to_s.upcase) }
step = Mementus::Pipeline::Step.new([:a, :b, :c], pipe)
it 'transforms output values based on the given pipe' do
transform = -> (value) { Fiber.yield(value.to_s.upcase) }
step = Mementus::Pipeline::Step.new([:a, :b, :c], transform)
expect(step.all).to eq(['A', 'B', 'C'])
end

it 'filters output values based on the given pipe' do
filter = -> (value) { Fiber.yield(value) if value == :a }
step = Mementus::Pipeline::Step.new([:a, :b, :c], filter)
expect(step.all).to eq([:a])
end

it 'transforms and filters output values based on the given pipes' do
filter = -> (value) { Fiber.yield(value) if value == :a }
transform = -> (value) { Fiber.yield(value.to_s.upcase) }
prev = Mementus::Pipeline::Step.new([:a, :b, :c], filter)
step = Mementus::Pipeline::Step.new(prev, transform)
expect(step.all).to eq(['A'])
end
end
end

0 comments on commit 2c5226c

Please sign in to comment.