Repository: pulp (public archive). Archived by the owner on Dec 7, 2022; it is now read-only.

Commit

Merge remote-tracking branch 'pulp/master' into HEAD
Brian Bouterse committed Oct 25, 2016
2 parents 601156a + ed12742 commit debd56c
Showing 10 changed files with 81 additions and 7 deletions.
1 change: 1 addition & 0 deletions AUTHORS
@@ -45,3 +45,4 @@ Florian Weimer (fw@deneb.enyo.de)
Adam Young (ayoung@redhat.com)
Lukas Zapletal (lzap+git@redhat.com)
Bihan Zhang (bizhang@redhat.com)
Jan-Otto Kröpke (mail@jkroepke.de)
5 changes: 5 additions & 0 deletions docs/user-guide/release-notes/2.11.x.rst
@@ -12,3 +12,8 @@ New Features
has been removed since the last sync.

* When pulp-manage-db is run, prompt the user to continue if there are Pulp services still running.

* Memory improvements! Worker process recycling can be enabled to release unused memory back to
the system after tasks complete. This feature is disabled by default. See the
:ref:`process recycling documentation<process_recycling>` for more info. Thank you to Jan-Otto
Kröpke for contributing this feature.
6 changes: 6 additions & 0 deletions docs/user-guide/troubleshooting.rst
@@ -271,3 +271,9 @@ be done by that worker. This is normal for cancellation and is not a cause for concern.
celery.worker.job:ERROR: (15328-02560) File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 1673, in _set_terminated
celery.worker.job:ERROR: (15328-02560) raise Terminated(-(signum or 0))
celery.worker.job:ERROR: (15328-02560) Terminated: 0

Workers not releasing memory
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

See the :ref:`process recycling documentation<process_recycling>` for more information on how to
have your Pulp workers return memory back to the system.
16 changes: 16 additions & 0 deletions docs/user-guide/tuning.rst
@@ -30,6 +30,22 @@ the collection responsible for that content type. Each content type has a collection

.. _here: http://docs.mongodb.org/manual/core/index-creation/

.. _process_recycling:

Memory Issues
^^^^^^^^^^^^^
Pulp workers do not release all unused memory back to the system once tasks are complete. This is
a known issue with the version of Python that Pulp uses.

To work around this problem, Pulp supports worker process recycling: a worker process is
terminated after a configurable number of tasks and replaced with a new one, releasing unused
memory back to the system after tasks complete. This does not interfere with normal use of Pulp,
but it does add a small runtime overhead to the tasking system from regularly killing and
respawning processes.

See the ``PULP_MAX_TASKS_PER_CHILD`` variable in your ``/etc/default/pulp_workers`` file to enable
this feature. After adjusting the configuration value you will need to restart your
``pulp_workers`` processes.
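As a sketch, enabling the feature amounts to setting the variable in the environment file (the value ``2`` below is illustrative; any value greater than 0 works):

```
# /etc/default/pulp_workers
# Uncomment and set to enable worker process recycling
PULP_MAX_TASKS_PER_CHILD=2
```

Then restart the ``pulp_workers`` processes (e.g. ``sudo systemctl restart pulp_workers`` on systemd systems; the exact command depends on your init system).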

Monitoring
----------

4 changes: 4 additions & 0 deletions server/etc/default/systemd_pulp_workers
@@ -6,3 +6,7 @@

# Configure Python's encoding for writing all logs, stdout and stderr
PYTHONIOENCODING="UTF-8"

# To avoid memory leaks, Pulp can terminate and replace a worker after processing X tasks. If
# left commented, process recycling is disabled. PULP_MAX_TASKS_PER_CHILD must be > 0.
# PULP_MAX_TASKS_PER_CHILD=2
4 changes: 4 additions & 0 deletions server/etc/default/upstart_pulp_workers
@@ -19,6 +19,10 @@ CELERYD_PID_FILE="/var/run/pulp/%n.pid"
# Configure Python's encoding for writing all logs, stdout and stderr
PYTHONIOENCODING="UTF-8"

# To avoid memory leaks, Pulp can terminate and replace a worker after processing X tasks. If
# left commented, process recycling is disabled. PULP_MAX_TASKS_PER_CHILD must be > 0.
# PULP_MAX_TASKS_PER_CHILD=2

######################################################################
# Please do not edit any of the settings below this mark in this file!
######################################################################
8 changes: 8 additions & 0 deletions server/etc/rc.d/init.d/pulp_workers
@@ -59,6 +59,13 @@ if [ ! -z "$CELERY_APP" ]; then
CELERY_APP_ARG="--app=$CELERY_APP"
fi

# Sets --maxtasksperchild argument for CELERY_BIN
if [ -z ${PULP_MAX_TASKS_PER_CHILD+x} ]; then
CELERY_MAX_TASKS_PER_CHILD_ARG=""
else
CELERY_MAX_TASKS_PER_CHILD_ARG="--maxtasksperchild=$PULP_MAX_TASKS_PER_CHILD"
fi
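The `${PULP_MAX_TASKS_PER_CHILD+x}` expansion above is the POSIX "is this variable set?" test: it expands to `x` whenever the variable is set (even to an empty value), and to nothing when it is unset. A standalone sketch of the idiom:

```shell
# Demonstrate the ${VAR+x} "is this variable set?" test used above.
unset PULP_MAX_TASKS_PER_CHILD
if [ -z "${PULP_MAX_TASKS_PER_CHILD+x}" ]; then
    echo "unset -> no --maxtasksperchild argument"
fi

PULP_MAX_TASKS_PER_CHILD=2
if [ ! -z "${PULP_MAX_TASKS_PER_CHILD+x}" ]; then
    echo "set -> --maxtasksperchild=$PULP_MAX_TASKS_PER_CHILD"
fi
```

A plain `-z "$PULP_MAX_TASKS_PER_CHILD"` would not distinguish "unset" from "set to empty", which is why the script uses the `+x` form.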

CELERYD_USER=${CELERYD_USER:-$DEFAULT_USER}

# Set CELERY_CREATE_DIRS to always create log/pid dirs.
@@ -219,6 +226,7 @@ start_workers () {
--logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" \
$CELERY_APP_ARG \
$CELERY_MAX_TASKS_PER_CHILD_ARG \
$CELERYD_OPTS
}

1 change: 0 additions & 1 deletion server/selinux/server/pulp-celery.te
@@ -146,7 +146,6 @@ corenet_tcp_bind_generic_node(celery_t)
# Pulp workers need to do selinux relabeling as part of publishing.
#

selinux_set_enforce_mode(celery_t)
selinux_validate_context(celery_t)

allow celery_t httpd_sys_rw_content_t:dir relabelto;
18 changes: 14 additions & 4 deletions server/test/unit/server/async/test_manage_workers.py
@@ -51,6 +51,7 @@ def communicate(self):
stdout=subprocess.PIPE, shell=True)
cpu_count.assert_called_once_with()

@mock.patch('pulp.server.async.manage_workers._get_max_tasks', mock.MagicMock(return_value=''))
@mock.patch('pulp.server.async.manage_workers.multiprocessing.cpu_count', return_value=16)
@mock.patch('pulp.server.async.manage_workers.subprocess.Popen')
def test_concurrency_set(self, Popen, cpu_count):
@@ -172,6 +173,8 @@ class TestStartWorkers(unittest.TestCase):
"""
Test the _start_workers() function. For simplicity, these tests all set concurrency to 1.
"""

@mock.patch('pulp.server.async.manage_workers._get_max_tasks', mock.MagicMock(return_value=''))
@mock.patch('pulp.server.async.manage_workers._get_concurrency', mock.MagicMock(return_value=1))
@mock.patch('pulp.server.async.manage_workers.os.path.exists',
mock.MagicMock(return_value=True))
@@ -192,7 +195,8 @@ class SysExit(Exception):
Popen.return_value.returncode = 42
Popen.return_value.communicate.return_value = pipe_output
expected_read_data = manage_workers._WORKER_TEMPLATE % {
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE}
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE,
'max_tasks_argument': ''}

with mock.patch('__builtin__.open', autospec=True) as mock_open:
mock_file = mock.MagicMock(spec=file)
@@ -228,6 +232,7 @@ class SysExit(Exception):
# Make sure the exit code was passed on
exit.assert_called_once_with(42)

@mock.patch('pulp.server.async.manage_workers._get_max_tasks', mock.MagicMock(return_value=''))
@mock.patch('pulp.server.async.manage_workers._get_concurrency', mock.MagicMock(return_value=1))
@mock.patch('pulp.server.async.manage_workers.os.path.exists',
mock.MagicMock(return_value=True))
@@ -243,7 +248,8 @@ def test_unit_path_does_exist_correctly(self, stdout, Popen):
Popen.return_value.returncode = 0
Popen.return_value.communicate.return_value = pipe_output
expected_read_data = manage_workers._WORKER_TEMPLATE % {
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE}
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE,
'max_tasks_argument': ''}

with mock.patch('__builtin__.open', autospec=True) as mock_open:
mock_file = mock.MagicMock(spec=file)
@@ -274,6 +280,7 @@ def test_unit_path_does_exist_correctly(self, stdout, Popen):
self.assertEqual(stdout.write.mock_calls[0][1][0], pipe_output[0])
self.assertEqual(stdout.write.mock_calls[1][1][0], '\n')

@mock.patch('pulp.server.async.manage_workers._get_max_tasks', mock.MagicMock(return_value=''))
@mock.patch('pulp.server.async.manage_workers._get_concurrency', mock.MagicMock(return_value=1))
@mock.patch('pulp.server.async.manage_workers.os.path.exists',
mock.MagicMock(return_value=True))
@@ -303,7 +310,8 @@ def test_unit_path_does_exist_incorrectly(self, stdout, Popen):
unit_filename = manage_workers._UNIT_FILENAME_TEMPLATE % 0
expected_path = os.path.join(manage_workers._SYSTEMD_UNIT_PATH, unit_filename)
expected_file_contents = manage_workers._WORKER_TEMPLATE % {
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE}
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE,
'max_tasks_argument': ''}
self.assertEqual(mock_open.call_count, 2)
# Let's inspect the read call
first_open = mock_open.mock_calls[0]
@@ -333,6 +341,7 @@ def test_unit_path_does_exist_incorrectly(self, stdout, Popen):
self.assertEqual(stdout.write.mock_calls[0][1][0], pipe_output[0])
self.assertEqual(stdout.write.mock_calls[1][1][0], '\n')

@mock.patch('pulp.server.async.manage_workers._get_max_tasks', mock.MagicMock(return_value=''))
@mock.patch('pulp.server.async.manage_workers._get_concurrency', mock.MagicMock(return_value=1))
# Setting this return value to False will simulate the file not existing
@mock.patch('pulp.server.async.manage_workers.os.path.exists',
@@ -359,7 +368,8 @@ def test_unit_path_does_not_exist(self, stdout, Popen):
unit_filename = manage_workers._UNIT_FILENAME_TEMPLATE % 0
expected_path = os.path.join(manage_workers._SYSTEMD_UNIT_PATH, unit_filename)
expected_file_contents = manage_workers._WORKER_TEMPLATE % {
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE}
'num': 0, 'environment_file': manage_workers._ENVIRONMENT_FILE,
'max_tasks_argument': ''}
# Now, let's inspect the write call
mock_open.assert_called_once_with(expected_path, 'w')
file_context = mock_open()
25 changes: 23 additions & 2 deletions tasking/pulp/tasking/manage_workers.py
@@ -22,7 +22,7 @@
WorkingDirectory=/var/run/pulp/
ExecStart=/usr/bin/celery worker -n reserved_resource_worker-%(num)s@%%%%h -A pulp.server.async.app\
-c 1 --events --umask 18 --pidfile=/var/run/pulp/reserved_resource_worker-%(num)s.pid\
--heartbeat-interval=30
--heartbeat-interval=30 %(max_tasks_argument)s
KillSignal=SIGQUIT
"""

@@ -44,6 +44,25 @@ def _get_concurrency():
return multiprocessing.cpu_count()


def _get_max_tasks():
"""
Process the _ENVIRONMENT_FILE to determine whether celery worker process recycling should be
used. If process recycling is enabled, return the string adding the command line option;
if disabled, return an empty string.

:return: The argument string setting maxtasksperchild, or an empty string
:rtype: basestring
"""
pipe = subprocess.Popen(". %s; echo $PULP_MAX_TASKS_PER_CHILD" % _ENVIRONMENT_FILE,
stdout=subprocess.PIPE, shell=True)
output = pipe.communicate()[0].strip()
if output:
return ' --maxtasksperchild=%s' % int(output)
else:
return ''
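The function above shells out so the environment file is parsed by the shell itself: it sources the file and echoes the one variable it cares about. A minimal self-contained sketch of that pattern (written for Python 3, so the subprocess output is decoded; the original codebase is Python 2, where `communicate()` already returns `str`):

```python
import subprocess
import tempfile

def read_env_var(env_file, name):
    """Source a shell environment file and echo one variable to read its value."""
    pipe = subprocess.Popen(". %s; echo $%s" % (env_file, name),
                            stdout=subprocess.PIPE, shell=True)
    return pipe.communicate()[0].decode().strip()

# Write a throwaway environment file and read the recycling setting back.
with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f:
    f.write('PULP_MAX_TASKS_PER_CHILD=2\n')

value = read_env_var(f.name, 'PULP_MAX_TASKS_PER_CHILD')
# Mirror _get_max_tasks(): emit the celery argument only when the variable is set.
max_tasks_argument = ' --maxtasksperchild=%s' % int(value) if value else ''
print(max_tasks_argument)  # -> " --maxtasksperchild=2"
```

Letting the shell do the parsing means comments, quoting, and any other shell syntax in the environment file behave exactly as they would for the init scripts that source the same file.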


def _get_file_contents(path):
"""
Open the file at path, read() it, close the file, and return a string of its contents.
@@ -66,7 +85,9 @@ def _start_workers():
for i in range(concurrency):
unit_filename = _UNIT_FILENAME_TEMPLATE % i
unit_path = os.path.join(_SYSTEMD_UNIT_PATH, unit_filename)
unit_contents = _WORKER_TEMPLATE % {'num': i, 'environment_file': _ENVIRONMENT_FILE}
max_tasks_argument = _get_max_tasks()
unit_contents = _WORKER_TEMPLATE % {'num': i, 'environment_file': _ENVIRONMENT_FILE,
'max_tasks_argument': max_tasks_argument}
if not os.path.exists(unit_path) or _get_file_contents(unit_path) != unit_contents:
with open(unit_path, 'w') as unit_file:
unit_file.write(unit_contents)
