Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add indexation to databases and fix race condition for experiment configuration #55

Merged
merged 6 commits into from
Mar 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions src/metaopt/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import logging

from metaopt.core import resolve_config
from metaopt.core.io.database import Database
from metaopt.core.io.database import Database, DuplicateKeyError
from metaopt.core.worker import workon
from metaopt.core.worker.experiment import Experiment

Expand Down Expand Up @@ -43,7 +43,8 @@ def infer_experiment():
expconfig = resolve_config.fetch_default_options()

# Fetch mopt system variables (database and resource information)
# See :const:`metaopt.core.io.resolve_config.ENV_VARS` for environmental variables used
# See :const:`metaopt.core.io.resolve_config.ENV_VARS` for environmental
# variables used
expconfig = resolve_config.merge_env_vars(expconfig)

# Initialize singleton database object
Expand All @@ -58,9 +59,37 @@ def infer_experiment():
exp_name = tmpconfig['name']
if exp_name is None:
raise RuntimeError("Could not infer experiment's name. "
"Please use either `name` cmd line arg or provide one "
"in metaopt's configuration file.")
"Please use either `name` cmd line arg or provide "
"one in metaopt's configuration file.")

experiment = create_experiment(exp_name, expconfig, cmdconfig, cmdargs)

return experiment


def create_experiment(exp_name, expconfig, cmdconfig, cmdargs):
"""Create an experiment based on configuration.

Configuration is a combination of command line, experiment configuration
file, experiment configuration in database and metaopt configuration files.

Precedence of configurations is:
`cmdargs` > `cmdconfig` > `dbconfig` > `expconfig`

This means `expconfig` values would be overwritten by `dbconfig` and so on.

Parameters
----------
exp_name: str
Name of the experiment
expconfig: dict
Configuration coming from default configuration files.
cmdconfig: dict
Configuration coming from configuration file.
cmdargs: dict
Configuration coming from command line arguments.

"""
# Initialize experiment object.
# Check for existing name and fetch configuration.
experiment = Experiment(exp_name)
Expand All @@ -77,8 +106,16 @@ def infer_experiment():
expconfig.pop('resources', None)
expconfig.pop('status', None)

# Finish experiment's configuration
experiment.configure(expconfig)
# Finish experiment's configuration and write it to database.
try:
experiment.configure(expconfig)
except DuplicateKeyError:
# Fails if concurrent experiment with identical (name, metadata.user)
# is written first in the database.
# Next infer_experiment() should either load experiment from database
# and run smoothly if identical or trigger an experiment fork.
# In other words, there should not be more than 1 level of recursion.
experiment = create_experiment(exp_name, expconfig, cmdconfig, cmdargs)

return experiment

Expand Down
54 changes: 49 additions & 5 deletions src/metaopt/core/io/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

"""
from abc import abstractmethod, abstractproperty
import logging

from metaopt.core.utils import (AbstractSingletonType, SingletonFactory)

Expand All @@ -39,11 +38,12 @@ class AbstractDB(object, metaclass=AbstractSingletonType):

"""

ASCENDING = 0
DESCENDING = 1

def __init__(self, host='localhost', name=None,
port=None, username=None, password=None):
"""Init method, see attributes of :class:`AbstractDB`."""
self._logger = logging.getLogger(__name__)

self.host = host
self.name = name
self.port = port
Expand Down Expand Up @@ -74,8 +74,34 @@ def close_connection(self):
pass

@abstractmethod
def write(self, collection_name, data,
query=None):
def ensure_index(self, collection_name, keys, unique=False):
    """Create given indexes if they do not already exist in database.

    Parameters
    ----------
    collection_name : str
        A collection inside database, a table.
    keys: str or list of tuples
        Can be a string representing a key to index, or a list of tuples
        with the structure `[(key_name, sort_order)]`. `key_name` must be a
        string and sort_order can be either `AbstractDB.ASCENDING` or
        `AbstractDB.DESCENDING`.
    unique: bool, optional
        Ensure each document has a different key value. If not, operations
        like `write()` and `read_and_write()` will raise
        `DuplicateKeyError`.
        Defaults to False.

    .. note::
        Depending on the backend, the indexing operation might operate in
        background. This means some operations on the database might occur
        before the indexes are totally built.

    """
    pass

