Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source "https://rubygems.org"
source 'https://rubygems.org'

group :test do
gem 'simplecov',"~> 0.16.1", require: false
gem 'simplecov', '~> 0.16.1', require: false
end

gemspec
12 changes: 6 additions & 6 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "bundler"
require 'bundler'
Bundler::GemHelper.install_tasks

require "rake/testtask"
require 'rake/testtask'
require 'rake/clean'

CLEAN.concat FileList[
Expand All @@ -10,8 +10,8 @@ CLEAN.concat FileList[
]

Rake::TestTask.new(:test) do |t|
t.libs.push("lib", "test")
t.test_files = FileList["test/**/test_*.rb"]
t.libs.push('lib', 'test')
t.test_files = FileList['test/**/test_*.rb']
t.verbose = true
t.warning = false
end
Expand All @@ -20,8 +20,8 @@ task default: [:test]

namespace :docker do
desc 'Build docker image'
task :build, [:tag] => :build do |t, args|
raise "Argument `tag` was not provided." unless args.tag
task :build, [:tag] => :build do |_t, args|
raise 'Argument `tag` was not provided.' unless args.tag

cp Dir['pkg/fluent-plugin-kubernetes-metrics-*.gem'], 'docker/'
sh "docker build --no-cache -t splunk/connect-for-kubernetes:#{args.tag} ./docker"
Expand Down
1 change: 1 addition & 0 deletions VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.1.0
43 changes: 20 additions & 23 deletions fluent-plugin-kubernetes-metrics.gemspec
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
lib = File.expand_path("../lib", __FILE__)
lib = File.expand_path('lib', __dir__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
spec.name = "fluent-plugin-kubernetes-metrics"
spec.version = "1.1.0.Alpha"
spec.authors = ["Don Tregonning", "Chaitanya Phalak"]
spec.email = ["team-da-sf@splunk.com"]

spec.summary = %q{A fluentd input plugin that collects kubernetes cluster metrics.}
spec.description = %q{A fluentd input plugin that collects node and container metrics from a kubernetes cluster via summary API.}
spec.homepage = "https://github.com/splunk/fluent-plugin-kubernetes-metrics"
spec.license = "Apache-2.0"

spec.name = 'fluent-plugin-kubernetes-metrics'
spec.version = File.read('VERSION')
spec.authors = ['Splunk Inc.']
spec.email = ['DataEdge@splunk.com']
spec.summary = 'A fluentd input plugin that collects kubernetes cluster metrics.'
spec.description = 'A fluentd input plugin that collects node and container metrics from a kubernetes cluster.'
spec.homepage = 'https://github.com/splunk/fluent-plugin-kubernetes-metrics'
spec.license = 'Apache-2.0'
test_files, files = `git ls-files -z`.split("\x0").partition do |f|
f.match(%r{^(test|spec|features)/})
end
spec.files = files
spec.executables = files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = test_files
spec.require_paths = ["lib"]

spec.add_development_dependency "bundler", "~> 2.0.1"
spec.add_development_dependency "rake", "~> 12.3.2"
spec.add_development_dependency "test-unit", "~> 3.3.0"
spec.add_development_dependency "simplecov", "~> 0.16.1"
spec.add_development_dependency "webmock", "~> 3.5.1"
spec.add_runtime_dependency "fluentd", "~> 1.3.3"
spec.add_runtime_dependency "kubeclient", "~> 4.2.2"
spec.add_runtime_dependency "multi_json", "~> 1.13.1"
spec.add_runtime_dependency "oj", "~> 3.7.8"
end
spec.require_paths = ['lib']
spec.add_development_dependency 'bundler', '~> 2.0.1'
spec.add_development_dependency 'rake', '~> 12.3.2'
spec.add_development_dependency 'simplecov', '~> 0.16.1'
spec.add_development_dependency 'test-unit', '~> 3.3.0'
spec.add_development_dependency 'webmock', '~> 3.5.1'
spec.add_runtime_dependency 'fluentd', '~> 1.3.3'
spec.add_runtime_dependency 'kubeclient', '~> 4.2.2'
spec.add_runtime_dependency 'multi_json', '~> 1.13.1'
spec.add_runtime_dependency 'oj', '~> 3.7.8'
end
143 changes: 70 additions & 73 deletions lib/fluent/plugin/in_kubernetes_metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def initialize_rest_client
if @ca_file.nil? && File.exist?(secret_ca_file)
@ca_file = secret_ca_file
end
if @bearer_token_file.nil? and File.exist?(secret_token_file)
if @bearer_token_file.nil? && File.exist?(secret_token_file)
@bearer_token_file = secret_token_file
end
end
Expand All @@ -226,9 +226,9 @@ def initialize_rest_client
def set_ssl_options
if @use_rest_client_ssl
ssl_options = {
ssl_ca_file: @ca_file,
verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER,
headers: {:Authorization => 'Bearer ' + File.read(@bearer_token_file)}
ssl_ca_file: @ca_file,
verify_ssl: @insecure_ssl ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER,
headers: { Authorization: 'Bearer ' + File.read(@bearer_token_file) }
}
else
ssl_options = {}
Expand All @@ -239,21 +239,21 @@ def set_ssl_options
# This method is used to set the options for sending a request to the kubelet api
def request_options
options = { method: 'get', url: @kubelet_url }
options = options.merge(set_ssl_options())
options = options.merge(set_ssl_options)
options
end

# This method is used to set the options for sending a request to the stats api
def request_options_stats
options = { method: 'get', url: @kubelet_url_stats }
options = options.merge(set_ssl_options())
options = options.merge(set_ssl_options)
options
end

# This method is used to set the options for sending a request to the cadvisor api
def cadvisor_request_options
options = { method: 'get', url: @cadvisor_url }
options = options.merge(set_ssl_options())
options = options.merge(set_ssl_options)
options
end

Expand Down Expand Up @@ -361,7 +361,7 @@ def emit_system_container_metrics(node_name, container)
end

def emit_stats_breakdown(stats)
stats_latest = stats[-1]
stats_latest = stats[-1]
tag = 'node'
labels = { 'node' => @node_name }
stats_timestamp = parse_time stats_latest['timestamp']
Expand Down Expand Up @@ -410,92 +410,91 @@ def emit_cpu_metrics_stats(tag:, metrics:, labels:, time:)
if cpu_cfs_throttled_time = metrics['cfs']['throttled_time']
router.emit generate_tag("#{tag}.cpu.cfs.throttled_time"), time, labels.merge('value' => cpu_cfs_throttled_time)
end
if cpu_load_average = metrics['load_average']
if cpu_load_average = metrics['load_average']
router.emit generate_tag("#{tag}.cpu.load_average"), time, labels.merge('value' => cpu_load_average)
end
end

def emit_diskio_metrics_stats(tag:, metrics:, labels:, time:)
%w[io_service_bytes io_serviced io_queued sectors io_service_time io_wait_time io_merged io_time ].each do |metric_name|
if current_io_metric = metrics[metric_name]
current_io_metric.each do |device|
if diskio_io_service_bytes_major = device['major']
router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat(".major.")), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_major)
%w[io_service_bytes io_serviced io_queued sectors io_service_time io_wait_time io_merged io_time].each do |metric_name|
next unless current_io_metric = metrics[metric_name]

