From 09dacfed8f3f0b42833122b138639d22c71e3d56 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Mon, 22 Aug 2022 13:27:37 -0700 Subject: [PATCH] Redis-rb 4.8.0 compatibility, fixes #5484 --- Gemfile | 1 + lib/sidekiq/api.rb | 2 +- lib/sidekiq/client.rb | 2 +- lib/sidekiq/fetch.rb | 4 ++-- lib/sidekiq/launcher.rb | 4 ++-- test/test_api.rb | 26 ++++++++++++-------------- test/test_fetch.rb | 2 +- test/test_scheduled.rb | 2 +- test/test_web.rb | 13 ++++++------- test/test_web_helpers.rb | 2 +- 10 files changed, 28 insertions(+), 30 deletions(-) diff --git a/Gemfile b/Gemfile index a4443c471..2d43f1304 100644 --- a/Gemfile +++ b/Gemfile @@ -3,6 +3,7 @@ source "https://rubygems.org" gemspec gem "rake" +gem "redis" gem "redis-namespace" gem "redis-client" gem "rails", "~> 6.0" diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index cedcf6e5c..68e27f172 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -316,7 +316,7 @@ def clear Sidekiq.redis do |conn| conn.multi do |transaction| transaction.unlink(@rname) - transaction.srem("queues", name) + transaction.srem("queues", [name]) end end true diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 7e75f85ac..ae333cd94 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -231,7 +231,7 @@ def atomic_push(conn, payloads) entry["enqueued_at"] = now Sidekiq.dump_json(entry) } - conn.sadd("queues", queue) + conn.sadd("queues", [queue]) conn.lpush("queue:#{queue}", to_push) end end diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 5edb306be..c48b59571 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -33,7 +33,7 @@ def initialize(config) @queues = @config[:queues].map { |q| "queue:#{q}" } if @strictly_ordered_queues @queues.uniq! - @queues << TIMEOUT + @queues << {timeout: TIMEOUT} end end @@ -83,7 +83,7 @@ def queues_cmd else permute = @queues.shuffle permute.uniq! - permute << TIMEOUT + permute << {timeout: TIMEOUT} permute end end diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index 580fd181b..dca953be0 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -86,7 +86,7 @@ def clear_heartbeat # doesn't actually exit, it'll reappear in the Web UI. redis do |conn| conn.pipelined do |pipeline| - pipeline.srem("processes", identity) + pipeline.srem("processes", [identity]) pipeline.unlink("#{identity}:work") end end @@ -165,7 +165,7 @@ def ❤ _, exists, _, _, msg = redis { |conn| conn.multi { |transaction| - transaction.sadd("processes", key) + transaction.sadd("processes", [key]) transaction.exists?(key) transaction.hmset(key, "info", to_json, "busy", curstate.size, diff --git a/test/test_api.rb b/test/test_api.rb index 25343514b..7b1966303 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -76,8 +76,7 @@ describe "workers_size" do it "retrieves the number of busy workers" do Sidekiq.redis do |c| - c.sadd("processes", "process_1") - c.sadd("processes", "process_2") + c.sadd("processes", ["process_1", "process_2"]) c.hset("process_1", "busy", 1) c.hset("process_2", "busy", 2) end @@ -95,10 +94,10 @@ it "returns a hash of queue and size in order" do Sidekiq.redis do |conn| conn.rpush "queue:foo", "{}" - conn.sadd "queues", "foo" + conn.sadd "queues", ["foo"] 3.times { conn.rpush "queue:bar", "{}" } - conn.sadd "queues", "bar" + conn.sadd "queues", ["bar"] end s = Sidekiq::Stats::Queues.new @@ -113,7 +112,7 @@ it "handles latency for good jobs" do Sidekiq.redis do |conn| conn.rpush "queue:default", "{\"enqueued_at\": #{Time.now.to_f}}" - conn.sadd "queues", "default" + conn.sadd "queues", ["default"] end s = Sidekiq::Stats.new assert s.default_queue_latency > 0 @@ -124,7 +123,7 @@ it "handles latency for incomplete jobs" do Sidekiq.redis do |conn| conn.rpush "queue:default", "{}" - conn.sadd "queues", "default" + conn.sadd "queues", ["default"] end s = Sidekiq::Stats.new assert_equal 0, s.default_queue_latency @@ -135,10 +134,10 @@ it "returns total enqueued jobs" do Sidekiq.redis do |conn| conn.rpush "queue:foo", "{}" - conn.sadd "queues", "foo" + conn.sadd "queues", ["foo"] 3.times { conn.rpush "queue:bar", "{}" } - conn.sadd "queues", "bar" + conn.sadd "queues", ["bar"] end s = Sidekiq::Stats.new @@ -570,9 +569,9 @@ class WorkerWithTags time = Time.now.to_f Sidekiq.redis do |conn| conn.multi do |transaction| - transaction.sadd("processes", odata["key"]) + transaction.sadd("processes", [odata["key"]]) transaction.hmset(odata["key"], "info", Sidekiq.dump_json(odata), "busy", 10, "beat", time) - transaction.sadd("processes", "fake:pid") + transaction.sadd("processes", ["fake:pid"]) end end @@ -601,7 +600,7 @@ class WorkerWithTags key = "#{hn}:#{$$}" pdata = {"pid" => $$, "hostname" => hn, "started_at" => Time.now.to_i} Sidekiq.redis do |conn| - conn.sadd("processes", key) + conn.sadd("processes", [key]) conn.hmset(key, "info", Sidekiq.dump_json(pdata), "busy", 0, "beat", Time.now.to_f) end @@ -653,7 +652,7 @@ class WorkerWithTags data = {"pid" => rand(10_000), "hostname" => "app#{rand(1_000)}", "started_at" => Time.now.to_f} key = "#{data["hostname"]}:#{data["pid"]}" Sidekiq.redis do |conn| - conn.sadd("processes", key) + conn.sadd("processes", [key]) conn.hmset(key, "info", Sidekiq.dump_json(data), "busy", 0, "beat", Time.now.to_f) end @@ -662,8 +661,7 @@ class WorkerWithTags assert_equal 1, ps.to_a.size Sidekiq.redis do |conn| - conn.sadd("processes", "bar:987") - conn.sadd("processes", "bar:986") + conn.sadd("processes", ["bar:987", "bar:986"]) conn.del("process_cleanup") end diff --git a/test/test_fetch.rb b/test/test_fetch.rb index 994fa43e5..b3b3ec1da 100644 --- a/test/test_fetch.rb +++ b/test/test_fetch.rb @@ -35,7 +35,7 @@ def fetcher(options) it "retrieves with strict setting" do fetch = fetcher(queues: ["basic", "bar", "bar"], strict: true) cmd = fetch.queues_cmd - assert_equal cmd, ["queue:basic", "queue:bar", Sidekiq::BasicFetch::TIMEOUT] + assert_equal cmd, ["queue:basic", "queue:bar", {timeout: Sidekiq::BasicFetch::TIMEOUT}] end it "bulk requeues" do diff --git a/test/test_scheduled.rb b/test/test_scheduled.rb index e649255d9..01462f0b6 100644 --- a/test/test_scheduled.rb +++ b/test/test_scheduled.rb @@ -129,7 +129,7 @@ def with_sidekiq_option(name, value) with_sidekiq_option(:average_scheduled_poll_interval, 10) do 3.times do |i| Sidekiq.redis do |conn| - conn.sadd("processes", "process-#{i}") + conn.sadd("processes", ["process-#{i}"]) conn.hset("process-#{i}", "info", "") end end diff --git a/test/test_web.rb b/test/test_web.rb index 92d12513b..b2dedcc14 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -62,7 +62,7 @@ def perform(a, b) it "can display workers" do Sidekiq.redis do |conn| conn.incr("busy") - conn.sadd("processes", "foo:1234") + conn.sadd("processes", ["foo:1234"]) conn.hmset("foo:1234", "info", Sidekiq.dump_json("hostname" => "foo", "started_at" => Time.now.to_f, "queues" => [], "concurrency" => 10), "at", Time.now.to_f, "busy", 4) identity = "foo:1234:work" hash = {queue: "critical", payload: {"class" => WebWorker.name, "args" => [1, "abc"]}, run_at: Time.now.to_i} @@ -152,7 +152,7 @@ def perform(a, b) it "can delete a queue" do Sidekiq.redis do |conn| conn.rpush("queue:foo", "{\"args\":[],\"enqueued_at\":1567894960}") - conn.sadd("queues", "foo") + conn.sadd("queues", ["foo"]) end get "/queues/foo" @@ -442,7 +442,7 @@ def perform(a, b) # on /workers page Sidekiq.redis do |conn| pro = "foo:1234" - conn.sadd("processes", pro) + conn.sadd("processes", [pro]) conn.hmset(pro, "info", Sidekiq.dump_json("started_at" => Time.now.to_f, "labels" => ["frumduz"], "queues" => [], "concurrency" => 10), "busy", 1, "beat", Time.now.to_f) identity = "#{pro}:work" hash = {queue: "critical", payload: {"class" => "FailWorker", "args" => ["hello"]}, run_at: Time.now.to_i} @@ -517,7 +517,7 @@ def perform(a, b) Sidekiq.redis do |conn| conn.set("stat:processed", 5) conn.set("stat:failed", 2) - conn.sadd("queues", "default") + conn.sadd("queues", ["default"]) end 2.times { add_retry } 3.times { add_scheduled } @@ -570,8 +570,7 @@ def perform(a, b) Sidekiq.redis do |conn| conn.set("stat:processed", 5) conn.set("stat:failed", 2) - conn.sadd("queues", "default") - conn.sadd("queues", "queue2") + conn.sadd("queues", ["default", "queue2"]) end 2.times { add_retry } 3.times { add_scheduled } @@ -714,7 +713,7 @@ def add_worker msg = "{\"queue\":\"default\",\"payload\":{\"retry\":true,\"queue\":\"default\",\"timeout\":20,\"backtrace\":5,\"class\":\"HardWorker\",\"args\":[\"bob\",10,5],\"jid\":\"2b5ad2b016f5e063a1c62872\"},\"run_at\":1361208995}" Sidekiq.redis do |conn| conn.multi do |transaction| - transaction.sadd("processes", key) + transaction.sadd("processes", [key]) transaction.hmset(key, "info", Sidekiq.dump_json("hostname" => "foo", "started_at" => Time.now.to_f, "queues" => []), "at", Time.now.to_f, "busy", 4) transaction.hmset("#{key}:work", Time.now.to_f, msg) end diff --git a/test/test_web_helpers.rb b/test/test_web_helpers.rb index dfdf73534..eea895686 100644 --- a/test/test_web_helpers.rb +++ b/test/test_web_helpers.rb @@ -136,7 +136,7 @@ def params key = "#{hostname}:123" Sidekiq.redis do |conn| - conn.sadd("processes", key) + conn.sadd("processes", [key]) conn.hmset(key, "info", Sidekiq.dump_json(pdata), "busy", 0, "beat", Time.now.to_f) end end