-
Notifications
You must be signed in to change notification settings - Fork 306
/
common.rb
298 lines (251 loc) · 11.5 KB
/
common.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
require "logstash/outputs/elasticsearch/template_manager"
module LogStash; module Outputs; class ElasticSearch;
  # Shared behavior for the Elasticsearch output plugin: builds the client,
  # converts Logstash events into bulk-API action tuples, submits them, and
  # retries transient failures with interruptible exponential backoff.
  module Common
    attr_reader :client, :hosts

    # These are codes for temporary recoverable conditions
    # 429 just means that ES has too much traffic ATM
    # 503 means it, or a proxy, is temporarily unavailable
    RETRYABLE_CODES = [429, 503]

    # Per-item statuses routed to the dead letter queue (when core supports it).
    DLQ_CODES = [400, 404]
    SUCCESS_CODES = [200, 201]
    CONFLICT_CODE = 409

    # When you use external versioning, you are communicating that you want
    # to ignore conflicts. More obviously, since an external version is a
    # constant part of the incoming document, we should not retry, as retrying
    # will never succeed.
    VERSION_TYPES_PERMITTING_CONFLICT = ["external", "external_gt", "external_gte"]

    def register
      @stopping = Concurrent::AtomicBoolean.new(false)
      # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
      @dlq_writer = supports_dlq? ? execution_context.dlq_writer : nil

      setup_hosts # properly sets @hosts
      build_client
      install_template
      check_action_validity

      @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized))
    end

    # Receive an array of events and immediately attempt to index them (no buffering)
    def multi_receive(events)
      if @flush_size
        events.each_slice(@flush_size) do |slice|
          retrying_submit(slice.map { |e| event_action_tuple(e) })
        end
      else
        retrying_submit(events.map { |e| event_action_tuple(e) })
      end
    end

    # Convert the event into a 3-tuple of action, params, and event
    def event_action_tuple(event)
      params = event_action_params(event)
      action = event.sprintf(@action)
      [action, params, event]
    end

    # Normalizes @hosts to an Array and defaults to localhost when unset.
    def setup_hosts
      @hosts = Array(@hosts)
      if @hosts.empty?
        @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
        @hosts.replace(["localhost"])
      end
    end

    def install_template
      TemplateManager.install_template(self)
    end

    # Validates the configured @action: it must be either a known action or a
    # sprintf-style template that gets resolved per event.
    # @raise [LogStash::ConfigurationError] when the action is missing/invalid
    def check_action_validity
      raise LogStash::ConfigurationError, "No action specified!" unless @action

      # If we're using string interpolation, we're good!
      return if @action =~ /%{.+}/
      return if valid_actions.include?(@action)

      raise LogStash::ConfigurationError, "Action '#{@action}' is invalid! Pick one of #{valid_actions} or use a sprintf style statement"
    end

    # To be overridden by the -java version
    VALID_HTTP_ACTIONS=["index", "delete", "create", "update"]
    def valid_actions
      VALID_HTTP_ACTIONS
    end

    # Submits the actions, then keeps re-submitting whatever failed with a
    # retryable status, sleeping with exponential backoff between rounds.
    # The sleep is interruptible so plugin shutdown is not blocked.
    def retrying_submit(actions)
      # Initially we submit the full list of actions
      submit_actions = actions

      sleep_interval = @retry_initial_interval

      while submit_actions && submit_actions.length > 0
        # We retry with whatever didn't succeed
        begin
          submit_actions = submit(submit_actions)
          if submit_actions && submit_actions.size > 0
            @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request.", :count => submit_actions.size)
          end
        rescue => e
          # An unexpected error here must not kill the pipeline; log and retry.
          @logger.error("Encountered an unexpected error submitting a bulk request! Will retry.",
                        :error_message => e.message,
                        :class => e.class.name,
                        :backtrace => e.backtrace)
        end

        # Everything was a success!
        break if !submit_actions || submit_actions.empty?

        # If we're retrying the action sleep for the recommended interval
        # Double the interval for the next time through to achieve exponential backoff
        sleep_interval = sleep_for_interval(sleep_interval)
      end
    end

    # Sleeps interruptibly (wakes early on shutdown) and returns the next
    # backoff interval to use.
    def sleep_for_interval(sleep_interval)
      Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
      next_sleep_interval(sleep_interval)
    end

    # Doubles the interval, capped at @retry_max_interval.
    def next_sleep_interval(current_interval)
      doubled = current_interval * 2
      doubled > @retry_max_interval ? @retry_max_interval : doubled
    end

    # Performs one bulk submission and classifies each item of the response.
    # @return [Array, nil] the subset of actions that should be retried, or
    #   nil when the whole request succeeded or was aborted during shutdown
    def submit(actions)
      bulk_response = safe_bulk(actions)

      # If the response is nil that means we were in a retry loop
      # and aborted since we're shutting down
      # If it did return and there are no errors we're good as well
      return if bulk_response.nil? || !bulk_response["errors"]

      actions_to_retry = []
      bulk_response["items"].each_with_index do |response, idx|
        _action_type, action_props = response.first

        status = action_props["status"]
        failure = action_props["error"] # may be nil when ES sends no error object
        action = actions[idx]

        # Retry logic: If it is success, we move on. If it is a failure, we have 3 paths:
        # - For 409, we log and drop. there is nothing we can do
        # - For a mapping error, we send to dead letter queue for a human to intervene at a later point.
        # - For everything else there's mastercard. Yep, and we retry indefinitely. This should fix #572 and other transient network issues
        if SUCCESS_CODES.include?(status)
          next
        elsif CONFLICT_CODE == status
          @logger.warn "Failed action.", status: status, action: action, response: response if log_failure_type?(failure)
          next
        elsif DLQ_CODES.include?(status)
          action_event = action[2]
          # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
          if @dlq_writer
            # TODO: Change this to send a map with { :status => status, :action => action } in the future
            @dlq_writer.write(action_event, "Could not index event to Elasticsearch. status: #{status}, action: #{action}, response: #{response}")
          else
            @logger.warn "Could not index event to Elasticsearch.", status: status, action: action, response: response
          end
          next
        else
          # only log what the user whitelisted
          @logger.info "retrying failed action with response code: #{status} (#{failure})" if log_failure_type?(failure)
          actions_to_retry << action
        end
      end

      actions_to_retry
    end

    # True when the failure should be logged: either the bulk item carried no
    # "error" object at all, or its "type" is not whitelisted by the user.
    # Guards against a nil failure, which previously raised NoMethodError and
    # caused the whole batch to be retried blindly.
    def log_failure_type?(failure)
      failure.nil? || !failure_type_logging_whitelist.include?(failure["type"])
    end

    # get the action parameters for the given event
    def event_action_params(event)
      type = get_event_type(event)

      params = {
        :_id => @document_id ? event.sprintf(@document_id) : nil,
        :_index => event.sprintf(@index),
        :_type => type,
        :_routing => @routing ? event.sprintf(@routing) : nil
      }

      if @pipeline
        params[:pipeline] = event.sprintf(@pipeline)
      end

      if @parent
        params[:parent] = event.sprintf(@parent)
      end

      if @action == 'update'
        params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
        params[:_script] = event.sprintf(@script) if @script != ""
        params[:_retry_on_conflict] = @retry_on_conflict
      end

      if @version
        params[:version] = event.sprintf(@version)
      end

      if @version_type
        params[:version_type] = event.sprintf(@version_type)
      end

      params
    end

    # Determine the correct value for the 'type' field for the given event
    def get_event_type(event)
      # Set the 'type' value for the index.
      type = if @document_type
               event.sprintf(@document_type)
             else
               event.get("type") || "logs"
             end

      if !(type.is_a?(String) || type.is_a?(Numeric))
        @logger.warn("Bad event type! Non-string/integer type value set!", :type_class => type.class, :type_value => type.to_s, :event => event)
      end

      type.to_s
    end

    # Rescue retryable errors during bulk submission.
    # Retries indefinitely (interruptible on shutdown) for connectivity errors
    # and retryable HTTP codes; drops the request for non-retryable codes.
    def safe_bulk(actions)
      sleep_interval = @retry_initial_interval
      begin
        es_actions = actions.map { |action_type, params, event| [action_type, params, event.to_hash] }
        response = @client.bulk(es_actions)
        response
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
        # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
        # and let the user sort it out from there
        @logger.error(
          "Attempted to send a bulk request to elasticsearch" +
            " but Elasticsearch appears to be unreachable or down!",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        # We retry until there are no errors! Errors should all go to the retry queue
        sleep_interval = sleep_for_interval(sleep_interval)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
        @logger.error(
          "Attempted to send a bulk request to elasticsearch, but there are no living connections in the connection pool. Perhaps Elasticsearch is unreachable or down?",
          :error_message => e.message,
          :class => e.class.name,
          :will_retry_in_seconds => sleep_interval
        )
        sleep_interval = sleep_for_interval(sleep_interval)
        retry unless @stopping.true?
      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
        if RETRYABLE_CODES.include?(e.response_code)
          log_hash = {:code => e.response_code, :url => e.url.sanitized}
          log_hash[:body] = e.body if @logger.debug? # Generally this is too verbose
          message = "Encountered a retryable error. Will Retry with exponential backoff "

          # We treat 429s as a special case because these really aren't errors, but
          # rather just ES telling us to back off a bit, which we do.
          # The other retryable code is 503, which are true errors
          # Even though we retry the user should be made aware of these
          if e.response_code == 429
            logger.debug(message, log_hash)
          else
            logger.error(message, log_hash)
          end

          sleep_interval = sleep_for_interval(sleep_interval)
          retry
        else
          log_hash = {:code => e.response_code,
                      :response_body => e.response_body}
          log_hash[:request_body] = e.request_body if @logger.debug?
          @logger.error("Got a bad response code from server, but this code is not considered retryable. Request will be dropped", log_hash)
        end
      rescue => e
        # Stuff that should never happen
        # For all other errors print out full connection issues
        @logger.error(
          "An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely",
          :error_message => e.message,
          :error_class => e.class.name,
          :backtrace => e.backtrace
        )
        @logger.debug("Failed actions for last bad bulk request!", :actions => actions)

        # We retry until there are no errors! Errors should all go to the retry queue
        sleep_interval = sleep_for_interval(sleep_interval)
        retry unless @stopping.true?
      end
    end

    # DLQ support only exists in Logstash core >= 5.4.
    def supports_dlq?
      respond_to?(:execution_context) && execution_context.respond_to?(:dlq_writer)
    end
  end
end; end; end