Skip to content

Commit

Permalink
add a with_params variant, discussion at #12
Browse files Browse the repository at this point in the history
  • Loading branch information
jondot committed Jan 22, 2014
1 parent ea0e807 commit fb6360d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
7 changes: 6 additions & 1 deletion lib/sneakers/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def initialize(queue=nil, pool=nil, opts=nil)
@should_ack = opts[:ack]
@timeout_after = opts[:timeout_job_after]
@pool = pool || Thread.pool(opts[:threads]) # XXX config threads
@call_with_params = respond_to?(:work_with_params)

@queue = queue || Sneakers::Queue.new(
queue_name,
Expand Down Expand Up @@ -53,7 +54,11 @@ def do_work(hdr, props, msg, handler)
metrics.increment("work.#{self.class.name}.started")
Timeout.timeout(@timeout_after) do
metrics.timing("work.#{self.class.name}.time") do
res = work(msg)
if @call_with_params
res = work_with_params(msg, hdr, props)
else
res = work(msg)
end
end
end
rescue Timeout::Error
Expand Down
33 changes: 33 additions & 0 deletions spec/sneakers/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ def work(msg)
end
end

class WithParamsWorker
include Sneakers::Worker
from_queue 'defaults',
:ack => true,
:timeout_job_after => 0.5

def work_with_params(msg, header, props)
msg
end
end


class TestPool
Expand Down Expand Up @@ -339,4 +349,27 @@ def with_test_queuefactory(ctx, ack=true, msg=nil, nowork=false)
end
end



describe 'With Params' do
before do
@handler = Object.new
stub(@handler).acknowledge("tag")
stub(@handler).reject("tag")
stub(@handler).timeout("tag")
stub(@handler).error("tag", anything)
stub(@handler).noop("tag")

@header = Object.new
stub(@header).delivery_tag { "tag" }

@w = WithParamsWorker.new(@queue, TestPool.new)
mock(@w.metrics).timing("work.WithParamsWorker.time").yields.once
end

it 'should call work_with_params and not work' do
mock(@w).work_with_params(:ack, @header, {:foo => 1}).once
@w.do_work(@header, {:foo => 1 }, :ack, @handler)
end
end
end

0 comments on commit fb6360d

Please sign in to comment.