From 98e5354c65f0a691e301d70a78491c358c26fb11 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Wed, 31 Aug 2022 11:42:22 +0200 Subject: [PATCH] redis-rb 5.0 compatiblity There was two distinct issues. The first one was that `Resque::Worker` instances were passed as argument to `sadd / srem` relying on the implicit call to `to_s`. This has been removed in redis-rb 5.0, you now must explictly cast to a String or another type understood by the redis client. The other issue was with the reconnection after fork. When forking you must make sure to close the Redis connection, otherwise if you write in it, the responses of the parent and child will be mixed in. Resque was using a private API to do this reconnection. I updated it to use the main public API. There was also a bug in redis-rb 5.0.1 preventing from reconnecting after fork, I fixed it in 5.0.2. --- lib/resque/data_store.rb | 8 +++++--- lib/resque/web_runner.rb | 2 +- test/child_killing_test.rb | 6 +++--- test/worker_test.rb | 31 +++++++++++++++++++++---------- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/lib/resque/data_store.rb b/lib/resque/data_store.rb index 7f7428f29..05a37cf28 100644 --- a/lib/resque/data_store.rb +++ b/lib/resque/data_store.rb @@ -79,7 +79,9 @@ def identifier # Force a reconnect to Redis without closing the connection in the parent # process after a fork. def reconnect - @redis._client.connect + @redis.disconnect! + @redis.ping + nil end # Returns an array of all known Resque keys in Redis. Redis' KEYS operation @@ -238,7 +240,7 @@ def worker_exists?(worker_id) def register_worker(worker) @redis.pipelined do |piped| - piped.sadd(:workers, [worker]) + piped.sadd(:workers, [worker.id]) worker_started(worker, redis: piped) end end @@ -249,7 +251,7 @@ def worker_started(worker, redis: @redis) def unregister_worker(worker, &block) @redis.pipelined do |piped| - piped.srem(:workers, [worker]) + piped.srem(:workers, [worker.id]) piped.del(redis_key_for_worker(worker)) piped.del(redis_key_for_worker_start_time(worker)) piped.hdel(HEARTBEAT_KEY, worker.to_s) diff --git a/lib/resque/web_runner.rb b/lib/resque/web_runner.rb index eef0d92f0..e6751ef0d 100644 --- a/lib/resque/web_runner.rb +++ b/lib/resque/web_runner.rb @@ -286,7 +286,7 @@ def setup_rack_handler begin handler = Rack::Handler.get(server) break - rescue LoadError, NameError => e + rescue LoadError, NameError next end end diff --git a/test/child_killing_test.rb b/test/child_killing_test.rb index afdb9f949..38a7a4d86 100644 --- a/test/child_killing_test.rb +++ b/test/child_killing_test.rb @@ -7,7 +7,7 @@ class LongRunningJob @queue = :long_running_job def self.perform( sleep_time, rescue_time=nil ) - Resque.redis.reconnect # get its own connection + Resque.redis.disconnect! # get its own connection Resque.redis.rpush( 'sigterm-test:start', Process.pid ) sleep sleep_time Resque.redis.rpush( 'sigterm-test:result', 'Finished Normally' ) @@ -23,8 +23,8 @@ def start_worker(rescue_time, term_child, term_timeout = 1) Resque.enqueue( LongRunningJob, 3, rescue_time ) worker_pid = Kernel.fork do - # reconnect since we just forked - Resque.redis.reconnect + # disconnect since we just forked + Resque.redis.disconnect! worker = Resque::Worker.new(:long_running_job) worker.term_timeout = term_timeout diff --git a/test/worker_test.rb b/test/worker_test.rb index 8623e6c79..ca72568a0 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -93,7 +93,7 @@ def messages assert_equal "at_exit", File.open(tmpfile).read.strip else # ensure we actually fork - Resque.redis.reconnect + Resque.redis.disconnect! Resque::Job.create(:at_exit_jobs, AtExitJob, tmpfile) worker = Resque::Worker.new(:at_exit_jobs) worker.run_at_exit_hooks = true @@ -121,7 +121,7 @@ def self.perform Process.waitpid(worker_pid) else # ensure we actually fork - Resque.redis.reconnect + Resque.redis.disconnect! Resque::Job.create(:not_failing_job, RaiseExceptionOnFailure) worker = Resque::Worker.new(:not_failing_job) worker.run_at_exit_hooks = true @@ -141,7 +141,7 @@ def self.perform assert !File.exist?(tmpfile), "The file '#{tmpfile}' exists, at_exit hooks were run" else # ensure we actually fork - Resque.redis.reconnect + Resque.redis.disconnect! Resque::Job.create(:at_exit_jobs, AtExitJob, tmpfile) worker = Resque::Worker.new(:at_exit_jobs) suppress_warnings do @@ -1197,6 +1197,7 @@ def self.perform Resque::Job.create(:jobs, SomeJob, 20, '/tmp') workerA.work(0) + pipe_rd.wait_readable(1) assert_equal('hey', pipe_rd.read_nonblock(3)) ensure pipe_rd.close @@ -1264,12 +1265,12 @@ def self.perform assert_equal 1, Resque::Failure.count end - it "no reconnects to redis when not forking" do - original_connection = Resque.redis._client.connection.instance_variable_get("@sock") + it "no disconnect from redis when not forking" do + original_connection = redis_socket(Resque.redis) without_forking do @worker.work(0) end - assert_equal original_connection, Resque.redis._client.connection.instance_variable_get("@sock") + assert_equal original_connection, redis_socket(Resque.redis) end it "logs errors with the correct logging level" do @@ -1342,9 +1343,9 @@ def run_in_job(&block) end it "reconnects to redis after fork" do - original_connection = Resque.redis._client.connection.instance_variable_get("@sock").object_id + original_connection = redis_socket(Resque.redis).object_id new_connection = run_in_job do - Resque.redis._client.connection.instance_variable_get("@sock").object_id + redis_socket(Resque.redis).object_id end assert Resque.redis._client.connected? refute_equal original_connection, new_connection @@ -1381,7 +1382,7 @@ class PreShutdownLongRunningJob @queue = :long_running_job def self.perform(run_time) - Resque.redis.reconnect # get its own connection + Resque.redis.disconnect! # get its own connection Resque.redis.rpush('pre-term-timeout-test:start', Process.pid) sleep run_time Resque.redis.rpush('pre-term-timeout-test:result', 'Finished Normally') @@ -1403,7 +1404,7 @@ def self.perform(run_time) worker_pid = Kernel.fork do # reconnect to redis - Resque.redis.reconnect + Resque.redis.disconnect! worker = Resque::Worker.new(:long_running_job) worker.pre_shutdown_timeout = pre_shutdown_timeout @@ -1473,4 +1474,14 @@ def self.on_failure_store_exception(exc, *args) assert_kind_of Process::Status, exception.process_status end end + + private + + def redis_socket(redis) + if ::Redis::VERSION < "5" + redis._client.connection.instance_variable_get(:@sock) + else + redis._client.instance_variable_get(:@raw_connection) + end + end end