Skip to content

Commit

Permalink
This came from elasticsearch/logstash-contrib at d183d9cdcb07a56ce916…
Browse files Browse the repository at this point in the history
…b40b65786dafe6a3255e
  • Loading branch information
electrical committed Oct 16, 2014
0 parents commit 494cd2c
Show file tree
Hide file tree
Showing 7 changed files with 514 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
*.gem
Gemfile.lock
.bundle
vendor
4 changes: 4 additions & 0 deletions Gemfile
@@ -0,0 +1,4 @@
source 'https://rubygems.org'
gem 'rake'
gem 'gem_publisher'
gem 'archive-tar-minitar'
6 changes: 6 additions & 0 deletions Rakefile
@@ -0,0 +1,6 @@
@files=[]

task :default do
system("rake -T")
end

294 changes: 294 additions & 0 deletions lib/logstash/inputs/jmx.rb
@@ -0,0 +1,294 @@
# Permits to retrieve metrics from jmx.
class LogStash::Inputs::Jmx < LogStash::Inputs::Base
# TODO add documentation
config_name 'jmx'
milestone 1

#Class Var
attr_accessor :regexp_group_alias_object
attr_accessor :queue_conf

# Path where json conf files are stored
config :path, :validate => :string, :required => true

# Indicate interval between to jmx metrics retrieval
# (in s)
config :polling_frequency, :validate => :number, :default => 60

# Indicate number of thread launched to retrieve metrics
config :nb_thread, :validate => :number, :default => 4

# Read and parse json conf
private
def read_conf(file_conf)
require 'json'

@logger.debug("Parse json #{file_conf} to ruby data structure")
json = File.read(file_conf)
JSON.parse(json)
end

# Verify that all required parameter are present in the conf_hash
private
def check_conf(conf_hash,file_conf)
#Check required parameters
@logger.debug("Check that required parameters are define with good types in #{conf_hash}")
parameter = {'host' => 'String'.class, 'port' => 1.class, 'queries' => [].class}
parameter.each_key do |param|
if conf_hash.has_key?(param)
unless conf_hash[param].instance_of?(parameter[param])
@logger.error("Bad syntax for conf file #{file_conf}. Bad types for parameter #{param}, expecting #{parameter[param]}, found #{conf_hash[param].class}.")
return false
end
else
@logger.error("Bad syntax for conf file #{file_conf}. Missing parameter #{param}.")
return false
end
end

@logger.debug('Check optional parameters types')
parameter = {'alias' => 'String'.class}
parameter.each_key do |param|
if conf_hash.has_key?(param)
unless conf_hash[param].instance_of?(parameter[param])
@logger.error("Bad syntax for conf file #{file_conf}. Bad types for parameter #{param}, expecting #{parameter[param]}, found #{conf_hash[param].class}.")
return false
end
end
end

@logger.debug('Check that required parameters are define with good types for queries')
parameter = {'object_name' => 'String'.class}
parameter.each_key do |param|
conf_hash['queries'].each do |query|
if query.has_key?(param)
unless query[param].instance_of?(parameter[param])
@logger.error("Bad syntax for conf file #{file_conf}. Bad types for parameter #{param} in query #{query}, expecting #{parameter[param]}, found #{conf_hash[param].class}.")
return false
end
else
@logger.error("Bad syntax for conf file #{file_conf} in query #{query}. Missing parameter #{param}.")
return false
end
end
end

@logger.debug('Check optional parameters types for queries')
parameter = {'object_alias' => 'String'.class, 'attributes' => [].class}
parameter.each_key do |param|
conf_hash['queries'].each do |query|
if query.has_key?(param)
unless query[param].instance_of?(parameter[param])
@logger.error("Bad syntax for conf file #{file_conf} in query #{query}. Bad types for parameter #{param}, expecting #{parameter[param]}, found #{conf_hash[param].class}.")
return false
end
end
end
end

true
end

private
def replace_alias_object(r_alias_object,object_name)
@logger.debug("Replace ${.*} variables from #{r_alias_object} using #{object_name}")
group_alias = @regexp_group_alias_object.match(r_alias_object)
if group_alias
r_alias_object = r_alias_object.gsub('${'+group_alias[1]+'}',object_name.split(group_alias[1]+'=')[1].split(',')[0])
r_alias_object = replace_alias_object(r_alias_object,object_name)
end
r_alias_object
end

