Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source "https://rubygems.org"

group :test do
gem 'simplecov', require: false
gem 'simplecov',"~> 0.16.1", require: false
end

gemspec
283 changes: 237 additions & 46 deletions lib/fluent/plugin/in_kubernetes_metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def start
super

timer_execute :metric_scraper, @interval, &method(:scrape_metrics)
timer_execute :stats_metric_scraper, @interval, &method(:scrape_stats_metrics)
timer_execute :cadvisor_metric_scraper, @interval, &method(:scrape_cadvisor_metrics)
end

Expand Down Expand Up @@ -192,10 +193,12 @@ def initialize_rest_client

if env_host && env_port
@kubelet_url = "http://#{env_host}:#{env_port}/stats/summary"
@kubelet_url_stats = "http://#{env_host}:#{env_port}/stats/"
@cadvisor_url = "http://#{env_host}:#{env_port}/metrics/cadvisor"
end

log.info("Use URL #{@kubelet_url} for creating client to query kubelet summary api")
log.info("Use URL #{@kubelet_url_stats} for creating client to query kubelet stats api")
log.info("Use URL #{@cadvisor_url} for creating client to query cadvisor metrics api")
end

Expand All @@ -205,6 +208,18 @@ def request_options
options
end

# This method is used to set the options for sending a request to the stats api
def request_options_stats
options = { method: 'get', url: @kubelet_url_stats }
options
end

# This method is used to set the options for sending a request to the cadvisor api
def cadvisor_request_options
options = { method: 'get', url: @cadvisor_url }
options
end

# @client.proxy_url only returns the url, but we need the resource, not just the url
def summary_api(node)
@summary_api =
Expand All @@ -216,8 +231,18 @@ def summary_api(node)
end
end

def stats_api(node)
@stats_api =
begin
@client.discover unless @client.discovered
@client.rest_client["/nodes/#{node}:#{@kubelet_port}/proxy/stats/"].tap do |endpoint|
log.info("Use URL #{endpoint.url} for scraping stats metrics")
end
end
end

def cadvisor_proxy_api(node)
@summary_api =
@cadvisor_api =
begin
@client.discover unless @client.discovered
@client.rest_client["/nodes/#{node}:#{@kubelet_port}/proxy/metrics/cadvisor"].tap do |endpoint|
Expand Down Expand Up @@ -298,6 +323,145 @@ def emit_system_container_metrics(node_name, container)
emit_memory_metrics tag: tag, metrics: container['memory'], labels: labels
end

def emit_stats_breakdown(stats)
stats_latest = stats[-1]
tag = 'stats'
stats_timestamp = stats_latest['timestamp']
labels = { 'stats' => 'stats' }
unless stats_latest['cpu'].nil?
emit_cpu_metrics_stats tag: tag, metrics: stats_latest['cpu'], labels: labels, time: stats_timestamp
end

unless stats_latest['diskio'].nil?
emit_diskio_metrics_stats tag: tag, metrics: stats_latest['diskio'], labels: labels, time: stats_timestamp
end

unless stats_latest['memory'].nil?
emit_memory_metrics_stats tag: tag, metrics: stats_latest['memory'], labels: labels, time: stats_timestamp
end

unless stats_latest['network'].nil?
emit_network_metrics_stats tag: tag, metrics: stats_latest['network'], labels: labels, time: stats_timestamp
end

unless stats_latest['filesystem'].nil?
emit_filesystem_metrics_stats tag: tag, metrics: stats_latest['filesystem'], labels: labels, time: stats_timestamp
end

unless stats_latest['task_stats'].nil?
emit_tasks_stats_metrics_stats tag: tag, metrics: stats_latest['task_stats'], labels: labels, time: stats_timestamp
end
end

def emit_cpu_metrics_stats(tag:, metrics:, labels:, time:)
if cpu_usage_total = metrics['usage']['total']
router.emit generate_tag("#{tag}.cpu.usage.total"), time, labels.merge('value' => cpu_usage_total / 1_000_000)
end
if cpu_usage_user = metrics['usage']['user']
router.emit generate_tag("#{tag}.cpu.usage.user"), time, labels.merge('value' => cpu_usage_user / 1_000_000)
end
if cpu_usage_system = metrics['usage']['system']
router.emit generate_tag("#{tag}.cpu.usage.system"), time, labels.merge('value' => cpu_usage_system / 1_000_000)
end

