From f923deaf6be867f0cab0b92689082f1569beaeb6 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Tue, 21 Apr 2015 09:06:08 -0700 Subject: [PATCH] Re-work job returns for syndics and runners There was a patch (0d6691a) to add job returns to runners, but in so doing made syndics actually call "save_load" for each and every minion return. After looking through the code, the call paths aren't the same at all-- so IMO it doesn't make sense to consolidate these into a single function. This breaks them back out and fixes the sporratic "ExtraData" errors from msgpack. --- salt/client/mixins.py | 28 ++++++++------- salt/master.py | 31 +++++++++++++++-- salt/utils/job.py | 80 ------------------------------------------- 3 files changed, 44 insertions(+), 95 deletions(-) diff --git a/salt/client/mixins.py b/salt/client/mixins.py index a902ec7b632f..263fb38dbe9e 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -15,7 +15,6 @@ import salt.utils import salt.utils.event import salt.utils.jid -import salt.utils.job import salt.transport from salt.utils.error import raise_error from salt.utils.event import tagify @@ -325,16 +324,19 @@ def low(self, fun, low): data['success'] = False namespaced_event.fire_event(data, 'ret') - salt.utils.job.store_job( - self.opts, - {'id': self.opts['id'], - 'tgt': self.opts['id'], - 'jid': data['jid'], - 'return': data, - }, - event=None, - mminion=self.mminion, - ) + load = { + 'id': self.opts['id'], + 'tgt': self.opts['id'], + 'jid': data['jid'], + 'return': data, + } + # save load + save_fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[save_fstr](load['jid'], load) + # returner + returner_fstr = '{0}.returner'.format(self.opts['master_job_cache']) + self.mminion.returners[returner_fstr](load) + # if we fired an event, make sure to delete the event object. # This will ensure that we call destroy, which will do the 0MQ linger log.info('Runner completed: {0}'.format(data['jid'])) @@ -402,7 +404,9 @@ def cmd_async(self, low): def _gen_async_pub(self, jid=None): if jid is None: - jid = salt.utils.jid.gen_jid() + # since we will store this, lets call prep_jid to get a unique one + prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + jid = self.mminion.returners[prep_fstr]() tag = tagify(jid, prefix=self.tag_prefix) return {'tag': tag, 'jid': jid} diff --git a/salt/master.py b/salt/master.py index ec77860d0e69..d84afa7b945f 100644 --- a/salt/master.py +++ b/salt/master.py @@ -43,7 +43,6 @@ import salt.defaults.exitcodes import salt.utils.atomicfile import salt.utils.event -import salt.utils.job import salt.utils.reactor import salt.utils.verify import salt.utils.minions @@ -1268,8 +1267,34 @@ def _return(self, load): :param dict load: The minion payload ''' - salt.utils.job.store_job( - self.opts, load, event=self.event, mminion=self.mminion) + # If the return data is invalid, just ignore it + if any(key not in load for key in ('return', 'jid', 'id')): + return False + if not salt.utils.verify.valid_id(self.opts, load['id']): + return False + if load['jid'] == 'req': + # The minion is returning a standalone job, request a jobid + load['arg'] = load.get('arg', load.get('fun_args', [])) + load['tgt_type'] = 'glob' + load['tgt'] = load['id'] + prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache']) + load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False)) + + # save the load, since we don't have it + saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache']) + self.mminion.returners[saveload_fstr](load['jid'], load) + log.info('Got return from {id} for job {jid}'.format(**load)) + self.event.fire_event( + load, tagify([load['jid'], 'ret', load['id']], 'job')) + self.event.fire_ret_load(load) + + # if you have a job_cache, or an ext_job_cache, don't write to the regular master cache + if not self.opts['job_cache'] or self.opts.get('ext_job_cache'): + return + + # otherwise, write to the master cache + fstr = '{0}.returner'.format(self.opts['master_job_cache']) + self.mminion.returners[fstr](load) def _syndic_return(self, load): ''' diff --git a/salt/utils/job.py b/salt/utils/job.py index ec05f18ad982..1d5b3db8bccd 100644 --- a/salt/utils/job.py +++ b/salt/utils/job.py @@ -1,89 +1,9 @@ # -*- coding: utf-8 -*- import logging -import salt.minion -import salt.utils.verify -import salt.utils.jid -from salt.utils.event import tagify - log = logging.getLogger(__name__) -def store_job(opts, load, event=None, mminion=None): - ''' - Store job information using the configured master_job_cache - ''' - # If the return data is invalid, just ignore it - if any(key not in load for key in ('return', 'jid', 'id')): - return False - if not salt.utils.verify.valid_id(opts, load['id']): - return False - if mminion is None: - mminion = salt.minion.MasterMinion(opts, states=False, rend=False) - - job_cache = opts['master_job_cache'] - if load['jid'] == 'req': - # The minion is returning a standalone job, request a jobid - load['arg'] = load.get('arg', load.get('fun_args', [])) - load['tgt_type'] = 'glob' - load['tgt'] = load['id'] - - prep_fstr = '{0}.prep_jid'.format(opts['master_job_cache']) - try: - load['jid'] = mminion.returners[prep_fstr](nocache=load.get('nocache', False)) - except KeyError: - emsg = "Returner '{0}' does not support function prep_jid".format(job_cache) - log.error(emsg) - raise KeyError(emsg) - - # save the load, since we don't have it - saveload_fstr = '{0}.save_load'.format(job_cache) - try: - mminion.returners[saveload_fstr](load['jid'], load) - except KeyError: - emsg = "Returner '{0}' does not support function save_load".format(job_cache) - log.error(emsg) - raise KeyError(emsg) - elif salt.utils.jid.is_jid(load['jid']): - # Store the jid - jidstore_fstr = '{0}.prep_jid'.format(job_cache) - try: - mminion.returners[jidstore_fstr](False, passed_jid=load['jid']) - except KeyError: - emsg = "Returner '{0}' does not support function prep_jid".format(job_cache) - log.error(emsg) - raise KeyError(emsg) - - if event: - # If the return data is invalid, just ignore it - log.info('Got return from {id} for job {jid}'.format(**load)) - event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job')) - event.fire_ret_load(load) - - # if you have a job_cache, or an ext_job_cache, don't write to - # the regular master cache - if not opts['job_cache'] or opts.get('ext_job_cache'): - return - - # otherwise, write to the master cache - savefstr = '{0}.save_load'.format(job_cache) - fstr = '{0}.returner'.format(job_cache) - if 'fun' not in load and load.get('return', {}): - ret_ = load.get('return', {}) - if 'fun' in ret_: - load.update({'fun': ret_['fun']}) - if 'user' in ret_: - load.update({'user': ret_['user']}) - try: - if 'jid' in load: - mminion.returners[savefstr](load['jid'], load) - mminion.returners[fstr](load) - except KeyError: - emsg = "Returner '{0}' does not support function returner".format(job_cache) - log.error(emsg) - raise KeyError(emsg) - - def get_retcode(ret): ''' Determine a retcode for a given return