Skip to content

Commit

Permalink
Change old "message" terminology to "job"
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Oct 23, 2015
1 parent 18a513c commit 93dddd7
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
8 changes: 4 additions & 4 deletions lib/sidekiq/fetch.rb
Expand Up @@ -6,7 +6,7 @@ class BasicFetch
# can check if the process is shutting down.
TIMEOUT = 2

UnitOfWork = Struct.new(:queue, :message) do
UnitOfWork = Struct.new(:queue, :job) do
def acknowledge
# nothing to do
end
Expand All @@ -17,7 +17,7 @@ def queue_name

def requeue
Sidekiq.redis do |conn|
conn.rpush("queue:#{queue_name}", message)
conn.rpush("queue:#{queue_name}", job)
end
end
end
Expand Down Expand Up @@ -61,7 +61,7 @@ def self.bulk_requeue(inprogress, options)
jobs_to_requeue = {}
inprogress.each do |unit_of_work|
jobs_to_requeue[unit_of_work.queue_name] ||= []
jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message
jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.job
end

Sidekiq.redis do |conn|
Expand All @@ -71,7 +71,7 @@ def self.bulk_requeue(inprogress, options)
end
end
end
Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
Expand Down
24 changes: 12 additions & 12 deletions lib/sidekiq/processor.rb
Expand Up @@ -104,7 +104,7 @@ def fetch
def handle_fetch_exception(ex)
if !@down
@down = Time.now
logger.error("Error fetching message: #{ex}")
logger.error("Error fetching job: #{ex}")
ex.backtrace.each do |bt|
logger.error(bt)
end
Expand All @@ -113,23 +113,23 @@ def handle_fetch_exception(ex)
end

def process(work)
msgstr = work.message
jobstr = work.job
queue = work.queue_name

ack = false
begin
msg = Sidekiq.load_json(msgstr)
klass = msg['class'.freeze].constantize
job = Sidekiq.load_json(jobstr)
klass = job['class'.freeze].constantize
worker = klass.new
worker.jid = msg['jid'.freeze]
worker.jid = job['jid'.freeze]

stats(worker, msg, queue) do
Sidekiq.server_middleware.invoke(worker, msg, queue) do
stats(worker, job, queue) do
Sidekiq.server_middleware.invoke(worker, job, queue) do
# Only ack if we either attempted to start this job or
# successfully completed it. This prevents us from
# losing jobs if a middleware raises an exception before yielding
ack = true
execute_job(worker, cloned(msg['args'.freeze]))
execute_job(worker, cloned(job['args'.freeze]))
end
end
ack = true
Expand All @@ -139,7 +139,7 @@ def process(work)
# we didn't properly finish it.
ack = false
rescue Exception => ex
handle_exception(ex, msg || { :message => msgstr })
handle_exception(ex, job || { :job => jobstr })
raise
ensure
work.acknowledge if ack
Expand All @@ -158,9 +158,9 @@ def thread_identity
PROCESSED = Concurrent::AtomicFixnum.new
FAILURE = Concurrent::AtomicFixnum.new

def stats(worker, msg, queue)
def stats(worker, job, queue)
tid = thread_identity
WORKER_STATE[tid] = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
WORKER_STATE[tid] = {:queue => queue, :payload => job, :run_at => Time.now.to_i }

begin
yield
Expand All @@ -174,7 +174,7 @@ def stats(worker, msg, queue)
end

# Deep clone the arguments passed to the worker so that if
# the message fails, what is pushed back onto Redis hasn't
# the job fails, what is pushed back onto Redis hasn't
# been mutated by the worker.
def cloned(ary)
Marshal.load(Marshal.dump(ary))
Expand Down
2 changes: 1 addition & 1 deletion test/test_fetch.rb
Expand Up @@ -20,7 +20,7 @@ class TestFetcher < Sidekiq::Test
uow = fetch.retrieve_work
refute_nil uow
assert_equal 'basic', uow.queue_name
assert_equal 'msg', uow.message
assert_equal 'msg', uow.job
q = Sidekiq::Queue.new('basic')
assert_equal 0, q.size
uow.requeue
Expand Down
2 changes: 1 addition & 1 deletion test/test_processor.rb
Expand Up @@ -87,7 +87,7 @@ def call(worker, item, queue)

before do
work.expect(:queue_name, 'queue:default')
work.expect(:message, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
Sidekiq.server_middleware do |chain|
chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
end
Expand Down

0 comments on commit 93dddd7

Please sign in to comment.