Skip to content

Commit

Permalink
convert "workers" collection to mongoengine
Browse files Browse the repository at this point in the history
This patch converts the `workers` collection to mongoengine. It also moves some
functionality into tasks.py that previously lived in the worker manager.

One thing to note: it appears that the existing workers collection did not have
a `_ns` field, so I did not add one to the new collection.

This commit includes a fix to CriteriaQuerySet's `update()` method which was
incorrectly firing the `post_save()` hook on TaskStatus objects, even when the
Document was not a task status.

A spec file requirement for `python-blinker` was also added to support
mongoengine signals. Additionally, a small unrelated flake8 fixup to
test_users.py was added.

fixes #88
  • Loading branch information
beav committed Mar 27, 2015
1 parent 7c5e469 commit 384ace0
Show file tree
Hide file tree
Showing 26 changed files with 386 additions and 622 deletions.
1 change: 1 addition & 0 deletions pulp.spec
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ rm -rf %{buildroot}
Summary: The pulp platform server
Group: Development/Languages
Requires: python-%{name}-common = %{pulp_version}
Requires: python-blinker
Requires: python-celery >= 3.1.0
Requires: python-celery < 3.2.0
Requires: python-pymongo >= 2.5.2
Expand Down
7 changes: 2 additions & 5 deletions server/pulp/server/async/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
from pulp.server.async.tasks import _delete_worker
from pulp.server.db import connection as db_connection
from pulp.server.db.connection import retry_decorator
from pulp.server.db.model.criteria import Criteria
from pulp.server.db.model.dispatch import ScheduledCall, ScheduleEntry
from pulp.server.managers import resources
from pulp.server.db.model.workers import Worker
from pulp.server.managers.schedule import utils

# The import below is not used in this module, but it needs to be kept here. This module is the
Expand Down Expand Up @@ -229,9 +228,7 @@ def check_workers(self):
'Looking for workers missing for more than %s seconds') % self.WORKER_TIMEOUT_SECONDS
_logger.debug(msg)
oldest_heartbeat_time = datetime.utcnow() - timedelta(seconds=self.WORKER_TIMEOUT_SECONDS)
worker_criteria = Criteria(filters={'last_heartbeat': {'$lt': oldest_heartbeat_time}},
fields=('_id', 'last_heartbeat'))
worker_list = list(resources.filter_workers(worker_criteria))
worker_list = Worker.objects(last_heartbeat__lt=oldest_heartbeat_time)
for worker in worker_list:
msg = _("Workers '%s' has gone missing, removing from list of workers") % worker.name
_logger.error(msg)
Expand Down
85 changes: 72 additions & 13 deletions server/pulp/server/async/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
from mongoengine.queryset import DoesNotExist

from pulp.common import constants, dateutils
from pulp.common.constants import SCHEDULER_WORKER_NAME
from pulp.server.async.celery_instance import celery, RESOURCE_MANAGER_QUEUE, \
DEDICATED_QUEUE_EXCHANGE
from pulp.server.exceptions import PulpException, MissingResource
from pulp.server.db.model.criteria import Criteria
from pulp.server.db.model.dispatch import TaskStatus
from pulp.server.db.model.resources import ReservedResource, Worker
from pulp.server.db.model.resources import ReservedResource
from pulp.server.db.model.workers import Worker
from pulp.server.exceptions import NoWorkers
from pulp.server.managers import resources
from pulp.server.managers.repo import _common as common_utils


