Skip to content

Commit

Permalink
Added proper signal handling
Browse files Browse the repository at this point in the history
This patch will make pyCA handle SIGINT and SIGTERM properly. Both will
shut down pyCA gracefully with the difference of SIGTERM ensuring that
pyCA capture processes are shut down as well.

Signed-off-by: Lars Kiesow <lkiesow@uos.de>
  • Loading branch information
lkiesow committed Feb 21, 2017
1 parent 66f9f7e commit 1534c7c
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 62 deletions.
52 changes: 32 additions & 20 deletions pyca/__main__.py
Expand Up @@ -8,19 +8,21 @@
:license: LGPL – see license.lgpl for more details.
'''

import sys
import getopt
import os
import multiprocessing
import os
import signal
import sys
from pyca import capture, config, schedule, ingest

USAGE = '''
Usage %s [OPTIONS] COMMAND
COMMANDS:
run -- Start all pyCA components except ui (default)
capture -- Start pyCA capturer
schedule -- Start pyCA scheduler
capture -- Start pyCA capture service
ingest -- Start pyCA ingest service
schedule -- Start pyCA schedule service
ui -- Start web based user interface
OPTIONS:
Expand All @@ -42,15 +44,36 @@ def usage(retval=0):
sys.exit(retval)


def start_all(processes):
'''Start all processes which are not alive.
def sigint_handler(signum, frame):
'''Intercept sigint and terminate services gracefully.
'''
for mod in (capture, ingest, schedule):
mod.terminate = True


def sigterm_handler(signum, frame):
'''Intercept sigterm and terminate all processes.
'''
sigint_handler(signum, frame)
for process in multiprocessing.active_children():
process.terminate()
sys.exit(0)


def run_all(*modules):
'''Start all services.
'''
processes = [multiprocessing.Process(target=mod.run) for mod in modules]
for p in processes:
p.start()
for p in processes:
if not p.is_alive():
p.start()
p.join()


def main():
# Set signal handler
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigterm_handler)

# Probe for configuration file location
cfg = '/etc/pyca.conf'
Expand Down Expand Up @@ -79,18 +102,7 @@ def main():

config.update_configuration(cfg)
if cmd == 'run':
processes = [multiprocessing.Process(target=schedule.run),
multiprocessing.Process(target=capture.run),
multiprocessing.Process(target=ingest.run)]
start_all(processes)
try:
# Ensure processes are restarted until all are dead
while [p for p in processes if p.is_alive()]:
start_all(processes)
for p in processes:
p.join(2)
except KeyboardInterrupt:
pass
run_all(schedule, capture, ingest)
elif cmd == 'schedule':
schedule.run()
elif cmd == 'capture':
Expand Down
32 changes: 24 additions & 8 deletions pyca/capture.py
Expand Up @@ -15,10 +15,24 @@
import logging
import os
import os.path
import shlex
import signal
import subprocess
import time
import traceback


terminate = False
captureproc = None


def sigterm_handler(signum, frame):
'''Intercept sigterm and terminate all processes.
'''
if captureproc and captureproc.poll() is None:
captureproc.terminate()


def start_capture(event):
'''Start the capture process, creating all necessary files and directories
as well as ingesting the captured files if no backup mode is configured.
Expand Down Expand Up @@ -70,7 +84,7 @@ def safe_start_capture(event):
'''
try:
return start_capture(event)
except:
except Exception:
logging.error('Start capture failed')
logging.error(traceback.format_exc())
register_ca(status='idle')
Expand All @@ -87,8 +101,12 @@ def recording_command(directory, name, duration):
cmd = cmd.replace('{{name}}', name)
cmd = cmd.replace('{{previewdir}}', preview_dir)
logging.info(cmd)
if os.system(cmd):
raise Exception('Recording failed')
args = shlex.split(cmd)
captureproc = subprocess.Popen(args)
while captureproc.poll() is None:
time.sleep(0.1)
if captureproc.returncode > 0:
raise Exception('Recording failed (%i)' % captureproc.returncode)

# Remove preview files:
for preview in config()['capture']['preview']:
Expand All @@ -110,7 +128,7 @@ def control_loop():
'''Main loop of the capture agent, retrieving and checking the schedule as
well as starting the capture process if necessry.
'''
while True:
while not terminate:
# Get next recording
register_ca()
events = get_session().query(UpcomingEvent)\
Expand All @@ -124,13 +142,11 @@ def control_loop():
def run():
'''Start the capture agent.
'''
signal.signal(signal.SIGTERM, sigterm_handler)
configure_service('capture.admin')

while not register_ca():
time.sleep(5.0)

try:
control_loop()
except KeyboardInterrupt:
pass
control_loop()
register_ca(status='unknown')
13 changes: 6 additions & 7 deletions pyca/ingest.py
Expand Up @@ -21,6 +21,9 @@
import traceback


terminate = False


def get_config_params(properties):
'''Extract the set of configuration parameters from the properties attached
to the schedule
Expand Down Expand Up @@ -144,7 +147,7 @@ def safe_start_ingest(event):
'''
try:
return start_ingest(event)
except:
except Exception:
logging.error('Start ingest failed')
logging.error(traceback.format_exc())
register_ca(status='idle')
Expand All @@ -155,7 +158,7 @@ def control_loop():
'''Main loop of the capture agent, retrieving and checking the schedule as
well as starting the capture process if necessry.
'''
while True:
while not terminate:
# Get next recording
register_ca()
events = get_session().query(RecordedEvent)\
Expand All @@ -171,8 +174,4 @@ def run():
'''
configure_service('ingest')
configure_service('capture.admin')

try:
control_loop()
except KeyboardInterrupt:
pass
control_loop()
12 changes: 6 additions & 6 deletions pyca/schedule.py
Expand Up @@ -19,6 +19,9 @@
import traceback


