Skip to content

Commit

Permalink
Merge pull request #37064 from cachedout/issue_35097
Browse files Browse the repository at this point in the history
Unify job check in scheduler
  • Loading branch information
Mike Place committed Oct 19, 2016
2 parents 7ef10f6 + 980ba89 commit 67faee1
Showing 1 changed file with 17 additions and 31 deletions.
48 changes: 17 additions & 31 deletions salt/utils/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@
import salt.utils.jid
import salt.utils.process
import salt.utils.args
import salt.utils.minion
import salt.loader
import salt.minion
import salt.payload
Expand Down Expand Up @@ -665,37 +666,22 @@ def handle_func(self, multiprocessing_enabled, func, data):
# dict we treat it like it was there and is True
if 'jid_include' not in data or data['jid_include']:
jobcount = 0
for basefilename in os.listdir(salt.minion.get_proc_dir(self.opts['cachedir'])):
fn_ = os.path.join(salt.minion.get_proc_dir(self.opts['cachedir']), basefilename)
if not os.path.exists(fn_):
log.debug('schedule.handle_func: {0} was processed '
'in another thread, skipping.'.format(
basefilename))
continue
with salt.utils.fopen(fn_, 'rb') as fp_:
job = salt.payload.Serial(self.opts).load(fp_)
if job:
if 'schedule' in job:
log.debug('schedule.handle_func: Checking job against '
'fun {0}: {1}'.format(ret['fun'], job))
if ret['schedule'] == job['schedule'] and os_is_running(job['pid']):
jobcount += 1
log.debug(
'schedule.handle_func: Incrementing jobcount, now '
'{0}, maxrunning is {1}'.format(
jobcount, data['maxrunning']))
if jobcount >= data['maxrunning']:
log.debug(
'schedule.handle_func: The scheduled job {0} '
'was not started, {1} already running'.format(
ret['schedule'], data['maxrunning']))
return False
else:
try:
log.info('Invalid job file found. Removing.')
os.remove(fn_)
except OSError:
log.info('Unable to remove file: {0}.'.format(fn_))
for job in salt.utils.minion.running(self.opts):
if 'schedule' in job:
log.debug('schedule.handle_func: Checking job against '
'fun {0}: {1}'.format(ret['fun'], job))
if ret['schedule'] == job['schedule'] and os_is_running(job['pid']):
jobcount += 1
log.debug(
'schedule.handle_func: Incrementing jobcount, now '
'{0}, maxrunning is {1}'.format(
jobcount, data['maxrunning']))
if jobcount >= data['maxrunning']:
log.debug(
'schedule.handle_func: The scheduled job {0} '
'was not started, {1} already running'.format(
ret['schedule'], data['maxrunning']))
return False

if multiprocessing_enabled and not salt.utils.is_windows():
# Reconfigure multiprocessing logging after daemonizing
Expand Down

0 comments on commit 67faee1

Please sign in to comment.