@abstractmethod
def write(self, collection_name, data, query=None):
"""Write new information to a collection. Perform insert or update.

Parameters
Expand All @@ -97,6 +123,11 @@ def write(self, collection_name, data,
In the case of an update operation, if `query` fails to find a
document that matches, insert of `data` will be performed instead.

:raises :exc:`DuplicateKeyError`: if the operation is creating duplicate
keys in two different documents. Only occurs if the keys have
unique indexes. See :meth:`AbstractDB.ensure_index` for more
information about indexes.

"""
pass

Expand Down Expand Up @@ -139,6 +170,11 @@ def read_and_write(self, collection_name, query, data, selection=None):

:return: updated first matched document or None if nothing found

:raises :exc:`DuplicateKeyError`: if the operation is creating duplicate
keys in two different documents. Only occurs if the keys have
unique indexes. See :meth:`AbstractDB.ensure_index` for more
information about indexes.

"""
pass

Expand Down Expand Up @@ -181,6 +217,14 @@ class DatabaseError(RuntimeError):
pass


class DuplicateKeyError(DatabaseError):
    """Raised when a write would duplicate a uniquely-indexed key.

    Signals that a write attempt was made but the new document has an
    index value already contained in the database.
    """

    pass


# pylint: disable=too-few-public-methods,abstract-method
class Database(AbstractDB, metaclass=SingletonFactory):
"""Class used to inject dependency on a database framework.
Expand Down
116 changes: 96 additions & 20 deletions src/metaopt/core/io/database/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,57 @@
:synopsis: Implement :class:`metaopt.core.io.database.AbstractDB` for MongoDB.

"""
import functools

import pymongo

from metaopt.core.io.database import (AbstractDB, DatabaseError)
from metaopt.core.io.database import (
AbstractDB, DatabaseError, DuplicateKeyError)


AUTH_FAILED_MESSAGES = [
"auth failed",
"Authentication failed."]

DUPLICATE_KEY_MESSAGES = [
"duplicate key error"]


def mongodb_exception_wrapper(method):
    """Decorate `method` so pymongo exceptions surface as generic database errors.

    Translations performed (target types live in `metaopt.core.io.database`):
      * `pymongo.errors.DuplicateKeyError` -> `DuplicateKeyError`
      * `pymongo.errors.BulkWriteError` whose write errors match
        `DUPLICATE_KEY_MESSAGES` -> `DuplicateKeyError`
      * `pymongo.errors.ConnectionFailure` -> `DatabaseError`
      * `pymongo.errors.OperationFailure` matching `AUTH_FAILED_MESSAGES`
        -> `DatabaseError`
    Any other pymongo error propagates unchanged.

    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        # NOTE: pymongo's DuplicateKeyError derives from OperationFailure,
        # so this clause must stay ahead of the OperationFailure one below.
        except pymongo.errors.DuplicateKeyError as exc:
            raise DuplicateKeyError(str(exc)) from exc
        except pymongo.errors.BulkWriteError as exc:
            for error in exc.details['writeErrors']:
                if any(msg in error["errmsg"] for msg in DUPLICATE_KEY_MESSAGES):
                    raise DuplicateKeyError(error["errmsg"]) from exc
            raise
        except pymongo.errors.ConnectionFailure as exc:
            raise DatabaseError("Connection Failure: database not found on "
                                "specified uri") from exc
        except pymongo.errors.OperationFailure as exc:
            if any(msg in str(exc) for msg in AUTH_FAILED_MESSAGES):
                raise DatabaseError("Authentication Failure: bad credentials") from exc
            raise

    return wrapper


class MongoDB(AbstractDB):
Expand All @@ -29,6 +77,7 @@ class MongoDB(AbstractDB):

"""

@mongodb_exception_wrapper
def initiate_connection(self):
"""Connect to database, unless MongoDB `is_connected`.

Expand All @@ -40,23 +89,13 @@ def initiate_connection(self):

self._sanitize_attrs()

try:
self._conn = pymongo.MongoClient(host=self.host,
port=self.port,
username=self.username,
password=self.password,
authSource=self.name)
self._db = self._conn[self.name]
self._db.command('ismaster') # .. seealso:: :meth:`is_connected`
except pymongo.errors.ConnectionFailure as e:
self._logger.error("Could not connect to host, %s:%s",
self.host, self.port)
raise DatabaseError("Connection Failure: database not found on "
"specified uri") from e
except pymongo.errors.OperationFailure as e:
self._logger.error("Could not verify user, %s, on database, %s",
self.username, self.name)
raise DatabaseError("Authentication Failure: bad credentials") from e
self._conn = pymongo.MongoClient(host=self.host,
port=self.port,
username=self.username,
password=self.password,
authSource=self.name)
self._db = self._conn[self.name]
self._db.command('ismaster') # .. seealso:: :meth:`is_connected`

