Use maxWriteBatchSize for write command batch splitting, PYTHON-642.

commit 6d5f658c2a36de0e4eb79d3b054a81b675b95bab (parent 6c6375d)
ajdavis authored
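Summary: MongoDB 2.6 servers advertise a maxWriteBatchSize (1000 documents) in their ismaster response and enforce it on write commands, so the driver now splits batches on that document count as well as on total message size. An illustrative usage sketch of the end result (assumes a reachable MongoDB 2.6+ server on localhost; not part of this commit):

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
coll = client.test.numerous
coll.remove()
coll.insert({} for _ in range(2100))  # more docs than maxWriteBatchSize
print(coll.count())                   # 2100, sent as several write commands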
pymongo/_cmessagemodule.c (20 lines changed)
@@ -911,6 +911,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
long max_bson_size;
long max_cmd_size;
+ long max_write_batch_size;
long idx_offset = 0;
int idx = 0;
int cmd_len_loc;
@@ -919,6 +920,7 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
int ordered;
char *ns = NULL;
PyObject* max_bson_size_obj;
+ PyObject* max_write_batch_size_obj;
PyObject* command;
PyObject* doc;
PyObject* docs;
@@ -956,6 +958,18 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
*/
max_cmd_size = max_bson_size + 16382;
+ max_write_batch_size_obj = PyObject_GetAttrString(client, "max_write_batch_size");
+#if PY_MAJOR_VERSION >= 3
+ max_write_batch_size = PyLong_AsLong(max_write_batch_size_obj);
+#else
+ max_write_batch_size = PyInt_AsLong(max_write_batch_size_obj);
+#endif
+ Py_XDECREF(max_write_batch_size_obj);
+ if (max_write_batch_size == -1) {
+ PyMem_Free(ns);
+ return NULL;
+ }
+
/* Default to True */
ordered = !((PyDict_GetItemString(command, "ordered")) == Py_False);
@@ -1035,6 +1049,8 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
int sub_doc_begin = buffer_get_position(buffer);
int cur_doc_begin;
int cur_size;
+ int enough_data = 0;
+ int enough_documents = 0;
char key[16];
empty = 0;
INT2STRING(key, idx);
@@ -1052,7 +1068,9 @@ _cbson_do_batched_write_command(PyObject* self, PyObject* args) {
Py_DECREF(doc);
/* We have enough data, maybe send this batch. */
- if (buffer_get_position(buffer) > max_cmd_size) {
+ enough_data = (buffer_get_position(buffer) > max_cmd_size);
+ enough_documents = (idx >= max_write_batch_size);
+ if (enough_data || enough_documents) {
buffer_t new_buffer;
cur_size = buffer_get_position(buffer) - cur_doc_begin;
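The C fast path previously flushed a batch only once the accumulated command grew past max_cmd_size; it now also flushes when idx reaches the client's max_write_batch_size. Roughly, in Python terms (a sketch of the condition only, not the extension's actual code):

def should_flush(buffer_position, idx, max_cmd_size, max_write_batch_size):
    # Flush when the serialized command is large enough OR when it already
    # holds the maximum number of documents the server will accept.
    enough_data = buffer_position > max_cmd_size
    enough_documents = idx >= max_write_batch_size
    return enough_data or enough_documents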
pymongo/bulk.py (5 lines changed)
@@ -235,7 +235,7 @@ def gen_ordered(self):
for idx, (op_type, operation) in enumerate(self.ops):
if run is None:
run = _Run(op_type)
- elif run.op_type != op_type or len(run.ops) > 1000:
+ elif run.op_type != op_type:
yield run
run = _Run(op_type)
run.add(idx, operation)
@@ -248,9 +248,6 @@ def gen_unordered(self):
operations = [_Run(_INSERT), _Run(_UPDATE), _Run(_DELETE)]
for idx, (op_type, operation) in enumerate(self.ops):
operations[op_type].add(idx, operation)
- if len(operations[op_type].ops) > 1000:
- yield operations[op_type]
- operations[op_type] = _Run(op_type)
for run in operations:
if run.ops:
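With the message layer now responsible for batch splitting, the run generators no longer cut a run at 1,000 operations: gen_ordered starts a new run only when the operation type changes, and gen_unordered collects everything per type. A simplified, illustrative sketch of the ordered grouping (not the Bulk API itself):

from itertools import groupby

ops = [('insert', {'_id': 1}), ('insert', {'_id': 2}),
       ('update', ({'_id': 1}, {'$set': {'x': 1}})),
       ('insert', {'_id': 3})]

# Consecutive operations of the same type form one run, however many there are.
runs = [(op_type, [op for _, op in group])
        for op_type, group in groupby(ops, key=lambda pair: pair[0])]
# -> an insert run of 2 ops, an update run of 1 op, an insert run of 1 op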
pymongo/collection.py (6 lines changed)
@@ -357,9 +357,9 @@ def insert(self, doc_or_docs, manipulate=True,
.. mongodoc:: insert
"""
client = self.database.connection
- # Batch inserts require us to know the connected master's
- # max_bson_size and max_message_size. We have to be connected
- # to a master to know that.
+ # Batch inserts require us to know the connected primary's
+ # max_bson_size, max_message_size, and max_write_batch_size.
+ # We have to be connected to the primary to know that.
client._ensure_connected(True)
docs = doc_or_docs
pymongo/common.py (1 line changed)
@@ -41,6 +41,7 @@
MAX_MESSAGE_SIZE = 2 * MAX_BSON_SIZE
MIN_WIRE_VERSION = 0
MAX_WIRE_VERSION = 0
+MAX_WRITE_BATCH_SIZE = 1000
# What this version of PyMongo supports.
MIN_SUPPORTED_WIRE_VERSION = 0
pymongo/master_slave_connection.py (11 lines changed)
@@ -155,6 +155,17 @@ def max_wire_version(self):
"""
return self.master.max_wire_version
+ @property
+ def max_write_batch_size(self):
+ """The maxWriteBatchSize reported by the server.
+
+ Returns a default value when connected to server versions prior to
+ MongoDB 2.6.
+
+ .. versionadded:: 2.7
+ """
+ return self.master.max_write_batch_size
+
def disconnect(self):
"""Disconnect from MongoDB.
pymongo/member.py (2 lines changed)
@@ -64,6 +64,8 @@ def __init__(self, host, connection_pool, ismaster_response, ping_time):
'minWireVersion', common.MIN_WIRE_VERSION)
self.max_wire_version = ismaster_response.get(
'maxWireVersion', common.MAX_WIRE_VERSION)
+ self.max_write_batch_size = ismaster_response.get(
+ 'maxWriteBatchSize', common.MAX_WRITE_BATCH_SIZE)
# self.min/max_wire_version is the server's wire protocol.
# MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports.
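Each Member caches the limit straight from the ismaster response, falling back to common.MAX_WRITE_BATCH_SIZE for servers that predate the field. To see what a particular server reports, you can run ismaster yourself (sketch; assumes a reachable mongod, and pre-2.6 servers simply omit the field):

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
ismaster = client.admin.command('ismaster')
print(ismaster.get('maxWriteBatchSize', 1000))  # 1000 is the driver's default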
pymongo/message.py (5 lines changed)
@@ -281,6 +281,7 @@ def _do_batched_write_command(namespace, operation, command,
"""Execute a batch of insert, update, or delete commands.
"""
max_bson_size = client.max_bson_size
+ max_write_batch_size = client.max_write_batch_size
# Max BSON object size + 16k - 2 bytes for ending NUL bytes
# XXX: This should come from the server - SERVER-10643
max_cmd_size = max_bson_size + 16382
@@ -354,7 +355,9 @@ def send_message():
key = b(str(idx))
value = bson.BSON.encode(doc, check_keys, uuid_subtype)
# Send a batch?
- if (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size:
+ enough_data = (buf.tell() + len(key) + len(value) + 2) >= max_cmd_size
+ enough_documents = (idx >= max_write_batch_size)
+ if enough_data or enough_documents:
if not idx:
if operation == _INSERT:
raise DocumentTooLarge("BSON document too large (%d bytes)"
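The pure-Python path applies the same rule: a batch is sent once it would exceed max_cmd_size bytes or once it already contains max_write_batch_size documents. A standalone sketch of that splitting rule (illustrative helper, not the driver's API):

import bson

def split_batches(docs, max_cmd_size, max_write_batch_size):
    """Yield lists of BSON-encoded documents, each list small enough to send."""
    batch, size = [], 0
    for doc in docs:
        encoded = bson.BSON.encode(doc)
        if batch and (size + len(encoded) > max_cmd_size
                      or len(batch) >= max_write_batch_size):
            yield batch
            batch, size = [], 0
        batch.append(encoded)
        size += len(encoded)
    if batch:
        yield batch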
pymongo/mongo_client.py (12 lines changed)
@@ -655,6 +655,18 @@ def max_wire_version(self):
return self.__member_property(
'max_wire_version', common.MAX_WIRE_VERSION)
+ @property
+ def max_write_batch_size(self):
+ """The maxWriteBatchSize reported by the server.
+
+ Returns a default value when connected to server versions prior to
+ MongoDB 2.6.
+
+ .. versionadded:: 2.7
+ """
+ return self.__member_property(
+ 'max_write_batch_size', common.MAX_WRITE_BATCH_SIZE)
+
def __simple_command(self, sock_info, dbname, spec):
"""Send a command to the server.
"""
pymongo/mongo_replica_set_client.py (14 lines changed)
@@ -987,6 +987,20 @@ def max_wire_version(self):
return common.MAX_WIRE_VERSION
@property
+ def max_write_batch_size(self):
+ """The maxWriteBatchSize reported by the server.
+
+ Returns a default value when connected to server versions prior to
+ MongoDB 2.6.
+
+ .. versionadded:: 2.7
+ """
+ rs_state = self.__rs_state
+ if rs_state.primary_member:
+ return rs_state.primary_member.max_write_batch_size
+ return common.MAX_WRITE_BATCH_SIZE
+
+ @property
def auto_start_request(self):
"""Is auto_start_request enabled?
"""
test/pymongo_mocks.py (18 lines changed)
@@ -81,6 +81,9 @@ def __init__(self, standalones, members, mongoses, config):
# Hostname -> (min wire version, max wire version)
self.mock_wire_versions = {}
+ # Hostname -> max write batch size
+ self.mock_max_write_batch_sizes = {}
+
def kill_host(self, host):
"""Host is like 'a:1'."""
self.mock_down_hosts.append(host)
@@ -92,11 +95,17 @@ def revive_host(self, host):
def set_wire_version_range(self, host, min_version, max_version):
self.mock_wire_versions[host] = (min_version, max_version)
+ def set_max_write_batch_size(self, host, size):
+ self.mock_max_write_batch_sizes[host] = size
+
def mock_is_master(self, host):
min_wire_version, max_wire_version = self.mock_wire_versions.get(
host,
(common.MIN_WIRE_VERSION, common.MAX_WIRE_VERSION))
+ max_write_batch_size = self.mock_max_write_batch_sizes.get(
+ host, common.MAX_WRITE_BATCH_SIZE)
+
# host is like 'a:1'.
if host in self.mock_down_hosts:
raise socket.timeout('mock timeout')
@@ -105,7 +114,8 @@ def mock_is_master(self, host):
return {
'ismaster': True,
'minWireVersion': min_wire_version,
- 'maxWireVersion': max_wire_version}
+ 'maxWireVersion': max_wire_version,
+ 'maxWriteBatchSize': max_write_batch_size}
if host in self.mock_members:
ismaster = (host == self.mock_primary)
@@ -117,7 +127,8 @@ def mock_is_master(self, host):
'setName': 'rs',
'hosts': self.mock_ismaster_hosts,
'minWireVersion': min_wire_version,
- 'maxWireVersion': max_wire_version}
+ 'maxWireVersion': max_wire_version,
+ 'maxWriteBatchSize': max_write_batch_size}
if self.mock_primary:
response['primary'] = self.mock_primary
@@ -129,7 +140,8 @@ def mock_is_master(self, host):
'ismaster': True,
'minWireVersion': min_wire_version,
'maxWireVersion': max_wire_version,
- 'msg': 'isdbgrid'}
+ 'msg': 'isdbgrid',
+ 'maxWriteBatchSize': max_write_batch_size}
# In test_internal_ips(), we try to connect to a host listed
# in ismaster['hosts'] but not publicly accessible.
test/test_bulk.py (21 lines changed)
@@ -427,6 +427,27 @@ def test_large_inserts_unordered(self):
self.assertEqual(6, result['nInserted'])
self.assertEqual(6, self.coll.count())
+ def test_numerous_inserts(self):
+ # Ensure we don't exceed server's 1000-document batch size limit.
+ n_docs = 2100
+ batch = self.coll.initialize_unordered_bulk_op()
+ for _ in range(n_docs):
+ batch.insert({})
+
+ result = batch.execute()
+ self.assertEqual(n_docs, result['nInserted'])
+ self.assertEqual(n_docs, self.coll.count())
+
+ # Same with ordered bulk.
+ self.coll.remove()
+ batch = self.coll.initialize_ordered_bulk_op()
+ for _ in range(n_docs):
+ batch.insert({})
+
+ result = batch.execute()
+ self.assertEqual(n_docs, result['nInserted'])
+ self.assertEqual(n_docs, self.coll.count())
+
def test_multiple_execution(self):
batch = self.coll.initialize_ordered_bulk_op()
batch.insert({})
test/test_client.py (26 lines changed)
@@ -104,6 +104,7 @@ def test_init_disconnected(self):
self.assertIsInstance(c.max_bson_size, int)
self.assertIsInstance(c.min_wire_version, int)
self.assertIsInstance(c.max_wire_version, int)
+ self.assertIsInstance(c.max_write_batch_size, int)
self.assertEqual(None, c.host)
self.assertEqual(None, c.port)
@@ -916,6 +917,31 @@ def test_wire_version(self):
c.disconnect()
self.assertRaises(ConfigurationError, c.db.collection.find_one)
+ def test_max_write_batch_size(self):
+ c = MockClient(
+ standalones=[],
+ members=['a:1', 'b:2', 'c:3'],
+ mongoses=[],
+ host='b:2', # Pass a secondary.
+ replicaSet='rs',
+ _connect=False)
+
+ c.set_max_write_batch_size('a:1', 1)
+ c.set_max_write_batch_size('b:2', 2)
+
+ # Starts with default max batch size.
+ self.assertEqual(1000, c.max_write_batch_size)
+ c.db.collection.find_one() # Connect.
+ # Uses primary's max batch size.
+ self.assertEqual(c.max_write_batch_size, 1)
+
+ # b becomes primary.
+ c.mock_primary = 'b:2'
+ c.disconnect()
+ self.assertEqual(1000, c.max_write_batch_size)
+ c.db.collection.find_one() # Connect.
+ self.assertEqual(c.max_write_batch_size, 2)
+
def test_wire_version_mongos_ha(self):
c = MockClient(
standalones=[],
test/test_collection.py (8 lines changed)
@@ -1893,6 +1893,14 @@ def test_insert_large_batch(self):
finally:
self.client.end_request()
+ def test_numerous_inserts(self):
+ # Ensure we don't exceed server's 1000-document batch size limit.
+ self.db.test.remove()
+ n_docs = 2100
+ self.db.test.insert({} for _ in range(n_docs))
+ self.assertEqual(n_docs, self.db.test.count())
+ self.db.test.remove()
+
# Starting in PyMongo 2.6 we no longer use message.insert for inserts, but
# message.insert is part of the public API. Do minimal testing here; there
# isn't really a better place.
test/test_replica_set_client.py (26 lines changed)
@@ -1212,5 +1212,31 @@ def test_connect_with_internal_ips(self):
replicaSet='rs')
+class TestReplicaSetClientMaxWriteBatchSize(unittest.TestCase):
+ def test_max_write_batch_size(self):
+ c = MockReplicaSetClient(
+ standalones=[],
+ members=['a:1', 'b:2'],
+ mongoses=[],
+ host='a:1',
+ replicaSet='rs',
+ _connect=False)
+
+ c.set_max_write_batch_size('a:1', 1)
+ c.set_max_write_batch_size('b:2', 2)
+
+ # Starts with default max batch size.
+ self.assertEqual(1000, c.max_write_batch_size)
+ c.db.collection.find_one() # Connect.
+
+ # Uses primary's max batch size.
+ self.assertEqual(c.max_write_batch_size, 1)
+
+ # b becomes primary.
+ c.mock_primary = 'b:2'
+ c.refresh()
+ self.assertEqual(c.max_write_batch_size, 2)
+
+
if __name__ == "__main__":
unittest.main()