Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fix #4302, This changes the syndic to listen on the event bus

and filters out the jids, and sends them up to the syndic-master.
This catches all jobs as they come in the right way, async, out
of bounds.

The problem of minion identification still persists for the cli client
on the master-master, this will require an update to the master but
it should function out of bounds of the minion update
  • Loading branch information...
commit bf04b70f96e0a0fd2a66f15cad0a64faf853f558 1 parent 81bddc6
@thatch45 thatch45 authored
Showing with 31 additions and 17 deletions.
  1. +31 −17 salt/minion.py
View
48 salt/minion.py
@@ -529,20 +529,24 @@ def _return_pub(self, ret, ret_cmd='_return'):
Return the data from the executed command to the master server
'''
if self.opts['multiprocessing']:
- fn_ = os.path.join(self.proc_dir, ret['jid'])
+ fn_ = os.path.join(self.proc_dir, ret['__jid__'])
if os.path.isfile(fn_):
try:
os.remove(fn_)
except (OSError, IOError):
# The file is gone already
pass
- log.info('Returning information for job: {0}'.format(ret['jid']))
+ log.info('Returning information for job: {0}'.format(ret['__jid__']))
sreq = salt.payload.SREQ(self.opts['master_uri'])
if ret_cmd == '_syndic_return':
load = {'cmd': ret_cmd,
- 'id': self.opts['id']}
+ 'id': self.opts['id'],
+ 'jid': ret['__jid__'],
+ 'fun': ret['__fun__']}
load['return'] = {}
for key, value in ret.items():
+ if key.startswith('__'):
+ continue
load['return'][key] = value
else:
load = {'cmd': ret_cmd,
@@ -843,12 +847,16 @@ class Syndic(Minion):
'''
def __init__(self, opts):
interface = opts.get('interface')
+ sock_dir = opts['sock_dir']
self._syndic = True
+ opts['loop_interval'] = 1
Minion.__init__(self, opts)
self.local = salt.client.LocalClient(opts['_master_conf_file'])
opts.update(self.opts)
self.opts = opts
self.local.opts['interface'] = interface
+ self.event = salt.utils.event.LocalClientEvent(self.local.opts['sock_dir'])
+ self.event.subscribe('')
def _handle_aes(self, load):
'''
@@ -911,20 +919,26 @@ def syndic_cmd(self, data):
data['tgt_type'],
data['ret'],
data['jid'],
- data['to']
- )
- # Gather the return data
- ret = self.local.get_full_returns(
- pub_data['jid'],
- pub_data['minions'],
- data['to']
- )
- for minion in ret:
- ret[minion] = ret[minion]['ret']
- ret['jid'] = data['jid']
- ret['fun'] = data['fun']
- # Return the publication data up the pipe
- self._return_pub(ret, '_syndic_return')
+ data['to'])
+
+ def passive_refresh(self):
+ '''
+ Override the passive refresh function in the minion loop to gather
+ events, aggregate them, and send them up to the master-master
+ '''
+ jids = {}
+ while True:
+ event = self.event.get_event(0.5, full=True)
+ if event is None:
+ break
+ if len(event.get('tag', '')) == 20:
+ if not event['tag'] in jids:
+ jids[event['tag']] = {}
+ jids[event['tag']]['__fun__'] = event['data']['fun']
+ jids[event['tag']]['__jid__'] = event['data']['jid']
+ jids[event['tag']][event['data']['id']] = event['data']['return']
+ for jid in jids:
+ self._return_pub(jids[jid], '_syndic_return')
class Matcher(object):
Please sign in to comment.
Something went wrong with that request. Please try again.