Permalink
Browse files

Added

[git-p4: depot-paths = "//src/worker_bee/dev/": change = 6217]
  • Loading branch information...
zenspider committed Mar 11, 2011
0 parents commit ca83074f1280cfd8989146119d5ec30470ed431e
Showing with 262 additions and 0 deletions.
  1. +26 −0 .autotest
  2. +6 −0 History.txt
  3. +7 −0 Manifest.txt
  4. +55 −0 README.txt
  5. +27 −0 Rakefile
  6. +110 −0 lib/worker_bee.rb
  7. +31 −0 test/test_worker_bee.rb
@@ -0,0 +1,26 @@
+# -*- ruby -*-
+
+require "autotest/restart"
+
+Autotest.add_hook :initialize do |at|
+ at.testlib = "minitest/autorun"
+ at.add_exception "tmp"
+
+# at.extra_files << "../some/external/dependency.rb"
+#
+# at.libs << ":../some/external"
+#
+# at.add_exception "vendor"
+#
+# at.add_mapping(/dependency.rb/) do |f, _|
+# at.files_matching(/test_.*rb$/)
+# end
+#
+# %w(TestA TestB).each do |klass|
+# at.extra_class_map[klass] = "test/test_misc.rb"
+# end
+end
+
+# Autotest.add_hook :run_command do |at|
+# system "rake build"
+# end
@@ -0,0 +1,6 @@
+=== 1.0.0 / 2011-03-10
+
+* 1 major enhancement
+
+ * Birthday!
+
@@ -0,0 +1,7 @@
+.autotest
+History.txt
+Manifest.txt
+README.txt
+Rakefile
+lib/worker_bee.rb
+test/test_worker_bee.rb
@@ -0,0 +1,55 @@
+= worker_bee
+
+* http://seattlerb.org/
+
+== DESCRIPTION:
+
+WorkerBee encapsulates a simple pipeline of workers.
+
+== FEATURES/PROBLEMS:
+
+* Simple API to wrap up the usual Thread/Queue patterns.
+
+== SYNOPSIS:
+
+ bee = WorkerBee.new
+
+ bee.input enum_of_work_to_do
+
+ bee.work(20) { |work| ... stuff with input ... }
+ bee.work(5) { |work| ... stuff with results of previous ... }
+
+ bee.results # the final set of results
+
+== REQUIREMENTS:
+
+* ruby... awesome, no?
+
+== INSTALL:
+
+* sudo gem install worker_bee
+
+== LICENSE:
+
+(The MIT License)
+
+Copyright (c) Ryan Davis, seattle.rb
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+'Software'), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,27 @@
+# -*- ruby -*-
+
+require "rubygems"
+require "hoe"
+
+Hoe.plugin :isolate
+Hoe.plugin :seattlerb
+
+# Hoe.plugin :compiler
+# Hoe.plugin :email
+# Hoe.plugin :gem_prelude_sucks
+# Hoe.plugin :git
+# Hoe.plugin :inline
+# Hoe.plugin :isolate
+# Hoe.plugin :minitest
+# Hoe.plugin :perforce
+# Hoe.plugin :racc
+# Hoe.plugin :rubyforge
+# Hoe.plugin :seattlerb
+
+Hoe.spec "worker_bee" do
+ developer "Ryan Davis", "ryand-ruby@zenspider.com"
+
+ self.rubyforge_name = "seattlerb"
+end
+
+# vim: syntax=ruby
@@ -0,0 +1,110 @@
+require 'thread'
+
+##
+# Defines a WorkerBee instance which provides a simple means of
+# defining parallel tasks working from one queue of work to a queue of
+# results (which can then be a queue of work for another pipeline of
+# workers).
+#
+# bee = WorkerBee.new
+# bee.add_work :input, *(1..1000).to_a
+#
+# workers = bee.workers 20, :input, :square do |task|
+# task ** 2
+# end
+#
+# workers.finish
+#
+# workers = bee.workers 5, :square do |task|
+# Math.sqrt task
+# end
+#
+# workers.finish
+#
+# p bee.results
+
+Thread.abort_on_exception = true
+
+class WorkerBee
+ VERSION = "1.0.0"
+ SENTINAL = Object.new
+
+ attr_reader :tasks, :workers
+ attr_accessor :count
+
+ ##
+ # Creates a new WorkerBee.
+
+ def initialize
+ @tasks = [Queue.new]
+ @workers = []
+ @count = 0
+ end
+
+ def next_count
+ self.count += 1
+ self.count
+ end
+
+ def input *work
+ q = tasks.first
+ work.each do |task|
+ q << task
+ end
+ self
+ end
+
+ class Worker < Thread
+ attr_accessor :input, :output
+
+ def initialize input, output, &b
+ @input, @output = input, output
+ super() do
+ loop do
+ task = input.shift
+
+ break if task == SENTINAL
+
+ output << b[task]
+ end
+ end
+ end
+ end
+
+ def work n, &b
+ input = tasks[self.count]
+ output = tasks[self.next_count] = Queue.new
+
+ workers << (1..n).map { Worker.new input, output, &b }
+
+ self
+ end
+
+ def finish
+ workers.each do |pool|
+ input = pool.first.input
+
+ pool.size.times do
+ input << SENTINAL
+ end
+
+ pool.each do |thread|
+ thread.join
+ end
+ end
+ end
+
+ ##
+ # Returns the contents of the queue +name+ (defaults to +:result+).
+
+ def results
+ finish
+
+ q = tasks[count]
+
+ result = []
+ result.push q.shift until q.empty?
+ result.delete SENTINAL
+ result
+ end
+end
@@ -0,0 +1,31 @@
+require "minitest/autorun"
+require "worker_bee"
+
+class TestWorkerBee < MiniTest::Unit::TestCase
+ def test_sanity_manual
+ bee = WorkerBee.new
+
+ bee.input(*(1..25).to_a)
+
+ bee.work(20) { |n| sleep 0.25; n ** 2 }
+ bee.finish
+ bee.work(5) { |n| Math.sqrt n }
+ bee.finish
+
+ expected = (1..25).to_a
+ assert_equal expected, bee.results.map(&:to_i).sort
+ end
+
+ def test_sanity_automatic
+ bee = WorkerBee.new
+
+ bee.input(*(1..25).to_a)
+
+ bee.work(20) { |n| n ** 2 }
+ # bee.finish # commented out on purpose
+ bee.work(5) { |n| Math.sqrt n }
+
+ expected = (1..25).to_a
+ assert_equal expected, bee.results.map(&:to_i).sort
+ end
+end

0 comments on commit ca83074

Please sign in to comment.