diff --git a/docs/user-guide/release-notes/2.11.x.rst b/docs/user-guide/release-notes/2.11.x.rst
index 766be4e39a..d8899c2f55 100644
--- a/docs/user-guide/release-notes/2.11.x.rst
+++ b/docs/user-guide/release-notes/2.11.x.rst
@@ -10,3 +10,6 @@ New Features
 
 * For RPM content, a full sync will be forced if the sync configuration has
   been changed or content has been removed since the last sync.
+
+* When ``pulp-manage-db`` is run, the user is prompted to continue if there
+  are Pulp services still running.
diff --git a/server/pulp/server/async/scheduler.py b/server/pulp/server/async/scheduler.py
index 6d797839b5..4fe820bfe3 100644
--- a/server/pulp/server/async/scheduler.py
+++ b/server/pulp/server/async/scheduler.py
@@ -192,6 +192,9 @@ def __init__(self, *args, **kwargs):
         self._schedule = None
         self._loaded_from_db_count = 0
 
+        # Setting the celerybeat name
+        self.celerybeat_name = constants.SCHEDULER_WORKER_NAME + "@" + platform.node()
+
         # Force the use of the Pulp celery_instance when this custom Scheduler is used.
         kwargs['app'] = app
 
@@ -241,8 +244,6 @@ def tick(self):
         :return: number of seconds before the next tick should run
         :rtype:  float
         """
-        # Setting the celerybeat name
-        celerybeat_name = constants.SCHEDULER_WORKER_NAME + "@" + platform.node()
 
         # this is not an event that gets sent anywhere. We process it
         # immediately.
@@ -250,7 +251,7 @@
             'timestamp': time.time(),
             'local_received': time.time(),
             'type': 'scheduler-event',
-            'hostname': celerybeat_name
+            'hostname': self.celerybeat_name
         }
 
         worker_watcher.handle_worker_heartbeat(scheduler_event)
@@ -258,14 +259,14 @@
         old_timestamp = datetime.utcnow() - timedelta(seconds=constants.CELERYBEAT_LOCK_MAX_AGE)
 
         # Updating the current lock if lock is on this instance of celerybeat
-        result = CeleryBeatLock.objects(celerybeat_name=celerybeat_name).\
+        result = CeleryBeatLock.objects(celerybeat_name=self.celerybeat_name).\
             update(set__timestamp=datetime.utcnow())
 
         # If current instance has lock and updated lock_timestamp, call super
         if result == 1:
             _logger.debug(_('Lock updated by %(celerybeat_name)s')
-                          % {'celerybeat_name': celerybeat_name})
-            ret = self.call_tick(self, celerybeat_name)
+                          % {'celerybeat_name': self.celerybeat_name})
+            ret = self.call_tick(self, self.celerybeat_name)
         else:
             # check for old enough time_stamp and remove if such lock is present
             CeleryBeatLock.objects(timestamp__lte=old_timestamp).delete()
@@ -273,13 +274,13 @@
                 lock_timestamp = datetime.utcnow()
 
                 # Insert new lock entry
-                new_lock = CeleryBeatLock(celerybeat_name=celerybeat_name,
+                new_lock = CeleryBeatLock(celerybeat_name=self.celerybeat_name,
                                           timestamp=lock_timestamp)
                 new_lock.save()
                 _logger.info(_("New lock acquired by %(celerybeat_name)s") %
-                             {'celerybeat_name': celerybeat_name})
+                             {'celerybeat_name': self.celerybeat_name})
 
                 # After acquiring new lock call super to dispatch tasks
-                ret = self.call_tick(self, celerybeat_name)
+                ret = self.call_tick(self, self.celerybeat_name)
             except mongoengine.NotUniqueError:
                 # Setting a default wait time for celerybeat instances with no lock
@@ -368,3 +369,8 @@ def add(self, **kwargs):
         entries to the database, and they will be picked up automatically.
         """
         raise NotImplementedError
+
+    def close(self):
+        """Remove this celerybeat's worker record on a clean shutdown."""
+        _delete_worker(self.celerybeat_name, normal_shutdown=True)
+        super(Scheduler, self).close()
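The scheduler changes hang together as follows: `celerybeat_name` is now computed once in `__init__` so that the new `close()` hook can deregister the same name that `tick()` heartbeats and locks under. The lock dance in `tick()` reduces to the pattern below. This is a minimal standalone sketch, not Pulp's code: the `CeleryBeatLock` model is simplified (the unique constraint is placed directly on `celerybeat_name` for brevity) and `CELERYBEAT_LOCK_MAX_AGE` is an assumed stand-in value.

```python
from datetime import datetime, timedelta

import mongoengine

CELERYBEAT_LOCK_MAX_AGE = 200  # stand-in; the real value lives in pulp.common.constants


class CeleryBeatLock(mongoengine.Document):
    # Simplified stand-in for Pulp's model: a unique field guarantees that
    # at most one lock document can exist at a time.
    celerybeat_name = mongoengine.StringField(required=True, unique=True)
    timestamp = mongoengine.DateTimeField(required=True)


def holds_lock(celerybeat_name):
    """Return True if this scheduler instance holds (or just acquired) the lock.

    Requires mongoengine.connect(...) to have been called beforehand.
    """
    # Refresh the lock if we already own it; update() returns the match count.
    updated = CeleryBeatLock.objects(celerybeat_name=celerybeat_name).update(
        set__timestamp=datetime.utcnow())
    if updated == 1:
        return True

    # Reap a lock whose owner has not refreshed it recently enough...
    cutoff = datetime.utcnow() - timedelta(seconds=CELERYBEAT_LOCK_MAX_AGE)
    CeleryBeatLock.objects(timestamp__lte=cutoff).delete()

    try:
        # ...then race to insert ourselves; the unique index ensures only one
        # contender wins, and the losers see NotUniqueError.
        CeleryBeatLock(celerybeat_name=celerybeat_name,
                       timestamp=datetime.utcnow()).save()
        return True
    except mongoengine.NotUniqueError:
        return False
```

An instance that loses the race simply skips dispatching on that tick and retries on the next one; the `close()` override complements this by deleting the scheduler's worker record on a clean shutdown instead of leaving it to age out.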
""" raise NotImplementedError + + def close(self): + _delete_worker(self.celerybeat_name, normal_shutdown=True) + super(Scheduler, self).close() diff --git a/server/pulp/server/db/manage.py b/server/pulp/server/db/manage.py index 2ab99fd4c8..9d4aae12ad 100644 --- a/server/pulp/server/db/manage.py +++ b/server/pulp/server/db/manage.py @@ -1,6 +1,7 @@ """ This module's main() function becomes the pulp-manage-db.py script. """ +from datetime import datetime from gettext import gettext as _ from optparse import OptionParser import logging @@ -8,6 +9,7 @@ import sys import traceback +from pulp.common import constants from pulp.plugins.loader.api import load_content_types from pulp.plugins.loader.manager import PluginManager from pulp.server import logs @@ -15,9 +17,11 @@ from pulp.server.db.migrate import models from pulp.server.db import model from pulp.server.db.migrations.lib import managers -from pulp.server.managers import factory +from pulp.server.db.fields import UTCDateTimeField +from pulp.server.managers import factory, status from pulp.server.managers.auth.role.cud import RoleManager, SUPER_USER_ROLE +from pymongo.errors import ServerSelectionTimeoutError os.environ['DJANGO_SETTINGS_MODULE'] = 'pulp.server.webservices.settings' @@ -191,6 +195,15 @@ def main(): options = parse_args() _start_logging() connection.initialize(max_timeout=1) + + # Prompt the user if there are workers that have not timed out + if filter(lambda worker: (UTCDateTimeField().to_python(datetime.now()) - + worker['last_heartbeat']).total_seconds() < + constants.CELERY_TIMEOUT_SECONDS, status.get_workers()): + if not _user_input_continue('There are still running workers, continuing could ' + 'corrupt your Pulp installation. Are you sure you wish ' + 'to continue?'): + return os.EX_OK return _auto_manage_db(options) except UnperformedMigrationException: return 1 @@ -200,6 +213,9 @@ def main(): return os.EX_DATAERR except models.MigrationRemovedError: return os.EX_SOFTWARE + except ServerSelectionTimeoutError: + _logger.info(_('Cannot connect to database, please validate that the database is up.')) + return os.EX_SOFTWARE except Exception, e: _logger.critical(str(e)) _logger.critical(''.join(traceback.format_exception(*sys.exc_info()))) @@ -274,3 +290,10 @@ def _start_logging(): console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) _logger.root.addHandler(console_handler) + + +def _user_input_continue(question): + reply = str(raw_input(_(question + ' (y/N): '))).lower().strip() + if reply[0] == 'y': + return True + return False diff --git a/server/test/unit/server/async/test_scheduler.py b/server/test/unit/server/async/test_scheduler.py index 68c9903b21..da1e339682 100644 --- a/server/test/unit/server/async/test_scheduler.py +++ b/server/test/unit/server/async/test_scheduler.py @@ -116,9 +116,9 @@ def test_calls_superclass(self, mock_celerybeatlock, mock_worker_watcher, mock_t @mock.patch('pulp.server.async.scheduler.CeleryBeatLock') def test_calls_handle_heartbeat(self, mock_celerybeatlock, mock_worker_watcher, time, node, mock_tick): + node.return_value = 'some_host' sched_instance = scheduler.Scheduler() time.time.return_value = 1449261335.275528 - node.return_value = 'some_host' sched_instance.tick() diff --git a/server/test/unit/server/db/test_manage.py b/server/test/unit/server/db/test_manage.py index f2b13d8f7b..bc17a41f4c 100644 --- a/server/test/unit/server/db/test_manage.py +++ b/server/test/unit/server/db/test_manage.py @@ -4,6 +4,7 @@ from argparse import Namespace from 
diff --git a/server/test/unit/server/async/test_scheduler.py b/server/test/unit/server/async/test_scheduler.py
index 68c9903b21..da1e339682 100644
--- a/server/test/unit/server/async/test_scheduler.py
+++ b/server/test/unit/server/async/test_scheduler.py
@@ -116,9 +116,9 @@ def test_calls_superclass(self, mock_celerybeatlock, mock_worker_watcher, mock_t
     @mock.patch('pulp.server.async.scheduler.CeleryBeatLock')
     def test_calls_handle_heartbeat(self, mock_celerybeatlock, mock_worker_watcher, time, node,
                                     mock_tick):
+        node.return_value = 'some_host'
         sched_instance = scheduler.Scheduler()
         time.time.return_value = 1449261335.275528
-        node.return_value = 'some_host'
 
         sched_instance.tick()
 
diff --git a/server/test/unit/server/db/test_manage.py b/server/test/unit/server/db/test_manage.py
index f2b13d8f7b..bc17a41f4c 100644
--- a/server/test/unit/server/db/test_manage.py
+++ b/server/test/unit/server/db/test_manage.py
@@ -4,6 +4,7 @@
 from argparse import Namespace
 from cStringIO import StringIO
 import os
+from datetime import datetime
 
 from mock import call, inPy3k, MagicMock, patch
 from mongoengine.queryset import DoesNotExist
@@ -11,6 +12,7 @@
 from ... import base
 from pulp.common.compat import all, json
 from pulp.server.db import manage
+from pulp.server.db.fields import UTCDateTimeField
 from pulp.server.db.migrate import models
 from pulp.server.db.model import MigrationTracker
 import pulp.plugins.types.database as types_db
@@ -230,6 +232,53 @@
         self.assertTrue('root' in mock_stderr.write.call_args_list[0][0][0])
         self.assertTrue('apache' in mock_stderr.write.call_args_list[0][0][0])
 
+    @patch('logging.config.fileConfig')
+    @patch('pulp.server.db.manage.logging.getLogger')
+    @patch('sys.argv', ["pulp-manage-db"])
+    @patch('pulp.server.db.connection.initialize')
+    @patch('pulp.server.managers.status.get_workers')
+    @patch('pulp.server.db.manage._user_input_continue', return_value=False)
+    @patch('pulp.server.db.manage._auto_manage_db')
+    def test_user_prompted_if_workers_tasks_exist(self, mock__auto_manage_db,
+                                                  mock__user_input, mock_get_workers,
+                                                  *unused_mocks):
+        mock_get_workers.return_value = [{'last_heartbeat':
+                                          UTCDateTimeField().to_python(datetime.utcnow())}]
+        ret = manage.main()
+        # make sure the system exits cleanly if the user chooses not to proceed
+        self.assertEqual(ret, os.EX_OK)
+        # make sure the user is asked for input when there are workers running
+        self.assertTrue(mock__user_input.called)
+
+    @patch('logging.config.fileConfig')
+    @patch('pulp.server.db.manage.logging.getLogger')
+    @patch('sys.argv', ["pulp-manage-db"])
+    @patch('pulp.server.db.connection.initialize')
+    @patch('pulp.server.managers.status.get_workers', return_value=[])
+    @patch('pulp.server.db.manage._user_input_continue', return_value=True)
+    @patch('pulp.server.db.manage._auto_manage_db')
+    def test_user_not_prompted_if_no_workers(self, mock_auto_manage_db,
+                                             mock_user_input, *unused_mocks):
+        manage.main()
+        # make sure the user is not asked for input when there are no workers
+        self.assertFalse(mock_user_input.called)
+        self.assertTrue(mock_auto_manage_db.called)
+
+    @patch('logging.config.fileConfig')
+    @patch('pulp.server.db.manage.logging.getLogger')
+    @patch('sys.argv', ["pulp-manage-db"])
+    @patch('pulp.server.db.connection.initialize')
+    @patch('pulp.server.managers.status.get_workers')
+    @patch('pulp.server.db.manage._user_input_continue', return_value=True)
+    @patch('pulp.server.db.manage._auto_manage_db')
+    def test_user_not_prompted_if_workers_timeout(self, mock__auto_manage_db, mock__user_input,
+                                                  mock_get_workers, *unused_mocks):
+        mock_get_workers.return_value = [{'last_heartbeat':
+                                          UTCDateTimeField().to_python(datetime(2000, 1, 1))}]
+        manage.main()
+        # make sure the user is not asked for input when all workers have timed out
+        self.assertFalse(mock__user_input.called)
+
     @patch('pulp.server.db.manage.logging.getLogger')
     @patch('pkg_resources.iter_entry_points', iter_entry_points)
     @patch('pulp.server.db.manage.connection.initialize')
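The tests above stub `_user_input_continue` rather than drive it; exercising the helper directly only requires patching Python 2's `raw_input`, which is what `manage.py` reads from. A possible sketch, with illustrative replies:

```python
import mock

from pulp.server.db import manage

# A reply starting with 'y' continues; an empty reply falls through to "no".
with mock.patch('__builtin__.raw_input', return_value='yes'):
    assert manage._user_input_continue('Proceed?') is True

with mock.patch('__builtin__.raw_input', return_value=''):
    assert manage._user_input_continue('Proceed?') is False
```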