From 1534c7c004ea9b9b28c7dd8cdcb2d1b7d5b83769 Mon Sep 17 00:00:00 2001 From: Lars Kiesow Date: Tue, 21 Feb 2017 23:45:48 +0100 Subject: [PATCH] Added proper signal handling This patch will make pyCA handle SIGINT and SIGTERM properly. Both will shut down pyCA gracefully with the difference of SIGTERM ensuring that pyCA capture processes are shut down as well. Signed-off-by: Lars Kiesow --- pyca/__main__.py | 52 ++++++++++++++++++++++++++----------------- pyca/capture.py | 32 +++++++++++++++++++------- pyca/ingest.py | 13 +++++------ pyca/schedule.py | 12 +++++----- tests/test_capture.py | 18 +++++++-------- tests/test_ingest.py | 15 +++++-------- tests/test_utils.py | 6 +++-- tests/tools.py | 12 ++++++++++ 8 files changed, 98 insertions(+), 62 deletions(-) create mode 100644 tests/tools.py diff --git a/pyca/__main__.py b/pyca/__main__.py index 1194063d..70f5ca1f 100755 --- a/pyca/__main__.py +++ b/pyca/__main__.py @@ -8,10 +8,11 @@ :license: LGPL – see license.lgpl for more details. ''' -import sys import getopt -import os import multiprocessing +import os +import signal +import sys from pyca import capture, config, schedule, ingest USAGE = ''' @@ -19,8 +20,9 @@ COMMANDS: run -- Start all pyCA components except ui (default) - capture -- Start pyCA capturer - schedule -- Start pyCA scheduler + capture -- Start pyCA capture service + ingest -- Start pyCA ingest service + schedule -- Start pyCA schedule service ui -- Start web based user interface OPTIONS: @@ -42,15 +44,36 @@ def usage(retval=0): sys.exit(retval) -def start_all(processes): - '''Start all processes which are not alive. +def sigint_handler(signum, frame): + '''Intercept sigint and terminate services gracefully. ''' + for mod in (capture, ingest, schedule): + mod.terminate = True + + +def sigterm_handler(signum, frame): + '''Intercept sigterm and terminate all processes. + ''' + sigint_handler(signum, frame) + for process in multiprocessing.active_children(): + process.terminate() + sys.exit(0) + + +def run_all(*modules): + '''Start all services. + ''' + processes = [multiprocessing.Process(target=mod.run) for mod in modules] + for p in processes: + p.start() for p in processes: - if not p.is_alive(): - p.start() + p.join() def main(): + # Set signal handler + signal.signal(signal.SIGINT, sigint_handler) + signal.signal(signal.SIGTERM, sigterm_handler) # Probe for configuration file location cfg = '/etc/pyca.conf' @@ -79,18 +102,7 @@ def main(): config.update_configuration(cfg) if cmd == 'run': - processes = [multiprocessing.Process(target=schedule.run), - multiprocessing.Process(target=capture.run), - multiprocessing.Process(target=ingest.run)] - start_all(processes) - try: - # Ensure processes are restarted until all are dead - while [p for p in processes if p.is_alive()]: - start_all(processes) - for p in processes: - p.join(2) - except KeyboardInterrupt: - pass + run_all(schedule, capture, ingest) elif cmd == 'schedule': schedule.run() elif cmd == 'capture': diff --git a/pyca/capture.py b/pyca/capture.py index 4ee32331..cb02362c 100644 --- a/pyca/capture.py +++ b/pyca/capture.py @@ -15,10 +15,24 @@ import logging import os import os.path +import shlex +import signal +import subprocess import time import traceback +terminate = False +captureproc = None + + +def sigterm_handler(signum, frame): + '''Intercept sigterm and terminate all processes. + ''' + if captureproc and captureproc.poll() is None: + captureproc.terminate() + + def start_capture(event): '''Start the capture process, creating all necessary files and directories as well as ingesting the captured files if no backup mode is configured. @@ -70,7 +84,7 @@ def safe_start_capture(event): ''' try: return start_capture(event) - except: + except Exception: logging.error('Start capture failed') logging.error(traceback.format_exc()) register_ca(status='idle') @@ -87,8 +101,12 @@ def recording_command(directory, name, duration): cmd = cmd.replace('{{name}}', name) cmd = cmd.replace('{{previewdir}}', preview_dir) logging.info(cmd) - if os.system(cmd): - raise Exception('Recording failed') + args = shlex.split(cmd) + captureproc = subprocess.Popen(args) + while captureproc.poll() is None: + time.sleep(0.1) + if captureproc.returncode > 0: + raise Exception('Recording failed (%i)' % captureproc.returncode) # Remove preview files: for preview in config()['capture']['preview']: @@ -110,7 +128,7 @@ def control_loop(): '''Main loop of the capture agent, retrieving and checking the schedule as well as starting the capture process if necessry. ''' - while True: + while not terminate: # Get next recording register_ca() events = get_session().query(UpcomingEvent)\ @@ -124,13 +142,11 @@ def control_loop(): def run(): '''Start the capture agent. ''' + signal.signal(signal.SIGTERM, sigterm_handler) configure_service('capture.admin') while not register_ca(): time.sleep(5.0) - try: - control_loop() - except KeyboardInterrupt: - pass + control_loop() register_ca(status='unknown') diff --git a/pyca/ingest.py b/pyca/ingest.py index c04e52a1..ffc4bbfa 100644 --- a/pyca/ingest.py +++ b/pyca/ingest.py @@ -21,6 +21,9 @@ import traceback +terminate = False + + def get_config_params(properties): '''Extract the set of configuration parameters from the properties attached to the schedule @@ -144,7 +147,7 @@ def safe_start_ingest(event): ''' try: return start_ingest(event) - except: + except Exception: logging.error('Start ingest failed') logging.error(traceback.format_exc()) register_ca(status='idle') @@ -155,7 +158,7 @@ def control_loop(): '''Main loop of the capture agent, retrieving and checking the schedule as well as starting the capture process if necessry. ''' - while True: + while not terminate: # Get next recording register_ca() events = get_session().query(RecordedEvent)\ @@ -171,8 +174,4 @@ def run(): ''' configure_service('ingest') configure_service('capture.admin') - - try: - control_loop() - except KeyboardInterrupt: - pass + control_loop() diff --git a/pyca/schedule.py b/pyca/schedule.py index 8a088a2b..571b8e7a 100644 --- a/pyca/schedule.py +++ b/pyca/schedule.py @@ -19,6 +19,9 @@ import traceback +terminate = False + + def parse_ical(vcal): '''Parse Opencast schedule iCalendar file and return events as dict ''' @@ -71,7 +74,7 @@ def get_schedule(): try: cal = parse_ical(vcal.decode('utf-8')) - except: + except Exception: logging.error('Could not parse ical') logging.error(traceback.format_exc()) return @@ -93,7 +96,7 @@ def get_schedule(): def control_loop(): '''Main loop, retrieving the schedule. ''' - while True: + while not terminate: # Try getting an updated schedult get_schedule() q = get_session().query(UpcomingEvent)\ @@ -110,7 +113,4 @@ def run(): '''Start the capture agent. ''' configure_service('scheduler') - try: - control_loop() - except KeyboardInterrupt: - pass + control_loop() diff --git a/tests/test_capture.py b/tests/test_capture.py index 3e9f5d88..25a11f23 100644 --- a/tests/test_capture.py +++ b/tests/test_capture.py @@ -12,6 +12,7 @@ import unittest from pyca import capture, config, db, utils +from tests.tools import should_fail if sys.version_info.major > 2: try: @@ -22,10 +23,6 @@ class TestPycaCapture(unittest.TestCase): - dbfile = None - cadir = None - event = None - def setUp(self): reload(config) reload(capture) @@ -34,19 +31,20 @@ def setUp(self): utils.http_request = lambda x, y=False: b'xxx' _, self.dbfile = tempfile.mkstemp() self.cadir = tempfile.mkdtemp() + preview = os.path.join(self.cadir, 'preview.png') + open(preview, 'a').close() config.config()['agent']['database'] = 'sqlite:///' + self.dbfile config.config()['capture']['command'] = 'touch {{dir}}/{{name}}.mp4' config.config()['capture']['directory'] = self.cadir - config.config()['capture']['preview'] = os.path.join(self.cadir, 'no') + config.config()['capture']['preview'] = [preview] config.config()['service-capture.admin'] = [''] # Mock event - db.init() self.event = db.BaseEvent() self.event.uid = '123123' self.event.start = utils.timestamp() - self.event.end = self.event.start + 1 + self.event.end = self.event.start + 3 data = [{'data': u'äüÄÜß', 'fmttype': 'application/xml', 'x-apple-filename': 'episode.xml'}, @@ -73,13 +71,13 @@ def test_start_capture_recording_command_failure(self): assert not capture.start_capture(self.event) def test_safe_start_capture(self): - capture.start_capture = 'fail' + capture.start_capture = should_fail assert not capture.safe_start_capture(1) capture.start_capture = lambda x: True assert capture.safe_start_capture(1) - def test_control_loop(self): - capture.control_loop = lambda: True + def test_run(self): + capture.terminate = True capture.run() diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 67c956b1..0d43090b 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -12,6 +12,7 @@ import unittest from pyca import ingest, config, db, utils +from tests.tools import should_fail if sys.version_info.major > 2: try: @@ -22,10 +23,6 @@ class TestPycaIngest(unittest.TestCase): - dbfile = None - cadir = None - event = None - def setUp(self): reload(config) reload(ingest) @@ -62,10 +59,10 @@ def setUp(self): # Create recording os.mkdir(self.event.directory()) - with open(os.path.join(self.event.directory(), 'test.mp4'), 'wb') as f: + trackfile = os.path.join(self.event.directory(), 'test.mp4') + with open(trackfile, 'wb') as f: f.write(b'123') - self.event.set_tracks([('presenter/source', - self.event.directory() + '/test.mp4')]) + self.event.set_tracks([('presenter/source', trackfile)]) def tearDown(self): os.remove(self.dbfile) @@ -75,11 +72,11 @@ def test_start_ingest(self): assert ingest.start_ingest(self.event) def test_start_ingest_failure(self): - ingest.ingest = 'fail' + ingest.ingest = should_fail assert not ingest.start_ingest(self.event) def test_safe_start_ingest(self): - ingest.start_ingest = 'fail' + ingest.start_ingest = should_fail assert not ingest.safe_start_ingest(1) ingest.start_ingest = lambda x: True assert ingest.safe_start_ingest(1) diff --git a/tests/test_utils.py b/tests/test_utils.py index 20b72808..2acb7a57 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -7,6 +7,8 @@ import unittest from pyca import utils, config +from tests.tools import should_fail + import sys if sys.version_info.major > 2: try: @@ -62,7 +64,7 @@ def test_http_request(self): def test_register_ca(self): utils.http_request = lambda x, y=False: b'xxx' assert utils.register_ca() - utils.http_request = 'fail' + utils.http_request = should_fail assert not utils.register_ca() config.config()['agent']['backup_mode'] = True assert utils.register_ca() @@ -71,7 +73,7 @@ def test_recording_state(self): utils.http_request = lambda x, y=False: b'xxx' config.config()['service-capture.admin'] = [''] utils.recording_state('123', 'recording') - utils.http_request = 'fail' + utils.http_request = should_fail utils.recording_state('123', 'recording') config.config()['agent']['backup_mode'] = True utils.recording_state('123', 'recording') diff --git a/tests/tools.py b/tests/tools.py new file mode 100644 index 00000000..5c18bfd2 --- /dev/null +++ b/tests/tools.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +''' +Some helper tools for pyCA testing. +''' + + +class ShouldFailException(Exception): + pass + + +def should_fail(*args, **kwargs): + raise ShouldFailException()