Skip to content

Commit

Permalink
Revert "Prompt user of running workers/tasks on migration"
Browse files Browse the repository at this point in the history
  • Loading branch information
werwty committed Dec 7, 2016
1 parent 38e6082 commit d633f81
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 90 deletions.
3 changes: 1 addition & 2 deletions docs/user-guide/release-notes/2.11.x.rst
Expand Up @@ -24,9 +24,8 @@ New Features
* For RPM content, a full sync will be forced if the sync configuration has been changed or content
has been removed since the last sync.

* When pulp-manage-db is run, prompt the user to continue if there are Pulp services still running.

* Memory improvements! Worker process recycling can be enabled to release unused memory back to
the system after tasks complete. This feature is disabled by default. See the
:ref:`process recycling documentation<process_recycling>` for more info. Thank you to Jan-Otto
Kröpke for contributing this feature.

23 changes: 9 additions & 14 deletions server/pulp/server/async/scheduler.py
Expand Up @@ -192,9 +192,6 @@ def __init__(self, *args, **kwargs):
self._schedule = None
self._loaded_from_db_count = 0

# Setting the celerybeat name
self.celerybeat_name = constants.SCHEDULER_WORKER_NAME + "@" + platform.node()

# Force the use of the Pulp celery_instance when this custom Scheduler is used.
kwargs['app'] = app

Expand Down Expand Up @@ -244,43 +241,45 @@ def tick(self):
:return: number of seconds before the next tick should run
:rtype: float
"""
# Setting the celerybeat name
celerybeat_name = constants.SCHEDULER_WORKER_NAME + "@" + platform.node()

# this is not an event that gets sent anywhere. We process it
# immediately.
scheduler_event = {
'timestamp': time.time(),
'local_received': time.time(),
'type': 'scheduler-event',
'hostname': self.celerybeat_name
'hostname': celerybeat_name
}

worker_watcher.handle_worker_heartbeat(scheduler_event)

old_timestamp = datetime.utcnow() - timedelta(seconds=constants.CELERYBEAT_LOCK_MAX_AGE)

# Updating the current lock if lock is on this instance of celerybeat
result = CeleryBeatLock.objects(celerybeat_name=self.celerybeat_name).\
result = CeleryBeatLock.objects(celerybeat_name=celerybeat_name).\
update(set__timestamp=datetime.utcnow())

# If current instance has lock and updated lock_timestamp, call super
if result == 1:
_logger.debug(_('Lock updated by %(celerybeat_name)s')
% {'celerybeat_name': self.celerybeat_name})
ret = self.call_tick(self, self.celerybeat_name)
% {'celerybeat_name': celerybeat_name})
ret = self.call_tick(self, celerybeat_name)
else:
# check for old enough time_stamp and remove if such lock is present
CeleryBeatLock.objects(timestamp__lte=old_timestamp).delete()
try:
lock_timestamp = datetime.utcnow()

# Insert new lock entry
new_lock = CeleryBeatLock(celerybeat_name=self.celerybeat_name,
new_lock = CeleryBeatLock(celerybeat_name=celerybeat_name,
timestamp=lock_timestamp)
new_lock.save()
_logger.info(_("New lock acquired by %(celerybeat_name)s") %
{'celerybeat_name': self.celerybeat_name})
{'celerybeat_name': celerybeat_name})
# After acquiring new lock call super to dispatch tasks
ret = self.call_tick(self, self.celerybeat_name)
ret = self.call_tick(self, celerybeat_name)

except mongoengine.NotUniqueError:
# Setting a default wait time for celerybeat instances with no lock
Expand Down Expand Up @@ -369,7 +368,3 @@ def add(self, **kwargs):
entries to the database, and they will be picked up automatically.
"""
raise NotImplementedError

def close(self):
_delete_worker(self.celerybeat_name, normal_shutdown=True)
super(Scheduler, self).close()
25 changes: 1 addition & 24 deletions server/pulp/server/db/manage.py
@@ -1,27 +1,23 @@
"""
This module's main() function becomes the pulp-manage-db.py script.
"""
from datetime import datetime
from gettext import gettext as _
from optparse import OptionParser
import logging
import os
import sys
import traceback

from pulp.common import constants
from pulp.plugins.loader.api import load_content_types
from pulp.plugins.loader.manager import PluginManager
from pulp.server import logs
from pulp.server.db import connection
from pulp.server.db.migrate import models
from pulp.server.db import model
from pulp.server.db.migrations.lib import managers
from pulp.server.db.fields import UTCDateTimeField
from pulp.server.managers import factory, status
from pulp.server.managers import factory
from pulp.server.managers.auth.role.cud import RoleManager, SUPER_USER_ROLE

from pymongo.errors import ServerSelectionTimeoutError

os.environ['DJANGO_SETTINGS_MODULE'] = 'pulp.server.webservices.settings'

Expand Down Expand Up @@ -195,15 +191,6 @@ def main():
options = parse_args()
_start_logging()
connection.initialize(max_timeout=1)

