diff --git a/pymongo/client_options.py b/pymongo/client_options.py index d1342652a1..d5f9cfcccd 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -215,6 +215,9 @@ def __init__( self.__auto_encryption_opts = options.get("auto_encryption_opts") self.__load_balanced = options.get("loadbalanced") self.__timeout = options.get("timeoutms") + self.__server_monitoring_mode = options.get( + "servermonitoringmode", common.SERVER_MONITORING_MODE + ) @property def _options(self) -> Mapping[str, Any]: @@ -284,7 +287,7 @@ def read_concern(self) -> ReadConcern: def timeout(self) -> Optional[float]: """The configured timeoutMS converted to seconds, or None. - .. versionadded: 4.2 + .. versionadded:: 4.2 """ return self.__timeout @@ -318,3 +321,11 @@ def event_listeners(self) -> list[_EventListeners]: """ assert self.__pool_options._event_listeners is not None return self.__pool_options._event_listeners.event_listeners() + + @property + def server_monitoring_mode(self) -> str: + """The configured serverMonitoringMode option. + + .. versionadded:: 4.5 + """ + return self.__server_monitoring_mode diff --git a/pymongo/common.py b/pymongo/common.py index fad24030fe..794b7e31a2 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -136,6 +136,9 @@ # Default value for srvServiceName SRV_SERVICE_NAME = "mongodb" +# Default value for serverMonitoringMode +SERVER_MONITORING_MODE = "auto" # poll/stream/auto + def partition_node(node: str) -> tuple[str, int]: """Split a host:port string into (host, int(port)) pair.""" @@ -664,6 +667,15 @@ def validate_datetime_conversion(option: Any, value: Any) -> Optional[DatetimeCo raise TypeError(f"{option} must be a str or int representing DatetimeConversion") +def validate_server_monitoring_mode(option: str, value: str) -> str: + """Validate the serverMonitoringMode option.""" + if value not in {"auto", "stream", "poll"}: + raise ValueError( + f'{option}={value!r} is invalid. Must be one of "auto", "stream", or "poll"' + ) + return value + + # Dictionary where keys are the names of public URI options, and values # are lists of aliases for that option. URI_OPTIONS_ALIAS_MAP: dict[str, list[str]] = { @@ -712,6 +724,7 @@ def validate_datetime_conversion(option: Any, value: Any) -> Optional[DatetimeCo "srvservicename": validate_string, "srvmaxhosts": validate_non_negative_integer, "timeoutms": validate_timeoutms, + "servermonitoringmode": validate_server_monitoring_mode, } # Dictionary where keys are the names of URI options specific to pymongo, diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index becddb65fc..72ea671fe6 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -330,6 +330,8 @@ def __init__( - `heartbeatFrequencyMS`: (optional) The number of milliseconds between periodic server checks, or None to accept the default frequency of 10 seconds. + - `serverMonitoringMode`: (optional) The server monitoring mode to use. + Valid values are the strings: "auto", "stream", "poll". Defaults to "auto". - `appname`: (string or None) The name of the application that created this MongoClient instance. The server will log this value upon establishing each connection. It is also recorded in the slow @@ -585,6 +587,9 @@ def __init__( .. seealso:: The MongoDB documentation on `connections `_. + .. versionchanged:: 4.5 + Added the ``serverMonitoringMode`` keyword argument. + .. versionchanged:: 4.2 Added the ``timeoutMS`` keyword argument. @@ -846,6 +851,7 @@ def __init__( load_balanced=options.load_balanced, srv_service_name=srv_service_name, srv_max_hosts=srv_max_hosts, + server_monitoring_mode=options.server_monitoring_mode, ) self._init_background() diff --git a/pymongo/monitor.py b/pymongo/monitor.py index d52e1e4c2f..92b12f7317 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -27,6 +27,7 @@ from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.periodic_executor import _shutdown_executors +from pymongo.pool import _is_faas from pymongo.read_preferences import MovingAverage from pymongo.server_description import ServerDescription from pymongo.srv_resolver import _SrvResolver @@ -138,7 +139,12 @@ def __init__( topology_settings, topology._create_pool_for_monitor(server_description.address), ) - self.heartbeater = None + if topology_settings.server_monitoring_mode == "stream": + self._stream = True + elif topology_settings.server_monitoring_mode == "poll": + self._stream = False + else: + self._stream = not _is_faas() def cancel_check(self) -> None: """Cancel any concurrent hello check. @@ -200,7 +206,7 @@ def _run(self) -> None: self._server_description, reset_pool=self._server_description.error ) - if ( + if self._stream and ( self._server_description.is_server_type_known and self._server_description.topology_version ): @@ -237,7 +243,7 @@ def _check_server(self) -> ServerDescription: address = sd.address duration = time.monotonic() - start if self._publish: - awaited = bool(sd.is_server_type_known and sd.topology_version) + awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) assert self._listeners is not None self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited) self._reset_connection() @@ -255,7 +261,16 @@ def _check_once(self) -> ServerDescription: address = self._server_description.address if self._publish: assert self._listeners is not None - self._listeners.publish_server_heartbeat_started(address) + sd = self._server_description + # XXX: "awaited" could be incorrectly set to True in the rare case + # the pool checkout closes and recreates a connection. + awaited = bool( + self._pool.conns + and self._stream + and sd.is_server_type_known + and sd.topology_version + ) + self._listeners.publish_server_heartbeat_started(address, awaited) if self._cancel_context and self._cancel_context.cancelled: self._reset_connection() @@ -284,7 +299,9 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]: if conn.more_to_come: # Read the next streaming hello (MongoDB 4.4+). response = Hello(conn._next_reply(), awaitable=True) - elif conn.performed_handshake and self._server_description.topology_version: + elif ( + self._stream and conn.performed_handshake and self._server_description.topology_version + ): # Initiate streaming hello (MongoDB 4.4+). response = conn._hello( cluster_time, diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 5bc3fda497..d8f370a122 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -1292,10 +1292,11 @@ class TopologyClosedEvent(TopologyEvent): class _ServerHeartbeatEvent: """Base class for server heartbeat events.""" - __slots__ = "__connection_id" + __slots__ = ("__connection_id", "__awaited") - def __init__(self, connection_id: _Address) -> None: + def __init__(self, connection_id: _Address, awaited: bool = False) -> None: self.__connection_id = connection_id + self.__awaited = awaited @property def connection_id(self) -> _Address: @@ -1304,8 +1305,16 @@ def connection_id(self) -> _Address: """ return self.__connection_id + @property + def awaited(self) -> bool: + """Whether the heartbeat was issued as an awaitable hello command. + + .. versionadded:: 4.6 + """ + return self.__awaited + def __repr__(self) -> str: - return f"<{self.__class__.__name__} {self.connection_id}>" + return f"<{self.__class__.__name__} {self.connection_id} awaited: {self.awaited}>" class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent): @@ -1323,15 +1332,14 @@ class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent): .. versionadded:: 3.3 """ - __slots__ = ("__duration", "__reply", "__awaited") + __slots__ = ("__duration", "__reply") def __init__( self, duration: float, reply: Hello, connection_id: _Address, awaited: bool = False ) -> None: - super().__init__(connection_id) + super().__init__(connection_id, awaited) self.__duration = duration self.__reply = reply - self.__awaited = awaited @property def duration(self) -> float: @@ -1350,8 +1358,10 @@ def awaited(self) -> bool: If true, then :meth:`duration` reflects the sum of the round trip time to the server and the time that the server waited before sending a response. + + .. versionadded:: 3.11 """ - return self.__awaited + return super().awaited def __repr__(self) -> str: return "<{} {} duration: {}, awaited: {}, reply: {}>".format( @@ -1370,15 +1380,14 @@ class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent): .. versionadded:: 3.3 """ - __slots__ = ("__duration", "__reply", "__awaited") + __slots__ = ("__duration", "__reply") def __init__( self, duration: float, reply: Exception, connection_id: _Address, awaited: bool = False ) -> None: - super().__init__(connection_id) + super().__init__(connection_id, awaited) self.__duration = duration self.__reply = reply - self.__awaited = awaited @property def duration(self) -> float: @@ -1397,8 +1406,10 @@ def awaited(self) -> bool: If true, then :meth:`duration` reflects the sum of the round trip time to the server and the time that the server waited before sending a response. + + .. versionadded:: 3.11 """ - return self.__awaited + return super().awaited def __repr__(self) -> str: return "<{} {} duration: {}, awaited: {}, reply: {!r}>".format( @@ -1602,14 +1613,15 @@ def publish_command_failure( except Exception: _handle_exception() - def publish_server_heartbeat_started(self, connection_id: _Address) -> None: + def publish_server_heartbeat_started(self, connection_id: _Address, awaited: bool) -> None: """Publish a ServerHeartbeatStartedEvent to all server heartbeat listeners. :Parameters: - `connection_id`: The address (host, port) pair of the connection. + - `awaited`: True if this heartbeat is part of an awaitable hello command. """ - event = ServerHeartbeatStartedEvent(connection_id) + event = ServerHeartbeatStartedEvent(connection_id, awaited) for subscriber in self.__server_heartbeat_listeners: try: subscriber.started(event) diff --git a/pymongo/pool.py b/pymongo/pool.py index 600a42853f..afe3a4313e 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -290,6 +290,10 @@ def _is_vercel() -> bool: return bool(os.getenv("VERCEL")) +def _is_faas() -> bool: + return _is_lambda() or _is_azure_func() or _is_gcp_func() or _is_vercel() + + def _getenv_int(key: str) -> Optional[int]: """Like os.getenv but returns an int, or None if the value is missing/malformed.""" val = os.getenv(key) diff --git a/pymongo/settings.py b/pymongo/settings.py index a4be2295d5..4a3e7be4cd 100644 --- a/pymongo/settings.py +++ b/pymongo/settings.py @@ -46,6 +46,7 @@ def __init__( load_balanced: Optional[bool] = None, srv_service_name: str = common.SRV_SERVICE_NAME, srv_max_hosts: int = 0, + server_monitoring_mode: str = common.SERVER_MONITORING_MODE, ): """Represent MongoClient's configuration. @@ -72,6 +73,7 @@ def __init__( self._load_balanced = load_balanced self._srv_service_name = srv_service_name self._srv_max_hosts = srv_max_hosts or 0 + self._server_monitoring_mode = server_monitoring_mode self._topology_id = ObjectId() # Store the allocation traceback to catch unclosed clients in the @@ -146,6 +148,11 @@ def srv_max_hosts(self) -> int: """The srvMaxHosts.""" return self._srv_max_hosts + @property + def server_monitoring_mode(self) -> str: + """The serverMonitoringMode.""" + return self._server_monitoring_mode + def get_topology_type(self) -> int: if self.load_balanced: return TOPOLOGY_TYPE.LoadBalanced diff --git a/test/discovery_and_monitoring/unified/auth-error.json b/test/discovery_and_monitoring/unified/auth-error.json index 5c78ecfe50..62d26494c7 100644 --- a/test/discovery_and_monitoring/unified/auth-error.json +++ b/test/discovery_and_monitoring/unified/auth-error.json @@ -1,6 +1,6 @@ { "description": "auth-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-misc-command-error.json b/test/discovery_and_monitoring/unified/auth-misc-command-error.json index 6e1b645461..fd62fe604e 100644 --- a/test/discovery_and_monitoring/unified/auth-misc-command-error.json +++ b/test/discovery_and_monitoring/unified/auth-misc-command-error.json @@ -1,6 +1,6 @@ { "description": "auth-misc-command-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 7606d2db7a..84763af32e 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -1,6 +1,6 @@ { "description": "auth-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-network-timeout-error.json b/test/discovery_and_monitoring/unified/auth-network-timeout-error.json index 22066e8bae..3cf9576eba 100644 --- a/test/discovery_and_monitoring/unified/auth-network-timeout-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-timeout-error.json @@ -1,6 +1,6 @@ { "description": "auth-network-timeout-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/auth-shutdown-error.json b/test/discovery_and_monitoring/unified/auth-shutdown-error.json index 5dd7b5bb6f..b9e503af66 100644 --- a/test/discovery_and_monitoring/unified/auth-shutdown-error.json +++ b/test/discovery_and_monitoring/unified/auth-shutdown-error.json @@ -1,6 +1,6 @@ { "description": "auth-shutdown-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/cancel-server-check.json b/test/discovery_and_monitoring/unified/cancel-server-check.json index 896cc8d087..a60ccfcb41 100644 --- a/test/discovery_and_monitoring/unified/cancel-server-check.json +++ b/test/discovery_and_monitoring/unified/cancel-server-check.json @@ -1,6 +1,6 @@ { "description": "cancel-server-check", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.0", diff --git a/test/discovery_and_monitoring/unified/connectTimeoutMS.json b/test/discovery_and_monitoring/unified/connectTimeoutMS.json index 67a4d9da1d..d3e860a9cb 100644 --- a/test/discovery_and_monitoring/unified/connectTimeoutMS.json +++ b/test/discovery_and_monitoring/unified/connectTimeoutMS.json @@ -1,6 +1,6 @@ { "description": "connectTimeoutMS", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/find-network-error.json b/test/discovery_and_monitoring/unified/find-network-error.json index 651466bfa6..c1b6db40ca 100644 --- a/test/discovery_and_monitoring/unified/find-network-error.json +++ b/test/discovery_and_monitoring/unified/find-network-error.json @@ -1,6 +1,6 @@ { "description": "find-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/find-network-timeout-error.json b/test/discovery_and_monitoring/unified/find-network-timeout-error.json index 2bde6daa5d..e5ac9f21aa 100644 --- a/test/discovery_and_monitoring/unified/find-network-timeout-error.json +++ b/test/discovery_and_monitoring/unified/find-network-timeout-error.json @@ -1,6 +1,6 @@ { "description": "find-network-timeout-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/find-shutdown-error.json b/test/discovery_and_monitoring/unified/find-shutdown-error.json index 624ad352fc..6e5a2cac05 100644 --- a/test/discovery_and_monitoring/unified/find-shutdown-error.json +++ b/test/discovery_and_monitoring/unified/find-shutdown-error.json @@ -1,6 +1,6 @@ { "description": "find-shutdown-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/hello-command-error.json b/test/discovery_and_monitoring/unified/hello-command-error.json index 7d6046b76f..9afea87e77 100644 --- a/test/discovery_and_monitoring/unified/hello-command-error.json +++ b/test/discovery_and_monitoring/unified/hello-command-error.json @@ -1,6 +1,6 @@ { "description": "hello-command-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/hello-network-error.json b/test/discovery_and_monitoring/unified/hello-network-error.json index f44b26a9f9..55373c90cc 100644 --- a/test/discovery_and_monitoring/unified/hello-network-error.json +++ b/test/discovery_and_monitoring/unified/hello-network-error.json @@ -1,6 +1,6 @@ { "description": "hello-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/hello-timeout.json b/test/discovery_and_monitoring/unified/hello-timeout.json index dfa6b48d66..fe7cf4e78d 100644 --- a/test/discovery_and_monitoring/unified/hello-timeout.json +++ b/test/discovery_and_monitoring/unified/hello-timeout.json @@ -1,6 +1,6 @@ { "description": "hello-timeout", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/insert-network-error.json b/test/discovery_and_monitoring/unified/insert-network-error.json index e4ba6684ae..bfe41a4cb6 100644 --- a/test/discovery_and_monitoring/unified/insert-network-error.json +++ b/test/discovery_and_monitoring/unified/insert-network-error.json @@ -1,6 +1,6 @@ { "description": "insert-network-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/insert-shutdown-error.json b/test/discovery_and_monitoring/unified/insert-shutdown-error.json index 3c724fa5e4..af7c6c987a 100644 --- a/test/discovery_and_monitoring/unified/insert-shutdown-error.json +++ b/test/discovery_and_monitoring/unified/insert-shutdown-error.json @@ -1,6 +1,6 @@ { "description": "insert-shutdown-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/minPoolSize-error.json b/test/discovery_and_monitoring/unified/minPoolSize-error.json index 0234ac9929..7e294baf66 100644 --- a/test/discovery_and_monitoring/unified/minPoolSize-error.json +++ b/test/discovery_and_monitoring/unified/minPoolSize-error.json @@ -1,6 +1,6 @@ { "description": "minPoolSize-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/pool-cleared-error.json b/test/discovery_and_monitoring/unified/pool-cleared-error.json index 9a7dfd901c..b7f6924f2b 100644 --- a/test/discovery_and_monitoring/unified/pool-cleared-error.json +++ b/test/discovery_and_monitoring/unified/pool-cleared-error.json @@ -1,6 +1,6 @@ { "description": "pool-cleared-error", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.9", diff --git a/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json b/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json index c7c2494857..3147a07a1e 100644 --- a/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json +++ b/test/discovery_and_monitoring/unified/rediscover-quickly-after-step-down.json @@ -1,6 +1,6 @@ { "description": "rediscover-quickly-after-step-down", - "schemaVersion": "1.10", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "4.4", diff --git a/test/discovery_and_monitoring/unified/serverMonitoringMode.json b/test/discovery_and_monitoring/unified/serverMonitoringMode.json new file mode 100644 index 0000000000..7d681b4f9e --- /dev/null +++ b/test/discovery_and_monitoring/unified/serverMonitoringMode.json @@ -0,0 +1,449 @@ +{ + "description": "serverMonitoringMode", + "schemaVersion": "1.17", + "runOnRequirements": [ + { + "topologies": [ + "single", + "sharded", + "sharded-replicaset" + ], + "serverless": "forbid" + } + ], + "tests": [ + { + "description": "connect with serverMonitoringMode=auto >=4.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "auto" + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": true + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=auto <4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.2.99" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "auto", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=stream >=4.4", + "runOnRequirements": [ + { + "minServerVersion": "4.4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "stream" + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": true + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=stream <4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.2.99" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "stream", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + }, + { + "description": "connect with serverMonitoringMode=poll", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "serverMonitoringMode": "poll", + "heartbeatFrequencyMS": 500 + }, + "useMultipleMongoses": false, + "observeEvents": [ + "serverHeartbeatStartedEvent", + "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent" + ] + } + }, + { + "database": { + "id": "db", + "client": "client", + "databaseName": "sdam-tests" + } + } + ] + } + }, + { + "name": "runCommand", + "object": "db", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverHeartbeatStartedEvent": {} + }, + "count": 2 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "sdam", + "ignoreExtraEvents": true, + "events": [ + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + }, + { + "serverHeartbeatSucceededEvent": { + "awaited": false + } + }, + { + "serverHeartbeatStartedEvent": { + "awaited": false + } + } + ] + } + ] + } + ] +} diff --git a/test/lambda/mongodb/app.py b/test/lambda/mongodb/app.py index 66c2164672..d56fbec3aa 100644 --- a/test/lambda/mongodb/app.py +++ b/test/lambda/mongodb/app.py @@ -18,6 +18,7 @@ open_connections = 0 heartbeat_count = 0 +streaming_heartbeat_count = 0 total_heartbeat_duration = 0 total_commands = 0 total_command_duration = 0 @@ -49,9 +50,11 @@ def started(self, event): print("server heartbeat started", event) def succeeded(self, event): - global heartbeat_count, total_heartbeat_duration + global heartbeat_count, total_heartbeat_duration, streaming_heartbeat_count heartbeat_count += 1 total_heartbeat_duration += event.duration + if event.awaited: + streaming_heartbeat_count += 1 print("server heartbeat succeeded", event) def failed(self, event): @@ -115,7 +118,9 @@ def pool_closed(self, event): def create_response(): return dict( averageCommandDuration=total_command_duration / total_commands, - averageHeartbeatDuration=total_heartbeat_duration / heartbeat_count, + averageHeartbeatDuration=total_heartbeat_duration / heartbeat_count + if heartbeat_count + else 0, openConnections=open_connections, heartbeatCount=heartbeat_count, ) @@ -145,5 +150,8 @@ def lambda_handler(event, context): response = json.dumps(create_response()) reset() print("finished!") + assert ( + streaming_heartbeat_count == 0 + ), f"streaming_heartbeat_count was {streaming_heartbeat_count} not 0" return dict(statusCode=200, body=response) diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index accd1b7039..8946f256a1 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -34,6 +34,7 @@ single_client, wait_until, ) +from unittest.mock import patch from bson import Timestamp, json_util from pymongo import common, monitoring @@ -341,6 +342,51 @@ def test_pool_unpause(self): listener.wait_for_event(monitoring.PoolReadyEvent, 1) +class TestSdamMode(IntegrationTest): + @client_context.require_no_serverless + @client_context.require_no_load_balancer + def setUp(self): + super().setUp() + + def test_rtt_connection_is_enabled_stream(self): + client = rs_or_single_client(serverMonitoringMode="stream") + self.addCleanup(client.close) + client.admin.command("ping") + for _, server in client._topology._servers.items(): + monitor = server._monitor + self.assertTrue(monitor._stream) + if client_context.version >= (4, 4): + self.assertIsNotNone(monitor._rtt_monitor._executor._thread) + else: + self.assertIsNone(monitor._rtt_monitor._executor._thread) + + def test_rtt_connection_is_disabled_poll(self): + client = rs_or_single_client(serverMonitoringMode="poll") + self.addCleanup(client.close) + self.assert_rtt_connection_is_disabled(client) + + def test_rtt_connection_is_disabled_auto(self): + envs = [ + {"AWS_EXECUTION_ENV": "AWS_Lambda_python3.9"}, + {"FUNCTIONS_WORKER_RUNTIME": "python"}, + {"K_SERVICE": "gcpservicename"}, + {"FUNCTION_NAME": "gcpfunctionname"}, + {"VERCEL": "1"}, + ] + for env in envs: + with patch.dict("os.environ", env): + client = rs_or_single_client(serverMonitoringMode="auto") + self.addCleanup(client.close) + self.assert_rtt_connection_is_disabled(client) + + def assert_rtt_connection_is_disabled(self, client): + client.admin.command("ping") + for _, server in client._topology._servers.items(): + monitor = server._monitor + self.assertFalse(monitor._stream) + self.assertIsNone(monitor._rtt_monitor._executor._thread) + + # Generate unified tests. globals().update(generate_test_classes(os.path.join(SDAM_PATH, "unified"), module=__name__)) diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 8ccc844d32..e135a52e7a 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -1185,7 +1185,9 @@ def test_command_event_repr(self): def test_server_heartbeat_event_repr(self): connection_id = ("localhost", 27017) event = monitoring.ServerHeartbeatStartedEvent(connection_id) - self.assertEqual(repr(event), "") + self.assertEqual( + repr(event), "" + ) delta = 0.1 event = monitoring.ServerHeartbeatSucceededEvent( delta, {"ok": 1}, connection_id # type: ignore[arg-type] diff --git a/test/unified_format.py b/test/unified_format.py index 80c6f03342..a6676c6015 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -100,6 +100,10 @@ PoolReadyEvent, ServerClosedEvent, ServerDescriptionChangedEvent, + ServerHeartbeatFailedEvent, + ServerHeartbeatListener, + ServerHeartbeatStartedEvent, + ServerHeartbeatSucceededEvent, ServerListener, ServerOpeningEvent, TopologyEvent, @@ -107,6 +111,7 @@ _ConnectionEvent, _PoolEvent, _ServerEvent, + _ServerHeartbeatEvent, ) from pymongo.operations import SearchIndexModel from pymongo.read_concern import ReadConcern @@ -288,7 +293,7 @@ def close(self): self.client = None -class EventListenerUtil(CMAPListener, CommandListener, ServerListener): +class EventListenerUtil(CMAPListener, CommandListener, ServerListener, ServerHeartbeatListener): def __init__( self, observe_events, ignore_commands, observe_sensitive_commands, store_events, entity_map ): @@ -319,7 +324,11 @@ def get_events(self, event_type): return [e for e in self.events if isinstance(e, _CommandEvent)] if event_type == "cmap": return [e for e in self.events if isinstance(e, (_ConnectionEvent, _PoolEvent))] - return [e for e in self.events if isinstance(e, (_ServerEvent, TopologyEvent))] + return [ + e + for e in self.events + if isinstance(e, (_ServerEvent, TopologyEvent, _ServerHeartbeatEvent)) + ] def add_event(self, event): event_name = type(event).__name__.lower() @@ -339,23 +348,32 @@ def _command_event(self, event): self.add_event(event) def started(self, event): - if event.command == {}: - # Command is redacted. Observe only if flag is set. - if self._observe_sensitive_commands: + if isinstance(event, CommandStartedEvent): + if event.command == {}: + # Command is redacted. Observe only if flag is set. + if self._observe_sensitive_commands: + self._command_event(event) + else: self._command_event(event) else: - self._command_event(event) + self.add_event(event) def succeeded(self, event): - if event.reply == {}: - # Command is redacted. Observe only if flag is set. - if self._observe_sensitive_commands: + if isinstance(event, CommandSucceededEvent): + if event.reply == {}: + # Command is redacted. Observe only if flag is set. + if self._observe_sensitive_commands: + self._command_event(event) + else: self._command_event(event) else: - self._command_event(event) + self.add_event(event) def failed(self, event): - self._command_event(event) + if isinstance(event, CommandFailedEvent): + self._command_event(event) + else: + self.add_event(event) def opened(self, event: ServerOpeningEvent) -> None: self.add_event(event) @@ -833,6 +851,18 @@ def match_event(self, event_type, expectation, actual): ) if "newDescription" in spec: self.match_server_description(actual.new_description, spec["newDescription"]) + elif name == "serverHeartbeatStartedEvent": + self.test.assertIsInstance(actual, ServerHeartbeatStartedEvent) + if "awaited" in spec: + self.test.assertEqual(actual.awaited, spec["awaited"]) + elif name == "serverHeartbeatSucceededEvent": + self.test.assertIsInstance(actual, ServerHeartbeatSucceededEvent) + if "awaited" in spec: + self.test.assertEqual(actual.awaited, spec["awaited"]) + elif name == "serverHeartbeatFailedEvent": + self.test.assertIsInstance(actual, ServerHeartbeatFailedEvent) + if "awaited" in spec: + self.test.assertEqual(actual.awaited, spec["awaited"]) else: raise Exception(f"Unsupported event type {name}") @@ -868,7 +898,7 @@ class UnifiedSpecTestMixinV1(IntegrationTest): a class attribute ``TEST_SPEC``. """ - SCHEMA_VERSION = Version.from_string("1.15") + SCHEMA_VERSION = Version.from_string("1.17") RUN_ON_LOAD_BALANCER = True RUN_ON_SERVERLESS = True TEST_SPEC: Any diff --git a/test/uri_options/sdam-options.json b/test/uri_options/sdam-options.json new file mode 100644 index 0000000000..673f5607ee --- /dev/null +++ b/test/uri_options/sdam-options.json @@ -0,0 +1,46 @@ +{ + "tests": [ + { + "description": "serverMonitoringMode=auto", + "uri": "mongodb://example.com/?serverMonitoringMode=auto", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "auto" + } + }, + { + "description": "serverMonitoringMode=stream", + "uri": "mongodb://example.com/?serverMonitoringMode=stream", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "stream" + } + }, + { + "description": "serverMonitoringMode=poll", + "uri": "mongodb://example.com/?serverMonitoringMode=poll", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "serverMonitoringMode": "poll" + } + }, + { + "description": "invalid serverMonitoringMode", + "uri": "mongodb://example.com/?serverMonitoringMode=invalid", + "valid": true, + "warning": true, + "hosts": null, + "auth": null, + "options": {} + } + ] +} diff --git a/test/utils.py b/test/utils.py index 776aba8239..51a7903c4a 100644 --- a/test/utils.py +++ b/test/utils.py @@ -300,6 +300,7 @@ def __init__(self, address, options, handshake=True): self._lock = _create_lock() self.opts = options self.operation_count = 0 + self.conns = [] def stale_generation(self, gen, service_id): return self.gen.stale(gen, service_id)