Skip to content

Commit

Permalink
Re-work job returns for syndics and runners
Browse files Browse the repository at this point in the history
There was a patch (0d6691a) to add job returns to runners, but in so doing made syndics actually call "save_load" for each and every minion return. After looking through the code, the call paths aren't the same at all-- so IMO it doesn't make sense to consolidate these into a single function. This breaks them back out and fixes the sporratic "ExtraData" errors from msgpack.
  • Loading branch information
jacksontj authored and jfindlay committed May 1, 2015
1 parent bd45312 commit f923dea
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 95 deletions.
28 changes: 16 additions & 12 deletions salt/client/mixins.py
Expand Up @@ -15,7 +15,6 @@
import salt.utils
import salt.utils.event
import salt.utils.jid
import salt.utils.job
import salt.transport
from salt.utils.error import raise_error
from salt.utils.event import tagify
Expand Down Expand Up @@ -325,16 +324,19 @@ def low(self, fun, low):
data['success'] = False

namespaced_event.fire_event(data, 'ret')
salt.utils.job.store_job(
self.opts,
{'id': self.opts['id'],
'tgt': self.opts['id'],
'jid': data['jid'],
'return': data,
},
event=None,
mminion=self.mminion,
)
load = {
'id': self.opts['id'],
'tgt': self.opts['id'],
'jid': data['jid'],
'return': data,
}
# save load
save_fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[save_fstr](load['jid'], load)
# returner
returner_fstr = '{0}.returner'.format(self.opts['master_job_cache'])
self.mminion.returners[returner_fstr](load)

# if we fired an event, make sure to delete the event object.
# This will ensure that we call destroy, which will do the 0MQ linger
log.info('Runner completed: {0}'.format(data['jid']))
Expand Down Expand Up @@ -402,7 +404,9 @@ def cmd_async(self, low):

def _gen_async_pub(self, jid=None):
if jid is None:
jid = salt.utils.jid.gen_jid()
# since we will store this, lets call prep_jid to get a unique one
prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
jid = self.mminion.returners[prep_fstr]()
tag = tagify(jid, prefix=self.tag_prefix)
return {'tag': tag, 'jid': jid}

Expand Down
31 changes: 28 additions & 3 deletions salt/master.py
Expand Up @@ -43,7 +43,6 @@
import salt.defaults.exitcodes
import salt.utils.atomicfile
import salt.utils.event
import salt.utils.job
import salt.utils.reactor
import salt.utils.verify
import salt.utils.minions
Expand Down Expand Up @@ -1268,8 +1267,34 @@ def _return(self, load):
:param dict load: The minion payload
'''
salt.utils.job.store_job(
self.opts, load, event=self.event, mminion=self.mminion)
# If the return data is invalid, just ignore it
if any(key not in load for key in ('return', 'jid', 'id')):
return False
if not salt.utils.verify.valid_id(self.opts, load['id']):
return False
if load['jid'] == 'req':
# The minion is returning a standalone job, request a jobid
load['arg'] = load.get('arg', load.get('fun_args', []))
load['tgt_type'] = 'glob'
load['tgt'] = load['id']
prep_fstr = '{0}.prep_jid'.format(self.opts['master_job_cache'])
load['jid'] = self.mminion.returners[prep_fstr](nocache=load.get('nocache', False))

# save the load, since we don't have it
saveload_fstr = '{0}.save_load'.format(self.opts['master_job_cache'])
self.mminion.returners[saveload_fstr](load['jid'], load)
log.info('Got return from {id} for job {jid}'.format(**load))
self.event.fire_event(
load, tagify([load['jid'], 'ret', load['id']], 'job'))
self.event.fire_ret_load(load)

# if you have a job_cache, or an ext_job_cache, don't write to the regular master cache
if not self.opts['job_cache'] or self.opts.get('ext_job_cache'):
return

# otherwise, write to the master cache
fstr = '{0}.returner'.format(self.opts['master_job_cache'])
self.mminion.returners[fstr](load)

def _syndic_return(self, load):
'''
Expand Down
80 changes: 0 additions & 80 deletions salt/utils/job.py
@@ -1,89 +1,9 @@
# -*- coding: utf-8 -*-
import logging
import salt.minion
import salt.utils.verify
import salt.utils.jid
from salt.utils.event import tagify


log = logging.getLogger(__name__)


def store_job(opts, load, event=None, mminion=None):
'''
Store job information using the configured master_job_cache
'''
# If the return data is invalid, just ignore it
if any(key not in load for key in ('return', 'jid', 'id')):
return False
if not salt.utils.verify.valid_id(opts, load['id']):
return False
if mminion is None:
mminion = salt.minion.MasterMinion(opts, states=False, rend=False)

job_cache = opts['master_job_cache']
if load['jid'] == 'req':
# The minion is returning a standalone job, request a jobid
load['arg'] = load.get('arg', load.get('fun_args', []))
load['tgt_type'] = 'glob'
load['tgt'] = load['id']

prep_fstr = '{0}.prep_jid'.format(opts['master_job_cache'])
try:
load['jid'] = mminion.returners[prep_fstr](nocache=load.get('nocache', False))
except KeyError:
emsg = "Returner '{0}' does not support function prep_jid".format(job_cache)
log.error(emsg)
raise KeyError(emsg)

# save the load, since we don't have it
saveload_fstr = '{0}.save_load'.format(job_cache)
try:
mminion.returners[saveload_fstr](load['jid'], load)
except KeyError:
emsg = "Returner '{0}' does not support function save_load".format(job_cache)
log.error(emsg)
raise KeyError(emsg)
elif salt.utils.jid.is_jid(load['jid']):
# Store the jid
jidstore_fstr = '{0}.prep_jid'.format(job_cache)
try:
mminion.returners[jidstore_fstr](False, passed_jid=load['jid'])
except KeyError:
emsg = "Returner '{0}' does not support function prep_jid".format(job_cache)
log.error(emsg)
raise KeyError(emsg)

if event:
# If the return data is invalid, just ignore it
log.info('Got return from {id} for job {jid}'.format(**load))
event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job'))
event.fire_ret_load(load)

# if you have a job_cache, or an ext_job_cache, don't write to
# the regular master cache
if not opts['job_cache'] or opts.get('ext_job_cache'):
return

# otherwise, write to the master cache
savefstr = '{0}.save_load'.format(job_cache)
fstr = '{0}.returner'.format(job_cache)
if 'fun' not in load and load.get('return', {}):
ret_ = load.get('return', {})
if 'fun' in ret_:
load.update({'fun': ret_['fun']})
if 'user' in ret_:
load.update({'user': ret_['user']})
try:
if 'jid' in load:
mminion.returners[savefstr](load['jid'], load)
mminion.returners[fstr](load)
except KeyError:
emsg = "Returner '{0}' does not support function returner".format(job_cache)
log.error(emsg)
raise KeyError(emsg)


def get_retcode(ret):
'''
Determine a retcode for a given return
Expand Down

0 comments on commit f923dea

Please sign in to comment.