private
def send_event_to_queue(queue,host,metric_path,metric_value)
@logger.debug('Send event to queue to be processed by filters/outputs')
event = LogStash::Event.new
event['host'] = host
event['path'] = @path
event['type'] = @type
number_type = [Fixnum, Bignum, Float]
boolean_type = [TrueClass, FalseClass]
metric_path_substituted = metric_path.gsub(' ','_').gsub('"','')
if number_type.include?(metric_value.class)
@logger.debug("The value #{metric_value} is of type number: #{metric_value.class}")
event['metric_path'] = metric_path_substituted
event['metric_value_number'] = metric_value
elsif boolean_type.include?(metric_value.class)
@logger.debug("The value #{metric_value} is of type boolean: #{metric_value.class}")
event['metric_path'] = metric_path_substituted+'_bool'
event['metric_value_number'] = metric_value ? 1 : 0
else
@logger.debug("The value #{metric_value} is not of type number: #{metric_value.class}")
event['metric_path'] = metric_path_substituted
event['metric_value_string'] = metric_value.to_s
end
queue << event
end

# Thread function to retrieve metrics from JMX
private
def thread_jmx(queue_conf,queue)
require 'jmx4r'

while true
begin
@logger.debug('Wait config to retrieve from queue conf')
thread_hash_conf = queue_conf.pop
@logger.debug("Retrieve config #{thread_hash_conf} from queue conf")

@logger.debug('Check if jmx connection need a user/password')
if thread_hash_conf.has_key?('username') and thread_hash_conf.has_key?('password')
@logger.debug("Connect to #{thread_hash_conf['host']}:#{thread_hash_conf['port']} with user #{thread_hash_conf['username']}")
jmx_connection = JMX::MBean.connection :host => thread_hash_conf['host'],
:port => thread_hash_conf['port'],
:username => thread_hash_conf['username'],
:password => thread_hash_conf['password']
else
@logger.debug("Connect to #{thread_hash_conf['host']}:#{thread_hash_conf['port']}")
jmx_connection = JMX::MBean.connection :host => thread_hash_conf['host'],
:port => thread_hash_conf['port']
end


if thread_hash_conf.has_key?('alias')
@logger.debug("Set base_metric_path to alias: #{thread_hash_conf['alias']}")
base_metric_path = thread_hash_conf['alias']
else
@logger.debug("Set base_metric_path to host_port: #{thread_hash_conf['host']}_#{thread_hash_conf['port']}")
base_metric_path = "#{thread_hash_conf['host']}_#{thread_hash_conf['port']}"
end


@logger.debug("Treat queries #{thread_hash_conf['queries']}")
thread_hash_conf['queries'].each do |query|
@logger.debug("Find all objects name #{query['object_name']}")
jmx_object_name_s = JMX::MBean.find_all_by_name(query['object_name'], :connection => jmx_connection)

if jmx_object_name_s.length > 0
jmx_object_name_s.each do |jmx_object_name|
if query.has_key?('object_alias')
object_name = replace_alias_object(query['object_alias'],jmx_object_name.object_name.to_s)
@logger.debug("Set object_name to object_alias: #{object_name}")
else
object_name = jmx_object_name.object_name.to_s
@logger.debug("Set object_name to jmx object_name: #{object_name}")
end

