From 8e0a771f14082b376f203b831c4701661aa905a5 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Wed, 21 Dec 2016 18:05:38 -0500 Subject: [PATCH] Add support for including or excluding fields per namespace (#612) Add support for including or excluding fields per namespace Add support for wildcards in GridFS namespaces. Add support for excluding and including different namespaces at the same time. - previously it was a configuration error. Add a new format for the `namespaces` configuration file option. - it is invalid to mix old and new namespaces configuration. --- config.json | 45 ++- mongo_connector/connector.py | 153 +++++----- mongo_connector/namespace_config.py | 327 ++++++++++++++++------ mongo_connector/oplog_manager.py | 166 ++++------- tests/test_command_replication.py | 2 +- tests/test_config.py | 34 +-- tests/test_filter_fields.py | 402 +++------------------------ tests/test_mongo.py | 63 +++-- tests/test_mongo_connector.py | 3 - tests/test_mongo_doc_manager.py | 2 +- tests/test_namespace_config.py | 225 +++++++++++++-- tests/test_oplog_manager.py | 22 +- tests/test_oplog_manager_wildcard.py | 2 +- tests/test_synchronizer.py | 4 +- 14 files changed, 739 insertions(+), 711 deletions(-) diff --git a/config.json b/config.json index e7f3773e..b68b8375 100644 --- a/config.json +++ b/config.json @@ -37,18 +37,41 @@ "__fields": ["field1", "field2", "field3"], - "namespaces": { - "__include": ["db.source1", "db.source2", "db2.*"], - "__exclude": ["db3.source1", "db3.*"], - "__mapping": { - "db.source1": "db.dest1", - "db.source2": "db.dest2", - "db2.*": "db2.dest_*", - "db.col_*": { - "rename": "db.new_name_*" - } + "__namespaces": { + "excluded.collection": false, + "excluded_wildcard.*": false, + "*.exclude_collection_from_every_database": false, + "included.collection1": true, + "included.collection2": {}, + "included.collection4": { + "includeFields": ["included_field", "included.nested.field"] }, - "__gridfs": ["db.fs"] + "included.collection5": { + "rename": "included.new_collection5_name", + "includeFields": ["included_field", "included.nested.field"] + }, + "included.collection6": { + "excludeFields": ["excluded_field", "excluded.nested.field"] + }, + "included.collection7": { + "rename": "included.new_collection7_name", + "excludeFields": ["excluded_field", "excluded.nested.field"] + }, + "included_wildcard1.*": true, + "included_wildcard2.*": true, + "renamed.collection1": "something.else1", + "renamed.collection2": { + "rename": "something.else2" + }, + "renamed_wildcard.*": { + "rename": "new_name.*" + }, + "gridfs.collection": { + "gridfs": true + }, + "gridfs_wildcard.*": { + "gridfs": true + } }, "docManagers": [ diff --git a/mongo_connector/connector.py b/mongo_connector/connector.py index 4dc03503..171b6ef9 100644 --- a/mongo_connector/connector.py +++ b/mongo_connector/connector.py @@ -14,6 +14,7 @@ """Discovers the MongoDB cluster and starts the connector. """ +import copy import json import logging import logging.handlers @@ -36,7 +37,8 @@ from mongo_connector.doc_managers.doc_manager_base import DocManagerBase from mongo_connector.command_helper import CommandHelper from mongo_connector.util import log_fatal_exceptions, retry_until_ok -from mongo_connector.namespace_config import NamespaceConfig +from mongo_connector.namespace_config import (NamespaceConfig, + validate_namespace_options) from pymongo import MongoClient @@ -127,8 +129,14 @@ def __init__(self, mongo_address, doc_managers=None, **kwargs): # The namespace configuration shared by all OplogThreads and # DocManagers self.namespace_config = NamespaceConfig( - kwargs.get('ns_set'), kwargs.get('ex_ns_set'), - kwargs.get('dest_mapping')) + namespace_set=kwargs.get('ns_set'), + ex_namespace_set=kwargs.get('ex_ns_set'), + gridfs_set=kwargs.get('gridfs_set'), + dest_mapping=kwargs.get('dest_mapping'), + namespace_options=kwargs.get('namespace_options'), + include_fields=kwargs.get('fields'), + exclude_fields=kwargs.get('exclude_fields') + ) # Initialize and set the command helper command_helper = CommandHelper(self.namespace_config) @@ -185,6 +193,7 @@ def from_config(cls, config): ns_set=config['namespaces.include'], ex_ns_set=config['namespaces.exclude'], dest_mapping=config['namespaces.mapping'], + namespace_options=config['namespaces.namespace_options'], gridfs_set=config['namespaces.gridfs'], ssl_certfile=config['ssl.sslCertfile'], ssl_keyfile=config['ssl.sslKeyfile'], @@ -732,84 +741,94 @@ def apply_exclude_fields(option, cli_values): "exported. Supports dot notation for document fields but cannot span " "arrays. Cannot use both 'fields' and 'exclude_fields'.") - def apply_namespaces(option, cli_values): - if cli_values['ns_set']: - option.value['include'] = cli_values['ns_set'].split(',') + def merge_namespaces_cli(option, cli_values): + def validate_no_duplicates(lst, list_name): + if len(lst) != len(set(lst)): + raise errors.InvalidConfiguration( + "%s should not contain any duplicates." % (list_name,)) - if cli_values['ex_ns_set']: - option.value['exclude'] = cli_values['ex_ns_set'].split(',') + if cli_values["ns_set"]: + option.value["include"] = cli_values["ns_set"].split(",") - if cli_values['gridfs_set']: - option.value['gridfs'] = cli_values['gridfs_set'].split(',') + if cli_values["ex_ns_set"]: + option.value["exclude"] = cli_values["ex_ns_set"].split(",") - if cli_values['dest_ns_set']: - ns_set = option.value['include'] - dest_ns_set = cli_values['dest_ns_set'].split(',') + if cli_values["gridfs_set"]: + option.value["gridfs"] = cli_values["gridfs_set"].split(",") + + if cli_values["dest_ns_set"]: + ns_set = option.value["include"] + dest_ns_set = cli_values["dest_ns_set"].split(",") if len(ns_set) != len(dest_ns_set): raise errors.InvalidConfiguration( "Destination namespace set should be the" " same length as the origin namespace set.") - option.value['mapping'] = dict(zip(ns_set, dest_ns_set)) + option.value["mapping"] = dict(zip(ns_set, dest_ns_set)) + + # Check for duplicates + if option.value["include"]: + validate_no_duplicates(option.value["include"], + "Include namespace set") + if option.value["exclude"]: + validate_no_duplicates(option.value["exclude"], + "Exclude namespace set") + if option.value["gridfs"]: + validate_no_duplicates(option.value["gridfs"], + "GridFS namespace set") + if option.value["mapping"]: + validate_no_duplicates(list(option.value["mapping"].values()), + "Destination namespace set") - ns_set = option.value['include'] - if len(ns_set) != len(set(ns_set)): - raise errors.InvalidConfiguration( - "Namespace set should not contain any duplicates.") + def apply_namespaces(option, cli_values): + if (option.value['include'] or option.value['exclude'] or + option.value['mapping'] or option.value['gridfs']): + return apply_old_namespace_options(option, cli_values) + else: + return apply_new_namespace_options(option, cli_values) + + def apply_new_namespace_options(option, cli_values): + """Apply the new format (since 2.5.0) namespaces options.""" + merge_namespaces_cli(option, cli_values) + namespace_options = copy.deepcopy(option.value) + ns_set = namespace_options.pop('include', None) + ex_ns_set = namespace_options.pop('exclude', None) + gridfs_set = namespace_options.pop('gridfs', None) + dest_mapping = namespace_options.pop('mapping', None) + option.value["namespace_options"] = namespace_options + + validate_namespace_options( + namespace_set=ns_set, ex_namespace_set=ex_ns_set, + dest_mapping=dest_mapping, + namespace_options=namespace_options, gridfs_set=gridfs_set) + + def apply_old_namespace_options(option, cli_values): + """Apply the old format (before 2.5.0) namespaces options.""" + merge_namespaces_cli(option, cli_values) + + LOG.warning("Deprecation warning: the current namespaces " + "configuration format is outdated and support may be " + "removed in a future release. Please update your " + "config file to use the new format.") + ns_set = option.value['include'] ex_ns_set = option.value['exclude'] - if len(ex_ns_set) != len(set(ex_ns_set)): - raise errors.InvalidConfiguration( - "Exclude namespace set should not contain any duplicates.") - - # not allow to exist both 'include' and 'exclude' - if ns_set and ex_ns_set: - raise errors.InvalidConfiguration( - "Cannot use both namespace 'include' " - "(--namespace-set) and 'exclude' " - "(--exclude-namespace-set).") + gridfs_set = option.value['gridfs'] + dest_mapping = option.value['mapping'] - # validate 'include' format - for ns in ns_set: - if ns.count("*") > 1: + valid_names = set(['include', 'exclude', 'gridfs', 'mapping']) + valid_names |= set('__' + name for name in valid_names) + valid_names.add('__comment__') + for key in option.value: + if key not in valid_names: raise errors.InvalidConfiguration( - "Namespace set should be plain text " - "e.g. foo.bar or only contains one wildcard, e.g. foo.* .") + "Invalid option %s in old style (pre 2.5.0) namespaces " + "configuration. The only valid option names are: %r" % + (key, list(valid_names))) - # validate 'exclude' format - for ens in ex_ns_set: - if ens.count("*") > 1: - raise errors.InvalidConfiguration( - "Exclude namespace set should be plain text " - "e.g. foo.bar or only contains one wildcard, e.g. foo.* .") - dest_mapping = option.value['mapping'] - if len(dest_mapping) != len(set(dest_mapping.values())): - raise errors.InvalidConfiguration( - "Destination namespaces set should not" - " contain any duplicates.") - - for source_name, value in dest_mapping.items(): - # Mapping may be old style with only a target name, eg: - # "db.source": "db.dest", - # or new style which can support extra options per namespace: - # "db.source": {"rename": "db.dest", ...} - if isinstance(value, dict): - dest_name = value.get("rename", source_name) - else: - dest_name = value - if source_name.count("*") > 1 or dest_name.count("*") > 1: - raise errors.InvalidConfiguration( - "The namespace mapping source and destination " - "cannot contain more than one '*' character.") - if source_name.count("*") != dest_name.count("*"): - raise errors.InvalidConfiguration( - "The namespace mapping source and destination " - "must contain the same number of '*' characters.") - - gridfs_set = option.value['gridfs'] - if len(gridfs_set) != len(set(gridfs_set)): - raise errors.InvalidConfiguration( - "GridFS set should not contain any duplicates.") + validate_namespace_options( + namespace_set=ns_set, ex_namespace_set=ex_ns_set, + dest_mapping=dest_mapping, gridfs_set=gridfs_set) default_namespaces = { "include": [], diff --git a/mongo_connector/namespace_config.py b/mongo_connector/namespace_config.py index 8f8d183d..c9daade2 100644 --- a/mongo_connector/namespace_config.py +++ b/mongo_connector/namespace_config.py @@ -19,17 +19,32 @@ from itertools import combinations from mongo_connector import errors +from mongo_connector import compat LOG = logging.getLogger(__name__) -_Namespace = namedtuple('Namespace', ['dest_name', 'source_name']) +_Namespace = namedtuple('Namespace', ['dest_name', 'source_name', 'gridfs', + 'include_fields', 'exclude_fields']) class Namespace(_Namespace): - def __new__(cls, dest_name=None, source_name=None): - return super(Namespace, cls).__new__(cls, dest_name, source_name) + def __new__(cls, dest_name=None, source_name=None, gridfs=False, + include_fields=None, exclude_fields=None): + include_fields = set(include_fields or []) + exclude_fields = set(exclude_fields or []) + return super(Namespace, cls).__new__( + cls, dest_name, source_name, gridfs, include_fields, + exclude_fields) + + def with_options(self, **kwargs): + new_options = dict( + dest_name=self.dest_name, source_name=self.source_name, + gridfs=self.gridfs, include_fields=self.include_fields, + exclude_fields=self.exclude_fields) + new_options.update(kwargs) + return Namespace(**new_options) class RegexSet(MutableSet): @@ -89,7 +104,8 @@ class NamespaceConfig(object): """Manages included and excluded namespaces. """ def __init__(self, namespace_set=None, ex_namespace_set=None, - user_mapping=None): + gridfs_set=None, dest_mapping=None, namespace_options=None, + include_fields=None, exclude_fields=None): # A mapping from non-wildcard source namespaces to a MappedNamespace # containing the non-wildcard target name. self._plain = {} @@ -108,54 +124,50 @@ def __init__(self, namespace_set=None, ex_namespace_set=None, # created in `self.plain` for faster subsequent lookups. self._regex_map = [] - # The set of namespaces to exclude. Can contain wildcard namespaces. - self._ex_namespace_set = RegexSet.from_namespaces( - ex_namespace_set or []) - - user_mapping = user_mapping or {} - namespace_set = namespace_set or [] - # Add each namespace from the namespace_set and user_mapping - # parameters. Namespaces have a one-to-one relationship with the - # target system, meaning multiple source namespaces cannot be merged - # into a single namespace in the target. - for ns in namespace_set: - user_mapping.setdefault(ns, ns) - - renames = {} - for src_name, v in user_mapping.items(): - if isinstance(v, dict): - target_name = v.get('rename', src_name) - else: - target_name = v - renames[src_name] = target_name - self._add_collection(src_name, target_name) - validate_target_namespaces(renames) - - def _add_collection(self, src_name, dest_name=None): - """Add the collection name and the corresponding command namespace.""" - if dest_name is None: - dest_name = src_name - self._add_namespace(Namespace( - dest_name=dest_name, source_name=src_name)) + # Fields to include or exclude from all namespaces + self._include_fields = validate_include_fields(include_fields) + self._exclude_fields = validate_exclude_fields(exclude_fields) + + # Add each included namespace. Namespaces have a one-to-one + # relationship to the target system, meaning multiple source + # namespaces cannot be merged into a single namespace in the target. + ex_namespace_set, namespaces = validate_namespace_options( + namespace_set=namespace_set, + ex_namespace_set=ex_namespace_set, + gridfs_set=gridfs_set, + dest_mapping=dest_mapping, + namespace_options=namespace_options, + include_fields=include_fields, + exclude_fields=exclude_fields) + + # The set of, possibly wildcard, namespaces to exclude. + self._ex_namespace_set = RegexSet.from_namespaces(ex_namespace_set) + + for namespace in namespaces: + self._register_namespace_and_command(namespace) + + def _register_namespace_and_command(self, namespace): + """Add a Namespace and the corresponding command namespace.""" + self._add_namespace(namespace) # Add the namespace for commands on this database - cmd_name = src_name.split('.', 1)[0] + '.$cmd' - dest_cmd_name = dest_name.split('.', 1)[0] + '.$cmd' - self._add_namespace(Namespace( - dest_name=dest_cmd_name, source_name=cmd_name)) + cmd_name = namespace.source_name.split('.', 1)[0] + '.$cmd' + dest_cmd_name = namespace.dest_name.split('.', 1)[0] + '.$cmd' + self._add_namespace(Namespace(dest_name=dest_cmd_name, + source_name=cmd_name)) - def _add_namespace(self, mapped_namespace): + def _add_namespace(self, namespace): """Add an included and possibly renamed Namespace.""" - src_name = mapped_namespace.source_name + src_name = namespace.source_name if "*" in src_name: self._regex_map.append((namespace_to_regex(src_name), - mapped_namespace)) + namespace)) else: - self._add_plain_namespace(mapped_namespace) + self._add_plain_namespace(namespace) - def _add_plain_namespace(self, mapped_namespace): + def _add_plain_namespace(self, namespace): """Add an included and possibly renamed non-wildcard Namespace.""" - src_name = mapped_namespace.source_name - target_name = mapped_namespace.dest_name + src_name = namespace.source_name + target_name = namespace.dest_name src_names = self._reverse_plain.setdefault(target_name, set()) src_names.add(src_name) if len(src_names) > 1: @@ -167,7 +179,7 @@ def _add_plain_namespace(self, mapped_namespace): "exists a mapping from '%s' to '%s'" % (src_name, target_name, existing_src, target_name)) - self._plain[src_name] = mapped_namespace + self._plain[src_name] = namespace src_db, _ = src_name.split(".", 1) target_db, _ = target_name.split(".", 1) self._plain_db.setdefault(src_db, set()).add(target_db) @@ -181,23 +193,25 @@ def lookup(self, plain_src_ns): return None # Include all namespaces if there are no included namespaces. if not self._regex_map and not self._plain: - return Namespace(plain_src_ns) + return Namespace(dest_name=plain_src_ns, source_name=plain_src_ns, + include_fields=self._include_fields, + exclude_fields=self._exclude_fields) # First, search for the namespace in the plain namespaces. try: return self._plain[plain_src_ns] except KeyError: # Search for the namespace in the wildcard namespaces. - for regex, mapped in self._regex_map: + for regex, namespace in self._regex_map: new_name = match_replace_regex(regex, plain_src_ns, - mapped.dest_name) + namespace.dest_name) if not new_name: continue # Save the new target Namespace in the plain namespaces so # future lookups are fast. - new_mapped = Namespace( + new_namespace = namespace.with_options( dest_name=new_name, source_name=plain_src_ns) - self._add_plain_namespace(new_mapped) - return new_mapped + self._add_plain_namespace(new_namespace) + return new_namespace # Save the not included namespace to the excluded namespaces so # that future lookups of the same namespace are fast. @@ -208,9 +222,18 @@ def map_namespace(self, plain_src_ns): """Given a plain source namespace, return the corresponding plain target namespace, or None if it is not included. """ - mapped = self.lookup(plain_src_ns) - if mapped: - return mapped.dest_name + namespace = self.lookup(plain_src_ns) + if namespace: + return namespace.dest_name + return None + + def gridfs_namespace(self, plain_src_ns): + """Given a plain source namespace, return the corresponding plain + target namespace if this namespace is a gridfs collection. + """ + namespace = self.lookup(plain_src_ns) + if namespace and namespace.gridfs: + return namespace.dest_name return None def unmap_namespace(self, plain_target_ns): @@ -227,10 +250,10 @@ def unmap_namespace(self, plain_target_ns): for src_name in src_name_set: return src_name # The target namespace could also exist in the wildcard namespaces - for _, mapped in self._regex_map: + for _, namespace in self._regex_map: original_name = match_replace_regex( - namespace_to_regex(mapped.dest_name), plain_target_ns, - mapped.source_name) + namespace_to_regex(namespace.dest_name), plain_target_ns, + namespace.source_name) if original_name: return original_name return None @@ -255,6 +278,17 @@ def map_db(self, plain_src_db): self.lookup(plain_src_db + '.$cmd') return list(self._plain_db.get(plain_src_db, set())) + def projection(self, plain_src_name): + """Return the projection for the given source namespace.""" + mapped = self.lookup(plain_src_name) + if not mapped: + return None + fields = mapped.include_fields or mapped.exclude_fields + if fields: + include = 1 if mapped.include_fields else 0 + return dict((field, include) for field in fields) + return None + def _character_matches(name1, name2): """Yield the number of characters that match the beginning of each string. @@ -281,12 +315,24 @@ def wildcards_overlap(name1, name2): return False -def validate_target_namespaces(user_mapping): - """Validate that no target namespaces overlap exactly with each other. +def _validate_namespace(name): + """Validate a MongoDB namespace.""" + if name.find('.', 1, len(name) - 1) < 0: + raise errors.InvalidConfiguration( + "Invalid MongoDB namespace '%s'!" % (name,)) + - Also warns when wildcard namespaces have a chance of overlapping. +def _validate_namespaces(namespaces): + """Validate wildcards and renaming in namespaces. + + Target namespaces should have the same number of wildcards as the source. + No target namespaces overlap exactly with each other. Logs a warning + when wildcard namespaces have a chance of overlapping. """ - for source, target in user_mapping.items(): + for source, namespace in namespaces.items(): + target = namespace.dest_name + _validate_namespace(source) + _validate_namespace(target) if source.count("*") > 1 or target.count("*") > 1: raise errors.InvalidConfiguration( "The namespace mapping from '%s' to '%s' cannot contain more " @@ -308,23 +354,127 @@ def validate_target_namespaces(user_mapping): "source collection name must also appear in the target " "collection name" % (source, target)) - for namespace1, namespace2 in combinations(user_mapping.keys(), 2): - if wildcards_overlap(namespace1, namespace2): - LOG.warn('Namespaces "%s" and "%s" may match the ' - 'same source namespace.', namespace1, namespace2) - target1 = user_mapping[namespace1] - target2 = user_mapping[namespace2] + for source1, source2 in combinations(namespaces.keys(), 2): + if wildcards_overlap(source1, source2): + LOG.warning('Namespaces "%s" and "%s" may match the ' + 'same source namespace.', source1, source2) + target1 = namespaces[source1].dest_name + target2 = namespaces[source2].dest_name if target1 == target2: raise errors.InvalidConfiguration( "Multiple namespaces cannot be combined into one target " "namespace. Trying to map '%s' to '%s' but '%s' already " "corresponds to '%s' in the target system." % - (namespace2, target2, namespace1, target1)) + (source2, target2, source1, target1)) if wildcards_overlap(target1, target2): - LOG.warn("Multiple namespaces cannot be combined into one target " - "namespace. Mapping from '%s' to '%s' might overlap " - "with mapping from '%s' to '%s'." % - (namespace2, target2, namespace1, target1)) + LOG.warning( + "Multiple namespaces cannot be combined into one target " + "namespace. Mapping from '%s' to '%s' might overlap " + "with mapping from '%s' to '%s'." % + (source2, target2, source1, target1)) + + +def _merge_namespace_options(namespace_set=None, ex_namespace_set=None, + gridfs_set=None, dest_mapping=None, + namespace_options=None, + include_fields=None, exclude_fields=None): + """Merges namespaces options together. + + The first is the set of excluded namespaces and the second is a mapping + from source namespace to Namespace instances. + """ + namespace_set = set(namespace_set or []) + ex_namespace_set = set(ex_namespace_set or []) + gridfs_set = set(gridfs_set or []) + dest_mapping = dest_mapping or {} + namespace_options = namespace_options or {} + include_fields = set(include_fields or []) + exclude_fields = set(exclude_fields or []) + namespaces = {} + + for source_name, options_or_str in namespace_options.items(): + if isinstance(options_or_str, dict): + namespace_set.add(source_name) + if options_or_str.get('gridfs'): + gridfs_set.add(source_name) + namespaces[source_name] = Namespace( + dest_name=options_or_str.get('rename'), + include_fields=options_or_str.get('includeFields'), + exclude_fields=options_or_str.get('excludeFields'), + gridfs=options_or_str.get('gridfs', False)) + elif compat.is_string(options_or_str): + namespace_set.add(source_name) + namespaces[source_name] = Namespace(dest_name=options_or_str) + elif options_or_str: + namespace_set.add(source_name) + else: + ex_namespace_set.add(source_name) + + # Add namespaces that are renamed but not in namespace_options + for source_name, target_name in dest_mapping.items(): + namespaces[source_name] = namespaces.get( + source_name, Namespace()).with_options(dest_name=target_name) + + # Add namespaces that are included but not in namespace_options + for included_name in namespace_set: + if included_name not in namespaces: + namespaces[included_name] = Namespace() + + # Add namespaces that are excluded but not in namespace_options + for gridfs_name in gridfs_set: + namespaces[gridfs_name] = namespaces.get( + gridfs_name, Namespace()).with_options(gridfs=True) + + # Add source, destination name, and globally included and excluded fields + for included_name in namespaces: + namespace = namespaces[included_name] + namespace = namespace.with_options( + source_name=included_name, + include_fields=validate_include_fields(include_fields, + namespace.include_fields), + exclude_fields=validate_exclude_fields(exclude_fields, + namespace.exclude_fields)) + # The default destination name is the same as the source. + if not namespace.dest_name: + namespace = namespace.with_options(dest_name=included_name) + namespaces[included_name] = namespace + + return ex_namespace_set, namespaces + + +def validate_namespace_options(namespace_set=None, ex_namespace_set=None, + gridfs_set=None, dest_mapping=None, + namespace_options=None, + include_fields=None, exclude_fields=None): + ex_namespace_set, namespaces = _merge_namespace_options( + namespace_set=namespace_set, + ex_namespace_set=ex_namespace_set, + gridfs_set=gridfs_set, + dest_mapping=dest_mapping, + namespace_options=namespace_options, + include_fields=include_fields, + exclude_fields=exclude_fields) + + for excluded_name in ex_namespace_set: + _validate_namespace(excluded_name) + if excluded_name in namespaces: + raise errors.InvalidConfiguration( + "Cannot include namespace '%s', it is already excluded." % + (excluded_name,)) + + for namespace in namespaces.values(): + if namespace.include_fields and namespace.exclude_fields: + raise errors.InvalidConfiguration( + "Cannot mix include fields and exclude fields in " + "namespace mapping for: '%s'" % (namespace.source_name,)) + + if namespace.gridfs and namespace.dest_name != namespace.source_name: + raise errors.InvalidConfiguration( + "GridFS namespaces cannot be renamed: '%s'" % ( + namespace.source_name,)) + + _validate_namespaces(namespaces) + return ex_namespace_set, namespaces.values() def match_replace_regex(regex, src_namespace, dest_namespace): @@ -343,11 +493,28 @@ def wildcard_in_db(namespace): def namespace_to_regex(namespace): """Create a RegexObject from a wildcard namespace.""" - if wildcard_in_db(namespace): - # A database name cannot contain a '.' character - wildcard_group = '([^.]*)' - else: - wildcard_group = '(.*)' - return re.compile(r'\A' + - re.escape(namespace).replace('\*', wildcard_group) + - r'\Z') + db_name, coll_name = namespace.split('.', 1) + # A database name cannot contain a '.' character + db_regex = re.escape(db_name).replace('\*', '([^.]*)') + # But a collection name can. + coll_regex = re.escape(coll_name).replace('\*', '(.*)') + return re.compile(r'\A' + db_regex + r'\.' + coll_regex + r'\Z') + + +def validate_include_fields(include_fields, namespace_fields=None): + include_fields = set(include_fields or []) + namespace_fields = set(namespace_fields or []) + merged = include_fields | namespace_fields + if merged: + merged.add('_id') + return merged + + +def validate_exclude_fields(exclude_fields, namespace_fields=None): + exclude_fields = set(exclude_fields or []) + namespace_fields = set(namespace_fields or []) + merged = exclude_fields | namespace_fields + if '_id' in merged: + LOG.warning("Cannot exclude '_id' field, ignoring") + merged.discard('_id') + return merged diff --git a/mongo_connector/oplog_manager.py b/mongo_connector/oplog_manager.py index 5907e459..9652bdca 100755 --- a/mongo_connector/oplog_manager.py +++ b/mongo_connector/oplog_manager.py @@ -111,21 +111,12 @@ def __init__(self, primary_client, doc_managers, # Represents the last checkpoint for a OplogThread. self.oplog_progress = oplog_progress_dict - # The set of gridfs namespaces to process from the mongo cluster - self.gridfs_set = kwargs.get('gridfs_set', []) - # The namespace configuration self.namespace_config = namespace_config # Whether the collection dump gracefully handles exceptions self.continue_on_error = kwargs.get('continue_on_error', False) - # Set of fields to export - self._exclude_fields = set([]) - self.fields = kwargs.get('fields', None) - if kwargs.get('exclude_fields', None): - self.exclude_fields = kwargs['exclude_fields'] - LOG.info('OplogThread: Initializing oplog thread') self.oplog = self.primary_client.local.oplog.rs @@ -136,70 +127,6 @@ def __init__(self, primary_client, doc_managers, err_msg = 'OplogThread: No oplog for thread:' LOG.warning('%s %s' % (err_msg, self.primary_client)) - @property - def fields(self): - if self._fields: - return list(self._fields) - return None - - @property - def exclude_fields(self): - if self._exclude_fields: - return list(self._exclude_fields) - return None - - @fields.setter - def fields(self, value): - if self._exclude_fields: - raise errors.InvalidConfiguration( - "Cannot set 'fields' when 'exclude_fields' has already " - "been set to non-empty list.") - if value: - self._fields = set(value) - # Always include _id field - self._fields.add('_id') - self._projection = dict((field, 1) for field in self._fields) - else: - self._fields = set([]) - self._projection = None - - @exclude_fields.setter - def exclude_fields(self, value): - if self._fields: - raise errors.InvalidConfiguration( - "Cannot set 'exclude_fields' when 'fields' has already " - "been set to non-empty list.") - if value: - self._exclude_fields = set(value) - if '_id' in value: - LOG.warning("OplogThread: Cannot exclude '_id' field, " - "ignoring") - self._exclude_fields.remove('_id') - if not self._exclude_fields: - self._projection = None - else: - self._projection = dict( - (field, 0) for field in self._exclude_fields) - else: - self._exclude_fields = set([]) - self._projection = None - - @property - def gridfs_set(self): - return self._gridfs_set - - @gridfs_set.setter - def gridfs_set(self, gridfs_set): - self._gridfs_set = gridfs_set - self._gridfs_files_set = [ns + '.files' for ns in gridfs_set] - - @property - def gridfs_files_set(self): - try: - return self._gridfs_files_set - except AttributeError: - return [] - def _should_skip_entry(self, entry): """Determine if this oplog entry should be skipped. @@ -229,8 +156,8 @@ def _should_skip_entry(self, entry): is_gridfs_file = False if coll.endswith(".files"): - if ns in self.gridfs_files_set: - ns = ns[:-len(".files")] + ns = ns[:-len(".files")] + if self.namespace_config.gridfs_namespace(ns): is_gridfs_file = True else: return True, False @@ -242,22 +169,20 @@ def _should_skip_entry(self, entry): # Rename or filter out namespaces that are ignored keeping # included gridfs namespaces. - new_ns = self.namespace_config.map_namespace(ns) - if new_ns is None: - if is_gridfs_file: - # Gridfs files should not be skipped - new_ns = ns - else: - LOG.debug("OplogThread: Skipping oplog entry: " - "'%s' is not in the namespace set." % (ns,)) - return True, False + namespace = self.namespace_config.lookup(ns) + if namespace is None: + LOG.debug("OplogThread: Skipping oplog entry: " + "'%s' is not in the namespace configuration." % (ns,)) + return True, False - # update the namespace - entry['ns'] = new_ns + # Update the namespace. + entry['ns'] = namespace.dest_name # Take fields out of the oplog entry that shouldn't be replicated. # This may nullify the document if there's nothing to do. - if not self.filter_oplog_entry(entry): + if not self.filter_oplog_entry( + entry, include_fields=namespace.include_fields, + exclude_fields=namespace.exclude_fields): return True, False return False, is_gridfs_file @@ -470,10 +395,10 @@ def find_partial_matches(): # Field does not exactly match any key in the update doc. return list(find_partial_matches()) - def _pop_excluded_fields(self, doc, update=False): + def _pop_excluded_fields(self, doc, exclude_fields, update=False): # Remove all the fields that were passed in exclude_fields. find_fields = self._find_update_fields if update else self._find_field - for field in self._exclude_fields: + for field in exclude_fields: for path, _ in find_fields(field, doc): # Delete each matching field in the original document. temp_doc = doc @@ -483,10 +408,10 @@ def _pop_excluded_fields(self, doc, update=False): return doc # Need this to be similar to copy_included_fields. - def _copy_included_fields(self, doc, update=False): + def _copy_included_fields(self, doc, include_fields, update=False): new_doc = {} find_fields = self._find_update_fields if update else self._find_field - for field in self._fields: + for field in include_fields: for path, value in find_fields(field, doc): # Copy each matching field in the original document. temp_doc = new_doc @@ -496,30 +421,32 @@ def _copy_included_fields(self, doc, update=False): return new_doc - def filter_oplog_entry(self, entry): + def filter_oplog_entry(self, entry, include_fields=None, + exclude_fields=None): """Remove fields from an oplog entry that should not be replicated. NOTE: this does not support array indexing, for example 'a.b.2'""" - if not self._fields and not self._exclude_fields: + if not include_fields and not exclude_fields: return entry - elif self._fields: + elif include_fields: filter_fields = self._copy_included_fields else: filter_fields = self._pop_excluded_fields + fields = include_fields or exclude_fields entry_o = entry['o'] # 'i' indicates an insert. 'o' field is the doc to be inserted. if entry['op'] == 'i': - entry['o'] = filter_fields(entry_o) + entry['o'] = filter_fields(entry_o, fields) # 'u' indicates an update. The 'o' field describes an update spec # if '$set' or '$unset' are present. elif entry['op'] == 'u' and ('$set' in entry_o or '$unset' in entry_o): if '$set' in entry_o: - entry['o']["$set"] = filter_fields(entry_o["$set"], - update=True) + entry['o']["$set"] = filter_fields( + entry_o["$set"], fields, update=True) if '$unset' in entry_o: - entry['o']["$unset"] = filter_fields(entry_o["$unset"], - update=True) + entry['o']["$unset"] = filter_fields( + entry_o["$unset"], fields, update=True) # not allowed to have empty $set/$unset, so remove if empty if "$set" in entry_o and not entry_o['$set']: entry_o.pop("$set") @@ -530,7 +457,7 @@ def filter_oplog_entry(self, entry): # 'u' indicates an update. The 'o' field is the replacement document # if no '$set' or '$unset' are present. elif entry['op'] == 'u': - entry['o'] = filter_fields(entry_o) + entry['o'] = filter_fields(entry_o, fields) return entry @@ -574,7 +501,8 @@ def dump_collection(self): dump_cancelled = [False] def get_all_ns(): - all_ns_set = [] + ns_set = [] + gridfs_ns_set = [] db_list = retry_until_ok(self.primary_client.database_names) for database in db_list: if database == "config" or database == "local": @@ -585,34 +513,41 @@ def get_all_ns(): # ignore system collections if coll.startswith("system."): continue - # ignore gridfs collections - if coll.endswith(".files") or coll.endswith(".chunks"): + # ignore gridfs chunks collections + if coll.endswith(".chunks"): continue - namespace = "%s.%s" % (database, coll) - if self.namespace_config.map_namespace(namespace): - all_ns_set.append(namespace) - return all_ns_set - - dump_set = get_all_ns() + if coll.endswith(".files"): + namespace = "%s.%s" % (database, coll) + namespace = namespace[:-len(".files")] + if self.namespace_config.gridfs_namespace(namespace): + gridfs_ns_set.append(namespace) + else: + namespace = "%s.%s" % (database, coll) + if self.namespace_config.map_namespace(namespace): + ns_set.append(namespace) + return ns_set, gridfs_ns_set + + dump_set, gridfs_dump_set = get_all_ns() LOG.debug("OplogThread: Dumping set of collections %s " % dump_set) def docs_to_dump(from_coll): last_id = None attempts = 0 + projection = self.namespace_config.projection(from_coll.full_name) # Loop to handle possible AutoReconnect while attempts < 60: if last_id is None: cursor = retry_until_ok( from_coll.find, - projection=self._projection, + projection=projection, sort=[("_id", pymongo.ASCENDING)] ) else: cursor = retry_until_ok( from_coll.find, {"_id": {"$gt": last_id}}, - projection=self._projection, + projection=projection, sort=[("_id", pymongo.ASCENDING)] ) try: @@ -685,8 +620,12 @@ def do_dump(dm, error_queue): "collection dump") upsert_all(dm) + if gridfs_dump_set: + LOG.info("OplogThread: dumping GridFS collections: %s", + gridfs_dump_set) + # Dump GridFS files - for gridfs_ns in self.gridfs_set: + for gridfs_ns in gridfs_dump_set: mongo_coll = self.get_collection(gridfs_ns) from_coll = self.get_collection(gridfs_ns + '.files') dest_ns = self.namespace_config.map_namespace(gridfs_ns) @@ -963,7 +902,8 @@ def rollback(self): to_update = util.retry_until_ok( client[database][coll].find, {'_id': {'$in': bson_obj_id_list}}, - projection=self._projection + projection=self.namespace_config.projection( + original_namespace) ) # Doc list are docs in target system, to_update are # Docs in mongo diff --git a/tests/test_command_replication.py b/tests/test_command_replication.py index 65d10977..cd4b9f71 100644 --- a/tests/test_command_replication.py +++ b/tests/test_command_replication.py @@ -112,7 +112,7 @@ def test_command_helper(self): } helper = CommandHelper(NamespaceConfig( - namespace_set=list(mapping) + ['a.z'], user_mapping=mapping)) + namespace_set=list(mapping) + ['a.z'], namespace_options=mapping)) self.assertEqual(set(helper.map_db('a')), set(['a', 'b', 'c'])) self.assertEqual(helper.map_db('d'), []) diff --git a/tests/test_config.py b/tests/test_config.py index e9862af5..cafce8fc 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -253,17 +253,6 @@ def test_namespace_set_validation(self): } self.assertRaises(errors.InvalidConfiguration, self.load_options, args) - # ns_set and ex_ns_set should not exist both - args = { - "-n": "a.x,b.y", - "-x": "c.z" - } - self.assertRaises(errors.InvalidConfiguration, self.load_options, args) - d = { - 'namespaces': {'include': ['a.x', 'b.y'], 'exclude': ['c.z']} - } - self.assertRaises(errors.InvalidConfiguration, self.load_json, d) - # validate ns_set format args = { "-n": "a*.x*" @@ -274,16 +263,6 @@ def test_namespace_set_validation(self): } self.assertRaises(errors.InvalidConfiguration, self.load_json, d) - # validate ex_ns_set format - args = { - "-x": "a*.x*" - } - self.assertRaises(errors.InvalidConfiguration, self.load_options, args) - d = { - 'namespaces': {'exclude': ['a*.x*']} - } - self.assertRaises(errors.InvalidConfiguration, self.load_json, d) - # validate dest_ns_set format args = { "-n": "a.x*", @@ -303,6 +282,19 @@ def test_namespace_set_validation(self): } self.assertRaises(errors.InvalidConfiguration, self.load_json, d) + def test_validate_mixed_namespace_format(self): + # It is invalid to combine new and old namespace formats + mix_namespaces = [ + {'mapping': {'old.format': 'old.format2'}, 'new.format': True}, + {'gridfs': ['old.format'], 'new.format': True}, + {'include': ['old.format'], 'new.format': True}, + {'exclude': ['old.format'], 'new.format': True}, + ] + for namespaces in mix_namespaces: + with self.assertRaises(errors.InvalidConfiguration): + self.load_json({'namespaces': namespaces}) + + def test_doc_managers_from_args(self): # Test basic docmanager construction from args args = { diff --git a/tests/test_filter_fields.py b/tests/test_filter_fields.py index 34d5c061..f40730af 100644 --- a/tests/test_filter_fields.py +++ b/tests/test_filter_fields.py @@ -44,7 +44,7 @@ def tearDownClass(cls): cls.repl_set.stop() def setUp(self): - self.namespace_config = NamespaceConfig([], [], {}) + self.namespace_config = NamespaceConfig() self.opman = OplogThread( primary_client=self.primary_conn, doc_managers=(DocManager(),), @@ -59,22 +59,11 @@ def tearDown(self): # OplogThread may not have been started pass - def _check_fields(self, opman, fields, exclude_fields, projection): - if fields: - self.assertEqual(sorted(opman.fields), sorted(fields)) - self.assertEqual(opman._fields, set(fields)) - else: - self.assertEqual(opman.fields, None) - self.assertEqual(opman._fields, set([])) - if exclude_fields: - self.assertEqual(sorted(opman.exclude_fields), - sorted(exclude_fields)) - self.assertEqual(opman._exclude_fields, set(exclude_fields)) - else: - self.assertEqual(opman.exclude_fields, None) - self.assertEqual(opman._exclude_fields, set([])) - - self.assertEqual(opman._projection, projection) + def reset_include_fields(self, fields): + self.opman.namespace_config = NamespaceConfig(include_fields=fields) + + def reset_exclude_fields(self, fields): + self.opman.namespace_config = NamespaceConfig(exclude_fields=fields) def test_filter_fields(self): docman = self.opman.doc_managers[0] @@ -84,7 +73,7 @@ def test_filter_fields(self): exclude_fields = ["d", "e", "f"] # Set fields to care about - self.opman.fields = include_fields + self.reset_include_fields(include_fields) # Documents have more than just these fields doc = { "a": 1, "b": 2, "c": 3, @@ -130,67 +119,55 @@ def test_filter_exclude_oplog_entry(self): } } + def filter_doc(document, fields): + if fields and '_id' in fields: + fields.remove('_id') + return self.opman.filter_oplog_entry( + document, exclude_fields=fields) + # Case 0: insert op, no fields provided - self.opman.exclude_fields = None - filtered = self.opman.filter_oplog_entry(insert_op()) + filtered = filter_doc(insert_op(), None) self.assertEqual(filtered, insert_op()) - self.assertEqual(None, self.opman._projection) # Case 1: insert op, fields provided - self.opman.exclude_fields = ['c'] - filtered = self.opman.filter_oplog_entry(insert_op()) + filtered = filter_doc(insert_op(), ['c']) self.assertEqual(filtered['o'], {'_id': 0, 'a': 1, 'b': 2}) - self.assertEqual({'c': 0}, self.opman._projection) # Case 2: insert op, fields provided, doc becomes empty except for _id - self.opman.exclude_fields = ['a', 'b', 'c'] - filtered = self.opman.filter_oplog_entry(insert_op()) + filtered = filter_doc(insert_op(), ['a', 'b', 'c']) self.assertEqual(filtered['o'], {'_id': 0}) - self.assertEqual({'a': 0, 'b': 0, 'c': 0}, self.opman._projection) # Case 3: update op, no fields provided - self.opman.exclude_fields = None - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), None) self.assertEqual(filtered, update_op()) - self.assertEqual(None, self.opman._projection) # Case 4: update op, fields provided - self.opman.exclude_fields = ['b'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['b']) self.assertNotIn('b', filtered['o']['$set']) self.assertIn('a', filtered['o']['$set']) self.assertEqual(filtered['o']['$unset'], update_op()['o']['$unset']) - self.assertEqual({'b': 0}, self.opman._projection) # Case 5: update op, fields provided, empty $set - self.opman.exclude_fields = ['a', 'b'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['a', 'b']) self.assertNotIn('$set', filtered['o']) self.assertEqual(filtered['o']['$unset'], update_op()['o']['$unset']) - self.assertEqual({'a': 0, 'b': 0}, self.opman._projection) # Case 6: update op, fields provided, empty $unset - self.opman.exclude_fields = ['c'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['c']) self.assertNotIn('$unset', filtered['o']) self.assertEqual(filtered['o']['$set'], update_op()['o']['$set']) - self.assertEqual({'c': 0}, self.opman._projection) # Case 7: update op, fields provided, entry is nullified - self.opman.exclude_fields = ['a', 'b', 'c'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['a', 'b', 'c']) self.assertEqual(filtered, None) - self.assertEqual({'a': 0, 'b': 0, 'c': 0}, self.opman._projection) # Case 8: update op, fields provided, replacement - self.opman.exclude_fields = ['d', 'e', 'f'] - filtered = self.opman.filter_oplog_entry({ + filtered = filter_doc({ 'op': 'u', 'o': {'a': 1, 'b': 2, 'c': 3, 'd': 4} - }) + }, ['d', 'e', 'f']) self.assertEqual( filtered, {'op': 'u', 'o': {'a': 1, 'b': 2, 'c': 3}}) - self.assertEqual({'d': 0, 'e': 0, 'f': 0}, self.opman._projection) def test_filter_oplog_entry(self): # Test oplog entries: these are callables, since @@ -220,300 +197,62 @@ def test_filter_oplog_entry(self): } } + def filter_doc(document, fields): + if fields and '_id' not in fields: + fields.append('_id') + return self.opman.filter_oplog_entry( + document, include_fields=fields) + # Case 0: insert op, no fields provided - self.opman.fields = None - filtered = self.opman.filter_oplog_entry(insert_op()) + filtered = filter_doc(insert_op(), None) self.assertEqual(filtered, insert_op()) - self.assertEqual(None, self.opman._projection) # Case 1: insert op, fields provided - self.opman.fields = ['a', 'b'] - filtered = self.opman.filter_oplog_entry(insert_op()) + filtered = filter_doc(insert_op(), ['a', 'b']) self.assertEqual(filtered['o'], {'_id': 0, 'a': 1, 'b': 2}) - self.assertEqual({'_id': 1, 'a': 1, 'b': 1}, self.opman._projection) # Case 2: insert op, fields provided, doc becomes empty except for _id - self.opman.fields = ['d', 'e', 'f'] - filtered = self.opman.filter_oplog_entry(insert_op()) + filtered = filter_doc(insert_op(), ['d', 'e', 'f']) self.assertEqual(filtered['o'], {'_id': 0}) - self.assertEqual({'_id': 1, 'd': 1, 'e': 1, 'f': 1}, - self.opman._projection) # Case 3: update op, no fields provided - self.opman.fields = None - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), None) self.assertEqual(filtered, update_op()) - self.assertEqual(None, self.opman._projection) # Case 4: update op, fields provided - self.opman.fields = ['a', 'c'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['a', 'c']) self.assertNotIn('b', filtered['o']['$set']) self.assertIn('a', filtered['o']['$set']) self.assertEqual(filtered['o']['$unset'], update_op()['o']['$unset']) - self.assertEqual({'_id': 1, 'a': 1, 'c': 1}, self.opman._projection) # Case 5: update op, fields provided, empty $set - self.opman.fields = ['c'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['c']) self.assertNotIn('$set', filtered['o']) self.assertEqual(filtered['o']['$unset'], update_op()['o']['$unset']) - self.assertEqual({'_id': 1, 'c': 1}, self.opman._projection) # Case 6: update op, fields provided, empty $unset - self.opman.fields = ['a', 'b'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['a', 'b']) self.assertNotIn('$unset', filtered['o']) self.assertEqual(filtered['o']['$set'], update_op()['o']['$set']) - self.assertEqual({'_id': 1, 'a': 1, 'b': 1}, self.opman._projection) # Case 7: update op, fields provided, entry is nullified - self.opman.fields = ['d', 'e', 'f'] - filtered = self.opman.filter_oplog_entry(update_op()) + filtered = filter_doc(update_op(), ['d', 'e', 'f']) self.assertEqual(filtered, None) - self.assertEqual({'_id': 1, 'd': 1, 'e': 1, 'f': 1}, - self.opman._projection) # Case 8: update op, fields provided, replacement - self.opman.fields = ['a', 'b', 'c'] - filtered = self.opman.filter_oplog_entry({ + filtered = filter_doc({ 'op': 'u', 'o': {'a': 1, 'b': 2, 'c': 3, 'd': 4} - }) + }, ['a', 'b', 'c']) self.assertEqual( filtered, {'op': 'u', 'o': {'a': 1, 'b': 2, 'c': 3}}) - self.assertEqual({'_id': 1, 'a': 1, 'b': 1, 'c': 1}, - self.opman._projection) - - def test_exclude_fields_constructor(self): - # Test with the "_id" field in exclude_fields - exclude_fields = ["_id", "title", "content", "author"] - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - exclude_fields=exclude_fields - ) - exclude_fields.remove('_id') - self._check_fields(opman, [], exclude_fields, - dict((f, 0) for f in exclude_fields)) - extra_fields = exclude_fields + ['extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in ['extra1', 'extra2']), filtered) - - # Test without "_id" field included in exclude_fields - exclude_fields = ["title", "content", "author"] - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - exclude_fields=exclude_fields - ) - self._check_fields(opman, [], exclude_fields, - dict((f, 0) for f in exclude_fields)) - extra_fields = extra_fields + ['extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual({'extra1': 1, 'extra2': 1}, filtered) - - # Test with only "_id" field in exclude_fields - exclude_fields = ["_id"] - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - exclude_fields=exclude_fields - ) - self._check_fields(opman, [], [], None) - extra_fields = exclude_fields + ['extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in extra_fields), filtered) - - # Test with nothing set for exclude_fields - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - exclude_fields=None - ) - self._check_fields(opman, [], [], None) - extra_fields = ['_id', 'extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in extra_fields), filtered) - - def test_fields_constructor(self): - # Test with "_id" field in constructor - fields = ["_id", "title", "content", "author"] - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - fields=fields - ) - self._check_fields(opman, fields, [], - dict((f, 1) for f in fields)) - extra_fields = fields + ['extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in fields), filtered) - - # Test without "_id" field in constructor - fields = ["title", "content", "author"] - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - fields=fields - ) - fields.append('_id') - self._check_fields(opman, fields, [], - dict((f, 1) for f in fields)) - extra_fields = fields + ['extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in fields), filtered) - - # Test with only "_id" field - fields = ["_id"] - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - fields=fields - ) - self._check_fields(opman, fields, [], - dict((f, 1) for f in fields)) - extra_fields = fields + ['extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual({'_id': 1}, filtered) - - # Test with no fields set - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config - ) - self._check_fields(opman, [], [], None) - extra_fields = ['_id', 'extra1', 'extra2'] - filtered = opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in extra_fields), filtered) - - def test_exclude_fields_attr(self): - # Test with the "_id" field in exclude_fields. - exclude_fields = ["_id", "title", "content", "author"] - exclude_fields.remove('_id') - self.opman.exclude_fields = exclude_fields - self._check_fields(self.opman, [], exclude_fields, - dict((f, 0) for f in exclude_fields)) - extra_fields = exclude_fields + ['extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in ['extra1', 'extra2']), filtered) - - # Test without "_id" field included in exclude_fields - exclude_fields = ["title", "content", "author"] - self.opman.exclude_fields = exclude_fields - self._check_fields(self.opman, [], exclude_fields, - dict((f, 0) for f in exclude_fields)) - extra_fields = extra_fields + ['extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual({'extra1': 1, 'extra2': 1}, filtered) - - # Test with only "_id" field in exclude_fields - exclude_fields = ["_id"] - self.opman.exclude_fields = exclude_fields - self._check_fields(self.opman, [], [], None) - extra_fields = exclude_fields + ['extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in extra_fields), filtered) - - # Test with nothing set for exclude_fields - self.opman.exclude_fields = None - self._check_fields(self.opman, [], [], None) - extra_fields = ['_id', 'extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in extra_fields), filtered) - - def test_fields_attr(self): - # Test with "_id" field included in fields - fields = ["_id", "title", "content", "author"] - self.opman.fields = fields - self._check_fields(self.opman, fields, [], - dict((f, 1) for f in fields)) - extra_fields = fields + ['extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in fields), filtered) - - # Test without "_id" field included in fields - fields = ["title", "content", "author"] - self.opman.fields = fields - fields.append('_id') - self._check_fields(self.opman, fields, [], - dict((f, 1) for f in fields)) - extra_fields = fields + ['extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in fields), filtered) - - # Test with only "_id" field - fields = ["_id"] - self.opman.fields = fields - self._check_fields(self.opman, fields, [], - dict((f, 1) for f in fields)) - extra_fields = fields + ['extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual({'_id': 1}, filtered) - - # Test with no fields set - self.opman.fields = None - self._check_fields(self.opman, [], [], None) - extra_fields = ['_id', 'extra1', 'extra2'] - filtered = self.opman.filter_oplog_entry( - {'op': 'i', - 'o': dict((f, 1) for f in extra_fields)})['o'] - self.assertEqual(dict((f, 1) for f in extra_fields), filtered) def test_nested_fields(self): def check_nested(document, fields, filtered_document, op='i'): - self.opman.fields = fields - fields.append('_id') - self.assertEqual(set(fields), self.opman._fields) - self.assertEqual(sorted(fields), sorted(self.opman.fields)) + if '_id' not in fields: + fields.append('_id') filtered_result = self.opman.filter_oplog_entry( - {'op': op, 'o': document}) + {'op': op, 'o': document}, include_fields=fields) if filtered_result is not None: filtered_result = filtered_result['o'] self.assertEqual(filtered_result, filtered_document) @@ -598,14 +337,10 @@ def check_nested(document, fields, filtered_document, op='i'): def test_nested_exclude_fields(self): def check_nested(document, exclude_fields, filtered_document, op='i'): - self.opman.exclude_fields = exclude_fields if '_id' in exclude_fields: exclude_fields.remove('_id') - self.assertEqual(set(exclude_fields), self.opman._exclude_fields) - self.assertEqual(sorted(exclude_fields), - sorted(self.opman.exclude_fields)) filtered_result = self.opman.filter_oplog_entry( - {'op': op, 'o': document}) + {'op': op, 'o': document}, exclude_fields=exclude_fields) if filtered_result is not None: filtered_result = filtered_result['o'] self.assertEqual(filtered_result, filtered_document) @@ -718,57 +453,6 @@ def check_nested(document, exclude_fields, filtered_document, op='i'): filtered_update = None check_nested(update, exclude_fields, filtered_update, op='u') - def test_fields_and_exclude(self): - fields = ['a', 'b', 'c', '_id'] - exclude_fields = ['x', 'y', 'z'] - - # Test setting both to None in constructor - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - fields=None, - exclude_fields=None - ) - self._check_fields(opman, [], [], None) - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - fields=None, - exclude_fields=exclude_fields - ) - self._check_fields(opman, [], exclude_fields, - dict((f, 0) for f in exclude_fields)) - # Test setting fields when exclude_fields is set - self.assertRaises( - errors.InvalidConfiguration, setattr, opman, "fields", fields) - self.assertRaises( - errors.InvalidConfiguration, setattr, opman, "fields", None) - opman = OplogThread( - primary_client=self.primary_conn, - doc_managers=(DocManager(),), - oplog_progress_dict=LockingDict(), - namespace_config=self.namespace_config, - exclude_fields=None, - fields=fields - ) - self._check_fields(opman, fields, [], dict((f, 1) for f in fields)) - self.assertRaises(errors.InvalidConfiguration, setattr, opman, - "exclude_fields", exclude_fields) - self.assertRaises(errors.InvalidConfiguration, setattr, opman, - "exclude_fields", None) - self.assertRaises( - errors.InvalidConfiguration, OplogThread, - self.primary_conn, - (DocManager(),), - LockingDict(), - self.namespace_config, - fields=fields, - exclude_fields=exclude_fields) - class TestFindFields(unittest.TestCase): def test_find_field(self): diff --git a/tests/test_mongo.py b/tests/test_mongo.py index f5d9e656..c30e28e2 100644 --- a/tests/test_mongo.py +++ b/tests/test_mongo.py @@ -29,6 +29,7 @@ from mongo_connector.connector import Connector from mongo_connector.util import retry_until_ok from mongo_connector.test_utils import (ReplicaSet, + ReplicaSetSingle, Server, connector_opts, assert_soon, @@ -70,33 +71,17 @@ def _search(self, **kwargs): yield doc def _remove(self): - self.mongo_conn['test']['test'].drop() - self.mongo_conn['test']['test.files'].drop() - self.mongo_conn['test']['test.chunks'].drop() + for db in self.mongo_conn.database_names(): + if db not in ["local", "admin"]: + self.mongo_conn.drop_database(db) -class TestMongo(MongoTestCase): - """ Tests the mongo instance - """ - - @classmethod - def setUpClass(cls): - MongoTestCase.setUpClass() - cls.repl_set = ReplicaSet().start() - cls.conn = cls.repl_set.client() - - @classmethod - def tearDownClass(cls): - """ Kills cluster instance - """ - MongoTestCase.tearDownClass() - cls.repl_set.stop() - - def tearDown(self): - self.connector.join() - self.drop_all_databases() +class MongoReplicaSetTestCase(MongoTestCase): def setUp(self): + self.repl_set = self.replica_set_class().start() + self.conn = self.repl_set.client() + try: os.unlink("oplog.timestamp") except OSError: @@ -104,16 +89,15 @@ def setUp(self): self._remove() self.connector = Connector( mongo_address=self.repl_set.uri, - ns_set=['test.test', 'rename.me'], - dest_mapping={'rename.me': 'new.target', - 'rename.me2': 'new2.target2'}, doc_managers=(self.mongo_doc,), - gridfs_set=['test.test'], + namespace_options={ + 'test.test': {'gridfs': True}, + 'rename.me': 'new.target', + 'rename.me2': 'new2.target2' + }, **connector_opts ) - self.drop_all_databases() - self.connector.start() assert_soon(lambda: len(self.connector.shard_set) > 0) assert_soon(lambda: sum(1 for _ in self._search()) == 0) @@ -126,6 +110,19 @@ def drop_all_databases(self): if name not in ["local", "admin"]: self.conn.drop_database(name) + def tearDown(self): + self.connector.join() + self.drop_all_databases() + self.repl_set.stop() + + +class TestMongoReplicaSetSingle(MongoReplicaSetTestCase): + """ Tests MongoDB to MongoDB DocManager replication with a 1 node replica + set. + """ + + replica_set_class = ReplicaSetSingle + def test_insert(self): """Tests insert """ @@ -264,6 +261,14 @@ def test_rename_collection_renamed(self): lambda: "target" not in self.mongo_conn.new.collection_names()) self.check_renamed_insert("new2.target2") + +class TestMongoReplicaSet(MongoReplicaSetTestCase): + """ Tests MongoDB to MongoDB DocManager replication with a 3 node replica + set. + """ + + replica_set_class = ReplicaSet + def test_rollback(self): """Tests rollback. We force a rollback by adding a doc, killing the primary, adding another doc, killing the new primary, and then diff --git a/tests/test_mongo_connector.py b/tests/test_mongo_connector.py index 5a7e4ee3..897eb09b 100755 --- a/tests/test_mongo_connector.py +++ b/tests/test_mongo_connector.py @@ -56,7 +56,6 @@ def test_connector(self): """ conn = Connector( mongo_address=self.repl_set.uri, - ns_set=['test.test'], **connector_opts ) conn.start() @@ -81,7 +80,6 @@ def test_write_oplog_progress(self): conn = Connector( mongo_address=self.repl_set.uri, oplog_checkpoint="temp_oplog.timestamp", - ns_set=['test.test'], **connector_opts ) @@ -118,7 +116,6 @@ def test_read_oplog_progress(self): conn = Connector( mongo_address=self.repl_set.uri, oplog_checkpoint=None, - ns_set=['test.test'], **connector_opts ) diff --git a/tests/test_mongo_doc_manager.py b/tests/test_mongo_doc_manager.py index 67b4f0f7..5ce9f0c1 100644 --- a/tests/test_mongo_doc_manager.py +++ b/tests/test_mongo_doc_manager.py @@ -232,7 +232,7 @@ def test_commands(self): # renaming a database. namespace_config = NamespaceConfig( namespace_set=['test.test', 'test.test2', 'test.drop'], - user_mapping={ + namespace_options={ 'test.test': 'test.othertest', 'test.drop': 'dropped.collection' }) diff --git a/tests/test_namespace_config.py b/tests/test_namespace_config.py index 82ca9fc2..da545c93 100644 --- a/tests/test_namespace_config.py +++ b/tests/test_namespace_config.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json +import os import re from tests import unittest @@ -21,7 +23,11 @@ from mongo_connector import errors -class TestDNamespaceConfig(unittest.TestCase): +CONFIG_PATH = os.path.join(os.path.dirname(os.path.dirname( + os.path.realpath(__file__))), "config.json") + + +class TestNamespaceConfig(unittest.TestCase): """Test the NamespaceConfig class""" def test_default(self): @@ -33,6 +39,12 @@ def test_default(self): self.assertEqual(namespace_config.map_namespace("db1.col1"), "db1.col1") + def test_config(self): + """Test that the namespace option in the example config is valid.""" + with open(CONFIG_PATH) as filep: + namespaces = json.load(filep)['__namespaces'] + NamespaceConfig(namespace_options=namespaces) + def test_include_plain(self): """Test including namespaces without wildcards""" namespace_config = NamespaceConfig( @@ -54,13 +66,15 @@ def test_include_wildcard(self): """Test including namespaces with wildcards""" equivalent_namespace_configs = ( NamespaceConfig(namespace_set=["db1.*"]), - NamespaceConfig(user_mapping={"db1.*": {}}), - NamespaceConfig(user_mapping={"db1.*": {"rename": "db1.*"}})) + NamespaceConfig(namespace_options={"db1.*": {}}), + NamespaceConfig(namespace_options={"db1.*": True}), + NamespaceConfig(namespace_options={"db1.*": {"rename": "db1.*"}}) + ) for namespace_config in equivalent_namespace_configs: self.assertEqual(namespace_config.unmap_namespace("db1.col1"), "db1.col1") - self.assertEqual(namespace_config.unmap_namespace("db1.col1"), - "db1.col1") + self.assertEqual(namespace_config.unmap_namespace("db1.col2"), + "db1.col2") self.assertEqual(namespace_config.lookup("db1.col1"), Namespace(dest_name="db1.col1", source_name="db1.col1")) @@ -71,7 +85,7 @@ def test_include_wildcard(self): def test_map_db_wildcard(self): """Test a crazy namespace renaming scheme with wildcards.""" - namespace_config = NamespaceConfig(user_mapping={ + namespace_config = NamespaceConfig(namespace_options={ "db.1_*": "db1.new_*", "db.2_*": "db2.new_*", "db.3": "new_db.3"}) @@ -106,6 +120,30 @@ def test_include_wildcard_metacharacters(self): "db&_foo.$_^_#_!_[_]_") self.assertIsNone(namespace_config.map_namespace("db&.foo")) + def test_gridfs(self): + """Test the gridfs property is set correctly.""" + equivalent_namespace_configs = ( + NamespaceConfig(gridfs_set=["db1.*"]), + NamespaceConfig(namespace_options={"db1.*": {"gridfs": True}}) + ) + for namespace_config in equivalent_namespace_configs: + self.assertEqual(namespace_config.unmap_namespace("db1.col1"), + "db1.col1") + self.assertEqual(namespace_config.unmap_namespace("db1.col2"), + "db1.col2") + self.assertEqual(namespace_config.lookup("db1.col1"), + Namespace(dest_name="db1.col1", + source_name="db1.col1", + gridfs=True)) + self.assertListEqual(namespace_config.map_db("db1"), ["db1"]) + self.assertEqual(namespace_config.map_namespace("db1.col1"), + "db1.col1") + self.assertIsNone(namespace_config.map_namespace("db2.col4")) + self.assertTrue(namespace_config.lookup("db1.col1").gridfs) + self.assertEqual(namespace_config.gridfs_namespace("db1.col1"), + "db1.col1") + self.assertIsNone(namespace_config.gridfs_namespace("not.gridfs")) + def test_exclude_plain(self): """Test excluding namespaces without wildcards""" namespace_config = NamespaceConfig(ex_namespace_set=["ex.clude"]) @@ -117,17 +155,60 @@ def test_exclude_plain(self): def test_exclude_wildcard(self): """Test excluding namespaces with wildcards""" - namespace_config = NamespaceConfig(ex_namespace_set=["ex.*"]) - self.assertEqual(namespace_config.unmap_namespace("db.col"), "db.col") - self.assertEqual(namespace_config.unmap_namespace("ex.clude"), - "ex.clude") - self.assertEqual(namespace_config.map_namespace("db.col"), "db.col") - self.assertIsNone(namespace_config.map_namespace("ex.clude")) - self.assertIsNone(namespace_config.map_namespace("ex.clude2")) + equivalent_namespace_configs_for_tests = ( + NamespaceConfig(ex_namespace_set=["ex.*", "ex2.*"]), + NamespaceConfig(namespace_options={"ex.*": False, "ex2.*": False}), + # Multiple wildcards in exclude namespace + NamespaceConfig(ex_namespace_set=["e*.*"]), + ) + for namespace_config in equivalent_namespace_configs_for_tests: + self.assertEqual(namespace_config.unmap_namespace("db.col"), + "db.col") + self.assertEqual(namespace_config.unmap_namespace("ex.clude"), + "ex.clude") + self.assertEqual(namespace_config.map_namespace("db.col"), + "db.col") + self.assertIsNone(namespace_config.map_namespace("ex.clude")) + self.assertIsNone(namespace_config.map_namespace("ex2.clude")) + + def test_include_and_exclude(self): + """Test including and excluding namespaces at the same time.""" + equivalent_namespace_configs_for_tests = ( + NamespaceConfig(ex_namespace_set=["ex.*"], + namespace_set=["ex.cluded_still", "in.cluded"]), + NamespaceConfig(namespace_options={ + "ex.*": False, "ex.cluded_still": True, "in.cluded": True}), + NamespaceConfig(ex_namespace_set=["ex.cluded", "ex.cluded_still"], + namespace_set=["ex.*", "in.cluded"]) + ) + for namespace_config in equivalent_namespace_configs_for_tests: + self.assertIsNone(namespace_config.map_namespace("ex.cluded")) + # Excluded namespaces take precedence over included ones. + self.assertIsNone( + namespace_config.map_namespace("ex.cluded_still")) + # Namespaces that are not explicitly included are ignored. + self.assertIsNone( + namespace_config.map_namespace("also.not.included")) + self.assertEqual( + namespace_config.map_namespace("in.cluded"), "in.cluded") + + def test_include_and_exclude_validation(self): + """Test including and excluding the same namespaces is an error.""" + equivalent_namespace_config_kwargs = ( + dict(ex_namespace_set=["ex.cluded"], + namespace_set=["in.cluded", "ex.cluded"]), + dict(namespace_set=["ex.cluded"], namespace_options={ + "ex.cluded": False, "in.cluded": True}), + dict(ex_namespace_set=["ex.cluded", "in.cluded"], + namespace_options={"in.cluded": True}) + ) + for kwargs in equivalent_namespace_config_kwargs: + with self.assertRaises(errors.InvalidConfiguration): + NamespaceConfig(**kwargs) def test_unmap_namespace_wildcard(self): """Test un-mapping a namespace that was never explicitly mapped.""" - namespace_config = NamespaceConfig(user_mapping={ + namespace_config = NamespaceConfig(namespace_options={ "db2.*": "db2.f*", "db_*.foo": "db_new_*.foo", }) @@ -135,22 +216,70 @@ def test_unmap_namespace_wildcard(self): self.assertEqual(namespace_config.unmap_namespace("db_new_123.foo"), "db_123.foo") + def test_override_namespace_options(self): + """Test gridfs_set and dest_mapping arguments override + namespace_options. + """ + namespace_config = NamespaceConfig( + namespace_set=["override.me", "override.me2"], + gridfs_set=["override.me3"], + dest_mapping={ + "override.me": "overridden.1", + "override.me2": "overridden.2" + }, + namespace_options={ + "override.me": { + "rename": "override.me", + "includeFields": ["_id", "dont_remove"] + }, + "override.me2": "override.me2", + "override.me3": {'gridfs': False} + + } + ) + overridden = namespace_config.lookup("override.me") + self.assertEqual(overridden.dest_name, "overridden.1") + self.assertEqual(overridden.include_fields, + set(["_id", "dont_remove"])) + overridden = namespace_config.lookup("override.me2") + self.assertEqual(overridden.dest_name, "overridden.2") + self.assertFalse(overridden.include_fields) + self.assertTrue(namespace_config.gridfs_namespace("override.me3")) + + def test_invalid_collection_name_validation(self): + """Test that invalid collection names raise InvalidConfiguration.""" + equivalent_namespace_config_kwargs = ( + dict(namespace_options={"invalid_db": "newinvalid_db"}), + dict(namespace_set=["invalid_db."]), + dict(ex_namespace_set=[".invalid_db"]), + dict(gridfs_set=[".invalid_db"]) + ) + for kwargs in equivalent_namespace_config_kwargs: + with self.assertRaises(errors.InvalidConfiguration): + NamespaceConfig(**kwargs) + + def test_gridfs_rename_invalid(self): + """Test that renaming a GridFS collection is invalid.""" + with self.assertRaises(errors.InvalidConfiguration): + NamespaceConfig(namespace_options={ + "gridfs.*": {'rename': 'new_gridfs.*', 'gridfs': True}}) + def test_rename_validation(self): """Test namespace renaming validation.""" # Multiple collections cannot be merged into the same target namespace with self.assertRaises(errors.InvalidConfiguration): - NamespaceConfig(user_mapping={ + NamespaceConfig(namespace_options={ "db1.col1": "newdb.newcol", "db2.col1": "newdb.newcol"}) # Multiple collections cannot be merged into the same target namespace with self.assertRaises(errors.InvalidConfiguration): - NamespaceConfig(user_mapping={ + NamespaceConfig(namespace_options={ "db*.col1": "newdb.newcol*", "db*.col2": "newdb.newcol*"}) # Multiple collections cannot be merged into the same target namespace - namespace_config = NamespaceConfig(user_mapping={ + namespace_config = NamespaceConfig(namespace_options={ "*.coll": "*.new_coll", "db.*": "new_db.*"}) namespace_config.map_namespace("new_db.coll") @@ -162,12 +291,70 @@ def test_rename_validation(self): # For the sake of map_db, wildcards cannot be moved from database name # to collection name. with self.assertRaises(errors.InvalidConfiguration): - NamespaceConfig(user_mapping={"db*.col": "new_db.col_*"}) + NamespaceConfig(namespace_options={"db*.col": "new_db.col_*"}) # For the sake of map_db, wildcards cannot be moved from collection # name to database name. with self.assertRaises(errors.InvalidConfiguration): - NamespaceConfig(user_mapping={"db.*": "new_db_*.col"}) + NamespaceConfig(namespace_options={"db.*": "new_db_*.col"}) + + def test_fields_validation(self): + """Test including/excluding fields per namespace.""" + # Cannot include and exclude fields in the same namespace + with self.assertRaises(errors.InvalidConfiguration): + NamespaceConfig(namespace_options={ + "db.col": {"includeFields": ["a"], "excludeFields": ["b"]}}) + + # Cannot include fields globally and then exclude fields + with self.assertRaises(errors.InvalidConfiguration): + NamespaceConfig(include_fields=["a"], namespace_options={ + "db.col": {"excludeFields": ["b"]}}) + + # Cannot exclude fields globally and then include fields + with self.assertRaises(errors.InvalidConfiguration): + NamespaceConfig(exclude_fields=["b"], namespace_options={ + "db.col": {"includeFields": ["a"]}}) + + def test_projection_include_wildcard(self): + """Test include_fields on a wildcard namespace.""" + equivalent_namespace_configs = ( + NamespaceConfig(include_fields=["foo", "nested.field"], + ex_namespace_set=["ignored.name"]), + NamespaceConfig(include_fields=["foo", "nested.field"], + namespace_set=["db.foo"]), + NamespaceConfig(include_fields=["foo", "nested.field"], + namespace_set=["db.*"]), + NamespaceConfig(namespace_options={ + "db.*": {"includeFields": ["foo", "nested.field"]}}), + NamespaceConfig(namespace_options={ + "db.foo": {"includeFields": ["foo", "nested.field"]}}), + NamespaceConfig(include_fields=["foo", "nested.field"], + namespace_options={ + "db.*": { + "includeFields": ["foo", "nested.field"] + }}) + ) + for namespace_config in equivalent_namespace_configs: + self.assertEqual(namespace_config.projection("db.foo"), + {"_id": 1, "foo": 1, "nested.field": 1}) + self.assertIsNone(namespace_config.projection("ignored.name")) + + def test_projection_exclude_wildcard(self): + """Test exclude_fields on a wildcard namespace.""" + equivalent_namespace_configs = ( + NamespaceConfig(exclude_fields=["_id", "foo", "nested.field"], + namespace_set=["db.*"]), + NamespaceConfig(namespace_options={ + "db.*": {"excludeFields": ["_id", "foo", "nested.field"]}}), + NamespaceConfig( + exclude_fields=["foo", "nested.field"], + namespace_options={"db.*": { + "excludeFields": ["_id", "foo", "nested.field"]}}) + ) + for namespace_config in equivalent_namespace_configs: + self.assertEqual(namespace_config.projection("db.foo"), + {"foo": 0, "nested.field": 0}) + self.assertIsNone(namespace_config.projection("ignored.name")) def test_match_replace_regex(self): """Test regex matching and replacing.""" diff --git a/tests/test_oplog_manager.py b/tests/test_oplog_manager.py index 9c2aa31b..5e9d1a99 100755 --- a/tests/test_oplog_manager.py +++ b/tests/test_oplog_manager.py @@ -21,6 +21,7 @@ import time import bson +import gridfs import pymongo sys.path[0:0] = [""] @@ -49,7 +50,12 @@ def setUp(self): primary_client=self.primary_conn, doc_managers=(DocManager(),), oplog_progress_dict=LockingDict(), - namespace_config=NamespaceConfig(), + namespace_config=NamespaceConfig( + namespace_options={ + 'test.*': True, + 'gridfs.*': {'gridfs': True} + } + ), ) def tearDown(self): @@ -121,7 +127,7 @@ def test_dump_collection(self): Cases: 1. empty oplog - 2. non-empty oplog + 2. non-empty oplog, with gridfs collections 3. non-empty oplog, specified a namespace-set, none of the oplog entries are for collections in the namespace-set """ @@ -131,15 +137,21 @@ def test_dump_collection(self): last_ts = self.opman.dump_collection() self.assertEqual(last_ts, None) - # Test with non-empty oplog + # Test with non-empty oplog with gridfs collections self.opman.oplog = self.primary_conn["local"]["oplog.rs"] + # Insert 10 gridfs files + for i in range(10): + fs = gridfs.GridFS(self.primary_conn["gridfs"], + collection="test" + str(i)) + fs.put(b"hello world") + # Insert 1000 documents for i in range(1000): self.primary_conn["test"]["test"].insert_one({ "i": i + 500 }) last_ts = self.opman.get_last_oplog_timestamp() self.assertEqual(last_ts, self.opman.dump_collection()) - self.assertEqual(len(self.opman.doc_managers[0]._search()), 1000) + self.assertEqual(len(self.opman.doc_managers[0]._search()), 1010) # Case 3 # 1MB oplog so that we can rollover quickly @@ -314,7 +326,7 @@ def test_namespace_mapping(self): dest_mapping = {"test.test1": "test.test1_dest", "test.test2": "test.test2_dest"} self.opman.namespace_config = NamespaceConfig( - namespace_set=source_ns, user_mapping=dest_mapping) + namespace_set=source_ns, namespace_options=dest_mapping) docman = self.opman.doc_managers[0] # start replicating self.opman.start() diff --git a/tests/test_oplog_manager_wildcard.py b/tests/test_oplog_manager_wildcard.py index 14c7ea49..715eb336 100755 --- a/tests/test_oplog_manager_wildcard.py +++ b/tests/test_oplog_manager_wildcard.py @@ -48,7 +48,7 @@ def setUp(self): def reset_opman(self, include_ns=None, exclude_ns=None, dest_mapping=None): self.namespace_config = NamespaceConfig(namespace_set=include_ns, ex_namespace_set=exclude_ns, - user_mapping=dest_mapping) + namespace_options=dest_mapping) self.opman = OplogThread( primary_client=self.primary_conn, doc_managers=(DocManager(),), diff --git a/tests/test_synchronizer.py b/tests/test_synchronizer.py index 66fb9747..a83d83d0 100644 --- a/tests/test_synchronizer.py +++ b/tests/test_synchronizer.py @@ -22,6 +22,7 @@ sys.path[0:0] = [""] from mongo_connector.connector import Connector +from mongo_connector.namespace_config import NamespaceConfig from mongo_connector.test_utils import (assert_soon, connector_opts, ReplicaSetSingle) @@ -138,7 +139,8 @@ def update_and_retrieve(update_spec, replace=False): # ensure update works when fields are given opthread = self.connector.shard_set[0] - opthread.fields = ['a', 'b', 'c'] + opthread.namespace_config = NamespaceConfig( + include_fields=['a', 'b', 'c']) try: doc = update_and_retrieve({"$set": {"d": 10}}) self.assertEqual(self.conn.test.test.find_one(doc['_id'])['d'], 10)