From a9c8c9cd334537d861729adebafd437f71f9f6ed Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Thu, 20 Dec 2012 22:37:21 +0000 Subject: [PATCH] Set linger on ZMQ sockets before closing, stops issues with `context.term()` hanging indefinitely. Refs #2958. * Any unsent messages a socket which does not have `linger` set to a positive value, will make `context.term()` hang indefinitely. See https://github.com/zeromq/pyzmq/issues/102 for more info. --- salt/master.py | 31 ++++++++++++++++++++++++++++++- salt/payload.py | 5 +++++ salt/utils/event.py | 15 +++++++++++++-- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/salt/master.py b/salt/master.py index bd9a1f99a5e5..146a88dea6dc 100644 --- a/salt/master.py +++ b/salt/master.py @@ -323,7 +323,12 @@ def run(self): raise exc except KeyboardInterrupt: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + pub_sock.setsockopt(zmq.LINGER, 2500) pub_sock.close() + pull_sock.setsockopt(zmq.LINGER, 2500) pull_sock.close() finally: context.term() @@ -407,11 +412,19 @@ def run(self): ''' self.__bind() - def __del__(self): + def destroy(self): + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + self.clients.setsockopt(zmq.LINGER, 2500) self.clients.close() + self.workers.setsockopt(zmq.LINGER, 2500) self.workers.close() self.context.term() + def __del__(self): + self.destroy() + class MWorker(multiprocessing.Process): ''' @@ -455,6 +468,10 @@ def __bind(self): continue raise exc except KeyboardInterrupt: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + socket.setsockopt(zmq.LINGER, 2500) socket.close() finally: context.term() @@ -1086,6 +1103,10 @@ def minion_publish(self, clear_load): timeout ) finally: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + pub_sock.setsockopt(zmq.LINGER, 2500) pub_sock.close() context.term() elif ret_form == 'full': @@ -1101,6 +1122,10 @@ def minion_publish(self, clear_load): try: return ret finally: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + pub_sock.setsockopt(zmq.LINGER, 2500) pub_sock.close() context.term() @@ -1712,5 +1737,9 @@ def publish(self, clear_load): } } finally: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + pub_sock.setsockopt(zmq.LINGER, 2500) pub_sock.close() context.term() diff --git a/salt/payload.py b/salt/payload.py index 355b8a3f622a..b01c86b3a027 100644 --- a/salt/payload.py +++ b/salt/payload.py @@ -172,9 +172,14 @@ def send_auto(self, payload): def destroy(self): for socket in self.poller.sockets.keys(): if not socket.closed: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + socket.setsockopt(zmq.LINGER, 2500) socket.close() self.poller.unregister(socket) if not self.socket.closed: + self.socket.setsockopt(zmq.LINGER, 2500) self.socket.close() self.context.term() diff --git a/salt/utils/event.py b/salt/utils/event.py index a410771bcc5f..e1668eb685cb 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -150,17 +150,24 @@ def fire_event(self, data, tag=''): def destroy(self): if self.cpub: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + self.sub.setsockopt(zmq.LINGER, 2500) self.sub.close() if self.cpush: + self.push.setsockopt(zmq.LINGER, 2500) self.push.close() # If socket's are not unregistered from a poller, nothing which touches # that poller get's garbage collected. The Poller itself, it's # registered sockets and the Context for socket in self.poller.sockets.keys(): if not socket.closed: + # Should already be closed from above, but.... + socket.setsockopt(zmq.LINGER, 2500) socket.close() self.poller.unregister(socket) - #self.context.term() + self.context.term() def __del__(self): self.destroy() @@ -238,7 +245,12 @@ def run(self): continue raise exc except KeyboardInterrupt: + # Wait at most 2.5 secs to send any remaining messages in the + # socket or the context.term() bellow will hang indefinitely. + # See https://github.com/zeromq/pyzmq/issues/102 + self.epub_sock.setsockopt(zmq.LINGER, 2500) self.epub_sock.close() + self.epull_sock.setsockopt(zmq.LINGER, 2500) self.epull_sock.close() finally: self.context.term() @@ -366,4 +378,3 @@ def wheel(self, fun, **kwargs): kwargs['fun'] = fun wheel = salt.wheel.Wheel(self.opts) return wheel.master_call(**kwargs) -