Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions salt/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@
'max_minions': int,
'username': str,
'password': str,
'zmq_filtering': bool,
}

# default configurations
Expand Down Expand Up @@ -373,6 +374,7 @@
'ping_interval': 0,
'username': None,
'password': None,
'zmq_filtering': True,
}

DEFAULT_MASTER_OPTS = {
Expand Down Expand Up @@ -529,6 +531,7 @@
'master_sign_pubkey': False,
'master_pubkey_signature': 'master_pubkey_signature',
'master_use_pubkey_signature': False,
'zmq_filtering': True,
}

# ----- Salt Cloud Configuration Defaults ----------------------------------->
Expand Down
6 changes: 6 additions & 0 deletions salt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ class SaltMasterError(SaltException):
'''


class SaltSyndicMasterError(SaltException):
'''
Problem while proxying a request in the syndication master
'''


class MasterExit(SystemExit):
'''
Rise when the master exits
Expand Down
27 changes: 25 additions & 2 deletions salt/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,24 @@ def run(self):
# SIGUSR1 gracefully so we don't choke and die horribly
try:
package = pull_sock.recv()
pub_sock.send(package)
unpacked_package = salt.payload.unpackage(package)
payload = unpacked_package['payload']
if self.opts['zmq_filtering']:
# if you have a specific topic list, use that
if 'topic_lst' in unpacked_package:
for topic in unpacked_package['topic_lst']:
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = hashlib.sha1(topic).hexdigest()
pub_sock.send(htopic, flags=zmq.SNDMORE)
pub_sock.send(payload)
# otherwise its a broadcast
else:
# TODO: constants file for "broadcast"
pub_sock.send('broadcast', flags=zmq.SNDMORE)
pub_sock.send(payload)
else:
pub_sock.send(payload)
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
Expand Down Expand Up @@ -2213,7 +2230,13 @@ def publish(self, clear_load):
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
pub_sock.send(self.serial.dumps(payload))
int_payload = {'payload': self.serial.dumps(payload)}

# add some targeting stuff for lists only (for now)
if load['tgt_type'] == 'list':
int_payload['topic_lst'] = load['tgt']

pub_sock.send(self.serial.dumps(int_payload))
return {
'enc': 'clear',
'load': {
Expand Down
42 changes: 38 additions & 4 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from salt.exceptions import (
AuthenticationError, CommandExecutionError, CommandNotFoundError,
SaltInvocationError, SaltReqTimeoutError, SaltClientError,
SaltSystemExit
SaltSystemExit, SaltSyndicMasterError
)
import salt.client
import salt.crypt
Expand Down Expand Up @@ -624,6 +624,10 @@ def __init__(self, opts, timeout=60, safe=True): # pylint: disable=W0231

self.grains_cache = self.opts['grains']

# store your hexid to subscribe to zmq, hash since zmq filters are prefix
# matches this way we can avoid collisions
self.hexid = hashlib.sha1(self.opts['id']).hexdigest()

if 'proxy' in self.opts['pillar']:
log.debug('I am {0} and I need to start some proxies for {0}'.format(self.opts['id'],
self.opts['pillar']['proxy']))
Expand Down Expand Up @@ -1302,7 +1306,13 @@ def _fire_master_minion_start(self):
)

def _setsockopts(self):
self.socket.setsockopt(zmq.SUBSCRIBE, '')
if self.opts['zmq_filtering']:
# TODO: constants file for "broadcast"
self.socket.setsockopt(zmq.SUBSCRIBE, 'broadcast')
self.socket.setsockopt(zmq.SUBSCRIBE, self.hexid)
else:
self.socket.setsockopt(zmq.SUBSCRIBE, '')

self.socket.setsockopt(zmq.IDENTITY, self.opts['id'])
self._set_ipv4only()
self._set_reconnect_ivl_max()
Expand Down Expand Up @@ -1715,7 +1725,19 @@ def _do_poll(self, loop_interval):

def _do_socket_recv(self, socks):
if socks.get(self.socket) == zmq.POLLIN:
payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK))
# topic filtering is done at the zmq level, so we just strip it
messages = self.socket.recv_multipart(zmq.NOBLOCK)
messages_len = len(messages)
# if it was one message, then its old style
if messages_len == 1:
payload = self.serial.loads(messages[0])
# 2 includes a header which says who should do it
elif messages_len == 2:
payload = self.serial.loads(messages[1])
else:
raise Exception(('Invalid number of messages ({0}) in zeromq pub'
'message from master').format(len(messages_len)))

log.trace('Handling payload')
self._handle_payload(payload)

Expand Down Expand Up @@ -1835,6 +1857,8 @@ def tune_in(self):
# Share the poller with the event object
self.poller = self.local.event.poller
self.socket = self.context.socket(zmq.SUB)
# no filters for syndication masters, unless we want to maintain a
# list of all connected minions and update the filter
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.IDENTITY, self.opts['id'])

Expand Down Expand Up @@ -1899,7 +1923,17 @@ def tune_in(self):

def _process_cmd_socket(self):
try:
payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK))
messages = self.socket.recv_multipart(zmq.NOBLOCK)
messages_len = len(messages)
idx = None
if messages_len == 1:
idx = 0
elif messages_len == 2:
idx = 1
else:
raise SaltSyndicMasterError('Syndication master recieved message of invalid len ({0}/2)'.format(messages_len))

payload = self.serial.loads(messages[idx])
except zmq.ZMQError as e:
# Swallow errors for bad wakeups or signals needing processing
if e.errno != errno.EAGAIN and e.errno != errno.EINTR:
Expand Down