Skip to content

Commit

Permalink
Merge a7c65d4 into 712d5a1
Browse files Browse the repository at this point in the history
  • Loading branch information
krisleech committed Jul 31, 2018
2 parents 712d5a1 + a7c65d4 commit 153cdf3
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -21,3 +21,6 @@ tmp
*.o
*.a
mkmf.log
.ruby-version
.ruby-gemset
.idea
13 changes: 9 additions & 4 deletions .travis.yml
@@ -1,11 +1,16 @@
language: ruby
sudo: false
cache: bundler
script: bundle exec rspec
bundler_args: --without=extras
before_script:
- bundle exec sidekiq -r ./spec/dummy_app/app.rb -L /tmp/sidekiq.log &
- sleep 1
script: rspec spec
bundler_args: --without=extras
services:
- redis-server
rvm:
- '2.4.0'
- jruby-9.1.6.0
- 2.2.10
- 2.3.7
- 2.4.4
- 2.5.1
- jruby-9.1.17.0
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,10 @@
## [Unreleased]

### Added
- sidekiq 5 compatibility (closes [#11](https://github.com/krisleech/wisper-sidekiq/issues/11))
- support for optional `sidekiq_options` per subscriber class (closes [#15](https://github.com/krisleech/wisper-sidekiq/issues/15))

## [0.0.1] - 06-10-2014

### Added
- Initial release
2 changes: 1 addition & 1 deletion Gemfile
Expand Up @@ -11,5 +11,5 @@ gem 'psych', platforms: :rbx

group :extras do
gem 'rerun'
gem 'pry'
gem 'pry-byebug'
end
4 changes: 4 additions & 0 deletions README.md
Expand Up @@ -39,6 +39,10 @@ use its id instead.
See the [Sidekiq best practices](https://github.com/mperham/sidekiq/wiki/Best-Practices)
for more information.

### Passing down sidekiq options

In order to define custom [sidekiq_options](https://github.com/mperham/sidekiq/wiki/Advanced-Options#workers) you can add `sidekiq_options` class method in your subscriber definition - those options will be passed to Sidekiq's `set` method just before scheduling the asynchronous worker.

## Compatibility

The same Ruby versions as Sidekiq are offically supported, but it should work
Expand Down
27 changes: 24 additions & 3 deletions lib/wisper/sidekiq.rb
@@ -1,12 +1,22 @@
require 'yaml'
require 'wisper'
require 'sidekiq'

require 'wisper/sidekiq/version'

module Wisper

# based on Sidekiq 4.x #delay method, which is not enabled by default in Sidekiq 5.x
# https://github.com/mperham/sidekiq/blob/4.x/lib/sidekiq/extensions/generic_proxy.rb
# https://github.com/mperham/sidekiq/blob/4.x/lib/sidekiq/extensions/class_methods.rb

class SidekiqBroadcaster
def broadcast(subscriber, publisher, event, args)
subscriber.delay.public_send(event, *args)
class Worker
include ::Sidekiq::Worker

def perform(yml)
(subscriber, event, args) = ::YAML.load(yml)
subscriber.public_send(event, *args)
end
end

def self.register
Expand All @@ -15,6 +25,17 @@ def self.register
config.broadcaster :async, SidekiqBroadcaster.new
end
end

def broadcast(subscriber, publisher, event, args)
options = sidekiq_options(subscriber)
Worker.set(options).perform_async(::YAML.dump([subscriber, event, args]))
end

private

def sidekiq_options(subscriber)
subscriber.respond_to?(:sidekiq_options) ? subscriber.sidekiq_options : {}
end
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/dummy_app/app.rb
@@ -1,3 +1,4 @@
# This is loaded by Sidekiq

require 'wisper/sidekiq'
require_relative 'subscriber'
1 change: 1 addition & 0 deletions spec/dummy_app/subscriber.rb
Expand Up @@ -2,6 +2,7 @@ class Subscriber
def self.it_happened(message)
File.open('/tmp/shared', 'w') do |file|
file.puts "pid: #{Process.pid}"
file.puts message.inspect
end
end
end
29 changes: 20 additions & 9 deletions spec/integration_spec.rb
Expand Up @@ -9,29 +9,40 @@
include Wisper::Publisher

def run
broadcast(:it_happened, 'hello, world')
broadcast(:it_happened, { hello: 'world' })
end
end.new
end
let(:shared_content) { File.read('/tmp/shared') }

def ensure_sidekiq_was_running
Timeout.timeout(10) do
while !File.exist?('/tmp/shared')
sleep(0.1)
end
end
end

before do
Sidekiq::Testing.disable!
Sidekiq::Queue.new.clear
Sidekiq::RetrySet.new.clear
File.delete('/tmp/shared') if File.exist?('/tmp/shared')
end

it 'performs event in a different process' do
publisher.subscribe(Subscriber, async: Wisper::SidekiqBroadcaster.new)

publisher.run
ensure_sidekiq_was_running

Timeout.timeout(10) do
while !File.exist?('/tmp/shared')
sleep(0.1)
end
end
expect(shared_content).not_to include("pid: #{Process.pid}\n")
end

it 'performs event' do
publisher.subscribe(Subscriber, async: Wisper::SidekiqBroadcaster.new)
publisher.run
ensure_sidekiq_was_running

shared_content = File.read('/tmp/shared')
expect(shared_content).not_to eq "pid: #{Process.pid}\n"
expect(shared_content).to include('{:hello=>"world"}')
end
end
2 changes: 2 additions & 0 deletions spec/spec_helper.rb
@@ -1,5 +1,7 @@
require 'coveralls'
require 'simplecov'
require 'pry' unless ENV['CI']
require 'sidekiq/testing'

SimpleCov.formatter = Coveralls::SimpleCov::Formatter

Expand Down
60 changes: 60 additions & 0 deletions spec/wisper/sidekiq_broadcaster_spec.rb
@@ -0,0 +1,60 @@
require 'wisper/sidekiq'

RSpec.describe Wisper::SidekiqBroadcaster do
class PublisherUnderTest
include Wisper::Publisher

def run
broadcast(:it_happened)
end
end

class RegularSubscriberUnderTest
def self.it_happened(*_)
end
end

class CustomizedSubscriberUnderTest
def self.it_happened
end

def self.sidekiq_options
{ queue: "my_queue" }
end
end

let(:publisher) { PublisherUnderTest.new }

before { Sidekiq::Testing.fake! }
after { Sidekiq::Testing.disable! }

describe '#broadcast' do
it 'schedules a sidekiq job' do
publisher.subscribe(RegularSubscriberUnderTest, async: described_class.new)

expect { publisher.run }
.to change(Sidekiq::Queues["default"], :size).by(1)
end

it 'can respect custom sidekiq_options' do
publisher.subscribe(CustomizedSubscriberUnderTest, async: described_class.new)

expect { publisher.run }
.to change(Sidekiq::Queues["my_queue"], :size).by(1)
end

context 'when provides subscriber with args' do
let(:subscriber) { RegularSubscriberUnderTest }
let(:event) { 'it_happened' }
let(:args) { [1,2,3] }

subject(:broadcast_event) { described_class.new.broadcast(subscriber, nil, event, args) }

it 'subscriber receives event with corrects args' do
expect(RegularSubscriberUnderTest).to receive(event).with(*args)

Sidekiq::Testing.inline! { broadcast_event }
end
end
end
end

0 comments on commit 153cdf3

Please sign in to comment.