current_io_metric.each do |device|
if diskio_io_service_bytes_major = device['major']
router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat('.major.')), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_major)
end
if diskio_io_service_bytes_minor = device['minor']
router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat(".minor.")), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_minor)
if diskio_io_service_bytes_minor = device['minor']
router.emit generate_tag("#{tag}.diskio".concat(metric_name).concat('.minor.')), time, labels.merge('device' => device['device'], 'value' => diskio_io_service_bytes_minor)
end
device_stats = device['stats']
device_stats.each do | device_stat |
device_stats.each do |device_stat|
device_key, device_value = device_stat
router.emit generate_tag("#{tag}.diskio.".concat(metric_name).concat(".stats.").concat(device_key)), time, labels.merge('device' => device['device'], 'value' => device_value)
router.emit generate_tag("#{tag}.diskio.".concat(metric_name).concat('.stats.').concat(device_key)), time, labels.merge('device' => device['device'], 'value' => device_value)
end
end
end
end
end

def emit_memory_metrics_stats(tag:, metrics:, labels:, time:)
%w[usage max_usage cache rss swap working_set failcnt].each do | metric_name |
if current_memory_metric = metrics[metric_name]
%w[usage max_usage cache rss swap working_set failcnt].each do |metric_name|
if current_memory_metric = metrics[metric_name]
router.emit generate_tag("#{tag}.memory.".concat(metric_name)), time, labels.merge('value' => current_memory_metric)
end

end
%w[container_data hierarchical_data ].each do | metric_name_group |
if current_memory_metric_group = metrics[metric_name_group]
current_memory_metric_group.each do | metric_name |
metric_key, metric_value = metric_name
router.emit generate_tag("#{tag}.memory.".concat(metric_name_group).concat(".").concat(metric_key)), time, labels.merge('value' => metric_value)
end
%w[container_data hierarchical_data].each do |metric_name_group|
next unless current_memory_metric_group = metrics[metric_name_group]

current_memory_metric_group.each do |metric_name|
metric_key, metric_value = metric_name
router.emit generate_tag("#{tag}.memory.".concat(metric_name_group).concat('.').concat(metric_key)), time, labels.merge('value' => metric_value)
end
end
end

def emit_network_metrics_stats(tag:, metrics:, labels:, time:)
network_name = metrics['name']
%w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do | metric_name |
%w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do |metric_name|
if current_network_metric = metrics[metric_name]
router.emit generate_tag("#{tag}.network.".concat(network_name).concat(".").concat(metric_name)), time, labels.merge('value' => current_network_metric)
router.emit generate_tag("#{tag}.network.".concat(network_name).concat('.').concat(metric_name)), time, labels.merge('value' => current_network_metric)
end
end

