Skip to content

Commit

Permalink
Set linger on ZMQ sockets before closing, stops issues with `context.…
Browse files Browse the repository at this point in the history
…term()` hanging indefinitely. Refs #2958.

* Any unsent messages a socket which does not have `linger` set to a positive value, will make `context.term()` hang indefinitely. See zeromq/pyzmq#102 for more info.
  • Loading branch information
s0undt3ch authored and thatch45 committed Dec 20, 2012
1 parent b08cec3 commit a9c8c9c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
31 changes: 30 additions & 1 deletion salt/master.py
Expand Up @@ -323,7 +323,12 @@ def run(self):
raise exc

except KeyboardInterrupt:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.close()
pull_sock.setsockopt(zmq.LINGER, 2500)
pull_sock.close()
finally:
context.term()
Expand Down Expand Up @@ -407,11 +412,19 @@ def run(self):
'''
self.__bind()

def __del__(self):
def destroy(self):
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
self.clients.setsockopt(zmq.LINGER, 2500)
self.clients.close()
self.workers.setsockopt(zmq.LINGER, 2500)
self.workers.close()
self.context.term()

def __del__(self):
self.destroy()


class MWorker(multiprocessing.Process):
'''
Expand Down Expand Up @@ -455,6 +468,10 @@ def __bind(self):
continue
raise exc
except KeyboardInterrupt:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
socket.setsockopt(zmq.LINGER, 2500)
socket.close()
finally:
context.term()
Expand Down Expand Up @@ -1086,6 +1103,10 @@ def minion_publish(self, clear_load):
timeout
)
finally:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.close()
context.term()
elif ret_form == 'full':
Expand All @@ -1101,6 +1122,10 @@ def minion_publish(self, clear_load):
try:
return ret
finally:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.close()
context.term()

Expand Down Expand Up @@ -1712,5 +1737,9 @@ def publish(self, clear_load):
}
}
finally:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
pub_sock.setsockopt(zmq.LINGER, 2500)
pub_sock.close()
context.term()
5 changes: 5 additions & 0 deletions salt/payload.py
Expand Up @@ -172,9 +172,14 @@ def send_auto(self, payload):
def destroy(self):
for socket in self.poller.sockets.keys():
if not socket.closed:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
socket.setsockopt(zmq.LINGER, 2500)
socket.close()
self.poller.unregister(socket)
if not self.socket.closed:
self.socket.setsockopt(zmq.LINGER, 2500)
self.socket.close()
self.context.term()

Expand Down
15 changes: 13 additions & 2 deletions salt/utils/event.py
Expand Up @@ -150,17 +150,24 @@ def fire_event(self, data, tag=''):

def destroy(self):
if self.cpub:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
self.sub.setsockopt(zmq.LINGER, 2500)
self.sub.close()
if self.cpush:
self.push.setsockopt(zmq.LINGER, 2500)
self.push.close()
# If socket's are not unregistered from a poller, nothing which touches
# that poller get's garbage collected. The Poller itself, it's
# registered sockets and the Context
for socket in self.poller.sockets.keys():
if not socket.closed:
# Should already be closed from above, but....
socket.setsockopt(zmq.LINGER, 2500)
socket.close()
self.poller.unregister(socket)
#self.context.term()
self.context.term()

def __del__(self):
self.destroy()
Expand Down Expand Up @@ -238,7 +245,12 @@ def run(self):
continue
raise exc
except KeyboardInterrupt:
# Wait at most 2.5 secs to send any remaining messages in the
# socket or the context.term() bellow will hang indefinitely.
# See https://github.com/zeromq/pyzmq/issues/102
self.epub_sock.setsockopt(zmq.LINGER, 2500)
self.epub_sock.close()
self.epull_sock.setsockopt(zmq.LINGER, 2500)
self.epull_sock.close()
finally:
self.context.term()
Expand Down Expand Up @@ -366,4 +378,3 @@ def wheel(self, fun, **kwargs):
kwargs['fun'] = fun
wheel = salt.wheel.Wheel(self.opts)
return wheel.master_call(**kwargs)

0 comments on commit a9c8c9c

Please sign in to comment.