Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Forking worker #13

Merged
merged 4 commits into from

2 participants

@danielfarrell

Ok, this seems to be working fine here. I added a test for the forking. I think all the rest of the behavior is tested with the simple worker. If you think it should test more within the context of the worker let me know and I'll build that out some more.

@nesquena
Owner

Looks pretty good, can you add at least one end-to-end test that verifies that jobs are being processed successfully similar to the way we test jobs processed in simple worker

@danielfarrell

Ok, I'll add one. I had tried to run the whole test suite on there but the counting and output capturing doesn't work when you fork. I'll come up with a solution for that though.

@nesquena
Owner

Great, look forward to having the forking worker available, thanks for working on it.

@danielfarrell

Ok, there are some tests in there for the forking worker adapted from the ThreadsOnFork worker. These changes seems to make one of the "for prepare method" tests fail for the ThreadsOnFork worker, and I'm drawing a blank on why that would be. It seems to be a different one each time, which makes it more confusing. Any thoughts on that would be welcome.

@nesquena
Owner

Thanks Daniel, I will take a look and see if I can get it to pass on my end. Look forward to merging this in soon.

@nesquena
Owner

@danielfarrell OK fixed the failing tests, just a simple issue with the logger being defined elsewhere. I cleared the logger at the beginning of those tests.

@nesquena
Owner

Looks good, unless you see any reason not to, I will merge soon.

@danielfarrell

Cool, thanks for finding that. I knew it had to be something little, I just couldn't find it. I'm good with merging whenever you are.

@nesquena nesquena merged commit 1c57ee2 into nesquena:master
@nesquena
Owner

Ok merged, released new gem version with forking included, updated wiki https://github.com/nesquena/backburner/wiki/Forking-worker and the readme to include information on the new worker. Thanks Daniel, much appreciated.

@danielfarrell danielfarrell deleted the danielfarrell:forking-worker branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 22, 2012
  1. Add a simple forking worker

    Daniel Farrell authored
Commits on Dec 23, 2012
  1. Add comments to forking work and exit cleanly. Add a test.

    Daniel Farrell authored
  2. Forgot to include test_jobs in test

    Daniel Farrell authored
Commits on Dec 26, 2012
  1. Adapt tests from ThreadsOnFork worker for Forking worker

    Daniel Farrell authored
