Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZeroMQ no longer required when transport is TCP #29294

Merged
merged 1 commit into from Dec 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 27 additions & 4 deletions salt/client/__init__.py
Expand Up @@ -75,12 +75,19 @@
def get_local_client(
c_path=os.path.join(syspaths.CONFIG_DIR, 'master'),
mopts=None,
skip_perm_errors=False):
skip_perm_errors=False,
io_loop=None):
'''
.. versionadded:: 2014.7.0

Read in the config and return the correct LocalClient object based on
the configured transport

:param IOLoop io_loop: io_loop used for events.
Pass in an io_loop if you want asynchronous
operation for obtaining events. Eg use of
set_event_handler() API. Otherwise, operation
will be synchronous.
'''
if mopts:
opts = mopts
Expand All @@ -93,7 +100,10 @@ def get_local_client(
return salt.client.raet.LocalClient(mopts=opts)
# TODO: AIO core is separate from transport
elif opts['transport'] in ('zeromq', 'tcp'):
return LocalClient(mopts=opts, skip_perm_errors=skip_perm_errors)
return LocalClient(
mopts=opts,
skip_perm_errors=skip_perm_errors,
io_loop=io_loop)


class LocalClient(object):
Expand All @@ -118,7 +128,15 @@ class LocalClient(object):
'''
def __init__(self,
c_path=os.path.join(syspaths.CONFIG_DIR, 'master'),
mopts=None, skip_perm_errors=False):
mopts=None, skip_perm_errors=False,
io_loop=None):
'''
:param IOLoop io_loop: io_loop used for events.
Pass in an io_loop if you want asynchronous
operation for obtaining events. Eg use of
set_event_handler() API. Otherwise,
operation will be synchronous.
'''
if mopts:
self.opts = mopts
else:
Expand All @@ -139,7 +157,8 @@ def __init__(self,
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=False)
listen=False,
io_loop=io_loop)
self.utils = salt.loader.utils(self.opts)
self.functions = salt.loader.minion_mods(self.opts, utils=self.utils)
self.returners = salt.loader.returners(self.opts, self.functions)
Expand Down Expand Up @@ -1439,6 +1458,10 @@ def pub(self,
master_uri=master_uri)

try:
# Ensure that the event subscriber is connected.
# If not, we won't get a response, so error out
if not self.event.connect_pub(timeout=timeout):
raise SaltReqTimeoutError()
payload = channel.send(payload_kwargs, timeout=timeout)
except SaltReqTimeoutError:
raise SaltReqTimeoutError(
Expand Down
62 changes: 37 additions & 25 deletions salt/master.py
Expand Up @@ -18,17 +18,25 @@
import traceback

# Import third party libs
import zmq
from Crypto.PublicKey import RSA
# pylint: disable=import-error,no-name-in-module,redefined-builtin
import salt.ext.six as six
from salt.ext.six.moves import range
# pylint: enable=import-error,no-name-in-module,redefined-builtin

import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
try:
import zmq
import zmq.eventloop.ioloop
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
HAS_ZMQ = True
except ImportError:
import tornado.ioloop
LOOP_CLASS = tornado.ioloop.IOLoop
HAS_ZMQ = False

import tornado.gen # pylint: disable=F0401

# Import salt libs
Expand Down Expand Up @@ -311,22 +319,23 @@ def __init__(self, opts):

:param dict: The salt options
'''
# Warn if ZMQ < 3.2
try:
zmq_version_info = zmq.zmq_version_info()
except AttributeError:
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
# using zmq.zmq_version() and build a version info tuple.
zmq_version_info = tuple(
[int(x) for x in zmq.zmq_version().split('.')]
)
if zmq_version_info < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
'may result in loss of contact with minions. Please '
'upgrade your ZMQ!'
)
if HAS_ZMQ:
# Warn if ZMQ < 3.2
try:
zmq_version_info = zmq.zmq_version_info()
except AttributeError:
# PyZMQ <= 2.1.9 does not have zmq_version_info, fall back to
# using zmq.zmq_version() and build a version info tuple.
zmq_version_info = tuple(
[int(x) for x in zmq.zmq_version().split('.')]
)
if zmq_version_info < (3, 2):
log.warning(
'You have a version of ZMQ less than ZMQ 3.2! There are '
'known connection keep-alive issues with ZMQ < 3.2 which '
'may result in loss of contact with minions. Please '
'upgrade your ZMQ!'
)
SMaster.__init__(self, opts)

