diff --git a/pymongo/_cmessagemodule.c b/pymongo/_cmessagemodule.c
index 70df158abf..845c14bd54 100644
--- a/pymongo/_cmessagemodule.c
+++ b/pymongo/_cmessagemodule.c
@@ -62,302 +62,6 @@ static int buffer_write_bytes_ssize_t(buffer_t buffer, const char* data, Py_ssiz
     return buffer_write_bytes(buffer, data, downsize);
 }
 
-/* add a lastError message on the end of the buffer.
- * returns 0 on failure */
-static int add_last_error(PyObject* self, buffer_t buffer,
-                          int request_id, char* ns, Py_ssize_t nslen,
-                          codec_options_t* options, PyObject* args) {
-    struct module_state *state = GETSTATE(self);
-
-    int message_start;
-    int document_start;
-    int message_length;
-    int document_length;
-    PyObject* key = NULL;
-    PyObject* value = NULL;
-    Py_ssize_t pos = 0;
-    PyObject* one;
-    char *p = strchr(ns, '.');
-    /* Length of the database portion of ns. */
-    nslen = p ? (int)(p - ns) : nslen;
-
-    message_start = buffer_save_space(buffer, 4);
-    if (message_start == -1) {
-        return 0;
-    }
-    if (!buffer_write_int32(buffer, (int32_t)request_id) ||
-        !buffer_write_bytes(buffer,
-                            "\x00\x00\x00\x00" /* responseTo */
-                            "\xd4\x07\x00\x00" /* opcode */
-                            "\x00\x00\x00\x00", /* options */
-                            12) ||
-        !buffer_write_bytes_ssize_t(buffer, ns, nslen) || /* database */
-        !buffer_write_bytes(buffer,
-                            ".$cmd\x00" /* collection name */
-                            "\x00\x00\x00\x00" /* skip */
-                            "\xFF\xFF\xFF\xFF", /* limit (-1) */
-                            14)) {
-        return 0;
-    }
-
-    /* save space for length */
-    document_start = buffer_save_space(buffer, 4);
-    if (document_start == -1) {
-        return 0;
-    }
-
-    /* getlasterror: 1 */
-    if (!(one = PyLong_FromLong(1)))
-        return 0;
-
-    if (!write_pair(state->_cbson, buffer, "getlasterror", 12, one, 0,
-                    options, 1)) {
-        Py_DECREF(one);
-        return 0;
-    }
-    Py_DECREF(one);
-
-    /* getlasterror options */
-    while (PyDict_Next(args, &pos, &key, &value)) {
-        if (!decode_and_write_pair(state->_cbson, buffer, key, value, 0,
-                                   options, 0)) {
-            return 0;
-        }
-    }
-
-    /* EOD */
-    if (!buffer_write_bytes(buffer, "\x00", 1)) {
-        return 0;
-    }
-
-    message_length = buffer_get_position(buffer) - message_start;
-    document_length = buffer_get_position(buffer) - document_start;
-    buffer_write_int32_at_position(
-        buffer, message_start, (int32_t)message_length);
-    buffer_write_int32_at_position(
-        buffer, document_start, (int32_t)document_length);
-    return 1;
-}
-
-static int init_insert_buffer(buffer_t buffer, int request_id, int options,
-                              const char* coll_name, Py_ssize_t coll_name_len,
-                              int compress) {
-    int length_location = 0;
-    if (!compress) {
-        /* Save space for message length */
-        int length_location = buffer_save_space(buffer, 4);
-        if (length_location == -1) {
-            return length_location;
-        }
-        if (!buffer_write_int32(buffer, (int32_t)request_id) ||
-            !buffer_write_bytes(buffer,
-                                "\x00\x00\x00\x00"
-                                "\xd2\x07\x00\x00",
-                                8)) {
-            return -1;
-        }
-    }
-    if (!buffer_write_int32(buffer, (int32_t)options) ||
-        !buffer_write_bytes_ssize_t(buffer,
-                                    coll_name,
-                                    coll_name_len + 1)) {
-        return -1;
-    }
-    return length_location;
-}
-
-static PyObject* _cbson_insert_message(PyObject* self, PyObject* args) {
-    /* Used by the Bulk API to insert into pre-2.6 servers. Collection.insert
-     * uses _cbson_do_batched_insert. */
-    struct module_state *state = GETSTATE(self);
-
-    /* NOTE just using a random number as the request_id */
-    int request_id = rand();
-    char* collection_name = NULL;
-    Py_ssize_t collection_name_length;
-    PyObject* docs;
-    PyObject* doc;
-    PyObject* iterator;
-    int before, cur_size, max_size = 0;
-    int flags = 0;
-    unsigned char check_keys;
-    unsigned char continue_on_error;
-    codec_options_t options;
-    buffer_t buffer = NULL;
-    int length_location, message_length;
-    PyObject* result = NULL;
-
-    if (!PyArg_ParseTuple(args, "et#ObbO&",
-                          "utf-8",
-                          &collection_name,
-                          &collection_name_length,
-                          &docs, &check_keys,
-                          &continue_on_error,
-                          convert_codec_options, &options)) {
-        return NULL;
-    }
-    if (continue_on_error) {
-        flags += 1;
-    }
-    buffer = buffer_new();
-    if (!buffer) {
-        goto fail;
-    }
-
-    length_location = init_insert_buffer(buffer,
-                                         request_id,
-                                         flags,
-                                         collection_name,
-                                         collection_name_length,
-                                         0);
-    if (length_location == -1) {
-        goto fail;
-    }
-
-    iterator = PyObject_GetIter(docs);
-    if (iterator == NULL) {
-        PyObject* InvalidOperation = _error("InvalidOperation");
-        if (InvalidOperation) {
-            PyErr_SetString(InvalidOperation, "input is not iterable");
-            Py_DECREF(InvalidOperation);
-        }
-        goto fail;
-    }
-    while ((doc = PyIter_Next(iterator)) != NULL) {
-        before = buffer_get_position(buffer);
-        if (!write_dict(state->_cbson, buffer, doc, check_keys,
-                        &options, 1)) {
-            Py_DECREF(doc);
-            Py_DECREF(iterator);
-            goto fail;
-        }
-        Py_DECREF(doc);
-        cur_size = buffer_get_position(buffer) - before;
-        max_size = (cur_size > max_size) ? cur_size : max_size;
-    }
-    Py_DECREF(iterator);
-
-    if (PyErr_Occurred()) {
-        goto fail;
-    }
-
-    if (!max_size) {
-        PyObject* InvalidOperation = _error("InvalidOperation");
-        if (InvalidOperation) {
-            PyErr_SetString(InvalidOperation, "cannot do an empty bulk insert");
-            Py_DECREF(InvalidOperation);
-        }
-        goto fail;
-    }
-
-    message_length = buffer_get_position(buffer) - length_location;
-    buffer_write_int32_at_position(
-        buffer, length_location, (int32_t)message_length);
-
-    /* objectify buffer */
-    result = Py_BuildValue("iy#i", request_id,
-                           buffer_get_buffer(buffer),
-                           (Py_ssize_t)buffer_get_position(buffer),
-                           max_size);
-fail:
-    PyMem_Free(collection_name);
-    destroy_codec_options(&options);
-    if (buffer) {
-        buffer_free(buffer);
-    }
-    return result;
-}
-
-static PyObject* _cbson_update_message(PyObject* self, PyObject* args) {
-    /* NOTE just using a random number as the request_id */
-    struct module_state *state = GETSTATE(self);
-
-    int request_id = rand();
-    char* collection_name = NULL;
-    Py_ssize_t collection_name_length;
-    int before, cur_size, max_size = 0;
-    PyObject* doc;
-    PyObject* spec;
-    unsigned char multi;
-    unsigned char upsert;
-    unsigned char check_keys;
-    codec_options_t options;
-    int flags;
-    buffer_t buffer = NULL;
-    int length_location, message_length;
-    PyObject* result = NULL;
-
-    if (!PyArg_ParseTuple(args, "et#bbOObO&",
-                          "utf-8",
-                          &collection_name,
-                          &collection_name_length,
-                          &upsert, &multi, &spec, &doc, &check_keys,
-                          convert_codec_options, &options)) {
-        return NULL;
-    }
-
-    flags = 0;
-    if (upsert) {
-        flags += 1;
-    }
-    if (multi) {
-        flags += 2;
-    }
-    buffer = buffer_new();
-    if (!buffer) {
-        goto fail;
-    }
-
-    // save space for message length
-    length_location = buffer_save_space(buffer, 4);
-    if (length_location == -1) {
-        goto fail;
-    }
-    if (!buffer_write_int32(buffer, (int32_t)request_id) ||
-        !buffer_write_bytes(buffer,
-                            "\x00\x00\x00\x00"
-                            "\xd1\x07\x00\x00"
-                            "\x00\x00\x00\x00",
-                            12) ||
-        !buffer_write_bytes_ssize_t(buffer,
-                                    collection_name,
-                                    collection_name_length + 1) ||
-        !buffer_write_int32(buffer, (int32_t)flags)) {
-        goto fail;
-    }
-
-    before = buffer_get_position(buffer);
-    if (!write_dict(state->_cbson, buffer, spec, 0, &options, 1)) {
-        goto fail;
-    }
-    max_size = buffer_get_position(buffer) - before;
-
-    before = buffer_get_position(buffer);
-    if (!write_dict(state->_cbson, buffer, doc, check_keys,
-                    &options, 1)) {
-        goto fail;
-    }
-    cur_size = buffer_get_position(buffer) - before;
-    max_size = (cur_size > max_size) ? cur_size : max_size;
-
-    message_length = buffer_get_position(buffer) - length_location;
-    buffer_write_int32_at_position(
-        buffer, length_location, (int32_t)message_length);
-
-    /* objectify buffer */
-    result = Py_BuildValue("iy#i", request_id,
-                           buffer_get_buffer(buffer),
-                           (Py_ssize_t)buffer_get_position(buffer),
-                           max_size);
-fail:
-    PyMem_Free(collection_name);
-    destroy_codec_options(&options);
-    if (buffer) {
-        buffer_free(buffer);
-    }
-    return result;
-}
-
 static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
     /* NOTE just using a random number as the request_id */
     struct module_state *state = GETSTATE(self);
@@ -688,333 +392,6 @@ _set_document_too_large(int size, long max) {
     }
 }
 
-static PyObject*
-_send_insert(PyObject* self, PyObject* ctx,
-             PyObject* gle_args, buffer_t buffer,
-             char* coll_name, Py_ssize_t coll_len, int request_id, int safe,
-             codec_options_t* options, PyObject* to_publish, int compress) {
-
-    if (safe) {
-        if (!add_last_error(self, buffer, request_id,
-                            coll_name, coll_len, options, gle_args)) {
-            return NULL;
-        }
-    }
-
-    /* The max_doc_size parameter for legacy_bulk_insert is the max size of
-     * any document in buffer. We enforced max size already, pass 0 here. */
-    return PyObject_CallMethod(ctx, "legacy_bulk_insert",
-                               "iy#iNOi",
-                               request_id,
-                               buffer_get_buffer(buffer),
-                               (Py_ssize_t)buffer_get_position(buffer),
-                               0,
-                               PyBool_FromLong((long)safe),
-                               to_publish, compress);
-}
-
-static PyObject* _cbson_do_batched_insert(PyObject* self, PyObject* args) {
-    struct module_state *state = GETSTATE(self);
-
-    /* NOTE just using a random number as the request_id */
-    int request_id = rand();
-    int send_safe, flags = 0;
-    int length_location, message_length;
-    Py_ssize_t collection_name_length;
-    int compress;
-    char* collection_name = NULL;
-    PyObject* docs;
-    PyObject* doc;
-    PyObject* iterator;
-    PyObject* ctx;
-    PyObject* last_error_args;
-    PyObject* result;
-    PyObject* max_bson_size_obj;
-    PyObject* max_message_size_obj;
-    PyObject* compress_obj;
-    PyObject* to_publish = NULL;
-    unsigned char check_keys;
-    unsigned char safe;
-    unsigned char continue_on_error;
-    codec_options_t options;
-    unsigned char empty = 1;
-    long max_bson_size;
-    long max_message_size;
-    buffer_t buffer;
-    PyObject *exc_type = NULL, *exc_value = NULL, *exc_trace = NULL;
-
-    if (!PyArg_ParseTuple(args, "et#ObbObO&O",
-                          "utf-8",
-                          &collection_name,
-                          &collection_name_length,
-                          &docs, &check_keys, &safe,
-                          &last_error_args,
-                          &continue_on_error,
-                          convert_codec_options, &options,
-                          &ctx)) {
-        return NULL;
-    }
-    if (continue_on_error) {
-        flags += 1;
-    }
-    /*
-     * If we are doing unacknowledged writes *and* continue_on_error
-     * is True it's pointless (and slower) to send GLE.
-     */
-    send_safe = (safe || !continue_on_error);
-    max_bson_size_obj = PyObject_GetAttrString(ctx, "max_bson_size");
-    max_bson_size = PyLong_AsLong(max_bson_size_obj);
-    Py_XDECREF(max_bson_size_obj);
-    if (max_bson_size == -1) {
-        destroy_codec_options(&options);
-        PyMem_Free(collection_name);
-        return NULL;
-    }
-
-    max_message_size_obj = PyObject_GetAttrString(ctx, "max_message_size");
-    max_message_size = PyLong_AsLong(max_message_size_obj);
-    Py_XDECREF(max_message_size_obj);
-    if (max_message_size == -1) {
-        destroy_codec_options(&options);
-        PyMem_Free(collection_name);
-        return NULL;
-    }
-
-    compress_obj = PyObject_GetAttrString(ctx, "compress");
-    compress = PyObject_IsTrue(compress_obj);
-    Py_XDECREF(compress_obj);
-    if (compress == -1) {
-        destroy_codec_options(&options);
-        PyMem_Free(collection_name);
-        return NULL;
-    }
-
-    compress = compress && !(safe || send_safe);
-
-    buffer = buffer_new();
-    if (!buffer) {
-        destroy_codec_options(&options);
-        PyMem_Free(collection_name);
-        return NULL;
-    }
-
-    length_location = init_insert_buffer(buffer,
-                                         request_id,
-                                         flags,
-                                         collection_name,
-                                         collection_name_length,
-                                         compress);
-    if (length_location == -1) {
-        goto insertfail;
-    }
-
-    if (!(to_publish = PyList_New(0))) {
-        goto insertfail;
-    }
-
-    iterator = PyObject_GetIter(docs);
-    if (iterator == NULL) {
-        PyObject* InvalidOperation = _error("InvalidOperation");
-        if (InvalidOperation) {
-            PyErr_SetString(InvalidOperation, "input is not iterable");
-            Py_DECREF(InvalidOperation);
-        }
-        goto insertfail;
-    }
-    while ((doc = PyIter_Next(iterator)) != NULL) {
-        int before = buffer_get_position(buffer);
-        int cur_size;
-        if (!write_dict(state->_cbson, buffer, doc, check_keys,
-                        &options, 1)) {
-            goto iterfail;
-        }
-
-        cur_size = buffer_get_position(buffer) - before;
-        if (cur_size > max_bson_size) {
-            /* If we've encoded anything send it before raising. */
-            if (!empty) {
-                buffer_update_position(buffer, before);
-                if (!compress) {
-                    message_length = buffer_get_position(buffer) - length_location;
-                    buffer_write_int32_at_position(
-                        buffer, length_location, (int32_t)message_length);
-                }
-                result = _send_insert(self, ctx, last_error_args, buffer,
-                                      collection_name, collection_name_length,
-                                      request_id, send_safe, &options,
-                                      to_publish, compress);
-                if (!result)
-                    goto iterfail;
-                Py_DECREF(result);
-            }
-            _set_document_too_large(cur_size, max_bson_size);
-            goto iterfail;
-        }
-        empty = 0;
-
-        /* We have enough data, send this batch. */
-        if (buffer_get_position(buffer) > max_message_size) {
-            int new_request_id = rand();
-            int message_start;
-            buffer_t new_buffer = buffer_new();
-            if (!new_buffer) {
-                goto iterfail;
-            }
-            message_start = init_insert_buffer(new_buffer,
-                                               new_request_id,
-                                               flags,
-                                               collection_name,
-                                               collection_name_length,
-                                               compress);
-            if (message_start == -1) {
-                buffer_free(new_buffer);
-                goto iterfail;
-            }
-
-            /* Copy the overflow encoded document into the new buffer. */
-            if (!buffer_write_bytes(new_buffer,
-                                    (const char*)buffer_get_buffer(buffer) + before, cur_size)) {
-                buffer_free(new_buffer);
-                goto iterfail;
-            }
-
-            /* Roll back to the beginning of this document. */
-            buffer_update_position(buffer, before);
-            if (!compress) {
-                message_length = buffer_get_position(buffer) - length_location;
-                buffer_write_int32_at_position(
-                    buffer, length_location, (int32_t)message_length);
-            }
-
-            result = _send_insert(self, ctx, last_error_args, buffer,
-                                  collection_name, collection_name_length,
-                                  request_id, send_safe, &options, to_publish,
-                                  compress);
-
-            buffer_free(buffer);
-            buffer = new_buffer;
-            request_id = new_request_id;
-            length_location = message_start;
-
-            Py_DECREF(to_publish);
-            if (!(to_publish = PyList_New(0))) {
-                goto insertfail;
-            }
-
-            if (!result) {
-                PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
-                PyObject* OperationFailure;
-                PyErr_Fetch(&etype, &evalue, &etrace);
-                OperationFailure = _error("OperationFailure");
-                if (OperationFailure) {
-                    if (PyErr_GivenExceptionMatches(etype, OperationFailure)) {
-                        if (!safe || continue_on_error) {
-                            Py_DECREF(OperationFailure);
-                            if (!safe) {
-                                /* We're doing unacknowledged writes and
-                                 * continue_on_error is False. Just return. */
-                                Py_DECREF(etype);
-                                Py_XDECREF(evalue);
-                                Py_XDECREF(etrace);
-                                Py_DECREF(to_publish);
-                                Py_DECREF(iterator);
-                                Py_DECREF(doc);
-                                buffer_free(buffer);
-                                PyMem_Free(collection_name);
-                                Py_RETURN_NONE;
-                            }
-                            /* continue_on_error is True, store the error
-                             * details to re-raise after the final batch */
-                            Py_XDECREF(exc_type);
-                            Py_XDECREF(exc_value);
-                            Py_XDECREF(exc_trace);
-                            exc_type = etype;
-                            exc_value = evalue;
-                            exc_trace = etrace;
-                            if (PyList_Append(to_publish, doc) < 0) {
-                                goto iterfail;
-                            }
-                            Py_CLEAR(doc);
-                            continue;
-                        }
-                    }
-                    Py_DECREF(OperationFailure);
-                }
-                /* This isn't OperationFailure, we couldn't
-                 * import OperationFailure, or we are doing
-                 * acknowledged writes. Re-raise immediately. */
-                PyErr_Restore(etype, evalue, etrace);
-                goto iterfail;
-            } else {
-                Py_DECREF(result);
-            }
-        }
-        if (PyList_Append(to_publish, doc) < 0) {
-            goto iterfail;
-        }
-        Py_CLEAR(doc);
-    }
-    Py_DECREF(iterator);
-
-    if (PyErr_Occurred()) {
-        goto insertfail;
-    }
-
-    if (empty) {
-        PyObject* InvalidOperation = _error("InvalidOperation");
-        if (InvalidOperation) {
-            PyErr_SetString(InvalidOperation, "cannot do an empty bulk insert");
-            Py_DECREF(InvalidOperation);
-        }
-        goto insertfail;
-    }
-
-    if (!compress) {
-        message_length = buffer_get_position(buffer) - length_location;
-        buffer_write_int32_at_position(
-            buffer, length_location, (int32_t)message_length);
-    }
-
-    /* Send the last (or only) batch */
-    result = _send_insert(self, ctx, last_error_args, buffer,
-                          collection_name, collection_name_length,
-                          request_id, safe, &options, to_publish, compress);
-
-    Py_DECREF(to_publish);
-    PyMem_Free(collection_name);
-    buffer_free(buffer);
-
-    if (!result) {
-        Py_XDECREF(exc_type);
-        Py_XDECREF(exc_value);
-        Py_XDECREF(exc_trace);
-        return NULL;
-    } else {
-        Py_DECREF(result);
-    }
-
-    if (exc_type) {
-        /* Re-raise any previously stored exception
-         * due to continue_on_error being True */
-        PyErr_Restore(exc_type, exc_value, exc_trace);
-        return NULL;
-    }
-
-    Py_RETURN_NONE;
-
-iterfail:
-    Py_XDECREF(doc);
-    Py_DECREF(iterator);
-insertfail:
-    Py_XDECREF(exc_type);
-    Py_XDECREF(exc_value);
-    Py_XDECREF(exc_trace);
-    Py_XDECREF(to_publish);
-    buffer_free(buffer);
-    PyMem_Free(collection_name);
-    return NULL;
-}
-
 #define _INSERT 0
 #define _UPDATE 1
 #define _DELETE 2
@@ -1591,94 +968,13 @@ _cbson_encode_batched_write_command(PyObject* self, PyObject* args) {
     return result;
 }
 
-static PyObject*
-_cbson_batched_write_command(PyObject* self, PyObject* args) {
-    char *ns = NULL;
-    unsigned char op;
-    unsigned char check_keys;
-    Py_ssize_t ns_len;
-    int request_id;
-    int position;
-    PyObject* command;
-    PyObject* docs;
-    PyObject* ctx = NULL;
-    PyObject* to_publish = NULL;
-    PyObject* result = NULL;
-    codec_options_t options;
-    buffer_t buffer;
-    struct module_state *state = GETSTATE(self);
-
-    if (!PyArg_ParseTuple(args, "et#bOObO&O", "utf-8",
-                          &ns, &ns_len, &op, &command, &docs, &check_keys,
-                          convert_codec_options, &options,
-                          &ctx)) {
-        return NULL;
-    }
-    if (!(buffer = buffer_new())) {
-        PyMem_Free(ns);
-        destroy_codec_options(&options);
-        return NULL;
-    }
-    /* Save space for message length and request id */
-    if ((buffer_save_space(buffer, 8)) == -1) {
-        goto fail;
-    }
-    if (!buffer_write_bytes(buffer,
-                            "\x00\x00\x00\x00" /* responseTo */
-                            "\xd4\x07\x00\x00", /* opcode */
-                            8)) {
-        goto fail;
-    }
-    if (!(to_publish = PyList_New(0))) {
-        goto fail;
-    }
-
-    if (!_batched_write_command(
-            ns,
-            ns_len,
-            op,
-            check_keys,
-            command,
-            docs,
-            ctx,
-            to_publish,
-            options,
-            buffer,
-            state)) {
-        goto fail;
-    }
-
-    request_id = rand();
-    position = buffer_get_position(buffer);
-    buffer_write_int32_at_position(buffer, 0, (int32_t)position);
-    buffer_write_int32_at_position(buffer, 4, (int32_t)request_id);
-    result = Py_BuildValue("iy#O", request_id,
-                           buffer_get_buffer(buffer),
-                           (Py_ssize_t)buffer_get_position(buffer),
-                           to_publish);
-fail:
-    PyMem_Free(ns);
-    destroy_codec_options(&options);
-    buffer_free(buffer);
-    Py_XDECREF(to_publish);
-    return result;
-}
-
 static PyMethodDef _CMessageMethods[] = {
-    {"_insert_message", _cbson_insert_message, METH_VARARGS,
-     "Create an insert message to be sent to MongoDB"},
-    {"_update_message", _cbson_update_message, METH_VARARGS,
-     "create an update message to be sent to MongoDB"},
     {"_query_message", _cbson_query_message, METH_VARARGS,
      "create a query message to be sent to MongoDB"},
     {"_get_more_message", _cbson_get_more_message, METH_VARARGS,
      "create a get more message to be sent to MongoDB"},
     {"_op_msg", _cbson_op_msg, METH_VARARGS,
      "create an OP_MSG message to be sent to MongoDB"},
-    {"_do_batched_insert", _cbson_do_batched_insert, METH_VARARGS,
-     "insert a batch of documents, splitting the batch as needed"},
-    {"_batched_write_command", _cbson_batched_write_command, METH_VARARGS,
-     "Create the next batched insert, update, or delete command"},
     {"_encode_batched_write_command", _cbson_encode_batched_write_command,
      METH_VARARGS,
      "Encode the next batched insert, update, or delete command"},
     {"_batched_op_msg", _cbson_batched_op_msg, METH_VARARGS,
diff --git a/pymongo/aggregation.py b/pymongo/aggregation.py
index 812ca23b79..2a34a05d3a 100644
--- a/pymongo/aggregation.py
+++ b/pymongo/aggregation.py
@@ -88,11 +88,6 @@ def _database(self):
         """The database against which the aggregation command is run."""
         raise NotImplementedError
 
-    @staticmethod
-    def _check_compat(sock_info):
-        """Check whether the server version in-use supports aggregation."""
-        pass
-
     def _process_result(self, result, session, server, sock_info, secondary_ok):
         if self._result_processor:
             self._result_processor(
@@ -104,9 +99,6 @@ def get_read_preference(self, session):
         return self._target._read_preference_for(session)
 
     def get_cursor(self, session, server, sock_info, secondary_ok):
-        # Ensure command compatibility.
-        self._check_compat(sock_info)
-
         # Serialize command.
         cmd = SON([("aggregate", self._aggregation_target),
                    ("pipeline", self._pipeline)])
@@ -117,8 +109,7 @@ def get_cursor(self, session, server, sock_info, secondary_ok):
         # - server version is >= 4.2 or
         # - server version is >= 3.2 and pipeline doesn't use $out
         if (('readConcern' not in cmd) and
-                ((sock_info.max_wire_version >= 4 and
-                  not self._performs_write) or
+                (not self._performs_write or
                  (sock_info.max_wire_version >= 8))):
             read_concern = self._target.read_concern
         else:
@@ -218,11 +209,3 @@ def _cursor_collection(self, cursor):
         # aggregate too by defaulting to the .$cmd.aggregate namespace.
         _, collname = cursor.get("ns", self._cursor_namespace).split(".", 1)
         return self._database[collname]
-
-    @staticmethod
-    def _check_compat(sock_info):
-        # Older server version don't raise a descriptive error, so we raise
-        # one instead.
-        if not sock_info.max_wire_version >= 6:
-            err_msg = "Database.aggregate() is only supported on MongoDB 3.6+."
-            raise ConfigurationError(err_msg)
diff --git a/pymongo/auth.py b/pymongo/auth.py
index bfec7ab60e..b946980865 100644
--- a/pymongo/auth.py
+++ b/pymongo/auth.py
@@ -455,10 +455,6 @@ def _authenticate_x509(credentials, sock_info):
         return
 
     cmd = _X509Context(credentials).speculate_command()
-    if credentials.username is None and sock_info.max_wire_version < 5:
-        raise ConfigurationError(
-            "A username is required for MONGODB-X509 authentication "
-            "when connected to MongoDB versions older than 3.4.")
     sock_info.command('$external', cmd)
 
 
@@ -496,10 +492,8 @@ def _authenticate_default(credentials, sock_info):
             return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-256')
         else:
             return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-1')
-    elif sock_info.max_wire_version >= 3:
-        return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-1')
     else:
-        return _authenticate_mongo_cr(credentials, sock_info)
+        return _authenticate_scram(credentials, sock_info, 'SCRAM-SHA-1')
 
 
 _AUTH_MAP = {
diff --git a/pymongo/bulk.py b/pymongo/bulk.py
index ff3f5974df..0f57309287 100644
--- a/pymongo/bulk.py
+++ b/pymongo/bulk.py
@@ -35,7 +35,6 @@
                             InvalidOperation,
                             OperationFailure)
 from pymongo.message import (_INSERT, _UPDATE, _DELETE,
-                             _do_batched_insert,
                              _randint,
                              _BulkWriteContext,
                              _EncryptedBulkWriteContext)
@@ -256,17 +255,6 @@ def gen_unordered(self):
 
     def _execute_command(self, generator, write_concern, session,
                          sock_info, op_id, retryable, full_result):
-        if sock_info.max_wire_version < 5:
-            if self.uses_collation:
-                raise ConfigurationError(
-                    'Must be connected to MongoDB 3.4+ to use a collation.')
-            if self.uses_hint:
-                raise ConfigurationError(
-                    'Must be connected to MongoDB 3.4+ to use hint.')
-        if sock_info.max_wire_version < 6 and self.uses_array_filters:
-            raise ConfigurationError(
-                'Must be connected to MongoDB 3.6+ to use arrayFilters.')
-
         db_name = self.collection.database.name
         client = self.collection.database.client
         listeners = client._event_listeners
@@ -283,7 +271,7 @@ def _execute_command(self, generator, write_concern, session,
                    ('ordered', self.ordered)])
         if not write_concern.is_server_default:
             cmd['writeConcern'] = write_concern.document
-        if self.bypass_doc_val and sock_info.max_wire_version >= 4:
+        if self.bypass_doc_val:
             cmd['bypassDocumentValidation'] = True
         bwc = self.bulk_ctx_class(
             db_name, cmd, sock_info, op_id, listeners, session,
@@ -358,24 +346,6 @@ def retryable_bulk(session, sock_info, retryable):
             _raise_bulk_write_error(full_result)
         return full_result
 
-    def execute_insert_no_results(self, sock_info, run, op_id, acknowledged):
-        """Execute insert, returning no results.
-        """
-        command = SON([('insert', self.collection.name),
-                       ('ordered', self.ordered)])
-        concern = {'w': int(self.ordered)}
-        command['writeConcern'] = concern
-        if self.bypass_doc_val and sock_info.max_wire_version >= 4:
-            command['bypassDocumentValidation'] = True
-        db = self.collection.database
-        bwc = _BulkWriteContext(
-            db.name, command, sock_info, op_id, db.client._event_listeners,
-            None, _INSERT, self.collection.codec_options)
-        # Legacy batched OP_INSERT.
-        _do_batched_insert(
-            self.collection.full_name, run.ops, True, acknowledged, concern,
-            not self.ordered, self.collection.codec_options, bwc)
-
     def execute_op_msg_no_results(self, sock_info, generator):
         """Execute write commands with OP_MSG and w=0 writeConcern, unordered.
         """
@@ -441,62 +411,13 @@ def execute_no_results(self, sock_info, generator):
             raise ConfigurationError(
                 'hint is unsupported for unacknowledged writes.')
         # Cannot have both unacknowledged writes and bypass document validation.
-        if self.bypass_doc_val and sock_info.max_wire_version >= 4:
+        if self.bypass_doc_val:
             raise OperationFailure("Cannot set bypass_document_validation with"
                                    " unacknowledged write concern")
 
-        # OP_MSG
-        if sock_info.max_wire_version > 5:
-            if self.ordered:
-                return self.execute_command_no_results(sock_info, generator)
-            return self.execute_op_msg_no_results(sock_info, generator)
-
-        coll = self.collection
-        # If ordered is True we have to send GLE or use write
-        # commands so we can abort on the first error.
-        write_concern = WriteConcern(w=int(self.ordered))
-        op_id = _randint()
-
-        next_run = next(generator)
-        while next_run:
-            # An ordered bulk write needs to send acknowledged writes to short
-            # circuit the next run. However, the final message on the final
-            # run can be unacknowledged.
-            run = next_run
-            next_run = next(generator, None)
-            needs_ack = self.ordered and next_run is not None
-            try:
-                if run.op_type == _INSERT:
-                    self.execute_insert_no_results(
-                        sock_info, run, op_id, needs_ack)
-                elif run.op_type == _UPDATE:
-                    for operation in run.ops:
-                        doc = operation['u']
-                        check_keys = True
-                        if doc and next(iter(doc)).startswith('$'):
-                            check_keys = False
-                        coll._update(
-                            sock_info,
-                            operation['q'],
-                            doc,
-                            upsert=operation['upsert'],
-                            check_keys=check_keys,
-                            multi=operation['multi'],
-                            write_concern=write_concern,
-                            op_id=op_id,
-                            ordered=self.ordered,
-                            bypass_doc_val=self.bypass_doc_val)
-                else:
-                    for operation in run.ops:
-                        coll._delete(sock_info,
-                                     operation['q'],
-                                     not operation['limit'],
-                                     write_concern,
-                                     op_id,
-                                     self.ordered)
-            except OperationFailure:
-                if self.ordered:
-                    break
+        if self.ordered:
+            return self.execute_command_no_results(sock_info, generator)
+        return self.execute_op_msg_no_results(sock_info, generator)
 
     def execute(self, write_concern, session):
         """Execute operations.
diff --git a/pymongo/collection.py b/pymongo/collection.py
index a5d2fa5c3e..b30bcf22f9 100644
--- a/pymongo/collection.py
+++ b/pymongo/collection.py
@@ -435,59 +435,6 @@ def bulk_write(self, requests, ordered=True,
             return BulkWriteResult(bulk_api_result, True)
         return BulkWriteResult({}, False)
 
-    def _legacy_write(self, sock_info, name, cmd, op_id,
-                      bypass_doc_val, func, *args):
-        """Internal legacy unacknowledged write helper."""
-        # Cannot have both unacknowledged write and bypass document validation.
-        if bypass_doc_val and sock_info.max_wire_version >= 4:
-            raise OperationFailure("Cannot set bypass_document_validation with"
-                                   " unacknowledged write concern")
-        listeners = self.database.client._event_listeners
-        publish = listeners.enabled_for_commands
-
-        if publish:
-            start = datetime.datetime.now()
-        args = args + (sock_info.compression_context,)
-        rqst_id, msg, max_size = func(*args)
-        if publish:
-            duration = datetime.datetime.now() - start
-            listeners.publish_command_start(
-                cmd, self.__database.name, rqst_id, sock_info.address, op_id,
-                sock_info.service_id)
-            start = datetime.datetime.now()
-        try:
-            result = sock_info.legacy_write(rqst_id, msg, max_size, False)
-        except Exception as exc:
-            if publish:
-                dur = (datetime.datetime.now() - start) + duration
-                if isinstance(exc, OperationFailure):
-                    details = exc.details
-                    # Succeed if GLE was successful and this is a write error.
-                    if details.get("ok") and "n" in details:
-                        reply = message._convert_write_result(
-                            name, cmd, details)
-                        listeners.publish_command_success(
-                            dur, reply, name, rqst_id, sock_info.address,
-                            op_id, sock_info.service_id)
-                        raise
-                else:
-                    details = message._convert_exception(exc)
-                listeners.publish_command_failure(
-                    dur, details, name, rqst_id, sock_info.address, op_id,
-                    sock_info.service_id)
-            raise
-        if publish:
-            if result is not None:
-                reply = message._convert_write_result(name, cmd, result)
-            else:
-                # Comply with APM spec.
-                reply = {'ok': 1}
-            duration = (datetime.datetime.now() - start) + duration
-            listeners.publish_command_success(
-                duration, reply, name, rqst_id, sock_info.address, op_id,
-                sock_info.service_id)
-        return result
-
     def _insert_one(
             self, doc, ordered,
             check_keys, write_concern, op_id, bypass_doc_val,
@@ -502,15 +449,7 @@ def _insert_one(
             command['writeConcern'] = write_concern.document
 
         def _insert_command(session, sock_info, retryable_write):
-            if not sock_info.op_msg_enabled and not acknowledged:
-                # Legacy OP_INSERT.
-                return self._legacy_write(
-                    sock_info, 'insert', command, op_id,
-                    bypass_doc_val, message._insert, self.__full_name,
-                    [doc], check_keys, False,
-                    self.__write_response_codec_options)
-
-            if bypass_doc_val and sock_info.max_wire_version >= 4:
+            if bypass_doc_val:
                 command['bypassDocumentValidation'] = True
 
             result = sock_info.command(
@@ -658,28 +597,19 @@ def _update(self, sock_info, criteria, document, upsert=False,
                            ('multi', multi),
                            ('upsert', upsert)])
         if collation is not None:
-            if sock_info.max_wire_version < 5:
-                raise ConfigurationError(
-                    'Must be connected to MongoDB 3.4+ to use collations.')
-            elif not acknowledged:
+            if not acknowledged:
                 raise ConfigurationError(
                     'Collation is unsupported for unacknowledged writes.')
             else:
                 update_doc['collation'] = collation
         if array_filters is not None:
-            if sock_info.max_wire_version < 6:
-                raise ConfigurationError(
-                    'Must be connected to MongoDB 3.6+ to use array_filters.')
-            elif not acknowledged:
+            if not acknowledged:
                 raise ConfigurationError(
                     'arrayFilters is unsupported for unacknowledged writes.')
             else:
                 update_doc['arrayFilters'] = array_filters
         if hint is not None:
-            if sock_info.max_wire_version < 5:
-                raise ConfigurationError(
-                    'Must be connected to MongoDB 3.4+ to use hint.')
-            elif not acknowledged:
+            if not acknowledged:
                 raise ConfigurationError(
                     'hint is unsupported for unacknowledged writes.')
             if not isinstance(hint, str):
@@ -692,16 +622,8 @@ def _update(self, sock_info, criteria, document, upsert=False,
         if not write_concern.is_server_default:
             command['writeConcern'] = write_concern.document
 
-        if not sock_info.op_msg_enabled and not acknowledged:
-            # Legacy OP_UPDATE.
-            return self._legacy_write(
-                sock_info, 'update', command, op_id,
-                bypass_doc_val, message._update, self.__full_name, upsert,
-                multi, criteria, document, check_keys,
-                self.__write_response_codec_options)
-
         # Update command.
-        if bypass_doc_val and sock_info.max_wire_version >= 4:
+        if bypass_doc_val:
             command['bypassDocumentValidation'] = True
 
         # The command result has to be published for APM unmodified
@@ -1018,19 +940,13 @@ def _delete(
                            ('limit', int(not multi))])
         collation = validate_collation_or_none(collation)
         if collation is not None:
-            if sock_info.max_wire_version < 5:
-                raise ConfigurationError(
-                    'Must be connected to MongoDB 3.4+ to use collations.')
-            elif not acknowledged:
+            if not acknowledged:
                 raise ConfigurationError(
                     'Collation is unsupported for unacknowledged writes.')
             else:
                 delete_doc['collation'] = collation
         if hint is not None:
-            if sock_info.max_wire_version < 5:
-                raise ConfigurationError(
-                    'Must be connected to MongoDB 3.4+ to use hint.')
-            elif not acknowledged:
+            if not acknowledged:
                 raise ConfigurationError(
                     'hint is unsupported for unacknowledged writes.')
             if not isinstance(hint, str):
@@ -1042,13 +958,6 @@ def _delete(
         if not write_concern.is_server_default:
             command['writeConcern'] = write_concern.document
 
-        if not sock_info.op_msg_enabled and not acknowledged:
-            # Legacy OP_DELETE.
-            return self._legacy_write(
-                sock_info, 'delete', command, op_id,
-                False, message._delete, self.__full_name, criteria,
-                self.__write_response_codec_options,
-                int(not multi))
         # Delete command.
         result = sock_info.command(
             self.__database.name,
@@ -1635,7 +1544,6 @@ def __create_indexes(self, indexes, session, **kwargs):
         """
         names = []
         with self._socket_for_writes(session) as sock_info:
-            supports_collations = sock_info.max_wire_version >= 5
             supports_quorum = sock_info.max_wire_version >= 9
 
             def gen_indexes():
@@ -1645,10 +1553,6 @@ def gen_indexes():
                             "%r is not an instance of "
                             "pymongo.operations.IndexModel" % (index,))
                     document = index.document
-                    if "collation" in document and not supports_collations:
-                        raise ConfigurationError(
-                            "Must be connected to MongoDB "
-                            "3.4+ to use collations.")
                     names.append(document["name"])
                     yield document
 
@@ -1880,32 +1784,21 @@ def list_indexes(self, session=None):
 
         def _cmd(session, server, sock_info, secondary_ok):
             cmd = SON([("listIndexes", self.__name), ("cursor", {})])
-            if sock_info.max_wire_version > 2:
-                with self.__database.client._tmp_session(session, False) as s:
-                    try:
-                        cursor = self._command(sock_info, cmd, secondary_ok,
-                                               read_pref,
-                                               codec_options,
-                                               session=s)["cursor"]
-                    except OperationFailure as exc:
-                        # Ignore NamespaceNotFound errors to match the behavior
-                        # of reading from *.system.indexes.
-                        if exc.code != 26:
-                            raise
-                        cursor = {'id': 0, 'firstBatch': []}
-                    cmd_cursor = CommandCursor(
-                        coll, cursor, sock_info.address, session=s,
-                        explicit_session=session is not None)
-            else:
-                res = message._first_batch(
-                    sock_info, self.__database.name, "system.indexes",
-                    {"ns": self.__full_name}, 0, secondary_ok, codec_options,
-                    read_pref, cmd,
-                    self.database.client._event_listeners)
-                cursor = res["cursor"]
-                # Note that a collection can only have 64 indexes, so there
-                # will never be a getMore call.
-                cmd_cursor = CommandCursor(coll, cursor, sock_info.address)
+            with self.__database.client._tmp_session(session, False) as s:
+                try:
+                    cursor = self._command(sock_info, cmd, secondary_ok,
+                                           read_pref,
+                                           codec_options,
+                                           session=s)["cursor"]
+                except OperationFailure as exc:
+                    # Ignore NamespaceNotFound errors to match the behavior
+                    # of reading from *.system.indexes.
+                    if exc.code != 26:
+                        raise
+                    cursor = {'id': 0, 'firstBatch': []}
+                cmd_cursor = CommandCursor(
+                    coll, cursor, sock_info.address, session=s,
+                    explicit_session=session is not None)
             cmd_cursor._maybe_pin_connection(sock_info)
             return cmd_cursor
 
@@ -2356,10 +2249,6 @@ def __find_and_modify(self, filter, projection, sort, upsert=None,
 
         def _find_and_modify(session, sock_info, retryable_write):
             if array_filters is not None:
-                if sock_info.max_wire_version < 6:
-                    raise ConfigurationError(
-                        'Must be connected to MongoDB 3.6+ to use '
-                        'arrayFilters.')
                 if not write_concern.acknowledged:
                     raise ConfigurationError(
                         'arrayFilters is unsupported for unacknowledged '
@@ -2373,8 +2262,7 @@ def _find_and_modify(session, sock_info, retryable_write):
                     raise ConfigurationError(
                         'hint is unsupported for unacknowledged writes.')
                 cmd['hint'] = hint
-            if (sock_info.max_wire_version >= 4 and
-                    not write_concern.is_server_default):
+            if not write_concern.is_server_default:
                 cmd['writeConcern'] = write_concern.document
             out = self._command(sock_info, cmd,
                                 read_preference=ReadPreference.PRIMARY,
diff --git a/pymongo/database.py b/pymongo/database.py
index 35a878322d..c30d29bde4 100644
--- a/pymongo/database.py
+++ b/pymongo/database.py
@@ -14,25 +14,19 @@
 
 """Database level operations."""
 
-import warnings
-
 from bson.codec_options import DEFAULT_CODEC_OPTIONS
 from bson.dbref import DBRef
 from bson.son import SON
 
-from pymongo import auth, common
+from pymongo import common
 from pymongo.aggregation import _DatabaseAggregationCommand
 from pymongo.change_stream import DatabaseChangeStream
 from pymongo.collection import Collection
 from pymongo.command_cursor import CommandCursor
 from pymongo.errors import (CollectionInvalid,
                             InvalidName)
-from pymongo.message import _first_batch
 from pymongo.read_preferences import ReadPreference
 
 
-_INDEX_REGEX = {"name": {"$regex": r"^(?!.*\$)"}}
-
-
 def _check_name(name):
     """Check if a database name is valid.
     """
@@ -618,37 +612,21 @@ def _list_collections(self, sock_info, secondary_okay, session,
         coll = self.get_collection(
             "$cmd", read_preference=read_preference)
-        if sock_info.max_wire_version > 2:
-            cmd = SON([("listCollections", 1),
-                       ("cursor", {})])
-            cmd.update(kwargs)
-            with self.__client._tmp_session(
-                    session, close=False) as tmp_session:
-                cursor = self._command(
-                    sock_info, cmd, secondary_okay,
-                    read_preference=read_preference,
-                    session=tmp_session)["cursor"]
-                cmd_cursor = CommandCursor(
-                    coll,
-                    cursor,
-                    sock_info.address,
-                    session=tmp_session,
-                    explicit_session=session is not None)
-        else:
-            match = _INDEX_REGEX
-            if "filter" in kwargs:
-                match = {"$and": [_INDEX_REGEX, kwargs["filter"]]}
-            dblen = len(self.name.encode("utf8") + b".")
-            pipeline = [
-                {"$project": {"name": {"$substr": ["$name", dblen, -1]},
-                              "options": 1}},
-                {"$match": match}
-            ]
-            cmd = SON([("aggregate", "system.namespaces"),
-                       ("pipeline", pipeline),
-                       ("cursor", kwargs.get("cursor", {}))])
-            cursor = self._command(sock_info, cmd, secondary_okay)["cursor"]
-            cmd_cursor = CommandCursor(coll, cursor, sock_info.address)
+        cmd = SON([("listCollections", 1),
+                   ("cursor", {})])
+        cmd.update(kwargs)
+        with self.__client._tmp_session(
+                session, close=False) as tmp_session:
+            cursor = self._command(
+                sock_info, cmd, secondary_okay,
+                read_preference=read_preference,
+                session=tmp_session)["cursor"]
+            cmd_cursor = CommandCursor(
+                coll,
+                cursor,
+                sock_info.address,
+                session=tmp_session,
+                explicit_session=session is not None)
         cmd_cursor._maybe_pin_connection(sock_info)
         return cmd_cursor
diff --git a/pymongo/helpers.py b/pymongo/helpers.py
index 86d1e9f484..37d6f59b74 100644
--- a/pymongo/helpers.py
+++ b/pymongo/helpers.py
@@ -168,41 +168,6 @@ def _check_command_response(response, max_wire_version,
         raise OperationFailure(errmsg, code, response, max_wire_version)
 
 
-def _check_gle_response(result, max_wire_version):
-    """Return getlasterror response as a dict, or raise OperationFailure."""
-    # Did getlasterror itself fail?
-    _check_command_response(result, max_wire_version)
-
-    if result.get("wtimeout", False):
-        # MongoDB versions before 1.8.0 return the error message in an "errmsg"
-        # field. If "errmsg" exists "err" will also exist set to None, so we
-        # have to check for "errmsg" first.
-        raise WTimeoutError(result.get("errmsg", result.get("err")),
-                            result.get("code"),
-                            result)
-
-    error_msg = result.get("err", "")
-    if error_msg is None:
-        return result
-
-    if error_msg.startswith(HelloCompat.LEGACY_ERROR):
-        raise NotPrimaryError(error_msg, result)
-
-    details = result
-
-    # mongos returns the error code in an error object for some errors.
-    if "errObjects" in result:
-        for errobj in result["errObjects"]:
-            if errobj.get("err") == error_msg:
-                details = errobj
-                break
-
-    code = details.get("code")
-    if code in (11000, 11001, 12582):
-        raise DuplicateKeyError(details["err"], code, result)
-    raise OperationFailure(details["err"], code, result)
-
-
 def _raise_last_write_error(write_errors):
     # If the last batch had multiple errors only report
     # the last error to emulate continue_on_error.
diff --git a/pymongo/message.py b/pymongo/message.py index a30975db9e..8a496d5b1b 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -32,7 +32,6 @@ _decode_selective, _dict_to_bson, _make_c_string) -from bson.codec_options import DEFAULT_CODEC_OPTIONS from bson.int64 import Int64 from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument) @@ -52,7 +51,6 @@ OperationFailure, ProtocolError) from pymongo.hello import HelloCompat -from pymongo.read_concern import DEFAULT_READ_CONCERN from pymongo.read_preferences import ReadPreference from pymongo.write_concern import WriteConcern @@ -271,7 +269,7 @@ def namespace(self): def use_command(self, sock_info): use_find_cmd = False - if sock_info.max_wire_version >= 4 and not self.exhaust: + if not self.exhaust: use_find_cmd = True elif sock_info.max_wire_version >= 8: # OP_MSG supports exhaust on MongoDB 4.2+ @@ -283,18 +281,7 @@ def use_command(self, sock_info): % (self.read_concern.level, sock_info.max_wire_version)) - if sock_info.max_wire_version < 5 and self.collation is not None: - raise ConfigurationError( - 'Specifying a collation is unsupported with a max wire ' - 'version of %d.' % (sock_info.max_wire_version,)) - - if sock_info.max_wire_version < 4 and self.allow_disk_use is not None: - raise ConfigurationError( - 'Specifying allowDiskUse is unsupported with a max wire ' - 'version of %d.' % (sock_info.max_wire_version,)) - sock_info.validate_session(self.client, self.session) - return use_find_cmd def as_command(self, sock_info): @@ -342,24 +329,21 @@ def get_message(self, set_secondary_ok, sock_info, use_cmd=False): if use_cmd: spec = self.as_command(sock_info)[0] - if sock_info.op_msg_enabled: - request_id, msg, size, _ = _op_msg( - 0, spec, self.db, self.read_preference, - set_secondary_ok, False, self.codec_options, - ctx=sock_info.compression_context) - return request_id, msg, size - ns = "%s.%s" % (self.db, "$cmd") - ntoreturn = -1 # All DB commands return 1 document - else: - # OP_QUERY treats ntoreturn of -1 and 1 the same, return - # one document and close the cursor. We have to use 2 for - # batch size if 1 is specified. - ntoreturn = self.batch_size == 1 and 2 or self.batch_size - if self.limit: - if ntoreturn: - ntoreturn = min(self.limit, ntoreturn) - else: - ntoreturn = self.limit + request_id, msg, size, _ = _op_msg( + 0, spec, self.db, self.read_preference, + set_secondary_ok, False, self.codec_options, + ctx=sock_info.compression_context) + return request_id, msg, size + + # OP_QUERY treats ntoreturn of -1 and 1 the same, return + # one document and close the cursor. We have to use 2 for + # batch size if 1 is specified. 
+ ntoreturn = self.batch_size == 1 and 2 or self.batch_size + if self.limit: + if ntoreturn: + ntoreturn = min(self.limit, ntoreturn) + else: + ntoreturn = self.limit if sock_info.is_mongos: spec = _maybe_add_read_preference(spec, @@ -400,7 +384,7 @@ def namespace(self): def use_command(self, sock_info): use_cmd = False - if sock_info.max_wire_version >= 4 and not self.exhaust: + if not self.exhaust: use_cmd = True elif sock_info.max_wire_version >= 8: # OP_MSG supports exhaust on MongoDB 4.2+ @@ -440,19 +424,15 @@ def get_message(self, dummy0, sock_info, use_cmd=False): if use_cmd: spec = self.as_command(sock_info)[0] - if sock_info.op_msg_enabled: - if self.sock_mgr: - flags = _OpMsg.EXHAUST_ALLOWED - else: - flags = 0 - request_id, msg, size, _ = _op_msg( - flags, spec, self.db, None, - False, False, self.codec_options, - ctx=sock_info.compression_context) - return request_id, msg, size - ns = "%s.%s" % (self.db, "$cmd") - return _query(0, ns, 0, -1, spec, None, self.codec_options, - ctx=ctx) + if self.sock_mgr: + flags = _OpMsg.EXHAUST_ALLOWED + else: + flags = 0 + request_id, msg, size, _ = _op_msg( + flags, spec, self.db, None, + False, False, self.codec_options, + ctx=sock_info.compression_context) + return request_id, msg, size return _get_more(ns, self.ntoreturn, self.cursor_id, ctx) @@ -464,7 +444,7 @@ def use_command(self, sock_info): if sock_info.max_wire_version >= 8: # MongoDB 4.2+ supports exhaust over OP_MSG return True - elif sock_info.op_msg_enabled and not self.exhaust: + elif not self.exhaust: return True return False @@ -476,7 +456,7 @@ def use_command(self, sock_info): if sock_info.max_wire_version >= 8: # MongoDB 4.2+ supports exhaust over OP_MSG return True - elif sock_info.op_msg_enabled and not self.exhaust: + elif not self.exhaust: return True return False @@ -528,16 +508,6 @@ def _compress(operation, data, ctx): return request_id, header + compressed -def __last_error(namespace, args): - """Data to send to do a lastError. - """ - cmd = SON([("getlasterror", 1)]) - cmd.update(args) - splitns = namespace.split('.', 1) - return _query(0, splitns[0] + '.$cmd', 0, -1, cmd, - None, DEFAULT_CODEC_OPTIONS) - - _pack_header = struct.Struct(" ctx.max_bson_size) - - message_length += encoded_length - if message_length < ctx.max_message_size and not too_large: - data.write(encoded) - to_send.append(doc) - has_docs = True - continue - - if has_docs: - # We have enough data, send this message. - try: - if compress: - rid, msg = None, data.getvalue() - else: - rid, msg = _insert_message(data.getvalue(), send_safe) - ctx.legacy_bulk_insert( - rid, msg, 0, send_safe, to_send, compress) - # Exception type could be OperationFailure or a subtype - # (e.g. DuplicateKeyError) - except OperationFailure as exc: - # Like it says, continue on error... - if continue_on_error: - # Store exception details to re-raise after the final batch. - last_error = exc - # With unacknowledged writes just return at the first error. - elif not safe: - return - # With acknowledged writes raise immediately. 
- else: - raise - - if too_large: - _raise_document_too_large( - "insert", encoded_length, ctx.max_bson_size) - - message_length = begin_loc + encoded_length - data.seek(begin_loc) - data.truncate() - data.write(encoded) - to_send = [doc] - - if not has_docs: - raise InvalidOperation("cannot do an empty bulk insert") - - if compress: - request_id, msg = None, data.getvalue() - else: - request_id, msg = _insert_message(data.getvalue(), safe) - ctx.legacy_bulk_insert(request_id, msg, 0, safe, to_send, compress) - - # Re-raise any exception stored due to continue_on_error - if last_error is not None: - raise last_error -if _use_c: - _do_batched_insert = _cmessage._do_batched_insert - # OP_MSG ------------------------------------------------------------- @@ -1335,20 +1066,6 @@ def _do_batched_op_msg( # End OP_MSG ----------------------------------------------------- -def _batched_write_command_compressed( - namespace, operation, command, docs, check_keys, opts, ctx): - """Create the next batched insert, update, or delete command, compressed. - """ - data, to_send = _encode_batched_write_command( - namespace, operation, command, docs, check_keys, opts, ctx) - - request_id, msg = _compress( - 2004, - data, - ctx.sock_info.compression_context) - return request_id, msg, to_send - - def _encode_batched_write_command( namespace, operation, command, docs, check_keys, opts, ctx): """Encode the next batched insert, update, or delete command. @@ -1362,53 +1079,6 @@ def _encode_batched_write_command( _encode_batched_write_command = _cmessage._encode_batched_write_command -def _batched_write_command( - namespace, operation, command, docs, check_keys, opts, ctx): - """Create the next batched insert, update, or delete command. - """ - buf = _BytesIO() - - # Save space for message length and request id - buf.write(_ZERO_64) - # responseTo, opCode - buf.write(b"\x00\x00\x00\x00\xd4\x07\x00\x00") - - # Write OP_QUERY write command - to_send, length = _batched_write_command_impl( - namespace, operation, command, docs, check_keys, opts, ctx, buf) - - # Header - request id and message length - buf.seek(4) - request_id = _randint() - buf.write(_pack_int(request_id)) - buf.seek(0) - buf.write(_pack_int(length)) - - return request_id, buf.getvalue(), to_send -if _use_c: - _batched_write_command = _cmessage._batched_write_command - - -def _do_batched_write_command( - namespace, operation, command, docs, check_keys, opts, ctx): - """Batched write commands entry point.""" - if ctx.sock_info.compression_context: - return _batched_write_command_compressed( - namespace, operation, command, docs, check_keys, opts, ctx) - return _batched_write_command( - namespace, operation, command, docs, check_keys, opts, ctx) - - -def _do_bulk_write_command( - namespace, operation, command, docs, check_keys, opts, ctx): - """Bulk write commands entry point.""" - if ctx.sock_info.max_wire_version > 5: - return _do_batched_op_msg( - namespace, operation, command, docs, check_keys, opts, ctx) - return _do_batched_write_command( - namespace, operation, command, docs, check_keys, opts, ctx) - - def _batched_write_command_impl( namespace, operation, command, docs, check_keys, opts, ctx, buf): """Create a batched OP_QUERY write command.""" @@ -1585,7 +1255,7 @@ def unpack(cls, msg): # PYTHON-945: ignore starting_from field. 
flags, cursor_id, _, number_returned = cls.UNPACK_FROM(msg) - documents = bytes(msg[20:]) + documents = msg[20:] return cls(flags, cursor_id, number_returned, documents) @@ -1665,7 +1335,7 @@ def unpack(cls, msg): if len(msg) != first_payload_size + 5: raise ProtocolError("Unsupported OP_MSG reply: >1 section") - payload_document = bytes(msg[5:]) + payload_document = msg[5:] return cls(flags, payload_document) @@ -1673,63 +1343,3 @@ def unpack(cls, msg): _OpReply.OP_CODE: _OpReply.unpack, _OpMsg.OP_CODE: _OpMsg.unpack, } - - -def _first_batch(sock_info, db, coll, query, ntoreturn, - secondary_ok, codec_options, read_preference, cmd, listeners): - """Simple query helper for retrieving a first (and possibly only) batch.""" - query = _Query( - 0, db, coll, 0, query, None, codec_options, - read_preference, ntoreturn, 0, DEFAULT_READ_CONCERN, None, None, - None, None, False) - - name = next(iter(cmd)) - publish = listeners.enabled_for_commands - if publish: - start = datetime.datetime.now() - - request_id, msg, max_doc_size = query.get_message(secondary_ok, sock_info) - - if publish: - encoding_duration = datetime.datetime.now() - start - listeners.publish_command_start( - cmd, db, request_id, sock_info.address, - service_id=sock_info.service_id) - start = datetime.datetime.now() - - sock_info.send_message(msg, max_doc_size) - reply = sock_info.receive_message(request_id) - try: - docs = reply.unpack_response(None, codec_options) - except Exception as exc: - if publish: - duration = (datetime.datetime.now() - start) + encoding_duration - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure = exc.details - else: - failure = _convert_exception(exc) - listeners.publish_command_failure( - duration, failure, name, request_id, sock_info.address, - service_id=sock_info.service_id) - raise - # listIndexes - if 'cursor' in cmd: - result = { - 'cursor': { - 'firstBatch': docs, - 'id': reply.cursor_id, - 'ns': '%s.%s' % (db, coll) - }, - 'ok': 1.0 - } - # fsyncUnlock, currentOp - else: - result = docs[0] if docs else {} - result['ok'] = 1.0 - if publish: - duration = (datetime.datetime.now() - start) + encoding_duration - listeners.publish_command_success( - duration, result, name, request_id, sock_info.address, - service_id=sock_info.service_id) - - return result diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index cce5f14d04..db56187b77 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -32,9 +32,7 @@ """ import contextlib -import datetime import threading -import warnings import weakref from collections import defaultdict @@ -62,8 +60,7 @@ ServerSelectionTimeoutError) from pymongo.pool import ConnectionClosedReason from pymongo.read_preferences import ReadPreference -from pymongo.server_selectors import (writable_preferred_server_selector, - writable_server_selector) +from pymongo.server_selectors import writable_server_selector from pymongo.server_type import SERVER_TYPE from pymongo.topology import (Topology, _ErrorContext) @@ -1377,8 +1374,6 @@ def _retryable_read(self, func, read_pref, session, address=None, try: server = self._select_server( read_pref, session, address=address) - if not server.description.retryable_reads_supported: - retryable = False with self._secondaryok_for_server(read_pref, server, session) as ( sock_info, secondary_ok): if retrying and not retryable: @@ -1561,51 +1556,10 @@ def _kill_cursors(self, cursor_ids, address, topology, session): self._kill_cursor_impl(cursor_ids, address, session, sock_info) def 
_kill_cursor_impl(self, cursor_ids, address, session, sock_info): - listeners = self._event_listeners - publish = listeners.enabled_for_commands - - try: - namespace = address.namespace - db, coll = namespace.split('.', 1) - except AttributeError: - namespace = None - db = coll = "OP_KILL_CURSORS" + namespace = address.namespace + db, coll = namespace.split('.', 1) spec = SON([('killCursors', coll), ('cursors', cursor_ids)]) - if sock_info.max_wire_version >= 4 and namespace is not None: - sock_info.command(db, spec, session=session, client=self) - else: - if publish: - start = datetime.datetime.now() - request_id, msg = message._kill_cursors(cursor_ids) - if publish: - duration = datetime.datetime.now() - start - # Here and below, address could be a tuple or - # _CursorAddress. We always want to publish a - # tuple to match the rest of the monitoring - # API. - listeners.publish_command_start( - spec, db, request_id, tuple(address), - service_id=sock_info.service_id) - start = datetime.datetime.now() - - try: - sock_info.send_message(msg, 0) - except Exception as exc: - if publish: - dur = ((datetime.datetime.now() - start) + duration) - listeners.publish_command_failure( - dur, message._convert_exception(exc), - 'killCursors', request_id, - tuple(address), service_id=sock_info.service_id) - raise - - if publish: - duration = ((datetime.datetime.now() - start) + duration) - # OP_KILL_CURSORS returns no reply, fake one. - reply = {'cursorsUnknown': cursor_ids, 'ok': 1} - listeners.publish_command_success( - duration, reply, 'killCursors', request_id, - tuple(address), service_id=sock_info.service_id) + sock_info.command(db, spec, session=session, client=self) def _process_kill_cursors(self): """Process any pending kill cursors requests.""" diff --git a/pymongo/pool.py b/pymongo/pool.py index 52c57c7f07..e85dbeae3b 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -580,7 +580,7 @@ def _hello(self, cluster_time, topology_version, self.sock.settimeout( self.opts.connect_timeout + heartbeat_frequency) - if self.max_wire_version >= 6 and cluster_time is not None: + if not performing_handshake and cluster_time is not None: cmd['$clusterTime'] = cluster_time # XXX: Simplify in PyMongo 4.0 when all_credentials is always a single @@ -615,7 +615,7 @@ def _hello(self, cluster_time, topology_version, hello.compressors) self.compression_context = ctx - self.op_msg_enabled = hello.max_wire_version >= 6 + self.op_msg_enabled = True if creds: self.negotiated_mechanisms[creds] = hello.sasl_supported_mechs if auth_ctx: @@ -687,23 +687,13 @@ def command(self, dbname, spec, secondary_ok=False, if not isinstance(spec, ORDERED_TYPES): spec = SON(spec) - if (read_concern and self.max_wire_version < 4 - and not read_concern.ok_for_legacy): - raise ConfigurationError( - 'read concern level of %s is not valid ' - 'with a max wire version of %d.' 
- % (read_concern.level, self.max_wire_version)) if not (write_concern is None or write_concern.acknowledged or collation is None): raise ConfigurationError( 'Collation is unsupported for unacknowledged writes.') - if (self.max_wire_version >= 5 and - write_concern and + if (write_concern and not write_concern.is_server_default): spec['writeConcern'] = write_concern.document - elif self.max_wire_version < 5 and collation is not None: - raise ConfigurationError( - 'Must be connected to MongoDB 3.4+ to use a collation.') self.add_server_api(spec) if session: @@ -769,25 +759,17 @@ def _raise_if_not_writable(self, unacknowledged): raise NotPrimaryError("not primary", { "ok": 0, "errmsg": "not primary", "code": 10107}) - def legacy_write(self, request_id, msg, max_doc_size, with_last_error): - """Send OP_INSERT, etc., optionally returning response as a dict. + def unack_write(self, msg, max_doc_size): + """Send unack OP_MSG. - Can raise ConnectionFailure or OperationFailure. + Can raise ConnectionFailure or InvalidDocument. :Parameters: - - `request_id`: an int. - - `msg`: bytes, an OP_INSERT, OP_UPDATE, or OP_DELETE message, - perhaps with a getlasterror command appended. + - `msg`: bytes, an OP_MSG message. - `max_doc_size`: size in bytes of the largest document in `msg`. - - `with_last_error`: True if a getlasterror command is appended. """ - self._raise_if_not_writable(not with_last_error) - + self._raise_if_not_writable(True) self.send_message(msg, max_doc_size) - if with_last_error: - reply = self.receive_message(request_id) - return helpers._check_gle_response(reply.command_response(), - self.max_wire_version) def write_command(self, request_id, msg): """Send "insert" etc. command, returning response as a dict. @@ -881,7 +863,7 @@ def socket_closed(self): def send_cluster_time(self, command, session, client): """Add cluster time for MongoDB >= 3.6.""" - if self.max_wire_version >= 6 and client: + if client: client._send_cluster_time(command, session) def add_server_api(self, command): diff --git a/test/__init__.py b/test/__init__.py index 388caf715e..c70e854d1f 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -605,10 +605,9 @@ def require_version_max(self, *ver): def require_auth(self, func): """Run a test only if the server is running with auth enabled.""" - return self.check_auth_with_sharding( - self._require(lambda: self.auth_enabled, - "Authentication is not enabled on the server", - func=func)) + return self._require(lambda: self.auth_enabled, + "Authentication is not enabled on the server", + func=func) def require_no_auth(self, func): """Run a test only if the server is running without auth enabled.""" @@ -706,14 +705,6 @@ def require_no_load_balancer(self, func): "Must not be connected to a load balancer", func=func) - def check_auth_with_sharding(self, func): - """Skip a test when connected to mongos < 2.0 and running with auth.""" - condition = lambda: not (self.auth_enabled and - self.is_mongos and self.version < (2,)) - return self._require(condition, - "Auth with sharding requires MongoDB >= 2.0.0", - func=func) - def is_topology_type(self, topologies): unknown = set(topologies) - {'single', 'replicaset', 'sharded', 'sharded-replicaset', 'load-balanced'} @@ -818,9 +809,7 @@ def supports_retryable_writes(self): return False if not self.sessions_enabled: return False - if self.version.at_least(3, 6): - return self.is_mongos or self.is_rs - return False + return self.is_mongos or self.is_rs def require_retryable_writes(self, func): """Run a test only if the deployment 
supports retryable writes.""" diff --git a/test/test_auth.py b/test/test_auth.py index 76b320fcb9..6504ce5777 100644 --- a/test/test_auth.py +++ b/test/test_auth.py @@ -307,17 +307,8 @@ def auth_string(user, password): class TestSCRAMSHA1(IntegrationTest): @client_context.require_auth - @client_context.require_version_min(2, 7, 2) def setUp(self): super(TestSCRAMSHA1, self).setUp() - # Before 2.7.7, SCRAM-SHA-1 had to be enabled from the command line. - if client_context.version < Version(2, 7, 7): - cmd_line = client_context.cmd_line - if 'SCRAM-SHA-1' not in cmd_line.get( - 'parsed', {}).get('setParameter', - {}).get('authenticationMechanisms', ''): - raise SkipTest('SCRAM-SHA-1 mechanism not enabled') - client_context.create_user( 'pymongo_test', 'user', 'pass', roles=['userAdmin', 'readWrite']) diff --git a/test/test_bulk.py b/test/test_bulk.py index f0cb52912e..aa2dcab928 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -164,18 +164,6 @@ def test_update_many(self): def test_update_many_pipeline(self): self._test_update_many([{'$set': {'foo': 'bar'}}]) - @client_context.require_version_max(3, 5, 5) - def test_array_filters_unsupported(self): - requests = [ - UpdateMany( - {}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}]), - UpdateOne( - {}, {'$set': {"y.$[i].b": 2}}, array_filters=[{'i.b': 3}]) - ] - for bulk_op in requests: - self.assertRaises( - ConfigurationError, self.coll.bulk_write, [bulk_op]) - def test_array_filters_validation(self): self.assertRaises(TypeError, UpdateMany, {}, {}, array_filters={}) self.assertRaises(TypeError, UpdateOne, {}, {}, array_filters={}) @@ -307,7 +295,6 @@ def test_numerous_inserts(self): self.assertEqual(n_docs, result.inserted_count) self.assertEqual(n_docs, self.coll.count_documents({})) - @client_context.require_version_min(3, 6) def test_bulk_max_message_size(self): self.coll.delete_many({}) self.addCleanup(self.coll.delete_many, {}) @@ -781,38 +768,29 @@ def setUpClass(cls): cls.secondary = single_client(*partition_node(member)) break - # We tested wtimeout errors by specifying a write concern greater than - # the number of members, but in MongoDB 2.7.8+ this causes a different - # sort of error, "Not enough data-bearing nodes". In recent servers we - # use a failpoint to pause replication on a secondary. - cls.need_replication_stopped = client_context.version.at_least(2, 7, 8) - @classmethod def tearDownClass(cls): if cls.secondary: cls.secondary.close() def cause_wtimeout(self, requests, ordered): - if self.need_replication_stopped: - if not client_context.test_commands_enabled: - self.skipTest("Test commands must be enabled.") + if not client_context.test_commands_enabled: + self.skipTest("Test commands must be enabled.") - self.secondary.admin.command('configureFailPoint', - 'rsSyncApplyStop', - mode='alwaysOn') - - try: - coll = self.coll.with_options( - write_concern=WriteConcern(w=self.w, wtimeout=1)) - return coll.bulk_write(requests, ordered=ordered) - finally: - self.secondary.admin.command('configureFailPoint', - 'rsSyncApplyStop', - mode='off') - else: + # Use the rsSyncApplyStop failpoint to pause replication on a + # secondary, which causes a wtimeout error.
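The rewritten cause_wtimeout is failpoint-only. A minimal sketch of the same pattern outside the test harness, assuming illustrative addresses for a replica-set client plus a direct connection to one secondary, and a server started with enableTestCommands=1:

```python
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
from pymongo.operations import InsertOne
from pymongo.write_concern import WriteConcern

# Hypothetical addresses for the deployment and one of its secondaries.
client = MongoClient('localhost', 27017, replicaSet='rs0')
secondary = MongoClient('localhost', 27018, directConnection=True)

# Pause replication on the secondary so w:2 cannot be satisfied.
secondary.admin.command('configureFailPoint', 'rsSyncApplyStop',
                        mode='alwaysOn')
try:
    coll = client.test.coll.with_options(
        write_concern=WriteConcern(w=2, wtimeout=1))
    coll.bulk_write([InsertOne({})])
except BulkWriteError as exc:
    print(exc.details['writeConcernErrors'])  # wtimeout details
finally:
    secondary.admin.command('configureFailPoint', 'rsSyncApplyStop',
                            mode='off')
```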
+ self.secondary.admin.command('configureFailPoint', + 'rsSyncApplyStop', + mode='alwaysOn') + + try: coll = self.coll.with_options( - write_concern=WriteConcern(w=self.w + 1, wtimeout=1)) + write_concern=WriteConcern(w=self.w, wtimeout=1)) return coll.bulk_write(requests, ordered=ordered) + finally: + self.secondary.admin.command('configureFailPoint', + 'rsSyncApplyStop', + mode='off') @client_context.require_replica_set @client_context.require_secondaries_count(1) diff --git a/test/test_client.py b/test/test_client.py index 8ac1f86be9..d69025c8b0 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -729,17 +729,15 @@ def test_list_databases(self): for doc in client.list_databases(): self.assertIs(type(doc), dict) - if client_context.version.at_least(3, 4, 2): - self.client.pymongo_test.test.insert_one({}) - cursor = self.client.list_databases(filter={"name": "admin"}) - docs = list(cursor) - self.assertEqual(1, len(docs)) - self.assertEqual(docs[0]["name"], "admin") - - if client_context.version.at_least(3, 4, 3): - cursor = self.client.list_databases(nameOnly=True) - for doc in cursor: - self.assertEqual(["name"], list(doc)) + self.client.pymongo_test.test.insert_one({}) + cursor = self.client.list_databases(filter={"name": "admin"}) + docs = list(cursor) + self.assertEqual(1, len(docs)) + self.assertEqual(docs[0]["name"], "admin") + + cursor = self.client.list_databases(nameOnly=True) + for doc in cursor: + self.assertEqual(["name"], list(doc)) def test_list_database_names(self): self.client.pymongo_test.test.insert_one({"dummy": "object"}) @@ -763,15 +761,12 @@ def test_drop_database(self): self.assertIn("pymongo_test2", dbs) self.client.drop_database("pymongo_test") - if client_context.version.at_least(3, 3, 9) and client_context.is_rs: + if client_context.is_rs: wc_client = rs_or_single_client(w=len(client_context.nodes) + 1) with self.assertRaises(WriteConcernError): wc_client.drop_database('pymongo_test2') self.client.drop_database(self.client.pymongo_test2) - - raise SkipTest("This test often fails due to SERVER-2329") - dbs = self.client.list_database_names() self.assertNotIn("pymongo_test", dbs) self.assertNotIn("pymongo_test2", dbs) diff --git a/test/test_collation.py b/test/test_collation.py index d352fccc17..f0139b4a22 100644 --- a/test/test_collation.py +++ b/test/test_collation.py @@ -77,17 +77,6 @@ def test_constructor(self): }, Collation('en_US', backwards=True).document) -def raisesConfigurationErrorForOldMongoDB(func): - @functools.wraps(func) - def wrapper(self, *args, **kwargs): - if client_context.version.at_least(3, 3, 9): - return func(self, *args, **kwargs) - else: - with self.assertRaises(ConfigurationError): - return func(self, *args, **kwargs) - return wrapper - - class TestCollation(IntegrationTest): @classmethod @client_context.require_connection @@ -120,7 +109,6 @@ def assertCollationInLastCommand(self): self.collation.document, self.last_command_started()['collation']) - @raisesConfigurationErrorForOldMongoDB def test_create_collection(self): self.db.test.drop() self.db.create_collection('test', collation=self.collation) @@ -136,7 +124,6 @@ def test_index_model(self): model = IndexModel([('a', 1), ('b', -1)], collation=self.collation) self.assertEqual(self.collation.document, model.document['collation']) - @raisesConfigurationErrorForOldMongoDB def test_create_index(self): self.db.test.create_index('foo', collation=self.collation) ci_cmd = self.listener.results['started'][0].command @@ -144,18 +131,15 @@ def test_create_index(self): 
self.collation.document, ci_cmd['indexes'][0]['collation']) - @raisesConfigurationErrorForOldMongoDB def test_aggregate(self): self.db.test.aggregate([{'$group': {'_id': 42}}], collation=self.collation) self.assertCollationInLastCommand() - @raisesConfigurationErrorForOldMongoDB def test_count_documents(self): self.db.test.count_documents({}, collation=self.collation) self.assertCollationInLastCommand() - @raisesConfigurationErrorForOldMongoDB def test_distinct(self): self.db.test.distinct('foo', collation=self.collation) self.assertCollationInLastCommand() @@ -164,14 +148,12 @@ def test_distinct(self): self.db.test.find(collation=self.collation).distinct('foo') self.assertCollationInLastCommand() - @raisesConfigurationErrorForOldMongoDB def test_find_command(self): self.db.test.insert_one({'is this thing on?': True}) self.listener.results.clear() next(self.db.test.find(collation=self.collation)) self.assertCollationInLastCommand() - @raisesConfigurationErrorForOldMongoDB def test_explain_command(self): self.listener.results.clear() self.db.test.find(collation=self.collation).explain() @@ -180,7 +162,6 @@ def test_explain_command(self): self.collation.document, self.last_command_started()['explain']['collation']) - @raisesConfigurationErrorForOldMongoDB def test_delete(self): self.db.test.delete_one({'foo': 42}, collation=self.collation) command = self.listener.results['started'][0].command @@ -195,7 +176,6 @@ def test_delete(self): self.collation.document, command['deletes'][0]['collation']) - @raisesConfigurationErrorForOldMongoDB def test_update(self): self.db.test.replace_one({'foo': 42}, {'foo': 43}, collation=self.collation) @@ -220,7 +200,6 @@ def test_update(self): self.collation.document, command['updates'][0]['collation']) - @raisesConfigurationErrorForOldMongoDB def test_find_and(self): self.db.test.find_one_and_delete({'foo': 42}, collation=self.collation) self.assertCollationInLastCommand() @@ -235,7 +214,6 @@ def test_find_and(self): collation=self.collation) self.assertCollationInLastCommand() - @raisesConfigurationErrorForOldMongoDB def test_bulk_write(self): self.db.test.collection.bulk_write([ DeleteOne({'noCollation': 42}), @@ -266,7 +244,6 @@ def check_ops(ops): check_ops(delete_cmd['deletes']) check_ops(update_cmd['updates']) - @raisesConfigurationErrorForOldMongoDB def test_indexes_same_keys_different_collations(self): self.db.test.drop() usa_collation = Collation('en_US') @@ -302,7 +279,6 @@ def test_unacknowledged_write(self): with self.assertRaises(ConfigurationError): collection.bulk_write([update_one]) - @raisesConfigurationErrorForOldMongoDB def test_cursor_collation(self): self.db.test.insert_one({'hello': 'world'}) next(self.db.test.find().collation(self.collation)) diff --git a/test/test_collection.py b/test/test_collection.py index 26c616a4dd..a7d56bf7b6 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -134,7 +134,7 @@ def tearDown(self): @contextlib.contextmanager def write_concern_collection(self): - if client_context.version.at_least(3, 3, 9) and client_context.is_rs: + if client_context.is_rs: with self.assertRaises(WriteConcernError): # Unsatisfiable write concern. yield Collection( @@ -153,7 +153,6 @@ def test_equality(self): def test_hashable(self): self.assertIn(self.db.test.mike, {self.db["test.mike"]}) - @client_context.require_version_min(3, 3, 9) def test_create(self): # No Exception. 
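The write_concern_collection helper above now expects every replica set to report write concern errors for an unsatisfiable w. A minimal sketch of that behavior, with hypothetical client and collection names:

```python
from pymongo import MongoClient
from pymongo.errors import WriteConcernError
from pymongo.write_concern import WriteConcern

client = MongoClient(replicaSet='rs0')  # hypothetical replica set
coll = client.pymongo_test.get_collection(
    'test', write_concern=WriteConcern(w=100))  # unsatisfiable w

try:
    coll.insert_one({'x': 1})
except WriteConcernError as exc:
    # The write may still apply locally; the error reports the unmet concern.
    print(exc.code, exc.details.get('errmsg'))
```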
db = client_context.client.pymongo_test @@ -322,9 +321,6 @@ def test_drop_index(self): @client_context.require_no_mongos @client_context.require_test_commands def test_index_management_max_time_ms(self): - if (client_context.version[:2] == (3, 4) and - client_context.version[2] < 4): - raise unittest.SkipTest("SERVER-27711") coll = self.db.test self.client.admin.command("configureFailPoint", "maxTimeAlwaysTimeOut", @@ -531,21 +527,6 @@ def _drop_dups_setup(self, db): db.test.insert_one({'i': 2}) # duplicate db.test.insert_one({'i': 3}) - @client_context.require_version_max(2, 6) - def test_index_drop_dups(self): - # Try dropping duplicates - db = self.db - self._drop_dups_setup(db) - - # No error, just drop the duplicate - db.test.create_index([('i', ASCENDING)], unique=True, dropDups=True) - - # Duplicate was dropped - self.assertEqual(3, db.test.count_documents({})) - - # Index was created, plus the index on _id - self.assertEqual(2, len(db.test.index_information())) - def test_index_dont_drop_dups(self): # Try *not* dropping duplicates db = self.db @@ -587,7 +568,6 @@ def get_plan_stage(self, root, stage): return stage return {} - @client_context.require_version_min(3, 1, 9, -1) def test_index_filter(self): db = self.db db.drop_collection("test") @@ -901,10 +881,8 @@ def test_write_large_document(self): unack_coll = self.db.test.with_options(write_concern=WriteConcern(w=0)) self.assertRaises(DocumentTooLarge, unack_coll.replace_one, {"bar": "x"}, {"bar": "x" * (max_size - 14)}) - # This will pass with OP_UPDATE or the update command. self.db.test.replace_one({"bar": "x"}, {"bar": "x" * (max_size - 32)}) - @client_context.require_version_min(3, 1, 9, -1) def test_insert_bypass_document_validation(self): db = self.db db.test.drop() @@ -923,14 +901,9 @@ def test_insert_bypass_document_validation(self): self.assertTrue(isinstance(result, InsertOneResult)) self.assertEqual(2, result.inserted_id) - if client_context.version < (3, 6): - # Uses OP_INSERT which does not support bypass_document_validation. - self.assertRaises(OperationFailure, db_w0.test.insert_one, - {"y": 1}, bypass_document_validation=True) - else: - db_w0.test.insert_one({"y": 1}, bypass_document_validation=True) - wait_until(lambda: db_w0.test.find_one({"y": 1}), - "find w:0 inserted document") + db_w0.test.insert_one({"y": 1}, bypass_document_validation=True) + wait_until(lambda: db_w0.test.find_one({"y": 1}), + "find w:0 inserted document") # Test insert_many docs = [{"_id": i, "x": 100 - i} for i in range(3, 100)] @@ -959,7 +932,6 @@ def test_insert_bypass_document_validation(self): [{"x": 1}, {"x": 2}], bypass_document_validation=True) - @client_context.require_version_min(3, 1, 9, -1) def test_replace_bypass_document_validation(self): db = self.db db.test.drop() @@ -997,18 +969,11 @@ def test_replace_bypass_document_validation(self): self.assertEqual(1, db.test.count_documents({"a": 103})) db.test.insert_one({"y": 1}, bypass_document_validation=True) - if client_context.version < (3, 6): - # Uses OP_UPDATE which does not support bypass_document_validation. 
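The insert path above relies on bypass_document_validation working for unacknowledged writes now that every write goes over OP_MSG. A short sketch under the same setup, with illustrative database and collection names:

```python
from pymongo import MongoClient

db = MongoClient().pymongo_test
db.drop_collection('test')
db.create_collection('test', validator={'x': {'$exists': True}})

db_w0 = MongoClient(w=0).pymongo_test
# An acknowledged insert of {'y': 1} would fail validation; the
# unacknowledged insert succeeds because the flag is passed through.
db_w0.test.insert_one({'y': 1}, bypass_document_validation=True)
```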
- self.assertRaises(OperationFailure, db_w0.test.replace_one, - {"y": 1}, {"x": 1}, - bypass_document_validation=True) - else: - db_w0.test.replace_one({"y": 1}, {"x": 1}, - bypass_document_validation=True) - wait_until(lambda: db_w0.test.find_one({"x": 1}), - "find w:0 replaced document") + db_w0.test.replace_one({"y": 1}, {"x": 1}, + bypass_document_validation=True) + wait_until(lambda: db_w0.test.find_one({"x": 1}), + "find w:0 replaced document") - @client_context.require_version_min(3, 1, 9, -1) def test_update_bypass_document_validation(self): db = self.db db.test.drop() @@ -1048,16 +1013,10 @@ def test_update_bypass_document_validation(self): self.assertEqual(1, db.test.count_documents({"z": 0})) db.test.insert_one({"y": 1, "x": 0}, bypass_document_validation=True) - if client_context.version < (3, 6): - # Uses OP_UPDATE which does not support bypass_document_validation. - self.assertRaises(OperationFailure, db_w0.test.update_one, - {"y": 1}, {"$inc": {"x": 1}}, + db_w0.test.update_one({"y": 1}, {"$inc": {"x": 1}}, bypass_document_validation=True) - else: - db_w0.test.update_one({"y": 1}, {"$inc": {"x": 1}}, - bypass_document_validation=True) - wait_until(lambda: db_w0.test.find_one({"y": 1, "x": 1}), - "find w:0 updated document") + wait_until(lambda: db_w0.test.find_one({"y": 1, "x": 1}), + "find w:0 updated document") # Test update_many db.test.insert_many([{"z": i} for i in range(3, 101)]) @@ -1094,19 +1053,12 @@ def test_update_bypass_document_validation(self): db.test.insert_one({"m": 1, "x": 0}, bypass_document_validation=True) db.test.insert_one({"m": 1, "x": 0}, bypass_document_validation=True) - if client_context.version < (3, 6): - # Uses OP_UPDATE which does not support bypass_document_validation. - self.assertRaises(OperationFailure, db_w0.test.update_many, - {"m": 1}, {"$inc": {"x": 1}}, - bypass_document_validation=True) - else: - db_w0.test.update_many({"m": 1}, {"$inc": {"x": 1}}, - bypass_document_validation=True) - wait_until( - lambda: db_w0.test.count_documents({"m": 1, "x": 1}) == 2, - "find w:0 updated documents") + db_w0.test.update_many({"m": 1}, {"$inc": {"x": 1}}, + bypass_document_validation=True) + wait_until( + lambda: db_w0.test.count_documents({"m": 1, "x": 1}) == 2, + "find w:0 updated documents") - @client_context.require_version_min(3, 1, 9, -1) def test_bypass_document_validation_bulk_write(self): db = self.db db.test.drop() @@ -1499,24 +1451,6 @@ def test_update_many(self): self.assertRaises(InvalidOperation, lambda: result.upserted_id) self.assertFalse(result.acknowledged) - # MongoDB >= 3.5.8 allows dotted fields in updates - @client_context.require_version_max(3, 5, 7) - def test_update_with_invalid_keys(self): - self.db.drop_collection("test") - self.assertTrue(self.db.test.insert_one({"hello": "world"})) - doc = self.db.test.find_one() - doc['a.b'] = 'c' - - # Replace - self.assertRaises(OperationFailure, self.db.test.replace_one, - {"hello": "world"}, doc) - # Upsert - self.assertRaises(OperationFailure, self.db.test.replace_one, - {"foo": "bar"}, doc, upsert=True) - - # Check that the last two ops didn't actually modify anything - self.assertTrue('a.b' not in self.db.test.find_one()) - def test_update_check_keys(self): self.db.drop_collection("test") self.assertTrue(self.db.test.insert_one({"hello": "world"})) @@ -2041,19 +1975,6 @@ def __getattr__(self, name): c.insert_one({'bad': bad}) self.assertEqual('bar', c.find_one()['bad']['foo']) - @client_context.require_version_max(3, 5, 5) - def test_array_filters_unsupported(self): - c = 
self.db.test - with self.assertRaises(ConfigurationError): - c.update_one( - {}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}]) - with self.assertRaises(ConfigurationError): - c.update_many( - {}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}]) - with self.assertRaises(ConfigurationError): - c.find_one_and_update( - {}, {'$set': {'y.$[i].b': 5}}, array_filters=[{'i.b': 1}]) - def test_array_filters_validation(self): # array_filters must be a list. c = self.db.test @@ -2136,54 +2057,40 @@ def test_find_one_and_write_concern(self): # Authenticate the client and throw out auth commands from the listener. db.command('ping') results.clear() - if client_context.version.at_least(3, 1, 9, -1): - c_w0.find_one_and_update( - {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertEqual( - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() + c_w0.find_one_and_update( + {'_id': 1}, {'$set': {'foo': 'bar'}}) + self.assertEqual( + {'w': 0}, results['started'][0].command['writeConcern']) + results.clear() - c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) - self.assertEqual( - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() + c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) + self.assertEqual( + {'w': 0}, results['started'][0].command['writeConcern']) + results.clear() - c_w0.find_one_and_delete({'_id': 1}) - self.assertEqual( - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() + c_w0.find_one_and_delete({'_id': 1}) + self.assertEqual( + {'w': 0}, results['started'][0].command['writeConcern']) + results.clear() - # Test write concern errors. - if client_context.is_rs: - c_wc_error = db.get_collection( - 'test', - write_concern=WriteConcern( - w=len(client_context.nodes) + 1)) - self.assertRaises( - WriteConcernError, - c_wc_error.find_one_and_update, - {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertRaises( - WriteConcernError, - c_wc_error.find_one_and_replace, - {'w': 0}, results['started'][0].command['writeConcern']) - self.assertRaises( - WriteConcernError, - c_wc_error.find_one_and_delete, - {'w': 0}, results['started'][0].command['writeConcern']) - results.clear() - else: - c_w0.find_one_and_update( + # Test write concern errors. 
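The rewritten test asserts that a w:0 collection still attaches its write concern to findAndModify. A sketch of observing that with the command monitoring API, client and collection names illustrative:

```python
from pymongo import MongoClient, monitoring
from pymongo.write_concern import WriteConcern

class CommandCapture(monitoring.CommandListener):
    def __init__(self):
        self.started_commands = []
    def started(self, event):
        self.started_commands.append(event.command)
    def succeeded(self, event):
        pass
    def failed(self, event):
        pass

listener = CommandCapture()
client = MongoClient(event_listeners=[listener])
coll = client.pymongo_test.get_collection(
    'test', write_concern=WriteConcern(w=0))
coll.find_one_and_update({'_id': 1}, {'$set': {'foo': 'bar'}})
# The w:0 write concern rides along on the findAndModify command.
print(listener.started_commands[-1]['writeConcern'])  # {'w': 0}
```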
+ if client_context.is_rs: + c_wc_error = db.get_collection( + 'test', + write_concern=WriteConcern( + w=len(client_context.nodes) + 1)) + self.assertRaises( + WriteConcernError, + c_wc_error.find_one_and_update, {'_id': 1}, {'$set': {'foo': 'bar'}}) - self.assertNotIn('writeConcern', results['started'][0].command) - results.clear() - - c_w0.find_one_and_replace({'_id': 1}, {'foo': 'bar'}) - self.assertNotIn('writeConcern', results['started'][0].command) - results.clear() - - c_w0.find_one_and_delete({'_id': 1}) - self.assertNotIn('writeConcern', results['started'][0].command) + self.assertRaises( + WriteConcernError, + c_wc_error.find_one_and_replace, + {'_id': 1}, {'foo': 'bar'}) + self.assertRaises( + WriteConcernError, + c_wc_error.find_one_and_delete, + {'_id': 1}) results.clear() c_default.find_one_and_update({'_id': 1}, {'$set': {'foo': 'bar'}}) diff --git a/test/test_command_monitoring_legacy.py b/test/test_command_monitoring_legacy.py index be8168edd7..7ff80d75e5 100644 --- a/test/test_command_monitoring_legacy.py +++ b/test/test_command_monitoring_legacy.py @@ -122,17 +122,6 @@ def run_scenario(self): tuple(coll.find(**args)) except OperationFailure: pass - # Wait for the killCursors thread to run if necessary. - if 'limit' in args and client_context.version[:2] < (3, 1): - self.client._kill_cursors_executor.wake() - started = self.listener.results['started'] - succeeded = self.listener.results['succeeded'] - wait_until( - lambda: started[-1].command_name == 'killCursors', - "publish a start event for killCursors.") - wait_until( - lambda: succeeded[-1].command_name == 'killCursors', - "publish a succeeded event for killCursors.") else: try: getattr(coll, name)(**args) diff --git a/test/test_cursor.py b/test/test_cursor.py index 09020a6de7..d56f9fc27d 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -198,7 +198,6 @@ def test_max_time_ms(self): "maxTimeAlwaysTimeOut", mode="off") - @client_context.require_version_min(3, 1, 9, -1) def test_max_await_time_ms(self): db = self.db db.pymongo_test.drop() @@ -573,12 +572,8 @@ def cursor_count(cursor, expected_count): cur = db.test.find().batch_size(1) next(cur) - if client_context.version.at_least(3, 1, 9): - # find command batchSize should be 1 - self.assertEqual(0, len(cur._Cursor__data)) - else: - # OP_QUERY ntoreturn should be 2 - self.assertEqual(1, len(cur._Cursor__data)) + # find command batchSize should be 1 + self.assertEqual(0, len(cur._Cursor__data)) next(cur) self.assertEqual(0, len(cur._Cursor__data)) next(cur) @@ -1169,27 +1164,19 @@ def test_with_statement(self): @client_context.require_no_mongos def test_comment(self): - # MongoDB 3.1.5 changed the ns for commands. - regex = {'$regex': r'pymongo_test.(\$cmd|test)'} - - if client_context.version.at_least(3, 5, 8, -1): - query_key = "command.comment" - elif client_context.version.at_least(3, 1, 8, -1): - query_key = "query.comment" - else: - query_key = "query.$comment" - self.client.drop_database(self.db) self.db.command('profile', 2) # Profile ALL commands.
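test_comment now assumes the 3.6+ profiler schema, where a cursor comment is recorded under command.comment. A sketch of that profile query, database name illustrative:

```python
from pymongo import MongoClient

db = MongoClient().pymongo_test
db.command('profile', 2)  # profile all operations
try:
    list(db.test.find().comment('foo'))
    n = db.system.profile.count_documents(
        {'ns': 'pymongo_test.test', 'op': 'query',
         'command.comment': 'foo'})
    assert n == 1
finally:
    db.command('profile', 0)  # turn profiling back off
```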
try: list(self.db.test.find().comment('foo')) count = self.db.system.profile.count_documents( - {'ns': 'pymongo_test.test', 'op': 'query', query_key: 'foo'}) + {'ns': 'pymongo_test.test', 'op': 'query', + 'command.comment': 'foo'}) self.assertEqual(count, 1) self.db.test.find().comment('foo').distinct('type') count = self.db.system.profile.count_documents( - {'ns': regex, 'op': 'command', 'command.distinct': 'test', + {'ns': 'pymongo_test.test', 'op': 'command', + 'command.distinct': 'test', 'command.comment': 'foo'}) self.assertEqual(count, 1) finally: @@ -1266,7 +1253,6 @@ def test_delete_not_initialized(self): cursor = Cursor.__new__(Cursor) # Skip calling __init__ cursor.__del__() # no error - @client_context.require_version_min(3, 6) def test_getMore_does_not_send_readPreference(self): listener = AllowListEventListener('find', 'getMore') client = rs_or_single_client( @@ -1408,16 +1394,9 @@ def test_get_item(self): with self.assertRaises(InvalidOperation): self.db.test.find_raw_batches()[0] - @client_context.require_version_min(3, 4) def test_collation(self): next(self.db.test.find_raw_batches(collation=Collation('en_US'))) - @client_context.require_version_max(3, 2) - def test_collation_error(self): - with self.assertRaises(ConfigurationError): - next(self.db.test.find_raw_batches(collation=Collation('en_US'))) - - @client_context.require_version_min(3, 2) @client_context.require_no_mmap # MMAPv1 does not support read concern def test_read_concern(self): self.db.get_collection( @@ -1425,12 +1404,6 @@ def test_read_concern(self): c = self.db.get_collection("test", read_concern=ReadConcern("majority")) next(c.find_raw_batches()) - @client_context.require_version_max(3, 1) - def test_read_concern_error(self): - c = self.db.get_collection("test", read_concern=ReadConcern("majority")) - with self.assertRaises(ConfigurationError): - next(c.find_raw_batches()) - def test_monitoring(self): listener = EventListener() client = rs_or_single_client(event_listeners=[listener]) @@ -1588,15 +1561,9 @@ def test_get_item(self): with self.assertRaises(InvalidOperation): self.db.test.aggregate_raw_batches([])[0] - @client_context.require_version_min(3, 4) def test_collation(self): next(self.db.test.aggregate_raw_batches([], collation=Collation('en_US'))) - @client_context.require_version_max(3, 2) - def test_collation_error(self): - with self.assertRaises(ConfigurationError): - next(self.db.test.aggregate_raw_batches([], collation=Collation('en_US'))) - def test_monitoring(self): listener = EventListener() client = rs_or_single_client(event_listeners=[listener]) diff --git a/test/test_custom_types.py b/test/test_custom_types.py index 619e4c10df..83d8c8a2c5 100644 --- a/test/test_custom_types.py +++ b/test/test_custom_types.py @@ -790,7 +790,6 @@ def run_test(doc_cls): class TestCollectionChangeStreamsWCustomTypes( IntegrationTest, ChangeStreamsWCustomTypesTestMixin): @classmethod - @client_context.require_version_min(3, 6, 0) @client_context.require_no_mmap @client_context.require_no_standalone def setUpClass(cls): diff --git a/test/test_database.py b/test/test_database.py index 4813f8d100..4adccc1b58 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -204,11 +204,8 @@ def test_list_collection_names_filter(self): self.assertIn("capped", names) self.assertIn("non_capped", names) command = results["started"][0].command - if client_context.version >= (3, 0): - self.assertIn("nameOnly", command) - self.assertTrue(command["nameOnly"]) - else: - self.assertNotIn("nameOnly", command) + 
self.assertIn("nameOnly", command) + self.assertTrue(command["nameOnly"]) def test_list_collections(self): self.client.drop_database("pymongo_test") @@ -324,7 +321,7 @@ def test_drop_collection(self): db.drop_collection(db.test.doesnotexist) - if client_context.version.at_least(3, 3, 9) and client_context.is_rs: + if client_context.is_rs: db_wc = Database(self.client, 'pymongo_test', write_concern=IMPOSSIBLE_WRITE_CONCERN) with self.assertRaises(WriteConcernError): @@ -377,19 +374,15 @@ def test_command(self): self.assertEqualReply(second, third) # We use 'aggregate' as our example command, since it's an easy way to - # retrieve a BSON regex from a collection using a command. But until - # MongoDB 2.3.2, aggregation turned regexes into strings: SERVER-6470. - # Note: MongoDB 3.5.2 requires the 'cursor' or 'explain' option for - # aggregate. - @client_context.require_version_max(3, 5, 0) + # retrieve a BSON regex from a collection using a command. def test_command_with_regex(self): db = self.client.pymongo_test db.test.drop() db.test.insert_one({'r': re.compile('.*')}) db.test.insert_one({'r': Regex('.*')}) - result = db.command('aggregate', 'test', pipeline=[]) - for doc in result['result']: + result = db.command('aggregate', 'test', pipeline=[], cursor={}) + for doc in result['cursor']['firstBatch']: self.assertTrue(isinstance(doc['r'], Regex)) def test_password_digest(self): @@ -647,13 +640,11 @@ def setUp(self): self.result = {"dummy": "dummy field"} self.admin = self.client.admin - @client_context.require_version_min(3, 6, 0) def test_database_aggregation(self): with self.admin.aggregate(self.pipeline) as cursor: result = next(cursor) self.assertEqual(result, self.result) - @client_context.require_version_min(3, 6, 0) @client_context.require_no_mongos def test_database_aggregation_fake_cursor(self): coll_name = "test_output" @@ -680,13 +671,6 @@ def test_database_aggregation_fake_cursor(self): result = wait_until(output_coll.find_one, "read unacknowledged write") self.assertEqual(result["dummy"], self.result["dummy"]) - @client_context.require_version_max(3, 6, 0, -1) - def test_database_aggregation_unsupported(self): - err_msg = r"Database.aggregate\(\) is only supported on MongoDB 3.6\+." - with self.assertRaisesRegex(ConfigurationError, err_msg): - with self.admin.aggregate(self.pipeline) as _: - pass - def test_bool(self): with self.assertRaises(NotImplementedError): bool(Database(self.client, "test")) diff --git a/test/test_decimal128.py b/test/test_decimal128.py index b242b7dfa5..249eae8218 100644 --- a/test/test_decimal128.py +++ b/test/test_decimal128.py @@ -27,10 +27,6 @@ class TestDecimal128(unittest.TestCase): def test_round_trip(self): - if not client_context.version.at_least(3, 3, 6): - raise unittest.SkipTest( - 'Round trip test requires MongoDB >= 3.3.6') - coll = client_context.client.pymongo_test.test coll.drop() diff --git a/test/test_encryption.py b/test/test_encryption.py index 4c9e28b585..5b1257966d 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -479,7 +479,6 @@ class TestSpec(SpecRunner): @classmethod @unittest.skipUnless(_HAVE_PYMONGOCRYPT, 'pymongocrypt is not installed') - @client_context.require_version_min(3, 6) # SpecRunner requires sessions. 
def setUpClass(cls): super(TestSpec, cls).setUpClass() diff --git a/test/test_examples.py b/test/test_examples.py index f96ca8b53c..dcf9dd2de3 100644 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -660,7 +660,6 @@ def test_delete(self): self.assertEqual(db.inventory.count_documents({}), 0) - @client_context.require_version_min(3, 5, 11) @client_context.require_replica_set @client_context.require_no_mmap def test_change_streams(self): @@ -762,34 +761,31 @@ def test_aggregate_examples(self): ]) # End Aggregation Example 3 - # $lookup was new in 3.2. The let and pipeline options - # were added in 3.6. - if client_context.version.at_least(3, 6, 0): - # Start Aggregation Example 4 - db.air_alliances.aggregate([ - {"$lookup": { - "from": "air_airlines", - "let": {"constituents": "$airlines"}, - "pipeline": [ - {"$match": {"$expr": {"$in": ["$name", "$$constituents"]}}} - ], - "as": "airlines" - } - }, - {"$project": { - "_id": 0, - "name": 1, - "airlines": { - "$filter": { - "input": "$airlines", - "as": "airline", - "cond": {"$eq": ["$$airline.country", "Canada"]} - } + # Start Aggregation Example 4 + db.air_alliances.aggregate([ + {"$lookup": { + "from": "air_airlines", + "let": {"constituents": "$airlines"}, + "pipeline": [ + {"$match": {"$expr": {"$in": ["$name", "$$constituents"]}}} + ], + "as": "airlines" + } + }, + {"$project": { + "_id": 0, + "name": 1, + "airlines": { + "$filter": { + "input": "$airlines", + "as": "airline", + "cond": {"$eq": ["$$airline.country", "Canada"]} } } } - ]) - # End Aggregation Example 4 + } + ]) + # End Aggregation Example 4 def test_commands(self): db = self.db @@ -817,7 +813,6 @@ def test_index_management(self): ) # End Index Example 1 - @client_context.require_version_min(3, 6, 0) @client_context.require_replica_set def test_misc(self): # Marketing examples @@ -1069,7 +1064,6 @@ def callback(session): class TestCausalConsistencyExamples(IntegrationTest): - @client_context.require_version_min(3, 6, 0) @client_context.require_secondaries_count(1) @client_context.require_no_mmap def test_causal_consistency(self): diff --git a/test/test_max_staleness.py b/test/test_max_staleness.py index dd9169f284..1fd82884f1 100644 --- a/test/test_max_staleness.py +++ b/test/test_max_staleness.py @@ -116,7 +116,6 @@ def test_max_staleness_zero(self): self.assertEqual(-1, client.read_preference.max_staleness) self.assertIn("must be a positive integer", str(ctx[0])) - @client_context.require_version_min(3, 3, 6) # SERVER-8858 @client_context.require_replica_set def test_last_write_date(self): # From max-staleness-tests.rst, "Parse lastWriteDate". @@ -138,12 +137,6 @@ def test_last_write_date(self): self.assertGreater(second, first) self.assertLess(second, first + 10) - @client_context.require_version_max(3, 3) - def test_last_write_date_absent(self): - # From max-staleness-tests.rst, "Absent lastWriteDate". - client = rs_or_single_client() - sd = client._topology.select_server(writable_server_selector) - self.assertIsNone(sd.description.last_write_date) if __name__ == "__main__": unittest.main() diff --git a/test/test_monitoring.py b/test/test_monitoring.py index c9f4e5ae76..0d925b04bf 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -860,7 +860,6 @@ def test_insert_many(self): self.assertEqual(6, count) def test_insert_many_unacknowledged(self): - # On legacy servers this uses bulk OP_INSERT. 
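With the bulk OP_INSERT path gone, an unacknowledged insert_many is a single OP_MSG write. A short sketch, collection name illustrative:

```python
from pymongo import MongoClient
from pymongo.write_concern import WriteConcern

coll = MongoClient().pymongo_test.test
unack = coll.with_options(write_concern=WriteConcern(w=0))
result = unack.insert_many([{'x': i} for i in range(3)])
assert not result.acknowledged
# _ids are generated client side, so they are available even for w:0.
print(result.inserted_ids)
```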
coll = self.client.pymongo_test.test coll.drop() unack_coll = coll.with_options(write_concern=WriteConcern(w=0)) @@ -996,29 +995,6 @@ def test_bulk_write_command_error(self): self.assertEqual(event.failure['code'], 10107) self.assertTrue(event.failure['errmsg']) - @client_context.require_version_max(3, 4, 99) - def test_bulk_write_legacy_network_error(self): - self.listener.results.clear() - - # Make the delete operation run on a closed connection. - self.client.admin.command('ping') - pool = get_pool(self.client) - sock_info = pool.sockets[0] - sock_info.sock.close() - - # Test legacy unacknowledged write network error. - coll = self.client.pymongo_test.get_collection( - 'test', write_concern=WriteConcern(w=0)) - with self.assertRaises(AutoReconnect): - coll.bulk_write([InsertOne({'_id': 1})], ordered=False) - failed = self.listener.results['failed'] - self.assertEqual(1, len(failed)) - event = failed[0] - self.assertEqual(event.command_name, 'insert') - self.assertIsInstance(event.failure, dict) - self.assertEqual(event.failure['errtype'], 'AutoReconnect') - self.assertTrue(event.failure['errmsg']) - def test_write_errors(self): coll = self.client.pymongo_test.test coll.drop() diff --git a/test/test_read_concern.py b/test/test_read_concern.py index 0b9c742a11..81a6863f5e 100644 --- a/test/test_read_concern.py +++ b/test/test_read_concern.py @@ -15,7 +15,7 @@ """Test the read_concern module.""" from bson.son import SON -from pymongo.errors import ConfigurationError +from pymongo.errors import OperationFailure from pymongo.read_concern import ReadConcern from test import client_context, IntegrationTest @@ -64,17 +64,13 @@ def test_read_concern_uri(self): client = rs_or_single_client(uri, connect=False) self.assertEqual(ReadConcern('majority'), client.read_concern) - @client_context.require_version_max(3, 1) def test_invalid_read_concern(self): coll = self.db.get_collection( - 'coll', read_concern=ReadConcern('majority')) - with self.assertRaisesRegex( - ConfigurationError, - 'read concern level of majority is not valid ' - 'with a max wire version of [0-3]'): + 'coll', read_concern=ReadConcern('unknown')) + # We rely on the server to validate read concern. + with self.assertRaises(OperationFailure): coll.find_one() - @client_context.require_version_min(3, 1, 9, -1) def test_find_command(self): # readConcern not sent in command if not specified. coll = self.db.coll @@ -93,7 +89,6 @@ def test_find_command(self): ('readConcern', {'level': 'local'})]), self.listener.results['started'][0].command) - @client_context.require_version_min(3, 1, 9, -1) def test_command_cursor(self): # readConcern not sent in command if not specified. coll = self.db.coll diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index fcd9989918..128b41d5fa 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -579,8 +579,6 @@ def test_read_preference_document_hedge(self): self.assertEqual( out, SON([("$query", {}), ("$readPreference", pref.document)])) - # Require OP_MSG so that $readPreference is visible in the command event. 
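test_invalid_read_concern now defers read concern validation to the server. A sketch of what the new assertion exercises, names illustrative:

```python
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from pymongo.read_concern import ReadConcern

coll = MongoClient().pymongo_test.get_collection(
    'coll', read_concern=ReadConcern('unknown'))
try:
    coll.find_one()
except OperationFailure as exc:
    # e.g. the server rejects the unrecognized readConcern level.
    print(exc.code, exc.details.get('errmsg'))
```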
- @client_context.require_version_min(3, 6) def test_send_hedge(self): cases = { 'primaryPreferred': PrimaryPreferred, @@ -697,7 +695,6 @@ def test_mongos(self): self.assertEqual(last_id, results[0]["_id"]) @client_context.require_mongos - @client_context.require_version_min(3, 3, 12) def test_mongos_max_staleness(self): # Sanity check that we're sending maxStalenessSeconds coll = client_context.client.pymongo_test.get_collection( diff --git a/test/test_read_write_concern_spec.py b/test/test_read_write_concern_spec.py index 49f1823d2b..b334ee9359 100644 --- a/test/test_read_write_concern_spec.py +++ b/test/test_read_write_concern_spec.py @@ -123,8 +123,6 @@ def insert_command(): ('delete_many', lambda: coll.delete_many({})), ('bulk_write', lambda: coll.bulk_write([InsertOne({})])), ('command', insert_command), - ] - ops_require_34 = [ ('aggregate', lambda: coll.aggregate([{'$out': 'out'}])), # SERVER-46668 Delete all the documents in the collection to # workaround a hang in createIndexes. @@ -136,11 +134,9 @@ def insert_command(): ('rename', lambda: coll.rename('new')), ('drop', lambda: db.new.drop()), ] - if client_context.version > (3, 4): - ops.extend(ops_require_34) - # SERVER-47194: dropDatabase does not respect wtimeout in 3.6. - if client_context.version[:2] != (3, 6): - ops.append(('drop_database', lambda: client.drop_database(db))) + # SERVER-47194: dropDatabase does not respect wtimeout in 3.6. + if client_context.version[:2] != (3, 6): + ops.append(('drop_database', lambda: client.drop_database(db))) for name, f in ops: # Ensure insert_many and bulk_write still raise BulkWriteError. @@ -161,8 +157,6 @@ def test_raise_write_concern_error(self): self.assertWriteOpsRaise( WriteConcern(w=client_context.w+1, wtimeout=1), WriteConcernError) - # MongoDB 3.2 introduced the stopReplProducer failpoint. 
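test_mongos_max_staleness only verifies that maxStalenessSeconds reaches the server. A sketch of setting it from the URI; the value is illustrative, and 90 seconds is the smallest value the server accepts:

```python
from pymongo import MongoClient

client = MongoClient(
    'mongodb://localhost:27017/'
    '?readPreference=secondaryPreferred&maxStalenessSeconds=120')
# Reads now skip secondaries estimated to lag more than 120 seconds.
client.pymongo_test.test.find_one()
```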
- @client_context.require_version_min(3, 2) @client_context.require_secondaries_count(1) @client_context.require_test_commands def test_raise_wtimeout(self): diff --git a/test/test_retryable_writes.py b/test/test_retryable_writes.py index 32e0c32a90..bf9f08721a 100644 --- a/test/test_retryable_writes.py +++ b/test/test_retryable_writes.py @@ -165,7 +165,6 @@ def tearDownClass(cls): cls.client.close() super(TestRetryableWritesMMAPv1, cls).tearDownClass() - @client_context.require_version_min(3, 5) @client_context.require_no_standalone def test_actionable_error_message(self): if client_context.storage_engine != 'mmapv1': @@ -202,15 +201,13 @@ def tearDownClass(cls): super(TestRetryableWrites, cls).tearDownClass() def setUp(self): - if (client_context.version.at_least(3, 5) and client_context.is_rs - and client_context.test_commands_enabled): + if client_context.is_rs and client_context.test_commands_enabled: self.client.admin.command(SON([ ('configureFailPoint', 'onPrimaryTransactionalWrite'), ('mode', 'alwaysOn')])) def tearDown(self): - if (client_context.version.at_least(3, 5) and client_context.is_rs - and client_context.test_commands_enabled): + if client_context.is_rs and client_context.test_commands_enabled: self.client.admin.command(SON([ ('configureFailPoint', 'onPrimaryTransactionalWrite'), ('mode', 'off')])) @@ -230,7 +227,6 @@ def test_supported_single_statement_no_retry(self): 'txnNumber', event.command, '%s sent txnNumber with %s' % (msg, event.command_name)) - @client_context.require_version_min(3, 5) @client_context.require_no_standalone def test_supported_single_statement_supported_cluster(self): for method, args, kwargs in retryable_single_statement_ops( @@ -271,8 +267,7 @@ def test_supported_single_statement_supported_cluster(self): initial_transaction_id, msg) def test_supported_single_statement_unsupported_cluster(self): - if client_context.version.at_least(3, 5) and ( - client_context.is_rs or client_context.is_mongos): + if client_context.is_rs or client_context.is_mongos: raise SkipTest('This cluster supports retryable writes') for method, args, kwargs in retryable_single_statement_ops( @@ -319,7 +314,6 @@ def test_server_selection_timeout_not_retried(self): method(*args, **kwargs) self.assertEqual(len(listener.results['started']), 0, msg) - @client_context.require_version_min(3, 5) @client_context.require_replica_set @client_context.require_test_commands def test_retry_timeout_raises_original_error(self): @@ -352,7 +346,6 @@ def raise_error(*args, **kwargs): method(*args, **kwargs) self.assertEqual(len(listener.results['started']), 1, msg) - @client_context.require_version_min(3, 5) @client_context.require_replica_set @client_context.require_test_commands def test_batch_splitting(self): @@ -388,7 +381,6 @@ def test_batch_splitting(self): } self.assertEqual(bulk_result.bulk_api_result, expected_result) - @client_context.require_version_min(3, 5) @client_context.require_replica_set @client_context.require_test_commands def test_batch_splitting_retry_fails(self): @@ -572,7 +564,6 @@ def test_pool_paused_error_is_retryable(self): # TODO: Make this a real integration test where we stepdown the primary. 
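Retryable writes attach a txnNumber to each write command, which is what these tests count. A sketch of observing that with a command listener, assuming a hypothetical replica set named rs0:

```python
from pymongo import MongoClient, monitoring

class TxnNumberCapture(monitoring.CommandListener):
    def __init__(self):
        self.txn_numbers = []
    def started(self, event):
        if 'txnNumber' in event.command:
            self.txn_numbers.append(event.command['txnNumber'])
    def succeeded(self, event):
        pass
    def failed(self, event):
        pass

listener = TxnNumberCapture()
client = MongoClient(replicaSet='rs0', retryWrites=True,
                     event_listeners=[listener])
client.pymongo_test.test.insert_one({})
print(listener.txn_numbers)  # e.g. [Int64(1)]
```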
class TestRetryableWritesTxnNumber(IgnoreDeprecationsTest): - @client_context.require_version_min(3, 6) @client_context.require_replica_set @client_context.require_no_mmap def test_increment_transaction_id_without_sending_command(self): diff --git a/test/test_session.py b/test/test_session.py index 608024e0b1..c8c80069a7 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -1020,14 +1020,6 @@ def test_cluster_time_no_server_support(self): self.assertIsNone(after_cluster_time) -class TestSessionsNotSupported(IntegrationTest): - @client_context.require_version_max(3, 5, 10) - def test_sessions_not_supported(self): - with self.assertRaisesRegex( - ConfigurationError, "Sessions are not supported"): - self.client.start_session() - - class TestClusterTime(IntegrationTest): def setUp(self): super(TestClusterTime, self).setUp() diff --git a/test/test_ssl.py b/test/test_ssl.py index 7c3cc4bacb..45614034e1 100644 --- a/test/test_ssl.py +++ b/test/test_ssl.py @@ -513,19 +513,14 @@ def test_mongodb_x509_auth(self): tlsCertificateKeyFile=CLIENT_PEM, event_listeners=[listener]) - if client_context.version.at_least(3, 3, 12): - # No error - auth.pymongo_test.test.find_one() - names = listener.started_command_names() - if client_context.version.at_least(4, 4, -1): - # Speculative auth skips the authenticate command. - self.assertEqual(names, ['find']) - else: - self.assertEqual(names, ['authenticate', 'find']) + # No error + auth.pymongo_test.test.find_one() + names = listener.started_command_names() + if client_context.version.at_least(4, 4, -1): + # Speculative auth skips the authenticate command. + self.assertEqual(names, ['find']) else: - # Should require a username - with self.assertRaises(ConfigurationError): - auth.pymongo_test.test.find_one() + self.assertEqual(names, ['authenticate', 'find']) uri = ('mongodb://%s@%s:%d/?authMechanism=' 'MONGODB-X509' % ( @@ -542,14 +537,8 @@ def test_mongodb_x509_auth(self): ssl=True, tlsAllowInvalidCertificates=True, tlsCertificateKeyFile=CLIENT_PEM) - if client_context.version.at_least(3, 3, 12): - # No error - client.pymongo_test.test.find_one() - else: - # Should require a username - with self.assertRaises(ConfigurationError): - client.pymongo_test.test.find_one() - + # No error + client.pymongo_test.test.find_one() # Auth should fail if username and certificate do not match uri = ('mongodb://%s@%s:%d/?authMechanism=' 'MONGODB-X509' % ( diff --git a/test/test_topology.py b/test/test_topology.py index 6881df2fcd..a309d622ab 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -522,10 +522,11 @@ def test_wire_version(self): 'setName': 'rs', 'hosts': ['a'], 'minWireVersion': 1, - 'maxWireVersion': 5}) + 'maxWireVersion': 6}) self.assertEqual(server.description.min_wire_version, 1) - self.assertEqual(server.description.max_wire_version, 5) + self.assertEqual(server.description.max_wire_version, 6) + t.select_servers(any_server_selector) # Incompatible. got_hello(t, address, {