Skip to content

Commit

Permalink
MongoDB: atomic in write and remove return matched doc
Browse files Browse the repository at this point in the history
  • Loading branch information
dendisuhubdy authored and tsirif committed Nov 30, 2017
1 parent 94f8e09 commit ab159cf
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 17 deletions.
18 changes: 14 additions & 4 deletions src/metaopt/io/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def close_connection(self):

@abstractmethod
def write(self, collection_name, data,
query=None):
query=None, atomic=False, selection=None):
"""Write new information to a collection. Perform insert or update.
Parameters
Expand All @@ -91,8 +91,13 @@ def write(self, collection_name, data,
New data that will **be inserted** or that will **update** entries.
query : dict, optional
Assumes an update operation: filter entries in collection to be updated.
atomic : bool, optional
If True, update only the most recent entry that matches `query`.
selection : dict, optional
Used in `atomic` update operation: elements of the matched entry to
return, the projection.
:return: operation success.
:return: operation success, if not `atomic`, matched updated document else.
.. note::
In the case of an insert operation, `data` variable will be updated
Expand Down Expand Up @@ -124,7 +129,7 @@ def read(self, collection_name, query, selection=None):
pass

@abstractmethod
def remove(self, collection_name, query):
def remove(self, collection_name, query, atomic=False, selection=None):
"""Delete from a collection document[s] which match the `query`.
Parameters
Expand All @@ -133,8 +138,13 @@ def remove(self, collection_name, query):
A collection inside database, a table.
query : dict
Filter entries in collection.
atomic : bool, optional
If True, delete only the most recent entry that matches `query`.
selection : dict, optional
Used in `atomic` delete operation: elements of the matched entry to
return, the projection.
:return: operation success.
:return: operation success, if not `atomic`, matched deleted document else.
"""
pass
Expand Down
33 changes: 24 additions & 9 deletions src/metaopt/io/database/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from __future__ import absolute_import

import pymongo
from pymongo import MongoClient
from pymongo import (MongoClient, ReturnDocument)
from pymongo.uri_parser import parse_uri
from six import raise_from

Expand Down Expand Up @@ -88,7 +88,7 @@ def close_connection(self):
self._conn.close()

def write(self, collection_name, data,
query=None):
query=None, atomic=False, selection=None):
"""Write new information to a collection. Perform insert or update.
.. seealso:: :meth:`AbstractDB.write` for argument documentation.
Expand All @@ -106,10 +106,19 @@ def write(self, collection_name, data,

update_data = {'$set': data}

result = dbcollection.update_many(filter=query,
update=update_data,
upsert=True)
return result.acknowledged
if not atomic:
result = dbcollection.update_many(filter=query,
update=update_data,
upsert=True)
return result.acknowledged

result = dbcollection.find_one_and_update(filter=query,
update=update_data,
projection=selection,
upsert=True,
sort=[('$natural', -1)],
return_document=ReturnDocument.AFTER)
return result

def read(self, collection_name, query, selection=None):
"""Read a collection and return a value according to the query.
Expand All @@ -127,16 +136,22 @@ def read(self, collection_name, query, selection=None):

return dbdocs if len(dbdocs) > 1 else dbdocs[0]

def remove(self, collection_name, query):
def remove(self, collection_name, query, atomic=False, selection=None):
"""Delete from a collection document[s] which match the `query`.
.. seealso:: :meth:`AbstractDB.remove` for argument documentation.
"""
dbcollection = self._db[collection_name]

result = dbcollection.delete_many(filter=query)
return result.acknowledged
if not atomic:
result = dbcollection.delete_many(filter=query)
return result.acknowledged

result = dbcollection.find_one_and_delete(filter=query,
projection=selection,
sort=[('$natural', -1)])
return result

def _sanitize_attrs(self):
"""Sanitize attributes using MongoDB's 'uri_parser' module."""
Expand Down
75 changes: 71 additions & 4 deletions tests/core/unittests/mongodb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ def test_insert_many(self, database, moptdb):
value = database.experiments.find_one({'exp_name': 'supernaekei3'})
assert value == item[1]

def test_update_many_default(self, database, moptdb):
"""Should match existing entries, and update some of their keys."""
def test_update_many_atomic_default(self, database, moptdb):
"""Should match existing entries, and update some of their keys.
- Atomic is implicitly False.
"""
filt = {'exp_name': 'supernaedo2', 'metadata.user': 'tsirif'}
count_before = database.experiments.count()
# call interface
Expand All @@ -182,15 +185,79 @@ def test_update_many_default(self, database, moptdb):
assert value[1]['pool_size'] == 16
assert value[2]['pool_size'] == 2

def test_update_many_atomic_false(self, database, moptdb):
"""Should match existing entries, and update some of their keys.
- Atomic is explicitly False.
"""
filt = {'exp_name': 'supernaedo2', 'metadata.user': 'tsirif'}
count_before = database.experiments.count()
# call interface
assert moptdb.write('experiments', {'pool_size': 8}, filt, atomic=False) is True
assert database.experiments.count() == count_before
value = list(database.experiments.find({'exp_name': 'supernaedo2'}))
assert value[0]['pool_size'] == 8
assert value[1]['pool_size'] == 8
assert value[2]['pool_size'] == 2

def test_update_one_atomic_true(self, database, moptdb):
"""Should match existing entries, and update some of their keys.
- Atomic is True.
"""
filt = {'exp_name': 'supernaedo2', 'metadata.user': 'tsirif'}
count_before = database.experiments.count()
# call interface
ret = moptdb.write('experiments', {'pool_size': 10},
filt, atomic=True, selection={'_id': 0})
assert database.experiments.count() == count_before
value = list(database.experiments.find({'exp_name': 'supernaedo2'}))
assert value[0]['pool_size'] != 10
assert value[1]['pool_size'] == 10
assert value[2]['pool_size'] == 2
filt['pool_size'] = 10
assert ret == database.experiments.find_one(filt, {'_id': False})


@pytest.mark.usefixtures("clean_db")
class TestRemove(object):
"""Calls to :meth:`metaopt.io.database.MongoDB.remove`."""

def test_remove_many_default(self, database, moptdb):
"""Should match existing entries, and delete them all."""
def test_remove_many_atomic_default(self, database, moptdb):
"""Should match existing entries, and delete them all.
- Atomic is implicitly False.
"""
filt = {'exp_name': 'supernaedo2', 'metadata.user': 'tsirif'}
count_before = database.experiments.count()
# call interface
assert moptdb.remove('experiments', filt) is True
assert database.experiments.count() == count_before - 2

def test_remove_many_atomic_false(self, database, moptdb):
"""Should match existing entries, and delete them all.
- Atomic is explicitly False.
"""
filt = {'exp_name': 'supernaedo2', 'user': 'tsirif'}
count_before = database.workers.count()
# call interface
assert moptdb.remove('workers', filt, atomic=False) is True
assert database.workers.count() == count_before - 2

def test_remove_one_atomic_true(self, database, moptdb, exp_config):
"""Should match existing entries, and delete the most recent one only.
- Atomic is True.
"""
filt = {'exp_name': 'supernaedo2', 'user': 'tsirif'}
count_before = database.trials.count()
# call interface
value = moptdb.remove('trials', filt, atomic=True, selection=['result', 'params'])
assert database.trials.count() == count_before - 1
assert value == {'result': exp_config[1][-1]['result'],
'params': exp_config[1][-1]['params'],
'_id': exp_config[1][-1]['_id']}
trials = list(database.trials.find())
for trial in trials:
assert trial['submit_time'] < exp_config[1][-1]['submit_time']

0 comments on commit ab159cf

Please sign in to comment.