def __set_max_open_files(self):
Expand Down Expand Up @@ -663,10 +672,12 @@ def run(self):

def destroy(self):
if hasattr(self, 'clients') and self.clients.closed is False:
self.clients.setsockopt(zmq.LINGER, 1)
if HAS_ZMQ:
self.clients.setsockopt(zmq.LINGER, 1)
self.clients.close()
if hasattr(self, 'workers') and self.workers.closed is False:
self.workers.setsockopt(zmq.LINGER, 1)
if HAS_ZMQ:
self.workers.setsockopt(zmq.LINGER, 1)
self.workers.close()
if hasattr(self, 'context') and self.context.closed is False:
self.context.term()
Expand Down Expand Up @@ -737,8 +748,9 @@ def __bind(self):
Bind to the local port
'''
# using ZMQIOLoop since we *might* need zmq in there
zmq.eventloop.ioloop.install()
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
if HAS_ZMQ:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS()
for req_channel in self.req_channels:
req_channel.post_fork(self._handle_payload, io_loop=self.io_loop) # TODO: cleaner? Maybe lazily?
self.io_loop.start()
Expand Down
33 changes: 18 additions & 15 deletions salt/minion.py
Expand Up @@ -38,9 +38,11 @@
# support pyzmq 13.0.x, TODO: remove once we force people to 14.0.x
if not hasattr(zmq.eventloop.ioloop, 'ZMQIOLoop'):
zmq.eventloop.ioloop.ZMQIOLoop = zmq.eventloop.ioloop.IOLoop
LOOP_CLASS = zmq.eventloop.ioloop.ZMQIOLoop
HAS_ZMQ = True
except ImportError:
# Running in local, zmq not needed
import tornado.ioloop
LOOP_CLASS = tornado.ioloop.IOLoop
HAS_ZMQ = False

HAS_RANGE = False
Expand Down Expand Up @@ -619,8 +621,9 @@ def __init__(self, opts):
self.auth_wait = self.opts['acceptance_wait_time']
self.max_auth_wait = self.opts['acceptance_wait_time_max']

zmq.eventloop.ioloop.install()
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
if HAS_ZMQ:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS()

def _spawn_minions(self):
'''
Expand Down Expand Up @@ -699,8 +702,9 @@ def __init__(self, opts, timeout=60, safe=True, loaded_base_name=None, io_loop=N
self.loaded_base_name = loaded_base_name

if io_loop is None:
zmq.eventloop.ioloop.install()
self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()
if HAS_ZMQ:
zmq.eventloop.ioloop.install()
self.io_loop = LOOP_CLASS()
else:
self.io_loop = io_loop

Expand Down Expand Up @@ -908,7 +912,7 @@ def _process_beacons(self):
salt.utils.event.TAGEND,
serialized_data,
)
self.event_publisher.handle_publish([event])
self.event_publisher.handle_publish(event, None)

