
github68: --auto-commit-interval option

Commit 138addc016199ab8c75b6e48f84016ccb40b42cc (1 parent: 62e0774) · Luke Lovett committed Jan 27, 2014
@@ -188,8 +188,7 @@ __7) commit(self)__
This function is used to force a refresh/commit.
It is used only in the beginning of rollbacks and in test cases, and is
not meant to be called in other circumstances. The body should commit
-all documents to the target system (like auto_commit), but not have
-any timers or run itself again (unlike auto_commit). In the event of
+all documents to the target system (like auto_commit). In the event of
too many engine searchers, the commit can be wrapped in a
retry_until_ok to keep trying until the commit goes through.
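For a concrete picture of that wrapping, a minimal commit() for the Elastic manager could look like this (a sketch in the spirit of the doc above, not the committed code; retry_until_ok comes from mongo_connector.util):

    from mongo_connector.util import retry_until_ok

    def commit(self):
        """Force a refresh so pending writes become searchable.

        retry_until_ok keeps invoking the refresh until it succeeds,
        riding out transient "too many searchers" failures.
        """
        retry_until_ok(self.elastic.indices.refresh)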
@@ -26,23 +26,24 @@
import threading
import time
import imp
-from mongo_connector import errors, util
+from mongo_connector import constants, errors, util
from mongo_connector.locking_dict import LockingDict
-from mongo_connector.constants import DEFAULT_BATCH_SIZE
from mongo_connector.oplog_manager import OplogThread
try:
from pymongo import MongoClient as Connection
except ImportError:
from pymongo import Connection
+
class Connector(threading.Thread):
"""Checks the cluster for shards to tail.
"""
def __init__(self, address, oplog_checkpoint, target_url, ns_set,
u_key, auth_key, doc_manager=None, auth_username=None,
- collection_dump=True, batch_size=DEFAULT_BATCH_SIZE,
- fields=None):
+ collection_dump=True, batch_size=constants.DEFAULT_BATCH_SIZE,
+ fields=None,
+ auto_commit_interval=constants.DEFAULT_COMMIT_INTERVAL):
if doc_manager is not None:
doc_manager = imp.load_source('DocManager', doc_manager)
else:
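Constructed directly, the new keyword threads through the Connector like so (a sketch; the import path, addresses, and checkpoint file are placeholders):

    from mongo_connector.connector import Connector  # module path may vary by version

    connector = Connector(
        address="localhost:27017",      # mongod/mongos to tail
        oplog_checkpoint="config.txt",  # oplog progress file
        target_url="localhost:9200",    # target system, e.g. Elasticsearch
        ns_set=["test.test"],
        u_key="_id",
        auth_key=None,
        auto_commit_interval=5,         # commit every 5 seconds
    )
    connector.start()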
@@ -98,20 +99,23 @@ def __init__(self, address, oplog_checkpoint, target_url, ns_set,
else: # imported using load source
self.doc_manager = doc_manager.DocManager(
unique_key=u_key,
- namespace_set=ns_set
+ namespace_set=ns_set,
+ auto_commit_interval=auto_commit_interval
)
else:
if doc_manager is None:
self.doc_manager = DocManager(
self.target_url,
unique_key=u_key,
- namespace_set=ns_set
+ namespace_set=ns_set,
+ auto_commit_interval=auto_commit_interval
)
else:
self.doc_manager = doc_manager.DocManager(
self.target_url,
unique_key=u_key,
- namespace_set=ns_set
+ namespace_set=ns_set,
+ auto_commit_interval=auto_commit_interval
)
except errors.ConnectionFailed:
err_msg = "MongoConnector: Could not connect to target system"
@@ -378,7 +382,7 @@ def main():
#--batch-size specifies num docs to read from oplog before updating the
#--oplog-ts config file with current oplog position
parser.add_option("--batch-size", action="store",
- default=DEFAULT_BATCH_SIZE, type="int",
+ default=constants.DEFAULT_BATCH_SIZE, type="int",
help="Specify an int to update the --oplog-ts "
"config file with latest position of oplog every "
"N documents. By default, the oplog config isn't "
@@ -487,6 +491,19 @@ def main():
""" Use a comma separated list of fields to specify multiple fields."""
""" The '_id', 'ns' and '_ts' fields are always exported.""")
+ #--auto-commit-interval to specify auto commit time interval
+ parser.add_option("--auto-commit-interval", action="store",
+ dest="commit_interval", type="int",
+ default=constants.DEFAULT_COMMIT_INTERVAL,
+ help="""Seconds in-between calls for the Doc Manager"""
+ """ to commit changes to the target system. A value of"""
+ """ 0 means to commit after every write operation."""
+ """ When left unset, Mongo Connector will not make"""
+ """ explicit commits. Some systems have"""
+ """ their own mechanism for adjusting a commit"""
+ """ interval, which should be preferred to this"""
+ """ option.""")
+
(options, args) = parser.parse_args()
logger = logging.getLogger()
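On the command line, the same setup might look like this (hosts, and the flags other than --auto-commit-interval, are illustrative):

    python mongo_connector.py -m localhost:27017 -t localhost:9200 \
        -d ./doc_managers/elastic_doc_manager.py --auto-commit-interval 5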
@@ -495,8 +512,11 @@ def main():
if options.enable_syslog:
syslog_info = options.syslog_host.split(":")
- syslog_host = logging.handlers.SysLogHandler(address=(syslog_info[0],
- int(syslog_info[1])),facility=options.syslog_facility)
+ syslog_host = logging.handlers.SysLogHandler(
+ address=(syslog_info[0],
+ int(syslog_info[1])),
+ facility=options.syslog_facility
+ )
syslog_host.setLevel(loglevel)
logger.addHandler(syslog_host)
else:
@@ -537,6 +557,9 @@ def main():
logger.error("Admin username specified without password!")
sys.exit(1)
+ if options.commit_interval is not None and options.commit_interval < 0:
+ raise ValueError("--auto-commit-interval must be non-negative")
+
connector = Connector(
address=options.main_addr,
oplog_checkpoint=options.oplog_config,
@@ -548,7 +571,8 @@ def main():
auth_username=options.admin_name,
collection_dump=(not options.no_dump),
batch_size=options.batch_size,
- fields=fields
+ fields=fields,
+ auto_commit_interval=options.commit_interval
)
connector.start()
@@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# Maximumum # of documents to process before recording timestamp
+# Maximum # of documents to process before recording timestamp
# default = -1 (no maximum)
DEFAULT_BATCH_SIZE = -1
+# Interval in seconds between doc manager flushes (i.e. auto commit)
+# default = None (never auto commit)
+DEFAULT_COMMIT_INTERVAL = None
@@ -21,16 +21,16 @@
same class and replace the method definitions with API calls for the
desired backend.
"""
-import itertools
-import logging
from threading import Timer
import bson.json_util as bsjson
from elasticsearch import Elasticsearch, exceptions as es_exceptions
from mongo_connector import errors
+from mongo_connector.constants import DEFAULT_COMMIT_INTERVAL
from mongo_connector.util import retry_until_ok
+
class DocManager():
"""The DocManager class creates a connection to the backend engine and
adds/removes documents, and in the case of rollback, searches for them.
@@ -43,20 +43,21 @@ class DocManager():
them as fields in the document, due to compatibility issues.
"""
- def __init__(self, url, auto_commit=False, unique_key='_id', **kwargs):
+ def __init__(self, url, auto_commit_interval=DEFAULT_COMMIT_INTERVAL,
+ unique_key='_id', **kwargs):
""" Establish a connection to Elastic
"""
self.elastic = Elasticsearch(hosts=[url])
- self.auto_commit = auto_commit
+ self.auto_commit_interval = auto_commit_interval
self.doc_type = 'string' # default type is string, change if needed
self.unique_key = unique_key
- if auto_commit:
+ if self.auto_commit_interval not in [None, 0]:
self.run_auto_commit()
def stop(self):
""" Stops the instance
"""
- self.auto_commit = False
+ self.auto_commit_interval = None
def upsert(self, doc):
"""Update or insert a document into Elastic
@@ -72,35 +73,36 @@ def upsert(self, doc):
doc_id = doc[self.unique_key]
try:
self.elastic.index(index=index, doc_type=doc_type,
- body=bsjson.dumps(doc), id=doc_id, refresh=True)
+ body=bsjson.dumps(doc), id=doc_id,
+ refresh=(self.auto_commit_interval == 0))
except (es_exceptions.ConnectionError):
raise errors.ConnectionFailed("Could not connect to Elastic Search")
except es_exceptions.TransportError:
- raise errors.OperationFailed("Could not index document: %s"%(
+ raise errors.OperationFailed("Could not index document: %s" % (
bsjson.dumps(doc)))
def bulk_upsert(self, docs):
"""Update or insert multiple documents into Elastic
docs may be any iterable
"""
-
def docs_to_upsert():
doc = None
for doc in docs:
index = doc["ns"]
doc[self.unique_key] = str(doc[self.unique_key])
doc_id = doc[self.unique_key]
- yield { "index": {"_index": index, "_type": self.doc_type,
- "_id": doc_id} }
+ yield {"index": {"_index": index, "_type": self.doc_type,
+ "_id": doc_id}}
yield doc
if not doc:
raise errors.EmptyDocsError(
"Cannot upsert an empty sequence of "
"documents into Elastic Search")
try:
self.elastic.bulk(doc_type=self.doc_type,
- body=docs_to_upsert(), refresh=True)
+ body=docs_to_upsert(),
+ refresh=(self.auto_commit_interval == 0))
except (es_exceptions.ConnectionError):
raise errors.ConnectionFailed("Could not connect to Elastic Search")
except es_exceptions.TransportError:
@@ -118,11 +120,12 @@ def remove(self, doc):
"""
try:
self.elastic.delete(index=doc['ns'], doc_type=self.doc_type,
- id=str(doc[self.unique_key]), refresh=True)
+ id=str(doc[self.unique_key]),
+ refresh=(self.auto_commit_interval == 0))
except (es_exceptions.ConnectionError):
raise errors.ConnectionFailed("Could not connect to Elastic Search")
except es_exceptions.TransportError:
- raise errors.OperationFailed("Could not remove document: %s"%(
+ raise errors.OperationFailed("Could not remove document: %s" % (
bsjson.dumps(doc)))
def _remove(self):
@@ -131,7 +134,7 @@ def _remove(self):
try:
self.elastic.delete_by_query(index="test.test",
doc_type=self.doc_type,
- body={"match_all":{}})
+ body={"match_all": {}})
except (es_exceptions.ConnectionError):
raise errors.ConnectionFailed("Could not connect to Elastic Search")
except es_exceptions.TransportError:
@@ -160,7 +163,6 @@ def _stream_search(self, *args, **kwargs):
raise errors.OperationFailed(
"Could not retrieve documents from Elastic Search")
-
def search(self, start_ts, end_ts):
"""Called to query Elastic for documents in a time range.
"""
@@ -186,9 +188,8 @@ def run_auto_commit(self):
"""Periodically commits to the Elastic server.
"""
self.elastic.indices.refresh()
-
- if self.auto_commit:
- Timer(1, self.run_auto_commit).start()
+ if self.auto_commit_interval not in [None, 0]:
+ Timer(self.auto_commit_interval, self.run_auto_commit).start()
def get_last_doc(self):
"""Returns the last document stored in the Elastic engine.
@@ -198,7 +199,7 @@ def get_last_doc(self):
index="_all",
body={
"query": {"match_all": {}},
- "sort": [{"_ts":"desc"}]
+ "sort": [{"_ts": "desc"}]
},
size=1
)["hits"]["hits"]
@@ -23,6 +23,7 @@
"""
import exceptions
+from mongo_connector.constants import DEFAULT_COMMIT_INTERVAL
class DocManager():
@@ -34,7 +35,8 @@ class DocManager():
opposed to multiple, slightly different versions of a doc.
"""
- def __init__(self, url=None, auto_commit=True, unique_key='_id', **kwargs):
+ def __init__(self, url=None, auto_commit_interval=DEFAULT_COMMIT_INTERVAL,
+ unique_key='_id', **kwargs):
"""Verify URL and establish a connection.
This method should, if necessary, verify the url to the backend
@@ -117,12 +119,13 @@ def run_auto_commit(self):
"""Periodically commits to the engine server, if needed.
This function commits all changes to the engine, and then
- starts a timer that calls this function again in one second.
- The reason for this function is to prevent overloading engine from
- other searchers. This function may be modified based on the backend
- engine and how commits are handled, as timers may not be necessary
- in all instances. It does not have to be implemented if commits
- are not necessary
+ starts a timer that calls this function again in
+ self.auto_commit_interval seconds. The reason for this
+ function is to prevent overloading the engine from other
+ searchers. This function may be modified based on the backend
+ engine and how commits are handled, as timers may not be
+ necessary in all instances. It does not have to be implemented
+ if commits are not necessary
"""
raise exceptions.NotImplementedError
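A backend that does need periodic commits might fill in this stub along these lines (a sketch; self.connection.commit() stands for whatever flush call the backend offers and is not part of this commit):

    from threading import Timer

    def run_auto_commit(self):
        """Flush pending changes, then re-arm the timer."""
        self.connection.commit()  # hypothetical backend flush
        if self.auto_commit_interval not in [None, 0]:
            Timer(self.auto_commit_interval, self.run_auto_commit).start()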