-
Notifications
You must be signed in to change notification settings - Fork 21
/
message.rb
348 lines (311 loc) · 10.7 KB
/
message.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
require "timeout"
module Beetle
# Instances of class Message are created when a scubscription callback fires. Class
# Message contains the code responsible for message deduplication and determining if it
# should retry executing the message handler after a handler has crashed (or forcefully
# aborted).
class Message
include Logging
# current message format version
FORMAT_VERSION = 1
# flag for encoding redundant messages
FLAG_REDUNDANT = 1
# default lifetime of messages
DEFAULT_TTL = 1.day
# forcefully abort a running handler after this many seconds.
# can be overriden when registering a handler.
DEFAULT_HANDLER_TIMEOUT = 600.seconds
# how many times we should try to run a handler before giving up
DEFAULT_HANDLER_EXECUTION_ATTEMPTS = 1
# how many seconds we should wait before retrying handler execution
DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY = 10.seconds
# how many exceptions should be tolerated before giving up
DEFAULT_EXCEPTION_LIMIT = 0
# server from which the message was received
attr_reader :server
# name of the queue on which the message was received
attr_reader :queue
# the AMQP header received with the message
attr_reader :header
# the uuid of the message
attr_reader :uuid
# message payload
attr_reader :data
# the message format version of the message
attr_reader :format_version
# flags sent with the message
attr_reader :flags
# unix timestamp after which the message should be considered stale
attr_reader :expires_at
# how many seconds the handler is allowed to execute
attr_reader :timeout
# how long to wait before retrying the message handler
attr_reader :delay
# how many times we should try to run the handler
attr_reader :attempts_limit
# how many exceptions we should tolerate before giving up
attr_reader :exceptions_limit
# exception raised by handler execution
attr_reader :exception
# value returned by handler execution
attr_reader :handler_result
def initialize(queue, header, body, opts = {})
@queue = queue
@header = header
@data = body
setup(opts)
decode
end
def setup(opts) #:nodoc:
@server = opts[:server]
@timeout = opts[:timeout] || DEFAULT_HANDLER_TIMEOUT
@delay = opts[:delay] || DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY
@attempts_limit = opts[:attempts] || DEFAULT_HANDLER_EXECUTION_ATTEMPTS
@exceptions_limit = opts[:exceptions] || DEFAULT_EXCEPTION_LIMIT
@attempts_limit = @exceptions_limit + 1 if @attempts_limit <= @exceptions_limit
@store = opts[:store]
end
# extracts various values form the AMQP header properties
def decode #:nodoc:
# p header.attributes
amqp_headers = header.attributes
@uuid = amqp_headers[:message_id]
headers = amqp_headers[:headers].symbolize_keys
@format_version = headers[:format_version].to_i
@flags = headers[:flags].to_i
@expires_at = headers[:expires_at].to_i
rescue Exception => @exception
Beetle::reraise_expectation_errors!
logger.error "Could not decode message. #{self.inspect}"
end
# build hash with options for the publisher
def self.publishing_options(opts = {}) #:nodoc:
flags = 0
flags |= FLAG_REDUNDANT if opts[:redundant]
expires_at = now + (opts[:ttl] || DEFAULT_TTL)
opts = opts.slice(*PUBLISHING_KEYS)
opts[:message_id] = generate_uuid.to_s
headers = (opts[:headers] ||= {})
headers.merge!(
:format_version => FORMAT_VERSION.to_s,
:flags => flags.to_s,
:expires_at => expires_at.to_s
)
opts
end
# the routing key
def routing_key
header.routing_key
end
alias_method :key, :routing_key
# unique message id. used to form various keys in the deduplication store.
def msg_id
@msg_id ||= "msgid:#{queue}:#{uuid}"
end
# current time (UNIX timestamp)
def now #:nodoc:
Time.now.to_i
end
# current time (UNIX timestamp)
def self.now #:nodoc:
Time.now.to_i
end
# a message has expired if the header expiration timestamp is msaller than the current time
def expired?
@expires_at < now
end
# generate uuid for publishing
def self.generate_uuid
UUID4R::uuid(1)
end
# whether the publisher has tried sending this message to two servers
def redundant?
@flags & FLAG_REDUNDANT == FLAG_REDUNDANT
end
# whether this is a message we can process without accessing the deduplication store
def simple?
!redundant? && attempts_limit == 1
end
# store handler timeout timestamp in the deduplication store
def set_timeout!
@store.set(msg_id, :timeout, now + timeout)
end
# handler timed out?
def timed_out?
(t = @store.get(msg_id, :timeout)) && t.to_i < now
end
# reset handler timeout in the deduplication store
def timed_out!
@store.set(msg_id, :timeout, 0)
end
# message handling completed?
def completed?
@store.get(msg_id, :status) == "completed"
end
# mark message handling complete in the deduplication store
def completed!
@store.set(msg_id, :status, "completed")
timed_out!
end
# whether we should wait before running the handler
def delayed?
(t = @store.get(msg_id, :delay)) && t.to_i > now
end
# store delay value in the deduplication store
def set_delay!
@store.set(msg_id, :delay, now + delay)
end
# how many times we already tried running the handler
def attempts
@store.get(msg_id, :attempts).to_i
end
# record the fact that we are trying to run the handler
def increment_execution_attempts!
@store.incr(msg_id, :attempts)
end
# whether we have already tried running the handler as often as specified when the handler was registered
def attempts_limit_reached?
(limit = @store.get(msg_id, :attempts)) && limit.to_i >= attempts_limit
end
# increment number of exception occurences in the deduplication store
def increment_exception_count!
@store.incr(msg_id, :exceptions)
end
# whether the number of exceptions has exceeded the limit set when the handler was registered
def exceptions_limit_reached?
@store.get(msg_id, :exceptions).to_i > exceptions_limit
end
# have we already seen this message? if not, set the status to "incomplete" and store
# the message exipration timestamp in the deduplication store.
def key_exists?
old_message = !@store.msetnx(msg_id, :status =>"incomplete", :expires => @expires_at, :timeout => now + timeout)
if old_message
logger.debug "Beetle: received duplicate message: #{msg_id} on queue: #{@queue}"
end
old_message
end
# aquire execution mutex before we run the handler (and delete it if we can't aquire it).
def aquire_mutex!
if mutex = @store.setnx(msg_id, :mutex, now)
logger.debug "Beetle: aquired mutex: #{msg_id}"
else
delete_mutex!
end
mutex
end
# delete execution mutex
def delete_mutex!
@store.del(msg_id, :mutex)
logger.debug "Beetle: deleted mutex: #{msg_id}"
end
# process this message and do not allow any exception to escape to the caller
def process(handler)
logger.debug "Beetle: processing message #{msg_id}"
result = nil
begin
result = process_internal(handler)
handler.process_exception(@exception) if @exception
handler.process_failure(result) if result.failure?
rescue Exception => e
Beetle::reraise_expectation_errors!
logger.warn "Beetle: exception '#{e}' during processing of message #{msg_id}"
logger.warn "Beetle: backtrace: #{e.backtrace.join("\n")}"
result = RC::InternalError
end
result
end
private
def process_internal(handler)
if @exception
ack!
RC::DecodingError
elsif expired?
logger.warn "Beetle: ignored expired message (#{msg_id})!"
ack!
RC::Ancient
elsif simple?
ack!
run_handler(handler) == RC::HandlerCrash ? RC::AttemptsLimitReached : RC::OK
elsif !key_exists?
run_handler!(handler)
elsif completed?
ack!
RC::OK
elsif delayed?
logger.warn "Beetle: ignored delayed message (#{msg_id})!"
RC::Delayed
elsif !timed_out?
RC::HandlerNotYetTimedOut
elsif attempts_limit_reached?
completed!
ack!
logger.warn "Beetle: reached the handler execution attempts limit: #{attempts_limit} on #{msg_id}"
RC::AttemptsLimitReached
elsif exceptions_limit_reached?
completed!
ack!
logger.warn "Beetle: reached the handler exceptions limit: #{exceptions_limit} on #{msg_id}"
RC::ExceptionsLimitReached
else
set_timeout!
if aquire_mutex!
run_handler!(handler)
else
RC::MutexLocked
end
end
end
def run_handler(handler)
Timer.timeout(@timeout.to_f) { @handler_result = handler.call(self) }
RC::OK
rescue Exception => @exception
ActiveRecord::Base.clear_all_connections! if defined?(ActiveRecord)
Beetle::reraise_expectation_errors!
logger.debug "Beetle: message handler crashed on #{msg_id}"
RC::HandlerCrash
ensure
ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord)
end
def run_handler!(handler)
increment_execution_attempts!
case result = run_handler(handler)
when RC::OK
completed!
ack!
result
else
handler_failed!(result)
end
end
def handler_failed!(result)
increment_exception_count!
if attempts_limit_reached?
completed!
ack!
logger.debug "Beetle: reached the handler execution attempts limit: #{attempts_limit} on #{msg_id}"
RC::AttemptsLimitReached
elsif exceptions_limit_reached?
completed!
ack!
logger.debug "Beetle: reached the handler exceptions limit: #{exceptions_limit} on #{msg_id}"
RC::ExceptionsLimitReached
else
delete_mutex!
timed_out!
set_delay!
result
end
end
# ack the message for rabbit. deletes all keys associated with this message in the
# deduplication store if we are sure this is the last message with the given msg_id.
def ack!
#:doc:
logger.debug "Beetle: ack! for message #{msg_id}"
header.ack
return if simple? # simple messages don't use the deduplication store
if !redundant? || @store.incr(msg_id, :ack_count) == 2
@store.del_keys(msg_id)
end
end
end
end