Expand Down Expand Up @@ -55,14 +55,14 @@ def _queue_reserved_task(name, task_id, resource_id, inner_args, inner_kwargs):
"""
while True:
try:
worker = resources.get_worker_for_reservation(resource_id)
worker = get_worker_for_reservation(resource_id)
except NoWorkers:
pass
else:
break

try:
worker = resources.get_unreserved_worker()
worker = _get_unreserved_worker()
except NoWorkers:
pass
else:
Expand All @@ -84,6 +84,70 @@ def _queue_reserved_task(name, task_id, resource_id, inner_args, inner_kwargs):
exchange=DEDICATED_QUEUE_EXCHANGE)


def _is_worker(worker_name):
    """
    Tell whether the named process is one that may be assigned work.

    The scheduler and the resource manager must never be given work. Their names are matched
    by prefix rather than by equality because the host part of the name is not known here.

    :param worker_name: The name of the worker to check
    :type  worker_name: basestring
    :return: False if the name starts with the scheduler or resource manager prefix,
             True otherwise
    :rtype: bool
    """
    excluded_prefixes = (SCHEDULER_WORKER_NAME, RESOURCE_MANAGER_QUEUE)
    return not worker_name.startswith(excluded_prefixes)


def get_worker_for_reservation(resource_id):
    """
    Return the Worker instance that is associated with a reservation of type resource_id.

    :param resource_id: The name of the resource you wish to reserve for your task.
    :type  resource_id: basestring
    :returns: The Worker instance that has a reserved_resource entry of type
              `resource_id` associated with it.
    :rtype: pulp.server.db.model.workers.Worker
    :raises NoWorkers: If there is no reservation for resource_id, or if the reservation
                       names a worker that is not present in the workers collection.
    """
    reservation = ReservedResource.get_collection().find_one({'resource_id': resource_id})
    if not reservation:
        raise NoWorkers()

    # first() returns None instead of raising when no document matches. A reservation can name
    # a worker that is no longer in the collection, so treat that the same as "no worker" rather
    # than handing None back to a caller that expects a Worker.
    worker = Worker.objects(name=reservation['worker_name']).first()
    if worker is None:
        raise NoWorkers()
    return worker


def _get_unreserved_worker():
    """
    Pick a Worker that has no reserved_resource entries associated with it.

    Workers that should never be assigned work (see _is_worker()) are not considered.

    :returns: A Worker instance that has no reserved_resource entries associated with it.
    :rtype: pulp.server.db.model.workers.Worker
    :raises NoWorkers: If all workers have reserved_resource entries associated with them.
    """
    # Index every known Worker by its name
    workers_by_name = dict((entry['name'], entry) for entry in Worker.objects())

    # Names of the workers that hold at least one reservation
    reservations = ReservedResource.get_collection().find()
    busy_names = set(reservation['worker_name'] for reservation in reservations)

    # NB: set() around filter()/generators is used on purpose, set comprehensions are
    # python 2.7+ only
    assignable_names = set(filter(_is_worker, workers_by_name.keys()))
    idle_names = assignable_names - busy_names

    if not idle_names:
        # All workers are reserved
        raise NoWorkers()
    return workers_by_name[idle_names.pop()]


def _delete_worker(name, normal_shutdown=False):
"""
Delete the Worker with _id name from the database, cancel any associated tasks and reservations
Expand All @@ -95,8 +159,7 @@ def _delete_worker(name, normal_shutdown=False):
Any tasks associated with this worker are explicitly canceled.
:param name: The name of the worker you wish to delete. In the database, the _id
field is the name.
:param name: The name of the worker you wish to delete.
:type name: basestring
:param normal_shutdown: True if the worker shutdown normally, False otherwise. Defaults to
False.
Expand All @@ -108,17 +171,13 @@ def _delete_worker(name, normal_shutdown=False):
_logger.error(msg)

# Delete the worker document
worker_list = list(resources.filter_workers(Criteria(filters={'_id': name})))
if len(worker_list) > 0:
worker_document = worker_list[0]
worker_document.delete()
Worker.objects(name=name).delete()

# Delete all reserved_resource documents for the worker
ReservedResource.get_collection().remove({'worker_name': name})

