Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions salt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ class SaltMasterError(SaltException):
'''


class SaltSyndicMasterError(SaltException):
'''
Problem while proxying a request in the syndication master
'''


class MasterExit(SystemExit):
'''
Rise when the master exits
Expand Down
25 changes: 23 additions & 2 deletions salt/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,21 @@ def run(self):
# SIGUSR1 gracefully so we don't choke and die horribly
try:
package = pull_sock.recv()
pub_sock.send(package)
unpacked_package = salt.payload.unpackage(package)
payload = unpacked_package['payload']

# if you have a specific topic list, use that
if 'topic_lst' in unpacked_package:
for topic in unpacked_package['topic_lst']:
# zmq filters are substring match, hash the topic
# to avoid collisions
htopic = hashlib.sha1(topic).hexdigest()
pub_sock.send(htopic, flags=zmq.SNDMORE)
pub_sock.send(payload)
# otherwise its a broadcast
else:
pub_sock.send('broadcast', flags=zmq.SNDMORE)
pub_sock.send(payload)
except zmq.ZMQError as exc:
if exc.errno == errno.EINTR:
continue
Expand Down Expand Up @@ -2638,7 +2652,14 @@ def publish(self, clear_load):
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
pub_sock.send(self.serial.dumps(payload))

int_payload = {'payload': self.serial.dumps(payload)}

# add some targeting stuff for lists only (for now)
if load['tgt_type'] == 'list':
int_payload['topic_lst'] = load['tgt']

pub_sock.send(self.serial.dumps(int_payload))
return {
'enc': 'clear',
'load': {
Expand Down
32 changes: 28 additions & 4 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
# Import salt libs
from salt.exceptions import (
AuthenticationError, CommandExecutionError, CommandNotFoundError,
SaltInvocationError, SaltReqTimeoutError, SaltClientError, SaltSystemExit
SaltInvocationError, SaltReqTimeoutError, SaltClientError,
SaltSystemExit, SaltSyndicMasterError
)
import salt.client
import salt.crypt
Expand Down Expand Up @@ -607,6 +608,10 @@ def __init__(self, opts, timeout=60, safe=True):

self.grains_cache = self.opts['grains']

# store your hexid to subscribe to zmq, hash since zmq filters are prefix
# matches this way we can avoid collisions
self.hexid = hashlib.sha1(self.opts['id']).hexdigest()

if 'proxy' in self.opts['pillar']:
log.debug('I am {0} and I need to start some proxies for {0}'.format(self.opts['id'],
self.opts['pillar']['proxy']))
Expand Down Expand Up @@ -1129,7 +1134,8 @@ def _fire_master_minion_start(self):
)

def _setsockopts(self):
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.SUBSCRIBE, 'broadcast')
self.socket.setsockopt(zmq.SUBSCRIBE, self.hexid)
self.socket.setsockopt(zmq.IDENTITY, self.opts['id'])
self._set_ipv4only()
self._set_reconnect_ivl_max()
Expand Down Expand Up @@ -1381,7 +1387,12 @@ def _do_poll(self, loop_interval):

def _do_socket_recv(self, socks):
if socks.get(self.socket) == zmq.POLLIN:
payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK))
# topic filtering is done at the zmq level, so we just strip it
recv_str = self.socket.recv(zmq.NOBLOCK)
# if you have a header, then you have another one coming down the pipe
if recv_str in (self.hexid, 'broadcast'):
recv_str = self.socket.recv(zmq.NOBLOCK)
payload = self.serial.loads(recv_str)
log.trace('Handling payload')
self._handle_payload(payload)

Expand Down Expand Up @@ -1500,6 +1511,8 @@ def tune_in(self):
# Share the poller with the event object
self.poller = self.local.event.poller
self.socket = self.context.socket(zmq.SUB)
# no filters for syndication masters, unless we want to maintain a
# list of all connected minions and update the filter
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.IDENTITY, self.opts['id'])
if hasattr(zmq, 'RECONNECT_IVL_MAX'):
Expand Down Expand Up @@ -1577,7 +1590,18 @@ def tune_in(self):

def _process_cmd_socket(self):
try:
payload = self.serial.loads(self.socket.recv(zmq.NOBLOCK))
messages = self.socket.recv_multipart(zmq.NOBLOCK)
messages_len = len(messages)
idx = None
if messages_len == 1:
idx = 0
elif messages_len == 2:
idx = 1
else:
raise SaltSyndicMasterError('Syndication master recieved message of invalid len ({0}/2)'.format(messages_len))

payload = self.serial.loads(messages[idx])

except zmq.ZMQError as e:
# Swallow errors for bad wakeups or signals needing processing
if e.errno != errno.EAGAIN and e.errno != errno.EINTR:
Expand Down