From ae7e94856ef9f8f79c9f2081dda758bbe425f0ba Mon Sep 17 00:00:00 2001 From: Alexey Zapparov Date: Mon, 12 Feb 2018 04:15:36 +0000 Subject: [PATCH] Switch to redis-prescription We're working on new, better sidekiq fetch strategye that will allow to build reliable / throttled and the mix of two strategies easily. Throttling logic most likley will be completely rewritten and extracted out to a seprate, sidekiq-agnostic gem as well. --- lib/sidekiq/throttled/strategy/concurrency.rb | 13 ++- lib/sidekiq/throttled/strategy/script.rb | 95 ------------------- lib/sidekiq/throttled/strategy/threshold.rb | 13 ++- sidekiq-throttled.gemspec | 1 + .../sidekiq/throttled/strategy/script_spec.rb | 37 -------- 5 files changed, 17 insertions(+), 142 deletions(-) delete mode 100644 lib/sidekiq/throttled/strategy/script.rb delete mode 100644 spec/sidekiq/throttled/strategy/script_spec.rb diff --git a/lib/sidekiq/throttled/strategy/concurrency.rb b/lib/sidekiq/throttled/strategy/concurrency.rb index 0f5d3c2..ca7e600 100644 --- a/lib/sidekiq/throttled/strategy/concurrency.rb +++ b/lib/sidekiq/throttled/strategy/concurrency.rb @@ -1,7 +1,8 @@ # frozen_string_literal: true +require "redis/prescription" + require "sidekiq/throttled/strategy/base" -require "sidekiq/throttled/strategy/script" module Sidekiq module Throttled @@ -19,7 +20,7 @@ class Concurrency # PUSH(@key, @jid) # return 0 # end - SCRIPT = Script.read "#{__dir__}/concurrency.lua" + SCRIPT = Redis::Prescription.read "#{__dir__}/concurrency.lua" private_constant :SCRIPT # @param [#to_s] strategy_key @@ -43,10 +44,12 @@ def dynamic? def throttled?(jid, *job_args) return false unless (job_limit = limit(job_args)) - keys = [key(job_args)] - args = [jid.to_s, job_limit, @ttl, Time.now.to_f] + kwargs = { + :keys => [key(job_args)], + :argv => [jid.to_s, job_limit, @ttl, Time.now.to_f] + } - 1 == SCRIPT.eval(keys, args) + Sidekiq.redis { |redis| 1 == SCRIPT.eval(redis, kwargs) } end # @return [Integer] Current count of jobs diff --git a/lib/sidekiq/throttled/strategy/script.rb b/lib/sidekiq/throttled/strategy/script.rb deleted file mode 100644 index 811a162..0000000 --- a/lib/sidekiq/throttled/strategy/script.rb +++ /dev/null @@ -1,95 +0,0 @@ -# frozen_string_literal: true - -require "digest/sha1" - -require "sidekiq" - -module Sidekiq - module Throttled - class Strategy - # Lua script executor for redis. - # - # Instead of executing script with `EVAL` everytime - loads script once - # and then runs it with `EVALSHA`. - # - # @private - class Script - # Script load command - LOAD = "load" - private_constant :LOAD - - # Redis error fired when script ID is unkown - NOSCRIPT = "NOSCRIPT" - private_constant :NOSCRIPT - - # LUA script source. - # @return [String] - attr_reader :source - - # LUA script SHA1 digest. - # @return [String] - attr_reader :digest - - # @param [#to_s] source Lua script - # @param [Logger] logger - def initialize(source, logger: Sidekiq.logger) - @source = source.to_s.strip.freeze - @digest = Digest::SHA1.hexdigest(@source).freeze - @logger = logger - end - - # Loads script to redis - # @return [void] - def bootstrap! - namespaceless_redis do |conn| - digest = conn.script(LOAD, @source) - - # XXX: this may happen **ONLY** if script digesting will be - # changed in redis, which is not likely gonna happen. - unless @digest == digest - if @logger - @logger.warn "Unexpected script SHA1 digest: " \ - "#{digest.inspect} (expected: #{@digest.inspect})" - end - - @digest = digest.freeze - end - end - end - - # Executes script and returns result of execution - # @return Result of script execution - def eval(*args) - Sidekiq.redis do |conn| - begin - conn.evalsha(@digest, *args) - rescue => e - raise unless e.message.include? NOSCRIPT - bootstrap! - conn.evalsha(@digest, *args) - end - end - end - - # Reads given file and returns new {Script} with its contents. - # @return [Script] - def self.read(file) - new File.read file - end - - private - - # Yields real namespace-less redis client. - def namespaceless_redis - Sidekiq.redis do |conn| - if defined?(Redis::Namespace) && conn.is_a?(Redis::Namespace) - conn = conn.redis - end - - yield conn - end - end - end - end - end -end diff --git a/lib/sidekiq/throttled/strategy/threshold.rb b/lib/sidekiq/throttled/strategy/threshold.rb index c2971be..5517cea 100644 --- a/lib/sidekiq/throttled/strategy/threshold.rb +++ b/lib/sidekiq/throttled/strategy/threshold.rb @@ -1,7 +1,8 @@ # frozen_string_literal: true +require "redis/prescription" + require "sidekiq/throttled/strategy/base" -require "sidekiq/throttled/strategy/script" module Sidekiq module Throttled @@ -29,7 +30,7 @@ class Threshold # # increase! # return 0 - SCRIPT = Script.read "#{__dir__}/threshold.lua" + SCRIPT = Redis::Prescription.read "#{__dir__}/threshold.lua" private_constant :SCRIPT # @param [#to_s] strategy_key @@ -59,10 +60,12 @@ def dynamic? def throttled?(*job_args) return false unless (job_limit = limit(job_args)) - keys = [key(job_args)] - args = [job_limit, period(job_args), Time.now.to_f] + kwargs = { + :keys => [key(job_args)], + :argv => [job_limit, period(job_args), Time.now.to_f] + } - 1 == SCRIPT.eval(keys, args) + Sidekiq.redis { |redis| 1 == SCRIPT.eval(redis, kwargs) } end # @return [Integer] Current count of jobs diff --git a/sidekiq-throttled.gemspec b/sidekiq-throttled.gemspec index 277e8de..90b200e 100644 --- a/sidekiq-throttled.gemspec +++ b/sidekiq-throttled.gemspec @@ -25,6 +25,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_runtime_dependency "sidekiq" + spec.add_runtime_dependency "redis-prescription" spec.add_development_dependency "bundler", "~> 1.10" end diff --git a/spec/sidekiq/throttled/strategy/script_spec.rb b/spec/sidekiq/throttled/strategy/script_spec.rb deleted file mode 100644 index 9a29a53..0000000 --- a/spec/sidekiq/throttled/strategy/script_spec.rb +++ /dev/null @@ -1,37 +0,0 @@ -# frozen_string_literal: true - -RSpec.describe Sidekiq::Throttled::Strategy::Script do - let(:logger) { double :warn => nil } - let(:lua_script) { "redis.call('ping')" } - let(:redis_script) { described_class.new(lua_script, :logger => logger) } - - it "loads only when needed" do - Sidekiq.redis do |conn| - if defined?(Redis::Namespace) && conn.is_a?(Redis::Namespace) - conn = conn.redis - end - - expect(conn).to receive(:script) - .with("load", lua_script).and_call_original - redis_script.eval - - expect(conn).not_to receive(:script) - .with("load", lua_script) - redis_script.eval - end - end - - describe "#eval" do - before { redis_script.instance_variable_set(:@digest, "xxx") } - - it "warns if server returns unexpected script digest" do - expect(logger).to receive(:warn).with(/Unexpected script SHA1 digest/) - redis_script.eval - end - - it "updates script digest" do - redis_script.eval - expect(redis_script.digest).not_to eq("xxx") - end - end -end