Skip to content

Commit

Permalink
Merge pull request #161 from wwade/plug-defer
Browse files Browse the repository at this point in the history
info: only resolve workspaceIdentity, workspaceProject as needed
  • Loading branch information
wwade committed Feb 8, 2022
2 parents c90b1da + 7ffe4be commit 91ad1be
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 47 deletions.
23 changes: 21 additions & 2 deletions jobrunner/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import tempfile
import time
from typing import Optional
from uuid import uuid4

from dateutil import parser
Expand Down Expand Up @@ -238,10 +239,28 @@ def __init__(self, config, plugins):
self.plugins = plugins
self._instanceId = uuid4().hex
self.displayPending = set()
self.active = None
self.inactive = None
self._active: Optional[DatabaseBase] = None
self._inactive: Optional[DatabaseBase] = None
self._lock = FileLock(self.config.lockFile)

@property
def active(self) -> DatabaseBase:
assert self._active
return self._active

@active.setter
def active(self, value: DatabaseBase) -> None:
self._active = value

@property
def inactive(self) -> DatabaseBase:
assert self._inactive
return self._inactive

@inactive.setter
def inactive(self, value: DatabaseBase) -> None:
self._inactive = value

def setDbCaching(self, enabled):
pass

Expand Down
82 changes: 55 additions & 27 deletions jobrunner/info.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from __future__ import absolute_import, division, print_function

from builtins import map, range
import errno
from functools import total_ordering
from logging import getLogger
import os
import pipes
import string
from typing import Any, List, Optional

import dateutil.tz
import six
from six.moves import map, range

from jobrunner import utils

Expand Down Expand Up @@ -49,10 +47,16 @@ def cmdString(cmd):
@total_ordering
class JobInfo(object):
# pylint: disable=too-many-instance-attributes,too-many-public-methods
_unresolved = object()

@classmethod
def isUnresolved(cls, obj: Any) -> bool:
return obj is cls._unresolved

def __init__(self, uidx, key=None):
self.prog = None
self.args = None
self.cmd = None
self._cmd = None
self.reminder = None
self.pwd = None
self._autoJob = None
Expand All @@ -64,8 +68,8 @@ def __init__(self, uidx, key=None):
self._host = os.getenv('HOSTNAME')
self._user = os.getenv('USER')
self._env = dict(os.environ)
self._workspace = workspaceIdentity()
self._proj = workspaceProject()
self._workspace = self.__class__._unresolved
self._proj = self.__class__._unresolved
self._rc = None
self.logfile = None
self._key = key
Expand All @@ -79,6 +83,12 @@ def __init__(self, uidx, key=None):
self._mailJob = False
self._isolate = False

def resolve(self, force=False):
if force or self.__class__.isUnresolved(self._workspace):
self._workspace = workspaceIdentity()
if force or self.__class__.isUnresolved(self._proj):
self._proj = workspaceProject()

def __getstate__(self):
odict = self.__dict__.copy()
del odict['_parent']
Expand All @@ -88,13 +98,13 @@ def __setstate__(self, dct):
self.__dict__.update(dct)

def isLocked(self):
return self._parent.isLocked()
return self.parent.isLocked()

def lock(self):
self._parent.lock()
self.parent.lock()

def unlock(self):
self._parent.unlock()
self.parent.unlock()

@property
def autoJob(self):
Expand Down Expand Up @@ -132,18 +142,16 @@ def mailJob(self, value):

@property
def parent(self):
assert self._parent is not None
return self._parent

@parent.setter
def parent(self, parent):
self._parent = parent

@property
def proj(self):
return self._proj

@property
def rc(self):
def rc(self) -> int:
assert self._rc is not None
return self._rc

@property
Expand All @@ -162,8 +170,8 @@ def persistKeyGeneratedGet(self):
return self._persistKeyGenerated
persistKeyGenerated = property(persistKeyGeneratedGet)

def wsBasename(self):
return getattr(self, '_workspace')
def wsBasename(self) -> Optional[str]:
return self.workspace

def cmpCommon(self, other, order):
for func in order:
Expand Down Expand Up @@ -262,14 +270,20 @@ def genPersistKey(self):
self._persistKeyGenerated = persistKey
return

def persistKey(self, _inactive):
def persistKey(self):
if self._persistKey:
return self._persistKey
elif self._key:
return self._key
else:
assert False, "invalid call to persistKey()"
return None
assert self._key, "invalid call to persistKey()"
return self._key

@property
def cmd(self) -> List[str]:
assert self._cmd is not None
return self._cmd

@cmd.setter
def cmd(self, value):
self._cmd = value

def setCmd(self, cmd, reminder=None):
self.pwd = os.getcwd()
Expand Down Expand Up @@ -334,6 +348,7 @@ def unblocked(self, parent):

@locked
def start(self, parent):
self.resolve(force=True)
self._start = utcNow()
parent.inactive.lastKey = self.key
self.genPersistKey()
Expand All @@ -346,7 +361,7 @@ def stop(self, parent, rc):
self.pid = None
del parent.active[self.key]
self.genPersistKey()
k = self.persistKey(parent.inactive)
k = self.persistKey()
parent.inactive[k] = self

@unlocked
Expand Down Expand Up @@ -384,6 +399,7 @@ def getDuration(self):
return str(stop - self.startTime).split('.', 1)[0]

def removeLog(self, verbose):
assert self.logfile is not None
if os.access(self.logfile, os.F_OK):
if verbose:
sprint("Remove logfile '%s'" % self.logfile)
Expand All @@ -405,7 +421,7 @@ def escEnv(value):

def getEnvironment(self):
ret = "\n"
for k, v in sorted(six.iteritems(self._env)):
for k, v in sorted(self._env.items()):
ret += "\t%s=%s\n" % (self.escEnv(k), self.escEnv(v))
return ret