# Cancel all of the tasks that were assigned to this worker's queue
worker = Worker.from_bson({'_id': name})
for task_status in TaskStatus.objects(worker_name=worker.name,
for task_status in TaskStatus.objects(worker_name=name,
state__in=constants.CALL_INCOMPLETE_STATES):
cancel(task_status['task_id'])

Expand Down
21 changes: 6 additions & 15 deletions server/pulp/server/async/worker_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import logging

from pulp.server.async.tasks import _delete_worker
from pulp.server.db.model.criteria import Criteria
from pulp.server.db.model.resources import Worker
from pulp.server.managers import resources
from pulp.server.db.model.workers import Worker


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -76,21 +74,14 @@ def handle_worker_heartbeat(event):
:type event: dict
"""
event_info = _parse_and_log_event(event)
worker = Worker.objects(name=event_info['worker_name']).first()

find_worker_criteria = Criteria(filters={'_id': event_info['worker_name']},
fields=('_id', 'last_heartbeat'))
find_worker_list = list(resources.filter_workers(find_worker_criteria))

if find_worker_list:
Worker.get_collection().find_and_modify(
query={'_id': event_info['worker_name']},
update={'$set': {'last_heartbeat': event_info['timestamp']}}
)
else:
new_worker = Worker(event_info['worker_name'], event_info['timestamp'])
if not worker:
msg = _("New worker '%(worker_name)s' discovered") % event_info
_logger.info(msg)
new_worker.save()

Worker.objects(name=event_info['worker_name']).\
update_one(set__last_heartbeat=event_info['timestamp'], upsert=True)


def handle_worker_offline(event):
Expand Down
38 changes: 32 additions & 6 deletions server/pulp/server/db/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from pulp.server.compat import ObjectId
from pulp.server.db.connection import get_collection

from pulp.server.async.emit import send as send_taskstatus_message


_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -164,12 +162,40 @@ def find_by_criteria(self, criteria):

def update(self, *args, **kwargs):
    """
    Update the matched documents, then emulate post_save() on each of them.

    All params are sent through to the super()'s update() call. Afterwards post_save() is
    called on each Document in the query set. If a Document does not define post_save(),
    nothing is done for it.

    :param args: positional arguments passed through to the superclass update()
    :param kwargs: keyword arguments passed through to the superclass update()
    """
    super(CriteriaQuerySet, self).update(*args, **kwargs)
    for doc in self:
        # Look the hook up explicitly rather than wrapping the call in "except AttributeError":
        # a missing hook is fine, but an AttributeError raised from inside a post_save()
        # implementation is a real bug and must not be silently discarded.
        post_save = getattr(doc, 'post_save', None)
        if post_save is not None:
            post_save(type(doc).__name__, doc)

def update_one(self, *args, **kwargs):
    """
    Update a single matching document by delegating to this class's update().

    Mongoengine 0.7's QuerySet.update_one() does not go through update(); it makes a slightly
    different pymongo call[1], so both methods have to be overridden here.

    Calling super()'s update_one() is not an option either: in Mongoengine 0.8 it simply calls
    update(), which resolves to *this* class's update() and would fire the post_save hook
    twice. Calling update() with "multi=False" mimics update_one() instead. Because this
    method sets "multi" itself, a caller passing it is rejected.

    Once 0.7 is no longer supported this method can be removed entirely.

    [1] http://tinyurl.com/nf7fafy

    :raises NotImplementedError: if "multi" is passed as a keyword argument
    """
    caller_passed_multi = 'multi' in kwargs
    if caller_passed_multi:
        raise NotImplementedError("The 'multi' parameter cannot be set on this method.")
    single_doc_kwargs = dict(kwargs, multi=False)
    self.update(*args, **single_doc_kwargs)
78 changes: 0 additions & 78 deletions server/pulp/server/db/model/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,6 @@
from pulp.server.db.model.base import Model


class Worker(Model):
    """
    A known Celery worker that the resource manager can use when assigning tasks.

    :ivar name: The name of the queue
    :type name: unicode
    :ivar last_heartbeat: A timestamp of the last heartbeat from the Worker
    :type last_heartbeat: datetime.datetime
    """
    collection_name = 'workers'
    unique_indices = tuple()
    # Compound index over _id and last_heartbeat. It is meant to let
    # async.scheduler.WorkerTimeoutMonitor retrieve the data it needs without
    # accessing the disk.
    search_indices = (('_id', 'last_heartbeat'),)

    def __init__(self, name, last_heartbeat):
        """
        Build a Worker from its name and most recent heartbeat.

        :param name: The name of the Worker.
        :type  name: basestring
        :param last_heartbeat: A timestamp of the last heartbeat from the Worker
        :type  last_heartbeat: datetime.datetime
        """
        super(Worker, self).__init__()

        self.name = name
        self.last_heartbeat = last_heartbeat

        # These keys are not needed on a Worker; save() writes the name as the _id.
        # NOTE(review): presumably both keys are populated by Model.__init__ - confirm.
        for unneeded_key in ('_id', 'id'):
            del self[unneeded_key]

    def delete(self):
        """
        Remove the document whose _id is this Worker's name from the collection.
        """
        workers_collection = self.get_collection()
        workers_collection.remove({'_id': self.name})

    @classmethod
    def from_bson(cls, bson_worker):
        """
        Alternate constructor that builds a Worker from bson. A Python dict can also be used
        in place of bson_worker.

        :param bson_worker: A bson object or a dict representing a Worker.
        :type  bson_worker: bson.BSON or dict
        :return: A Worker representing the given bson_worker
        :rtype: pulp.server.db.model.resources.Worker
        """
        worker_name = bson_worker['_id']
        # last_heartbeat is optional in the input and falls back to None
        heartbeat = bson_worker.get('last_heartbeat', None)
        return cls(name=worker_name, last_heartbeat=heartbeat)

    @property
    def queue_name(self):
        """
        Convenience accessor for the queue_name that Celery assigns to this Worker.

        :return: The name of the queue that this Worker is uniquely subscribed to.
        :rtype: basestring
        """
        substitutions = {'name': self.name}
        return "%(name)s.dq" % substitutions

    def save(self):
        """
        Write this Worker to the database, inserting a new record if one does not exist yet.
        """
        workers_collection = self.get_collection()
        document = {'_id': self.name,
                    'last_heartbeat': self.last_heartbeat}
        workers_collection.save(document, manipulate=False, safe=True)


class ReservedResource(Model):
"""
Instances of this class represent resources that have been reserved.
Expand Down
40 changes: 40 additions & 0 deletions server/pulp/server/db/model/workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging

from mongoengine import Document, StringField, DateTimeField

from pulp.server.db.model.base import CriteriaQuerySet


_logger = logging.getLogger(__name__)


class Worker(Document):
    """
    Schema for the documents stored in the workers collection.

    This is a mongoengine.Document; each instance represents one worker.

    :ivar name: worker name, in the form of "worker_type@hostname"
    :type name: basestring
    :ivar last_heartbeat: A timestamp of the last heartbeat from the Worker
    :type last_heartbeat: datetime.datetime
    """
    # The worker name doubles as the document's primary key
    name = StringField(primary_key=True)
    last_heartbeat = DateTimeField()

    meta = {
        'collection': 'workers',
        # this is a small collection that does not need an index
        'indexes': [],
        'allow_inheritance': False,
        'queryset_class': CriteriaQuerySet,
    }

    @property
    def queue_name(self):
        """
        Convenience accessor for the queue_name that Celery assigns to this Worker.

        :return: The name of the queue that this Worker is uniquely subscribed to.
        :rtype: basestring
        """
        substitutions = {'name': self.name}
        return "%(name)s.dq" % substitutions
Loading

0 comments on commit 384ace0

Please sign in to comment.