Skip to content

Commit

Permalink
in_tail: allow follow_inodes flag to tail files by inodes
Browse files Browse the repository at this point in the history
Based on PR fluent#1660, posted by OleksiiDuzhyi, which was closed
because its test cases were incomplete.
  • Loading branch information
kenhys committed Sep 29, 2020
1 parent adca029 commit adef9c5
Show file tree
Hide file tree
Showing 3 changed files with 472 additions and 55 deletions.
109 changes: 83 additions & 26 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def initialize
config_param :ignore_repeated_permission_error, :bool, default: false
desc 'Format path with the specified timezone'
config_param :path_timezone, :string, default: nil
desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files'
config_param :follow_inodes, :bool, default: false

config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do
config_argument :usage, :string, default: 'in_tail_parser'
Expand Down Expand Up @@ -154,6 +156,9 @@ def configure(conf)
end
@variable_store[@pos_file] = self.plugin_id
else
if @follow_inodes
raise Fluent::ConfigError, "Can't follow inodes without pos_file configuration parameter"
end
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end
Expand Down Expand Up @@ -214,7 +219,7 @@ def start
FileUtils.mkdir_p(pos_file_dir, mode: @dir_perm) unless Dir.exist?(pos_file_dir)
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
@pf = PositionFile.load(@pf_file, logger: log)
@pf = PositionFile.load(@pf_file, @follow_inodes, logger: log)

if @pos_file_compaction_interval
timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do
Expand Down Expand Up @@ -296,7 +301,31 @@ def expand_paths
end
path.include?('*') ? Dir.glob(path) : path
}.flatten.uniq
paths.uniq - excluded
# filter out non existing files, so in case pattern is without '*' we don't do unnecessary work
hash = {}
(paths - excluded).select { |path|
FileTest.exist?(path)
}.each { |path|
tuple = PathInodeTuple.new(path, Fluent::FileWrapper.stat(path).ino)
if @follow_inodes
hash[tuple.ino] = tuple
else
hash[tuple.path] = tuple
end
}
hash
end

# Indexes the targets that are currently being tailed (the keys of @tails).
#
# Each key of @tails is expected to respond to #path and #ino. The returned
# hash maps the inode to the target when @follow_inodes is set, and the path
# to the target otherwise. When several targets share a key, the last one wins.
def existence_path
  @tails.keys.each_with_object({}) do |target, index|
    key = @follow_inodes ? target.ino : target.path
    index[key] = target
  end
end

# in_tail with '*' path doesn't check rotation file equality at refresh phase.
Expand All @@ -305,21 +334,21 @@ def expand_paths
# In such case, you should separate log directory and specify two paths in path parameter.
# e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
# Reconciles the set of tailed files with what the configured paths currently
# expand to.
#
# Both expand_paths and existence_path return hashes whose values respond to
# #path and whose keys are either inodes or paths (depending on
# @follow_inodes), so membership is decided by hash key:
# * entries only present in the existing set are handed to stop_watchers
#   (non-immediate, flagged as unwatched);
# * entries only present in the target set are handed to start_watchers.
def refresh_watchers
  target_paths_hash = expand_paths
  existence_paths_hash = existence_path

  # Build the message from the hashes. The former local arrays
  # (target_paths / existence_paths) no longer exist, and referencing them
  # here raised NameError as soon as debug logging evaluated this block.
  log.debug {
    target_paths_str = target_paths_hash.values.map(&:path).join(",")
    existence_paths_str = existence_paths_hash.values.map(&:path).join(",")
    "tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
  }

  unwatched_hash = existence_paths_hash.reject { |key, _value| target_paths_hash.key?(key) }
  added_hash = target_paths_hash.reject { |key, _value| existence_paths_hash.key?(key) }

  stop_watchers(unwatched_hash.values, immediate: false, unwatched: true) unless unwatched_hash.empty?
  start_watchers(added_hash.values) unless added_hash.empty?
end

