Navigation Menu

Skip to content
This repository has been archived by the owner on Oct 28, 2019. It is now read-only.

Commit

Permalink
Update Pulp to work with the pymongo 3.0.0 API.
Browse files Browse the repository at this point in the history
  • Loading branch information
Randy Barlow committed Jan 22, 2016
1 parent c7933db commit 644b607
Show file tree
Hide file tree
Showing 18 changed files with 281 additions and 113 deletions.
3 changes: 2 additions & 1 deletion docs/user-guide/release-notes/master.rst
Expand Up @@ -23,11 +23,12 @@ New Features
Deprecation
-----------

Supported Platforms Changes
Dependency/Platform Changes
---------------------------

* If run on CentOS or Red Hat Enterprise Linux, the Pulp server now requires either
version 7.1+ or 6.7+.
* pymongo >= 3.0.0 is now required.

Client Changes
--------------
Expand Down
4 changes: 2 additions & 2 deletions pulp.spec
Expand Up @@ -350,8 +350,8 @@ Requires: python-%{name}-repoauth = %{pulp_version}
Requires: python-blinker
Requires: python-celery >= 3.1.0
Requires: python-celery < 3.2.0
Requires: python-pymongo >= 2.7.1
Requires: python-mongoengine >= 0.9.0
Requires: python-pymongo >= 3.0.0
Requires: python-mongoengine >= 0.10.0
Requires: python-setuptools
Requires: python-oauth2 >= 1.5.211
Requires: python-httplib2
Expand Down
6 changes: 3 additions & 3 deletions server/pulp/plugins/types/database.py
Expand Up @@ -165,7 +165,7 @@ def all_type_ids():
"""

collection = ContentType.get_collection()
type_id_son = list(collection.find(fields={'id': 1}))
type_id_son = list(collection.find(projection={'id': 1}))
type_ids = [t['id'] for t in type_id_son]

return type_ids
Expand All @@ -178,7 +178,7 @@ def all_type_collection_names():
"""

collection = ContentType.get_collection()
type_ids = list(collection.find(fields={'id': 1}))
type_ids = list(collection.find(projection={'id': 1}))

type_collection_names = []
for id in type_ids:
Expand Down Expand Up @@ -266,7 +266,7 @@ def _create_or_update_type(type_def):
type_def.id, type_def.display_name, type_def.description, type_def.unit_key,
type_def.search_indexes, type_def.referenced_types)
# no longer rely on _id = id
existing_type = content_type_collection.find_one({'id': type_def.id}, fields=[])
existing_type = content_type_collection.find_one({'id': type_def.id}, projection=[])
if existing_type is not None:
content_type._id = existing_type['_id']
# XXX this still causes a potential race condition when 2 users are updating the same type
Expand Down
12 changes: 6 additions & 6 deletions server/pulp/server/controllers/repository.py
Expand Up @@ -514,9 +514,9 @@ def delete(repo_id):
# to keep the database clean.
model.Distributor.objects(repo_id=repo_id).delete()
model.Importer.objects(repo_id=repo_id).delete()
RepoSyncResult.get_collection().remove({'repo_id': repo_id}, safe=True)
RepoPublishResult.get_collection().remove({'repo_id': repo_id}, safe=True)
RepoContentUnit.get_collection().remove({'repo_id': repo_id}, safe=True)
RepoSyncResult.get_collection().remove({'repo_id': repo_id})
RepoPublishResult.get_collection().remove({'repo_id': repo_id})
RepoContentUnit.get_collection().remove({'repo_id': repo_id})
except Exception, e:
msg = _('Error updating one or more database collections while removing repo [%(r)s]')
msg = msg % {'r': repo_id}
Expand Down Expand Up @@ -780,7 +780,7 @@ def sync(repo_id, sync_config_override=None, scheduled_call_id=None):
# Do an update instead of a save in case the importer has changed the scratchpad
model.Importer.objects(repo_id=repo_obj.repo_id).update(set__last_sync=sync_end_timestamp)
# Add a sync history entry for this run
sync_result_collection.save(sync_result, safe=True)
sync_result_collection.save(sync_result)
# Ensure counts are updated
rebuild_content_unit_counts(repo_obj)

