From 016cafdf57d7fb438b2b04bcea6033de97e10ded Mon Sep 17 00:00:00 2001 From: wwade Date: Fri, 6 Jan 2023 14:39:32 -0800 Subject: [PATCH] cmd: add --notifier PROGRAM arg to invoke system notifier This lets the user run, e.g. a systemd service that will call PROGRAM every time a job exits. Added example usage in README.rst. --- README.rst | 41 ++++++++++++ jobrunner/db/__init__.py | 51 +++++++++------ jobrunner/info.py | 8 +-- jobrunner/main.py | 15 ++++- jobrunner/test/integration/dump_json_input.py | 16 +++++ jobrunner/test/integration/integration_lib.py | 4 +- jobrunner/test/integration/smoke_test.py | 64 +++++++++++++------ 7 files changed, 152 insertions(+), 47 deletions(-) create mode 100755 jobrunner/test/integration/dump_json_input.py diff --git a/README.rst b/README.rst index 52b2f4b..c1cd1f2 100644 --- a/README.rst +++ b/README.rst @@ -129,6 +129,47 @@ Sample rcfile: # It should show up as "user/some_long_integer" somewhere in the span's metadata. user1 = +System Notifications (Systemd user service example) +--------------------------------------------------- + +If you want to enable notifications when jobs finish, one way to do this is to use the --notifier +argument. + +``~/.config/systemd/user/job-notify.service``: + +.. code:: aconf + + [Unit] + Description=Jobrunner Notifier + + [Service] + Type=simple + ExecStart=env job --notifier jsonNotify.py + RestartSec=30 + Restart=always + + [Install] + WantedBy=default.target + +``~/.local/bin/jsonNotify.py``: + +.. code:: python + + #!/usr/bin/env python3 + + from json import load + import subprocess + from sys import stdin + + cmd = ["notify-send"] + data = load(stdin) + rc = data.get("rc", 0) + if rc != 0: + cmd += ["--urgency=critical"] + cmd += [data["subject"], data["body"]] + subprocess.run(cmd) + + Hacking ------- diff --git a/jobrunner/db/__init__.py b/jobrunner/db/__init__.py index b317ebc..32d30e8 100644 --- a/jobrunner/db/__init__.py +++ b/jobrunner/db/__init__.py @@ -1,22 +1,18 @@ -from __future__ import absolute_import, division, print_function - from datetime import datetime from functools import cmp_to_key import logging import os import os.path +import subprocess import sys import tempfile import time -from typing import Optional +from typing import List, Optional from uuid import uuid4 from dateutil import parser from dateutil.tz import tzlocal, tzutc import simplejson as json -import six -from six import text_type -from six.moves import filter from jobrunner import utils @@ -83,9 +79,6 @@ def db(self): def filterJobs(self, k): return k not in self.special - def iteritems(self): - return six.iteritems(self.db) - def getCount(self): return int(self.db[self.ITEMCOUNT]) @@ -99,7 +92,7 @@ def setCount(self, inc): def getCheckpoint(self): try: return dateTimeFromJson(json.loads(self.db[self.CHECKPOINT])) - except (KeyError, EOFError, json.scanner.JSONDecodeError): + except (KeyError, EOFError, json.JSONDecodeError): epoch = datetime.utcfromtimestamp(0) return epoch.replace(tzinfo=tzutc()) @@ -291,9 +284,14 @@ def prune(self, exceptNum=None): del self.inactive[job.key] @staticmethod - def getDbSorted(db, _limit, useCp=False, filterWs=False): + def getDbSorted(db: DatabaseBase, + _limit: Optional[int] = None, + useCp=False, + filterWs=False) -> List[JobInfo]: + cpUtc = None if useCp: cpUtc = db.checkpoint + curWs = None if filterWs: curWs = utils.workspaceIdentity() jobList = [] @@ -303,16 +301,15 @@ def getDbSorted(db, _limit, useCp=False, filterWs=False): job = db[k] except KeyError: continue - if useCp: + if cpUtc: refTime = job.createTime if not refTime or refTime < cpUtc: continue - if filterWs: - if job.workspace != curWs: - continue + if filterWs and job.workspace != curWs: + continue jobList.append(job) -# if _limit and len( jobList ) > _limit: -# break + if _limit and len(jobList) > _limit: + break jobList.sort(reverse=False) return jobList @@ -610,6 +607,24 @@ def getResources(self): val += self.plugins.getResources(self) return val + def notifyActivity(self, callback: str) -> None: + curJobs = {j.key for j in self.getDbSorted(self.active, None, False)} + while True: + safeSleep(1, self) + activeJobs = {j.key for j in self.getDbSorted(self.active, None, False)} + for k in curJobs - activeJobs: + if k not in self.inactive: + continue + job = self.inactive[k] + payload = { + "subject": "Job finished " + str(job), + "body": job.detail(), + "rc": job.rc, + } + inp = json.dumps(payload) + subprocess.run([callback], input=inp, text=True, check=False) + curJobs = activeJobs + def watchActivity(self): # pylint: disable=too-many-locals,too-many-branches,too-many-statements def _nonReminder(job): @@ -758,7 +773,7 @@ def _byAge(refA, refB): sprint( " last %s, \033[97m%s\033[0m ago" % (res, diffTime)) - sprint(" " + text_type(j)) + sprint(" " + str(j)) if wkspace in remind: sprint(" reminders:") for j in remind[wkspace]: diff --git a/jobrunner/info.py b/jobrunner/info.py index 9f62633..99edf3c 100644 --- a/jobrunner/info.py +++ b/jobrunner/info.py @@ -1,11 +1,10 @@ -from builtins import map, range import errno from functools import total_ordering from logging import getLogger import os from shlex import quote import string -from typing import Any, List, Optional +from typing import Any, Iterable, List, Optional, Sized import dateutil.tz @@ -497,7 +496,8 @@ def getValue(self, what): except AttributeError: return "N/A" - def showInOrder(self, order, level): + def showInOrder(self, order: Iterable[str], + level: Optional[Sized] = None) -> str: longLine = 0 for k in order: if len(k) > longLine: @@ -533,7 +533,7 @@ def showReminder(self): ] return self.showInOrder(order, None) - def detail(self, level): + def detail(self, level: Optional[Sized] = None) -> str: if self.reminder: return self.showReminder() diff --git a/jobrunner/main.py b/jobrunner/main.py index ba93bf8..6263daf 100755 --- a/jobrunner/main.py +++ b/jobrunner/main.py @@ -345,7 +345,9 @@ def addNonExecOptions(op): op.add_argument("-L", "--list-inactive", action="store_true", help="List inactive jobs") op.add_argument("-W", "--watch", action="store_true", - help="Watch for any job acitvity") + help="Watch for any job activity") + op.add_argument("--notifier", metavar="PROGRAM", + help="Notify using PROGRAM on any job activity") op.add_argument("-s", "--show", metavar="KEY", action="append", help="Get details for job specified by KEY") op.add_argument("-K", "--last-key", action="store_true", @@ -493,9 +495,10 @@ def handleNonExecOptions(options: argparse.Namespace, jobs: JobsBase): return False -def handleNonExecWriteOptions(options, jobs): +def handleNonExecWriteOptions(options, jobs: JobsBase): # pylint: disable=too-many-branches # pylint: disable=too-many-return-statements + # pylint: disable=too-many-statements if options.stop: errors = [] for k in options.stop: @@ -555,6 +558,12 @@ def handleNonExecWriteOptions(options, jobs): sprint("Exit on user interrupt") raise ExitCode(1) from err return True + elif options.notifier: + try: + jobs.notifyActivity(options.notifier) + except KeyboardInterrupt: + pass + return True else: return False @@ -796,7 +805,7 @@ def extendMailOrNotifyCmdLockRequired( lastArg = cmd.pop(-1) for j in mailDeps: depJob = jobs.inactive[j.permKey] - safeWrite(tmp, depJob.detail(False)) + safeWrite(tmp, depJob.detail()) safeWrite(tmp, "\n" + SPACER_EACH + "\n") assert depJob.logfile out = check_output(["tail", "-n20", depJob.logfile]) diff --git a/jobrunner/test/integration/dump_json_input.py b/jobrunner/test/integration/dump_json_input.py new file mode 100755 index 0000000..9238bcc --- /dev/null +++ b/jobrunner/test/integration/dump_json_input.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 + +from json import dump, load +from os import environ +from sys import stdin + + +def main(): + data = load(stdin) + with open(environ["DUMP_FILE"], "w", encoding="utf-8") as fp: + dump(data, fp, indent=2) + print("dumped") + + +if __name__ == "__main__": + main() diff --git a/jobrunner/test/integration/integration_lib.py b/jobrunner/test/integration/integration_lib.py index e39babc..069cf0e 100644 --- a/jobrunner/test/integration/integration_lib.py +++ b/jobrunner/test/integration/integration_lib.py @@ -147,7 +147,7 @@ def inactiveCount(): return int(run(["job", "--count"], capture=True)) -def spawn(cmd): +def spawn(cmd, env=None): print(" ".join(map(quote, cmd))) - child = pexpect.spawn(cmd[0], cmd[1:], echo=True) + child = pexpect.spawn(cmd[0], cmd[1:], echo=True, env=env) return child diff --git a/jobrunner/test/integration/smoke_test.py b/jobrunner/test/integration/smoke_test.py index acf1aef..79159af 100644 --- a/jobrunner/test/integration/smoke_test.py +++ b/jobrunner/test/integration/smoke_test.py @@ -1,8 +1,8 @@ -from __future__ import absolute_import, division, print_function - +from json import load from logging import getLogger import os import re +from shlex import quote from subprocess import CalledProcessError, check_call from tempfile import NamedTemporaryFile import time @@ -210,26 +210,50 @@ def test(self): def testWatchWait(self): with getTestEnv(): # --watch + # --notifier # --wait print("+ job --watch") - child = spawn(["job", "--watch"]) - child.expect("No jobs running") - child.expect(r"\r") - sleeper = spawn(["job", "--foreground", "sleep", "60"]) - sleeper.expect(r"execute: sleep 60") - - # Confirm --watch output - child.expect(r"1 job running") - child.sendintr() - - # Wait for the sleep 60 - waiter = spawn(["job", "--wait", "sleep"]) - waiter.expect(r"adding dependency.*sleep 60") - - # Kill the sleep 60 - sleeper.sendintr() - waiter.expect(r"Dependent job failed:.*sleep 60") - waiter.expect(EOF) + watch = spawn(["job", "--watch"]) + watch.expect("No jobs running") + watch.expect(r"\r") + + with NamedTemporaryFile() as notifierOut: + notifierOut.close() + env = dict(os.environ) + env["DUMP_FILE"] = notifierOut.name + cmd = ["job", "--notifier", "./dump_json_input.py"] + print("+", map(quote, cmd)) + notifier = spawn(cmd, env=env) + + sleeper = spawn(["job", "--foreground", "sleep", "60"]) + sleeper.expect(r"execute: sleep 60") + + # Confirm --watch output + watch.expect(r"1 job running") + watch.sendintr() + + # Wait for the sleep 60 + waiter = spawn(["job", "--wait", "sleep"]) + waiter.expect(r"adding dependency.*sleep 60") + + # Kill the sleep 60 + sleeper.sendintr() + waiter.expect(r"Dependent job failed:.*sleep 60") + waiter.expect(EOF) + + notifier.expect("dumped") + notifier.sendintr() + notifier.expect(EOF) + notifierOut.close() + + with open(notifierOut.name, encoding="utf-8") as fp: + dumped = load(fp) + print(dumped) + self.assertIn("subject", dumped) + self.assertIn("body", dumped) + self.assertIn("rc", dumped) + self.assertEqual(-1, dumped["rc"]) + self.assertRegex(dumped["subject"], r"^Job finished.*sleep 60$") def testRobot(self): with getTestEnv():