if query.has_key?('attributes')
@logger.debug("Retrieves attributes #{query['attributes']} to #{jmx_object_name.object_name}")
query['attributes'].each do |attribute|
begin
jmx_attribute_value = jmx_object_name.send(attribute.snake_case)
if jmx_attribute_value.instance_of? Java::JavaxManagementOpenmbean::CompositeDataSupport
@logger.debug('The jmx value is a composite_data one')
jmx_attribute_value.each do |jmx_attribute_value_composite|
@logger.debug("Get jmx value #{jmx_attribute_value[jmx_attribute_value_composite]} for attribute #{attribute}.#{jmx_attribute_value_composite} to #{jmx_object_name.object_name}")
send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{attribute}.#{jmx_attribute_value_composite}", jmx_attribute_value[jmx_attribute_value_composite])
end
else
@logger.debug("Get jmx value #{jmx_attribute_value} for attribute #{attribute} to #{jmx_object_name.object_name}")
send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{attribute}", jmx_attribute_value)
end
rescue Exception => ex
@logger.warn("Failed retrieving metrics for attribute #{attribute} on object #{jmx_object_name.object_name}")
@logger.warn(ex.message)
end
end
else
@logger.debug("No attribute to retrieve define on #{jmx_object_name.object_name}, will retrieve all")
jmx_object_name.attributes.each_key do |attribute|
begin
jmx_attribute_value = jmx_object_name.send(attribute)
if jmx_attribute_value.instance_of? Java::JavaxManagementOpenmbean::CompositeDataSupport
@logger.debug('The jmx value is a composite_data one')
jmx_attribute_value.each do |jmx_attribute_value_composite|
@logger.debug("Get jmx value #{jmx_attribute_value[jmx_attribute_value_composite]} for attribute #{jmx_object_name.attributes[attribute]}.#{jmx_attribute_value_composite} to #{jmx_object_name.object_name}")
send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{jmx_object_name.attributes[attribute]}.#{jmx_attribute_value_composite}", jmx_attribute_value[jmx_attribute_value_composite])
end
else
@logger.debug("Get jmx value #{jmx_attribute_value} for attribute #{jmx_object_name.attributes[attribute]} to #{jmx_object_name.object_name}")
send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{jmx_object_name.attributes[attribute]}", jmx_attribute_value)
end
rescue Exception => ex
@logger.warn("Failed retrieving metrics for attribute #{attribute} on object #{jmx_object_name.object_name}")
@logger.warn(ex.message)
end
end
end
end
else
@logger.warn("No jmx object found for #{query['object_name']}")
end
end
jmx_connection.close
rescue Exception => ex
@logger.error(ex.message)
@logger.error(ex.backtrace.join("\n"))
end
end
end

public
def register
@logger.info('Registering files in', :path => @path)

@logger.info('Create queue conf used to send jmx conf to jmx collector threads')
@queue_conf = Queue.new

@logger.info('Compile regexp for group alias object replacement')
@regexp_group_alias_object = Regexp.new('(?:\${(.*?)})+')
end

public
def run(queue)
require 'thread'

begin
threads = []
@logger.info("Init #{@nb_thread} jmx collector threads")
@nb_thread.times do
threads << Thread.new { thread_jmx(@queue_conf,queue) }
end

while true
@logger.info("Load conf files in #{@path}")
Dir.foreach(@path) do |item|
begin
next if item == '.' or item == '..'
file_conf = File.join(@path, item)
@logger.debug("Load conf file #{file_conf}")
conf_hash = read_conf(file_conf)
if check_conf(conf_hash,file_conf)
@logger.debug("Add conf #{conf_hash} to the queue conf")
@queue_conf << conf_hash
end
rescue Exception => ex
@logger.warn("Issue parsing file #{file_conf}")
@logger.warn(ex.message)
@logger.warn(ex.backtrace.join("\n"))
next
end
end
@logger.debug('Wait until the queue conf is empty')
delta=0
until @queue_conf.empty?
@logger.debug("There are still #{@queue_conf.size} messages in the queue conf. Sleep 1s.")
delta=delta+1
sleep(1)
end
wait_time=@polling_frequency-delta
if wait_time>0
@logger.debug("Wait #{wait_time}s (#{@polling_frequency}-#{delta}(seconds wait until queue conf empty)) before to launch again a new jmx metrics collection")
sleep(wait_time)
else
@logger.warn("The time taken to retrieve metrics is more important than the retrieve_interval time set.
\nYou must adapt nb_thread, retrieve_interval to the number of jvm/metrics you want to retrieve.")
end
end
rescue Exception => ex
@logger.error(ex.message)
@logger.error(ex.backtrace.join("\n"))
end
end
end
28 changes: 28 additions & 0 deletions logstash-input-jmx.gemspec
@@ -0,0 +1,28 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-jmx'
s.version = '0.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Retrieve metrics from jmx."
s.description = "Retrieve metrics from jmx."
s.authors = ["Elasticsearch"]
s.email = 'richard.pijnenburg@elasticsearch.com'
s.homepage = "http://logstash.net/"
s.require_paths = ["lib"]

# Files
s.files = `git ls-files`.split($\)+::Dir.glob('vendor/*')

# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})

# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "group" => "input" }

# Gem dependencies
s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'

s.add_runtime_dependency 'jmx4r'

end

9 changes: 9 additions & 0 deletions rakelib/publish.rake
@@ -0,0 +1,9 @@
require "gem_publisher"

desc "Publish gem to RubyGems.org"
task :publish_gem do |t|
gem_file = Dir.glob(File.expand_path('../*.gemspec',File.dirname(__FILE__))).first
gem = GemPublisher.publish_if_updated(gem_file, :rubygems)
puts "Published #{gem}" if gem
end

0 comments on commit 494cd2c

Please sign in to comment.