Permalink
Browse files

added merger

  • Loading branch information...
1 parent 433f795 commit 57447505fe597353684ec776b2cc5167850c709c @ohler55 committed Feb 9, 2014
View
@@ -29,7 +29,9 @@ Follow [@peterohler on Twitter](http://twitter.com/#!/peterohler) for announceme
- Added support for dynamic Timer options updates.
- - Added balancer load balancing of processing across multiple tasks.
+ - Added Balancer Actor for load balancing of processing across multiple tasks.
+
+ - Added Merger Actor that merges two or more processing paths.
### Release 0.3
View
@@ -8,5 +8,7 @@ module Actors
require 'oflow/actors/relay'
require 'oflow/actors/log'
require 'oflow/actors/errorhandler'
-require 'oflow/actors/timer'
+
require 'oflow/actors/balancer'
+require 'oflow/actors/merger'
+require 'oflow/actors/timer'
View
@@ -0,0 +1,85 @@
+
+module OFlow
+ module Actors
+
+ class Merger < Actor
+
+ def initialize(task, options)
+ super
+ @match_key = options.fetch(:match, :tracker)
+ @cnt = options.fetch(:count, 2)
+ # Hash of Arrays
+ @waiting = {}
+ end
+
+ def perform(op, box)
+ matches = match(box)
+ if matches.nil?
+ waiting_add(box)
+ else
+ matches.each { |b| waiting_remove(b) }
+ matches << box
+ box = merge(matches)
+ task.ship(nil, box)
+ end
+ end
+
+ def box_key(box)
+ key = nil
+ if :tracker == @match_key
+ key = box.tracker.id unless box.tracker.nil?
+ elsif !@match_key.nil?
+ key = box.get(@match_key)
+ end
+ key
+ end
+
+ def waiting_add(box)
+ key = box_key(box)
+ boxes = @waiting[key]
+ if boxes.nil?
+ @waiting[key] = [box]
+ else
+ boxes << box
+ end
+ end
+
+ def waiting_remove(box)
+ key = box_key(box)
+ boxes = @waiting[key]
+ # only remove the box, not a similar one or one that is ==
+ boxes.delete_if { |b| box.equal?(b) }
+ end
+
+ # Should look at all the waiting boxes and determine which of those if any
+ # are a match for the target. If all necessary matches are found then an
+ # array of the boxes are returned, otherwise nil is returned.
+ def match(target)
+ key = box_key(target)
+ boxes = @waiting[key]
+ return nil if boxes.nil? || (boxes.size + 1) < @cnt
+ Array.new(boxes)
+ end
+
+ # Should merge the boxes and return a single box. The default is to take
+ # all the box contents and place them in an Array and then merge the
+ # Trackers if there are any.
+ def merge(boxes)
+ content = []
+ tracker = nil
+ boxes.each do |b|
+ content << b.contents
+ unless b.tracker.nil?
+ if tracker.nil?
+ tracker = b.tracker
+ else
+ tracker = tracker.merge(b.tracker)
+ end
+ end
+ end
+ Box.new(content, tracker)
+ end
+
+ end # Merger
+ end # Actors
+end # OFlow
@@ -30,6 +30,9 @@ class Timer < Actor
def initialize(task, options={})
@count = 0
@pending = nil
+ @stop = nil
+ @period = nil
+ @repeat = nil
set_options(options)
@pending = @start
super
View
@@ -1,5 +1,5 @@
module OFlow
# Current version of the module.
- VERSION = '0.4.0a1'
+ VERSION = '0.4.0'
end
View
5 notes
@@ -6,7 +6,6 @@
- todo
- features
- - collector waits for n number of boxes to be recieved before continuing
- 0.4.0
- Persister Actor (write to disk and ready on start)
- 0.5.0
@@ -23,10 +22,6 @@
- 1.0.0
- TBDs
- - timer
- - dynamic updates
- - more tests
-
- example
- rubygems graphs
@@ -36,7 +36,6 @@ def perform(op, box)
class BalancerTest < ::Test::Unit::TestCase
def test_balancer_fair
- period = 0.1
balancer = nil
collector = nil
::OFlow::Env.flow('fair') { |f|
@@ -71,10 +70,9 @@ def test_balancer_fair
end
def test_balancer_less_busy
- period = 0.1
balancer = nil
collector = nil
- ::OFlow::Env.flow('fair') { |f|
+ ::OFlow::Env.flow('less-busy') { |f|
f.task('balance', ::OFlow::Actors::Balancer) { |t|
balancer = t
t.link(:one, :one, nil)
View
@@ -0,0 +1,135 @@
+#!/usr/bin/env ruby
+# encoding: UTF-8
+
+[ File.dirname(__FILE__),
+ File.join(File.dirname(__FILE__), "../../lib"),
+ File.join(File.dirname(__FILE__), "..")
+].each { |path| $: << path unless $:.include?(path) }
+
+require 'test/unit'
+require 'oflow'
+require 'oflow/test'
+
+require 'collector'
+
+class Splitter < ::OFlow::Actor
+
+ def initialize(task, options)
+ super
+ end
+
+ def perform(op, box)
+ task.ship(:left, box)
+ task.ship(:right, box)
+ end
+
+end # Splitter
+
+class Multiplier < ::OFlow::Actor
+
+ def initialize(task, options)
+ super
+ @factor = options.fetch(:factor, 1)
+ end
+
+ def perform(op, box)
+ box = box.set(nil, box.contents * @factor)
+ task.ship(nil, box)
+ end
+
+end # Multiplier
+
+class MergerTest < ::Test::Unit::TestCase
+
+ def test_merger_any
+ start = nil
+ collector = nil
+ ::OFlow::Env.flow('merge') { |f|
+ f.task(:split, Splitter) { |t|
+ start = t
+ t.link(:left, :one, nil)
+ t.link(:right, :two, nil)
+ }
+ f.task(:one, Multiplier, factor: 2) { |t|
+ t.link(nil, :merge, nil)
+ }
+ f.task(:two, Multiplier, factor: 3) { |t|
+ t.link(nil, :merge, nil)
+ }
+ f.task(:merge, ::OFlow::Actors::Merger) { |t|
+ t.link(nil, :collector, nil)
+ }
+ f.task(:collector, Collector) { |t|
+ collector = t.actor
+ }
+ }
+ start.receive(nil, ::OFlow::Box.new(1))
+ ::OFlow::Env.flush()
+
+ result = collector.collection[0]
+ assert_equal(2, result.size, 'should be 2 values in the box')
+ assert(result.include?(2), 'box should include 2')
+ assert(result.include?(3), 'box should include 3')
+
+ ::OFlow::Env.clear()
+ end
+
+ def test_merger_tracker
+ start = nil
+ collector = nil
+ ::OFlow::Env.flow('merge') { |f|
+ f.task(:split, Splitter) { |t|
+ start = t
+ t.link(:left, :one, nil)
+ t.link(:right, :two, nil)
+ }
+ f.task(:one, Multiplier, factor: 2) { |t|
+ t.link(nil, :merge, nil)
+ }
+ f.task(:two, Multiplier, factor: 3) { |t|
+ t.link(nil, :merge, nil)
+ }
+ f.task(:merge, ::OFlow::Actors::Merger) { |t|
+ t.link(nil, :collector, nil)
+ }
+ f.task(:collector, Collector, contents_only: false) { |t|
+ collector = t.actor
+ }
+ }
+ tracker = ::OFlow::Tracker.create('start')
+ start.receive(nil, ::OFlow::Box.new(1, tracker))
+ tracker2 = ::OFlow::Tracker.create('start2')
+ start.receive(nil, ::OFlow::Box.new(10, tracker2))
+ ::OFlow::Env.flush()
+
+ box = collector.collection[0]
+ result = box.contents
+ assert_equal(2, result.size, 'should be 2 values in the box')
+ assert(result.include?(2), 'box should include 2')
+ assert(result.include?(3), 'box should include 3')
+
+ t = box.tracker()
+ assert_not_nil(t, 'should have a tracker')
+ assert_equal(t.id, tracker.id, 'tracker id should be carried through')
+ track = t.track
+
+ assert_equal('start', track[0].location)
+ assert_equal(':merge:split', track[1].location)
+ split = track[2].map { |a| a.map { |stamp| stamp.location } }
+
+ assert_equal(2, split.size, 'should be 2 values in the split')
+ assert(split.include?([':merge:one']), 'split should include [merge:one]')
+ assert(split.include?([':merge:two']), 'split should include [merge:two]')
+ assert_equal(':merge:merge', track[3].location)
+ assert_equal(':merge:collector', track[4].location)
+
+ box = collector.collection[1]
+ result = box.contents
+ assert_equal(2, result.size, 'should be 2 values in the box')
+ assert(result.include?(20), 'box should include 20')
+ assert(result.include?(30), 'box should include 30')
+
+ ::OFlow::Env.clear()
+ end
+
+end # MergerTest
@@ -224,7 +224,6 @@ def test_timer_perform_start
end
def test_timer_perform_stop
- now = Time.now()
timer = nil
collector = nil
::OFlow::Env.flow('one-time') { |f|
View
@@ -24,4 +24,6 @@
# Actor tests
require 'actors/log_test'
require 'actors/timer_test'
+require 'actors/balancer_test'
+require 'actors/merger_test'

0 comments on commit 5744750

Please sign in to comment.