Expand Down Expand Up @@ -973,7 +973,7 @@ def _do_publish(repo_obj, dist_id, dist_inst, transfer_repo, conduit, call_confi
result = RepoPublishResult.error_result(
repo_obj.repo_id, dist.distributor_id, dist.distributor_type_id,
publish_start_timestamp, exception_timestamp, e, sys.exc_info()[2])
publish_result_coll.save(result, safe=True)
publish_result_coll.save(result)

_logger.exception(
_('Exception caught from plugin during publish for repo [%(r)s]'
Expand All @@ -996,7 +996,7 @@ def _do_publish(repo_obj, dist_id, dist_inst, transfer_repo, conduit, call_confi
result = RepoPublishResult.expected_result(
repo_obj.repo_id, dist.distributor_id, dist.distributor_type_id,
publish_start_timestamp, publish_end_timestamp, summary, details, result_code)
publish_result_coll.save(result, safe=True)
publish_result_coll.save(result)
return result


Expand Down
29 changes: 19 additions & 10 deletions server/pulp/server/db/connection.py
Expand Up @@ -66,7 +66,7 @@ def initialize(name=None, seeds=None, max_pool_size=None, replica_set=None, max_
if max_pool_size is None:
# we may want to make this configurable, but then again, we may not
max_pool_size = _DEFAULT_MAX_POOL_SIZE
connection_kwargs['max_pool_size'] = max_pool_size
connection_kwargs['maxPoolSize'] = max_pool_size

if replica_set is None:
if config.config.has_option('database', 'replica_set'):
Expand Down Expand Up @@ -121,14 +121,23 @@ def initialize(name=None, seeds=None, max_pool_size=None, replica_set=None, max_
raise RuntimeError(_("Pulp requires Mongo version %s, but DB is reporting"
"version %s") % (MONGO_MINIMUM_VERSION,
db_version))
elif db_version >= MONGO_WRITE_CONCERN_VERSION or replica_set:
# Write concern of 'majority' only works with a replica set or when using
# MongoDB >= 2.6.0
_CONNECTION.write_concern['w'] = write_concern
else:
_CONNECTION.write_concern['w'] = 1
if 'w' not in connection_kwargs:
# We need to use the mongod version to determine what value to set the
# write_concern to.
if db_version >= MONGO_WRITE_CONCERN_VERSION or replica_set:
# Write concern of 'majority' only works with a replica set or when
# using MongoDB >= 2.6.0
connection_kwargs['w'] = write_concern
else:
connection_kwargs['w'] = 1
# Now that we've determined the write concern that we are allowed to use, we
# must drop this connection and get another one because write_concern can
# only be set upon establishing the connection in pymongo >= 3.
_CONNECTION.close()
_CONNECTION = None
continue
_logger.info(_("Write concern for Mongo connection: %s") %
_CONNECTION.write_concern)
_CONNECTION.write_concern.document)
break
else:
next_delay = min(mongo_retry_timeout_seconds_generator.next(), max_timeout)
Expand Down Expand Up @@ -169,7 +178,7 @@ def _connect_to_one_of_seeds(connection_kwargs, seeds_list, db_name):
:type seeds_list: list of strings
:return: Connection object if connection is made or None if no connection is made
"""

connection_kwargs = copy.deepcopy(connection_kwargs)
for seed in seeds_list:
connection_kwargs.update({'host': seed.strip()})
try:
Expand Down Expand Up @@ -277,7 +286,7 @@ def query(self, criteria):
:return: pymongo cursor for the given query
:rtype: pymongo.cursor.Cursor
"""
cursor = self.find(criteria.spec, fields=criteria.fields)
cursor = self.find(criteria.spec, projection=criteria.fields)

if criteria.sort is not None:
for entry in criteria.sort:
Expand Down
Expand Up @@ -32,7 +32,7 @@ def move_scheduled_syncs(importer_collection, schedule_collection):
"""

# iterate over all importers looking for those with scheduled syncs
for importer in importer_collection.find(fields=['scheduled_syncs', 'id', 'repo_id']):
for importer in importer_collection.find(projection=['scheduled_syncs', 'id', 'repo_id']):
scheduled_syncs = importer.get('scheduled_syncs')
if scheduled_syncs is None:
continue
Expand Down Expand Up @@ -60,7 +60,8 @@ def move_scheduled_publishes(distributor_collection, schedule_collection):
"""

# iterate over all distributors looking for those with scheduled publishes
for distributor in distributor_collection.find(fields=['scheduled_publishes', 'id', 'repo_id']):
for distributor in distributor_collection.find(
projection=['scheduled_publishes', 'id', 'repo_id']):
scheduled_publishes = distributor.get('scheduled_publishes')
if scheduled_publishes is None:
continue
Expand Down
5 changes: 2 additions & 3 deletions server/pulp/server/db/migrations/lib/managers.py
Expand Up @@ -51,7 +51,7 @@ def rebuild_content_unit_counts(repo_ids=None):

# default to all repos if none were specified
if not repo_ids:
repo_ids = [repo['id'] for repo in repo_collection.find(fields=['id'])]
repo_ids = [repo['id'] for repo in repo_collection.find(projection=['id'])]

_logger.info('regenerating content unit counts for %d repositories' % len(repo_ids))

Expand All @@ -64,8 +64,7 @@ def rebuild_content_unit_counts(repo_ids=None):
for type_id in type_ids:
spec = {'repo_id': repo_id, 'unit_type_id': type_id}
counts[type_id] = association_collection.find(spec).count()
repo_collection.update({'id': repo_id}, {'$set': {'content_unit_counts': counts}},
safe=True)
repo_collection.update({'id': repo_id}, {'$set': {'content_unit_counts': counts}})

@staticmethod
def find_with_importer_type(importer_type_id):
Expand Down
12 changes: 6 additions & 6 deletions server/pulp/server/managers/consumer/applicability.py
Expand Up @@ -149,7 +149,7 @@ def batch_regenerate_applicability(repo_id, existing_applicability_ids):
existing_applicability = RepoProfileApplicability(**dict(existing_applicability))
profile_hash = existing_applicability['profile_hash']
unit_profile = UnitProfile.get_collection().find_one({'profile_hash': profile_hash},
fields=['id', 'content_type'])
projection=['id', 'content_type'])
if unit_profile is None:
# Unit profiles change whenever packages are installed or removed on consumers,
# and it is possible that existing_applicability references a UnitProfile
Expand Down Expand Up @@ -206,7 +206,7 @@ def regenerate_applicability(profile_hash, content_type, profile_id,
profile = existing_applicability.profile
else:
unit_profile = UnitProfile.get_collection().find_one({'id': profile_id},
fields=['profile'])
projection=['profile'])
profile = unit_profile['profile']
call_config = PluginCallConfiguration(plugin_config=profiler_cfg,
repo_plugin_config=None)
Expand Down Expand Up @@ -266,7 +266,7 @@ def _is_existing_applicability(repo_id, profile_hash):
:type: boolean
"""
query_params = {'repo_id': repo_id, 'profile_hash': profile_hash}
if RepoProfileApplicability.get_collection().find_one(query_params, fields=['_id']):
if RepoProfileApplicability.get_collection().find_one(query_params, projection=['_id']):
return True
return False

Expand Down Expand Up @@ -517,7 +517,7 @@ def _add_profiles_to_consumer_map_and_get_hashes(consumer_ids, consumer_map):
"""
profiles = UnitProfile.get_collection().find(
{'consumer_id': {'$in': consumer_ids}},
fields=['consumer_id', 'profile_hash'])
projection=['consumer_id', 'profile_hash'])
profile_hashes = set()
for p in profiles:
consumer_map[p['consumer_id']]['profiles'].append(p)
Expand All @@ -543,7 +543,7 @@ def _add_repo_ids_to_consumer_map(consumer_ids, consumer_map):
"""
bindings = Bind.get_collection().find(
{'consumer_id': {'$in': consumer_ids}},
fields=['consumer_id', 'repo_id'])
projection=['consumer_id', 'repo_id'])
for b in bindings:
consumer_map[b['consumer_id']]['repo_ids'].append(b['repo_id'])

Expand Down Expand Up @@ -595,7 +595,7 @@ def _get_applicability_map(profile_hashes, content_types):
"""
applicabilities = RepoProfileApplicability.get_collection().find(
{'profile_hash': {'$in': profile_hashes}},
fields=['profile_hash', 'repo_id', 'applicability'])
projection=['profile_hash', 'repo_id', 'applicability'])
return_value = {}
for a in applicabilities:
if content_types is not None:
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/consumer/cud.py
Expand Up @@ -198,7 +198,7 @@ def get_consumer(id, fields=None):
:raises MissingResource: if a consumer with given id does not exist
"""
consumer_coll = Consumer.get_collection()
consumer = consumer_coll.find_one({'id': id}, fields=fields)
consumer = consumer_coll.find_one({'id': id}, projection=fields)
if not consumer:
raise MissingResource(consumer=id)
return consumer
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/consumer/profile.py
Expand Up @@ -91,7 +91,7 @@ def consumer_deleted(self, id):
"""
collection = UnitProfile.get_collection()
for p in self.get_profiles(id):
collection.remove(p, sefe=True)
collection.remove(p)

@staticmethod
def get_profile(consumer_id, content_type):
Expand Down
4 changes: 2 additions & 2 deletions server/pulp/server/managers/content/orphan.py
Expand Up @@ -103,7 +103,7 @@ def generate_orphans_by_type(content_type_id, fields=None):
content_units_collection = content_types_db.type_units_collection(content_type_id)
repo_content_units_collection = RepoContentUnit.get_collection()

for content_unit in content_units_collection.find({}, fields=fields):
for content_unit in content_units_collection.find({}, projection=fields):

repo_content_units_cursor = repo_content_units_collection.find(
{'unit_id': content_unit['_id']})
Expand Down Expand Up @@ -225,7 +225,7 @@ def delete_orphans_by_type(content_type_id, content_unit_ids=None):
unit_id=content_unit['_id'],
unit_type_id=content_type_id
).delete()
content_units_collection.remove(content_unit['_id'], safe=False)
content_units_collection.remove(content_unit['_id'])

storage_path = content_unit.get('_storage_path', None)
if storage_path is not None:
Expand Down
10 changes: 5 additions & 5 deletions server/pulp/server/managers/content/query.py
Expand Up @@ -71,7 +71,7 @@ def list_content_units(self,
collection = content_types_db.type_units_collection(content_type)
if db_spec is None:
db_spec = {}
cursor = collection.find(db_spec, fields=model_fields)
cursor = collection.find(db_spec, projection=model_fields)
if start > 0:
cursor.skip(start)
if limit is not None:
Expand Down Expand Up @@ -165,7 +165,7 @@ def get_multiple_units_by_keys_dicts(self, content_type, unit_keys_dicts, model_
collection = content_types_db.type_units_collection(content_type)
for segment in paginate(unit_keys_dicts, page_size=50):
spec = _build_multi_keys_spec(content_type, segment)
cursor = collection.find(spec, fields=model_fields)
cursor = collection.find(spec, projection=model_fields)
for unit_dict in cursor:
yield unit_dict

Expand All @@ -185,7 +185,7 @@ def get_multiple_units_by_ids(self, content_type, unit_ids, model_fields=None):
@rtype: (possibly empty) tuple of dict's
"""
collection = content_types_db.type_units_collection(content_type)
cursor = collection.find({'_id': {'$in': unit_ids}}, fields=model_fields)
cursor = collection.find({'_id': {'$in': unit_ids}}, projection=model_fields)
return tuple(cursor)

def get_content_unit_keys(self, content_type, unit_ids):
Expand All @@ -207,7 +207,7 @@ def get_content_unit_keys(self, content_type, unit_ids):
all_fields = ['_id']
_flatten_keys(all_fields, key_fields)
collection = content_types_db.type_units_collection(content_type)
cursor = collection.find({'_id': {'$in': unit_ids}}, fields=all_fields)
cursor = collection.find({'_id': {'$in': unit_ids}}, projection=all_fields)
dicts = tuple(dict(d) for d in cursor)
ids = tuple(d.pop('_id') for d in dicts)
return (ids, dicts)
Expand All @@ -231,7 +231,7 @@ def get_content_unit_ids(content_type, unit_keys):
for segment in paginate(unit_keys):
spec = _build_multi_keys_spec(content_type, segment)
fields = ['_id']
for item in collection.find(spec, fields=fields):
for item in collection.find(spec, projection=fields):
yield str(item['_id'])

def get_root_content_dir(self, content_type):
Expand Down
6 changes: 3 additions & 3 deletions server/pulp/server/managers/repo/unit_association_query.py
Expand Up @@ -239,7 +239,7 @@ def unit_type_ids_for_repo(repo_id):

collection = RepoContentUnit.get_collection()

cursor = collection.find({'repo_id': repo_id}, fields=['unit_type_id'])
cursor = collection.find({'repo_id': repo_id}, projection=['unit_type_id'])

return [t for t in cursor.distinct('unit_type_id')]

Expand All @@ -264,7 +264,7 @@ def _unit_associations_cursor(repo_id, criteria):

collection = RepoContentUnit.get_collection()

cursor = collection.find(spec, fields=criteria.association_fields)
cursor = collection.find(spec, projection=criteria.association_fields)

if criteria.association_sort:
cursor.sort(criteria.association_sort)
Expand Down Expand Up @@ -362,7 +362,7 @@ def _associated_units_by_type_cursor(unit_type_id, criteria, associated_unit_ids
fields = list(fields)
fields.append('_content_type_id')

cursor = collection.find(spec, fields=fields)
cursor = collection.find(spec, projection=fields)

sort = criteria.unit_sort

Expand Down
2 changes: 1 addition & 1 deletion server/setup.py
Expand Up @@ -27,6 +27,6 @@
},
install_requires=[
'blinker', 'celery >=3.1.0, <3.2.0', DJANGO_REQUIRES, 'httplib2', 'iniparse',
'isodate>=0.5.0', 'm2crypto', 'mongoengine>=0.7.10', 'oauth2>=1.5.211', 'pymongo>=2.5.2',
'isodate>=0.5.0', 'm2crypto', 'mongoengine>=0.10.0', 'oauth2>=1.5.211', 'pymongo>=3.0.0',
'semantic_version>=2.2.0', 'setuptools']
)

0 comments on commit 644b607

Please sign in to comment.