From 494cd2c464a9bd071ebd70c83148852de9d7026c Mon Sep 17 00:00:00 2001 From: Richard Pijnenburg Date: Thu, 16 Oct 2014 20:25:33 +0000 Subject: [PATCH] This came from elasticsearch/logstash-contrib at d183d9cdcb07a56ce916b40b65786dafe6a3255e --- .gitignore | 4 + Gemfile | 4 + Rakefile | 6 + lib/logstash/inputs/jmx.rb | 294 +++++++++++++++++++++++++++++++++++++ logstash-input-jmx.gemspec | 28 ++++ rakelib/publish.rake | 9 ++ rakelib/vendor.rake | 169 +++++++++++++++++++++ 7 files changed, 514 insertions(+) create mode 100644 .gitignore create mode 100644 Gemfile create mode 100644 Rakefile create mode 100644 lib/logstash/inputs/jmx.rb create mode 100644 logstash-input-jmx.gemspec create mode 100644 rakelib/publish.rake create mode 100644 rakelib/vendor.rake diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3300a23 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*.gem +Gemfile.lock +.bundle +vendor diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..ccb9012 --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +source 'https://rubygems.org' +gem 'rake' +gem 'gem_publisher' +gem 'archive-tar-minitar' diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..da6b220 --- /dev/null +++ b/Rakefile @@ -0,0 +1,6 @@ +@files=[] + +task :default do + system("rake -T") +end + diff --git a/lib/logstash/inputs/jmx.rb b/lib/logstash/inputs/jmx.rb new file mode 100644 index 0000000..4e6e76f --- /dev/null +++ b/lib/logstash/inputs/jmx.rb @@ -0,0 +1,294 @@ +# Permits to retrieve metrics from jmx. 
# ===========================================================================
# lib/logstash/inputs/jmx.rb
# ===========================================================================
# Permits to retrieve metrics from jmx.
#
# NOTE(review): the patch shows no `require` lines ahead of the class;
# presumably the logstash plugin loader provides LogStash::Inputs::Base and
# LogStash::Event -- confirm against the logstash core version in use.
class LogStash::Inputs::Jmx < LogStash::Inputs::Base
  # TODO add documentation
  config_name 'jmx'
  milestone 1

  # Shared state used by the collector threads (exposed mainly for tests).
  attr_accessor :regexp_group_alias_object
  attr_accessor :queue_conf

  # Path where json conf files are stored
  config :path, :validate => :string, :required => true

  # Interval between two jmx metrics retrievals (in seconds)
  config :polling_frequency, :validate => :number, :default => 60

  # Number of threads launched to retrieve metrics
  config :nb_thread, :validate => :number, :default => 4

  # Read a JSON conf file and parse it into a Ruby hash.
  private
  def read_conf(file_conf)
    require 'json'
    @logger.debug("Parse json #{file_conf} to ruby data structure")
    JSON.parse(File.read(file_conf))
  end

  # Verify that all required/optional parameters are present with the
  # expected types in conf_hash. Returns true when the conf is usable.
  # NOTE: `'String'.class` / `1.class` / `[].class` are kept instead of
  # literal class names so the same check works on rubies where integers
  # are Fixnum (JRuby 1.7) as well as Integer (Ruby >= 2.4).
  private
  def check_conf(conf_hash, file_conf)
    @logger.debug("Check that required parameters are define with good types in #{conf_hash}")
    parameter = {'host' => 'String'.class, 'port' => 1.class, 'queries' => [].class}
    parameter.each_key do |param|
      if conf_hash.has_key?(param)
        unless conf_hash[param].instance_of?(parameter[param])
          @logger.error("Bad syntax for conf file #{file_conf}. Bad types for parameter #{param}, expecting #{parameter[param]}, found #{conf_hash[param].class}.")
          return false
        end
      else
        @logger.error("Bad syntax for conf file #{file_conf}. Missing parameter #{param}.")
        return false
      end
    end

    @logger.debug('Check optional parameters types')
    parameter = {'alias' => 'String'.class}
    parameter.each_key do |param|
      if conf_hash.has_key?(param)
        unless conf_hash[param].instance_of?(parameter[param])
          @logger.error("Bad syntax for conf file #{file_conf}. Bad types for parameter #{param}, expecting #{parameter[param]}, found #{conf_hash[param].class}.")
          return false
        end
      end
    end

    @logger.debug('Check that required parameters are define with good types for queries')
    parameter = {'object_name' => 'String'.class}
    parameter.each_key do |param|
      conf_hash['queries'].each do |query|
        if query.has_key?(param)
          unless query[param].instance_of?(parameter[param])
            # BUGFIX(review): report the type found in the query, not the
            # type found in the top-level conf hash (was conf_hash[param]).
            @logger.error("Bad syntax for conf file #{file_conf}. Bad types for parameter #{param} in query #{query}, expecting #{parameter[param]}, found #{query[param].class}.")
            return false
          end
        else
          @logger.error("Bad syntax for conf file #{file_conf} in query #{query}. Missing parameter #{param}.")
          return false
        end
      end
    end

    @logger.debug('Check optional parameters types for queries')
    parameter = {'object_alias' => 'String'.class, 'attributes' => [].class}
    parameter.each_key do |param|
      conf_hash['queries'].each do |query|
        if query.has_key?(param)
          unless query[param].instance_of?(parameter[param])
            # BUGFIX(review): same as above -- query[param], not conf_hash[param].
            @logger.error("Bad syntax for conf file #{file_conf} in query #{query}. Bad types for parameter #{param}, expecting #{parameter[param]}, found #{query[param].class}.")
            return false
          end
        end
      end
    end

    true
  end

  # Recursively substitute every ${key} in r_alias_object with the value of
  # that key parsed out of the JMX object_name string (e.g. "type=Memory").
  private
  def replace_alias_object(r_alias_object, object_name)
    @logger.debug("Replace ${.*} variables from #{r_alias_object} using #{object_name}")
    group_alias = @regexp_group_alias_object.match(r_alias_object)
    if group_alias
      r_alias_object = r_alias_object.gsub('${' + group_alias[1] + '}', object_name.split(group_alias[1] + '=')[1].split(',')[0])
      r_alias_object = replace_alias_object(r_alias_object, object_name)
    end
    r_alias_object
  end

  # Build a LogStash::Event for one metric sample and push it on the queue.
  # Numbers land in 'metric_value_number'; booleans are mapped to 1/0 with a
  # '_bool' suffix on the path; anything else is stringified into
  # 'metric_value_string'. Spaces and double quotes are stripped from paths.
  private
  def send_event_to_queue(queue, host, metric_path, metric_value)
    @logger.debug('Send event to queue to be processed by filters/outputs')
    event = LogStash::Event.new
    event['host'] = host
    event['path'] = @path
    event['type'] = @type
    metric_path_substituted = metric_path.gsub(' ', '_').gsub('"', '')
    # Numeric covers Fixnum/Bignum/Float and keeps working on rubies where
    # Fixnum/Bignum no longer exist (removed in Ruby 3.2).
    if metric_value.is_a?(Numeric)
      @logger.debug("The value #{metric_value} is of type number: #{metric_value.class}")
      event['metric_path'] = metric_path_substituted
      event['metric_value_number'] = metric_value
    elsif metric_value == true || metric_value == false
      @logger.debug("The value #{metric_value} is of type boolean: #{metric_value.class}")
      event['metric_path'] = metric_path_substituted + '_bool'
      event['metric_value_number'] = metric_value ? 1 : 0
    else
      @logger.debug("The value #{metric_value} is not of type number: #{metric_value.class}")
      event['metric_path'] = metric_path_substituted
      event['metric_value_string'] = metric_value.to_s
    end
    queue << event
  end

  # Worker thread body: pop one host conf from queue_conf, open a JMX
  # connection, run every query and push one event per metric on the
  # logstash queue. Loops forever; any error is logged and the worker
  # waits for the next conf.
  private
  def thread_jmx(queue_conf, queue)
    require 'jmx4r'

    loop do
      jmx_connection = nil
      begin
        @logger.debug('Wait config to retrieve from queue conf')
        thread_hash_conf = queue_conf.pop
        @logger.debug("Retrieve config #{thread_hash_conf} from queue conf")

        @logger.debug('Check if jmx connection need a user/password')
        if thread_hash_conf.has_key?('username') and thread_hash_conf.has_key?('password')
          @logger.debug("Connect to #{thread_hash_conf['host']}:#{thread_hash_conf['port']} with user #{thread_hash_conf['username']}")
          jmx_connection = JMX::MBean.connection :host => thread_hash_conf['host'],
                                                 :port => thread_hash_conf['port'],
                                                 :username => thread_hash_conf['username'],
                                                 :password => thread_hash_conf['password']
        else
          @logger.debug("Connect to #{thread_hash_conf['host']}:#{thread_hash_conf['port']}")
          jmx_connection = JMX::MBean.connection :host => thread_hash_conf['host'],
                                                 :port => thread_hash_conf['port']
        end

        # Metric paths are prefixed by the per-host 'alias' when given,
        # else by "host_port".
        if thread_hash_conf.has_key?('alias')
          @logger.debug("Set base_metric_path to alias: #{thread_hash_conf['alias']}")
          base_metric_path = thread_hash_conf['alias']
        else
          @logger.debug("Set base_metric_path to host_port: #{thread_hash_conf['host']}_#{thread_hash_conf['port']}")
          base_metric_path = "#{thread_hash_conf['host']}_#{thread_hash_conf['port']}"
        end

        @logger.debug("Treat queries #{thread_hash_conf['queries']}")
        thread_hash_conf['queries'].each do |query|
          @logger.debug("Find all objects name #{query['object_name']}")
          jmx_object_name_s = JMX::MBean.find_all_by_name(query['object_name'], :connection => jmx_connection)

          if jmx_object_name_s.length > 0
            jmx_object_name_s.each do |jmx_object_name|
              if query.has_key?('object_alias')
                object_name = replace_alias_object(query['object_alias'], jmx_object_name.object_name.to_s)
                @logger.debug("Set object_name to object_alias: #{object_name}")
              else
                object_name = jmx_object_name.object_name.to_s
                @logger.debug("Set object_name to jmx object_name: #{object_name}")
              end

              if query.has_key?('attributes')
                # An explicit attribute list was given: retrieve only those.
                @logger.debug("Retrieves attributes #{query['attributes']} to #{jmx_object_name.object_name}")
                query['attributes'].each do |attribute|
                  begin
                    jmx_attribute_value = jmx_object_name.send(attribute.snake_case)
                    if jmx_attribute_value.instance_of? Java::JavaxManagementOpenmbean::CompositeDataSupport
                      @logger.debug('The jmx value is a composite_data one')
                      jmx_attribute_value.each do |jmx_attribute_value_composite|
                        @logger.debug("Get jmx value #{jmx_attribute_value[jmx_attribute_value_composite]} for attribute #{attribute}.#{jmx_attribute_value_composite} to #{jmx_object_name.object_name}")
                        send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{attribute}.#{jmx_attribute_value_composite}", jmx_attribute_value[jmx_attribute_value_composite])
                      end
                    else
                      @logger.debug("Get jmx value #{jmx_attribute_value} for attribute #{attribute} to #{jmx_object_name.object_name}")
                      send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{attribute}", jmx_attribute_value)
                    end
                  rescue Exception => ex
                    # Deliberately broad: one unreadable attribute must not
                    # abort the poll (JRuby surfaces Java exceptions here).
                    @logger.warn("Failed retrieving metrics for attribute #{attribute} on object #{jmx_object_name.object_name}")
                    @logger.warn(ex.message)
                  end
                end
              else
                # No attribute list: retrieve every attribute of the bean.
                @logger.debug("No attribute to retrieve define on #{jmx_object_name.object_name}, will retrieve all")
                jmx_object_name.attributes.each_key do |attribute|
                  begin
                    jmx_attribute_value = jmx_object_name.send(attribute)
                    if jmx_attribute_value.instance_of? Java::JavaxManagementOpenmbean::CompositeDataSupport
                      @logger.debug('The jmx value is a composite_data one')
                      jmx_attribute_value.each do |jmx_attribute_value_composite|
                        @logger.debug("Get jmx value #{jmx_attribute_value[jmx_attribute_value_composite]} for attribute #{jmx_object_name.attributes[attribute]}.#{jmx_attribute_value_composite} to #{jmx_object_name.object_name}")
                        send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{jmx_object_name.attributes[attribute]}.#{jmx_attribute_value_composite}", jmx_attribute_value[jmx_attribute_value_composite])
                      end
                    else
                      @logger.debug("Get jmx value #{jmx_attribute_value} for attribute #{jmx_object_name.attributes[attribute]} to #{jmx_object_name.object_name}")
                      send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{jmx_object_name.attributes[attribute]}", jmx_attribute_value)
                    end
                  rescue Exception => ex
                    @logger.warn("Failed retrieving metrics for attribute #{attribute} on object #{jmx_object_name.object_name}")
                    @logger.warn(ex.message)
                  end
                end
              end
            end
          else
            @logger.warn("No jmx object found for #{query['object_name']}")
          end
        end
      rescue Exception => ex
        # Keep the worker alive whatever happens; log and wait for the next
        # conf on the queue.
        @logger.error(ex.message)
        @logger.error(ex.backtrace.join("\n"))
      ensure
        # BUGFIX(review): the original only closed the connection on the
        # success path, leaking a JMX connection on every failed poll.
        begin
          jmx_connection.close if jmx_connection
        rescue Exception
          nil # a failed close must not kill the worker
        end
      end
    end
  end

  # Plugin registration: prepare the conf queue feeding the workers and
  # pre-compile the ${...} alias-substitution regexp.
  public
  def register
    @logger.info('Registering files in', :path => @path)

    @logger.info('Create queue conf used to send jmx conf to jmx collector threads')
    @queue_conf = Queue.new

    @logger.info('Compile regexp for group alias object replacement')
    @regexp_group_alias_object = Regexp.new('(?:\${(.*?)})+')
  end

  # Main plugin loop: spawn nb_thread collector threads, then every
  # polling_frequency seconds (re)load every conf file under @path and feed
  # the parsed confs to the workers through @queue_conf.
  public
  def run(queue)
    require 'thread'

    begin
      threads = []
      @logger.info("Init #{@nb_thread} jmx collector threads")
      @nb_thread.times do
        threads << Thread.new { thread_jmx(@queue_conf, queue) }
      end

      loop do
        @logger.info("Load conf files in #{@path}")
        Dir.foreach(@path) do |item|
          begin
            next if item == '.' or item == '..'
            file_conf = File.join(@path, item)
            @logger.debug("Load conf file #{file_conf}")
            conf_hash = read_conf(file_conf)
            if check_conf(conf_hash, file_conf)
              @logger.debug("Add conf #{conf_hash} to the queue conf")
              @queue_conf << conf_hash
            end
          rescue Exception => ex
            # A single broken conf file must not stop the collection loop.
            @logger.warn("Issue parsing file #{file_conf}")
            @logger.warn(ex.message)
            @logger.warn(ex.backtrace.join("\n"))
            next
          end
        end
        # Wait for the workers to drain the queue, counting the seconds so
        # they can be deducted from the polling interval below.
        @logger.debug('Wait until the queue conf is empty')
        delta = 0
        until @queue_conf.empty?
          @logger.debug("There are still #{@queue_conf.size} messages in the queue conf. Sleep 1s.")
          delta = delta + 1
          sleep(1)
        end
        wait_time = @polling_frequency - delta
        if wait_time > 0
          @logger.debug("Wait #{wait_time}s (#{@polling_frequency}-#{delta}(seconds wait until queue conf empty)) before to launch again a new jmx metrics collection")
          sleep(wait_time)
        else
          # BUGFIX(review): the message referred to a non-existent
          # 'retrieve_interval' option; the real settings are
          # polling_frequency and nb_thread.
          @logger.warn("The time taken to retrieve metrics is longer than polling_frequency.
          \nYou must adapt nb_thread and polling_frequency to the number of jvm/metrics you want to retrieve.")
        end
      end
    rescue Exception => ex
      @logger.error(ex.message)
      @logger.error(ex.backtrace.join("\n"))
    end
  end
end

# ===========================================================================
# logstash-input-jmx.gemspec
# ===========================================================================
Gem::Specification.new do |s|

  s.name          = 'logstash-input-jmx'
  s.version       = '0.1.0'
  s.licenses      = ['Apache License (2.0)']
  s.summary       = "Retrieve metrics from jmx."
  s.description   = "Retrieve metrics from jmx."
  s.authors       = ["Elasticsearch"]
  s.email         = 'richard.pijnenburg@elasticsearch.com'
  s.homepage      = "http://logstash.net/"
  s.require_paths = ["lib"]

  # Files
  s.files = `git ls-files`.split($\) + ::Dir.glob('vendor/*')

  # Tests
  s.test_files = s.files.grep(%r{^(test|spec|features)/})

  # Special flag to let us know this is actually a logstash plugin
  s.metadata = { "logstash_plugin" => "true", "group" => "input" }

  # Gem dependencies
  s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'

  s.add_runtime_dependency 'jmx4r'

end

# ===========================================================================
# rakelib/publish.rake
# ===========================================================================
require "gem_publisher"

desc "Publish gem to RubyGems.org"
task :publish_gem do |t|
  # Publish the first gemspec found one directory above rakelib/.
  gem_file = Dir.glob(File.expand_path('../*.gemspec', File.dirname(__FILE__))).first
  gem = GemPublisher.publish_if_updated(gem_file, :rubygems)
  puts "Published #{gem}" if gem
end

# ===========================================================================
# rakelib/vendor.rake
# ===========================================================================
require "net/http"
require "uri"
require "digest/sha1"
# BUGFIX(review): Zlib::GzipReader is used below but zlib was never required.
require "zlib"

# Join path components under the vendor/ directory.
def vendor(*args)
  return File.join("vendor", *args)
end

directory "vendor/" => ["vendor"] do |task, args|
  mkdir task.name
end

# Download url to output and verify its SHA1 against the expected value;
# fail the rake run on mismatch.
def fetch(url, sha1, output)
  puts "Downloading #{url}"
  actual_sha1 = download(url, output)

  if actual_sha1 != sha1
    fail "SHA1 does not match (expected '#{sha1}' but got '#{actual_sha1}')"
  end
end # def fetch

# Ensure vendor/<basename-of-url> exists with the expected SHA1, downloading
# it when missing or stale. Returns the vendor-relative output path.
def file_fetch(url, sha1)
  filename = File.basename(URI(url).path)
  # BUGFIX(review): the destination must embed the remote file's basename;
  # the original literal had lost its interpolation.
  output = "vendor/#{filename}"
  task output => ["vendor/"] do
    begin
      actual_sha1 = file_sha1(output)
      if actual_sha1 != sha1
        fetch(url, sha1, output)
      end
    rescue Errno::ENOENT
      # File does not exist yet: fetch it.
      fetch(url, sha1, output)
    end
  end.invoke

  return output
end

# Compute the SHA1 hex digest of a file, streaming it in 16KB chunks.
def file_sha1(path)
  digest = Digest::SHA1.new
  # Binary mode so the digest is byte-exact on every platform.
  fd = File.new(path, "rb")
  loop do
    begin
      digest << fd.sysread(16384)
    rescue EOFError
      break
    end
  end
  return digest.hexdigest
ensure
  fd.close if fd
end

# Stream url into output (via a .tmp sibling), showing a progress meter on a
# tty. Returns the SHA1 hex digest of the downloaded bytes. The .tmp file is
# removed on any failure.
def download(url, output)
  uri = URI(url)
  digest = Digest::SHA1.new
  tmp = "#{output}.tmp"
  Net::HTTP.start(uri.host, uri.port, :use_ssl => (uri.scheme == "https")) do |http|
    request = Net::HTTP::Get.new(uri.path)
    http.request(request) do |response|
      # BUGFIX(review): Net::HTTPResponse#code is a String and the original
      # polarity was inverted -- the guard never fired, so 404 error pages
      # were silently saved as downloads. Fail unless the code is acceptable.
      fail "HTTP fetch failed for #{url}. #{response}" unless %w[200 301].include?(response.code)
      size = response["content-length"].to_f # 0.0 when the header is absent
      count = 0
      # Binary mode so archives are not mangled on platforms with newline
      # translation.
      File.open(tmp, "wb") do |fd|
        response.read_body do |chunk|
          fd.write(chunk)
          digest << chunk
          if size > 0 && $stdout.tty?
            count += chunk.bytesize
            $stdout.write(sprintf("\r%0.2f%%", count / size * 100))
          end
        end
      end
      $stdout.write("\r \r") if $stdout.tty?
    end
  end

  File.rename(tmp, output)

  return digest.hexdigest
rescue SocketError => e
  puts "Failure while downloading #{url}: #{e}"
  raise
ensure
  File.unlink(tmp) if File.exist?(tmp)
end # def download

# Extract a gzipped tarball. The block maps each entry to its destination
# path (or nil to skip the entry). Existing files with identical size and
# mode are left untouched. The tarball is deleted afterwards.
def untar(tarball, &block)
  require "archive/tar/minitar"
  tgz = Zlib::GzipReader.new(File.open(tarball))
  tar = Archive::Tar::Minitar::Input.open(tgz)
  tar.each do |entry|
    path = block.call(entry)
    next if path.nil?
    parent = File.dirname(path)

    mkdir_p parent unless File.directory?(parent)

    if entry.directory?
      mkdir path unless File.directory?(path)
    else
      entry_mode = entry.instance_eval { @mode } & 0777
      # BUGFIX(review): File.exists? is deprecated (removed in Ruby 3.2).
      if File.exist?(path)
        stat = File.stat(path)
        # TODO(sissel): Submit a patch to archive-tar-minitar upstream to
        # expose headers in the entry.
        entry_size = entry.instance_eval { @size }
        # If file sizes and modes are the same, skip writing.
        next if stat.size == entry_size && (stat.mode & 0777) == entry_mode
      end
      puts "Extracting #{entry.full_name} from #{tarball} #{entry_mode.to_s(8)}"
      File.open(path, "wb") do |fd|
        # eof? check lets us skip empty files. Necessary because the API
        # provided by Archive::Tar::Minitar::Reader::EntryStream only mostly
        # acts like an IO object. Something about empty files in this
        # EntryStream causes IO.copy_stream to throw "can't convert nil into
        # String" on JRuby.
        # TODO(sissel): File a bug about this.
        while !entry.eof?
          chunk = entry.read(16384)
          fd.write(chunk)
        end
      end
      File.chmod(entry_mode, path)
    end
  end
  tar.close
  File.unlink(tarball) if File.file?(tarball)
end # def untar

# Decompress a .gz file next to itself (foo.txt.gz -> foo.txt) and delete
# the compressed original on success; a partial output is removed on error.
def ungz(file)
  # BUGFIX(review): gsub('.gz', '') would also strip a '.gz' occurring
  # mid-name; anchor the replacement to the file extension.
  outpath = file.sub(/\.gz\z/, '')
  tgz = Zlib::GzipReader.new(File.open(file))
  begin
    File.open(outpath, "wb") do |out|
      IO::copy_stream(tgz, out)
    end
    File.unlink(file)
  rescue
    File.unlink(outpath) if File.file?(outpath)
    raise
  end
  tgz.close
end

desc "Process any vendor files required for this plugin"
task "vendor" do |task, args|

  # @files is declared in the top-level Rakefile.
  @files.each do |file|
    download = file_fetch(file['url'], file['sha1'])
    if download.end_with?('.tar.gz')
      prefix = download.gsub('.tar.gz', '').gsub('vendor/', '')
      untar(download) do |entry|
        if !file['files'].nil?
          next unless file['files'].include?(entry.full_name.gsub(prefix, ''))
        end
        # BUGFIX(review): compute the output name unconditionally; the
        # original assigned `out` only inside the filter branch, raising
        # TypeError (File.join with nil) when no 'files' list was given.
        File.join('vendor', entry.full_name.split("/").last)
      end
    elsif download.end_with?('.gz')
      ungz(download)
    end
  end

end