-
Notifications
You must be signed in to change notification settings - Fork 21.4k
/
log_subscriber.rb
210 lines (179 loc) · 6.55 KB
/
log_subscriber.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# frozen_string_literal: true
require "active_support/log_subscriber"
module ActiveJob
class LogSubscriber < ActiveSupport::LogSubscriber # :nodoc:
class_attribute :backtrace_cleaner, default: ActiveSupport::BacktraceCleaner.new
# Logs the outcome of an enqueue event:
# * error - the payload carries an exception, or the job has an enqueue_error
# * info  - the payload is flagged :aborted (a before_enqueue callback halted it)
# * info  - otherwise, the job class, job ID, queue and arguments
def enqueue(event)
  job = event.payload[:job]
  failure = event.payload[:exception_object] || job.enqueue_error

  if failure
    return error { "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{failure.class} (#{failure.message})" }
  end

  if event.payload[:aborted]
    return info { "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." }
  end

  info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) }
end
subscribe_log_level :enqueue, :info
# Logs the outcome of a scheduled enqueue event. Same three outcomes as an
# immediate enqueue (error on failure, info when a before_enqueue callback
# aborted it, info on success); the success line also includes the
# scheduled time.
def enqueue_at(event)
  job = event.payload[:job]
  failure = event.payload[:exception_object] || job.enqueue_error

  if failure
    return error { "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{failure.class} (#{failure.message})" }
  end

  if event.payload[:aborted]
    return info { "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." }
  end

  info { "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) }
end
subscribe_log_level :enqueue_at, :info
# Logs a one-line summary of a bulk enqueue at info level.
#
# Reads :jobs, :adapter and :enqueued_count from the event payload and reports:
# * every job enqueued  - the enqueued-jobs summary for all jobs
# * some jobs enqueued  - the summary for the successful jobs, then the failure count
# * no job enqueued     - the failure count and the adapter name
def enqueue_all(event)
  info do
    jobs = event.payload[:jobs]
    adapter = event.payload[:adapter]
    enqueued_count = event.payload[:enqueued_count]

    if enqueued_count == jobs.size
      enqueued_jobs_message(adapter, jobs)
    else
      # From here on the counts differ, so there is always at least one failure;
      # no separate "zero failures" case is needed.
      failed_enqueue_count = jobs.size - enqueued_count
      failure_message = "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}"
      enqueued_jobs = jobs.select(&:successfully_enqueued?)

      if enqueued_jobs.empty?
        "#{failure_message} to #{ActiveJob.adapter_name(adapter)}"
      else
        "#{enqueued_jobs_message(adapter, enqueued_jobs)}. #{failure_message}"
      end
    end
  end
end
subscribe_log_level :enqueue_all, :info
# Logs, at info level, that a job is about to be performed: job class, job ID,
# queue, the enqueue timestamp (UTC, ISO 8601 with nanosecond precision) and
# the arguments.
#
# The "enqueued at" fragment is left out when the job has no enqueued_at,
# instead of raising NoMethodError on nil. NOTE(review): presumably this is
# the case for jobs run inline without ever being enqueued - confirm.
def perform_start(event)
  info do
    job = event.payload[:job]
    enqueued_at = job.enqueued_at
    enqueue_info = enqueued_at ? " enqueued at #{enqueued_at.utc.iso8601(9)}" : ""

    "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)}" + enqueue_info + args_info(job)
  end
end
subscribe_log_level :perform_start, :info
# Logs the outcome of performing a job:
# * error - the payload carries an exception; includes class, message and backtrace
# * error - the payload is flagged :aborted (a before_perform callback halted the job)
# * info  - otherwise, the job class, job ID, queue and duration in ms
def perform(event)
  job = event.payload[:job]
  failure = event.payload[:exception_object]
  # Shared middle part of all three messages; a lambda so the text is only
  # interpolated when a log block is actually invoked.
  summary = -> { "#{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms" }

  if failure
    error { "Error performing #{summary.call}: #{failure.class} (#{failure.message}):\n" + Array(failure.backtrace).join("\n") }
  elsif event.payload[:aborted]
    error { "Error performing #{summary.call}: a before_perform callback halted the job execution" }
  else
    info { "Performed #{summary.call}" }
  end
end
subscribe_log_level :perform, :info
# Logs, at info level, that a job is being retried: job class, job ID, number
# of executions so far and the wait in whole seconds. When the payload has an
# :error, its class and message are appended as the reason.
def enqueue_retry(event)
  payload = event.payload
  job = payload[:job]
  cause = payload[:error]
  delay = payload[:wait]

  info do
    retrying = "Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{delay.to_i} seconds"
    cause ? "#{retrying}, due to a #{cause.class} (#{cause.message})." : "#{retrying}."
  end
end
subscribe_log_level :enqueue_retry, :info
# Logs, at error level, that retrying a job has stopped: job class, job ID,
# the error's class and message, and the number of executions.
def retry_stopped(event)
  job, cause = event.payload.values_at(:job, :error)

  error { "Stopped retrying #{job.class} (Job ID: #{job.job_id}) due to a #{cause.class} (#{cause.message}), which reoccurred on #{job.executions} attempts." }
end
# Registers the log level for the retry_stopped handler defined just above.
# It must name :retry_stopped: naming :enqueue_retry here would re-register
# that event (already registered at :info after its own handler) at :error
# and leave retry_stopped without a registered level.
subscribe_log_level :retry_stopped, :error
# Logs, at error level, that a job was discarded, with the job class, job ID
# and the class and message of the error that caused it.
def discard(event)
  job, cause = event.payload.values_at(:job, :error)

  error { "Discarded #{job.class} (Job ID: #{job.job_id}) due to a #{cause.class} (#{cause.message})." }
end
subscribe_log_level :discard, :error
private
# Builds the "<adapter name>(<queue name>)" label used in log lines, from the
# event payload's :adapter and the :job's queue_name.
def queue_name(event)
  payload = event.payload
  adapter_label = ActiveJob.adapter_name(payload[:adapter])
  adapter_label + "(#{payload[:job].queue_name})"
end
# Returns the " with arguments: ..." suffix for a log line, or "" when the
# job class has argument logging turned off or the job has no arguments.
# Each argument is passed through #format and rendered with #inspect.
def args_info(job)
  # empty? rather than any?: any? without a block tests element truthiness,
  # so arguments such as [nil] or [false] would be reported as "no arguments".
  return "" unless job.class.log_arguments? && !job.arguments.empty?

  " with arguments: " + job.arguments.map { |arg| format(arg).inspect }.join(", ")
end
# Prepares a job argument for logging. Hash values and Array elements are
# converted recursively; objects that include GlobalID::Identification are
# replaced by their global ID, falling back to the object itself when
# to_global_id raises; anything else is returned unchanged.
def format(arg)
  case arg
  when Hash then arg.transform_values { |entry| format(entry) }
  when Array then arg.map { |item| format(item) }
  when GlobalID::Identification
    begin
      arg.to_global_id
    rescue StandardError
      arg
    end
  else arg
  end
end
# Returns the job's scheduled_at, passed through Time.at and converted to UTC.
def scheduled_at(event)
  job = event.payload[:job]
  Time.at(job.scheduled_at).utc
end
# Logger used by this subscriber: whatever ActiveJob::Base.logger returns.
def logger
ActiveJob::Base.logger
end
# Forwards to the superclass #info with the same arguments and block. When
# that call returns a truthy value and ActiveJob.verbose_enqueue_logs is on,
# also logs the source location via log_enqueue_source. Returns nil when
# nothing further was logged.
def info(progname = nil, &block)
  logged = super
  return unless logged

  log_enqueue_source if ActiveJob.verbose_enqueue_logs
end
# Forwards to the superclass #error with the same arguments and block. When
# that call returns a truthy value and ActiveJob.verbose_enqueue_logs is on,
# also logs the source location via log_enqueue_source. Returns nil when
# nothing further was logged.
def error(progname = nil, &block)
  logged = super
  return unless logged

  log_enqueue_source if ActiveJob.verbose_enqueue_logs
end
# Writes a "↳ <location>" info line for the first backtrace entry that
# survives extract_enqueue_source_location; does nothing when none is left.
def log_enqueue_source
  # caller is taken here, at method level, so the captured frames start at
  # this method's own caller.
  location = extract_enqueue_source_location(caller)
  return unless location

  logger.info("↳ #{location}")
end
# Returns the first entry of +locations+ that remains after running them
# through backtrace_cleaner.clean, or nil when none remains. The locations
# are handed over as a lazy enumerator.
def extract_enqueue_source_location(locations)
  cleaned = backtrace_cleaner.clean(locations.lazy)
  cleaned.first
end
# Builds the bulk-enqueue summary, e.g. "Enqueued 3 jobs to <adapter> (2 FooJob, 1 BarJob)":
# the total, the adapter name, and a per-class count ordered from most to
# fewest jobs.
def enqueued_jobs_message(adapter, enqueued_jobs)
  total = enqueued_jobs.size
  per_class = enqueued_jobs.map(&:class).tally.sort_by { |_klass, count| -count }

  headline = "Enqueued #{total} #{'job'.pluralize(total)} to #{ActiveJob.adapter_name(adapter)}"
  breakdown = per_class.map { |klass, count| "#{count} #{klass}" }.join(', ')
  "#{headline} (#{breakdown})"
end
end
end
# Attaches this subscriber to the :active_job namespace when the file is loaded.
ActiveJob::LogSubscriber.attach_to :active_job