Skip to content

Commit

Permalink
Merge branch 'lantins-resque_v1.9.10_and_above'
Browse files Browse the repository at this point in the history
Conflicts:
	test/helper.rb
  • Loading branch information
staugaard committed Dec 17, 2010
2 parents 64180d9 + 2db4985 commit bf26562
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 87 deletions.
14 changes: 7 additions & 7 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ PATH
resque-multi-job-forks (0.2.0)
json
redis (< 2.0)
resque (< 1.8.0)
resque (~> 1.10.0)

GEM
remote: http://rubygems.org/
Expand All @@ -13,13 +13,13 @@ GEM
rack (1.2.1)
rake (0.8.7)
redis (1.0.7)
redis-namespace (0.10.0)
redis-namespace (0.8.0)
redis (< 3.0.0)
resque (1.7.1)
redis
redis-namespace
resque (1.10.0)
json (~> 1.4.6)
redis-namespace (~> 0.8.0)
sinatra (>= 0.9.2)
vegas (>= 0.1.2)
vegas (~> 0.1.2)
sinatra (1.1.0)
rack (~> 1.1)
tilt (~> 1.1)
Expand All @@ -35,5 +35,5 @@ DEPENDENCIES
json
rake
redis (< 2.0)
resque (< 1.8.0)
resque (~> 1.10.0)
resque-multi-job-forks!
111 changes: 69 additions & 42 deletions lib/resque-multi-job-forks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,74 @@
require 'resque/worker'

module Resque
class Worker
attr_accessor :seconds_per_fork
attr_accessor :jobs_per_fork
attr_reader :jobs_processed

unless method_defined?(:shutdown_without_multi_job_forks)
def perform_with_multi_job_forks(job = nil)
perform_without_multi_job_forks(job)
hijack_fork unless fork_hijacked?
@jobs_processed += 1
end
alias_method :perform_without_multi_job_forks, :perform
alias_method :perform, :perform_with_multi_job_forks

def shutdown_with_multi_job_forks
release_fork if fork_hijacked? && fork_job_limit_reached?
shutdown_without_multi_job_forks
end
alias_method :shutdown_without_multi_job_forks, :shutdown?
alias_method :shutdown?, :shutdown_with_multi_job_forks
end

def fork_hijacked?
@release_fork_limit
end

def hijack_fork
log 'hijack fork.'
@suppressed_fork_hooks = [Resque.after_fork, Resque.before_fork]
Resque.after_fork = Resque.before_fork = nil
@release_fork_limit = fork_job_limit
@jobs_processed = 0
@cant_fork = true
end

def release_fork
log "jobs processed by child: #{jobs_processed}"
run_hook :before_child_exit, self
Resque.after_fork, Resque.before_fork = *@suppressed_fork_hooks
@release_fork_limit = @jobs_processed = @cant_fork = nil
log 'hijack over, counter terrorists win.'
end

def fork_job_limit
jobs_per_fork.nil? ? Time.now.to_i + seconds_per_fork : jobs_per_fork
end

def fork_job_limit_reached?
fork_job_limit_remaining <= 0 ? true : false
end

def fork_job_limit_remaining
jobs_per_fork.nil? ? @release_fork_limit - Time.now.to_i : jobs_per_fork - @jobs_processed
end

def seconds_per_fork
@seconds_per_fork ||= minutes_per_fork * 60
end

def minutes_per_fork
ENV['MINUTES_PER_FORK'].nil? ? 1 : ENV['MINUTES_PER_FORK'].to_i
end

def jobs_per_fork
@jobs_per_fork ||= ENV['JOBS_PER_FORK'].nil? ? nil : ENV['JOBS_PER_FORK'].to_i
end
end

# the `before_child_exit` hook will run in the child process
# right before the child process terminates
#
Expand All @@ -16,45 +84,4 @@ def self.before_child_exit=(before_child_exit)
@before_child_exit = before_child_exit
end

class Worker
attr_accessor :jobs_per_fork
attr_reader :jobs_processed

unless method_defined?(:done_working_without_multi_job_forks)
def process_with_multi_job_forks(job = nil)
@jobs_processed ||= 0
@kill_fork_at ||= Time.now.to_i + (ENV['MINUTES_PER_FORK'].to_i * 60)
process_without_multi_job_forks(job)
end
alias_method :process_without_multi_job_forks, :process
alias_method :process, :process_with_multi_job_forks

def done_working_with_multi_job_forks
done_working_without_multi_job_forks

@jobs_processed += 1

if @jobs_processed == 1
old_after_fork = Resque.after_fork
Resque.after_fork = nil

while Time.now.to_i < @kill_fork_at
if job = reserve
process(job)
else
sleep(1)
end
end

Resque.after_fork = old_after_fork

run_hook :before_child_exit, self
@jobs_processed = nil
@kill_fork_at = nil
end
end
alias_method :done_working_without_multi_job_forks, :done_working
alias_method :done_working, :done_working_with_multi_job_forks
end
end
end
end
2 changes: 1 addition & 1 deletion resque-multi-job-forks.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Gem::Specification.new do |s|
s.summary = "Have your resque workers process more that one job"
s.description = "When your resque jobs are frequent and fast, the overhead of forking and running your after_fork might get too big."

