Permalink
Browse files

added balancer

  • Loading branch information...
ohler55 committed Feb 9, 2014
1 parent 902135f commit 433f795a1595261e76a41b1966fcbeb407db1731
Showing with 153 additions and 3 deletions.
  1. +2 −0 README.md
  2. +1 −0 lib/oflow/actors.rb
  3. +38 −0 lib/oflow/actors/balancer.rb
  4. +3 −2 lib/oflow/task.rb
  5. +0 −1 notes
  6. +109 −0 test/actors/balancer_test.rb
View
@@ -29,6 +29,8 @@ Follow [@peterohler on Twitter](http://twitter.com/#!/peterohler) for announceme
- Added support for dynamic Timer options updates.
+ - Added balancer load balancing of processing across multiple tasks.
+
### Release 0.3
- Initial release with minimal features.
View
@@ -9,3 +9,4 @@ module Actors
require 'oflow/actors/log'
require 'oflow/actors/errorhandler'
require 'oflow/actors/timer'
+require 'oflow/actors/balancer'
@@ -0,0 +1,38 @@
+
+module OFlow
+ module Actors
+
+ # Redirects operations to one Task out of all the linked tasks. It uses the
+ # Task.backed_up() method to determine which task is the least busy. It also
+ # attempts to distribute requests somewhat evenly if Tasks are equally as
+ # busy.
+ class Balancer < Actor
+
+ def initialize(task, options)
+ super
+ @cnt = 0
+ @called = {}
+ end
+
+ def perform(op, box)
+ best = nil
+ order = nil
+ bbu = nil
+ @task.links().each do |dest,lnk|
+ t = lnk.target
+ next if t.nil?
+ bu = t.backed_up()
+ if bbu.nil? || bu < bbu || (bbu == bu && @called.fetch(dest, 0) < order)
+ best = dest
+ bbu = bu
+ order = @called.fetch(dest, 0)
+ end
+ end
+ @cnt += 1
+ @called[best] = @cnt
+ task.ship(best, box)
+ end
+
+ end # Balancer
+ end # Actors
+end # OFlow
View
@@ -90,7 +90,7 @@ def initialize(flow, name, actor_class, options={})
sleep(1.0)
end
rescue Exception => e
- puts "*** #{full_name} #{e.class}: #{e.message}"
+ puts "*** #{full_name} #{e.class}: #{e.message}"
@current_req = nil
# TBD Env.rescue(e)
end
@@ -159,7 +159,7 @@ def queue_count()
# selecting an Actor when stepping from the Inspector.
# @return [Fixnum] a measure of how backed up a Task is
def backed_up()
- cnt = @queue.size()
+ cnt = @queue.size() + (@current_req.nil? ? 0 : 1)
return 0 if 0 == cnt
if @max_queue_count.nil? || 0 == @max_queue_count
cnt = 80 if 80 < cnt
@@ -217,6 +217,7 @@ def step(max_wait=5)
# Wakes up the Task if it has been stopped or if Env.shutdown() has been called.
def wakeup()
+ # don't wake if the task is currently processing
@loop.wakeup() unless @loop.nil?
end
View
1 notes
@@ -6,7 +6,6 @@
- todo
- features
- - workqueue task/actor that distributes to 1 or n others based on how busy
- collector waits for n number of boxes to be recieved before continuing
- 0.4.0
- Persister Actor (write to disk and ready on start)
@@ -0,0 +1,109 @@
+#!/usr/bin/env ruby
+# encoding: UTF-8
+
+[ File.dirname(__FILE__),
+ File.join(File.dirname(__FILE__), "../../lib"),
+ File.join(File.dirname(__FILE__), "..")
+].each { |path| $: << path unless $:.include?(path) }
+
+require 'test/unit'
+require 'oflow'
+require 'oflow/test'
+
+require 'collector'
+
+class Busy < ::OFlow::Actor
+
+ def initialize(task, options)
+ super
+ @delay = options.fetch(:delay, 0)
+ end
+
+ def perform(op, box)
+ if 0.0 < @delay
+ done = Time.now() + @delay
+ while true
+ now = Time.now()
+ break if done <= now
+ sleep(done - now)
+ end
+ end
+ task.ship(nil, ::OFlow::Box.new([task.name, box.contents]))
+ end
+
+end # Busy
+
+class BalancerTest < ::Test::Unit::TestCase
+
+ def test_balancer_fair
+ period = 0.1
+ balancer = nil
+ collector = nil
+ ::OFlow::Env.flow('fair') { |f|
+ f.task('balance', ::OFlow::Actors::Balancer) { |t|
+ balancer = t
+ t.link(:one, :one, nil)
+ t.link(:two, :two, nil)
+ t.link(:three, :three, nil)
+ }
+ f.task(:one, Busy) { |t|
+ t.link(nil, :collector, :one)
+ }
+ f.task(:two, Busy) { |t|
+ t.link(nil, :collector, :two)
+ }
+ f.task(:three, Busy) { |t|
+ t.link(nil, :collector, :three)
+ }
+ f.task(:collector, Collector) { |t|
+ collector = t.actor
+ }
+ }
+ 9.times { |i| balancer.receive(nil, ::OFlow::Box.new(i)) }
+ ::OFlow::Env.flush()
+ counts = {}
+ collector.collection.each { |a| counts[a[0]] = counts.fetch(a[0], 0) + 1 }
+
+ assert_equal(counts[:one], counts[:two], 'all counts should be the same')
+ assert_equal(counts[:two], counts[:three], 'all counts should be the same')
+
+ ::OFlow::Env.clear()
+ end
+
+ def test_balancer_less_busy
+ period = 0.1
+ balancer = nil
+ collector = nil
+ ::OFlow::Env.flow('fair') { |f|
+ f.task('balance', ::OFlow::Actors::Balancer) { |t|
+ balancer = t
+ t.link(:one, :one, nil)
+ t.link(:two, :two, nil)
+ t.link(:three, :three, nil)
+ }
+ f.task(:one, Busy, delay: 0.01) { |t|
+ t.link(nil, :collector, :one)
+ }
+ f.task(:two, Busy, delay: 0.02) { |t|
+ t.link(nil, :collector, :two)
+ }
+ f.task(:three, Busy, delay: 0.04) { |t|
+ t.link(nil, :collector, :three)
+ }
+ f.task(:collector, Collector) { |t|
+ collector = t.actor
+ }
+ }
+ 40.times { |i| balancer.receive(nil, ::OFlow::Box.new(i)); sleep(0.005) }
+ ::OFlow::Env.flush()
+ counts = {}
+ collector.collection.each { |a| counts[a[0]] = counts.fetch(a[0], 0) + 1 }
+ #puts "*** #{counts}"
+
+ assert(counts[:one] > counts[:two], 'one is faster and should have processed more than two')
+ assert(counts[:two] > counts[:three], 'two is faster and should have processed more than three')
+
+ ::OFlow::Env.clear()
+ end
+
+end # BalancerTest

0 comments on commit 433f795

Please sign in to comment.