Skip to content
Browse files

Respace. Remove project files.

  • Loading branch information...
1 parent 8029ebc commit 54678efeaa8d01d4b03fd303248ab469090df7a0 @chrisroberts chrisroberts committed Mar 21, 2012
View
30 CHANGELOG
@@ -1,19 +1,19 @@
0.2.3
- - Better thread destroying when removed normally and forcibly
- - Flushing threads to ensure proper joining
- - Fixed splatting for ruby < 1.9
- - Removed the useless logger wrapper
- - Use a simple monitor
+ - Better thread destroying when removed normally and forcibly
+ - Flushing threads to ensure proper joining
+ - Fixed splatting for ruby < 1.9
+ - Removed the useless logger wrapper
+ - Use a simple monitor
0.2.2
- - Better thread management within the pool
- - Restart thread timeouts when modified
- - Fix argument passing
+ - Better thread management within the pool
+ - Restart thread timeouts when modified
+ - Fix argument passing
0.2.1
- - Set minimum pool size properly when initialized to avoid
- creating extra threads (thanks to Roger Pack)
+ - Set minimum pool size properly when initialized to avoid
+ creating extra threads (thanks to Roger Pack)
0.2.0
- - Added argument support for passed tasks (thanks simonmenke)
- - Faster pool resizing
- - Smarter thread creation to limit unneeded creation
- - Allow floats for timeouts to provide better control
- - Test verified support for 1.8.6, 1.8.7, 1.9.1 and JRuby 1.4.0RC1
+ - Added argument support for passed tasks (thanks simonmenke)
+ - Faster pool resizing
+ - Smarter thread creation to limit unneeded creation
+ - Allow floats for timeouts to provide better control
+ - Test verified support for 1.8.6, 1.8.7, 1.9.1 and JRuby 1.4.0RC1
View
32 LICENSE
@@ -1,5 +1,5 @@
- GNU LESSER GENERAL PUBLIC LICENSE
- Version 3, 29 June 2007
+ GNU LESSER GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
@@ -98,20 +98,20 @@ the following:
d) Do one of the following:
- 0) Convey the Minimal Corresponding Source under the terms of this
- License, and the Corresponding Application Code in a form
- suitable for, and under terms that permit, the user to
- recombine or relink the Application with a modified version of
- the Linked Version to produce a modified Combined Work, in the
- manner specified by section 6 of the GNU GPL for conveying
- Corresponding Source.
-
- 1) Use a suitable shared library mechanism for linking with the
- Library. A suitable mechanism is one that (a) uses at run time
- a copy of the Library already present on the user's computer
- system, and (b) will operate properly with a modified version
- of the Library that is interface-compatible with the Linked
- Version.
+ 0) Convey the Minimal Corresponding Source under the terms of this
+ License, and the Corresponding Application Code in a form
+ suitable for, and under terms that permit, the user to
+ recombine or relink the Application with a modified version of
+ the Linked Version to produce a modified Combined Work, in the
+ manner specified by section 6 of the GNU GPL for conveying
+ Corresponding Source.
+
+ 1) Use a suitable shared library mechanism for linking with the
+ Library. A suitable mechanism is one that (a) uses at run time
+ a copy of the Library already present on the user's computer
+ system, and (b) will operate properly with a modified version
+ of the Library that is interface-compatible with the Linked
+ Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
View
238 README.rdoc
@@ -4,14 +4,14 @@ ActionPool is just a simple thread pool. It allows for various constraints and r
=== install (easy):
- gem install actionpool
+ gem install actionpool
=== install (less easy):
- git clone http://github.com/spox/actionpool.git
- cd actionpool
- gem build *.gemspec
- gem install ./
+ git clone http://github.com/spox/actionpool.git
+ cd actionpool
+ gem build *.gemspec
+ gem install ./
=== install (less easy that's a little easier)
@@ -34,49 +34,49 @@ ActionPool is currently tested on:
=== Code:
- require 'actionpool'
-
- pool = ActionPool::Pool.new
+ require 'actionpool'
+
+ pool = ActionPool::Pool.new
+ pool.process do
+ sleep(2)
+ raise 'Wakeup main thread'
+ end
+ 20.times do
pool.process do
- sleep(2)
- raise 'Wakeup main thread'
- end
- 20.times do
- pool.process do
- puts "Thread: #{Thread.current}"
- sleep(0.1)
- end
- end
- begin
- sleep
- rescue Exception => e
- puts "Thread pool woke me up: #{e}"
+ puts "Thread: #{Thread.current}"
+ sleep(0.1)
end
+ end
+ begin
+ sleep
+ rescue Exception => e
+ puts "Thread pool woke me up: #{e}"
+ end
=== Result:
- Thread: #<Thread:0x93ebeb8>
- Thread: #<Thread:0x93eb92c>
- Thread: #<Thread:0x93eb8a0>
- Thread: #<Thread:0x93eb814>
- Thread: #<Thread:0x93eb788>
- Thread: #<Thread:0x93eb670>
- Thread: #<Thread:0x93eb5e4>
- Thread: #<Thread:0x93eb558>
- Thread: #<Thread:0x93eb4cc>
- Thread: #<Thread:0x93ebeb8>
- Thread: #<Thread:0x93eb92c>
- Thread: #<Thread:0x93eb8a0>
- Thread: #<Thread:0x93eb814>
- Thread: #<Thread:0x93eb788>
- Thread: #<Thread:0x93eb670>
- Thread: #<Thread:0x93eb5e4>
- Thread: #<Thread:0x93eb558>
- Thread: #<Thread:0x93eb4cc>
- Thread: #<Thread:0x93eb92c>
- Thread: #<Thread:0x93eb8a0>
- Thread pool woke me up: Wakeup main thread
+ Thread: #<Thread:0x93ebeb8>
+ Thread: #<Thread:0x93eb92c>
+ Thread: #<Thread:0x93eb8a0>
+ Thread: #<Thread:0x93eb814>
+ Thread: #<Thread:0x93eb788>
+ Thread: #<Thread:0x93eb670>
+ Thread: #<Thread:0x93eb5e4>
+ Thread: #<Thread:0x93eb558>
+ Thread: #<Thread:0x93eb4cc>
+ Thread: #<Thread:0x93ebeb8>
+ Thread: #<Thread:0x93eb92c>
+ Thread: #<Thread:0x93eb8a0>
+ Thread: #<Thread:0x93eb814>
+ Thread: #<Thread:0x93eb788>
+ Thread: #<Thread:0x93eb670>
+ Thread: #<Thread:0x93eb5e4>
+ Thread: #<Thread:0x93eb558>
+ Thread: #<Thread:0x93eb4cc>
+ Thread: #<Thread:0x93eb92c>
+ Thread: #<Thread:0x93eb8a0>
+ Thread pool woke me up: Wakeup main thread
=== Important note
@@ -87,115 +87,115 @@ The worker threads in the ActionPool will catch all Exception objects that your
ActionPool has some simple settings that make things work. First, the pool has a minimum and maximum number of allowed threads. On initialization, the minimum number of threads are created and put into the pool. By default, this is 10 threads. As the number of tasks added to the pool increases, the pool will grow as needed. When more tasks are in the pool than threads to process them, new threads will be added into the pool, until the maximum thread threshold is reached. Taking the example above, we can demonstrate this easily by adjusting our limits:
- require 'actionpool'
+ require 'actionpool'
- pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 3)
+ pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 3)
+ pool.process do
+ sleep(10)
+ raise 'Wakeup main thread'
+ end
+ 20.times do
pool.process do
- sleep(10)
- raise 'Wakeup main thread'
- end
- 20.times do
- pool.process do
- puts "Thread: #{Thread.current}"
- sleep(rand(0.0))
- end
- end
- begin
- sleep
- rescue Exception => e
- puts "Thread pool woke me up: #{e}"
+ puts "Thread: #{Thread.current}"
+ sleep(rand(0.0))
end
+ end
+ begin
+ sleep
+ rescue Exception => e
+ puts "Thread pool woke me up: #{e}"
+ end
Which results in:
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1080>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1760>
- Thread: #<Thread:0x86c1080>
- Thread pool woke me up: Wakeup main thread
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1080>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1760>
+ Thread: #<Thread:0x86c1080>
+ Thread pool woke me up: Wakeup main thread
Our pool starts with a single thread that is occupied by the sleeping task waiting to raise an exception. As we begin to add new tasks, the pool grows to accommodate the growing number of tasks, until it reaches the maximum threshold of 3. At that point, the pool simply processes the tasks until the task list is empty.
The pool also has the ability to limit the amount of time a thread spends working on a given task. By default, a thread will work on a given task until the task is completed, or the pool is shutdown. However, as the following example shows, it is very easy to limit this time to avoid the pool being bogged down on long running tasks:
- require 'actionpool'
-
- pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1, :a_to => 1)
- pool.process do
- puts "#{Time.now}: I'm a long running task"
- sleep(100)
- raise 'Wakeup main thread'
- end
- pool.process do
- puts "#{Time.now}: Waiting for my turn"
- raise "I'm waking up the main thread"
- end
- begin
- sleep
- rescue Exception => e
- puts "Thread pool woke me up: #{e}"
- end
+ require 'actionpool'
+
+ pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1, :a_to => 1)
+ pool.process do
+ puts "#{Time.now}: I'm a long running task"
+ sleep(100)
+ raise 'Wakeup main thread'
+ end
+ pool.process do
+ puts "#{Time.now}: Waiting for my turn"
+ raise "I'm waking up the main thread"
+ end
+ begin
+ sleep
+ rescue Exception => e
+ puts "Thread pool woke me up: #{e}"
+ end
Results:
- 2009-10-10 08:47:08 -0700: I'm a long running task
- 2009-10-10 08:47:09 -0700: Waiting for my turn
- Thread pool woke me up: I'm waking up the main thread
+ 2009-10-10 08:47:08 -0700: I'm a long running task
+ 2009-10-10 08:47:09 -0700: Waiting for my turn
+ Thread pool woke me up: I'm waking up the main thread
If you have a number of tasks you would like to schedule at once, it is easy with the add_jobs method:
- require 'actionpool'
+ require 'actionpool'
- pool = ActionPool::Pool.new
- a = 0
- lock = Mutex.new
- tasks = [].fill(lambda{ lock.synchronize{ a += 1 } }, 0..19)
- pool.add_jobs(tasks)
- pool.shutdown
- puts "Result: #{a}"
+ pool = ActionPool::Pool.new
+ a = 0
+ lock = Mutex.new
+ tasks = [].fill(lambda{ lock.synchronize{ a += 1 } }, 0..19)
+ pool.add_jobs(tasks)
+ pool.shutdown
+ puts "Result: #{a}"
Results:
- Result: 20
+ Result: 20
Passing arguments to tasks is now available as well:
- require 'actionpool'
+ require 'actionpool'
- pool = ActionPool::Pool.new
- string = 'Hello world'
- puts "Original: #{string}. ID: #{string.object_id}"
- pool << [lambda{|var| puts "Passed: #{var}. ID: #{var.object_id}"}, [string.dup]]
- pool << [lambda{|a,b| puts "Passed: #{a} | #{b}. ID: #{a.object_id} | #{b.object_id}"}, [string, string.dup]]
- pool.shutdown
+ pool = ActionPool::Pool.new
+ string = 'Hello world'
+ puts "Original: #{string}. ID: #{string.object_id}"
+ pool << [lambda{|var| puts "Passed: #{var}. ID: #{var.object_id}"}, [string.dup]]
+ pool << [lambda{|a,b| puts "Passed: #{a} | #{b}. ID: #{a.object_id} | #{b.object_id}"}, [string, string.dup]]
+ pool.shutdown
Results:
- Original: Hello world. ID: 70651630
- Passed: Hello world. ID: 70651250
- Passed: Hello world | Hello world. ID: 70651630 | 70651100
+ Original: Hello world. ID: 70651630
+ Passed: Hello world. ID: 70651250
+ Passed: Hello world | Hello world. ID: 70651630 | 70651100
== Last remarks
If you find any bugs, please report them through {github}[http://github.com/spox/actionpool/issues]. If you are in need of any help, you can generally find me on DALnet and Freenode.
== License
- ActionPool is licensed under the LGPLv3
- Copyright (c) 2009 spox <spox@modspox.com>
+ ActionPool is licensed under the LGPLv3
+ Copyright (c) 2009 spox <spox@modspox.com>
View
28 Rakefile
@@ -12,20 +12,20 @@ require 'rake/testtask'
require 'spec/rake/spectask'
spec = Gem::Specification.new do |s|
- s.name = 'ActionPool'
- s.author = %q(spox)
- s.email = %q(spox@modspox.com)
- s.version = '0.2.3'
- s.summary = %q(Thread Pool)
- s.platform = Gem::Platform::RUBY
- s.files = Dir['**/*']
- s.rdoc_options = %w(--title ActionPool --main README.rdoc --line-numbers)
- s.extra_rdoc_files = %w(README.rdoc CHANGELOG)
- s.require_paths = %w(lib)
- s.required_ruby_version = '>= 1.8.6'
- s.add_dependency 'splib', '~> 1.4'
- s.homepage = %q(http://github.com/spox/actionpool)
- s.description = "The ActionPool is an easy to use thread pool for ruby."
+ s.name = 'ActionPool'
+ s.author = %q(spox)
+ s.email = %q(spox@modspox.com)
+ s.version = '0.2.3'
+ s.summary = %q(Thread Pool)
+ s.platform = Gem::Platform::RUBY
+ s.files = Dir['**/*']
+ s.rdoc_options = %w(--title ActionPool --main README.rdoc --line-numbers)
+ s.extra_rdoc_files = %w(README.rdoc CHANGELOG)
+ s.require_paths = %w(lib)
+ s.required_ruby_version = '>= 1.8.6'
+ s.add_dependency 'splib', '~> 1.4'
+ s.homepage = %q(http://github.com/spox/actionpool)
+ s.description = "The ActionPool is an easy to use thread pool for ruby."
end
Rake::GemPackageTask.new(spec) do |p|
View
28 actionpool.gemspec
@@ -1,16 +1,16 @@
spec = Gem::Specification.new do |s|
- s.name = 'ActionPool'
- s.author = %q(spox)
- s.email = %q(spox@modspox.com)
- s.version = '0.2.3'
- s.summary = %q(Thread Pool)
- s.platform = Gem::Platform::RUBY
- s.files = Dir['**/*']
- s.rdoc_options = %w(--title ActionPool --main README.rdoc --line-numbers)
- s.extra_rdoc_files = %w(README.rdoc CHANGELOG)
- s.require_paths = %w(lib)
- s.required_ruby_version = '>= 1.8.6'
- s.add_dependency 'splib', '~> 1.4'
- s.homepage = %q(http://github.com/spox/actionpool)
- s.description = "The ActionPool is an easy to use thread pool for ruby."
+ s.name = 'ActionPool'
+ s.author = %q(spox)
+ s.email = %q(spox@modspox.com)
+ s.version = '0.2.3'
+ s.summary = %q(Thread Pool)
+ s.platform = Gem::Platform::RUBY
+ s.files = Dir['**/*']
+ s.rdoc_options = %w(--title ActionPool --main README.rdoc --line-numbers)
+ s.extra_rdoc_files = %w(README.rdoc CHANGELOG)
+ s.require_paths = %w(lib)
+ s.required_ruby_version = '>= 1.8.6'
+ s.add_dependency 'splib', '~> 1.4'
+ s.homepage = %q(http://github.com/spox/actionpool)
+ s.description = "The ActionPool is an easy to use thread pool for ruby."
end
View
6 lib/actionpool.rb
@@ -1,9 +1,9 @@
require 'rubygems'
begin
- require 'fastthread'
+ require 'fastthread'
rescue LoadError
- # we don't care if it's available
- # just load it if it's around
+ # we don't care if it's available
+ # just load it if it's around
end
require 'splib'
Splib.load :Array, :Monitor
View
542 lib/actionpool/Pool.rb
@@ -4,311 +4,311 @@
require 'thread'
module ActionPool
- class PoolClosed < StandardError
+ class PoolClosed < StandardError
+ end
+ class Pool
+
+ # :min_threads:: minimum number of threads in pool
+ # :max_threads:: maximum number of threads in pool
+ # :t_to:: thread timeout waiting for action to process
+ # :a_to:: maximum time action may be worked on before aborting
+ # :logger:: logger to print logging messages to
+ # :pqueue:: use a priority queue (defaults to false)
+ # Creates a new pool
+ def initialize(args={})
+ raise ArgumentError.new('Hash required for initialization') unless args.is_a?(Hash)
+ @logger = args[:logger] && args[:logger].is_a?(Logger) ? args[:logger] : Logger.new(nil)
+ if(args[:pqueue])
+ ActionPool.enable_priority_q
+ @queue = ActionPool::PQueue.new
+ else
+ @queue = ActionPool::Queue.new
+ end
+ @threads = []
+ @lock = Splib::Monitor.new
+ @thread_timeout = args[:t_to] ? args[:t_to] : 0
+ @action_timeout = args[:a_to] ? args[:a_to] : 0
+ @max_threads = args[:max_threads] ? args[:max_threads] : 100
+ @min_threads = args[:min_threads] ? args[:min_threads] : 10
+ @min_threads = @max_threads if @max_threads < @min_threads
+ @respond_to = args[:respond_thread] || ::Thread.current
+ @open = true
+ fill_pool
end
- class Pool
- # :min_threads:: minimum number of threads in pool
- # :max_threads:: maximum number of threads in pool
- # :t_to:: thread timeout waiting for action to process
- # :a_to:: maximum time action may be worked on before aborting
- # :logger:: logger to print logging messages to
- # :pqueue:: use a priority queue (defaults to false)
- # Creates a new pool
- def initialize(args={})
- raise ArgumentError.new('Hash required for initialization') unless args.is_a?(Hash)
- @logger = args[:logger] && args[:logger].is_a?(Logger) ? args[:logger] : Logger.new(nil)
- if(args[:pqueue])
- ActionPool.enable_priority_q
- @queue = ActionPool::PQueue.new
- else
- @queue = ActionPool::Queue.new
- end
- @threads = []
- @lock = Splib::Monitor.new
- @thread_timeout = args[:t_to] ? args[:t_to] : 0
- @action_timeout = args[:a_to] ? args[:a_to] : 0
- @max_threads = args[:max_threads] ? args[:max_threads] : 100
- @min_threads = args[:min_threads] ? args[:min_threads] : 10
- @min_threads = @max_threads if @max_threads < @min_threads
- @respond_to = args[:respond_thread] || ::Thread.current
- @open = true
- fill_pool
- end
+ # Pool is closed
+ def pool_closed?
+ !@open
+ end
- # Pool is closed
- def pool_closed?
- !@open
- end
+ # Pool is open
+ def pool_open?
+ @open
+ end
- # Pool is open
- def pool_open?
- @open
- end
+ # arg:: :open or :closed
+ # Set pool status
+ def status(arg)
+ @open = arg == :open
+ fill_pool if @open
+ end
- # arg:: :open or :closed
- # Set pool status
- def status(arg)
- @open = arg == :open
- fill_pool if @open
+ # args:: :force forces a new thread.
+ # :nowait will create a thread if threads are waiting
+ # Create a new thread for pool.
+ # Returns newly created thread or nil if pool is at maximum size
+ def create_thread(*args)
+ return if pool_closed?
+ thread = nil
+ @lock.synchronize do
+ if(args.include?(:nowait) || action_size > size || action_size > waiting || args.include?(:force))
+ if(size < max || args.include?(:force))
+ thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to,
+ :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger,
+ :autostart => false)
+ @threads << thread
+ end
end
+ if(((size == working || args.include?(:nowait)) && @threads.size < @max_threads) || args.include?(:force))
- # args:: :force forces a new thread.
- # :nowait will create a thread if threads are waiting
- # Create a new thread for pool.
- # Returns newly created thread or nil if pool is at maximum size
- def create_thread(*args)
- return if pool_closed?
- thread = nil
- @lock.synchronize do
- if(args.include?(:nowait) || action_size > size || action_size > waiting || args.include?(:force))
- if(size < max || args.include?(:force))
- thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to,
- :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger,
- :autostart => false)
- @threads << thread
- end
- end
- if(((size == working || args.include?(:nowait)) && @threads.size < @max_threads) || args.include?(:force))
-
- end
- end
- thread.start if thread
- thread
end
+ end
+ thread.start if thread
+ thread
+ end
- # Fills the pool with the minimum number of threads
- # Returns array of created threads
- def fill_pool
- threads = []
- if(@open)
- @lock.synchronize do
- required = min - size
- if(required > 0)
- required.times do
- thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to,
- :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger,
- :autostart => false)
- @threads << thread
- threads << thread
- end
- end
- end
+ # Fills the pool with the minimum number of threads
+ # Returns array of created threads
+ def fill_pool
+ threads = []
+ if(@open)
+ @lock.synchronize do
+ required = min - size
+ if(required > 0)
+ required.times do
+ thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to,
+ :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger,
+ :autostart => false)
+ @threads << thread
+ threads << thread
end
- threads.each{|t|t.start}
- threads
+ end
end
+ end
+ threads.each{|t|t.start}
+ threads
+ end
- # force:: force immediate stop
- # Stop the pool
- def shutdown(force=false)
- status(:closed)
- args = []
- args.push(:force) if force
- @logger.info("Pool is now shutting down #{force ? 'using force' : ''}")
- @queue.clear if force
- @queue.wait_empty
- while(t = @threads.pop) do
- t.stop(*args)
- end
- unless(force)
- flush
- @threads.each{|x|x.join}
- end
- nil
- end
+ # force:: force immediate stop
+ # Stop the pool
+ def shutdown(force=false)
+ status(:closed)
+ args = []
+ args.push(:force) if force
+ @logger.info("Pool is now shutting down #{force ? 'using force' : ''}")
+ @queue.clear if force
+ @queue.wait_empty
+ while(t = @threads.pop) do
+ t.stop(*args)
+ end
+ unless(force)
+ flush
+ @threads.each{|x|x.join}
+ end
+ nil
+ end
- # action:: proc to be executed or array of [proc, [*args]]
- # Add a new proc/lambda to be executed (alias for queue)
- def <<(action)
- case action
- when Proc
- queue(action)
- when Array
- raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless action.size == 2 and action[0].is_a?(Proc) and action[1].is_a?(Array)
- queue(action[0], action[1])
- else
- raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
- end
- nil
- end
+ # action:: proc to be executed or array of [proc, [*args]]
+ # Add a new proc/lambda to be executed (alias for queue)
+ def <<(action)
+ case action
+ when Proc
+ queue(action)
+ when Array
+ raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless action.size == 2 and action[0].is_a?(Proc) and action[1].is_a?(Array)
+ queue(action[0], action[1])
+ else
+ raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
+ end
+ nil
+ end
- # action:: proc to be executed
- # Add a new proc/lambda to be executed
- # TODO: When using a prio queue for action holding, queue items
- # based on thread ID
- def queue(action, *args)
- raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
- raise ArgumentError.new('Expecting block') unless action.is_a?(Proc)
- @lock.synchronize do
- @queue << [action, args]
- create_thread
- end
- end
+ # action:: proc to be executed
+ # Add a new proc/lambda to be executed
+ # TODO: When using a prio queue for action holding, queue items
+ # based on thread ID
+ def queue(action, *args)
+ raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
+ raise ArgumentError.new('Expecting block') unless action.is_a?(Proc)
+ @lock.synchronize do
+ @queue << [action, args]
+ create_thread
+ end
+ end
- # jobs:: Array of proc/lambdas
- # Will queue a list of jobs into the pool
- def add_jobs(jobs)
- raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
- raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array)
- @queue.pause
- begin
- jobs.each do |job|
- case job
- when Proc
- @queue << [job, []]
- when Array
- raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless job.size == 2 and job[0].is_a?(Proc) and job[1].is_a?(Array)
- @queue << [job.shift, job]
- else
- raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
- end
- end
- ensure
- num = jobs.size - @threads.select{|t|t.waiting?}.size
- @lock.synchronize{ num.times{ create_thread(:nowait) } if num > 0 }
- @queue.unpause
- end
- true
+ # jobs:: Array of proc/lambdas
+ # Will queue a list of jobs into the pool
+ def add_jobs(jobs)
+ raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
+ raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array)
+ @queue.pause
+ begin
+ jobs.each do |job|
+ case job
+ when Proc
+ @queue << [job, []]
+ when Array
+ raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless job.size == 2 and job[0].is_a?(Proc) and job[1].is_a?(Array)
+ @queue << [job.shift, job]
+ else
+ raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
+ end
end
+ ensure
+ num = jobs.size - @threads.select{|t|t.waiting?}.size
+ @lock.synchronize{ num.times{ create_thread(:nowait) } if num > 0 }
+ @queue.unpause
+ end
+ true
+ end
- # block:: block to process
- # Adds a block to be processed
- def process(*args, &block)
- queue(block, *args)
- nil
- end
+ # block:: block to process
+ # Adds a block to be processed
+ def process(*args, &block)
+ queue(block, *args)
+ nil
+ end
- # Current size of pool
- def size
- @threads.size
- end
+ # Current size of pool
+ def size
+ @threads.size
+ end
- # Maximum allowed number of threads
- def max
- @max_threads
- end
+ # Maximum allowed number of threads
+ def max
+ @max_threads
+ end
- # Minimum allowed number of threads
- def min
- @min_threads
- end
+ # Minimum allowed number of threads
+ def min
+ @min_threads
+ end
- # m:: new max
- # Set maximum number of threads
- def max=(m)
- m = m.to_i
- raise ArgumentError.new('Maximum value must be greater than 0') unless m > 0
- @max_threads = m
- @min_threads = m if m < @min_threads
- resize if m < size
- m
- end
+ # m:: new max
+ # Set maximum number of threads
+ def max=(m)
+ m = m.to_i
+ raise ArgumentError.new('Maximum value must be greater than 0') unless m > 0
+ @max_threads = m
+ @min_threads = m if m < @min_threads
+ resize if m < size
+ m
+ end
- # m:: new min
- # Set minimum number of threads
- def min=(m)
- m = m.to_i
- raise ArgumentError.new("Minimum value must be greater than 0 and less than or equal to maximum (#{max})") unless m > 0 && m <= max
- @min_threads = m
- m
- end
+ # m:: new min
+ # Set minimum number of threads
+ def min=(m)
+ m = m.to_i
+ raise ArgumentError.new("Minimum value must be greater than 0 and less than or equal to maximum (#{max})") unless m > 0 && m <= max
+ @min_threads = m
+ m
+ end
- # t:: ActionPool::Thread to remove
- # Removes a thread from the pool
- def remove(t)
- raise ArgumentError.new('Expecting an ActionPool::Thread object') unless t.is_a?(ActionPool::Thread)
- t.stop
- del = @threads.include?(t)
- @threads.delete(t) if del
- fill_pool
- del
- end
+ # t:: ActionPool::Thread to remove
+ # Removes a thread from the pool
+ def remove(t)
+ raise ArgumentError.new('Expecting an ActionPool::Thread object') unless t.is_a?(ActionPool::Thread)
+ t.stop
+ del = @threads.include?(t)
+ @threads.delete(t) if del
+ fill_pool
+ del
+ end
- # Maximum number of seconds a thread
- # is allowed to idle in the pool.
- # (nil means thread life is infinite)
- def thread_timeout
- @thread_timeout
- end
+ # Maximum number of seconds a thread
+ # is allowed to idle in the pool.
+ # (nil means thread life is infinite)
+ def thread_timeout
+ @thread_timeout
+ end
- # Maximum number of seconds a thread
- # is allowed to work on a given action
- # (nil means thread is given unlimited
- # time to work on action)
- def action_timeout
- @action_timeout
- end
+ # Maximum number of seconds a thread
+ # is allowed to work on a given action
+ # (nil means thread is given unlimited
+ # time to work on action)
+ def action_timeout
+ @action_timeout
+ end
- # t:: timeout in seconds (nil for infinite)
- # Set maximum allowed time thead may idle in pool
- def thread_timeout=(t)
- t = t.to_f
- raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0
- @thread_timeout = t
- @threads.each{|thread|thread.thread_timeout = t}
- t
- end
+ # t:: timeout in seconds (nil for infinite)
+ # Set maximum allowed time thead may idle in pool
+ def thread_timeout=(t)
+ t = t.to_f
+ raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0
+ @thread_timeout = t
+ @threads.each{|thread|thread.thread_timeout = t}
+ t
+ end
- # t:: timeout in seconds (nil for infinte)
- # Set maximum allowed time thread may work
- # on a given action
- def action_timeout=(t)
- t = t.to_f
- raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0
- @action_timeout = t
- @threads.each{|thread|thread.action_timeout = t}
- t
- end
+ # t:: timeout in seconds (nil for infinte)
+ # Set maximum allowed time thread may work
+ # on a given action
+ def action_timeout=(t)
+ t = t.to_f
+ raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0
+ @action_timeout = t
+ @threads.each{|thread|thread.action_timeout = t}
+ t
+ end
- # Returns the next action to be processed
- def action
- @queue.pop
- end
+ # Returns the next action to be processed
+ def action
+ @queue.pop
+ end
- # Number of actions in the queue
- def action_size
- @queue.size
- end
+ # Number of actions in the queue
+ def action_size
+ @queue.size
+ end
- # Flush the thread pool. Mainly used for forcibly resizing
- # the pool if existing threads have a long thread life waiting
- # for input.
- def flush
- mon = Splib::Monitor.new
- @threads.size.times{ queue{ mon.wait } }
- @queue.wait_empty
- sleep(0.01)
- mon.broadcast
- end
+ # Flush the thread pool. Mainly used for forcibly resizing
+ # the pool if existing threads have a long thread life waiting
+ # for input.
+ def flush
+ mon = Splib::Monitor.new
+ @threads.size.times{ queue{ mon.wait } }
+ @queue.wait_empty
+ sleep(0.01)
+ mon.broadcast
+ end
- # Returns current number of threads in the pool working
- def working
- @threads.select{|t|t.running?}.size
- end
+ # Returns current number of threads in the pool working
+ def working
+ @threads.select{|t|t.running?}.size
+ end
- # Returns current number of threads in the pool waiting
- def waiting
- @threads.select{|t|t.waiting?}.size
- end
+ # Returns current number of threads in the pool waiting
+ def waiting
+ @threads.select{|t|t.waiting?}.size
+ end
- def thread_stats
- @threads.map{|t|[t.object_id,t.status]}
- end
+ def thread_stats
+ @threads.map{|t|[t.object_id,t.status]}
+ end
- private
+ private
- # Resize the pool
- def resize
- @logger.info("Pool is being resized to stated maximum: #{max}")
- until(size <= max) do
- t = nil
- t = @threads.find{|x|x.waiting?}
- t = @threads.shift unless t
- t.stop
- end
- flush
- nil
- end
+ # Resize the pool
+ def resize
+ @logger.info("Pool is being resized to stated maximum: #{max}")
+ until(size <= max) do
+ t = nil
+ t = @threads.find{|x|x.waiting?}
+ t = @threads.shift unless t
+ t.stop
+ end
+ flush
+ nil
end
+ end
end
View
102 lib/actionpool/Queue.rb
@@ -3,62 +3,62 @@
# This is a stub so we don't have to load the
# actual PriorityQueue library unless we need it
module Splib
- class PriorityQueue
- end
+ class PriorityQueue
+ end
end
module ActionPool
- class << self
- def enable_priority_q
- Splib.load :PriorityQueue
- end
+ class << self
+ def enable_priority_q
+ Splib.load :PriorityQueue
+ end
+ end
+ class PQueue < Splib::PriorityQueue
+ def initialize
+ super
+ @wait = false
+ @pause_guard = Splib::Monitor.new
+ @empty_guard = Splib::Monitor.new
+ extend QueueMethods
+ end
+ end
+ # Adds a little bit extra functionality to the Queue class
+ class Queue < ::Queue
+ def initialize
+ super
+ @wait = false
+ @pause_guard = Splib::Monitor.new
+ @empty_guard = Splib::Monitor.new
+ extend QueueMethods
+ end
+ end
+ module QueueMethods
+ # Stop the queue from returning results to requesting
+ # threads. Threads will wait for results until signalled
+ def pause
+ @wait = true
+ end
+ # Allow the queue to return results. Any threads waiting
+ # will have results given to them.
+ def unpause
+ @wait = false
+ @pause_guard.broadcast
end
- class PQueue < Splib::PriorityQueue
- def initialize
- super
- @wait = false
- @pause_guard = Splib::Monitor.new
- @empty_guard = Splib::Monitor.new
- extend QueueMethods
- end
+ # Check if queue needs to wait before returning
+ def pop
+ @pause_guard.wait_while{ @wait }
+ o = super
+ @empty_guard.broadcast if empty?
+ return o
end
- # Adds a little bit extra functionality to the Queue class
- class Queue < ::Queue
- def initialize
- super
- @wait = false
- @pause_guard = Splib::Monitor.new
- @empty_guard = Splib::Monitor.new
- extend QueueMethods
- end
+ # Clear queue
+ def clear
+ super
+ @empty_guard.broadcast
end
- module QueueMethods
- # Stop the queue from returning results to requesting
- # threads. Threads will wait for results until signalled
- def pause
- @wait = true
- end
- # Allow the queue to return results. Any threads waiting
- # will have results given to them.
- def unpause
- @wait = false
- @pause_guard.broadcast
- end
- # Check if queue needs to wait before returning
- def pop
- @pause_guard.wait_while{ @wait }
- o = super
- @empty_guard.broadcast if empty?
- return o
- end
- # Clear queue
- def clear
- super
- @empty_guard.broadcast
- end
- # Park a thread here until queue is empty
- def wait_empty
- @empty_guard.wait_while{ size > 0 }
- end
+ # Park a thread here until queue is empty
+ def wait_empty
+ @empty_guard.wait_while{ size > 0 }
end
+ end
end
View
310 lib/actionpool/Thread.rb
@@ -1,179 +1,179 @@
require 'timeout'
module ActionPool
- # Exception class used for waking up a thread
- class Wakeup < StandardError
+ # Exception class used for waking up a thread
+ class Wakeup < StandardError
+ end
+ # Raised within a thread when the timeout is changed
+ class Retimeout < StandardError
+ end
+ class Thread
+ # :pool:: pool thread is associated with
+ # :t_timeout:: max time a thread is allowed to wait for action
+ # :a_timeout:: max time thread is allowed to work
+ # :respond_thread:: thread to send execptions to
+ # :logger:: LogHelper for logging messages
+ # :autostart:: Automatically start the thread
+ # Create a new thread
+ def initialize(args={})
+ raise ArgumentError.new('Hash required for initialization') unless args.is_a?(Hash)
+ raise ArgumentError.new('ActionPool::Thread requires a pool') unless args[:pool]
+ raise ArgumentError.new('ActionPool::Thread requries thread to respond') unless args[:respond_thread]
+ @pool = args[:pool]
+ @respond_to = args[:respond_thread]
+ @thread_timeout = args[:t_timeout] ? args[:t_timeout].to_f : 0
+ @action_timeout = args[:a_timeout] ? args[:a_timeout].to_f : 0
+ args[:autostart] = true unless args.has_key?(:autostart)
+ @kill = false
+ @logger = args[:logger].is_a?(Logger) ? args[:logger] : Logger.new(nil)
+ @lock = Splib::Monitor.new
+ @action = nil
+ @thread = args[:autostart] ? ::Thread.new{ start_thread } : nil
end
- # Raised within a thread when the timeout is changed
- class Retimeout < StandardError
- end
- class Thread
- # :pool:: pool thread is associated with
- # :t_timeout:: max time a thread is allowed to wait for action
- # :a_timeout:: max time thread is allowed to work
- # :respond_thread:: thread to send execptions to
- # :logger:: LogHelper for logging messages
- # :autostart:: Automatically start the thread
- # Create a new thread
- def initialize(args={})
- raise ArgumentError.new('Hash required for initialization') unless args.is_a?(Hash)
- raise ArgumentError.new('ActionPool::Thread requires a pool') unless args[:pool]
- raise ArgumentError.new('ActionPool::Thread requries thread to respond') unless args[:respond_thread]
- @pool = args[:pool]
- @respond_to = args[:respond_thread]
- @thread_timeout = args[:t_timeout] ? args[:t_timeout].to_f : 0
- @action_timeout = args[:a_timeout] ? args[:a_timeout].to_f : 0
- args[:autostart] = true unless args.has_key?(:autostart)
- @kill = false
- @logger = args[:logger].is_a?(Logger) ? args[:logger] : Logger.new(nil)
- @lock = Splib::Monitor.new
- @action = nil
- @thread = args[:autostart] ? ::Thread.new{ start_thread } : nil
- end
- def start
- @thread = ::Thread.new{ start_thread } if @thread.nil?
- end
-
- # :force:: force the thread to stop
- # :wait:: wait for the thread to stop
- # Stop the thread
- def stop(*args)
- @kill = true
- if(args.include?(:force) || waiting?)
- begin
- @thread.raise Wakeup.new
- rescue Wakeup
- #ignore since we are the caller
- end
- sleep(0.01)
- @thread.kill if @thread.alive?
- end
- nil
+ def start
+ @thread = ::Thread.new{ start_thread } if @thread.nil?
+ end
+
+ # :force:: force the thread to stop
+ # :wait:: wait for the thread to stop
+ # Stop the thread
+ def stop(*args)
+ @kill = true
+ if(args.include?(:force) || waiting?)
+ begin
+ @thread.raise Wakeup.new
+ rescue Wakeup
+ #ignore since we are the caller
end
+ sleep(0.01)
+ @thread.kill if @thread.alive?
+ end
+ nil
+ end
- # Currently waiting
- def waiting?
- @action.nil?
- end
+ # Currently waiting
+ def waiting?
+ @action.nil?
+ end
- # Currently running
- def running?
- !@action.nil?
- end
+ # Currently running
+ def running?
+ !@action.nil?
+ end
- # Is the thread still alive
- def alive?
- @thread.alive?
- end
+ # Is the thread still alive
+ def alive?
+ @thread.alive?
+ end
- # Current thread status
- def status
- @action
- end
+ # Current thread status
+ def status
+ @action
+ end
- # Join internal thread
- def join
- @thread.join(@action_timeout)
- if(@thread.alive?)
- @thread.kill
- @thread.join
- end
- end
+ # Join internal thread
+ def join
+ @thread.join(@action_timeout)
+ if(@thread.alive?)
+ @thread.kill
+ @thread.join
+ end
+ end
- # Kill internal thread
- def kill
- @thread.kill
- end
+ # Kill internal thread
+ def kill
+ @thread.kill
+ end
- # Seconds thread will wait for input
- def thread_timeout
- @thread_timeout
- end
+ # Seconds thread will wait for input
+ def thread_timeout
+ @thread_timeout
+ end
- # Seconds thread will spend working on a given task
- def action_timeout
- @action_timeout
- end
+ # Seconds thread will spend working on a given task
+ def action_timeout
+ @action_timeout
+ end
- # t:: seconds to wait for input (floats allow for values 0 < t < 1)
- # Set the maximum amount of time to wait for a task
- def thread_timeout=(t)
- t = t.to_f
- raise ArgumentError.new('Value must be great than zero or nil') unless t > 0
- @thread_timeout = t
- @thread.raise Retimeout.new if waiting?
- t
- end
+ # t:: seconds to wait for input (floats allow for values 0 < t < 1)
+ # Set the maximum amount of time to wait for a task
+ def thread_timeout=(t)
+ t = t.to_f
+ raise ArgumentError.new('Value must be great than zero or nil') unless t > 0
+ @thread_timeout = t
+ @thread.raise Retimeout.new if waiting?
+ t
+ end
- # t:: seconds to work on a task (floats allow for values 0 < t < 1)
- # Set the maximum amount of time to work on a given task
- # Note: Modification of this will not affect actions already in process
- def action_timeout=(t)
- t = t.to_f
- raise ArgumentError.new('Value must be great than zero or nil') unless t > 0
- @action_timeout = t
- t
- end
+ # t:: seconds to work on a task (floats allow for values 0 < t < 1)
+ # Set the maximum amount of time to work on a given task
+ # Note: Modification of this will not affect actions already in process
+ def action_timeout=(t)
+ t = t.to_f
+ raise ArgumentError.new('Value must be great than zero or nil') unless t > 0
+ @action_timeout = t
+ t
+ end
- private
+ private
- # Start our thread
- def start_thread
- begin
- @logger.info("New pool thread is starting (#{self})")
- until(@kill) do
- begin
- @action = nil
- if(@pool.size > @pool.min && !@thread_timeout.zero?)
- Timeout::timeout(@thread_timeout) do
- @action = @pool.action
- end
- else
- @action = @pool.action
- end
- run(@action[0], @action[1]) unless @action.nil?
- rescue Timeout::Error
- @kill = true
- rescue Wakeup
- @logger.info("Thread #{::Thread.current} was woken up.")
- rescue Retimeout
- @logger.warn('Thread was woken up to reset thread timeout')
- rescue Exception => boom
- @logger.error("Pool thread caught an exception: #{boom}\n#{boom.backtrace.join("\n")}")
- @respond_to.raise boom
- end
- end
- rescue Retimeout
- @logger.warn('Thread was woken up to reset thread timeout')
- retry
- rescue Wakeup
- @logger.info("Thread #{::Thread.current} was woken up.")
- rescue Exception => boom
- @logger.error("Pool thread caught an exception: #{boom}\n#{boom.backtrace.join("\n")}")
- @respond_to.raise boom
- ensure
- @logger.info("Pool thread is shutting down (#{self})")
- @pool.remove(self)
+ # Start our thread
+ def start_thread
+ begin
+ @logger.info("New pool thread is starting (#{self})")
+ until(@kill) do
+ begin
+ @action = nil
+ if(@pool.size > @pool.min && !@thread_timeout.zero?)
+ Timeout::timeout(@thread_timeout) do
+ @action = @pool.action
+ end
+ else
+ @action = @pool.action
end
+ run(@action[0], @action[1]) unless @action.nil?
+ rescue Timeout::Error
+ @kill = true
+ rescue Wakeup
+ @logger.info("Thread #{::Thread.current} was woken up.")
+ rescue Retimeout
+ @logger.warn('Thread was woken up to reset thread timeout')
+ rescue Exception => boom
+ @logger.error("Pool thread caught an exception: #{boom}\n#{boom.backtrace.join("\n")}")
+ @respond_to.raise boom
+ end
end
+ rescue Retimeout
+ @logger.warn('Thread was woken up to reset thread timeout')
+ retry
+ rescue Wakeup
+ @logger.info("Thread #{::Thread.current} was woken up.")
+ rescue Exception => boom
+ @logger.error("Pool thread caught an exception: #{boom}\n#{boom.backtrace.join("\n")}")
+ @respond_to.raise boom
+ ensure
+ @logger.info("Pool thread is shutting down (#{self})")
+ @pool.remove(self)
+ end
+ end
- # action:: task to be run
- # args:: arguments to be passed to task
- # Run the task
- def run(action, args)
- args = args.respond_to?(:fixed_flatten) ? args.fixed_flatten(1) : args.flatten(1)
- begin
- unless(@action_timeout.zero?)
- Timeout::timeout(@action_timeout) do
- action.call(*args)
- end
- else
- action.call(*args)
- end
- rescue Timeout::Error => boom
- @logger.warn("Pool thread reached max execution time for action: #{boom}")
- end
+ # action:: task to be run
+ # args:: arguments to be passed to task
+ # Run the task
+ def run(action, args)
+ args = args.respond_to?(:fixed_flatten) ? args.fixed_flatten(1) : args.flatten(1)
+ begin
+ unless(@action_timeout.zero?)
+ Timeout::timeout(@action_timeout) do
+ action.call(*args)
+ end
+ else
+ action.call(*args)
end
+ rescue Timeout::Error => boom
+ @logger.warn("Pool thread reached max execution time for action: #{boom}")
+ end
end
+ end
end
View
4 nbproject/private/private.xml
@@ -1,4 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project-private xmlns="http://www.netbeans.org/ns/project-private/1">
- <editor-bookmarks xmlns="http://www.netbeans.org/ns/editor-bookmarks/1"/>
-</project-private>
View
19 nbproject/private/rake-d.txt
@@ -1,19 +0,0 @@
-clean=Remove any temporary products.
-clobber=Remove any generated file.
-clobber_package=Remove package products
-clobber_rdoc=Remove rdoc products
-doc=
-doc/rdoc=
-doc/rdoc/index.html=
-gem=Build the gem file ActionPool-0.2.3.gem
-package=Build all the packages
-pkg=
-pkg/ActionPool-0.2.3=
-pkg/ActionPool-0.2.3.gem=
-pkg/ActionPool-0.2.3.tgz=
-pkg/ActionPool-0.2.3.zip=
-rdoc=Build the rdoc HTML Files
-repackage=Force a rebuild of the package files
-rerdoc=Force a rebuild of the RDOC files
-spec=Run specs
-test=Run tests
View
6 nbproject/project.properties
@@ -1,6 +0,0 @@
-main.file=
-platform.active=default
-source.encoding=UTF-8
-spec.src.dir=spec
-src.dir=lib
-test.src.dir=test
View
16 nbproject/project.xml
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://www.netbeans.org/ns/project/1">
- <type>org.netbeans.modules.ruby.rubyproject</type>
- <configuration>
- <data xmlns="http://www.netbeans.org/ns/ruby-project/1">
- <name>actionpool</name>
- <source-roots>
- <root id="src.dir"/>
- </source-roots>
- <test-roots>
- <root id="test.src.dir"/>
- <root id="spec.src.dir"/>
- </test-roots>
- </data>
- </configuration>
-</project>
View
110 test/cases/general.rb
@@ -2,59 +2,59 @@
require 'test/unit'
class GeneralPoolTest < Test::Unit::TestCase
- def setup
- @pool = ActionPool::Pool.new
- end
- def teardown
- @pool.shutdown(true)
- end
- def test_numbers
- assert_equal(10, @pool.size)
- assert_equal(10, @pool.min)
- assert_equal(100, @pool.max)
- assert_equal(0, @pool.action_timeout)
- assert_equal(0, @pool.thread_timeout)
- assert_equal(0, @pool.action_size)
- end
- def test_output
- a = 0
- lock = Mutex.new
- run = lambda{ lock.synchronize{ a += 1 } }
- 100.times{ @pool << run }
- @pool.shutdown
- assert_equal(100, a)
- @pool.status :open
- a = 0
- jobs = [].fill(run,0,100)
- @pool.add_jobs(jobs)
- @pool.shutdown
- sleep(0.01)
- assert_equal(100, a)
- @pool.shutdown(true)
- end
- def test_args
- @pool.status :open
- output = nil
- @pool << [lambda{|x| output = x}, [2]]
- assert(2, output)
- output = nil
- @pool.add_jobs([[lambda{|x| output = x}, [3]]])
- assert(3, output)
- output = nil
- @pool << [lambda{|x,y| output = x+y}, [1,2]]
- assert(3, output)
- output = nil
- arr = []
- @pool.add_jobs([[lambda{|x,y| arr << x + y}, [1,1]], [lambda{|x| arr << x}, [3]]])
- ::Thread.pass
- sleep(0.01)
- assert(arr.include?(2))
- assert(arr.include?(3))
- arr.clear
- @pool << [lambda{|x,y| arr = [x,y]}, ['test', [1,2]]]
- sleep(0.01)
- assert_equal('test', arr[0])
- assert(arr[1].is_a?(Array))
- @pool.shutdown(true)
- end
+ def setup
+ @pool = ActionPool::Pool.new
+ end
+ def teardown
+ @pool.shutdown(true)
+ end
+ def test_numbers
+ assert_equal(10, @pool.size)
+ assert_equal(10, @pool.min)
+ assert_equal(100, @pool.max)
+ assert_equal(0, @pool.action_timeout)
+ assert_equal(0, @pool.thread_timeout)
+ assert_equal(0, @pool.action_size)
+ end
+ def test_output
+ a = 0
+ lock = Mutex.new
+ run = lambda{ lock.synchronize{ a += 1 } }
+ 100.times{ @pool << run }
+ @pool.shutdown
+ assert_equal(100, a)
+ @pool.status :open
+ a = 0
+ jobs = [].fill(run,0,100)
+ @pool.add_jobs(jobs)
+ @pool.shutdown
+ sleep(0.01)
+ assert_equal(100, a)
+ @pool.shutdown(true)
+ end
+ def test_args
+ @pool.status :open
+ output = nil
+ @pool << [lambda{|x| output = x}, [2]]
+ assert(2, output)
+ output = nil
+ @pool.add_jobs([[lambda{|x| output = x}, [3]]])
+ assert(3, output)
+ output = nil
+ @pool << [lambda{|x,y| output = x+y}, [1,2]]
+ assert(3, output)
+ output = nil
+ arr = []
+ @pool.add_jobs([[lambda{|x,y| arr << x + y}, [1,1]], [lambda{|x| arr << x}, [3]]])
+ ::Thread.pass
+ sleep(0.01)
+ assert(arr.include?(2))
+ assert(arr.include?(3))
+ arr.clear
+ @pool << [lambda{|x,y| arr = [x,y]}, ['test', [1,2]]]
+ sleep(0.01)
+ assert_equal('test', arr[0])
+ assert(arr[1].is_a?(Array))
+ @pool.shutdown(true)
+ end
end
View
38 test/cases/grow.rb
@@ -2,23 +2,23 @@
require 'test/unit'
class GrowPoolTest < Test::Unit::TestCase
- def setup
- @pool = ActionPool::Pool.new
- end
- def teardown
- @pool.shutdown(true)
- end
- def test_grow
- jobs = [].fill(lambda{sleep}, 0..20)
- @pool.add_jobs(jobs)
- Thread.pass
- assert(@pool.size > 10)
- @pool.shutdown(true)
- end
- def test_max
- @pool.create_thread(:force) until @pool.size > @pool.max
- assert(@pool.create_thread.nil?)
- assert(!@pool.create_thread(:force).nil?)
- @pool.shutdown(true)
- end
+ def setup
+ @pool = ActionPool::Pool.new
+ end
+ def teardown
+ @pool.shutdown(true)
+ end
+ def test_grow
+ jobs = [].fill(lambda{sleep}, 0..20)
+ @pool.add_jobs(jobs)
+ Thread.pass
+ assert(@pool.size > 10)
+ @pool.shutdown(true)
+ end
+ def test_max
+ @pool.create_thread(:force) until @pool.size > @pool.max
+ assert(@pool.create_thread.nil?)
+ assert(!@pool.create_thread(:force).nil?)
+ @pool.shutdown(true)
+ end
end
View
20 test/cases/nogrow.rb
@@ -2,14 +2,14 @@
require 'test/unit'
class NoGrowPoolTest < Test::Unit::TestCase
- def setup
- @pool = ActionPool::Pool.new
- end
- def teardown
- @pool.shutdown(true)
- end
- def test_nogrow
- 5.times{ @pool << lambda{} }
- assert_equal(10, @pool.size)
- end
+ def setup
+ @pool = ActionPool::Pool.new
+ end
+ def teardown
+ @pool.shutdown(true)
+ end
+ def test_nogrow
+ 5.times{ @pool << lambda{} }
+ assert_equal(10, @pool.size)
+ end
end
View
58 test/cases/queue.rb
@@ -2,33 +2,33 @@
require 'test/unit'
class QueueTest < Test::Unit::TestCase
- def setup
- @queue = ActionPool::Queue.new
- end
- def test_pop
- 3.times{|i|@queue << i}
- 3.times{|i|assert(i, @queue.pop)}
- assert(@queue.empty?)
- end
- def test_pause
- 3.times{|i|@queue << i}
- @queue.pause
- output = []
- 3.times{Thread.new{output << @queue.pop}}
- assert(output.empty?)
- assert_equal(3, @queue.size)
- @queue.unpause
- sleep(1)
- assert(@queue.empty?)
- assert_equal(3, output.size)
- 3.times{|i|assert(output.include?(i))}
- @queue << 1
- output = nil
- Thread.new{@queue.wait_empty; output = true}
- assert_nil(output)
- @queue.pop
- Thread.pass
- sleep(0.01)
- assert(output)
- end
+ def setup
+ @queue = ActionPool::Queue.new
+ end
+ def test_pop
+ 3.times{|i|@queue << i}
+ 3.times{|i|assert(i, @queue.pop)}
+ assert(@queue.empty?)
+ end
+ def test_pause
+ 3.times{|i|@queue << i}
+ @queue.pause
+ output = []
+ 3.times{Thread.new{output << @queue.pop}}
+ assert(output.empty?)
+ assert_equal(3, @queue.size)
+ @queue.unpause
+ sleep(1)
+ assert(@queue.empty?)
+ assert_equal(3, output.size)
+ 3.times{|i|assert(output.include?(i))}
+ @queue << 1
+ output = nil
+ Thread.new{@queue.wait_empty; output = true}
+ assert_nil(output)
+ @queue.pop
+ Thread.pass
+ sleep(0.01)
+ assert(output)
+ end
end
View
60 test/cases/resize.rb
@@ -2,34 +2,34 @@
require 'test/unit'
class ResizePoolTest < Test::Unit::TestCase
- def setup
- @pool = ActionPool::Pool.new
- end
- def teardown
- @pool.shutdown(true)
- end
- def test_resize
- stop = false
- 20.times{ @pool << lambda{ sleep } }
- Thread.pass
- sleep(0.1)
- assert(@pool.size > 10)
- stop = true
- @pool.shutdown(true)
- @pool.status :open
- @pool.max = 10
- assert_equal(10, @pool.max)
- stop = false
- 20.times{ @pool << lambda{ a = 0; a += 1 until stop || a > 9999999999 } }
- assert_equal(10, @pool.size)
- stop = true
- @pool.shutdown(true)
- @pool.status :open
- @pool.max = 20
- stop = false
- 30.times{ @pool << lambda{ a = 0; a += 1 until stop || a > 9999999999 } }
- stop = true
- assert(@pool.size > 10)
- @pool.shutdown(true)
- end
+ def setup
+ @pool = ActionPool::Pool.new
+ end
+ def teardown
+ @pool.shutdown(true)
+ end
+ def test_resize
+ stop = false
+ 20.times{ @pool << lambda{ sleep } }
+ Thread.pass
+ sleep(0.1)
+ assert(@pool.size > 10)
+ stop = true
+ @pool.shutdown(true)
+ @pool.status :open
+ @pool.max = 10
+ assert_equal(10, @pool.max)
+ stop = false
+ 20.times{ @pool << lambda{ a = 0; a += 1 until stop || a > 9999999999 } }
+ assert_equal(10, @pool.size)
+ stop = true
+ @pool.shutdown(true)
+ @pool.status :open
+ @pool.max = 20
+ stop = false
+ 30.times{ @pool << lambda{ a = 0; a += 1 until stop || a > 9999999999 } }
+ stop = true
+ assert(@pool.size > 10)
+ @pool.shutdown(true)
+ end
end
View
58 test/cases/shutdown.rb
@@ -2,35 +2,35 @@
require 'test/unit'
class ShutdownPoolTest < Test::Unit::TestCase
- def setup
- @pool = ActionPool::Pool.new
+ def setup
+ @pool = ActionPool::Pool.new
+ end
+ def teardown
+ @pool.shutdown(true)
+ end
+ def test_close
+ result = 0
+ @pool << lambda{ result = 5 }
+ sleep(0.01)
+ assert(5, result)
+ @pool.status :closed
+ assert_raise(ActionPool::PoolClosed) do
+ @pool << lambda{}
end
- def teardown
- @pool.shutdown(true)
- end
- def test_close
- result = 0
- @pool << lambda{ result = 5 }
- sleep(0.01)
- assert(5, result)
- @pool.status :closed
- assert_raise(ActionPool::PoolClosed) do
- @pool << lambda{}
- end
- assert_raise(ActionPool::PoolClosed) do
- @pool.add_jobs [lambda{}, lambda{}]
- end
- @pool.shutdown(true)
- end
- def test_shutdown
- assert_equal(10, @pool.size)
- @pool.shutdown
- sleep(0.5)
- assert_equal(0, @pool.size)
- @pool.status :open
- @pool << lambda{ sleep }
- sleep(0.01)
- assert_equal(10, @pool.size)
- @pool.shutdown(true)
+ assert_raise(ActionPool::PoolClosed) do
+ @pool.add_jobs [lambda{}, lambda{}]
end
+ @pool.shutdown(true)
+ end
+ def test_shutdown
+ assert_equal(10, @pool.size)
+ @pool.shutdown
+ sleep(0.5)
+ assert_equal(0, @pool.size)
+ @pool.status :open
+ @pool << lambda{ sleep }
+ sleep(0.01)
+ assert_equal(10, @pool.size)
+ @pool.shutdown(true)
+ end
end
View
44 test/cases/thread.rb
@@ -2,26 +2,26 @@
require 'test/unit'
class ThreadTest < Test::Unit::TestCase
- def setup
- @pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1)
- @thread = ActionPool::Thread.new(:pool => @pool, :respond_thread => self, :t_timeout => 60, :a_timeout => 0)
- end
- def teardown
- @pool.shutdown(true)
- end
- def test_thread
- sleep(0.01)
- assert(@thread.waiting?)
- assert_equal(60, @thread.thread_timeout)
- assert_equal(0, @thread.action_timeout)
- assert(@thread.alive?)
- stop = false
- 10.times{ @pool << lambda{ a = 0; a += 1 until stop || a > 9999999999 } }
- assert(!@thread.waiting?)
- @thread.stop(:force)
- sleep(0.01)
- assert(!@thread.alive?)
- stop = true
- @pool.shutdown(true)
- end
+ def setup
+ @pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1)
+ @thread = ActionPool::Thread.new(:pool => @pool, :respond_thread => self, :t_timeout => 60, :a_timeout => 0)
+ end
+ def teardown
+ @pool.shutdown(true)
+ end
+ def test_thread
+ sleep(0.01)
+ assert(@thread.waiting?)
+ assert_equal(60, @thread.thread_timeout)
+ assert_equal(0, @thread.action_timeout)
+ assert(@thread.alive?)
+ stop = false
+ 10.times{ @pool << lambda{ a = 0; a += 1 until stop || a > 9999999999 } }
+ assert(!@thread.waiting?)
+ @thread.stop(:force)
+ sleep(0.01)
+ assert(!@thread.alive?)
+ stop = true
+ @pool.shutdown(true)
+ end
end
View
84 test/cases/timeouts.rb
@@ -2,48 +2,48 @@
require 'test/unit'
class TimeoutPoolTest < Test::Unit::TestCase
- def setup
- @pool = ActionPool::Pool.new
- end
- def teardown
- @pool.shutdown(true)
- end
- def test_actiontimeout
- @pool.action_timeout = 0.25
- assert_equal(10, @pool.size)
- stop = false
- output = []
- 20.times do
- @pool.process do
- until(stop) do
- output << 1
- sleep(0.3)
- end
- end
+ def setup
+ @pool = ActionPool::Pool.new
+ end
+ def teardown
+ @pool.shutdown(true)
+ end
+ def test_actiontimeout
+ @pool.action_timeout = 0.25
+ assert_equal(10, @pool.size)
+ stop = false
+ output = []
+ 20.times do
+ @pool.process do
+ until(stop) do
+ output << 1
+ sleep(0.3)
end
- sleep(0.1)
- assert_equal(20, output.size)
- sleep(0.3)
- assert_equal(0, @pool.working)
- assert_equal(20, output.size)
- assert_equal(20, @pool.size)
- @pool.shutdown(true)
- end
- def test_threadtimeout
- @pool.thread_timeout = 0.05
- assert_equal(10, @pool.size)
- t = [].fill(lambda{
- begin
- sleep(0.1)
- rescue
- end }, 0, 20)
- @pool.add_jobs(t)
- ::Thread.pass
- sleep(0.01)
- assert(@pool.size >= 20)
- ::Thread.pass
- sleep(0.1)
- assert(10, @pool.size)
- @pool.shutdown(true)
+ end
end
+ sleep(0.1)
+ assert_equal(20, output.size)
+ sleep(0.3)
+ assert_equal(0, @pool.working)
+ assert_equal(20, output.size)
+ assert_equal(20, @pool.size)
+ @pool.shutdown(true)
+ end
+ def test_threadtimeout
+ @pool.thread_timeout = 0.05
+ assert_equal(10, @pool.size)
+ t = [].fill(lambda{
+ begin
+ sleep(0.1)
+ rescue
+ end }, 0, 20)
+ @pool.add_jobs(t)
+ ::Thread.pass
+ sleep(0.01)
+ assert(@pool.size >= 20)
+ ::Thread.pass
+ sleep(0.1)
+ assert(10, @pool.size)
+ @pool.shutdown(true)
+ end
end
View
2 test/run_tests.rb
@@ -4,5 +4,5 @@
require 'actionpool'
Dir.new("#{File.dirname(__FILE__)}/cases").each{|f|
- require "#{File.dirname(__FILE__)}/cases/#{f}" if f[-2..f.size] == 'rb'
+ require "#{File.dirname(__FILE__)}/cases/#{f}" if f[-2..f.size] == 'rb'
}

0 comments on commit 54678ef

Please sign in to comment.
Something went wrong with that request. Please try again.