if network_interfaces = metrics['interfaces']
network_interfaces.each do | current_interface |
network_interfaces.each do |current_interface|
name = current_interface['name']
%w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do | current_metric|
%w[rx_bytes rx_packets rx_errors rx_dropped tx_bytes tx_packets tx_errors tx_dropped].each do |current_metric|
if metric_value = current_interface[current_metric]
router.emit generate_tag("#{tag}.network.".concat(name).concat(".").concat(current_metric)), time, labels.merge('value' => metric_value)
router.emit generate_tag("#{tag}.network.".concat(name).concat('.').concat(current_metric)), time, labels.merge('value' => metric_value)
end
end
end
end

%w[tcp tcp6 udp udp6].each do | metric_name_group |
if metric_group = metrics[metric_name_group]
metric_group.each do |current_metric|
metric_key, metric_value = current_metric
router.emit generate_tag("#{tag}.network.".concat(metric_name_group).concat(".").concat(metric_key)), time, labels.merge('value' => metric_value)
end
%w[tcp tcp6 udp udp6].each do |metric_name_group|
next unless metric_group = metrics[metric_name_group]

metric_group.each do |current_metric|
metric_key, metric_value = current_metric
router.emit generate_tag("#{tag}.network.".concat(metric_name_group).concat('.').concat(metric_key)), time, labels.merge('value' => metric_value)
end
end
end

def emit_filesystem_metrics_stats(tag:, metrics:, labels:, time:)
metrics.each do | file_system |
metrics.each do |file_system|
device = file_system['device']
type = file_system['type']
file_system.each do | file_metric |
file_key , file_value = file_metric
if not ['device', 'type', 'has_inodes'].include? file_key
file_system.each do |file_metric|
file_key, file_value = file_metric
unless %w[device type has_inodes].include? file_key
router.emit generate_tag("#{tag}.filesystem.".concat(file_key)), time, labels.merge('device' => device, 'type' => type, 'value' => file_value)
end
end
end
end

def emit_tasks_stats_metrics_stats(tag:, metrics:, labels:, time:)
metrics.each do | task_stats |
metrics.each do |task_stats|
task_key, task_value = task_stats
router.emit generate_tag("#{tag}.tasks_stats.".concat(task_key)), time, labels.merge('value' => task_value)
end
Expand Down Expand Up @@ -573,36 +572,34 @@ def emit_stats_metrics(metrics)

def emit_cadvisor_metrics(metrics)
metrics = metrics.split("\n")
for metric in metrics
if metric.include? "container_name="
if metric.match(/^((?!container_name="").)*$/) && metric[0] != '#'
metric_str, metric_val = metric.split(" ")
if metric_val.kind_of? String
metric_val = metric_val.to_f
end
first_occur = metric_str.index('{')
metric_name = metric_str[0..first_occur-1]
pod_name = metric.match(/pod_name="\S*"/).to_s
pod_name = pod_name.split('"')[1]
image_name = metric.match(/image="\S*"/).to_s
image_name = image_name.split('"')[1]
namespace = metric.match(/namespace="\S*"/).to_s
namespace = namespace.split('"')[1]
metric_labels = {'pod_name' => pod_name, 'image' => image_name, 'namespace' => namespace, 'value' => metric_val, 'node' => @node_name}
if metric.match(/^((?!container_name="POD").)*$/)
tag = 'pod'
tag = generate_tag("#{tag}#{metric_name.gsub('_', '.')}")
tag = tag.gsub('container', '')
else
container_name = metric.match(/container_name="\S*"/).to_s
container_name = container_name.split('"')[1]
container_label = {'container_name' => container_name}
metric_labels.merge(container_label)
tag = generate_tag("#{metric_name.gsub('_', '.')}")
end
router.emit tag, @scraped_at_cadvisor, metric_labels
end
metrics.each do |metric|
next unless metric.include? 'container_name='

next unless metric.match(/^((?!container_name="").)*$/) && metric[0] != '#'

metric_str, metric_val = metric.split(' ')
metric_val = metric_val.to_f if metric_val.is_a? String
first_occur = metric_str.index('{')
metric_name = metric_str[0..first_occur - 1]
pod_name = metric.match(/pod_name="\S*"/).to_s
pod_name = pod_name.split('"')[1]
image_name = metric.match(/image="\S*"/).to_s
image_name = image_name.split('"')[1]
namespace = metric.match(/namespace="\S*"/).to_s
namespace = namespace.split('"')[1]
metric_labels = { 'pod_name' => pod_name, 'image' => image_name, 'namespace' => namespace, 'value' => metric_val, 'node' => @node_name }
if metric =~ /^((?!container_name="POD").)*$/
tag = 'pod'
tag = generate_tag("#{tag}#{metric_name.tr('_', '.')}")
tag = tag.gsub('container', '')
else
container_name = metric.match(/container_name="\S*"/).to_s
container_name = container_name.split('"')[1]
container_label = { 'container_name' => container_name }
metric_labels.merge(container_label)
tag = generate_tag(metric_name.tr('_', '.').to_s)
end
router.emit tag, @scraped_at_cadvisor, metric_labels
end
end

Expand Down
Loading