Permalink
Browse files

Experimental fork pool

  • Loading branch information...
kennethkalmer committed Oct 9, 2009
1 parent e5f116f commit 2c172838362cf68acf90b8e82868df745a1f2981
Showing with 212 additions and 1 deletion.
  1. +5 −1 lib/daemon_kit.rb
  2. +134 −0 lib/daemon_kit/fork_pool.rb
  3. +28 −0 pool_test.rb
  4. +45 −0 spec/fork_pool_spec.rb
View
@@ -3,7 +3,7 @@
# Seems in 1.9 we need to load openssl before em or there is failures all around.
# But we need to consider that people might not have ssl in the first place.
-if RUBY_VERSION >= "1.9"
+if RUBY_VERSION >= "1.8.7"
begin
require 'openssl'
rescue LoadError
@@ -37,6 +37,7 @@ module DaemonKit
autoload :RuoteParticipants, 'daemon_kit/ruote_participants'
autoload :RuoteWorkitem, 'daemon_kit/ruote_workitem'
autoload :RuotePseudoParticipant, 'daemon_kit/ruote_pseudo_participant'
+ autoload :ForkPool, 'daemon_kit/fork_pool'
class << self
def logger
@@ -60,3 +61,6 @@ def framework_root
end
end
end
+
+# Trigger autoloading of the safety net
+DaemonKit::Safety
View
@@ -0,0 +1,134 @@
+module DaemonKit
+ # = HIGHLY EXPERIMENTAL FORK POOL
+ #
+ # This is a highly experimental piece of code, really, it still needs a
+ # lot of testing and a lot of eyes before it goes to master.
+ #
+ # == Supported Ruby versions
+ #
+ # I've tested this on OS X 10.5.8 (Leopard) using rvm, here is the results:
+ #
+ # 1.8.6-p389 -> Works well
+ # 1.8.7-p174 -> Works well
+ # 1.9.1-p243 -> Unreliable
+ # 1.9.2-preview1 -> Unreliable
+ #
+ # There are some hints that the "forking from inside a thread issue" is OSX
+ # specific, I still need to test on gentoo.
+ #
+ # == Issues with Ruby 1.9
+ #
+ # Some of the child processes never actually spawns, leaving the parent
+ # waiting for processes that will never run. Terminating the daemon will
+ # leave detached processes hanging around on the sytem.
+ #
+ # == Usage
+ #
+ # If you made it this far, thanks for being brave enough to test it
+ #
+ # DaemonKit::ForkPool.process do
+ # # this will be your sub-process
+ # sleep 60
+ # end
+ #
+ # Set the maximum number of concurrent forks with +DaemonKit::ForkPool.size+
+ #
+ # Try to keep the number of running forks slightly higher than
+ # (cpu's * cpu cores) if the sub-processes are CPU-intensive.
+ class ForkPool
+
+ class << self
+
+ # Set the size of the fork pool, defaults to 4
+ def size=( num )
+ @size = num
+ end
+
+ # Return the size of the fork pool, defaults to 4
+ def size
+ @size ||= 4
+ end
+
+ # The queue used by the dispatcher
+ def queue
+ @queue ||= Queue.new
+ end
+
+ # Array of process id's currently running in the pool
+ def processes
+ @processes ||= []
+ end
+
+ # Add the block to the queue for processing
+ def process( &block )
+ @enqueueing = true
+
+ queue << block
+ p [ :queue, queue ]
+
+ run_forks!
+
+ @enqueueing = false
+ end
+
+ # Wait for all processes to finish, including the processes that
+ # are currently queued for execution.
+ def wait
+ loop do
+ p [ :wait, queue.size, processes, @enqueueing ]
+ break if queue.empty? && processes.empty? && !@enqueueing
+ sleep 0.1
+ end
+ end
+
+ private
+
+ def run_forks!
+ return if @dispatcher_thread
+ Thread.abort_on_exception = true
+ p [ :running_forks ]
+
+ @process_mutex = Mutex.new
+
+ @dispatcher_thread = Thread.new do
+ loop do
+ if block = queue.pop
+ p [ :got_block, block, processes.size, size ]
+
+ pid = Kernel.fork do
+ p [ :fork, Process.pid, block ]
+ safely do
+ block.call
+ end
+ at_exit { p [:exit, Process.pid] }
+ end
+
+ p [ :pid, pid ]
+
+ @process_mutex.synchronize { processes << pid }
+
+ p [ :processes, processes ]
+
+ Thread.new {
+ p [ :waiting, pid ]
+ Process.wait( pid )
+ @process_mutex.synchronize {
+ processes.delete( pid )
+ }
+ p [ :finished, pid ]
+ }
+
+ sleep 0.1 until processes.size < size
+
+ end
+
+ p [ :loop_done ]
+ sleep 0.1
+ end
+ end
+
+ p [ :dispatcher_running ]
+ end
+ end
+ end
+end
View
@@ -0,0 +1,28 @@
+#!/usr/bin/env ruby
+
+# Simple example script of using and debugging the fork pool
+
+require "./lib/daemon_kit"
+require "timeout"
+
+Timeout.timeout( 30 ) do
+
+ start = Time.now
+
+ DaemonKit::ForkPool.process { p [:p1, Process.pid]; sleep 3; p [ :p1, Time.now ] }
+# sleep 0.5
+ DaemonKit::ForkPool.process { p [:p2, Process.pid]; sleep 3; p [ :p2, Time.now ] }
+# sleep 0.5
+ DaemonKit::ForkPool.process { p [:p3, Process.pid]; sleep 3; p [ :p3, Time.now ] }
+# sleep 0.5
+ DaemonKit::ForkPool.process { p [:p4, Process.pid]; sleep 3; p [ :p4, Time.now ] }
+# sleep 0.5
+ DaemonKit::ForkPool.process { p [:p5, Process.pid]; sleep 3; p [ :p5, Time.now ] }
+# sleep 0.5
+ DaemonKit::ForkPool.process { p [:p6, Process.pid]; sleep 3; p [ :p6, Time.now ] }
+
+ DaemonKit::ForkPool.wait
+
+ puts "Completed in #{Time.now - start} seconds"
+
+end
View
@@ -0,0 +1,45 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe DaemonKit::ForkPool do
+ describe "configuration" do
+ it "should default to 4 forks" do
+ DaemonKit::ForkPool.size.should be(4)
+ end
+
+ it "should have an empty work queue" do
+ DaemonKit::ForkPool.queue.should be_empty
+ end
+ end
+
+ describe "running" do
+ it "should run blocks without issue" do
+ pending "Figure out how to run forks with rspec"
+
+ DaemonKit::ForkPool.fork do
+ sleep 1
+ end
+
+ DaemonKit::ForkPool.processes.should_not be_empty
+ DaemonKit::ForkPool.wait
+
+ DaemonKit::ForkPool.processes.should be_empty
+ DaemonKit::ForkPool.queue.should be_empty
+ end
+
+ it "should queue overflow blocks" do
+ pending "Figure out how to run forks with rspec"
+
+ 5.times do
+ DaemonKit::ForkPool.process { sleep 1 }
+ end
+
+ sleep 0.5
+ DaemonKit::ForkPool.processes.size.should be(4)
+ DaemonKit::ForkPool.queue.size.should be(1)
+
+ DaemonKit::ForkPool.wait
+ DaemonKit::ForkPool.processes.should be_empty
+ DaemonKit::ForkPool.queue.should be_empty
+ end
+ end
+end

0 comments on commit 2c17283

Please sign in to comment.