# Prompt the user if there are workers that have not timed out
if filter(lambda worker: (UTCDateTimeField().to_python(datetime.now()) -
worker['last_heartbeat']).total_seconds() <
constants.CELERY_TIMEOUT_SECONDS, status.get_workers()):
if not _user_input_continue('There are still running workers, continuing could '
'corrupt your Pulp installation. Are you sure you wish '
'to continue?'):
return os.EX_OK
return _auto_manage_db(options)
except UnperformedMigrationException:
return 1
Expand All @@ -213,9 +200,6 @@ def main():
return os.EX_DATAERR
except models.MigrationRemovedError:
return os.EX_SOFTWARE
except ServerSelectionTimeoutError:
_logger.info(_('Cannot connect to database, please validate that the database is up.'))
return os.EX_SOFTWARE
except Exception, e:
_logger.critical(str(e))
_logger.critical(''.join(traceback.format_exception(*sys.exc_info())))
Expand Down Expand Up @@ -290,10 +274,3 @@ def _start_logging():
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
_logger.root.addHandler(console_handler)


def _user_input_continue(question):
reply = str(raw_input(_(question + ' (y/N): '))).lower().strip()
if reply[0] == 'y':
return True
return False
2 changes: 1 addition & 1 deletion server/test/unit/server/async/test_scheduler.py
Expand Up @@ -116,9 +116,9 @@ def test_calls_superclass(self, mock_celerybeatlock, mock_worker_watcher, mock_t
@mock.patch('pulp.server.async.scheduler.CeleryBeatLock')
def test_calls_handle_heartbeat(self, mock_celerybeatlock, mock_worker_watcher, time, node,
mock_tick):
node.return_value = 'some_host'
sched_instance = scheduler.Scheduler()
time.time.return_value = 1449261335.275528
node.return_value = 'some_host'

sched_instance.tick()

Expand Down
49 changes: 0 additions & 49 deletions server/test/unit/server/db/test_manage.py
Expand Up @@ -4,15 +4,13 @@
from argparse import Namespace
from cStringIO import StringIO
import os
from datetime import datetime

from mock import call, inPy3k, MagicMock, patch
from mongoengine.queryset import DoesNotExist

from ... import base
from pulp.common.compat import all, json
from pulp.server.db import manage
from pulp.server.db.fields import UTCDateTimeField
from pulp.server.db.migrate import models
from pulp.server.db.model import MigrationTracker
import pulp.plugins.types.database as types_db
Expand Down Expand Up @@ -232,53 +230,6 @@ def test_wont_run_as_root(self, mock_getuid, mock_stderr):
self.assertTrue('root' in mock_stderr.write.call_args_list[0][0][0])
self.assertTrue('apache' in mock_stderr.write.call_args_list[0][0][0])

@patch('logging.config.fileConfig')
@patch('pulp.server.db.manage.logging.getLogger')
@patch('sys.argv', ["pulp-manage-db"])
@patch('pulp.server.db.connection.initialize')
@patch('pulp.server.managers.status.get_workers')
@patch('pulp.server.db.manage._user_input_continue', return_value=False)
@patch('pulp.server.db.manage._auto_manage_db')
def test_user_prompted_if_workers_tasks_exist(self, mock__auto_manage_db,
mock__user_input, mock_get_workers,
*unused_mocks):
mock_get_workers.return_value = [{'last_heartbeat':
UTCDateTimeField().to_python(datetime.now())}]
ret = manage.main()
# make sure system exits ok if user chooses not to proceed
self.assertEqual(ret, os.EX_OK)
# make sure user is asked for input when there are workers running
self.assertTrue(mock__user_input.called)

@patch('logging.config.fileConfig')
@patch('pulp.server.db.manage.logging.getLogger')
@patch('sys.argv', ["pulp-manage-db"])
@patch('pulp.server.db.connection.initialize')
@patch('pulp.server.managers.status.get_workers', return_value=[])
@patch('pulp.server.db.manage._user_input_continue', return_value=True)
@patch('pulp.server.db.manage._auto_manage_db')
def test_user_not_prompted_if_no_workers(self, mock_auto_manage_db,
mock_user_input, *unused_mocks):
manage.main()
# make sure user is not asked for input when there are no tasks/workers
self.assertFalse(mock_user_input.called)
self.assertTrue(mock_auto_manage_db.called)

@patch('logging.config.fileConfig')
@patch('pulp.server.db.manage.logging.getLogger')
@patch('sys.argv', ["pulp-manage-db"])
@patch('pulp.server.db.connection.initialize')
@patch('pulp.server.managers.status.get_workers')
@patch('pulp.server.db.manage._user_input_continue', return_value=True)
@patch('pulp.server.db.manage._auto_manage_db')
def test_user_not_prompted_if_workers_timeout(self, mock__auto_manage_db, mock__user_input,
mock_get_workers, *unused_mocks):
mock_get_workers.return_value = [{'last_heartbeat':
UTCDateTimeField().to_python(datetime(2000, 1, 1))}]
manage.main()
# make sure user is not asked for input when workers have timed out
self.assertFalse(mock__user_input.called)

@patch('pulp.server.db.manage.logging.getLogger')
@patch('pkg_resources.iter_entry_points', iter_entry_points)
@patch('pulp.server.db.manage.connection.initialize')
Expand Down

0 comments on commit d633f81

Please sign in to comment.