Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add a plot application

  • Loading branch information...
commit 75243e98d90988ed3ec05aeaf98f3f9b8d26d1c6 1 parent 55ad473
@ripienaar authored
View
68 application/README.md
@@ -0,0 +1,68 @@
+GNU Plotter for MCollective Data
+================================
+
+A small tool to plot data that can be gathered via MCollective data plugins.
+
+Example
+-------
+
+If you have the Puppet agent install on your infrastructure you can use this
+application to examine the behavior of your Puppet infrastructure.
+
+ $ mco plot resource config_retrieval_time --np
+
+ Information about Puppet managed resources
+ Nodes
+ 4 ++------*--------+-------+--------+-------+-------+--------+------++
+ + * + + + + + + +
+ 3.5 ++ * * ++
+ | * * |
+ 3 ++ * * * ++
+ | * * * * |
+ | * * * * |
+ 2.5 ++ * * * * ++
+ | * * * * |
+ 2 ++ * ** * * **** ++
+ | * * * * * |
+ 1.5 ++ * * * * * ++
+ | ** * * * |
+ 1 ++ * ********* * * * ++
+ | * * * * |
+ | * * * * |
+ 0.5 ++ * * * * ++
+ + + + + + * + * + * *+ +
+ 0 ++------+--------+-------+--------+----*********--+------*-+------++
+ 0 5 10 15 20 25 30 35 40
+ Config Retrieval Time
+
+This shows you that the time to retrieve the configuration from my master is
+generally fast but there are some slowdown of nodes taking > 25 seconds.
+
+We can interogate the network and ask it which machines this is:
+
+ $ mco find -S "resource().config_retrieval_time > 25"
+ dev2.example.net
+ .
+ .
+ .
+
+This shows how you can first view and then dig into the graph and find nodes
+matching it.
+
+To see what data you can plot use the *mco plugin doc* application:
+
+ $ mco plugin doc
+ .
+ .
+ Data Queries:
+ agent Meta data about installed MColletive Agents
+ augeas_match Allows agents and discovery to do Augeas match lookups
+ domain_mailq Checks the mailq for mail to a certain domain
+ fstat Retrieve file stat data for a given file
+ nrpe Checks the exit codes of executed Nrpe commands
+ puppet Information about Puppet agent state
+ resource Information about Puppet managed resources
+ sysctl Retrieve values for a given sysctl
+
+Any numeric data in these data sources can be plotted, see *mco plugin doc
+data/puppet* to get details about a specific plugin.
View
182 application/plot/application/plot.rb
@@ -0,0 +1,182 @@
+class MCollective
+ class Application::Plot<MCollective::Application
+ description "Plots data provided by Data Plugins"
+
+ usage <<-END_OF_USAGE
+mco plot [data plugin] [output item]
+Usage: mco plot [data plugin] [data query] [output item]
+
+Example:
+
+With the Puppet agent installed, this will plot the total
+config retrieval time for all nodes.
+
+ mco plot resource config_retrieval_time
+
+The nodes will by default be grouped into 20 buckets to provide
+a scaling function, you can increase this to more buckets to
+gain better data resolution:
+
+ mco plot resource config_retrieval_time --buckets 60
+
+For data plugins that require a data query you need to supply
+that:
+
+ mco plot fstat /etc/hosts size
+
+This will plot the size of /etc/hosts across your estate
+
+The various title, axis titles and overwall graph width and
+height is also settable.
+END_OF_USAGE
+
+ option :x_title,
+ :description => "Sets a title for the X axis",
+ :arguments => ["--xtitle [TITLE]"]
+
+ option :y_title,
+ :description => "Sets a title for the Y axis",
+ :arguments => ["--ytitle [TITLE]"]
+
+
+ option :title,
+ :description => "Sets the graph title",
+ :arguments => ["--title [TITLE]"]
+
+ option :buckets,
+ :description => "How many buckets to group nodes into",
+ :arguments => ["--buckets [COUNT]"],
+ :default => 20,
+ :type => Integer
+
+ option :width,
+ :description => "Set the graph width in characters",
+ :arguments => ["--width [WIDTH]"],
+ :default => 78,
+ :type => Integer
+
+ option :height,
+ :description => "Set the graph width in characters",
+ :arguments => ["--height [HEIGHT]"],
+ :default => 24,
+ :type => Integer
+
+ def post_option_parser(configuration)
+ raise "Please specify a data plugin, query and field to plot" unless ARGV.size >= 2
+
+ if ARGV.size == 2
+ configuration[:datasource] = ARGV.shift
+ configuration[:field] = ARGV.shift
+ elsif ARGV.size == 3
+ configuration[:datasource] = ARGV.shift
+ configuration[:query] = ARGV.shift
+ configuration[:field] = ARGV.shift
+ end
+ end
+
+ def validate_configuration(configuration)
+ raise "Cannot find the 'gnuplot' executable" unless configuration[:gnuplot] = which("gnuplot")
+ end
+
+ def data_for_field(results, field)
+ bucket_count = configuration[:buckets]
+
+ buckets = Array.new(bucket_count + 1) { 0 }
+ values = []
+
+ results.each do |result|
+ if result[:statuscode] == 0
+ begin
+ values << Float(result[:data][field])
+ rescue => e
+ raise "Cannot interpret data item '%s': %s" % [result[:data][field], e.to_s]
+ end
+ end
+ end
+
+ raise "No usable data results were found" if values.empty?
+
+ min = values.min
+ max = values.max
+
+ bucket_size = (max - min) / Float(bucket_count)
+
+ unless max == min
+ values.each do |value|
+ bucket = (value - min) / bucket_size
+ buckets[bucket] += 1
+ end
+ end
+
+ range = Array.new(bucket_count + 1) {|i| Integer(min + (i * bucket_size))}
+
+ [range, buckets]
+ end
+
+ def which (bin)
+ if Util.windows?
+ all = [bin, bin + '.exe']
+ else
+ all = [bin]
+ end
+
+ all.each do |exec|
+ if which_helper(exec)
+ return which_helper(exec)
+ end
+ end
+
+ return nil
+ end
+
+ def which_helper(bin)
+ return bin if File::executable?(bin)
+
+ ENV['PATH'].split(File::PATH_SEPARATOR).each do |dir|
+ candidate = File::join(dir, bin.strip)
+ return candidate if File::executable?(candidate)
+ end
+ return nil
+ end
+
+ def main
+ client = rpcclient("rpcutil")
+
+ args = {:source => configuration[:datasource]}
+ args[:query] = configuration[:query] if configuration[:query]
+
+ ddl = DDL.new("%s_data" % configuration[:datasource], :data)
+
+ x, data = data_for_field(client.get_data(args), configuration[:field].to_sym)
+
+ plot = StringIO.new
+
+ plot.puts 'set title "%s"' % configuration.fetch(:title, ddl.meta[:description])
+ plot.puts 'set terminal dumb %d %d' % [configuration[:width], configuration[:height]]
+ plot.puts 'set key off'
+ plot.puts 'set ylabel "%s"' % configuration.fetch(:y_title, "Nodes")
+ plot.puts 'set xlabel "%s"' % configuration.fetch(:x_title, ddl.dataquery_interface[:output][configuration[:field].to_sym][:display_as])
+ plot.puts "plot '-' with lines"
+
+ x.each_with_index do |v, i|
+ plot.puts "%s %s" % [v, data[i]]
+ end
+
+ output = ""
+
+ begin
+ IO::popen(configuration[:gnuplot], "w+") do |io|
+ io.write plot.string
+ io.close_write
+ output = io.read
+ end
+ rescue => e
+ raise "Could not plot results: %s" % e.to_s
+ end
+
+ puts output
+
+ halt client.stats
+ end
+ end
+end
View
302 connector/rabbitmq/rabbitmq.rb
@@ -1,302 +0,0 @@
-require 'stomp'
-
-module MCollective
- module Connector
- class Rabbitmq<Base
- attr_reader :connection
-
- class EventLogger
- def on_connecting(params=nil)
- Log.info("TCP Connection attempt %d to %s" % [params[:cur_conattempts], stomp_url(params)])
- rescue
- end
-
- def on_connected(params=nil)
- Log.info("Conncted to #{stomp_url(params)}")
- rescue
- end
-
- def on_disconnect(params=nil)
- Log.info("Disconnected from #{stomp_url(params)}")
- rescue
- end
-
- def on_connectfail(params=nil)
- Log.info("TCP Connection to #{stomp_url(params)} failed on attempt #{params[:cur_conattempts]}")
- rescue
- end
-
- def on_miscerr(params, errstr)
- Log.error("Unexpected error on connection #{stomp_url(params)}: #{errstr}")
- rescue
- end
-
- def on_ssl_connecting(params)
- Log.info("Estblishing SSL session with #{stomp_url(params)}")
- rescue
- end
-
- def on_ssl_connected(params)
- Log.info("SSL session established with #{stomp_url(params)}")
- rescue
- end
-
- def on_ssl_connectfail(params)
- Log.error("SSL session creation with #{stomp_url(params)} failed: #{params[:ssl_exception]}")
- end
-
- def stomp_url(params)
- "%s://%s@%s:%d" % [ params[:cur_ssl] ? "stomp+ssl" : "stomp", params[:cur_login], params[:cur_host], params[:cur_port]]
- end
- end
-
- def initialize
- @config = Config.instance
- @subscriptions = []
- @base64 = false
- end
-
- # Connects to the RabbitMQ middleware
- def connect(connector = ::Stomp::Connection)
- if @connection
- Log.debug("Already connection, not re-initializing connection")
- return
- end
-
- begin
- @base64 = get_bool_option("rabbitmq.base64", false)
-
- pools = @config.pluginconf["rabbitmq.pool.size"].to_i
- hosts = []
-
- 1.upto(pools) do |poolnum|
- host = {}
-
- host[:host] = get_option("rabbitmq.pool.#{poolnum}.host")
- host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 6163).to_i
- host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user")
- host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password")
- host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", false)
-
- host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", false)) if host[:ssl]
-
- Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
- hosts << host
- end
-
- raise "No hosts found for the RabbitMQ connection pool" if hosts.size == 0
-
- connection = {:hosts => hosts}
-
- # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
- # these can be guessed, the documentation isn't clear
- connection[:initial_reconnect_delay] = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01))
- connection[:max_reconnect_delay] = Float(get_option("rabbitmq.max_reconnect_delay", 30.0))
- connection[:use_exponential_back_off] = get_bool_option("rabbitmq.use_exponential_back_off", true)
- connection[:back_off_multiplier] = Integer(get_option("rabbitmq.back_off_multiplier", 2))
- connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0))
- connection[:randomize] = get_bool_option("rabbitmq.randomize", false)
- connection[:backup] = get_bool_option("rabbitmq.backup", false)
- connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1))
- connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30))
- connection[:reliable] = true
- connection[:vhost] = get_option("rabbitmq.vhost", "/")
-
- connection[:logger] = EventLogger.new
-
- @connection = connector.new(connection)
- rescue Exception => e
- raise("Could not connect to RabbitMQ Server: #{e}")
- end
- end
-
- # Sets the SSL paramaters for a specific connection
- def ssl_parameters(poolnum, fallback)
- params = {:cert_file => get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false),
- :key_file => get_option("rabbitmq.pool.#{poolnum}.ssl.key", false),
- :ts_files => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false)}
-
- raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
-
- raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
- raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])
-
- params[:ts_files].split(",").each do |ca|
- raise "Cannot find CA file #{ca}" unless File.exist?(ca)
- end
-
- begin
- Stomp::SSLParams.new(params)
- rescue NameError
- raise "Stomp gem >= 1.2.2 is needed"
- end
-
- rescue Exception => e
- if fallback
- Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
- return true
- else
- Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
- raise(e)
- end
- end
-
- # Receives a message from the RabbitMQ connection
- def receive
- Log.debug("Waiting for a message from RabbitMQ")
-
- # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
- # handling it sets the connection to closed. If we happen to be receiving at just
- # that time we will get an exception warning about the closed connection so handling
- # that here with a sleep and a retry.
- begin
- msg = @connection.receive
- rescue ::Stomp::Error::NoCurrentConnection
- sleep 1
- retry
- end
-
- raise "Received a processing error from RabbitMQ: '%s'" % msg.body.chomp if msg.body =~ /Processing error/
-
- Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
- end
-
- # Sends a message to the RabbitMQ connection
- def publish(msg)
- msg.base64_encode! if @base64
-
- if msg.type == :direct_request
- msg.discovered_hosts.each do |node|
- target = target_for(msg, node)
-
- Log.debug("Sending a direct message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
-
- @connection.publish(target[:name], msg.payload, target[:headers])
- end
- else
- target = target_for(msg)
-
- Log.debug("Sending a broadcast message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
-
- @connection.publish(target[:name], msg.payload, target[:headers])
- end
- end
-
- def target_for(msg, node=nil)
- if msg.type == :reply
- target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""}
-
- elsif [:request, :direct_request].include?(msg.type)
- target = make_target(msg.agent, msg.type, msg.collective, node)
-
- else
- raise "Don't now how to create a target for message type #{msg.type}"
-
- end
-
- return target
- end
-
- def make_target(agent, type, collective, node=nil)
- raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
- raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
-
- target = {:name => "", :headers => {}, :id => nil}
-
- case type
- when :reply # receiving replies on a temp queue
- target[:name] = "/temp-queue/mcollective_reply_%s" % agent
- target[:id] = "mcollective_%s_replies" % agent
-
- when :broadcast, :request # publishing a request to all nodes with an agent
- target[:name] = "/exchange/%s_broadcast/%s" % [collective, agent]
- target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
- target[:id] = "%s_broadcast_%s" % [collective, agent]
-
- when :direct_request # a request to a specific node
- raise "Directed requests need to have a node identity" unless node
-
- target[:name] = "/exchange/%s_directed/%s" % [ collective, node]
- target[:headers]["reply-to"] = "/temp-queue/mcollective_reply_%s" % agent
-
- when :directed # subscribing to directed messages
- target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ]
- target[:id] = "%s_directed_to_identity" % @config.identity
- end
-
- target
- end
-
- # Subscribe to a topic or queue
- def subscribe(agent, type, collective)
- return if type == :reply
-
- source = make_target(agent, type, collective)
-
- unless @subscriptions.include?(source[:id])
- Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
- @connection.subscribe(source[:name], source[:headers], source[:id])
- @subscriptions << source[:id]
- end
- rescue ::Stomp::Error::DuplicateSubscription
- Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
- end
-
- # Subscribe to a topic or queue
- def unsubscribe(agent, type, collective)
- return if type == :reply
-
- source = make_target(agent, type, collective)
-
- Log.debug("Unsubscribing from #{source[:name]}")
- @connection.unsubscribe(source[:name], source[:headers], source[:id])
- @subscriptions.delete(source[:id])
- end
-
- # Disconnects from the RabbitMQ connection
- def disconnect
- Log.debug("Disconnecting from RabbitMQ")
- @connection.disconnect
- end
-
- # looks in the environment first then in the config file
- # for a specific option, accepts an optional default.
- #
- # raises an exception when it cant find a value anywhere
- def get_env_or_option(env, opt, default=nil)
- return ENV[env] if ENV.include?(env)
- return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
- return default if default
-
- raise("No #{env} environment or plugin.#{opt} configuration option given")
- end
-
- # looks for a config option, accepts an optional default
- #
- # raises an exception when it cant find a value anywhere
- def get_option(opt, default=nil)
- return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
- return default unless default.nil?
-
- raise("No plugin.#{opt} configuration option given")
- end
-
- # gets a boolean option from the config, supports y/n/true/false/1/0
- def get_bool_option(opt, default)
- return default unless @config.pluginconf.include?(opt)
-
- val = @config.pluginconf[opt]
-
- if val =~ /^1|yes|true/
- return true
- elsif val =~ /^0|no|false/
- return false
- else
- return default
- end
- end
- end
- end
-end
-
-# vi:tabstop=4:expandtab:ai
Please sign in to comment.
Something went wrong with that request. Please try again.