Skip to content

Commit

Permalink
Merge eddaaef into c2e77c9
Browse files Browse the repository at this point in the history
  • Loading branch information
ogins57 committed Mar 27, 2019
2 parents c2e77c9 + eddaaef commit 48159e7
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 13 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ class MyWorker
end
```

### Observer

You can specify an observer will be processed after throttled with `:observe` option:

``` ruby
class MyWorker
include Sidekiq::Worker
include Sidekiq::Throttled::Worker

sidekiq_options :queue => :my_queue

MY_OBSERVER = lambda do |strategy, url|
# do something
end

sidekiq_throttle({
:concurrency => { ... },
:threshold => { ... },
:observe => MY_OBSERVER
})

def perform(url)
# ...
end
end
```

`strategy` returns `:concurrency` or `:threshold` when throttled in each condition.


### Dynamic throttling

Expand Down
37 changes: 24 additions & 13 deletions lib/sidekiq/throttled/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,25 @@ class Strategy
# @return [Strategy::Threshold, nil]
attr_reader :threshold

# @!attribute [r] observe
# @return [Proc, nil]
attr_reader :observe

# @param [#to_s] name
# @param [Hash] concurrency Concurrency options.
# See keyword args of {Strategy::Concurrency#initialize} for details.
# @param [Hash] threshold Threshold options.
# See keyword args of {Strategy::Threshold#initialize} for details.
# @param [#call] key_suffix Dynamic key suffix generator.
def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil)
key = "throttled:#{name}"
# @param [#call] observe Process called after throttled.
def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil,
observe: nil)
@observe = observe

@concurrency =
if concurrency
concurrency[:key_suffix] ||= key_suffix
Concurrency.new(key, **concurrency)
end
key = "throttled:#{name}"

@threshold =
if threshold
threshold[:key_suffix] ||= key_suffix
Threshold.new(key, **threshold)
end
@concurrency = initialize_of(:concurrency, key, key_suffix, concurrency)
@threshold = initialize_of(:threshold, key, key_suffix, threshold)

return if @concurrency || @threshold

Expand All @@ -55,9 +54,13 @@ def dynamic?

# @return [Boolean] whenever job is throttled or not.
def throttled?(jid, *job_args)
return true if @concurrency&.throttled?(jid, *job_args)
if @concurrency&.throttled?(jid, *job_args)
@observe&.call(:concurrency, *job_args)
return true
end

if @threshold&.throttled?(*job_args)
@observe&.call(:threshold, *job_args)
finalize!(jid, *job_args)
return true
end
Expand All @@ -77,6 +80,14 @@ def reset!
@concurrency&.reset!
@threshold&.reset!
end

private

def initialize_of(name, key, key_suffix, hash)
return nil unless hash
hash[:key_suffix] ||= key_suffix
Strategy.const_get(name.to_s.capitalize).new(key, **hash)
end
end
end
end
31 changes: 31 additions & 0 deletions spec/sidekiq/throttled/strategy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@
:key_suffix => key_suffix
})
end

it "assigns given observe" do
observe = lambda { |_| }

strategy = described_class.new(:foo, :threshold => {
:limit => 123,
:period => 657
}, :observe => observe)

expect(strategy.observe).to eq observe
end
end

describe "#throttled?" do
Expand All @@ -56,6 +67,16 @@
before { 10.times { strategy.throttled? jid } }

it { is_expected.to be true }

context "with observe" do
let(:observe) { lambda { |_| } }
let(:options) { threshold.merge(:observe => observe) }

it "calls observe" do
expect(observe).to receive(:call).with(:threshold)
strategy.throttled? jid
end
end
end
end

Expand All @@ -72,6 +93,16 @@
before { 7.times { strategy.throttled? jid } }

it { is_expected.to be true }

context "with observe" do
let(:observe) { lambda { |_| } }
let(:options) { concurrency.merge(:observe => observe) }

it "calls observe" do
expect(observe).to receive(:call).with(:concurrency)
strategy.throttled? jid
end
end
end
end

Expand Down

0 comments on commit 48159e7

Please sign in to comment.