Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

+ A forked server implementation

  • Loading branch information...
commit a8ecb0b0b42a7ea9a342e0cc36ac7b8eb7f8f56b 1 parent 55c1c25
@kschiess authored
View
3  lib/procrastinate.rb
@@ -13,4 +13,5 @@ class ChildDeath < StandardError; end
require 'procrastinate/spawn_strategy'
require 'procrastinate/proxy'
require 'procrastinate/process_manager'
-require 'procrastinate/scheduler'
+require 'procrastinate/scheduler'
+require 'procrastinate/server'
View
25 lib/procrastinate/process_manager.rb
@@ -166,13 +166,18 @@ def reap_childs
# Ignore: This means that no childs remain.
end
- # Spawns a process to work on +task+. If a block is given, it is called
- # when the task completes. This method should only be called from a strategy
+ # Spawns a process to work on +task+. If a block is given, it is called when
+ # the task completes. This method should only be called from a strategy
# inside the dispatchers thread. Otherwise it will expose threading issues.
#
- # Example:
- #
- # spawn(wi) { |pid| puts "Task is complete" }
+ # @example
+ # create_process(wi) { puts "Task is complete" }
+ #
+ # @param task [Procrastinate::Task::Callable] task to be run inside the
+ # forked process
+ # @param completion_handler [Proc] completion handler that is called when
+ # the process exits
+ # @return [void]
#
def create_process(task, &completion_handler)
# Tasks that are interested in getting messages from their childs must
@@ -224,4 +229,14 @@ def wait_for_all_childs
finalize_children
end
end
+
+ # Kills all running processes by sending them a QUIT signal.
+ #
+ # @param signal [String] signal to send to the forked processes.
+ #
+ def kill_processes(signal='QUIT')
+ children.each do |pid, process|
+ Process.kill(signal, pid)
+ end
+ end
end
View
77 lib/procrastinate/server.rb
@@ -0,0 +1,77 @@
+module Procrastinate
+ class Server
+ def initialize
+ @manager = Procrastinate::ProcessManager.new
+ @state = :new
+ end
+
+ def start(n, &block)
+ fail "Already running server." unless @state == :new
+
+ @block = block
+ @strategy = Procrastinate::SpawnStrategy::Throttled.new(n)
+ @state = :running
+
+ start_thread
+ end
+
+ def shutdown
+ fail "For shutdown, server must be running." unless @state == :running
+
+ @state = :shutdown
+ @manager.wakeup
+
+ @thread.join if @thread
+
+ @thread = nil
+ @state = :new
+ end
+
+ private
+ def start_thread
+ @thread = Thread.start(&method(:control_thread_main))
+ end
+
+ # @note This method runs in the control thread only.
+ #
+ def spawn_new_workers
+ while @strategy.should_spawn?
+ task = Procrastinate::Task::Callable.new(@block)
+
+ @strategy.notify_spawn
+ @manager.create_process(task) do
+ @strategy.notify_dead
+ end
+ end
+ end
+
+ # @note This method runs in the control thread only.
+ #
+ def control_thread_main
+ # Start managers work
+ @manager.setup
+
+ # Loop until someone requests a shutdown.
+ loop do
+ spawn_new_workers
+
+ @manager.step
+
+ break if @state == :shutdown
+ end
+
+ @manager.kill_processes
+ @manager.teardown
+ rescue => ex
+ # Sometimes exceptions vanish silently. This will avoid that, even though
+ # they should abort the whole process.
+
+ warn "Exception #{ex.inspect} caught."
+ ex.backtrace.first(5).each do |line|
+ warn line
+ end
+
+ raise
+ end
+ end
+end
View
20 spec/acceptance/server_spec.rb
@@ -0,0 +1,20 @@
+require 'spec_helper'
+
+describe "Server mode (#spawn_workers)" do
+ let(:server) { Procrastinate::Server.new }
+ after(:each) { server.shutdown }
+
+ let(:pipe) { Cod.pipe.split }
+ after(:each) { pipe.read.close; pipe.write.close; }
+
+ it "spawns n workers" do
+ server.start(5) {
+ pipe.write.put Process.pid
+ sleep 10
+ }
+
+ collected_worker_pids = 3.times.map { pipe.read.get }.compact
+ p collected_worker_pids
+ collected_worker_pids.should have(3).pids_stored_in_it
+ end
+end
View
2  spec/procrastinate/process_manager_spec.rb
@@ -14,7 +14,7 @@
let(:child) { Procrastinate::ProcessManager::ChildProcess.new(nil, nil) }
before(:each) { manager.children[1234] = child }
- it "correctly cleans up children" do
+ it "regression: correctly cleans up children" do
child.start
child.sigchld_received
Please sign in to comment.
Something went wrong with that request. Please try again.