Skip to content

Commit

Permalink
Add support for including or excluding fields per namespace (#612)
Browse files Browse the repository at this point in the history
Add support for including or excluding fields per namespace
Add support for wildcards in GridFS namespaces.
Add support for excluding and including different namespaces at the same time.
 - previously it was a configuration error.
Add a new format for the `namespaces` configuration file option.
 - it is invalid to mix old and new namespaces configuration.
  • Loading branch information
ShaneHarvey committed Dec 21, 2016
1 parent 1b9415f commit 8e0a771
Show file tree
Hide file tree
Showing 14 changed files with 739 additions and 711 deletions.
45 changes: 34 additions & 11 deletions config.json
Expand Up @@ -37,18 +37,41 @@

"__fields": ["field1", "field2", "field3"],

"namespaces": {
"__include": ["db.source1", "db.source2", "db2.*"],
"__exclude": ["db3.source1", "db3.*"],
"__mapping": {
"db.source1": "db.dest1",
"db.source2": "db.dest2",
"db2.*": "db2.dest_*",
"db.col_*": {
"rename": "db.new_name_*"
}
"__namespaces": {
"excluded.collection": false,
"excluded_wildcard.*": false,
"*.exclude_collection_from_every_database": false,
"included.collection1": true,
"included.collection2": {},
"included.collection4": {
"includeFields": ["included_field", "included.nested.field"]
},
"__gridfs": ["db.fs"]
"included.collection5": {
"rename": "included.new_collection5_name",
"includeFields": ["included_field", "included.nested.field"]
},
"included.collection6": {
"excludeFields": ["excluded_field", "excluded.nested.field"]
},
"included.collection7": {
"rename": "included.new_collection7_name",
"excludeFields": ["excluded_field", "excluded.nested.field"]
},
"included_wildcard1.*": true,
"included_wildcard2.*": true,
"renamed.collection1": "something.else1",
"renamed.collection2": {
"rename": "something.else2"
},
"renamed_wildcard.*": {
"rename": "new_name.*"
},
"gridfs.collection": {
"gridfs": true
},
"gridfs_wildcard.*": {
"gridfs": true
}
},

"docManagers": [
Expand Down
153 changes: 86 additions & 67 deletions mongo_connector/connector.py
Expand Up @@ -14,6 +14,7 @@
"""Discovers the MongoDB cluster and starts the connector.
"""

import copy
import json
import logging
import logging.handlers
Expand All @@ -36,7 +37,8 @@
from mongo_connector.doc_managers.doc_manager_base import DocManagerBase
from mongo_connector.command_helper import CommandHelper
from mongo_connector.util import log_fatal_exceptions, retry_until_ok
from mongo_connector.namespace_config import NamespaceConfig
from mongo_connector.namespace_config import (NamespaceConfig,
validate_namespace_options)

from pymongo import MongoClient

Expand Down Expand Up @@ -127,8 +129,14 @@ def __init__(self, mongo_address, doc_managers=None, **kwargs):
# The namespace configuration shared by all OplogThreads and
# DocManagers
self.namespace_config = NamespaceConfig(
kwargs.get('ns_set'), kwargs.get('ex_ns_set'),
kwargs.get('dest_mapping'))
namespace_set=kwargs.get('ns_set'),
ex_namespace_set=kwargs.get('ex_ns_set'),
gridfs_set=kwargs.get('gridfs_set'),
dest_mapping=kwargs.get('dest_mapping'),
namespace_options=kwargs.get('namespace_options'),
include_fields=kwargs.get('fields'),
exclude_fields=kwargs.get('exclude_fields')
)

# Initialize and set the command helper
command_helper = CommandHelper(self.namespace_config)
Expand Down Expand Up @@ -185,6 +193,7 @@ def from_config(cls, config):
ns_set=config['namespaces.include'],
ex_ns_set=config['namespaces.exclude'],
dest_mapping=config['namespaces.mapping'],
namespace_options=config['namespaces.namespace_options'],
gridfs_set=config['namespaces.gridfs'],
ssl_certfile=config['ssl.sslCertfile'],
ssl_keyfile=config['ssl.sslKeyfile'],
Expand Down Expand Up @@ -732,84 +741,94 @@ def apply_exclude_fields(option, cli_values):
"exported. Supports dot notation for document fields but cannot span "
"arrays. Cannot use both 'fields' and 'exclude_fields'.")

def apply_namespaces(option, cli_values):
if cli_values['ns_set']:
option.value['include'] = cli_values['ns_set'].split(',')
def merge_namespaces_cli(option, cli_values):
def validate_no_duplicates(lst, list_name):
if len(lst) != len(set(lst)):
raise errors.InvalidConfiguration(
"%s should not contain any duplicates." % (list_name,))

if cli_values['ex_ns_set']:
option.value['exclude'] = cli_values['ex_ns_set'].split(',')
if cli_values["ns_set"]:
option.value["include"] = cli_values["ns_set"].split(",")

if cli_values['gridfs_set']:
option.value['gridfs'] = cli_values['gridfs_set'].split(',')
if cli_values["ex_ns_set"]:
option.value["exclude"] = cli_values["ex_ns_set"].split(",")

if cli_values['dest_ns_set']:
ns_set = option.value['include']
dest_ns_set = cli_values['dest_ns_set'].split(',')
if cli_values["gridfs_set"]:
option.value["gridfs"] = cli_values["gridfs_set"].split(",")

if cli_values["dest_ns_set"]:
ns_set = option.value["include"]
dest_ns_set = cli_values["dest_ns_set"].split(",")
if len(ns_set) != len(dest_ns_set):
raise errors.InvalidConfiguration(
"Destination namespace set should be the"
" same length as the origin namespace set.")
option.value['mapping'] = dict(zip(ns_set, dest_ns_set))
option.value["mapping"] = dict(zip(ns_set, dest_ns_set))

# Check for duplicates
if option.value["include"]:
validate_no_duplicates(option.value["include"],
"Include namespace set")
if option.value["exclude"]:
validate_no_duplicates(option.value["exclude"],
"Exclude namespace set")
if option.value["gridfs"]:
validate_no_duplicates(option.value["gridfs"],
"GridFS namespace set")
if option.value["mapping"]:
validate_no_duplicates(list(option.value["mapping"].values()),
"Destination namespace set")

ns_set = option.value['include']
if len(ns_set) != len(set(ns_set)):
raise errors.InvalidConfiguration(
"Namespace set should not contain any duplicates.")
def apply_namespaces(option, cli_values):
if (option.value['include'] or option.value['exclude'] or
option.value['mapping'] or option.value['gridfs']):
return apply_old_namespace_options(option, cli_values)
else:
return apply_new_namespace_options(option, cli_values)

def apply_new_namespace_options(option, cli_values):
"""Apply the new format (since 2.5.0) namespaces options."""
merge_namespaces_cli(option, cli_values)
namespace_options = copy.deepcopy(option.value)
ns_set = namespace_options.pop('include', None)
ex_ns_set = namespace_options.pop('exclude', None)
gridfs_set = namespace_options.pop('gridfs', None)
dest_mapping = namespace_options.pop('mapping', None)
option.value["namespace_options"] = namespace_options

validate_namespace_options(
namespace_set=ns_set, ex_namespace_set=ex_ns_set,
dest_mapping=dest_mapping,
namespace_options=namespace_options, gridfs_set=gridfs_set)

def apply_old_namespace_options(option, cli_values):
"""Apply the old format (before 2.5.0) namespaces options."""
merge_namespaces_cli(option, cli_values)

LOG.warning("Deprecation warning: the current namespaces "
"configuration format is outdated and support may be "
"removed in a future release. Please update your "
"config file to use the new format.")

ns_set = option.value['include']
ex_ns_set = option.value['exclude']
if len(ex_ns_set) != len(set(ex_ns_set)):
raise errors.InvalidConfiguration(
"Exclude namespace set should not contain any duplicates.")

# not allow to exist both 'include' and 'exclude'
if ns_set and ex_ns_set:
raise errors.InvalidConfiguration(
"Cannot use both namespace 'include' "
"(--namespace-set) and 'exclude' "
"(--exclude-namespace-set).")
gridfs_set = option.value['gridfs']
dest_mapping = option.value['mapping']

# validate 'include' format
for ns in ns_set:
if ns.count("*") > 1:
valid_names = set(['include', 'exclude', 'gridfs', 'mapping'])
valid_names |= set('__' + name for name in valid_names)
valid_names.add('__comment__')
for key in option.value:
if key not in valid_names:
raise errors.InvalidConfiguration(
"Namespace set should be plain text "
"e.g. foo.bar or only contains one wildcard, e.g. foo.* .")
"Invalid option %s in old style (pre 2.5.0) namespaces "
"configuration. The only valid option names are: %r" %
(key, list(valid_names)))

# validate 'exclude' format
for ens in ex_ns_set:
if ens.count("*") > 1:
raise errors.InvalidConfiguration(
"Exclude namespace set should be plain text "
"e.g. foo.bar or only contains one wildcard, e.g. foo.* .")

dest_mapping = option.value['mapping']
if len(dest_mapping) != len(set(dest_mapping.values())):
raise errors.InvalidConfiguration(
"Destination namespaces set should not"
" contain any duplicates.")

for source_name, value in dest_mapping.items():
# Mapping may be old style with only a target name, eg:
# "db.source": "db.dest",
# or new style which can support extra options per namespace:
# "db.source": {"rename": "db.dest", ...}
if isinstance(value, dict):
dest_name = value.get("rename", source_name)
else:
dest_name = value
if source_name.count("*") > 1 or dest_name.count("*") > 1:
raise errors.InvalidConfiguration(
"The namespace mapping source and destination "
"cannot contain more than one '*' character.")
if source_name.count("*") != dest_name.count("*"):
raise errors.InvalidConfiguration(
"The namespace mapping source and destination "
"must contain the same number of '*' characters.")

gridfs_set = option.value['gridfs']
if len(gridfs_set) != len(set(gridfs_set)):
raise errors.InvalidConfiguration(
"GridFS set should not contain any duplicates.")
validate_namespace_options(
namespace_set=ns_set, ex_namespace_set=ex_ns_set,
dest_mapping=dest_mapping, gridfs_set=gridfs_set)

default_namespaces = {
"include": [],
Expand Down

0 comments on commit 8e0a771

Please sign in to comment.