ActionPool

ActionPool is a simple thread pool. It supports various constraints and resizing in an easy and unobtrusive manner. You can limit how long a task is worked on, as well as the lifetime of a thread. For programs that use lots of threads, it can be helpful to reuse threads instead of constantly recreating them.

Install:

  gem install actionpool

== Documentation

{rdocs}[http://allgems.ruby-forum.com/gems/ActionPool/]

Example

Code:

  require 'actionpool'

  pool = ActionPool::Pool.new
  pool.process do
    sleep(2)
    raise 'Wakeup main thread'
  end
  20.times do
    pool.process do
      puts "Thread: #{Thread.current}"
      sleep(0.1)
    end
  end
  begin
    sleep
  rescue Exception => e
    puts "Thread pool woke me up: #{e}"
  end

Result:

  Thread: #<Thread:0x93ebeb8>
  Thread: #<Thread:0x93eb92c>
  Thread: #<Thread:0x93eb8a0>
  Thread: #<Thread:0x93eb814>
  Thread: #<Thread:0x93eb788>
  Thread: #<Thread:0x93eb670>
  Thread: #<Thread:0x93eb5e4>
  Thread: #<Thread:0x93eb558>
  Thread: #<Thread:0x93eb4cc>
  Thread: #<Thread:0x93ebeb8>
  Thread: #<Thread:0x93eb92c>
  Thread: #<Thread:0x93eb8a0>
  Thread: #<Thread:0x93eb814>
  Thread: #<Thread:0x93eb788>
  Thread: #<Thread:0x93eb670>
  Thread: #<Thread:0x93eb5e4>
  Thread: #<Thread:0x93eb558>
  Thread: #<Thread:0x93eb4cc>
  Thread: #<Thread:0x93eb92c>
  Thread: #<Thread:0x93eb8a0>
  Thread pool woke me up: Wakeup main thread

Important note

The worker threads in the ActionPool catch every Exception that your block fails to catch. Instead of silently swallowing these exceptions, the workers pass them back to the thread that created the pool (generally the main thread). Keep this in mind if you care about handling unexpected exceptions. It is also what allows the example code above to be woken from its sleep.
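
The hand-off described above can be sketched with nothing but Ruby's standard threads. This is only an illustration of the idea, not ActionPool's actual implementation; run_and_forward and wait_for_failure are hypothetical helper names made up for this example.

```ruby
# Sketch only: plain stdlib threads, not ActionPool's real code.
# run_and_forward is a hypothetical helper name.
def run_and_forward(owner = Thread.current, &task)
  Thread.new do
    begin
      task.call
    rescue Exception => e
      # Re-raise the uncaught exception inside the owning thread.
      owner.raise(e)
    end
  end
end

# Start a failing task, then sleep until its exception arrives.
def wait_for_failure
  run_and_forward do
    sleep(0.1)
    raise 'Wakeup main thread'
  end
  begin
    sleep(5) # interrupted when the worker forwards its exception
    nil
  rescue Exception => e
    e.message
  end
end

puts "Woke me up: #{wait_for_failure}"
```

Thread#raise is what interrupts the sleeping owner here, which is why a rescue around the sleep is enough to see the worker's failure.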

The internals

ActionPool has a few simple settings that control its behavior. First, the pool has a minimum and maximum number of allowed threads. On initialization, the minimum number of threads is created and put into the pool. By default, this is 10 threads. As tasks are added, the pool grows as needed: when the pool holds more tasks than threads to process them, new threads are added until the maximum thread threshold is reached. Taking the example above, we can demonstrate this easily by adjusting our limits:

  require 'actionpool'

  pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 3)
  pool.process do
    sleep(10)
    raise 'Wakeup main thread'
  end
  20.times do
    pool.process do
      puts "Thread: #{Thread.current}"
      sleep(rand) # random pause of less than one second
    end
  end
  begin
    sleep
  rescue Exception => e
    puts "Thread pool woke me up: #{e}"
  end

Which results in:

  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1080>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1760>
  Thread: #<Thread:0x86c1080>
  Thread pool woke me up: Wakeup main thread

Our pool starts with a single thread, which is occupied by the sleeping task waiting to raise an exception. As we add new tasks, the pool grows to accommodate them until it reaches the maximum threshold of 3. At that point, the pool simply processes the tasks until the task list is empty. This is why only two distinct threads appear in the output: the third is still busy with the sleeping task.
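
To make the growth rule concrete, here is a rough sketch of a pool that starts at a minimum size and adds workers only while outstanding tasks outnumber threads. TinyPool and demo_pool_growth are hypothetical names invented for this example, and the logic is an assumption about how such a rule could work, not a copy of ActionPool's internals.

```ruby
require 'thread'

# Hypothetical TinyPool: illustrative only, not ActionPool's code.
class TinyPool
  def initialize(min, max)
    @max = max
    @queue = Queue.new
    @lock = Mutex.new
    @outstanding = 0 # tasks queued or currently running
    @threads = []
    min.times { spawn_worker }
  end

  def size
    @lock.synchronize { @threads.size }
  end

  # Queue a task; add a worker if tasks outnumber threads and
  # the maximum has not been reached yet.
  def process(&task)
    @lock.synchronize do
      @outstanding += 1
      spawn_worker if @outstanding > @threads.size && @threads.size < @max
    end
    @queue << task
  end

  # Let the queued tasks finish, then stop every worker.
  def shutdown
    workers = @lock.synchronize { @threads.dup }
    workers.size.times { @queue << :stop }
    workers.each(&:join)
  end

  private

  def spawn_worker
    @threads << Thread.new do
      while (task = @queue.pop) != :stop
        task.call
        @lock.synchronize { @outstanding -= 1 }
      end
    end
  end
end

def demo_pool_growth
  pool = TinyPool.new(1, 3)
  gate = Queue.new
  # Each task blocks until released, so none finishes while we add more.
  10.times { pool.process { gate.pop } }
  grown = pool.size
  10.times { gate << :go }
  pool.shutdown
  grown
end

puts "Threads after queueing 10 tasks: #{demo_pool_growth}"
```

With a minimum of 1 and a maximum of 3, the second and third tasks each trigger a new worker, and every later task simply waits in the queue.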

The pool can also limit the amount of time a thread spends working on a given task. By default, a thread works on a task until the task completes or the pool is shut down. However, as the following example shows, it is easy to limit this time (here with the :a_to option) so that the pool does not get bogged down by long-running tasks:

  require 'actionpool'

  pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1, :a_to => 1)
  pool.process do
    puts "#{Time.now}: I'm a long running task"
    sleep(100)
    raise 'Wakeup main thread'
  end
  pool.process do
    puts "#{Time.now}: Waiting for my turn"
    raise "I'm waking up the main thread"
  end
  begin
    sleep
  rescue Exception => e
    puts "Thread pool woke me up: #{e}"
  end

Results:

  2009-10-10 08:47:08 -0700: I'm a long running task
  2009-10-10 08:47:09 -0700: Waiting for my turn
  Thread pool woke me up: I'm waking up the main thread
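
The effect of a per-task time limit can be approximated with Ruby's standard Timeout module. The sketch below is not how ActionPool necessarily enforces its limit; run_with_limit is a hypothetical helper used only to show a task being abandoned once its time is up.

```ruby
require 'timeout'

# Hypothetical helper: run a task, but give up after `limit` seconds.
# Illustrative only; ActionPool's own timeout handling may differ.
def run_with_limit(limit, &task)
  Timeout.timeout(limit, &task)
  :finished
rescue Timeout::Error
  :abandoned
end

puts run_with_limit(0.2) { sleep(5) } # the long task is cut short
puts run_with_limit(0.2) { 1 + 1 }    # the quick task completes
```

In the example above, the long-running task is dropped in the same way after one second, which frees the only thread for the waiting task.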

If you have a number of tasks you would like to schedule at once, it is easy with the add_jobs method:

  require 'actionpool'

  pool = ActionPool::Pool.new
  a = 0
  lock = Mutex.new
  tasks = [].fill(lambda{ lock.synchronize{ a += 1 } }, 0..19)
  pool.add_jobs(tasks)
  pool.shutdown
  puts "Result: #{a}"

Results:

  Result: 20
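
The Mutex in that example matters because many threads update the same counter. A minimal sketch of the same pattern with plain threads follows. It does not use ActionPool: run_all and count_with_lock are hypothetical helpers, and the join stands in for whatever waiting the pool's shutdown performs.

```ruby
# Hypothetical helper: run every task in its own thread and wait for all.
def run_all(tasks)
  tasks.map { |task| Thread.new(&task) }.each(&:join)
end

def count_with_lock(n)
  total = 0
  lock = Mutex.new
  # Each task increments the shared counter while holding the lock.
  tasks = Array.new(n) { lambda { lock.synchronize { total += 1 } } }
  run_all(tasks)
  total
end

puts "Result: #{count_with_lock(20)}"
```

Reading the counter only after all tasks have been joined is what makes the final value reliable.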

Tasks can also be given arguments. Push an array holding the task followed by an array of its arguments:

  require 'actionpool'

  pool = ActionPool::Pool.new
  string = 'Hello world'
  puts "Original: #{string}. ID: #{string.object_id}"
  pool << [lambda{|var| puts "Passed: #{var}. ID: #{var.object_id}"}, [string.dup]]
  pool << [lambda{|a,b| puts "Passed: #{a} | #{b}. ID: #{a.object_id} | #{b.object_id}"}, [string, string.dup]]
  pool.shutdown

Results:

  Original: Hello world. ID: 70651630
  Passed: Hello world. ID: 70651250
  Passed: Hello world | Hello world. ID: 70651630 | 70651100
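
The object IDs in these results show that arguments are handed over by reference: a task receives the very same object unless you pass a copy. The sketch below demonstrates the same behavior with a plain thread. The helper name same_object_seen_by_task? is hypothetical and the code does not use ActionPool.

```ruby
# Hypothetical helper: does the task receive the very same object?
def same_object_seen_by_task?(arg, original)
  # Thread.new passes its arguments to the block, much like a task argument.
  Thread.new(arg) { |var| var.equal?(original) }.value
end

string = 'Hello world'
puts same_object_seen_by_task?(string, string)     # same object is shared
puts same_object_seen_by_task?(string.dup, string) # dup gives the task its own copy
```

Passing a dup is the simple way to keep a task from modifying an object that other threads still use.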

== Last remarks

If you find any bugs, please report them through {github}[http://github.com/spox/actionpool/issues].

== License

ActionPool is licensed under the LGPLv3.

Copyright (c) 2012 spox <spox@modspox.com>