s.add_runtime_dependency("resque", "< 1.8.0")
s.add_runtime_dependency("resque", "~> 1.10.0")
s.add_runtime_dependency("redis", "< 2.0")
s.add_runtime_dependency("json")

Expand Down
60 changes: 54 additions & 6 deletions test/helper.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,60 @@
require 'rubygems'
require 'test/unit'
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))

$TESTING = true

require 'rubygems'
require 'bundler'
Bundler.setup
Bundler.require
require 'test/unit'
require 'resque-multi-job-forks'

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
$TESTING = true
# setup redis & resque.
redis = Redis.new(:db => 1)
Resque.redis = redis

require 'resque-multi-job-forks'
# adds simple STDOUT logging to test workers.
# set `VERBOSE=true` when running the tests to view resques log output.
module Resque
class Worker
def log(msg)
puts "*** #{msg}" unless ENV['VERBOSE'].nil?
end
alias_method :log!, :log
end
end

# stores a record of the job processing sequence.
# you may wish to reset this in the test `setup` method.
$SEQUENCE = []

# test job, tracks sequence.
class SequenceJob
@queue = :jobs
def self.perform(i)
$SEQUENCE << "work_#{i}".to_sym
sleep(2)
end
end

class QuickSequenceJob
@queue = :jobs
def self.perform(i)
$SEQUENCE << "work_#{i}".to_sym
end
end


# test hooks, tracks sequence.
Resque.after_fork do
$SEQUENCE << :after_fork
end

Resque.before_fork do
$SEQUENCE << :before_fork
end

Resque.before_child_exit do |worker|
$SEQUENCE << "before_child_exit_#{worker.jobs_processed}".to_sym
end
80 changes: 49 additions & 31 deletions test/test_resque-multi-job-forks.rb
Original file line number Diff line number Diff line change
@@ -1,40 +1,58 @@
require 'helper'

class SomeJob
def self.perform(i)
$SEQUENCE << "work_#{i}".to_sym
puts 'working...'
sleep(25)
end
end

Resque.after_fork do
$SEQUENCE << :after_fork
end

Resque.before_child_exit do |worker|
$SEQUENCE << "before_child_exit_#{worker.jobs_processed}".to_sym
end
require File.dirname(__FILE__) + '/helper'

class TestResqueMultiJobForks < Test::Unit::TestCase
def setup
$SEQUENCE = []
Resque.redis.flushdb
@worker = Resque::Worker.new(:jobs)
end

def test_sequence_of_events
Resque.redis.flush_all

ENV['MINUTES_PER_FORK'] = '1'

worker = Resque::Worker.new(:jobs)

Resque::Job.create(:jobs, SomeJob, 1)
Resque::Job.create(:jobs, SomeJob, 2)
Resque::Job.create(:jobs, SomeJob, 3)
Resque::Job.create(:jobs, SomeJob, 4)

worker.work(0)
def test_timeout_limit_sequence_of_events
# only allow enough time for 3 jobs to process.
@worker.seconds_per_fork = 3

Resque.enqueue(SequenceJob, 1)
Resque.enqueue(SequenceJob, 2)
Resque.enqueue(SequenceJob, 3)
Resque.enqueue(SequenceJob, 4)

# make sure we don't take longer then 15 seconds.
begin
Timeout::timeout(15) { @worker.work(1) }
rescue Timeout::Error
end

# test the sequence is correct.
assert_equal([:before_fork, :after_fork, :work_1, :work_2, :work_3,
:before_child_exit_3, :before_fork, :after_fork, :work_4,
:before_child_exit_1], $SEQUENCE, 'correct sequence')
end

assert_equal([:after_fork, :work_1, :work_2, :work_3, :before_child_exit_3, :after_fork, :work_4, :before_child_exit_1], $SEQUENCE)
# test we can also limit fork job process by a job limit.
def test_job_limit_sequence_of_events
# only allow enough time for 3 jobs to process.
ENV['JOBS_PER_FORK'] = '20'

# queue 40 jobs.
(1..40).each { |i| Resque.enqueue(QuickSequenceJob, i) }

begin
Timeout::timeout(3) { @worker.work(1) }
rescue Timeout::Error
end

assert_equal :before_fork, $SEQUENCE[0], 'first before_fork call.'
assert_equal :after_fork, $SEQUENCE[1], 'first after_fork call.'
assert_equal :work_20, $SEQUENCE[21], '20th chunk of work.'
assert_equal :before_child_exit_20, $SEQUENCE[22], 'first before_child_exit call.'
assert_equal :before_fork, $SEQUENCE[23], 'final before_fork call.'
assert_equal :after_fork, $SEQUENCE[24], 'final after_fork call.'
assert_equal :work_40, $SEQUENCE[44], '40th chunk of work.'
assert_equal :before_child_exit_20, $SEQUENCE[45], 'final before_child_exit call.'
end

def teardown
# make sure we don't clobber any other tests.
ENV['JOBS_PER_FORK'] = nil
end
end

0 comments on commit bf26562

Please sign in to comment.