terminate = False


def parse_ical(vcal):
'''Parse Opencast schedule iCalendar file and return events as dict
'''
Expand Down Expand Up @@ -71,7 +74,7 @@ def get_schedule():

try:
cal = parse_ical(vcal.decode('utf-8'))
except:
except Exception:
logging.error('Could not parse ical')
logging.error(traceback.format_exc())
return
Expand All @@ -93,7 +96,7 @@ def get_schedule():
def control_loop():
'''Main loop, retrieving the schedule.
'''
while True:
while not terminate:
# Try getting an updated schedult
get_schedule()
q = get_session().query(UpcomingEvent)\
Expand All @@ -110,7 +113,4 @@ def run():
'''Start the capture agent.
'''
configure_service('scheduler')
try:
control_loop()
except KeyboardInterrupt:
pass
control_loop()
18 changes: 8 additions & 10 deletions tests/test_capture.py
Expand Up @@ -12,6 +12,7 @@
import unittest

from pyca import capture, config, db, utils
from tests.tools import should_fail

if sys.version_info.major > 2:
try:
Expand All @@ -22,10 +23,6 @@

class TestPycaCapture(unittest.TestCase):

dbfile = None
cadir = None
event = None

def setUp(self):
reload(config)
reload(capture)
Expand All @@ -34,19 +31,20 @@ def setUp(self):
utils.http_request = lambda x, y=False: b'xxx'
_, self.dbfile = tempfile.mkstemp()
self.cadir = tempfile.mkdtemp()
preview = os.path.join(self.cadir, 'preview.png')
open(preview, 'a').close()
config.config()['agent']['database'] = 'sqlite:///' + self.dbfile
config.config()['capture']['command'] = 'touch {{dir}}/{{name}}.mp4'
config.config()['capture']['directory'] = self.cadir
config.config()['capture']['preview'] = os.path.join(self.cadir, 'no')
config.config()['capture']['preview'] = [preview]
config.config()['service-capture.admin'] = ['']

# Mock event

db.init()
self.event = db.BaseEvent()
self.event.uid = '123123'
self.event.start = utils.timestamp()
self.event.end = self.event.start + 1
self.event.end = self.event.start + 3
data = [{'data': u'äüÄÜß',
'fmttype': 'application/xml',
'x-apple-filename': 'episode.xml'},
Expand All @@ -73,13 +71,13 @@ def test_start_capture_recording_command_failure(self):
assert not capture.start_capture(self.event)

def test_safe_start_capture(self):
capture.start_capture = 'fail'
capture.start_capture = should_fail
assert not capture.safe_start_capture(1)
capture.start_capture = lambda x: True
assert capture.safe_start_capture(1)

def test_control_loop(self):
capture.control_loop = lambda: True
def test_run(self):
capture.terminate = True
capture.run()


Expand Down
15 changes: 6 additions & 9 deletions tests/test_ingest.py
Expand Up @@ -12,6 +12,7 @@
import unittest

from pyca import ingest, config, db, utils
from tests.tools import should_fail

if sys.version_info.major > 2:
try:
Expand All @@ -22,10 +23,6 @@

class TestPycaIngest(unittest.TestCase):

dbfile = None
cadir = None
event = None

def setUp(self):
reload(config)
reload(ingest)
Expand Down Expand Up @@ -62,10 +59,10 @@ def setUp(self):

# Create recording
os.mkdir(self.event.directory())
with open(os.path.join(self.event.directory(), 'test.mp4'), 'wb') as f:
trackfile = os.path.join(self.event.directory(), 'test.mp4')
with open(trackfile, 'wb') as f:
f.write(b'123')
self.event.set_tracks([('presenter/source',
self.event.directory() + '/test.mp4')])
self.event.set_tracks([('presenter/source', trackfile)])

def tearDown(self):
os.remove(self.dbfile)
Expand All @@ -75,11 +72,11 @@ def test_start_ingest(self):
assert ingest.start_ingest(self.event)

def test_start_ingest_failure(self):
ingest.ingest = 'fail'
ingest.ingest = should_fail
assert not ingest.start_ingest(self.event)

def test_safe_start_ingest(self):
ingest.start_ingest = 'fail'
ingest.start_ingest = should_fail
assert not ingest.safe_start_ingest(1)
ingest.start_ingest = lambda x: True
assert ingest.safe_start_ingest(1)
Expand Down
6 changes: 4 additions & 2 deletions tests/test_utils.py
Expand Up @@ -7,6 +7,8 @@
import unittest

from pyca import utils, config
from tests.tools import should_fail

import sys
if sys.version_info.major > 2:
try:
Expand Down Expand Up @@ -62,7 +64,7 @@ def test_http_request(self):
def test_register_ca(self):
utils.http_request = lambda x, y=False: b'xxx'
assert utils.register_ca()
utils.http_request = 'fail'
utils.http_request = should_fail
assert not utils.register_ca()
config.config()['agent']['backup_mode'] = True
assert utils.register_ca()
Expand All @@ -71,7 +73,7 @@ def test_recording_state(self):
utils.http_request = lambda x, y=False: b'xxx'
config.config()['service-capture.admin'] = ['']
utils.recording_state('123', 'recording')
utils.http_request = 'fail'
utils.http_request = should_fail
utils.recording_state('123', 'recording')
config.config()['agent']['backup_mode'] = True
utils.recording_state('123', 'recording')
Expand Down
12 changes: 12 additions & 0 deletions tests/tools.py
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
'''
Some helper tools for pyCA testing.
'''


class ShouldFailException(Exception):
pass


def should_fail(*args, **kwargs):
raise ShouldFailException()

0 comments on commit 1534c7c

Please sign in to comment.