Permalink
Browse files

Add registration and discovery plugins for redis

  • Loading branch information...
1 parent 27425a7 commit 55ad473ee9a6ae68638b225dc75f00c746b2eeb9 @ripienaar committed Jan 15, 2013
@@ -0,0 +1,11 @@
+metadata :name => "redis",
+ :description => "Redis based discovery to compliment the Redis connector and registration",
+ :author => "R.I.Pienaar <rip@devco.net>",
+ :license => "ASL 2.0",
+ :version => "0.1",
+ :url => "http://marionette-collective.org/",
+ :timeout => 0
+
+discovery do
+ capabilities [:classes, :facts, :identity, :agents]
+end
@@ -0,0 +1,113 @@
+module MCollective
+ class Discovery
+ class Redis
+ require 'redis'
+
+ class << self
+ def discover(filter, timeout, limit=0, client=nil)
+ config = Config.instance
+
+ host = config.pluginconf.fetch("redis.host", "localhost")
+ port = Integer(config.pluginconf.fetch("redis.port", "6379"))
+ db = Integer(config.pluginconf.fetch("redis.db", "1"))
+ max_age = Integer(config.pluginconf.fetch("redis.max_age", 1800))
+
+ redis_opts = {:host => host, :port => port, :db => db}
+
+ @redis = ::Redis.new(redis_opts)
+
+ found = [collective_hostlist(client.options[:collective], max_age)]
+
+ filter.keys.each do |key|
+ case key
+ when "fact"
+ fact_search(filter["fact"], found, max_age, client.options[:collective])
+
+ when "cf_class"
+ find_in_zlist("class", found, max_age, filter[key])
+
+ when "agent"
+ find_in_zlist("agent", found, max_age, filter[key])
+
+ when "identity"
+ identity_search(filter["identity"], found, max_age, client.options[:collective])
+ end
+ end
+
+ # filters are combined so we get the intersection of values across
+ # all matches found using fact, agent and identity filters
+ found.inject(found[0]){|x, y| x & y}
+ end
+
+ def fact_search(filter, found, max_age, collective)
+ return if filter.empty?
+
+ hosts = collective_hostlist(collective, max_age)
+ facts = {}
+
+ hosts.each do |host|
+ facts[host] = @redis.hgetall("mcollective::facts::#{host}")
+ end
+
+ matched_hosts = []
+
+ filter.each do |f|
+ fact = f[:fact]
+ value = f[:value]
+
+ hosts.each do |host|
+ matched_hosts << host if facts[host].include?(fact) && facts[host][fact].match(regexy_string(value))
+ end
+ end
+
+ found << matched_hosts
+ end
+
+ def identity_search(filter, found, max_age, collective)
+ return if filter.empty?
+
+ hosts = collective_hostlist(collective, max_age)
+
+ filter.each do |match|
+ found << hosts.grep(regexy_string(match))
+ end
+ end
+
+ def find_in_zlist(key_type, found, max_age, filter)
+ return if filter.empty?
+
+ prefix = "mcollective::%s" % key_type
+ oldest = Time.now.utc.to_i - max_age
+
+ agents = @redis.keys.grep(/^#{prefix}/).map do |key|
+ key.match(/^#{prefix}::(.+)$/)[1]
+ end
+
+ filter.each do |matcher|
+ matched = agents.grep(regexy_string(matcher))
+
+ found << [] if matched.empty?
+
+ matched.each do |agent|
+ found << @redis.zrange("#{prefix}::#{agent}", 0, oldest)
+ end
+ end
+ end
+
+ def collective_hostlist(collective, max_age)
+ oldest = Time.now.utc.to_i - max_age
+
+ @redis.zrange("mcollective::collective::#{collective}", 0, oldest)
+ end
+
+ def regexy_string(string)
+ if string.match("^/")
+ Regexp.new(string.gsub("\/", ""))
+ else
+ string
+ end
+ end
+ end
+ end
+ end
+end
@@ -14,7 +14,7 @@ module Registration
# - last seen time
#
# Keys will be set to expire (2 * registration interval) + 2
- class Redis_registration<Base
+ class Redis<Base
def body
data = {:agentlist => [],
:facts => {},
@@ -36,25 +36,15 @@ def body
commit = lambda do |redis|
begin
- redis.multi do
- prefix = "mcollective::%s::" % data[:identity]
- agents = [prefix, "agents"].join
- facts = [prefix, "facts"].join
- classes = [prefix, "classes"].join
- collectives = [prefix, "collectives"].join
- lastseen = [prefix, "lastseen"].join
- expiry = (Config.instance.registerinterval * 2) + 5
+ time = Time.now.utc.to_i
- redis.del agents, facts, classes, collectives, lastseen
- redis.rpush agents, data[:agentlist]
- redis.rpush classes, data[:classes]
- redis.rpush collectives, data[:collectives]
- redis.hmset facts, data[:facts].to_a.flatten
- redis.set lastseen, Time.now.to_i
+ redis.multi do
+ data[:collectives].each {|c| redis.zadd "mcollective::collective::#{c}", time, data[:identity]}
+ data[:agentlist].each {|a| redis.zadd "mcollective::agent::#{a}", time, data[:identity]}
+ data[:classes].each {|c| redis.zadd "mcollective::class::#{c}", time, data[:identity]}
- [prefix, agents, facts, classes, collectives, lastseen].each do |k|
- redis.expire k, expiry
- end
+ redis.del "mcollective::facts::#{data[:identity]}"
+ redis.hmset "mcollective::facts::#{data[:identity]}", data[:facts].map{|k, v| [k.to_s, v.to_s]}.flatten
end
rescue => e
Log.error("%s: %s: %s" % [e.backtrace.first, e.class, e.to_s])

0 comments on commit 55ad473

Please sign in to comment.