Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial introduction of a deferrable pool for resource pooling where …

…the work can be encapsulated using deferrables
  • Loading branch information...
commit bc999d2464a341f7648bd13727b293d2a326e945 1 parent 61239dd
@raggi raggi authored
View
1  lib/em/deferrable.rb
@@ -25,6 +25,7 @@
module EventMachine
module Deferrable
+ autoload :Pool, 'em/deferrable/pool'
# Specify a block to be executed if and when the Deferrable object receives
# a status of :succeeded. See #set_deferred_status for more information.
View
97 lib/em/deferrable/pool.rb
@@ -0,0 +1,97 @@
+# = EM::Deferrable::Pool
+#
+# A simple async resource pool based on a resource and work queue. Resources
+# are enqueued and work waits for resources to become available.
+#
+# Example:
+#
+# EM.run do
+# pool = EM::Deferrable::Pool.new
+# spawn = lambda { pool.add EM::HttpRequest.new('http://example.org') }
+# 10.times { spawn[] }
+# done, scheduled = 0, 0
+#
+# check = lambda do
+# done += 1
+# if done >= scheduled
+# EM.stop
+# end
+# end
+#
+# pool.on_error { |conn| spawn[] }
+#
+# 100.times do
+# pool.perform do |conn|
+# req = conn.get :path => '/', :keepalive => true
+#
+# req.callback do
+# p [:success, conn.object_id, i, req.response.size]
+# check[]
+# end
+#
+# req.errback { check[] }
+#
+# req
+# end
+# end
+# end
+#
+class EM::Deferrable::Pool
+ def initialize
+ @resources = EM::Queue.new
+ @removed = []
+ end
+
+ def add resource
+ @resources.push resource
+ end
+ alias requeue add
+
+ def remove resource
+ @removed << resource
+ end
+
+ def on_error *a, &b
+ @on_error = EM::Callback(*a, &b)
+ end
+
+ def perform(*a, &b)
+ work = EM::Callback(*a, &b)
+
+ @resources.pop do |resource|
+ if removed? resource
+ @removed.delete resource
+ reschedule work
+ else
+ process work, resource
+ end
+ end
+ end
+ alias reschedule perform
+
+ def process work, resource
+ deferrable = work.call resource
+ if deferrable.kind_of?(EM::Deferrable)
+ completion deferrable, resource
+ else
+ raise ArgumentError, "deferrable expected from work"
+ end
+ end
+
+ def completion deferrable, resource
+ deferrable.callback { requeue resource }
+ deferrable.errback { failure resource }
+ end
+
+ def failure resource
+ if @on_error
+ @on_error.call resource
+ else
+ requeue resource
+ end
+ end
+
+ def removed? resource
+ @removed.include? resource
+ end
+end
View
107 tests/test_deferrable_pool.rb
@@ -0,0 +1,107 @@
+class TestDeferrablePool < Test::Unit::TestCase
+ def pool
+ @pool ||= EM::Deferrable::Pool.new
+ end
+
+ def go
+ EM.run { yield }
+ end
+
+ def stop
+ EM.stop
+ end
+
+ def deferrable
+ @deferrable ||= EM::DefaultDeferrable.new
+ end
+
+ def test_supports_more_work_than_resources
+ ran = false
+ go do
+ pool.perform do
+ ran = true
+ deferrable
+ end
+ stop
+ end
+ assert_equal false, ran
+ go do
+ pool.add :resource
+ stop
+ end
+ assert_equal true, ran
+ end
+
+ def test_reques_resources_on_error
+ pooled_res, pooled_res2 = nil
+ pool.add :res
+ go do
+ pool.perform do |res|
+ pooled_res = res
+ deferrable
+ end
+ stop
+ end
+ deferrable.fail
+ go do
+ pool.perform do |res|
+ pooled_res2 = res
+ deferrable
+ end
+ stop
+ end
+ assert_equal :res, pooled_res
+ assert_equal pooled_res, pooled_res2
+ end
+
+ def test_supports_custom_error_handler
+ eres = nil
+ pool.on_error do |res|
+ eres = res
+ end
+ performs = []
+ pool.add :res
+ go do
+ pool.perform do |res|
+ performs << res
+ deferrable
+ end
+ pool.perform do |res|
+ performs << res
+ deferrable
+ end
+ deferrable.fail
+ stop
+ end
+ assert_equal :res, eres
+ # manual requeues required when error handler is installed:
+ assert_equal 1, performs.size
+ assert_equal :res, performs.first
+ end
+
+ def test_catches_successful_deferrables
+ performs = []
+ pool.add :res
+ go do
+ pool.perform { |res| performs << res; deferrable }
+ pool.perform { |res| performs << res; deferrable }
+ stop
+ end
+ assert_equal [:res], performs
+ deferrable.succeed
+ go { stop }
+ assert_equal [:res, :res], performs
+ end
+
+ def test_prunes_locked_and_removed_resources
+ performs = []
+ pool.add :res
+ deferrable.succeed
+ go do
+ pool.perform { |res| performs << res; pool.remove res; deferrable }
+ pool.perform { |res| performs << res; pool.remove res; deferrable }
+ stop
+ end
+ assert_equal [:res], performs
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.