@property
def is_connected(self):
Expand All @@ -82,8 +121,44 @@ def close_connection(self):
"""
self._conn.close()

def write(self, collection_name, data,
query=None):
def ensure_index(self, collection_name, keys, unique=False):
    """Create given indexes if they do not already exist in database.

    .. seealso:: :meth:`AbstractDB.ensure_index` for argument documentation.

    """
    # MongoDB's `create_index()` is idempotent: it only builds indexes that
    # are not already present, so no existence check is performed here.
    index_spec = self._convert_index_keys(keys)
    self._db[collection_name].create_index(index_spec,
                                           unique=unique,
                                           background=True)

def _convert_index_keys(self, keys):
    """Convert index keys to MongoDB ones."""
    # A bare string stands for a single ascending key.
    if not isinstance(keys, (list, tuple)):
        keys = [(keys, self.ASCENDING)]

    return [(name, self._convert_sort_order(order))
            for name, order in keys]

def _convert_sort_order(self, sort_order):
    """Convert generic `AbstractDB` sort orders to MongoDB ones."""
    if sort_order is self.ASCENDING:
        return pymongo.ASCENDING
    if sort_order is self.DESCENDING:
        return pymongo.DESCENDING
    raise RuntimeError("Invalid database sort order %s" %
                       str(sort_order))

@mongodb_exception_wrapper
def write(self, collection_name, data, query=None):
"""Write new information to a collection. Perform insert or update.

.. seealso:: :meth:`AbstractDB.write` for argument documentation.
Expand Down Expand Up @@ -119,6 +194,7 @@ def read(self, collection_name, query=None, selection=None):

return dbdocs

@mongodb_exception_wrapper
def read_and_write(self, collection_name, query, data, selection=None):
"""Read a collection's document and update the found document.

Expand Down
27 changes: 23 additions & 4 deletions src/metaopt/core/worker/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self, name):
log.debug("Creating Experiment object with name: %s", name)
self._init_done = False
self._db = Database() # fetch database instance
self._setup_db() # build indexes for collections

self._id = None
self.name = name
Expand Down Expand Up @@ -134,6 +135,19 @@ def __init__(self, name):

self._last_fetched = self.metadata['datetime']

def _setup_db(self):
    """Build the database indexes backing experiment and trial lookups."""
    ascending = Database.ASCENDING
    # Unique compound index: at most one experiment per (name, user) pair.
    # Concurrent duplicate registrations then raise `DuplicateKeyError`.
    self._db.ensure_index('experiments',
                          [('name', ascending),
                           ('metadata.user', ascending)],
                          unique=True)
    self._db.ensure_index('experiments', 'status')

    for trial_key in ('experiment', 'status', 'results', 'start_time'):
        self._db.ensure_index('trials', trial_key)
    self._db.ensure_index('trials', [('end_time', Database.DESCENDING)])

def reserve_trial(self, score_handle=None):
"""Find *new* trials that exist currently in database and select one of
them based on the highest score return from `score_handle` callable.
Expand Down Expand Up @@ -324,15 +338,20 @@ def configure(self, config):

# If everything is alright, push new config to database
if is_new:
# TODO: No need for read_and_write here, because unique indexes
# will make sure no experiments will be added with identical
# names. That means, this need refactoring once support for
# additional indexes is added to database.
# This will raise DuplicateKeyError if a concurrent experiment with
# identical (name, metadata.user) is written first in the database.

self._db.write('experiments', final_config)
# XXX: Reminder for future DB implementations:
# MongoDB, updates an inserted dict with _id, so should you :P
self._id = final_config['_id']
else:
# Writing the final config to an already existing experiment raises
# a DuplicateKeyError because of the embedded id `metadata.user`.
# To avoid this `final_config["name"]` is popped out before
# `db.write()`, thus seemingly breaking the compound index
# `(name, metadata.user)`
final_config.pop("name")
self._db.write('experiments', final_config, {'_id': self._id})

@property
Expand Down
2 changes: 1 addition & 1 deletion tests/unittests/core/experiment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
suspend: False
done: False

- name: supernaedo2
- name: supernaedo3

metadata:
user: tsirif
Expand Down
Loading