def setup_watcher(path, pe)
def setup_watcher(path, ino, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, pe, log, @read_from_head, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))
tw = TailWatcher.new(path, ino, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -350,11 +379,13 @@ def setup_watcher(path, pe)
raise e
end

def start_watchers(paths)
paths.each { |path|
def start_watchers(paths_with_inodes)
paths_with_inodes.each { |path_with_inode|
path = path_with_inode.path
ino = path_with_inode.ino
pe = nil
if @pf
pe = @pf[path]
pe = @pf[path, ino]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
Expand All @@ -365,7 +396,7 @@ def start_watchers(paths)
end

begin
tw = setup_watcher(path, pe)
tw = setup_watcher(path, ino, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
Expand All @@ -380,9 +411,9 @@ def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: tru
if tw
tw.unwatched = unwatched
if immediate
detach_watcher(tw, false)
detach_watcher(tw, path.ino, false)
else
detach_watcher_after_rotate_wait(tw)
detach_watcher_after_rotate_wait(tw, path.ino)
end
end
}
Expand All @@ -398,25 +429,40 @@ def close_watcher_handles
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
# Replaces the watcher of a rotated file.
#
# path - path of the file whose rotation was detected
# ino  - inode now found at that path (the rotated-in file)
# pe   - position entry handed over by the rotated watcher; its read_inode
#        identifies the watcher being replaced
#
# When a position file is in use and its entry for (path, pe.read_inode)
# no longer carries the same inode as pe, the call is treated as a duplicate
# notification and nothing is done.
#
# Otherwise a watcher for (path, ino) is registered in @tails:
# * with @follow_inodes, only when the position file has no recorded inode
#   yet for that (path, ino) entry (read_inode is zero);
# * without it, always, reusing pe.
# The previously registered watcher, if any, is detached after rotate_wait.
def update_watcher(path, ino, pe)
  log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

  if @pf
    unless pe.read_inode == @pf[path, pe.read_inode].read_inode
      log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
      return
    end
  end

  tuple = PathInodeTuple.new(path, pe.read_inode)
  rotated_tw = @tails[tuple]

  # The parameter is named `ino`; the body previously referred to an
  # undefined `inode`, which raised NameError on every rotation.
  new_tuple = PathInodeTuple.new(path, ino)

  if @follow_inodes
    new_position_entry = @pf[path, ino]

    if new_position_entry.read_inode.zero?
      @tails[new_tuple] = setup_watcher(path, ino, new_position_entry)
    end
  else
    @tails[new_tuple] = setup_watcher(path, ino, pe)
  end
  detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end

# TailWatcher#close is called by another thread at shutdown phase.
# It causes 'can't modify string; temporarily locked' error in IOHandler
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
def detach_watcher(tw, ino, close_io = true)
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
Expand All @@ -425,15 +471,15 @@ def detach_watcher(tw, close_io = true)
tw.close if close_io

if tw.unwatched && @pf
@pf.unwatch(tw.path)
@pf.unwatch(tw.path, ino)
end
end

# Defers the detaching of watcher `tw`: registers a non-repeating
# :in_tail_close_watcher timer with @rotate_wait as its interval, whose block
# calls detach_watcher.
# NOTE(review): this is a rendered diff hunk. Each pair of near-identical lines
# below is the pre-change line followed by its replacement; the replacement
# threads the inode (`ino`) through to detach_watcher.
def detach_watcher_after_rotate_wait(tw)
def detach_watcher_after_rotate_wait(tw, ino)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw)
detach_watcher(tw, ino)
end
end

Expand Down Expand Up @@ -601,10 +647,12 @@ def on_timer
end

class TailWatcher
def initialize(path, pe, log, read_from_head, update_watcher, line_buffer_timer_flusher, io_handler_build)
def initialize(path, ino, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = path
@ino = ino
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@follow_inodes = follow_inodes
@update_watcher = update_watcher
@log = log
@rotate_handler = RotateHandler.new(log, &method(:on_rotate))
Expand All @@ -614,7 +662,7 @@ def initialize(path, pe, log, read_from_head, update_watcher, line_buffer_timer_
@watchers = []
end

attr_reader :path
attr_reader :path, :ino
attr_reader :pe
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
Expand Down Expand Up @@ -709,7 +757,16 @@ def on_rotate(stat)
end

if watcher_needs_update
@update_watcher.call(@path, swap_state(@pe))
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
if stat
if @follow_inodes
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(@path, stat.ino, @pe)
else
@update_watcher.call(@path, stat.ino, swap_state(@pe))
end
end
else
@log.info "detected rotation of #{@path}"
@io_handler = io_handler
Expand Down
36 changes: 29 additions & 7 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,49 @@

module Fluent::Plugin
class TailInput < Fluent::Plugin::Input
# Mutable holder that pairs a file path with an inode number.
# Both attributes can be read and reassigned after construction.
class PathWithInode
  attr_accessor :path, :inode

  def initialize(path, inode)
    @path, @inode = path, inode
  end
end

class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze

# Factory: builds a PositionFile for `file`, calls #load on it and returns it.
# NOTE(review): this is a rendered diff hunk. The first two lines are the
# pre-change signature/constructor call; the next two are their replacements,
# which additionally pass `follow_inodes` through to the constructor.
def self.load(file, logger:)
pf = new(file, logger: logger)
def self.load(file, follow_inodes, logger:)
pf = new(file, follow_inodes, logger: logger)
pf.load
pf
end

# Stores the backing file handle and logger, creates the mutex that is handed
# to position entries, and starts with an empty entry map.
# NOTE(review): rendered diff hunk. The first `def` line is the pre-change
# signature; the second adds `follow_inodes`, which is kept in @follow_inodes
# and selects whether @map is keyed by inode or by path.
def initialize(file, logger: nil)
def initialize(file, follow_inodes, logger: nil)
@file = file
@logger = logger
@file_mutex = Mutex.new
@map = {}
@follow_inodes = follow_inodes
end

# Returns the position entry for a tailed file, creating it when missing.
#
# Lookup key is `inode` when @follow_inodes is set, `path` otherwise. On a miss,
# a zeroed record ("<path>\t<16 zero digits>\t<16 zero digits>\n") is appended
# to the file under @file_mutex and a FilePositionEntry for it is cached in
# @map under the same kind of key.
# NOTE(review): rendered diff hunk. `def [](path)` / `if m = @map[path]` and the
# unconditional `@map[path] = ...` line are the pre-change lines; the lines that
# follow each of them are the replacements.
def [](path)
if m = @map[path]
def [](path, inode)
if @follow_inodes && m = @map[inode]
return m
elsif !@follow_inodes && m = @map[path]
return m
end

@file_mutex.synchronize {
@file.seek(0, IO::SEEK_END)
# Offset of the first byte after "<path>\t" in the record written below.
seek = @file.pos + path.bytesize + 1
@file.write "#{path}\t0000000000000000\t0000000000000000\n"
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
if @follow_inodes
@map[inode] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
else
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
end
}
end

Expand All @@ -69,7 +85,11 @@ def load
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = @file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
if @follow_inodes
map[ino] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
else
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
end
end
end

Expand Down Expand Up @@ -224,5 +244,7 @@ def read_inode
@inode
end
end

# Value pair of a file path and its inode number. Being a Struct, two tuples
# with equal members compare equal and hash alike, so instances can serve as
# hash keys.
PathInodeTuple = Struct.new(:path, :ino)
end
end

0 comments on commit adef9c5

Please sign in to comment.