# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/environment"
require "logstash/patterns/core"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require "grok-pure" # rubygem 'jls-grok'
require "timeout"
# Parse arbitrary text and structure it.
#
# Grok is currently the best way in Logstash to parse unstructured log
# data into something structured and queryable.
#
# This tool is perfect for syslog logs, apache and other webserver logs, mysql
# logs, and in general, any log format that is generally written for humans
# and not computer consumption.
#
# Logstash ships with about 120 patterns by default. You can find them here:
# <https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns>. You can add
# your own trivially. (See the `patterns_dir` setting)
#
# If you need help building patterns to match your logs, you will find the
# <http://grokdebug.herokuapp.com> and <http://grokconstructor.appspot.com/> applications quite useful!
#
# ==== Grok Basics
#
# Grok works by combining text patterns into something that matches your
# logs.
#
# The syntax for a grok pattern is `%{SYNTAX:SEMANTIC}`
#
# The `SYNTAX` is the name of the pattern that will match your text. For
# example, `3.44` will be matched by the `NUMBER` pattern and `55.3.244.1` will
# be matched by the `IP` pattern. The syntax is how you match.
#
# The `SEMANTIC` is the identifier you give to the piece of text being matched.
# For example, `3.44` could be the duration of an event, so you could call it
# simply `duration`. Further, a string `55.3.244.1` might identify the `client`
# making a request.
#
# For the above example, your grok filter would look something like this:
# [source,ruby]
# %{NUMBER:duration} %{IP:client}
#
# Optionally you can add a data type conversion to your grok pattern. By default
# all semantics are saved as strings. If you wish to convert a semantic's data type,
# for example to change a string to an integer, suffix it with the target data type.
# For example, `%{NUMBER:num:int}` converts the `num` semantic from a string to an
# integer. Currently the only supported conversions are `int` and `float`.
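# As a minimal sketch, the following converts `bytes` to an integer and
# `duration` to a float while matching:
# [source,ruby]
#     filter {
#       grok {
#         match => { "message" => "%{NUMBER:bytes:int} %{NUMBER:duration:float}" }
#       }
#     }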
#
# .Examples:
#
# With that idea of a syntax and semantic, we can pull out useful fields from a
# sample log like this fictional http request log:
# [source,ruby]
# 55.3.244.1 GET /index.html 15824 0.043
#
# The pattern for this could be:
# [source,ruby]
# %{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
#
# For a more realistic example, let's read these logs from a file:
# [source,ruby]
# input {
# file {
# path => "/var/log/http.log"
# }
# }
# filter {
# grok {
# match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
# }
# }
#
# After the grok filter, the event will have a few extra fields in it:
#
# * `client: 55.3.244.1`
# * `method: GET`
# * `request: /index.html`
# * `bytes: 15824`
# * `duration: 0.043`
#
# ==== Regular Expressions
#
# Grok sits on top of regular expressions, so any regular expressions are valid
# in grok as well. The regular expression library is Oniguruma, and you can see
# the full supported regexp syntax https://github.com/kkos/oniguruma/blob/master/doc/RE[on the Oniguruma
# site].
#
# ==== Custom Patterns
#
# Sometimes logstash doesn't have a pattern you need. For this, you have
# a few options.
#
# First, you can use the Oniguruma syntax for named capture which will
# let you match a piece of text and save it as a field:
# [source,ruby]
# (?<field_name>the pattern here)
#
# For example, postfix logs have a `queue id` that is a 10- or 11-character
# hexadecimal value. You can capture that easily like this:
# [source,ruby]
# (?<queue_id>[0-9A-F]{10,11})
#
# Alternately, you can create a custom patterns file.
#
# * Create a directory called `patterns` with a file in it called `extra`
# (the file name doesn't matter, but name it meaningfully for yourself)
# * In that file, write the pattern you need as the pattern name, a space, then
# the regexp for that pattern.
#
# For example, doing the postfix queue id example as above:
# [source,ruby]
# # contents of ./patterns/postfix:
# POSTFIX_QUEUEID [0-9A-F]{10,11}
#
# Then use the `patterns_dir` setting in this plugin to tell logstash where
# your custom patterns directory is. Here's a full example with a sample log:
# [source,ruby]
# Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
# [source,ruby]
# filter {
# grok {
# patterns_dir => ["./patterns"]
# match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
# }
# }
#
# The above will match and result in the following fields:
#
# * `timestamp: Jan 1 06:25:43`
# * `logsource: mailserver14`
# * `program: postfix/cleanup`
# * `pid: 21403`
# * `queue_id: BEF25A72965`
# * `syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>`
#
# The `timestamp`, `logsource`, `program`, and `pid` fields come from the
# `SYSLOGBASE` pattern which itself is defined by other patterns.
#
# Another option is to define patterns _inline_ in the filter using `pattern_definitions`.
# This is mostly for convenience and allows the user to define a pattern that is used just in that
# filter. Patterns defined in `pattern_definitions` are not available outside of that particular `grok` filter.
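# For example, the postfix queue id pattern from above can be defined inline
# instead of in a patterns file:
# [source,ruby]
#     filter {
#       grok {
#         pattern_definitions => { "POSTFIX_QUEUEID" => "[0-9A-F]{10,11}" }
#         match => { "message" => "%{POSTFIX_QUEUEID:queue_id}" }
#       }
#     }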
#
class LogStash::Filters::Grok < LogStash::Filters::Base
include LogStash::PluginMixins::ECSCompatibilitySupport
config_name "grok"
# A hash of matches of field => value
#
# For example:
# [source,ruby]
# filter {
# grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
# }
#
# If you need to match multiple patterns against a single field, the value can be an array of patterns
# [source,ruby]
# filter {
# grok { match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] } }
# }
#
config :match, :validate => :hash, :default => {}
#
# Logstash ships by default with a bunch of patterns, so you don't
# necessarily need to define this yourself unless you are adding additional
# patterns. You can point to multiple pattern directories using this setting.
# Note that Grok will read all files in the directory matching the `patterns_files_glob`
# and assume they are pattern files (including any tilde backup files).
# [source,ruby]
# patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
#
# Pattern files are plain text with format:
# [source,ruby]
# NAME PATTERN
#
# For example:
# [source,ruby]
# NUMBER \d+
#
# The patterns are loaded when the pipeline is created.
config :patterns_dir, :validate => :array, :default => []
# A hash of pattern-name and pattern tuples defining custom patterns to be used by
# the current filter. Patterns matching existing names will override the pre-existing
# definition. Think of these as inline patterns available just for this definition of
# grok.
config :pattern_definitions, :validate => :hash, :default => {}
# Glob pattern, used to select the pattern files in the directories
# specified by patterns_dir
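# For example, to load only files ending in `.conf` (an illustrative
# extension; any glob your pattern files match will do):
# [source,ruby]
#     patterns_files_glob => "*.conf"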
config :patterns_files_glob, :validate => :string, :default => "*"
# Break on first match. The first successful match by grok will result in the
# filter being finished. If you want grok to try all patterns (maybe you are
# parsing different things), then set this to false.
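# For example, to try both of the patterns below against the `message` field
# and apply every one that matches, rather than stopping at the first:
# [source,ruby]
#     filter {
#       grok {
#         match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] }
#         break_on_match => false
#       }
#     }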
config :break_on_match, :validate => :boolean, :default => true
# If `true`, only store named captures from grok.
config :named_captures_only, :validate => :boolean, :default => true
# If `true`, keep empty captures as event fields.
config :keep_empty_captures, :validate => :boolean, :default => false
# Define the target field for placing the matched captures.
# If this setting is omitted, data gets stored at the root (top level) of the event.
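# For example, with `target => "grok"` a `duration` capture is stored at
# `[grok][duration]` instead of the top-level `duration` field:
# [source,ruby]
#     filter {
#       grok {
#         match => { "message" => "Duration: %{NUMBER:duration}" }
#         target => "grok"
#       }
#     }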
config :target, :validate => :string
# Append values to the `tags` field when there has been no
# successful match
config :tag_on_failure, :validate => :array, :default => ["_grokparsefailure"]
# Attempt to terminate regexps after this amount of time (in milliseconds).
# This applies per pattern if multiple patterns are applied.
# It will never time out early, but may take a little longer to time out.
# The actual timeout is approximate, based on a 250ms quantization.
# Set to 0 to disable timeouts.
config :timeout_millis, :validate => :number, :default => 30000
# When multiple patterns are provided to `match`,
# the timeout has historically applied to _each_ pattern, incurring overhead
# for each and every pattern that is attempted; when the grok filter is
# configured with `timeout_scope => 'event'`, the plugin instead enforces
# a single timeout across all attempted matches on the event, achieving a
# similar safeguard against runaway matchers with significantly less overhead.
# It's usually better to scope the timeout to the whole event.
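# For example, to share one timeout budget across all of the patterns
# attempted against an event:
# [source,ruby]
#     filter {
#       grok {
#         match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] }
#         timeout_scope => "event"
#       }
#     }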
config :timeout_scope, :validate => %w(pattern event), :default => "pattern"
# Tag to apply if a grok regexp times out.
config :tag_on_timeout, :validate => :string, :default => '_groktimeout'
# The fields to overwrite.
#
# This allows you to overwrite a value in a field that already exists.
#
# For example, if you have a syslog line in the `message` field, you can
# overwrite the `message` field with part of the match like so:
# [source,ruby]
# filter {
# grok {
# match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
# overwrite => [ "message" ]
# }
# }
#
# In this case, a line like `May 29 16:37:11 sadness logger: hello world`
# will be parsed and `hello world` will overwrite the original message.
config :overwrite, :validate => :array, :default => []
def register
# a cache of capture name handler methods.
@handlers = {}
@patternfiles = []
# Load the (default) patterns_path first. The last-loaded pattern definition wins;
# this lets users redefine built-in patterns at runtime.
@patternfiles += patterns_files_from_paths(patterns_path, "*")
@patternfiles += patterns_files_from_paths(@patterns_dir, @patterns_files_glob)
@patterns = Hash.new { |h,k| h[k] = [] }
@logger.debug("Match data", :match => @match)
@metric_match_fields = metric.namespace(:patterns_per_field)
@match.each do |field, patterns|
patterns = [patterns] if patterns.is_a?(String)
@metric_match_fields.gauge(field, patterns.length)
@logger.trace? && @logger.trace("Grok compile", :field => field, :patterns => patterns)
patterns.each do |pattern|
@logger.debug? && @logger.debug("regexp: #{@type}/#{field}", :pattern => pattern)
grok = Grok.new
grok.logger = @logger
add_patterns_from_files(@patternfiles, grok)
add_patterns_from_inline_definition(@pattern_definitions, grok)
grok.compile(pattern, @named_captures_only)
@patterns[field] << grok
end
end # @match.each
@match_counter = metric.counter(:matches)
@failure_counter = metric.counter(:failures)
@target = "[#{@target.strip}]" if @target && @target !~ /\[.*?\]/
@timeout = @timeout_millis > 0.0 ? RubyTimeout.new(@timeout_millis) : NoopTimeout::INSTANCE
@matcher = ( @timeout_scope.eql?('event') ? EventTimeoutMatcher : PatternTimeoutMatcher ).new(self)
end # def register
def filter(event)
matched = false
@logger.debug? && @logger.debug("Running grok filter", :event => event.to_hash)
@patterns.each do |field, groks|
if match(groks, field, event)
matched = true
break if @break_on_match
end
end
if matched
@match_counter.increment(1)
filter_matched(event)
else
@failure_counter.increment(1)
@tag_on_failure.each {|tag| event.tag(tag)}
end
@logger.debug? && @logger.debug("Event now: ", :event => event.to_hash)
rescue GrokTimeoutException => e
@logger.warn(e.message)
metric.increment(:timeouts)
event.tag(@tag_on_timeout)
end # def filter
def close
end
private
# The default pattern paths, depending on environment.
def patterns_path
patterns_path = []
case ecs_compatibility
when :disabled
patterns_path << LogStash::Patterns::Core.path # :legacy
when :v1
patterns_path << LogStash::Patterns::Core.path('ecs-v1')
when :v8
@logger.warn("ECS v8 support is a preview of the unreleased ECS v8, and uses the v1 patterns. When Version 8 of the Elastic Common Schema becomes available, this plugin will need to be updated")
patterns_path << LogStash::Patterns::Core.path('ecs-v1')
else
fail(NotImplementedError, "ECS #{ecs_compatibility} is not supported by this plugin.")
end
# allow plugin to be instantiated outside the LS environment (in tests)
if defined? LogStash::Environment.pattern_path
patterns_path << LogStash::Environment.pattern_path("*")
end
patterns_path
end
def match(groks, field, event)
input = event.get(field)
if input.is_a?(Array)
success = false
input.each do |input|
success |= match_against_groks(groks, field, input, event)
end
return success
else
match_against_groks(groks, field, input, event)
end
rescue StandardError => e
@logger.warn("Grok regexp threw exception", :message => e.message, :exception => e.class, :backtrace => e.backtrace)
return false
end
def match_against_groks(groks, field, input, event)
# Convert anything else to string (number, hash, etc)
context = GrokContext.new(field, input.to_s)
@matcher.match(context, groks, event, @break_on_match)
end
# Internal (base) helper to handle the global timeout switch.
# @private
class Matcher
def initialize(filter)
@filter = filter
end
def match(context, groks, event, break_on_match)
matched = false
groks.each do |grok|
context.set_grok(grok)
matched = execute(context, grok)
if matched
grok.capture(matched) { |field, value| @filter.handle(field, value, event) }
break if break_on_match
end
end
matched
end
protected
def execute(context, grok)
grok.execute(context.input)
end
end
# @private
class EventTimeoutMatcher < Matcher
# @override
def match(context, groks, event, break_on_match)
@filter.with_timeout(context) { super }
end
end
# @private
class PatternTimeoutMatcher < Matcher
# @override
def execute(context, grok)
@filter.with_timeout(context) { super }
end
end
def handle(field, value, event)
return if (value.nil? || (value.is_a?(String) && value.empty?)) unless @keep_empty_captures
target_field = @target ? "#{@target}[#{field}]" : field
if @overwrite.include?(field)
event.set(target_field, value)
else
v = event.get(target_field)
if v.nil?
event.set(target_field, value)
elsif v.is_a?(Array)
# do not replace the code below with:
# event[field] << value
# this assumes implementation specific feature of returning a mutable object
# from a field ref which should not be assumed and will change in the future.
v << value
event.set(target_field, v)
elsif v.is_a?(String)
# Promote to array since we aren't overwriting.
event.set(target_field, [v, value])
else
@logger.debug("Not adding matched value - found existing (#{v.class})", :field => target_field, :value => value)
end
end
end
public :handle
def patterns_files_from_paths(paths, glob)
patternfiles = []
@logger.debug("Grok patterns path", :paths => paths)
paths.each do |path|
if File.directory?(path)
path = File.join(path, glob)
end
Dir.glob(path).each do |file|
@logger.trace("Grok loading patterns from file", :path => file)
if File.directory?(file)
@logger.debug("Skipping path because it is a directory", :path => file)
else
patternfiles << file
end
end
end
patternfiles
end # def patterns_files_from_paths
def add_patterns_from_files(paths, grok)
paths.each do |path|
if !File.exist?(path)
raise "Grok pattern file does not exist: #{path}"
end
grok.add_patterns_from_file(path)
end
end # def add_patterns_from_files
def add_patterns_from_inline_definition(pattern_definitions, grok)
pattern_definitions.each do |name, pattern|
next if pattern.nil?
grok.add_pattern(name, pattern.chomp)
end
end
class TimeoutError < RuntimeError; end
class GrokTimeoutException < Exception
attr_reader :grok, :field, :value
def initialize(grok, field, value)
@grok = grok
@field = field
@value = value
end
def message
"Timeout executing grok '#{@grok.pattern}' against field '#{field}' with value '#{trunc_value}'!"
end
def trunc_value
if value.size <= 255 # short enough to log in full
value
else
"Value too large to output (#{value.bytesize} bytes)! First 255 chars are: #{value[0...255]}"
end
end
end
def with_timeout(context, &block)
@timeout.exec(&block)
rescue TimeoutError => error
handle_timeout(context, error)
end
public :with_timeout
def handle_timeout(context, error)
raise GrokTimeoutException.new(context.grok, context.field, context.input)
end
# @private
class GrokContext
attr_reader :grok, :field, :input
def initialize(field, input)
@field = field
@input = input
end
def set_grok(grok)
@grok = grok
end
end
# @private
class NoopTimeout
INSTANCE = new
def exec
yield
end
end
# @private
class RubyTimeout
def initialize(timeout_millis)
# divide by float to allow fractional seconds, the Timeout class timeout value is in seconds but the underlying
# executor resolution is in microseconds so fractional second parameter down to microseconds is possible.
# see https://github.com/jruby/jruby/blob/9.2.7.0/core/src/main/java/org/jruby/ext/timeout/Timeout.java#L125
@timeout_seconds = timeout_millis / 1000.0
end
def exec(&block)
Timeout.timeout(@timeout_seconds, TimeoutError, &block)
end
end
end # class LogStash::Filters::Grok