Permalink
Browse files

More capabilities for pub/sub sockets

  • Loading branch information...
1 parent 5bd24cd commit 12ab7620f81def21afa9af6f200218eeff539880 @andrewvc andrewvc committed Sep 6, 2010
Showing with 41 additions and 18 deletions.
  1. +5 −7 dripdrop.gemspec
  2. +4 −2 lib/dripdrop/agent.rb
  3. +27 −5 lib/dripdrop/handlers/zeromq.rb
  4. +5 −4 lib/dripdrop/node.rb
View
12 dripdrop.gemspec
@@ -9,7 +9,7 @@ Gem::Specification.new do |s|
s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Andrew Cholakian"]
- s.date = %q{2010-08-25}
+ s.date = %q{2010-09-05}
s.description = %q{0MQ App stats}
s.email = %q{andrew@andrewvc.com}
s.extra_rdoc_files = [
@@ -26,14 +26,12 @@ Gem::Specification.new do |s|
"doc_img/topology.png",
"dripdrop.gemspec",
"example/agent_test.rb",
- "example/forwarder.cfg",
- "example/node.rb",
- "example/rack-stats/config.ru",
- "example/rack-stats/console-viewer.rb",
- "example/rack-stats/core.rb",
+ "example/pubsub.rb",
+ "example/pushpull.rb",
"lib/dripdrop.rb",
"lib/dripdrop/agent.rb",
- "lib/dripdrop/handlers.rb",
+ "lib/dripdrop/handlers/websockets.rb",
+ "lib/dripdrop/handlers/zeromq.rb",
"lib/dripdrop/message.rb",
"lib/dripdrop/node.rb"
]
View
6 lib/dripdrop/agent.rb
@@ -16,18 +16,20 @@ class Agent
attr_reader :address, :context, :socket
#address should be a string like tcp://127.0.0.1
- def initialize(address)
+ def initialize(sock_type,address)
@context = ZMQ::Context.new(1)
- @socket = @context.socket(ZMQ::PUB)
+ @socket = @context.socket(sock_type)
@socket.connect(address)
end
#Sends a DripDrop::Message to the socket
def send_message(name,body,head={})
message = DripDrop::Message.new(name,:body => body, :head => head).encoded
if ZMQGEM == :rbzmq
+ @socket.send name, ZMQ::SNDMORE
@socket.send message
else
+ @socket.send_string name, ZMQ::SNDMORE
@socket.send_string message
end
end
View
32 lib/dripdrop/handlers/zeromq.rb
@@ -2,6 +2,8 @@
class DripDrop
class ZMQSubHandler
+ attr_reader :address, :socket_ctype
+
def initialize(address,zm_reactor,opts={},&block)
@address = address
@socket_ctype = opts[:socket_ctype] # :bind or :connect
@@ -23,6 +25,10 @@ def on_readable(socket, messages)
if @msg_format == :raw
@recv_cbak.call(messages)
else
+ unless messages.length == 2
+ puts "Expected pub/sub message to come in two parts"
+ return false
+ end
topic = messages.shift.copy_out_string
body = messages.shift.copy_out_string
msg = @recv_cbak.call(DripDrop::Message.decode(body))
@@ -37,6 +43,8 @@ def on_recv(msg_format=:dripdrop,&block)
end
class ZMQPubHandler
+ attr_reader :address, :socket_ctype
+
def initialize(address,zm_reactor,opts={})
@address = address
@socket_ctype = opts[:socket_ctype]
@@ -60,9 +68,17 @@ def on_attach(socket)
#Send any messages buffered in @send_queue
def on_writable(socket)
unless @send_queue.empty?
- topic, message = @send_queue.shift
- socket.send_message_string(topic, ZMQ::SNDMORE)
- socket.send_message_string(message)
+ message = @send_queue.shift
+
+ num_parts = message.length
+ message.each_with_index do |part,i|
+ multipart = i + 1 < num_parts ? true : false
+ if part.class == ZMQ::Message
+ socket.send_message(part, multipart)
+ else
+ socket.send_message_string(part, multipart)
+ end
+ end
else
@zm_reactor.deregister_writable(socket)
end
@@ -72,14 +88,18 @@ def on_writable(socket)
def send_message(message)
if message.is_a?(DripDrop::Message)
@send_queue.push([message.name, message.encoded])
- @zm_reactor.register_writable(@socket)
- else
+ elsif message.is_a?(Array)
@send_queue.push(message)
+ else
+ @send_queue.push([message])
end
+ @zm_reactor.register_writable(@socket)
end
end
class ZMQPullHandler
+ attr_reader :address, :socket_ctype
+
def initialize(address,zm_reactor,opts={},&block)
@address = address
@socket_ctype = opts[:socket_ctype] || :bind
@@ -114,6 +134,8 @@ def on_recv(msg_format=:dripdrop,&block)
class ZMQPushHandler
+ attr_reader :address, :socket_ctype
+
def initialize(address,zm_reactor,opts={})
@address = address
@socket_ctype = opts[:socket_ctype] || :connect
View
9 lib/dripdrop/node.rb
@@ -24,7 +24,7 @@ def initialize(opts={},&block)
ZM::Reactor.new(:my_reactor).run do |zm_reactor|
@zm_reactor = zm_reactor
block.call(self)
- end.join
+ end
end
end
@@ -61,9 +61,10 @@ def zmq_push(address,opts={})
end
def websocket(address,opts={},&block)
- uri = URI.parse(address)
- h_opts = handler_opts_given(opts)
- DripDrop::WebSocketHandler.new(uri,h_opts)
+ uri = URI.parse(address)
+ h_opts = handler_opts_given(opts)
+ handler = DripDrop::WebSocketHandler.new(uri,h_opts)
+ handler
end
def send_internal(dest,data)

0 comments on commit 12ab762

Please sign in to comment.