def _load_modules(self, force_refresh=False, notify=False, proxy=None):
'''
Expand Down Expand Up @@ -1956,7 +1960,8 @@ def tune_in(self, start=True):
self.sync_connect_master()

# Instantiate the local client
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
self.local = salt.client.get_local_client(
self.opts['_minion_conf_file'], io_loop=self.io_loop)
self.local.event.subscribe('')
self.local.opts['interface'] = self._syndic_interface

Expand All @@ -1965,8 +1970,7 @@ def tune_in(self, start=True):

# register the event sub to the poller
self._reset_event_aggregation()
self.local_event_stream = zmq.eventloop.zmqstream.ZMQStream(self.local.event.sub, io_loop=self.io_loop)
self.local_event_stream.on_recv(self._process_event)
self.local.event.set_event_handler(self._process_event)

# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
Expand All @@ -1991,7 +1995,8 @@ def tune_in_no_block(self):
the tune_in sequence
'''
# Instantiate the local client
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
self.local = salt.client.get_local_client(
self.opts['_minion_conf_file'], io_loop=self.io_loop)

# add handler to subscriber
self.pub_channel.on_recv(self._process_cmd_socket)
Expand All @@ -2007,7 +2012,6 @@ def _reset_event_aggregation(self):

def _process_event(self, raw):
# TODO: cleanup: Move down into event class
raw = raw[0]
mtag, data = self.local.event.unpack(raw, self.local.event.serial)
event = {'data': data, 'tag': mtag}
log.trace('Got event {0}'.format(event['tag'])) # pylint: disable=no-member
Expand Down Expand Up @@ -2218,15 +2222,15 @@ def tune_in(self):
'''
self._spawn_syndics()
# Instantiate the local client
self.local = salt.client.get_local_client(self.opts['_minion_conf_file'])
self.local = salt.client.get_local_client(
self.opts['_minion_conf_file'], io_loop=self.io_loop)
self.local.event.subscribe('')

log.debug('MultiSyndic \'{0}\' trying to tune in'.format(self.opts['id']))

# register the event sub to the poller
self._reset_event_aggregation()
self.local_event_stream = zmq.eventloop.zmqstream.ZMQStream(self.local.event.sub, io_loop=self.io_loop)
self.local_event_stream.on_recv(self._process_event)
self.local.event.set_event_handler(self._process_event)

# forward events every syndic_event_forward_timeout
self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events,
Expand All @@ -2241,7 +2245,6 @@ def tune_in(self):

def _process_event(self, raw):
# TODO: cleanup: Move down into event class
raw = raw[0]
mtag, data = self.local.event.unpack(raw, self.local.event.serial)
event = {'data': data, 'tag': mtag}
log.trace('Got event {0}'.format(event['tag'])) # pylint: disable=no-member
Expand Down
11 changes: 4 additions & 7 deletions salt/netapi/rest_tornado/saltnado.py
Expand Up @@ -204,7 +204,7 @@
import tornado.web
import tornado.gen
from tornado.concurrent import Future
from zmq.eventloop import ioloop, zmqstream
from zmq.eventloop import ioloop
import salt.ext.six as six
# pylint: enable=import-error

Expand Down Expand Up @@ -298,6 +298,7 @@ def __init__(self, mod_opts, opts):
opts['transport'],
opts=opts,
listen=True,
io_loop=tornado.ioloop.IOLoop.current()
)

# tag -> list of futures
Expand All @@ -309,11 +310,7 @@ def __init__(self, mod_opts, opts):
# map of future -> timeout_callback
self.timeout_map = {}

self.stream = zmqstream.ZMQStream(
self.event.sub,
io_loop=tornado.ioloop.IOLoop.current(),
)
self.stream.on_recv(self._handle_event_socket_recv)
self.event.set_event_handler(self._handle_event_socket_recv)

def clean_timeout_futures(self, request):
'''
Expand Down Expand Up @@ -378,7 +375,7 @@ def _handle_event_socket_recv(self, raw):
'''
Callback for events on the event sub socket
'''
mtag, data = self.event.unpack(raw[0], self.event.serial)
mtag, data = self.event.unpack(raw, self.event.serial)
# see if we have any futures that need this info:
for tag_prefix, futures in six.iteritems(self.tag_map):
if mtag.startswith(tag_prefix):
Expand Down