if cpu_cfs_periods = metrics['cfs']['periods']
router.emit generate_tag("#{tag}.cpu.cfs.periods"), time, labels.merge('value' => cpu_cfs_periods)
end
if cpu_cfs_throttled_periods = metrics['cfs']['throttled_periods']
router.emit generate_tag("#{tag}.cpu.cfs.throttled_periods"), time, labels.merge('value' => cpu_cfs_throttled_periods)
end
if cpu_cfs_throttled_time = metrics['cfs']['throttled_time']
router.emit generate_tag("#{tag}.cpu.cfs.throttled_time"), time, labels.merge('value' => cpu_cfs_throttled_time)
end
if cpu_load_average = metrics['load_average']
router.emit generate_tag("#{tag}.cpu.load_average"), time, labels.merge('value' => cpu_load_average)
end
end

def emit_diskio_metrics_stats(tag:, metrics:, labels:, time:)
%w[io_service_bytes io_serviced].each do |metric_name|
if current_io_metric = metrics[metric_name]
current_io_metric.each do |device|
if diskio_io_service_bytes_major = device['major']
router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat("major")), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_major)
end
if diskio_io_service_bytes_minor = device['minor']
router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat("minor")), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_minor)
end
device_stats = device['stats']
device_stats.each do | device_stat |
device_key, device_value = device_stat
router.emit generate_tag("#{tag}.diskio.".concat(metric_name).concat(".stats.").concat(device_key)), time, labels.merge('device' => device['device'], 'value' => device_value)
end
end
end
end
end

def emit_memory_metrics_stats(tag:, metrics:, labels:, time:)
%w[usage max_usage cache rss swap working_set failcnt].each do | metric_name |
if current_memory_metric = metrics[metric_name]
router.emit generate_tag("#{tag}.memory.".concat(metric_name)), time, labels.merge('value' => current_memory_metric)
end

end
%w[container_data hierarchical_data ].each do | metric_name_group |
if current_memory_metric_group = metrics[metric_name_group]
current_memory_metric_group.each do | metric_name |
metric_key, metric_value = metric_name
router.emit generate_tag("#{tag}.memory.".concat(metric_name_group).concat(".").concat(metric_key)), time, labels.merge('value' => metric_value)
end
end
end
end

def emit_network_metrics_stats(tag:, metrics:, labels:, time:)
network_name = metrics['name']
%w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do | metric_name |
if current_network_metric = metrics[metric_name]
router.emit generate_tag("#{tag}.network.".concat(network_name).concat(".").concat(metric_name)), time, labels.merge('value' => current_network_metric)
end
end

if network_interfaces = metrics['interfaces']
network_interfaces.each do | current_interface |
name = current_interface['name']
%w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do | current_metric|
if metric_value = current_interface[current_metric]
router.emit generate_tag("#{tag}.network.".concat(name).concat(".").concat(current_metric)), time, labels.merge('value' => metric_value)
end
end
end
end

%w[tcp tcp6 udp udp6].each do | metric_name_group |
if metric_group = metrics[metric_name_group]
metric_group.each do |current_metric|
metric_key, metric_value = current_metric
router.emit generate_tag("#{tag}.network.".concat(metric_name_group).concat(".").concat(metric_key)), time, labels.merge('value' => metric_value)
end
end
end
end

def emit_filesystem_metrics_stats(tag:, metrics:, labels:, time:)
metrics.each do | file_system |
device = file_system['device']
type = file_system['type']
file_system.each do | file_metric |
file_key , file_value = file_metric
router.emit generate_tag("#{tag}.filesystem.".concat(".").concat(file_key)), time, labels.merge('device' => device, 'type' => type, 'value' => file_value)
end
end
end

def emit_tasks_stats_metrics_stats(tag:, metrics:, labels:, time:)
metrics.each do | task_stats |
task_key, task_value = task_stats
router.emit generate_tag("#{tag}.tasks_stats.".concat(task_key)), time, labels.merge('value' => task_value)
end
end

def emit_node_metrics(node)
node_name = node['nodeName']
tag = 'node'
Expand Down Expand Up @@ -328,7 +492,7 @@ def emit_node_metrics(node)
node['systemContainers'].each do |c|
emit_system_container_metrics node_name, c
end
end
end
end

def emit_container_metrics(pod_labels, container)
Expand Down Expand Up @@ -364,35 +528,8 @@ def emit_metrics(metrics)
Array(metrics['pods']).each &method(:emit_pod_metrics).curry.call(metrics['node']['nodeName']) unless metrics['pods'].nil?
end

def scrape_metrics
if @use_rest_client
response = RestClient::Request.execute request_options
handle_response(response)
else
@node_names.each do |node|
response = summary_api(node).get(@client.headers)
handle_response(response)
end
end
end