This page is out of date. Refresh to see the latest.
View
1  lib/backburner.rb
@@ -11,6 +11,7 @@
require 'backburner/performable'
require 'backburner/worker'
require 'backburner/workers/simple'
+require 'backburner/workers/forking'
require 'backburner/workers/threads_on_fork'
require 'backburner/queue'
View
48 lib/backburner/workers/forking.rb
@@ -0,0 +1,48 @@
+module Backburner
+ module Workers
+ class Forking < Worker
+ # Used to prepare job queues before processing jobs.
+ # Setup beanstalk tube_names and watch all specified tubes for jobs.
+ #
+ # @raise [Beaneater::NotConnected] If beanstalk fails to connect.
+ # @example
+ # @worker.prepare
+ #
+ def prepare
+ self.tube_names.map! { |name| expand_tube_name(name) }
+ log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]"
+ self.connection.tubes.watch!(*self.tube_names)
+ end
+
+ # Starts processing new jobs indefinitely.
+ # Primary way to consume and process jobs in specified tubes.
+ #
+ # @example
+ # @worker.start
+ #
+ def start
+ prepare
+ loop { fork_one_job }
+ end
+
+ # Need to re-establish the connection to the server(s) after forking
+ # Waits for a job, works the job, and exits
+ def fork_one_job
+ pid = Process.fork do
+ @connection = Connection.new(Backburner.configuration.beanstalk_url)
+ work_one_job
+ coolest_exit
+ end
+ Process.wait(pid)
+ end
+
+ # Exit with Kernel.exit! to avoid at_exit callbacks that should belongs to
+ # parent process
+ # We will use exitcode 99 that means the fork reached the garbage number
+ def coolest_exit
+ Kernel.exit! 99
+ end
+
+ end
+ end
+end
View
56 test/fixtures/test_forking_jobs.rb
@@ -0,0 +1,56 @@
+class ResponseForkingJob
+ include Backburner::Queue
+ queue_priority 1000
+ def self.perform(data)
+ $worker_test_count += data['worker_test_count'].to_i if data['worker_test_count']
+ $worker_success = data['worker_success'] if data['worker_success']
+ $worker_test_count = data['worker_test_count_set'].to_i if data['worker_test_count_set']
+ $worker_raise = data['worker_raise'] if data['worker_raise']
+ end
+end
+
+class TestJobForking
+ include Backburner::Queue
+ queue_priority 1000
+ def self.perform(x, y)
+ Backburner::Workers::Forking.enqueue ResponseForkingJob, [{
+ :worker_test_count_set => x + y
+ }], :queue => 'response'
+ end
+end
+
+class TestFailJobForking
+ include Backburner::Queue
+ def self.perform(x, y)
+ Backburner::Workers::Forking.enqueue ResponseForkingJob, [{
+ :worker_raise => true
+ }], :queue => 'response'
+ end
+end
+
+class TestRetryJobForking
+ include Backburner::Queue
+ def self.perform(x, y)
+ if $worker_test_count <= 2
+ Backburner::Workers::Forking.enqueue ResponseForkingJob, [{
+ :worker_test_count => 1
+ }], :queue => 'response'
+
+ raise RuntimeError
+ else # succeeds
+ Backburner::Workers::Forking.enqueue ResponseForkingJob, [{
+ :worker_test_count => 1,
+ :worker_success => true
+ }], :queue => 'response'
+ end
+ end
+end
+
+class TestAsyncJobForking
+ include Backburner::Performable
+ def self.foo(x, y)
+ Backburner::Workers::Forking.enqueue ResponseForkingJob, [{
+ :worker_test_count_set => x * y
+ }], :queue => 'response'
+ end
+end
View
135 test/workers/forking_worker_test.rb
@@ -0,0 +1,135 @@
+require File.expand_path('../../test_helper', __FILE__)
+require File.expand_path('../../fixtures/test_forking_jobs', __FILE__)
+
+describe "Backburner::Workers::Forking module" do
+
+ before do
+ Backburner.default_queues.clear
+ @worker_class = Backburner::Workers::Forking
+ end
+
+ describe "for fork_one_job method" do
+
+ it "should fork, reconnect, work job, and exit" do
+ clear_jobs!("bar.foo")
+ @worker_class.enqueue TestJobForking, [1, 2], :queue => "bar.foo"
+
+ fake_pid = 45
+ Process.expects(:fork).returns(fake_pid) do |&block|
+ Connection.expects(:new).with(Backburner.configuration.beanstalk_url)
+ @worker_class.any_instance.expects(:work_one_job)
+ @worker_class.any_instance.expects(:coolest_exit)
+ block.call
+ end
+ Process.expects(:wait).with(fake_pid)
+
+ silenced(2) do
+ worker = @worker_class.new('bar.foo')
+ worker.prepare
+ worker.fork_one_job
+ end
+ end
+
+ end # fork_one_job
+
+ describe "practical tests" do
+
+ before do
+ @templogger = Templogger.new('/tmp')
+ Backburner.configure { |config| config.logger = @templogger.logger }
+ $worker_test_count = 0
+ $worker_success = false
+ $worker_raise = false
+ clear_jobs!('response')
+ clear_jobs!('bar.foo.1', 'bar.foo.2', 'bar.foo.3', 'bar.foo.4', 'bar.foo.5')
+ silenced do
+ @response_worker = @worker_class.new('response')
+ end
+ end
+
+ after do
+ @templogger.close
+ clear_jobs!('response')
+ clear_jobs!('bar.foo.1', 'bar.foo.2', 'bar.foo.3', 'bar.foo.4', 'bar.foo.5')
+ end
+
+
+ it "should work an enqueued job" do
+ @worker = @worker_class.new('bar.foo.1')
+ @worker_class.enqueue TestJobForking, [1, 2], :queue => "bar.foo.1"
+ @worker.prepare
+ @worker.fork_one_job
+ silenced(2) do
+ @templogger.wait_for_match(/Completed TestJobFork/m)
+ @response_worker.prepare
+ @response_worker.work_one_job
+ end
+ assert_equal 3, $worker_test_count
+ end # enqueue
+
+ it "should work for an async job" do
+ @worker = @worker_class.new('bar.foo.2')
+ TestAsyncJobForking.async(:queue => 'bar.foo.2').foo(3, 5)
+ @worker.prepare
+ @worker.fork_one_job
+ silenced(2) do
+ @templogger.wait_for_match(/Completed TestAsyncJobFork/m)
+ @response_worker.prepare
+ @response_worker.work_one_job
+ end
+ assert_equal 15, $worker_test_count
+ end # async
+
+ it "should fail quietly if there's an argument error" do
+ Backburner.configure { |config| config.max_job_retries = 0 }
+ @worker = @worker_class.new('bar.foo.3')
+ @worker_class.enqueue TestJobForking, ["bam", "foo", "bar"], :queue => "bar.foo.3"
+ @worker.prepare
+ @worker.fork_one_job
+ silenced(5) do
+ @templogger.wait_for_match(/Finished TestJobFork.*attempt 1 of 1/m)
+ end
+ assert_match(/Exception ArgumentError/, @templogger.body)
+ assert_equal 0, $worker_test_count
+ end # fail, argument
+
+ it "should support retrying jobs and burying" do
+ Backburner.configure { |config| config.max_job_retries = 1; config.retry_delay = 0 }
+ @worker = @worker_class.new('bar.foo.4')
+ @worker_class.enqueue TestRetryJobForking, ["bam", "foo"], :queue => 'bar.foo.4'
+ @worker.prepare
+ 2.times do
+ $worker_test_count += 1
+ @worker.fork_one_job
+ end
+ silenced(2) do
+ @templogger.wait_for_match(/Finished TestRetryJobFork.*attempt 2 of 2/m)
+ @response_worker.prepare
+ 2.times { @response_worker.work_one_job }
+ end
+ assert_equal 4, $worker_test_count
+ assert_equal false, $worker_success
+ end # retry, bury
+
+ it "should support retrying jobs and succeeds" do
+ Backburner.configure { |config| config.max_job_retries = 2; config.retry_delay = 0 }
+ @worker = @worker_class.new('bar.foo.5')
+ @worker_class.enqueue TestRetryJobForking, ["bam", "foo"], :queue => 'bar.foo.5'
+ @worker.prepare
+ 3.times do
+ $worker_test_count += 1
+ @worker.fork_one_job
+ end
+ silenced(2) do
+ @templogger.wait_for_match(/Completed TestRetryJobFork/m)
+ @response_worker.prepare
+ 3.times { @response_worker.work_one_job }
+ end
+ assert_equal 6, $worker_test_count
+ assert_equal true, $worker_success
+ end # retrying, succeeds
+
+ end # practical tests
+
+
+end
Something went wrong with that request. Please try again.