Browse files

Allow the registration connector to do normal publishes but also receive

a lambda that will be called with the DB handle

This is then used by the registration agent to build a registration
database by directly writing to the Redis instance without going through
some intermediate agent
  • Loading branch information...
1 parent ca29a44 commit 27425a7f580b3bc36018bf5610581f2c455f9cdf @ripienaar committed Jan 15, 2013
Showing with 88 additions and 4 deletions.
  1. +19 −4 connector/redis/redis.rb
  2. +69 −0 connector/redis/redis_registration.rb
View
23 connector/redis/redis.rb
@@ -57,6 +57,8 @@ def pop
end
end
+ attr_reader :receiver_queue, :sender_queue
+
def initialize
@config = Config.instance
@sources = []
@@ -69,6 +71,8 @@ def initialize
def connect
redis_opts = {:host => @host, :port => @port, :db => @db}
+ Log.debug("Connecting to redis: %s" % redis_opts.inspect)
+
@receiver_redis = ::Redis.new(redis_opts)
@receiver_queue = ThreadsafeQueue.new
@receiver_thread = nil
@@ -118,7 +122,8 @@ def publish(msg)
@sender_queue << {:channel => target[:name],
:body => msg.payload,
- :headers => target[:headers]}
+ :headers => target[:headers],
+ :command => :publish}
end
else
if msg.type == :reply
@@ -134,7 +139,8 @@ def publish(msg)
@sender_queue << {:channel => target[:name],
:body => msg.payload,
- :headers => target[:headers]}
+ :headers => target[:headers],
+ :command => :publish}
end
end
@@ -168,11 +174,20 @@ def start_receiver_thread(sources)
def start_sender_thread
@sender_thread = Thread.new do
+ Log.debug("Starting sender thread")
+
loop do
begin
msg = @sender_queue.pop
- encoded = {:body => msg[:body], :headers => msg[:headers]}.to_yaml
- @sender_redis.publish(msg[:channel], encoded)
+
+ case msg[:command]
+ when :publish
+ encoded = {:body => msg[:body], :headers => msg[:headers]}.to_yaml
+ @sender_redis.publish(msg[:channel], encoded)
+
+ when :proc
+ msg[:proc].call(@sender_redis)
+ end
rescue Exception => e
Log.warn("Could not publish message to redis: %s: %s" % [e.class, e.to_s])
sleep 0.2
View
69 connector/redis/redis_registration.rb
@@ -0,0 +1,69 @@
+module MCollective
+ module Registration
+ # A registration plugin that sends in all the metadata we have for a node
+ # to redis, this will only work with the Redis connector and no other
+ # connector
+ #
+ # Metadata being sent:
+ #
+ # - all facts
+ # - all agents
+ # - all classes (if applicable)
+ # - the configured identity
+ # - the list of collectives the nodes belong to
+ # - last seen time
+ #
+ # Keys will be set to expire (2 * registration interval) + 2
+ class Redis_registration<Base
+ def body
+ data = {:agentlist => [],
+ :facts => {},
+ :classes => [],
+ :collectives => []}
+
+ identity = Config.instance.identity
+
+ cfile = Config.instance.classesfile
+
+ if File.exist?(cfile)
+ data[:classes] = File.readlines(cfile).map {|i| i.chomp}
+ end
+
+ data[:identity] = Config.instance.identity
+ data[:agentlist] = Agents.agentlist
+ data[:facts] = PluginManager["facts_plugin"].get_facts
+ data[:collectives] = Config.instance.collectives.sort
+
+ commit = lambda do |redis|
+ begin
+ redis.multi do
+ prefix = "mcollective::%s::" % data[:identity]
+ agents = [prefix, "agents"].join
+ facts = [prefix, "facts"].join
+ classes = [prefix, "classes"].join
+ collectives = [prefix, "collectives"].join
+ lastseen = [prefix, "lastseen"].join
+ expiry = (Config.instance.registerinterval * 2) + 5
+
+ redis.del agents, facts, classes, collectives, lastseen
+ redis.rpush agents, data[:agentlist]
+ redis.rpush classes, data[:classes]
+ redis.rpush collectives, data[:collectives]
+ redis.hmset facts, data[:facts].to_a.flatten
+ redis.set lastseen, Time.now.to_i
+
+ [prefix, agents, facts, classes, collectives, lastseen].each do |k|
+ redis.expire k, expiry
+ end
+ end
+ rescue => e
+ Log.error("%s: %s: %s" % [e.backtrace.first, e.class, e.to_s])
+ end
+ end
+
+ PluginManager["connector_plugin"].sender_queue << {:command => :proc, :proc => commit}
+ nil
+ end
+ end
+ end
+end

0 comments on commit 27425a7

Please sign in to comment.