# This method is used to handle responses from the kubelet summary api
def handle_response(response)
# Checking response codes only for a successful GET request viz., 2XX codes
if (response.code < 300) && (response.code > 199)
@scraped_at = Time.now
emit_metrics MultiJson.load(response.body)
else
log.error "ExMultiJson.load(response.body) expected 2xx from summary API, but got #{response.code}. Response body = #{response.body}"
end
rescue StandardError => error
log.error "Failed to scrape metrics, error=#{error.inspect}"
log.error_backtrace
end

def cadvisor_request_options
options = { method: 'get', url: @cadvisor_url }
options
def emit_stats_metrics(metrics)
emit_stats_breakdown(metrics['stats']) unless metrics['stats'].nil?
end

def emit_cadvisor_metrics(metrics)
Expand All @@ -410,34 +547,88 @@ def emit_cadvisor_metrics(metrics)
namespace = metric.match(/namespace="\S*"/).to_s
namespace = namespace.split('"')[1]
metric_labels = {'pod_name' => pod_name, 'image' => image_name, 'namespace' => namespace, 'value' => metric_val}
if metric.match(/^((?!container_name="POD").)*$/)
tag = 'pod'
tag = generate_tag("#{tag}#{metric_name.gsub('_', '.')}")
tag = tag.gsub('container', '')
else
container_name = metric.match(/container_name="\S*"/).to_s
container_name = container_name.split('"')[1]
container_label = {'container_name' => container_name}
metric_labels.merge(container_label)
tag = generate_tag("#{metric_name.gsub('_', '.')}")
end
if metric.match(/^((?!container_name="POD").)*$/)
tag = 'pod'
tag = generate_tag("#{tag}#{metric_name.gsub('_', '.')}")
tag = tag.gsub('container', '')
else
container_name = metric.match(/container_name="\S*"/).to_s
container_name = container_name.split('"')[1]
container_label = {'container_name' => container_name}
metric_labels.merge(container_label)
tag = generate_tag("#{metric_name.gsub('_', '.')}")
end
router.emit tag, @scraped_at_cadvisor, metric_labels
end
end
end
end

def scrape_metrics
if @use_rest_client
response = RestClient::Request.execute request_options
handle_response(response)
else
@node_names.each do |node|
response = summary_api(node).get(@client.headers)
handle_response(response)
end
end
end

def scrape_stats_metrics
if @use_rest_client
response_stats = RestClient::Request.execute request_options_stats
handle_stats_response(response_stats)
else
@node_names.each do |node|
response_stats = stats_api(node).get(@client.headers)
handle_stats_response(response_stats)
end
end
end

def scrape_cadvisor_metrics
if @use_rest_client
response = RestClient::Request.execute cadvisor_request_options
handle_cadvisor_response(response)
response_cadvisor = RestClient::Request.execute cadvisor_request_options
handle_cadvisor_response(response_cadvisor)
else
response = cadvisor_proxy_api(@node_name).get(@client.headers)
handle_cadvisor_response(response)
@node_names.each do |node|
response_cadvisor = cadvisor_proxy_api(node).get(@client.headers)
handle_cadvisor_response(response_cadvisor)
end
end
end

# This method is used to handle responses from the kubelet summary api
def handle_response(response)
# Checking response codes only for a successful GET request viz., 2XX codes
if (response.code < 300) && (response.code > 199)
@scraped_at = Time.now
emit_metrics MultiJson.load(response.body)
else
log.error "ExMultiJson.load(response.body) expected 2xx from summary API, but got #{response.code}. Response body = #{response.body}"
end
rescue StandardError => error
log.error "Failed to scrape metrics, error=#{error.inspect}"
log.error_backtrace
end

# This method is used to handle responses from the kubelet stats api
def handle_stats_response(response)
# Checking response codes only for a successful GET request viz., 2XX codes
if (response.code < 300) && (response.code > 199)
@scraped_at = Time.now
emit_stats_metrics MultiJson.load(response.body)
else
log.error "ExMultiJson.load(response.body) expected 2xx from stats API, but got #{response.code}. Response body = #{response.body}"
end
rescue StandardError => error
log.error "Failed to scrape metrics, error=#{error.inspect}"
log.error_backtrace
end

# This method is used to handle responses from the cadvisor api
def handle_cadvisor_response(response)
# Checking response codes only for a successful GET request viz., 2XX codes
if (response.code < 300) && (response.code > 199)
Expand Down
Loading