/
file.rb
156 lines (132 loc) · 5.44 KB
/
file.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "pathname"
require "socket" # for Socket.gethostname
# Stream events from files.
#
# By default, each event is assumed to be one line. If you would like
# to join multiple log lines into one event, you'll want to use the
# multiline codec.
#
# Files are followed in a manner similar to `tail -0F`. File rotation
# is detected and handled by this input.
class LogStash::Inputs::File < LogStash::Inputs::Base
  config_name "file"

  # TODO(sissel): This should switch to use the `line` codec by default
  # once file following
  default :codec, "plain"

  # The path(s) to the file(s) to use as an input.
  # You can use globs here, such as `/var/log/*.log`
  # Paths must be absolute and cannot be relative.
  #
  # You may also configure multiple paths. See an example
  # on the [Logstash configuration page](configuration#array).
  config :path, :validate => :array, :required => true

  # Exclusions (matched against the filename, not full path). Globs
  # are valid here, too. For example, if you have
  # [source,ruby]
  #     path => "/var/log/*"
  #
  # You might want to exclude gzipped files:
  # [source,ruby]
  #     exclude => "*.gz"
  config :exclude, :validate => :array

  # How often (in seconds) we stat files to see if they have been modified.
  # Increasing this interval will decrease the number of system calls we make,
  # but increase the time to detect new log lines.
  config :stat_interval, :validate => :number, :default => 1

  # How often (in seconds) we expand globs to discover new files to watch.
  config :discover_interval, :validate => :number, :default => 15

  # Where to write the sincedb database (keeps track of the current
  # position of monitored log files). The default will write
  # sincedb files to some path matching `$HOME/.sincedb*`
  config :sincedb_path, :validate => :string

  # How often (in seconds) to write a since database with the current position of
  # monitored log files.
  config :sincedb_write_interval, :validate => :number, :default => 15

  # Choose where Logstash starts initially reading files: at the beginning or
  # at the end. The default behavior treats files like live streams and thus
  # starts at the end. If you have old data you want to import, set this
  # to 'beginning'
  #
  # This option only modifies "first contact" situations where a file is new
  # and not seen before. If a file has already been seen before, this option
  # has no effect.
  config :start_position, :validate => [ "beginning", "end"], :default => "end"

  # set the new line delimiter, defaults to "\n"
  config :delimiter, :validate => :string, :default => "\n"

  # Prepare the input: validate the configured paths, assemble the option
  # hash later handed to FileWatch::Tail, and settle on a sincedb location.
  #
  # Raises ArgumentError if any configured path is relative.
  # Raises RuntimeError if `sincedb_path` is unset and neither SINCEDB_DIR
  # nor HOME is present in the environment.
  public
  def register
    require "addressable/uri"
    require "filewatch/tail"
    require "digest/md5"

    @logger.info("Registering file input", :path => @path)

    @tail_config = {
      :exclude => @exclude,
      :stat_interval => @stat_interval,
      :discover_interval => @discover_interval,
      :sincedb_write_interval => @sincedb_write_interval,
      :delimiter => @delimiter,
      :logger => @logger,
    }

    ensure_absolute_paths
    @sincedb_path = generate_sincedb_path if @sincedb_path.nil?
    @tail_config[:sincedb_path] = @sincedb_path

    # Only override the tail's starting point when explicitly asked to read
    # from the beginning; otherwise the key is left unset.
    if @start_position == "beginning"
      @tail_config[:start_new_files_at] = :beginning
    end
  end # def register

  # Tail every configured path and push one event per decoded line onto
  # `queue`. Each event is decorated, given a "host" field (unless the codec
  # already supplied one) and a "path" field naming the source file.
  #
  # NOTE(review): `subscribe` presumably blocks until the tail is told to
  # quit; `finished` is called once it returns -- confirm against filewatch.
  public
  def run(queue)
    @tail = FileWatch::Tail.new(@tail_config)
    @tail.logger = @logger
    @path.each { |path| @tail.tail(path) }

    # Looked up once; reused for every event emitted by this input.
    hostname = Socket.gethostname

    @tail.subscribe do |path, line|
      @logger.debug? && @logger.debug("Received line", :path => path, :text => line)
      @codec.decode(line) do |event|
        decorate(event)
        event["host"] = hostname unless event.include?("host")
        event["path"] = path
        queue << event
      end
    end
    finished
  end # def run

  # Persist the current read positions and stop the tail, if one was started.
  public
  def teardown
    return unless @tail

    @tail.sincedb_write
    @tail.quit
    @tail = nil
  end # def teardown

  private

  # Reject relative paths up front; only absolute paths are accepted.
  def ensure_absolute_paths
    @path.each do |path|
      if Pathname.new(path).relative?
        raise ArgumentError, "File paths must be absolute, relative path specified: #{path}"
      end
    end
  end

  # Build the default sincedb file path under SINCEDB_DIR (preferred) or HOME,
  # migrating a legacy `.sincedb` file to the new name when one is present.
  # Returns the generated path.
  def generate_sincedb_path
    # pick SINCEDB_DIR if available, otherwise use HOME
    sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]
    if sincedb_dir.nil?
      message = "No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                "to keep track of the files I'm watching. Either set " \
                "HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
                "in your Logstash config for the file input with " \
                "path '#{@path.inspect}'"
      @logger.error(message)
      # Still a RuntimeError, as the previous bare `raise` produced, but now
      # carrying the explanation instead of "unhandled exception".
      raise message
    end

    # Join by ',' to make it easy for folks to know their own sincedb
    # generated path (vs, say, inspecting the @path array)
    generated = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))

    migrate_legacy_sincedb(sincedb_dir, generated)

    @logger.info("No sincedb_path set, generating one based on the file path",
                 :sincedb_path => generated, :path => @path)
    generated
  end

  # Migrate any old .sincedb to the new file (this is for version <=1.1.1
  # compatibility).
  def migrate_legacy_sincedb(sincedb_dir, new_path)
    old_sincedb = File.join(sincedb_dir, ".sincedb")
    return unless File.exist?(old_sincedb)

    @logger.info("Renaming old ~/.sincedb to new one", :old => old_sincedb,
                 :new => new_path)
    File.rename(old_sincedb, new_path)
  end
end # class LogStash::Inputs::File