Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Don't handle inbound messages while sending outbound messages, as thi…

…s can cause perilous recursion.
  • Loading branch information...
commit 13ea2aaa39351d31fd978172a8eb41d1f75b71c7 1 parent 27c0ddd
@CricketLinden CricketLinden authored
Showing with 28 additions and 27 deletions.
  1. +28 −27 lib/em-zeromq/socket.rb
View
55 lib/em-zeromq/socket.rb
@@ -2,50 +2,50 @@ module EventMachine
module ZeroMQ
class Socket < EventMachine::Connection
attr_accessor :on_readable, :on_writable, :handler
- attr_reader :socket, :socket_type
+ attr_reader :socket, :socket_type
def initialize(socket, socket_type, handler)
@socket = socket
@socket_type = socket_type
@handler = handler
end
-
+
def self.map_sockopt(opt, name)
define_method(name){ getsockopt(opt) }
define_method("#{name}="){|val| @socket.setsockopt(opt, val) }
end
-
+
map_sockopt(ZMQ::HWM, :hwm)
map_sockopt(ZMQ::SWAP, :swap)
map_sockopt(ZMQ::IDENTITY, :identity)
map_sockopt(ZMQ::AFFINITY, :affinity)
map_sockopt(ZMQ::SNDBUF, :sndbuf)
map_sockopt(ZMQ::RCVBUF, :rcvbuf)
-
+
# pgm
map_sockopt(ZMQ::RATE, :rate)
map_sockopt(ZMQ::RECOVERY_IVL, :recovery_ivl)
map_sockopt(ZMQ::MCAST_LOOP, :mcast_loop)
-
+
# User method
def bind(address)
@socket.bind(address)
end
-
+
def connect(address)
@socket.connect(address)
end
-
+
def subscribe(what = '')
raise "only valid on sub socket type (was #{@socket.name})" unless @socket.name == 'SUB'
@socket.setsockopt(ZMQ::SUBSCRIBE, what)
end
-
+
def unsubscribe(what)
raise "only valid on sub socket type (was #{@socket.name})" unless @socket.name == 'SUB'
@socket.setsockopt(ZMQ::UNSUBSCRIBE, what)
end
-
+
# send a non blocking message
# parts: if only one argument is given a signle part message is sent
# if more than one arguments is given a multipart message is sent
@@ -55,7 +55,7 @@ def unsubscribe(what)
def send_msg(*parts)
parts = Array(parts[0]) if parts.size == 0
sent = true
-
+
# multipart
parts[0...-1].each do |msg|
sent = @socket.send_string(msg, ZMQ::NOBLOCK | ZMQ::SNDMORE)
@@ -63,7 +63,7 @@ def send_msg(*parts)
break
end
end
-
+
if sent
# all the previous parts were queued, send
# the last one
@@ -77,12 +77,13 @@ def send_msg(*parts)
self.notify_writable = true
sent = false
end
-
- notify_readable()
-
+
+ # removed because this can cause on_readable to be called recursively to an arbitray depth
+ # notify_readable()
+
sent
end
-
+
def getsockopt(opt)
ret = []
rc = @socket.getsockopt(opt, ret)
@@ -90,18 +91,18 @@ def getsockopt(opt)
raise ZMQOperationFailed, "getsockopt: #{ZMQ::Util.error_string}"
end
- (ret.size == 1) ? ret[0] : ret
+ (ret.size == 1) ? ret[0] : ret
end
-
+
def setsockopt(opt, value)
@socket.setsockopt(opt, value)
end
-
+
# cleanup when ending loop
def unbind
detach_and_close
end
-
+
# Make this socket available for reads
def register_readable
# Since ZMQ is event triggered I think this is necessary
@@ -124,7 +125,7 @@ def notify_readable
# I'm leaving this is because its in the docs, but it could probably
# be taken out.
return unless readable?
-
+
loop do
msg_parts = []
msg = get_message
@@ -138,22 +139,22 @@ def notify_readable
raise "Multi-part message missing a message!"
end
end
-
+
@handler.on_readable(self, msg_parts)
else
break
end
end
end
-
+
def notify_writable
return unless writable?
-
+
# one a writable event is successfully received the socket
# should be accepting messages again so stop triggering
# write events
self.notify_writable = false
-
+
if @handler.respond_to?(:on_writable)
@handler.on_writable(self)
end
@@ -167,9 +168,9 @@ def writable?
# ZMQ::EVENTS has issues in ZMQ HEAD, we'll ignore this till they're fixed
# (getsockopt(ZMQ::EVENTS) & ZMQ::POLLOUT) == ZMQ::POLLOUT
end
-
+
private
-
+
# internal methods
def get_message
@@ -177,7 +178,7 @@ def get_message
msg_recvd = @socket.recv(msg, ZMQ::NOBLOCK)
msg_recvd != -1 ? msg : nil
end
-
+
# Detaches the socket from the EM loop,
# then closes the socket
def detach_and_close
Please sign in to comment.
Something went wrong with that request. Please try again.