Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

worker manager can periodically restart workers (to avoid memory leaks)

  • Loading branch information...
commit cbe69d7ba71451542549eb94ea3369c4d29f924d 1 parent c9968d9
@thoughtless authored
View
26 lib/angael/manager.rb
@@ -25,9 +25,14 @@ class Manager
# Logger::WARN
# Logger::INFO # Default
# Logger::DEBUG
+ # :restart_after => If set, 1 worker will be restarted after this number
+ # of seconds. If it is nil (the default), then workers will not get
+ # restarted for no reason. If your workers leak memory, this can help
+ # reduce the problem. A graceful restart is always attempted.
def initialize(worker_class, worker_count=1, worker_args=[], opts={})
@workers = []
worker_count.times { workers << worker_class.new(*worker_args) }
+ @restart_after = opts[:restart_after]
@logger = opts[:logger]
if @logger
@log_level = opts[:log_level] || begin
@@ -64,7 +69,16 @@ def start!
end
loop do
- sleep 1
+ if @restart_after
+ # Periodically restart workers, 1 at a time.
+ sleep @restart_after
+ w = next_worker_to_restart
+ w.stop!
+ w.start!
+ else
+ # Don't restart workers if nothing is wrong.
+ sleep 1
+ end
end
end
@@ -109,5 +123,15 @@ def wait(pid)
[pid, nil] # It did exit, but we don't know the exit status.
end
end
+
+ def next_worker_to_restart
+ @worker_count ||= workers.size
+ @next_worker_to_restart_index ||= 0
+ @next_worker_to_restart_index += 1
+ @next_worker_to_restart_index %= @worker_count
+
+ workers[@next_worker_to_restart_index]
+ end
+
end
end
View
2  lib/angael/version.rb
@@ -1,3 +1,3 @@
module Angael
- VERSION = "0.0.4"
+ VERSION = "0.0.5"
end
View
51 spec/lib/angael/manager_spec.rb
@@ -39,13 +39,47 @@
Process.wait(pid)
end
+ context "when :restart_after is set to 0.5" do
+ subject { Angael::Manager.new(Angael::TestSupport::SampleWorker, 3, [], :restart_after => 0.5) }
+ it "should restarts workers 1 at a time, at 1 second intervals" do
+ subject.workers.each do |w|
+ w.stub(:start!) # We don't actually need the workers to fork.
+ end
+
+ subject.workers[0].should_receive(:stop!).exactly(1).times
+ subject.workers[1].should_receive(:stop!).exactly(2).times # This is the worker that got restarted.
+ subject.workers[2].should_receive(:stop!).exactly(1).times
+
+ subject.workers[0].should_receive(:start!).exactly(1).times
+ subject.workers[1].should_receive(:start!).exactly(2).times # This is the worker that got restarted.
+ subject.workers[2].should_receive(:start!).exactly(1).times
+
+
+ # As an alternative to should_receive_in_child_process, we
+ # fork a process which will send SIGINT to this current process.
+ # Then we start the Manager in this process and wait for it to
+ # get the SIGINT. Finally we rescue SystemExit so that this
+ # process doesn't exit with the Manager stops.
+ # TODO: Be consistent in my use of this technique vs. should_receive_in_child_process.
+ current_pid = $$
+ pid = Process.fork do
+ sleep 0.6 # Add a 0.1 second buffer to the value of :restart_after to give the process a chance to start.
+ Process.kill('INT', current_pid)
+ exit 0
+ end
+ begin
+ subject.start!
+ rescue SystemExit
+ nil
+ end
+
+ clean_up_pid(pid)
+ end
+ end
+
context "when it receives a SIGCHLD" do
after(:each) do
- # Clean up
- unless Process.wait2(@pid, Process::WNOHANG)
- Process.kill('KILL', @pid) unless
- Process.wait(@pid) rescue nil
- end
+ clean_up_pid(@pid)
end
context "when worker was asked to stop" do
@@ -143,4 +177,11 @@
end
end
end
+
+ def clean_up_pid(pid)
+ unless Process.wait2(pid, Process::WNOHANG)
+ Process.kill('KILL', pid) unless
+ Process.wait(pid) rescue nil
+ end
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.