Permalink
Browse files

simplify and support sub collective

  • Loading branch information...
1 parent f4e8e86 commit b357080878cc8b537c015c1d991154a7ca4fbc58 @ripienaar committed Jan 9, 2013
Showing with 10 additions and 22 deletions.
  1. +10 −22 connector/redis/redis.rb
View
@@ -9,8 +9,7 @@ module Connector
# starter / testing style setup which would be easier for new
# users to evaluate mcollective
#
- # It supports direct addressing but not yet collectives - those
- # that is possible
+ # It supports direct addressing and sub collectives
#
# We'd also add a registration plugin for it and a discovery
# plugin which means we can give a very solid fast first-user
@@ -39,8 +38,10 @@ def subscribe(agent, type, collective)
if PluginManager["security_plugin"].initiated_by == :client
@sources << "mcollective::reply::%s::%d" % [@config.identity, $$]
else
- @sources << "mcollective::server::direct::%s" % @config.identity
- @sources << "mcollective::server::agents"
+ @config.collectives.each do |collective|
+ @sources << "%s::server::direct::%s" % [collective, @config.identity]
+ @sources << "%s::server::agents" % collective
+ end
end
@subscribed = true
@@ -61,8 +62,8 @@ def publish(msg)
if msg.type == :direct_request
msg.discovered_hosts.each do |node|
- target[:name] = "mcollective::server::direct::%s" % node
- target[:headers] = headers_for(msg, node)
+ target[:name] = "%s::server::direct::%s" % [msg.collective, node]
+ target[:headers]["reply-to"] = msg.reply_to
Log.debug("Sending a direct message to Redis target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
@@ -73,11 +74,12 @@ def publish(msg)
else
if msg.type == :reply
target[:name] = msg.request.headers["reply-to"]
+ target[:headers]["reply-to"] = "mcollective::reply::%s::%d" % [@config.identity, $$]
elsif msg.type == :request
- target[:name] = "mcollective::server::agents"
+ target[:name] = "%s::server::agents" % msg.collective
+ target[:headers]["reply-to"] = msg.collective
end
- target[:headers].merge!(headers_for(msg))
Log.debug("Sending a broadcast message to Redis target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
@@ -87,20 +89,6 @@ def publish(msg)
end
end
- def headers_for(msg, identity=nil)
- headers = {}
-
- if [:request, :direct_request].include?(msg.type)
- if msg.reply_to
- headers["reply-to"] = msg.reply_to
- else
- headers["reply-to"] = "mcollective::reply::%s::%d" % [@config.identity, $$]
- end
- end
-
- headers
- end
-
def start_receiver_thread(sources)
@receiver_thread = Thread.new do
@receiver_redis.subscribe(@sources) do |on|

0 comments on commit b357080

Please sign in to comment.