Skip to content

Commit

Permalink
logging: clean up opened filehandles (scaling) (#4)
Browse files Browse the repository at this point in the history
* logging: clean up opened filehandles (scaling)

* make setup_logging a context and remove handlers on ctx exit
  with scaling tests with >1k nodes we ran into a crash due to
  "too many open files"

* travis fixes

* travis fixes
  • Loading branch information
lukasheinrich committed Sep 19, 2017
1 parent 2890c40 commit 5fe5fab
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 119 deletions.
34 changes: 20 additions & 14 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
sudo: required

services:
- docker

- docker
language: python
python:
- '2.7'
- '3.5'
- '2.7'
- '3.5'
install:
- pip install pyflakes pytest pytest-cov python-coveralls
- pip install --process-dependency-links -e '.[celery]'

- pip install pyflakes pytest pytest-cov python-coveralls
- pip install --process-dependency-links -e '.[celery]'
script:
- pyflakes packtivity
- pytest --cov=packtivity -vv
- packtivity-run tests/testspecs/noop-test.yml -p a_parameter=hello

- pyflakes packtivity
- pytest --cov=packtivity -vv
- packtivity-run tests/testspecs/noop-test.yml -p a_parameter=hello
jobs:
include:
- stage: deploy
deploy:
provider: pypi
password:
secure: OCaYWRAgoQIgyraeTMOpDxRYh0bIazkB1pwNo/7yxbmDloYZeoHTMYZOWYJ9D6Al9wFzveHvW+iABOhP8Zc3e4crep94VIMfYj+IzPFnVMk4+t2ZnTArJRwuuKdzAl2+aeFcRjgUidORWKwzp/T4kf1ScLjSSf7SQQAQexnhGpGgHB4CHdXhV03fGydKeoBJKNhfXzmQtaNdtJtRURi0s86SQSpJJBDb1Jeike2ZpShJBiehLWQ0TMAivcPKaCI1stRwCeFKpInvxqrJBTj7oov+EdsH0eh1uMgwlNiHjd94RG2epFvlr1GBvq8Dr86/4KKykHrDtwQjLpsIvADzzS/9U+EEd4OKTGkXgF7RFcMKzrJ6MWrdLc5EzeeZRR2k3cPqDByy1Mnmevc+9EzqJHpQteigkXR7pcNjaq7vhFuzbwDcqtgwWZz1g3wenRpdqeYMq3qkkJ8D4yTBtTZejE1mWkUEFKR83ehS8J/7B1XkekHRncLm4vhNGOlcPFIcuvko1NayNmKejH+5BntpO0pZAafeE2vhK8kCDexQbf/P773IOAKPBU7b/pBs/xZ6EwCCqGj2HETsxgFMshMXwjq5f3hOseP8QHlbgelxlKu3Qf816SpUxBZH0STTS6xq3we6feCFpYyza2f9eB98zmF56PJRuzTjUs9+nh0QyEI=
username:
secure: E5tapPXNvC1e92k4KdCojyN2ZUxsN+EZWgB6rLtMh9qgNWZd0FmnX15IuXRD/HzPbSFa10Qus9M7kK0B8Bm0FhmU9a1KnzSPJEDaAGJ1RIx50EfFqJGbqpSp++HVzhHx6y3Jer6ltpeKoYdxFQwi+NVFHGEuJHltzKYcOx8XLWNe0AtZdi5LLqocJyk7MuQ/5QYGIygAR4/+sK7Pe8xmBR6uzrVcKRuoIzfxJEeXepLk779UnxD4Yqn0R19j8hdBNBd2q7ue9iAbPr+Uv3uTRnHjLGVPU4cKy8qeO5h4GXQBbeT1QyJsm+so/fSxqVIb50NNlaOlHSqaAmDGZ+fR70snEYOS+n2LqoD/KpOLip1upgjLo1bIqvUOStqVq4kLPUeb1pX3gONDq1YloSkJwKObNOXP5THMBf2mhtUDsT//iEGKytzclGISYyL77nyVKwcyix6BpHY1IcAvieenEMRtpcz2BaQWAd8s3Xgdk1Ht10wpskbAUomb8R8oI9kf3isvmSEjr57kLmr7zkCIODvHZt2y5Xj16bsruWMTtqfewkgniUOskGV3SNmMhdrgQnFBgv8M4P0C8CK+5zrpO2SXGqjOvyQJw06RjCKPqnWGl2kTPVds3cbrZ668clJexEpRj8m4wR1bgOUh4u1qLVIQtpO7eb6h/roFfAs7onA=
on:
tags: true
after_success:
- coveralls

- coveralls
161 changes: 81 additions & 80 deletions packtivity/handlers/execution_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,22 @@ def run_docker_with_script(state,environment,job,log,metadata):
indocker = envmod+indocker

try:
runlog = logutils.setup_logging_topic(metadata,state,'run', return_logger = True)
subcmd = 'docker run --rm -i {docker_mod} {image}:{imagetag} sh -c \'{indocker}\' '.format(image = image, imagetag = imagetag, docker_mod = docker_mod, indocker = indocker)
log.debug('running docker cmd: %s',subcmd)
proc = subprocess.Popen(shlex.split(subcmd), stdin = subprocess.PIPE, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1)
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:
subcmd = 'docker run --rm -i {docker_mod} {image}:{imagetag} sh -c \'{indocker}\' '.format(image = image, imagetag = imagetag, docker_mod = docker_mod, indocker = indocker)
log.debug('running docker cmd: %s',subcmd)
proc = subprocess.Popen(shlex.split(subcmd), stdin = subprocess.PIPE, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)

log.debug('started run subprocess with pid %s. now piping script',proc.pid)
proc.stdin.write(script.encode('utf-8'))
proc.stdin.close()
time.sleep(0.5)
log.debug('started run subprocess with pid %s. now piping script',proc.pid)
proc.stdin.write(script.encode('utf-8'))
proc.stdin.close()
time.sleep(0.5)

for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
while proc.poll() is None:
pass
for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('docker run subprocess finished. return code: %s',proc.returncode)
if proc.returncode:
Expand Down Expand Up @@ -194,16 +196,18 @@ def docker_pull(docker_pull_cmd,log,state,metadata):
if 'PACKTIVITY_DRYRUN' in os.environ:
return
try:
pulllog = logutils.setup_logging_topic(metadata,state,'pull', return_logger = True)
proc = subprocess.Popen(shlex.split(docker_pull_cmd), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1)
log.debug('started pull subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])
with logutils.setup_logging_topic(metadata,state,'pull', return_logger = True) as pulllog:
proc = subprocess.Popen(shlex.split(docker_pull_cmd), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started pull subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])

for line in iter(proc.stdout.readline, b''):
pulllog.info(line.strip())
while proc.poll() is None:
pass

for line in iter(proc.stdout.readline, b''):
pulllog.info(line.strip())
while proc.poll() is None:
pass
proc.stdout.close()

log.debug('pull subprocess finished. return code: %s',proc.returncode)
if proc.returncode:
Expand All @@ -227,16 +231,18 @@ def docker_run_cmd(fullest_command,log,state,metadata):
if 'PACKTIVITY_DRYRUN' in os.environ:
return
try:
runlog = logutils.setup_logging_topic(metadata,state,'run', return_logger = True)
proc = subprocess.Popen(shlex.split(fullest_command), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1)
log.debug('started run subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:
proc = subprocess.Popen(shlex.split(fullest_command), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started run subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])

for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
while proc.poll() is None:
pass
for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('docker run subprocess finished. return code: %s',proc.returncode)
if proc.returncode:
Expand All @@ -257,58 +263,53 @@ def docker_run_cmd(fullest_command,log,state,metadata):

@executor('docker-encapsulated')
def docker_enc_handler(environment,state,job,metadata):
log = logutils.setup_logging_topic(metadata,state,'step',return_logger = True)

#setup more detailed logging
logutils.setup_logging(metadata, state)


log.debug('starting log for step: %s',metadata)
if 'PACKTIVITY_DOCKER_NOPULL' not in os.environ:
log.info('prepare pull')
docker_pull_cmd = 'docker pull {container}:{tag}'.format(
container = environment['image'],
tag = environment['imagetag']
)
docker_pull(docker_pull_cmd,log,state,metadata)
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
log.debug('starting log for step: %s',metadata)
if 'PACKTIVITY_DOCKER_NOPULL' not in os.environ:
log.info('prepare pull')
docker_pull_cmd = 'docker pull {container}:{tag}'.format(
container = environment['image'],
tag = environment['imagetag']
)
docker_pull(docker_pull_cmd,log,state,metadata)

log.info('running job')

log.info('running job')

if 'command' in job:
# log.info('running oneliner command')
docker_run_cmd_str = prepare_full_docker_with_oneliner(state,environment,job['command'],log,metadata)
docker_run_cmd(docker_run_cmd_str,log,state,metadata)
log.debug('reached return for docker_enc_handler')
elif 'script' in job:
run_docker_with_script(state,environment,job,log,metadata)
else:
raise RuntimeError('do not know yet how to run this...')
if 'command' in job:
# log.info('running oneliner command')
docker_run_cmd_str = prepare_full_docker_with_oneliner(state,environment,job['command'],log,metadata)
docker_run_cmd(docker_run_cmd_str,log,state,metadata)
log.debug('reached return for docker_enc_handler')
elif 'script' in job:
run_docker_with_script(state,environment,job,log,metadata)
else:
raise RuntimeError('do not know yet how to run this...')

@executor('noop-env')
def noop_env(environment,state,job,metadata):
log = logutils.setup_logging_topic(metadata,state,'step',return_logger = True)
log.info('state is: %s',state)
log.info('would be running this job: %s',job)
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
log.info('state is: %s',state)
log.info('would be running this job: %s',job)

@executor('localproc-env')
def localproc_env(environment,state,job,metadata):
log = logutils.setup_logging_topic(metadata,state,'step',return_logger = True)
olddir = os.path.realpath(os.curdir)
workdir = state.readwrite[0]
log.info('running local command %s',job['command'])
try:
log.info('changing to workdirectory %s',workdir)
utils.mkdir_p(workdir)
os.chdir(workdir)
#this is used for testing and we will keep this shell
#doesn't make sense to wrap in sh ...
subprocess.check_call(job['command'], shell = True)
except:
log.exception('local job failed. job: %s',job)
raise
finally:
log.info('changing back to original directory %s',olddir)
os.chdir(olddir)
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
olddir = os.path.realpath(os.curdir)
workdir = state.readwrite[0]
log.info('running local command %s',job['command'])
try:
log.info('changing to workdirectory %s',workdir)
utils.mkdir_p(workdir)
os.chdir(workdir)
#this is used for testing and we will keep this shell
#doesn't make sense to wrap in sh ...
subprocess.check_call(job['command'], shell = True)
except:
log.exception('local job failed. job: %s',job)
raise
finally:
log.info('changing back to original directory %s',olddir)
os.chdir(olddir)

@executor('manual-env')
def manual_env(environment,state,job,metadata):
Expand All @@ -319,8 +320,8 @@ def manual_env(environment,state,job,metadata):

@executor('test-env')
def test_process(environment,state,job,metadata):
log = logutils.setup_logging_topic(metadata,state,'step',return_logger = True)
log.info('a complicated test environment')
log.info('job: {}'.format(job))
log.info('env: {}'.format(environment))
log.info('state {}'.format(state))
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
log.info('a complicated test environment')
log.info('job: {}'.format(job))
log.info('env: {}'.format(environment))
log.info('state {}'.format(state))
39 changes: 25 additions & 14 deletions packtivity/logutils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import logging
import importlib

import contextlib

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

Expand Down Expand Up @@ -41,20 +41,31 @@ def default_logging_handlers(log,metadata,state,topic):
fh.setFormatter(formatter)
log.addHandler(fh)

@contextlib.contextmanager
def setup_logging_topic(metadata,state,topic,return_logger = False):
'''
A context manager for logging.
It is a context in order to clean up the logging once it is no longer needed.
If many loggers and handlers that open resources are created, at some point these
resources may dry up. That's why we need a specific end point.
The logger can be recreated multiple times.
'''
log = logging.getLogger(get_topic_loggername(metadata,topic))
log.propagate = False

if log.handlers:
return log if return_logger else None

customhandlers = os.environ.get('PACKTIVITY_LOGGING_HANDLER')
if customhandlers:
module,func = customhandlers.split(':')
m = importlib.import_module(module)
f = getattr(m,func)
f(log,metadata,state,topic)
else:
default_logging_handlers(log,metadata,state,topic)
if return_logger:
return log
if not log.handlers:
customhandlers = os.environ.get('PACKTIVITY_LOGGING_HANDLER')
if customhandlers:
module,func = customhandlers.split(':')
m = importlib.import_module(module)
f = getattr(m,func)
f(log,metadata,state,topic)
else:
default_logging_handlers(log,metadata,state,topic)

yield log if return_logger else None

for h in log.handlers:
h.close()
log.removeHandler(h)

22 changes: 11 additions & 11 deletions packtivity/syncbackends.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ def prepublish(spec, parameters, state, pack_config):
return None

def run_packtivity(spec, parameters,state,metadata,config):
log = logutils.setup_logging_topic(metadata,state,'step',return_logger = True)
try:
job = build_job(spec['process'], parameters, state, config)
env = build_env(spec['environment'], parameters, state, config)
run_in_env(env,job,state,metadata,config)
pubdata = publish(spec['publisher'], parameters,state, config)
log.info('publishing data: %s',pubdata)
return pubdata
except:
log.exception('%s raised exception',metadata)
raise
with logutils.setup_logging_topic(metadata,state,'step',return_logger = True) as log:
try:
job = build_job(spec['process'], parameters, state, config)
env = build_env(spec['environment'], parameters, state, config)
run_in_env(env,job,state,metadata,config)
pubdata = publish(spec['publisher'], parameters,state, config)
log.info('publishing data: %s',pubdata)
return pubdata
except:
log.exception('%s raised exception',metadata)
raise

class defaultsyncbackend(object):
def __init__(self,packconfig_spec = None):
Expand Down

0 comments on commit 5fe5fab

Please sign in to comment.