Skip to content

Commit

Permalink
Let's try this again...
Browse files Browse the repository at this point in the history
Use `Object#respond_to?` to determine which MultiJson API to use.
  • Loading branch information
sferik committed Apr 22, 2012
1 parent aa35124 commit 68c725e
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 18 deletions.
6 changes: 5 additions & 1 deletion lib/sidekiq/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ def self.push(item)

pushed = false
Sidekiq.client_middleware.invoke(worker_class, item, queue) do
payload = MultiJson.encode(item)
payload = if MultiJson.respond_to?(:dump)
MultiJson.dump(item)
else
MultiJson.encode(item)
end
Sidekiq.redis do |conn|
_, pushed = conn.multi do
conn.sadd('queues', queue)
Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ def assign(msg, queue)
processor = @ready.pop
@in_progress[processor.object_id] = [msg, queue]
@busy << processor
processor.process!(MultiJson.decode(msg), queue)
if MultiJson.respond_to?(:adapter)
processor.process!(MultiJson.load(msg), queue)
else
processor.process!(MultiJson.decode(msg), queue)
end
end
end
end
Expand Down
8 changes: 6 additions & 2 deletions lib/sidekiq/middleware/client/unique_jobs.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require 'multi_json'
require 'digest'
require 'multi_json'

module Sidekiq
module Middleware
Expand All @@ -10,7 +10,11 @@ class UniqueJobs
def call(worker_class, item, queue)
enabled = worker_class.get_sidekiq_options['unique']
if enabled
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
payload_hash = if MultiJson.respond_to?(:dump)
Digest::MD5.hexdigest(MultiJson.dump(item))
else
Digest::MD5.hexdigest(MultiJson.encode(item))
end
unique = false

Sidekiq.redis do |conn|
Expand Down
9 changes: 7 additions & 2 deletions lib/sidekiq/middleware/server/failure_jobs.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'multi_json'

module Sidekiq
module Middleware
module Server
Expand All @@ -14,8 +16,11 @@ def call(*args)
:worker => args[1]['class'],
:queue => args[2]
}

Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) }
if MultiJson.respond_to?(:dump)
Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.dump(data)) }
else
Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) }
end
raise
end
end
Expand Down
8 changes: 7 additions & 1 deletion lib/sidekiq/middleware/server/retry_jobs.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'multi_json'

require 'sidekiq/retry'

module Sidekiq
Expand Down Expand Up @@ -44,7 +46,11 @@ def call(worker, msg, queue)
delay = DELAY.call(count)
logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = MultiJson.encode(msg)
payload = if MultiJson.respond_to?(:dump)
MultiJson.dump(msg)
else
MultiJson.encode(msg)
end
Sidekiq.redis do |conn|
conn.zadd('retry', retry_at.to_s, payload)
end
Expand Down
8 changes: 7 additions & 1 deletion lib/sidekiq/middleware/server/unique_jobs.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
require 'multi_json'

module Sidekiq
module Middleware
module Server
class UniqueJobs
def call(*args)
yield
ensure
json = MultiJson.encode(args[1])
json = if MultiJson.respond_to?(:dump)
MultiJson.dump(args[1])
else
MultiJson.encode(args[1])
end
hash = Digest::MD5.hexdigest(json)
Sidekiq.redis {|conn| conn.del(hash) }
end
Expand Down
9 changes: 7 additions & 2 deletions lib/sidekiq/processor.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'celluloid'
require 'multi_json'
require 'sidekiq/util'

require 'sidekiq/middleware/server/active_record'
Expand Down Expand Up @@ -53,8 +54,12 @@ def stats(worker, msg, queue)
redis do |conn|
conn.multi do
conn.set("worker:#{self}:started", Time.now.to_s)
conn.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg,
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")))
hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")}
if MultiJson.respond_to?(:dump)
conn.set("worker:#{self}", MultiJson.dump(hash))
else
conn.set("worker:#{self}", MultiJson.encode(hash))
end
end
end

Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ def poll

messages.each do |message|
logger.debug { "Retrying #{message}" }
msg = MultiJson.decode(message)
msg = if MultiJson.respond_to?(:adapter)
MultiJson.load(message)
else
MultiJson.decode(message)
end
conn.rpush("queue:#{msg['queue']}", message)
end
end
Expand Down
32 changes: 27 additions & 5 deletions lib/sidekiq/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ def workers
Sidekiq.redis do |conn|
conn.smembers('workers').map do |w|
msg = conn.get("worker:#{w}")
msg = MultiJson.decode(msg) if msg
if msg
msg = if MultiJson.respond_to?(:adapter)
MultiJson.load(msg)
else
MultiJson.decode(msg)
end
end
[w, msg]
end.sort { |x| x[1] ? -1 : 1 }
end
Expand All @@ -74,7 +80,11 @@ def retry_count
def retries
Sidekiq.redis do |conn|
results = conn.zrange('retry', 0, 25, :withscores => true)
results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] }
if MultiJson.respond_to?(:adapter)
results.each_slice(2).map { |msg, score| [MultiJson.load(msg), Float(score)] }
else
results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] }
end
end
end

Expand All @@ -89,7 +99,11 @@ def queues
def retries_with_score(score)
Sidekiq.redis do |conn|
results = conn.zrangebyscore('retry', score, score)
results.map { |msg| MultiJson.decode(msg) }
if MultiJson.respond_to?(:adapter)
results.map { |msg| MultiJson.load(msg) }
else
results.map { |msg| MultiJson.decode(msg) }
end
end
end

Expand Down Expand Up @@ -124,7 +138,11 @@ def relative_time(time)
get "/queues/:name" do
halt 404 unless params[:name]
@name = params[:name]
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) }
if MultiJson.respond_to?(:adapter)
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.load(str) }
else
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) }
end
slim :queue
end

Expand All @@ -142,7 +160,11 @@ def relative_time(time)
results = conn.zrangebyscore('retry', score, score)
conn.zremrangebyscore('retry', score, score)
results.map do |message|
msg = MultiJson.decode(message)
msg = if MultiJson.respond_to?(:adapter)
MultiJson.load(message)
else
MultiJson.decode(message)
end
conn.rpush("queue:#{msg['queue']}", message)
end
end
Expand Down
2 changes: 1 addition & 1 deletion sidekiq.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Gem::Specification.new do |gem|
gem.add_dependency 'redis-namespace'
gem.add_dependency 'connection_pool', '~> 0.9.0'
gem.add_dependency 'celluloid', '~> 0.10.0'
gem.add_dependency 'multi_json', '>= 1.0', '< 1.3'
gem.add_dependency 'multi_json', '~> 1.0'
gem.add_development_dependency 'minitest'
gem.add_development_dependency 'sinatra'
gem.add_development_dependency 'slim'
Expand Down
7 changes: 6 additions & 1 deletion test/test_retry.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'helper'
require 'multi_json'
require 'sidekiq/retry'
require 'sidekiq/middleware/server/retry_jobs'

Expand Down Expand Up @@ -81,7 +82,11 @@ def @redis.with; yield self; end
end

it 'should poll like a bad mother...SHUT YO MOUTH' do
fake_msg = MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
fake_msg = if MultiJson.respond_to?(:dump)
MultiJson.dump({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
else
MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
end
@redis.expect :multi, [[fake_msg], 1], []
@redis.expect :rpush, 1, ['queue:someq', fake_msg]

Expand Down

0 comments on commit 68c725e

Please sign in to comment.