addition of new metric #2060

Closed; wants to merge 1 commit
@@ -9,7 +9,7 @@ class PrometheusTailMonitorInput < Fluent::Plugin::Input

helpers :timer

-  config_param :interval, :time, default: 5
+  config_param :interval, :time, default: 1
Reviewer: Can you explain why we need to move to a more frequent interval?

Reviewer (contributor): This change, if it is needed at all, should be made in the config generated for the plugin, not here.

Author: All changes made in vendored_gem_src have been reverted, as that directory is not supposed to be modified when creating a custom plugin. All changes now live in the fluentd/lib/ folder.

attr_reader :registry

MONITOR_IVARS = [
@@ -38,24 +38,31 @@ def configure(conf)
@base_labels[key] = expander.expand(value)
end

-  if defined?(Fluent::Plugin) && defined?(Fluent::Plugin::MonitorAgentInput)
-    # from v0.14.6
-    @monitor_agent = Fluent::Plugin::MonitorAgentInput.new
-  else
-    @monitor_agent = Fluent::MonitorAgentInput.new
-  end
+  @monitor_agent = Fluent::Plugin::MonitorAgentInput.new
Reviewer (contributor): It looks like the original intent of the if was lost here.

end

def start
super

@metrics = {
Reviewer: The high-level value, the number of log lines dropped, is missing. Can you advise on the formula to calculate it from the metrics below?

Reviewer: I saw this in @logLoss, but it is not exposed as a metric (why not?).

Reviewer (contributor): It's not exposed because it is a raw value; we should be able to calculate it by exposing what is possible to collect and what is actually collected.

Author: Reverted all changes made in vendored_gem_src, as it is not supposed to be modified.

-  position: @registry.gauge(
+  position: get_gauge(
:fluentd_tail_file_position,
'Current position of file.'),
-  inode: @registry.gauge(
+  inode: get_gauge(
:fluentd_tail_file_inode,
'Current inode of file.'),
maxfsize: get_gauge(
:maxfsize,
'Current max fsize of file on rotation event'),
Reviewer: Maybe you can drop "Current"? If this is the max fsize, it is enough to state that; "current" might not be a user-facing value.

Reviewer (contributor): We don't need to track file size. The interesting metrics are bytes_logged/bytes_collected; max file size is an implementation detail that could change in the future (e.g. if we start streaming logs from crio directly via pipes).

Author: Reverted all changes made in vendored_gem_src, as it is not supposed to be modified. The final changes no longer publish the maxfsize and countonrotate parameters.

countonrotate: get_gauge(
:countonrotate,
'No of rotation noticed by fluentd'),
Reviewer: s/No/Number

Author: Reverted all changes made in vendored_gem_src, as it is not supposed to be modified.

Reviewer: s/rotation/rotations

Author: Reverted all changes made in vendored_gem_src, as it is not supposed to be modified.

totalbytesread: get_gauge(
:totalbytesread,
'totalbytes read by fluentd IOHandler'),
totalbytesavailable: get_gauge(
:totalbytesavailable,
'totalbytes available at each instance of rotation - to be read by fluentd IOHandler'),
Reviewer (@eranra, Feb 15, 2021): I am not sure I understand the term; maybe reword it. (After reading the code I understand that this is the potential, but I am not sure it is useful to expose this to customers.)

Reviewer (contributor): I changed the terms in the JIRA; @eranra, if you agree these are more expressive we should update the code:

bytes_logged (counter): bytes written to log files by containers.
bytes_collected (counter): bytes read by the collector for forwarding.

Log loss can be computed by a Prometheus query expression: (bytes_logged - bytes_collected)

@pmoogi-redhat note the metrics should be counters, not gauges. They always increase.
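The counter pair proposed above can be sketched in plain Ruby (hypothetical class and method names, not the plugin's real API; the real plugin would register these through the prometheus-client registry):

```ruby
# Minimal sketch of the proposed counter pair. Counters only ever
# increase; Prometheus computes the loss at query time as
# (bytes_logged - bytes_collected), so no gauge is needed.
class LogMetrics
  attr_reader :bytes_logged, :bytes_collected

  def initialize
    @bytes_logged = 0     # bytes written to log files by containers
    @bytes_collected = 0  # bytes read by the collector for forwarding
  end

  def log_bytes(n)
    @bytes_logged += n
  end

  def collect_bytes(n)
    @bytes_collected += n
  end

  # Ruby equivalent of the PromQL expression (bytes_logged - bytes_collected).
  def loss
    @bytes_logged - @bytes_collected
  end
end
```

Exposing the two counters and leaving the subtraction to the query keeps the plugin free of derived state such as @logloss.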

Reviewer: @alanconway yes, those terms are simple and make sense :-)

Author: Changed the parameter names from totalbytesavailable and totalbytesread to totalbytes_logged_in_disk and totalbytes_collected_from_disk respectively. A separate class implementation now monitors stat to obtain totalbytes_logged_in_disk.

}
timer_execute(:in_prometheus_tail_monitor, @interval, &method(:update_monitor_info))
end
@@ -77,9 +84,18 @@ def update_monitor_info
# Access to internal variable of internal class...
# Very fragile implementation
pe = watcher.instance_variable_get(:@pe)
totalbytesread = watcher.instance_variable_get(:@totalbytesread)
totalbytesavailable = watcher.instance_variable_get(:@totalbytesavailable)
maxfsize = watcher.instance_variable_get(:@maxfsize)
countonrotate = watcher.instance_variable_get(:@countonrotate)
label = labels(info, watcher.path)
@metrics[:inode].set(label, pe.read_inode)
@metrics[:position].set(label, pe.read_pos)
@metrics[:maxfsize].set(label, maxfsize)
@metrics[:countonrotate].set(label, countonrotate)
@metrics[:totalbytesread].set(label, totalbytesread)
@metrics[:totalbytesavailable].set(label, totalbytesavailable)
#@log.info "IN PROMETHEUS PLUGIN pr.read_inode and pe.read_pos #{pe.read_inode} #{pe.read_pos} maxfsize #{maxfsize} countonrotate #{countonrotate} totalbytesread #{totalbytesread} totalbytesavailable #{totalbytesavailable} "
Reviewer: Consider changing this to log.debug and removing the commented-out line.

Author: Reverted all changes made in vendored_gem_src, as it is not supposed to be modified.

end
end
end
@@ -91,5 +107,13 @@ def labels(plugin_info, path)
path: path,
)
end

def get_gauge(name, docstring)
if @registry.exist?(name)
@registry.get(name)
else
@registry.gauge(name, docstring)
end
end
end
end
fluentd/vendored_gem_src/fluentd/lib/fluent/plugin/in_tail.rb (57 additions, 10 deletions)
@@ -510,6 +510,13 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
@from_encoding = from_encoding
@encoding = encoding
@open_on_every_update = open_on_every_update
@totalbytesread = 0
@totalbytesavailable = 0
@inodelastfsize_map = {}
Reviewer: Same as the next comment.

Author: @eranra @alanconway resolved this in the new implementation.

@inodereadfsize_map = {}
Reviewer: As rotations continue forever, this map keeps growing and growing. I do not know by how much, but this feels like a memory leak to me; we need to see how large it gets in real life and consider limiting it somehow over time.

Reviewer (contributor): I agree, but we should come up with a solution instead of allowing it to grow.

Reviewer: @jcantrill Maybe aggregate the information from this map for the oldest inodes at set intervals. That can be a second phase and not part of this PR from my PoV, but we should not forget to do it.

Reviewer (contributor): I don't think this map is necessary, and the logic can be simpler. See https://github.com/alanconway/file-metric/blob/b0cb7946c3e9dd6b607ef6f5e02218e5c1b33e94/cmd/file-metric/main.go#L69 which is in Go, but the logic can be reproduced in Ruby. In fact we don't need the inode; the only thing that matters is when the file size drops, which means a new rotation has started.
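The map-free accounting suggested in that comment can be sketched in plain Ruby (hypothetical class name; this is a sketch of the logic in the linked Go example, not the plugin's actual code):

```ruby
# Map-free rotation accounting: keep only the last observed file size.
# A size drop marks a rotation; bytes_logged accumulates the growth
# seen since the previous observation, so no per-inode map is needed.
class BytesLoggedTracker
  attr_reader :bytes_logged

  def initialize
    @last_size = 0
    @bytes_logged = 0
  end

  # Call with the current file size on every stat poll.
  def observe(size)
    if size < @last_size
      # File shrank: a rotation happened; count the new file from zero.
      @bytes_logged += size
    else
      @bytes_logged += size - @last_size
    end
    @last_size = size
  end
end
```

Because state is two integers per watched path, memory use stays constant no matter how many rotations occur.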

Author: The hash-map-based implementation has been removed to make the implementation more efficient. A new write_watcher.rb is introduced to measure totalbytes_logged_in_disk.

@countonrotate = 0
@maxfsize = 0
@logloss = 0
Reviewer (contributor): This should not be required.

end

attr_reader :path
@@ -519,6 +526,11 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
attr_accessor :timer_trigger
attr_accessor :line_buffer, :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
attr_accessor :totalbytesavailable # This is used for removing position entry from PositionFile
Reviewer: Fix the comment (to be fixed for all of these variables).

Author: Resolved; please see my earlier comments.

attr_accessor :totalbytesread # This is used for removing position entry from PositionFile
attr_accessor :maxfsize # This is used for removing position entry from PositionFile
attr_accessor :countonrotate # This is used for removing position entry from PositionFile
attr_accessor :logloss # This is used for removing position entry from PositionFile

def tag
@parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
@@ -535,7 +547,7 @@ def attach

def detach
yield self
-  @io_handler.on_notify if @io_handler
+  @io_handler.on_notify(@inodereadfsize_map) if @io_handler
end

def close
@@ -553,12 +565,28 @@ def on_notify
stat = nil
end

-  @rotate_handler.on_notify(stat) if @rotate_handler
+  @totalbytesavailable = @rotate_handler.on_notify(stat, @inodelastfsize_map) if @rotate_handler
@line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
-  @io_handler.on_notify if @io_handler
+  @io_handler.on_notify(@inodereadfsize_map) if @io_handler

@logloss = 0
if @io_handler && @rotate_handler
  @totalbytesread = 0
  @countonrotate = 0
  @inodereadfsize_map.each do |k, v|
Reviewer: This loop might become expensive over time if the number of rotations is significant (maybe you can periodically consolidate the information into a variable and remove a lot of the data in the map).

Reviewer (contributor): So we need to expose metrics for each file instead of exposing a sum?

Reviewer: @jcantrill I think we only need one value; providing a metric per file is not very useful and not very scalable. But maybe the accumulation can be done in a streaming-like manner instead of doing the entire calculation every time.

Reviewer (contributor): I've been learning about Prometheus and here's what I think: we have two counter metrics, bytes_logged and bytes_collected. However, those metrics have labels that allow the user to refine the set of log streams they care about. In Go this is called a CounterVec; I'm not sure what the fluentd equivalent is. See https://github.com/alanconway/file-metric/blob/b0cb7946c3e9dd6b607ef6f5e02218e5c1b33e94/cmd/file-metric/main.go
I need to do some research to figure out the right set of labels. In my Go example it's just the file name, but we need to look at other observatorium label schemes and consider our own data model to find a set that makes sense for queries. The cardinality is always going to be per log file, but the log file name isn't a very obvious way for users to query.
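The labeled-counter idea can be illustrated with a toy Ruby stand-in (hypothetical class; the :path label is a placeholder, since the comment above notes the real label set is still to be decided):

```ruby
# Toy stand-in for a labeled counter (a "CounterVec" in the Go client):
# one logical metric, one time series per distinct label set. Users can
# then sum or filter by label in their queries instead of the plugin
# deciding between per-file metrics and a single sum.
class LabeledCounter
  def initialize
    @series = Hash.new(0)  # label-set hash => running total
  end

  def increment(labels, by = 1)
    @series[labels] += by
  end

  def get(labels)
    @series[labels]
  end
end

bytes_collected = LabeledCounter.new
bytes_collected.increment({ path: '/var/log/containers/app.log' }, 8192)
bytes_collected.increment({ path: '/var/log/containers/app.log' }, 4096)
```

This answers the per-file-versus-sum question above: expose labeled series and let the query aggregate.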

@totalbytesread += v
@countonrotate += 1
end

#@log.info "inodelastfsize_map #{@inodelastfsize_map} inodereadfsize_map #{@inodereadfsize_map}"
@logloss = @totalbytesavailable - @totalbytesread
#@log.info "totalbytesavailable #{@totalbytesavailable} totalbytesread #{@totalbytesread} logloss #{@logloss} countonrotate #{@countonrotate}"
end

end

def on_rotate(stat)
@maxfsize = @pe.read_pos
Reviewer: Do you want to take the MAX over all rotations so you get a more accurate value over time? (Assuming this comes from a fixed value set by conmon.)

Reviewer (contributor): We don't care about max file size; we only care about bytes_logged and bytes_collected. I think this code can be simplified to accumulate directly into the Prometheus client metrics (like the Go code at https://github.com/alanconway/file-metric/blob/master/cmd/file-metric/main.go), so that the embedded Prometheus client has an up-to-date view whenever there is a pull from Prometheus.

Author: Removed this change, as maxfsize does not need to be monitored as a metric.

Author: @eranra MAX over all rotations is not required, as rotation always happens when fsize > maxsize of the log file.

if @io_handler.nil?
if stat
# first time
@@ -591,6 +619,7 @@ def on_rotate(stat)
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
else
@io_handler = NullIOHandler.new
@totalbytesread = 0
end
else
watcher_needs_update = false
@@ -605,7 +634,7 @@ def on_rotate(stat)
else # file is rotated and new file found
watcher_needs_update = true
# Handle the old log file before renewing TailWatcher [fluentd#1055]
-  @io_handler.on_notify
+  @io_handler.on_notify(@inodereadfsize_map)
end
else # file is rotated and new file not found
# Clear RotateHandler to avoid duplicated file watch in same path.
@@ -732,21 +761,28 @@ def initialize(watcher, &receive_lines)
@io = nil
@notify_mutex = Mutex.new
@watcher.log.info "following tail of #{@watcher.path}"
@totalbytesread = 0
@statatopen = nil
end

-  def on_notify
-    @notify_mutex.synchronize { handle_notify }
+  def on_notify(inodereadfsize_map)
+    @notify_mutex.synchronize { handle_notify(inodereadfsize_map) }
end

-  def handle_notify
+  def handle_notify(inodereadfsize_map)
with_io do |io|
stat = Fluent::FileWrapper.stat(@watcher.path)
@statatopen = stat
begin
read_more = false

if !io.nil? && @lines.empty?
ino = @statatopen.ino
begin
while true
-  @fifo << io.readpartial(8192, @iobuf)
+  buf = io.readpartial(8192, @iobuf)
+  @totalbytesread += buf.bytesize
+  @fifo << buf
@fifo.read_lines(@lines)
if @lines.size >= @watcher.read_lines_limit
# not to use too much memory in case the file is very large
Expand All @@ -756,6 +792,7 @@ def handle_notify
end
rescue EOFError
end
inodereadfsize_map[ino] = @totalbytesread
end

unless @lines.empty?
@@ -784,7 +821,7 @@ def opened?
def open
io = Fluent::FileWrapper.open(@watcher.path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
-  io
+  return io
rescue RangeError
io.close if io
raise WatcherSetupError, "seek error with #{@watcher.path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
@@ -840,9 +877,10 @@ def initialize(watcher, &on_rotate)
@inode = nil
@fsize = -1 # first
@on_rotate = on_rotate
@totalbytesavailable = 0
end

-  def on_notify(stat)
+  def on_notify(stat, inodelastfsize_map)
if stat.nil?
inode = nil
fsize = 0
@@ -854,9 +892,18 @@
begin
if @inode != inode || fsize < @fsize
@on_rotate.call(stat)
inodelastfsize_map[@inode] = @fsize
@totalbytesavailable = @fsize
Reviewer: What is the purpose of maxfsize if you are using fsize for the logloss computation?

Author: maxfsize is removed now, as it is not required to be monitored.

end
@inode = inode
@fsize = fsize
inodelastfsize_map[inode] = fsize
@totalbytesavailable = 0
inodelastfsize_map.each do |k, v|
  @totalbytesavailable += v
end

return @totalbytesavailable
end

rescue