Skip to content

Commit

Permalink
redis-rb 5.0 compatiblity
Browse files Browse the repository at this point in the history
There was two distinct issues.

The first one was that `Resque::Worker` instances were passed as argument
to `sadd / srem` relying on the implicit call to `to_s`. This has been removed
in redis-rb 5.0, you now must explictly cast to a String or another type understood
by the redis client.

The other issue was with the reconnection after fork.
When forking you must make sure to close the Redis connection, otherwise
if you write in it, the responses of the parent and child will be mixed in.

Resque was using a private API to do this reconnection. I updated it
to use the main public API.

There was also a bug in redis-rb 5.0.1 preventing from reconnecting after fork,
I fixed it in 5.0.2.
  • Loading branch information
byroot authored and PatrickTulskie committed Sep 5, 2022
1 parent dac6f61 commit 98e5354
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 17 deletions.
8 changes: 5 additions & 3 deletions lib/resque/data_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ def identifier
# Force a reconnect to Redis without closing the connection in the parent
# process after a fork.
def reconnect
@redis._client.connect
@redis.disconnect!
@redis.ping
nil
end

# Returns an array of all known Resque keys in Redis. Redis' KEYS operation
Expand Down Expand Up @@ -238,7 +240,7 @@ def worker_exists?(worker_id)

def register_worker(worker)
@redis.pipelined do |piped|
piped.sadd(:workers, [worker])
piped.sadd(:workers, [worker.id])
worker_started(worker, redis: piped)
end
end
Expand All @@ -249,7 +251,7 @@ def worker_started(worker, redis: @redis)

def unregister_worker(worker, &block)
@redis.pipelined do |piped|
piped.srem(:workers, [worker])
piped.srem(:workers, [worker.id])
piped.del(redis_key_for_worker(worker))
piped.del(redis_key_for_worker_start_time(worker))
piped.hdel(HEARTBEAT_KEY, worker.to_s)
Expand Down
2 changes: 1 addition & 1 deletion lib/resque/web_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def setup_rack_handler
begin
handler = Rack::Handler.get(server)
break
rescue LoadError, NameError => e
rescue LoadError, NameError
next
end
end
Expand Down
6 changes: 3 additions & 3 deletions test/child_killing_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class LongRunningJob
@queue = :long_running_job

def self.perform( sleep_time, rescue_time=nil )
Resque.redis.reconnect # get its own connection
Resque.redis.disconnect! # get its own connection
Resque.redis.rpush( 'sigterm-test:start', Process.pid )
sleep sleep_time
Resque.redis.rpush( 'sigterm-test:result', 'Finished Normally' )
Expand All @@ -23,8 +23,8 @@ def start_worker(rescue_time, term_child, term_timeout = 1)
Resque.enqueue( LongRunningJob, 3, rescue_time )

worker_pid = Kernel.fork do
# reconnect since we just forked
Resque.redis.reconnect
# disconnect since we just forked
Resque.redis.disconnect!

worker = Resque::Worker.new(:long_running_job)
worker.term_timeout = term_timeout
Expand Down
31 changes: 21 additions & 10 deletions test/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def messages
assert_equal "at_exit", File.open(tmpfile).read.strip
else
# ensure we actually fork
Resque.redis.reconnect
Resque.redis.disconnect!
Resque::Job.create(:at_exit_jobs, AtExitJob, tmpfile)
worker = Resque::Worker.new(:at_exit_jobs)
worker.run_at_exit_hooks = true
Expand Down Expand Up @@ -121,7 +121,7 @@ def self.perform
Process.waitpid(worker_pid)
else
# ensure we actually fork
Resque.redis.reconnect
Resque.redis.disconnect!
Resque::Job.create(:not_failing_job, RaiseExceptionOnFailure)
worker = Resque::Worker.new(:not_failing_job)
worker.run_at_exit_hooks = true
Expand All @@ -141,7 +141,7 @@ def self.perform
assert !File.exist?(tmpfile), "The file '#{tmpfile}' exists, at_exit hooks were run"
else
# ensure we actually fork
Resque.redis.reconnect
Resque.redis.disconnect!
Resque::Job.create(:at_exit_jobs, AtExitJob, tmpfile)
worker = Resque::Worker.new(:at_exit_jobs)
suppress_warnings do
Expand Down Expand Up @@ -1197,6 +1197,7 @@ def self.perform
Resque::Job.create(:jobs, SomeJob, 20, '/tmp')
workerA.work(0)

pipe_rd.wait_readable(1)
assert_equal('hey', pipe_rd.read_nonblock(3))
ensure
pipe_rd.close
Expand Down Expand Up @@ -1264,12 +1265,12 @@ def self.perform
assert_equal 1, Resque::Failure.count
end

it "no reconnects to redis when not forking" do
original_connection = Resque.redis._client.connection.instance_variable_get("@sock")
it "no disconnect from redis when not forking" do
original_connection = redis_socket(Resque.redis)
without_forking do
@worker.work(0)
end
assert_equal original_connection, Resque.redis._client.connection.instance_variable_get("@sock")
assert_equal original_connection, redis_socket(Resque.redis)
end

it "logs errors with the correct logging level" do
Expand Down Expand Up @@ -1342,9 +1343,9 @@ def run_in_job(&block)
end

it "reconnects to redis after fork" do
original_connection = Resque.redis._client.connection.instance_variable_get("@sock").object_id
original_connection = redis_socket(Resque.redis).object_id
new_connection = run_in_job do
Resque.redis._client.connection.instance_variable_get("@sock").object_id
redis_socket(Resque.redis).object_id
end
assert Resque.redis._client.connected?
refute_equal original_connection, new_connection
Expand Down Expand Up @@ -1381,7 +1382,7 @@ class PreShutdownLongRunningJob
@queue = :long_running_job

def self.perform(run_time)
Resque.redis.reconnect # get its own connection
Resque.redis.disconnect! # get its own connection
Resque.redis.rpush('pre-term-timeout-test:start', Process.pid)
sleep run_time
Resque.redis.rpush('pre-term-timeout-test:result', 'Finished Normally')
Expand All @@ -1403,7 +1404,7 @@ def self.perform(run_time)

worker_pid = Kernel.fork do
# reconnect to redis
Resque.redis.reconnect
Resque.redis.disconnect!

worker = Resque::Worker.new(:long_running_job)
worker.pre_shutdown_timeout = pre_shutdown_timeout
Expand Down Expand Up @@ -1473,4 +1474,14 @@ def self.on_failure_store_exception(exc, *args)
assert_kind_of Process::Status, exception.process_status
end
end

private

def redis_socket(redis)
if ::Redis::VERSION < "5"
redis._client.connection.instance_variable_get(:@sock)
else
redis._client.instance_variable_get(:@raw_connection)
end
end
end

0 comments on commit 98e5354

Please sign in to comment.