From b6baf2a32c1d4cb9efd904017797a8843af7c8ba Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Wed, 9 Jul 2014 09:45:08 -0700 Subject: [PATCH] Please DO NOT MERGE IMMEDIATELY Salt targeting is done using the zmq pub/sub mechanism. No filtering is done so all requests go to all minions. This is an attempt to make the requests more target-able. ZeroMQ supports filters on these sockets which are processed publisher side. So the thought is to set it to the minion id and a shared "broadcast" one. Only requests of tgt_type == list will use this. The thought is if we like this we can use other data (mine etc) to determine a more specific target on the master (such as a list) from a glob match etc. This is more or less backwards compatible, during my testing it still works but the old minion will throw an exception like: [CRITICAL] An exception occurred while polling the minion Traceback (most recent call last): File "/home/thjackso/src/salt/salt/minion.py", line 1324, in tune_in payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK)) File "/home/thjackso/src/salt/salt/payload.py", line 95, in loads return msgpack.loads(msg, use_list=True) File "_unpacker.pyx", line 114, in msgpack._unpacker.unpackb (msgpack/_unpacker.cpp:114) ExtraData: unpack(b) recieved extra data. So if we decide this is a good idea we'll probably want to wrap the new master side logic in a config option, and just merge in the minion side changes for the first release. This is another attempt of #10374. --- salt/config.py | 3 +++ salt/exceptions.py | 6 ++++++ salt/master.py | 27 +++++++++++++++++++++++++-- salt/minion.py | 42 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 72 insertions(+), 6 deletions(-) diff --git a/salt/config.py b/salt/config.py index f9f348fe463d..417c750708dd 100644 --- a/salt/config.py +++ b/salt/config.py @@ -249,6 +249,7 @@ 'max_minions': int, 'username': str, 'password': str, + 'zmq_filtering': bool, } # default configurations @@ -373,6 +374,7 @@ 'ping_interval': 0, 'username': None, 'password': None, + 'zmq_filtering': True, } DEFAULT_MASTER_OPTS = { @@ -529,6 +531,7 @@ 'master_sign_pubkey': False, 'master_pubkey_signature': 'master_pubkey_signature', 'master_use_pubkey_signature': False, + 'zmq_filtering': True, } # ----- Salt Cloud Configuration Defaults -----------------------------------> diff --git a/salt/exceptions.py b/salt/exceptions.py index dafee79201d8..f93330478615 100644 --- a/salt/exceptions.py +++ b/salt/exceptions.py @@ -25,6 +25,12 @@ class SaltMasterError(SaltException): ''' +class SaltSyndicMasterError(SaltException): + ''' + Problem while proxying a request in the syndication master + ''' + + class MasterExit(SystemExit): ''' Rise when the master exits diff --git a/salt/master.py b/salt/master.py index 95aa31e1ad64..31b4edd43766 100644 --- a/salt/master.py +++ b/salt/master.py @@ -414,7 +414,24 @@ def run(self): # SIGUSR1 gracefully so we don't choke and die horribly try: package = pull_sock.recv() - pub_sock.send(package) + unpacked_package = salt.payload.unpackage(package) + payload = unpacked_package['payload'] + if self.opts['zmq_filtering']: + # if you have a specific topic list, use that + if 'topic_lst' in unpacked_package: + for topic in unpacked_package['topic_lst']: + # zmq filters are substring match, hash the topic + # to avoid collisions + htopic = hashlib.sha1(topic).hexdigest() + pub_sock.send(htopic, flags=zmq.SNDMORE) + pub_sock.send(payload) + # otherwise its a broadcast + else: + # TODO: constants file for "broadcast" + pub_sock.send('broadcast', flags=zmq.SNDMORE) + pub_sock.send(payload) + else: + pub_sock.send(payload) except zmq.ZMQError as exc: if exc.errno == errno.EINTR: continue @@ -2213,7 +2230,13 @@ def publish(self, clear_load): os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') ) pub_sock.connect(pull_uri) - pub_sock.send(self.serial.dumps(payload)) + int_payload = {'payload': self.serial.dumps(payload)} + + # add some targeting stuff for lists only (for now) + if load['tgt_type'] == 'list': + int_payload['topic_lst'] = load['tgt'] + + pub_sock.send(self.serial.dumps(int_payload)) return { 'enc': 'clear', 'load': { diff --git a/salt/minion.py b/salt/minion.py index 2f49821984f3..2f38e88c0fb6 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -55,7 +55,7 @@ from salt.exceptions import ( AuthenticationError, CommandExecutionError, CommandNotFoundError, SaltInvocationError, SaltReqTimeoutError, SaltClientError, - SaltSystemExit + SaltSystemExit, SaltSyndicMasterError ) import salt.client import salt.crypt @@ -624,6 +624,10 @@ def __init__(self, opts, timeout=60, safe=True): # pylint: disable=W0231 self.grains_cache = self.opts['grains'] + # store your hexid to subscribe to zmq, hash since zmq filters are prefix + # matches this way we can avoid collisions + self.hexid = hashlib.sha1(self.opts['id']).hexdigest() + if 'proxy' in self.opts['pillar']: log.debug('I am {0} and I need to start some proxies for {0}'.format(self.opts['id'], self.opts['pillar']['proxy'])) @@ -1302,7 +1306,13 @@ def _fire_master_minion_start(self): ) def _setsockopts(self): - self.socket.setsockopt(zmq.SUBSCRIBE, '') + if self.opts['zmq_filtering']: + # TODO: constants file for "broadcast" + self.socket.setsockopt(zmq.SUBSCRIBE, 'broadcast') + self.socket.setsockopt(zmq.SUBSCRIBE, self.hexid) + else: + self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.setsockopt(zmq.IDENTITY, self.opts['id']) self._set_ipv4only() self._set_reconnect_ivl_max() @@ -1715,7 +1725,19 @@ def _do_poll(self, loop_interval): def _do_socket_recv(self, socks): if socks.get(self.socket) == zmq.POLLIN: - payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK)) + # topic filtering is done at the zmq level, so we just strip it + messages = self.socket.recv_multipart(zmq.NOBLOCK) + messages_len = len(messages) + # if it was one message, then its old style + if messages_len == 1: + payload = self.serial.loads(messages[0]) + # 2 includes a header which says who should do it + elif messages_len == 2: + payload = self.serial.loads(messages[1]) + else: + raise Exception(('Invalid number of messages ({0}) in zeromq pub' + 'message from master').format(len(messages_len))) + log.trace('Handling payload') self._handle_payload(payload) @@ -1835,6 +1857,8 @@ def tune_in(self): # Share the poller with the event object self.poller = self.local.event.poller self.socket = self.context.socket(zmq.SUB) + # no filters for syndication masters, unless we want to maintain a + # list of all connected minions and update the filter self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.IDENTITY, self.opts['id']) @@ -1899,7 +1923,17 @@ def tune_in(self): def _process_cmd_socket(self): try: - payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK)) + messages = self.socket.recv_multipart(zmq.NOBLOCK) + messages_len = len(messages) + idx = None + if messages_len == 1: + idx = 0 + elif messages_len == 2: + idx = 1 + else: + raise SaltSyndicMasterError('Syndication master recieved message of invalid len ({0}/2)'.format(messages_len)) + + payload = self.serial.loads(messages[idx]) except zmq.ZMQError as e: # Swallow errors for bad wakeups or signals needing processing if e.errno != errno.EAGAIN and e.errno != errno.EINTR: