-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
fetch.rb
156 lines (132 loc) · 4.14 KB
/
fetch.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/actor'
module Sidekiq
##
# The Fetcher blocks on Redis until a message arrives on one of the
# queues, then hands that message to the Manager so it can be assigned
# to a ready Processor.
class Fetcher
  include Util
  include Actor

  # Seconds slept by #pause after a failed fetch; BasicFetch#queues_cmd
  # also appends this value to its BRPOP arguments.
  TIMEOUT = 1

  # Time of the first fetch failure in the current outage, or nil while
  # fetching is working.
  attr_reader :down

  class << self
    # Ugh. Say hello to a bloody hack.
    # There is no clean way to make the fetcher stop processing its
    # mailbox once shutdown starts, so a class-level flag is used instead.
    def done!
      @done = true
    end

    # Clears the shutdown flag (testing only).
    def reset
      @done = nil
    end

    # Truthy once done! has been called.
    def done?
      @done
    end

    # The fetch strategy class: Sidekiq.options[:fetch] when set,
    # BasicFetch otherwise.
    def strategy
      Sidekiq.options[:fetch] || BasicFetch
    end
  end

  # mgr     - the manager that receives fetched work via async.assign.
  # options - passed straight through to the fetch strategy's constructor.
  def initialize(mgr, options)
    @down = nil
    @mgr = mgr
    @strategy = Fetcher.strategy.new(options)
  end

  # Fetching is straightforward: the Manager asks for one fetch per idle
  # processor when Sidekiq starts, and asks again every time a Processor
  # finishes a message.
  #
  # A clean shutdown means we can neither block forever nor loop forever,
  # so when a fetch comes back empty we simply schedule another one.
  def fetch
    watchdog('Fetcher#fetch died') do
      return if Sidekiq::Fetcher.done?

      begin
        work = @strategy.retrieve_work

        # A successful retrieve after a failure means Redis is reachable again.
        if @down
          downtime = Time.now.to_f - @down.to_f
          ::Sidekiq.logger.info("Redis is online, #{downtime} sec downtime")
        end
        @down = nil

        if work
          @mgr.async.assign(work)
        else
          after(0) { fetch }
        end
      rescue => ex
        handle_fetch_exception(ex)
      end
    end
  end

  private

  # Back off before the next fetch attempt.
  def pause
    sleep(TIMEOUT)
  end

  # Logs the error and backtrace only for the first failure of an outage,
  # remembers when the outage began, then waits and schedules a retry.
  def handle_fetch_exception(ex)
    unless @down
      logger.error("Error fetching message: #{ex}")
      ex.backtrace.each { |line| logger.error(line) }
    end
    @down ||= Time.now
    pause
    after(0) { fetch }
  rescue Task::TerminatedError
    # If redis is down when we try to shut down, all the fetch backlog
    # raises these errors. The root cause has not been identified, so
    # they are deliberately ignored here.
  end
end
# Fetch strategy that pops work off Redis lists with BRPOP.
class BasicFetch
  # A fetched message paired with the Redis key it was popped from.
  UnitOfWork = Struct.new(:queue, :message) do
    def acknowledge
      # nothing to do
    end

    # The queue key with everything up to and including "queue:" removed.
    def queue_name
      queue.gsub(/.*queue:/, '')
    end

    # Pushes this message back onto the tail of its queue.
    def requeue
      Sidekiq.redis { |conn| conn.rpush("queue:#{queue_name}", message) }
    end
  end

  # options - Hash read for two keys:
  #           :strict - truthy to always poll queues in their listed order.
  #           :queues - queue names; a name may be repeated to weight it.
  def initialize(options)
    @strictly_ordered_queues = options[:strict] ? true : false
    @queues = options[:queues].map { |name| "queue:#{name}" }
    @unique_queues = @queues.uniq
  end

  # Issues BRPOP with the arguments from #queues_cmd.
  # Returns a UnitOfWork, or nil when the pop yielded nothing.
  def retrieve_work
    popped = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
    return unless popped

    UnitOfWork.new(*popped)
  end

  # Builds the Redis#brpop argument list with the configured queue weights
  # taken into account. By default Redis#brpop returns data from the first
  # queue that has pending elements, so the list is rebuilt on every call
  # to honor weights and avoid queue starvation.
  def queues_cmd
    ordered =
      if @strictly_ordered_queues
        @unique_queues.dup
      else
        @queues.shuffle.uniq
      end
    ordered << Sidekiq::Fetcher::TIMEOUT
  end

  # Kept as a class method so it stays pluggable and callable from the
  # Manager actor; as an instance method it would become async to the
  # Fetcher actor.
  #
  # inprogress - collection of units of work responding to queue_name
  #              and message.
  # options    - accepted but not used here.
  #
  # Any error raised while requeueing is logged as a warning, not re-raised.
  def self.bulk_requeue(inprogress, options)
    return if inprogress.empty?

    Sidekiq.logger.debug { "Re-queueing terminated jobs" }

    # Group the messages by queue name so each queue gets a single RPUSH.
    jobs_to_requeue = Hash.new { |grouped, name| grouped[name] = [] }
    inprogress.each do |unit_of_work|
      jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message
    end

    Sidekiq.redis do |conn|
      conn.pipelined do
        jobs_to_requeue.each { |queue, jobs| conn.rpush("queue:#{queue}", jobs) }
      end
    end
    Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
  rescue => ex
    Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
  end
end
end