forked from sidekiq/sidekiq
/
worker.rb
89 lines (73 loc) · 2.51 KB
/
worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
require 'sidekiq/client'
require 'sidekiq/core_ext'
module Sidekiq
##
# Include this module in your worker class and you can easily create
# asynchronous jobs:
#
# class HardWorker
# include Sidekiq::Worker
#
# def perform(*args)
# # do some work
# end
# end
#
# Then in your Rails app, you can do this:
#
# HardWorker.perform_async(1, 2, 3)
#
# Note that perform_async is a class method, perform is an instance method.
module Worker
attr_accessor :jid
def self.included(base)
base.extend(ClassMethods)
base.class_attribute :sidekiq_options_hash
base.class_attribute :sidekiq_retry_in_block
base.class_attribute :sidekiq_retries_exhausted_block
end
def logger
Sidekiq.logger
end
module ClassMethods
def perform_async(*args)
client_push('class' => self, 'args' => args)
end
def perform_in(interval, *args)
int = interval.to_f
now = Time.now
ts = (int < 1_000_000_000 ? (now + interval).to_f : int)
item = { 'class' => self, 'args' => args, 'at' => ts }
# Optimization to enqueue something now that is scheduled to go out now or in the past
item.delete('at'.freeze) if ts <= now.to_f
client_push(item)
end
alias_method :perform_at, :perform_in
##
# Allows customization for this type of Worker.
# Legal options:
#
# :queue - use a named queue for this Worker, default 'default'
# :retry - enable the RetryJobs middleware for this Worker, default *true*
# :backtrace - whether to save any error backtrace in the retry payload to display in web UI,
# can be true, false or an integer number of lines to save, default *false*
# :pool - use the given Redis connection pool to push this type of job to a given shard.
def sidekiq_options(opts={})
self.sidekiq_options_hash = get_sidekiq_options.merge((opts || {}).stringify_keys)
end
def sidekiq_retry_in(&block)
self.sidekiq_retry_in_block = block
end
def sidekiq_retries_exhausted(&block)
self.sidekiq_retries_exhausted_block = block
end
def get_sidekiq_options # :nodoc:
self.sidekiq_options_hash ||= Sidekiq.default_worker_options
end
def client_push(item) # :nodoc:
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
Sidekiq::Client.new(pool).push(item.stringify_keys)
end
end
end
end