Expand Down Expand Up @@ -444,8 +460,16 @@ def showPersistKey(self):
return None

@property
def workspace(self):
return getattr(self, '_workspace')
def workspace(self) -> Optional[str]:
if isinstance(self._workspace, str):
return self._workspace
return None

@property
def proj(self) -> Optional[str]:
if isinstance(self._proj, str):
return self._proj
return None

def getValue(self, what):
items = {
Expand Down Expand Up @@ -575,12 +599,16 @@ def encodeJobInfo(obj):
odict[dateTimeKey] = dateTimeToJson(odict.get(dateTimeKey))
odict['_alldeps'] = list(odict.get('_alldeps', []))
return odict
if JobInfo.isUnresolved(obj):
return None
raise TypeError(repr(obj) + " is not JSON serializable")


def decodeJobInfo(odict):
if '_uidx' not in odict:
return odict
if 'cmd' in odict:
odict['_cmd'] = odict.pop('cmd')
uidx = odict['_uidx']
newJob = service().db.jobInfo(uidx)
for dateTimeKey in DATETIME_KEYS:
Expand Down
32 changes: 19 additions & 13 deletions jobrunner/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from .binutils import binDescriptionWithStandardFooter
from .compat import encoding_open, metadata
from .config import Config
from .db import NoMatchingJobError
from .db import JobsBase, NoMatchingJobError
from .info import JobInfo
from .plugins import Plugins
from .service import service
from .service.registry import registerServices
Expand Down Expand Up @@ -57,7 +58,7 @@ def impl_main(args=None):

options = parseArgs(args)
config = Config(options)
jobs = service().db.jobs(config, plugins)
jobs: JobsBase = service().db.jobs(config, plugins)

jobrunner.logging.setup(
config.logDir,
Expand All @@ -84,6 +85,7 @@ def impl_main(args=None):
if options.cc:
for ccAddr in options.cc:
if '@' not in ccAddr:
assert config.mailDomain
ccAddr += '@' + config.mailDomain
cmd += ['-c', ccAddr]
if config.mailProgram == 'chatmail':
Expand Down Expand Up @@ -136,8 +138,11 @@ def impl_main(args=None):
jobs.unlock()
sys.exit(rc)

job, fd = jobs.new(cmd, doIsolate, autoJob=options.auto_job,
key=options.key, reminder=options.reminder)
job: JobInfo
fd: int
job, fd = jobs.new(cmd, doIsolate, autoJob=options.auto_job, key=options.key,
reminder=options.reminder)
job.resolve()
job.genPersistKey()
jobs.active[job.key] = job

Expand Down Expand Up @@ -175,7 +180,7 @@ def impl_main(args=None):
sprint("\ninterrupted")
aborted = True
finally:
LOG.debug("unlocked %s", job, exc_info=1)
LOG.debug("unlocked %s", job, exc_info=True)
job.unblocked(jobs)
job.setDependencies(jobs, None)
if aborted:
Expand Down Expand Up @@ -215,6 +220,7 @@ def impl_main(args=None):
mailSize = 0

# Remove 'to' address temporarily
assert cmd is not None
lastArg = cmd.pop(-1)
for j in mailDeps:
depJob = jobs.inactive[j.permKey]
Expand Down Expand Up @@ -285,7 +291,7 @@ def monitorForkedJob(job, jobs):
LOG.debug('KeyboardInterrupt')
sprint("\n(Stop monitoring): {}".format(job))
except BaseException:
LOG.info('exception', exc_info=1)
LOG.info('exception', exc_info=True)
raise
finally:
LOG.debug('terminate monitor subprocess')
Expand Down Expand Up @@ -426,7 +432,7 @@ def addNonExecOptions(op):
"specified window")


def handleNonExecOptions(options, jobs):
def handleNonExecOptions(options: argparse.Namespace, jobs: JobsBase):
# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
# pylint: disable=too-many-return-statements
Expand Down Expand Up @@ -652,7 +658,7 @@ def parseArgs(args=None):
prog = None

op = argparse.ArgumentParser(
prog=os.path.basename(prog),
prog=os.path.basename(prog) if prog else "job",
formatter_class=argparse.RawDescriptionHelpFormatter,
description=DESC)
op.add_argument("program", nargs='?')
Expand Down Expand Up @@ -797,22 +803,22 @@ def runJob(cmd, options, jobs, job, fd, doIsolate):
rc = check_call(cmd, stdin=fpIn, stdout=fd, stderr=fd)
LOG.debug("check_call() => rc=%d", rc)
except KeyboardInterrupt:
LOG.debug("KeyboardInterrupt", exc_info=1)
LOG.debug("KeyboardInterrupt", exc_info=True)
sprint("\ninterrupted")
rc = -1
except OSError as err:
LOG.debug("OSError %s", err, exc_info=1)
LOG.debug("OSError %s", err, exc_info=True)
rc = -1 * err.errno
sprint(err, file=encoding_open(job.logfile, 'a'))
except CalledProcessError as err:
LOG.debug("CalledProcessError %s", err, exc_info=1)
LOG.debug("CalledProcessError %s", err, exc_info=True)
rc = err.returncode
except Exception:
LOG.debug("General exception", exc_info=1)
LOG.debug("General exception", exc_info=True)
rc = -1
raise
finally:
LOG.debug("stop job, it has finished %s", job, exc_info=1)
LOG.debug("stop job, it has finished %s", job, exc_info=True)
jobs.lock()
LOG.debug("locked DB, writing 'stop' status rc=%d", rc)
job.stop(jobs, rc)
Expand Down

0 comments on commit 91ad1be

Please sign in to comment.