Browse files

Merge branch 'master' of git://github.com/ezmobius/nanite

* 'master' of git://github.com/ezmobius/nanite:
  changing some queue names in the mapper to be more readble in the output of
  Fixup mapper to have non nanite identity, use fanout exchange for responses
  adding permissions stuff to rabbitconf.rb, the permissions only works on the
  very minor changes in queue names were all that is required to use new rabbitmq ACL's
  • Loading branch information...
2 parents 7a2bed1 + 1b2a612 commit 9d7aaa4408aa77561941a3adf66d31915d639ed8 @raggi committed Feb 2, 2009
Showing with 34 additions and 10 deletions.
  1. +4 −0 examples/rabbitconf.rb
  2. +20 −0 lib/nanite.rb
  3. +3 −3 lib/nanite/actor.rb
  4. +3 −3 lib/nanite/agent.rb
  5. +1 −1 lib/nanite/dispatcher.rb
  6. +3 −3 lib/nanite/mapper.rb
View
4 examples/rabbitconf.rb
@@ -7,7 +7,11 @@
puts `rabbitmqctl map_user_vhost #{agent} /nanite`
end
+puts `scripts/rabbitmqctl set_permissions -p /nanite mapper '.*' ".*"`
+puts `scripts/rabbitmqctl set_permissions -p /nanite nanite '^nanite.*' ".*"`
+
puts `rabbitmqctl list_vhosts`
puts `rabbitmqctl list_users`
+puts `rabbitmqctl list_permissions`
puts `rabbitmqctl list_vhost_users /nanite`
View
20 lib/nanite.rb
@@ -17,6 +17,26 @@
require 'nanite/console'
require 'nanite/agent'
+
+# monkey patch to the amqp gem that adds :no_declare => true option for new
+# Exchange objects. This allows us to send messeages to exchanges that are
+# declared by the mappers and that we have no configuration priviledges on.
+# temporary uyntil we get this into amqp proper
+MQ::Exchange.class_eval do
+ def initialize mq, type, name, opts = {}
+ @mq = mq
+ @type, @name = type, name
+ @mq.exchanges[@name = name] ||= self
+ @key = opts[:key]
+
+ @mq.callback{
+ @mq.send AMQP::Protocol::Exchange::Declare.new({ :exchange => name,
+ :type => type,
+ :nowait => true }.merge(opts))
+ } unless name == "amq.#{type}" or name == '' or opts[:no_declare]
+ end
+end
+
module Nanite
VERSION = '0.2.0' unless defined?(Nanite::VERSION)
View
6 lib/nanite/actor.rb
@@ -19,6 +19,6 @@ def self.provides_for(prefix)
sets << "/#{prefix}/#{meth}".squeeze('/')
end
sets
- end # def
- end # class
-end # module
+ end
+ end
+end
View
6 lib/nanite/agent.rb
@@ -14,15 +14,15 @@ class Agent
# 'fanout'.
def send_ping
ping = Ping.new(identity, status_proc.call, identity)
- amq.fanout('heartbeat').publish(dump_packet(ping))
+ amq.fanout('heartbeat', :no_declare => true).publish(dump_packet(ping))
end
# Sends a services advertisement message to the 'registration' exchange of type
# 'fanout'.
def advertise_services
log.debug "advertise_services: #{dispatcher.all_services.inspect}"
reg = Register.new(identity, dispatcher.all_services, status_proc.call)
- amq.fanout('registration').publish(dump_packet(reg))
+ amq.fanout('registration', :no_declare => true).publish(dump_packet(reg))
end
# Starts interactive Nanite shell.
@@ -91,7 +91,7 @@ def initialize(options = {})
@log = opts[:log]
@log_dir = opts[:log_dir]
@format = opts[:format] || :marshal
- @identity = opts[:identity] || Nanite.gensym
+ @identity = "#{opts[:mapper] ? 'mapper' : 'nanite'}-#{opts[:identity] || Nanite.gensym}"
@host = opts[:host] || '0.0.0.0'
@vhost = opts[:vhost]
@file_root = opts[:file_root] || "#{root}/files"
View
2 lib/nanite/dispatcher.rb
@@ -65,7 +65,7 @@ def handle(packet)
when Request
agent.log.debug "handling Request: #{packet}"
result = dispatch_request(packet)
- agent.amq.queue(packet.reply_to).publish(agent.dump_packet(result)) if packet.reply_to
+ agent.amq.fanout(packet.reply_to, :no_declare => true).publish(agent.dump_packet(result)) if packet.reply_to
when Result
agent.log.debug "handling Result: #{packet}"
agent.reducer.handle_result(packet)
View
6 lib/nanite/mapper.rb
@@ -191,15 +191,15 @@ def push(type, payload="", opts = {:selector => :least_loaded, :timeout => 60})
def setup_queues
agent.log.debug "setting up queues"
- amq.queue("heartbeat#{@identity}", :exclusive => true).bind(amq.fanout('heartbeat')).subscribe{ |ping|
+ amq.queue("heartbeat-#{agent.identity}", :exclusive => true).bind(amq.fanout('heartbeat')).subscribe{ |ping|
agent.log.debug "Got heartbeat"
handle_ping(agent.load_packet(ping))
}
- amq.queue("mapper#{@identity}", :exclusive => true).bind(amq.fanout('registration')).subscribe{ |msg|
+ amq.queue("registration-#{agent.identity}", :exclusive => true).bind(amq.fanout('registration')).subscribe{ |msg|
agent.log.debug "Got registration"
register(agent.load_packet(msg))
}
- amq.queue(agent.identity, :exclusive => true).subscribe{ |msg|
+ amq.queue(agent.identity, :exclusive => true).bind(amq.fanout(agent.identity)).subscribe{ |msg|
msg = agent.load_packet(msg)
agent.log.debug "Got a message: #{msg.inspect}"
agent.reducer.handle_result(msg)

0 comments on commit 9d7aaa4

Please sign in to comment.