Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

remove activemq connector - now in base, add test registration receiver

  • Loading branch information...
commit 1a13290602379568e1757b96d736cd122ce35305 1 parent 104e4c8
@ripienaar authored
View
4 connector/activemq/README.md
@@ -1,4 +0,0 @@
-A connector plugin for MCollective 1.3.1 that supports the new
-direct addressing mode.
-
-Incubating here for eventual inclusion into mainline.
View
291 connector/activemq/activemq.rb
@@ -1,291 +0,0 @@
-require 'stomp'
-require 'pp'
-
-module MCollective
- module Connector
- # Handles sending and receiving messages over the Stomp protocol for ActiveMQ
- # servers specifically, we take advantages of ActiveMQ specific features and
- # enhancements to the Stomp protocol. For best results in a clustered environment
- # use ActiveMQ 5.5.0 at least.
- #
- # This plugin supports 1.1.6 and newer of the Stomp rubygem
- #
- # connector = activemq
- # plugin.activemq.pool.size = 2
- #
- # plugin.activemq.pool.1.host = stomp1.your.net
- # plugin.activemq.pool.1.port = 6163
- # plugin.activemq.pool.1.user = you
- # plugin.activemq.pool.1.password = secret
- # plugin.activemq.pool.1.ssl = true
- #
- # plugin.activemq.pool.2.host = stomp2.your.net
- # plugin.activemq.pool.2.port = 6163
- # plugin.activemq.pool.2.user = you
- # plugin.activemq.pool.2.password = secret
- # plugin.activemq.pool.2.ssl = false
- #
- # Using this method you can supply just STOMP_USER and STOMP_PASSWORD. The port will
- # default to 6163 if not specified.
- #
- # In addition you can set the following options for the rubygem:
- #
- # plugin.activemq.initial_reconnect_delay = 0.01
- # plugin.activemq.max_reconnect_delay = 30.0
- # plugin.activemq.use_exponential_back_off = true
- # plugin.activemq.back_off_multiplier = 2
- # plugin.activemq.max_reconnect_attempts = 0
- # plugin.activemq.randomize = false
- # plugin.activemq.timeout = -1
- #
- # ActiveMQ JMS message priorities can be set:
- #
- # plugin.activemq.priority = 4
- #
- class Activemq<Base
- attr_reader :connection
-
- # Class for Stomp 1.9.2 callback based logging
- class EventLogger
- def on_connecting(params=nil)
- Log.info("Connection attempt %d to %s" % [params[:cur_conattempts], stomp_url(params)])
- rescue
- end
-
- def on_connected(params=nil)
- Log.info("Conncted to #{stomp_url(params)}")
- rescue
- end
-
- def on_disconnect(params=nil)
- Log.info("Disconnected from #{stomp_url(params)}")
- rescue
- end
-
- def on_connectfail(params=nil)
- Log.info("Connction to #{stomp_url(params)} failed on attempt #{params[:cur_conattempts]}")
- rescue
- end
-
- def on_miscerr(params, errstr)
- Log.debug("Unexpected error on connection #{stomp_url(params)}: #{errstr}")
- rescue
- end
-
- def stomp_url(params)
- "stomp://%s@%s:%d" % [params[:cur_login], params[:cur_host], params[:cur_port]]
- end
- end
-
- def initialize
- @config = Config.instance
- @subscriptions = []
- @msgpriority = 0
- @base64 = false
- end
-
- # Connects to the ActiveMQ middleware
- def connect(connector = ::Stomp::Connection)
- if @connection
- Log.debug("Already connection, not re-initializing connection")
- return
- end
-
- begin
- @base64 = get_bool_option("activemq.base64", false)
- @msgpriority = get_option("activemq.priority", 0).to_i
-
- pools = @config.pluginconf["activemq.pool.size"].to_i
- hosts = []
-
- 1.upto(pools) do |poolnum|
- host = {}
-
- host[:host] = get_option("activemq.pool.#{poolnum}.host")
- host[:port] = get_option("activemq.pool.#{poolnum}.port", 6163).to_i
- host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user")
- host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password")
- host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", false)
-
- Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
- hosts << host
- end
-
- raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0
-
- connection = {:hosts => hosts}
-
- # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
- # these can be guessed, the documentation isn't clear
- connection[:initial_reconnect_delay] = get_option("activemq.initial_reconnect_delay", 0.01).to_f
- connection[:max_reconnect_delay] = get_option("activemq.max_reconnect_delay", 30.0).to_f
- connection[:use_exponential_back_off] = get_bool_option("activemq.use_exponential_back_off", true)
- connection[:back_off_multiplier] = get_bool_option("activemq.back_off_multiplier", 2).to_i
- connection[:max_reconnect_attempts] = get_option("activemq.max_reconnect_attempts", 0).to_i
- connection[:randomize] = get_bool_option("activemq.randomize", false)
- connection[:backup] = get_bool_option("activemq.backup", false)
- connection[:timeout] = get_option("activemq.timeout", -1).to_i
- connection[:logger] = EventLogger.new
-
- @connection = connector.new(connection)
- rescue Exception => e
- raise("Could not connect to ActiveMQ Server: #{e}")
- end
- end
-
- # Receives a message from the ActiveMQ connection
- def receive
- Log.debug("Waiting for a message from ActiveMQ")
- msg = @connection.receive
-
- Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
- end
-
- # Sends a message to the ActiveMQ connection
- def publish(msg)
- msg.base64_encode! if @base64
-
- if msg.type == :direct_request
- target = target_for(msg)
-
- msg.discovered_hosts.each do |node|
- target[:headers] = headers_for(msg, node)
-
- Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].pretty_inspect.chomp}'")
-
- @connection.publish(target[:name], msg.payload, target[:headers])
- end
- else
- target = target_for(msg)
- target[:headers].merge!(headers_for(msg))
-
- Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].pretty_inspect.chomp}'")
-
- @connection.publish(target[:name], msg.payload, target[:headers])
- end
- end
-
- # Subscribe to a topic or queue
- def subscribe(agent, type, collective)
- source = make_target(agent, type, collective)
-
- unless @subscriptions.include?(source)
- Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].pretty_inspect}")
- @connection.subscribe(source[:name], source[:headers])
- @subscriptions << source
- end
- end
-
- # Subscribe to a topic or queue
- def unsubscribe(agent, type, collective)
- source = make_target(agent, type, collective)
-
- Log.debug("Unsubscribing from #{source[:name]}")
- @connection.unsubscribe(source[:name], source[:headers])
- @subscriptions.delete(source)
- end
-
- def target_for(msg)
- if msg.type == :reply
- target = {:name => msg.request.headers["reply-to"], :headers => {}}
- elsif msg.type == :request
- target = make_target(msg.agent, msg.type, msg.collective)
- elsif msg.type == :direct_request
- target = make_target(msg.agent, msg.type, msg.collective)
- else
- raise "Don't now how to create a target for message type #{msg.type}"
- end
-
- return target
- end
-
- # Disconnects from the ActiveMQ connection
- def disconnect
- Log.debug("Disconnecting from ActiveMQ")
- @connection.disconnect
- end
-
- def headers_for(msg, identity=nil)
- headers = {}
- headers = {"priority" => @msgpriority} if @msgpriority > 0
-
- if msg.type == :request
- target = make_target(msg.agent, :reply, msg.collective)
- headers["reply-to"] = target[:name]
- elsif msg.type == :direct_request
- target = make_target(msg.agent, :reply, msg.collective)
- headers["reply-to"] = target[:name]
- headers["mc_identity"] = identity
- end
-
- return headers
- end
-
- def make_target(agent, type, collective)
- raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
- raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
-
- target = {:name => nil, :headers => {}}
-
- case type
- when :reply
- target[:name] = ["/topic/" + collective, agent, :reply].join(".")
-
- when :broadcast
- target[:name] = ["/topic/" + collective, agent, :agent].join(".")
-
- when :request
- target[:name] = ["/topic/" + collective, agent, :agent].join(".")
-
- when :direct_request
- target[:name] = ["/queue/" + collective, :nodes].join(".")
-
- when :directed
- target[:name] = ["/queue/" + collective, :nodes].join(".")
- target[:headers]["selector"] = "mc_identity = '#{@config.identity}'"
- end
-
- target
- end
-
- # looks in the environment first then in the config file
- # for a specific option, accepts an optional default.
- #
- # raises an exception when it cant find a value anywhere
- def get_env_or_option(env, opt, default=nil)
- return ENV[env] if ENV.include?(env)
- return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
- return default if default
-
- raise("No #{env} environment or plugin.#{opt} configuration option given")
- end
-
- # looks for a config option, accepts an optional default
- #
- # raises an exception when it cant find a value anywhere
- def get_option(opt, default=nil)
- return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
- return default if default
-
- raise("No plugin.#{opt} configuration option given")
- end
-
- # gets a boolean option from the config, supports y/n/true/false/1/0
- def get_bool_option(opt, default)
- return default unless @config.pluginconf.include?(opt)
-
- val = @config.pluginconf[opt]
-
- if val =~ /^1|yes|true/
- return true
- elsif val =~ /^0|no|false/
- return false
- else
- return default
- end
- end
- end
- end
-end
-
-# vi:tabstop=4:expandtab:ai
View
88 registration/mcollective-registration-receiver.rb
@@ -0,0 +1,88 @@
+#!/usr/bin/env ruby
+
+require 'mcollective'
+
+module MCollective
+ class Util::RegistrationDaemon
+ def initialize
+ oparser = Optionparser.new
+ options = oparser.parse
+
+ @config = Config.instance
+ @config.loadconfig(options[:config])
+
+ @connector = PluginManager["connector_plugin"]
+ @connector.connect
+
+ queue = Config.instance.pluginconf["registration_daemon_queue"] || "/queue/mcollective.registration"
+ Log.info("Subscribing to #{queue} for new events")
+
+ @connector.connection.subscribe(queue)
+
+ Util.loadclass("MCollective::Agent::Registration")
+
+ @agent = Agent::Registration.new
+
+ Log.info("MCollective Registration daemon started")
+ end
+
+ def daemonize
+ fork do
+ Process.setsid
+ exit if fork
+ Dir.chdir('/tmp')
+ STDIN.reopen('/dev/null')
+ STDOUT.reopen('/dev/null', 'a')
+ STDERR.reopen('/dev/null', 'a')
+
+ yield
+ end
+ end
+
+ def run
+ if Config.instance.daemonize
+ daeminize { receive_loop }
+ else
+ receive_loop
+ end
+ end
+
+ def receive_loop
+ loop do
+ begin
+ msg = @connector.connection.receive
+
+ start_time = Time.now
+
+ registration_data = JSON.load(msg.body)
+
+ raise RPCAborted, "Did not receive a FQDN fact" unless registration_data["facts"].include?("fqdn")
+
+ sender = registration_data["facts"]["fqdn"]
+ registration_msg = {:senderid => sender, :body => registration_data}
+
+ begin
+ @agent.handlemsg(registration_msg, @connector)
+ rescue Exception => e
+ raise RPCAborted, "registration raised an unexpected exception: #{e.class}: #{e}"
+ end
+
+ Log.info("Processed registration data from %s in %.2f seconds" % [sender, (Time.now - start_time).to_f])
+
+ rescue RPCAborted
+ Log.warn("Failed to handle registration data: #{e}")
+ rescue Interrupt
+ Log.info("Exiting on interrupt signal")
+ exit
+ rescue Exception => e
+ Log.error("Unexpected Exception: #{e.class}: #{e}")
+ sleep 1
+ end
+ end
+ end
+ end
+end
+
+receiver = MCollective::Util::RegistrationDaemon.new
+
+receiver.run
Please sign in to comment.
Something went wrong with that request. Please try again.