Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pymongo/client_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def __init__(
self.__auto_encryption_opts = options.get("auto_encryption_opts")
self.__load_balanced = options.get("loadbalanced")
self.__timeout = options.get("timeoutms")
self.__server_monitoring_mode = options.get(
"servermonitoringmode", common.SERVER_MONITORING_MODE
)

@property
def _options(self) -> Mapping[str, Any]:
Expand Down Expand Up @@ -284,7 +287,7 @@ def read_concern(self) -> ReadConcern:
def timeout(self) -> Optional[float]:
"""The configured timeoutMS converted to seconds, or None.

.. versionadded: 4.2
.. versionadded:: 4.2
"""
return self.__timeout

Expand Down Expand Up @@ -318,3 +321,11 @@ def event_listeners(self) -> list[_EventListeners]:
"""
assert self.__pool_options._event_listeners is not None
return self.__pool_options._event_listeners.event_listeners()

@property
def server_monitoring_mode(self) -> str:
"""The configured serverMonitoringMode option.

.. versionadded:: 4.5
"""
return self.__server_monitoring_mode
13 changes: 13 additions & 0 deletions pymongo/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@
# Default value for srvServiceName
SRV_SERVICE_NAME = "mongodb"

# Default value for serverMonitoringMode
SERVER_MONITORING_MODE = "auto" # poll/stream/auto


def partition_node(node: str) -> tuple[str, int]:
"""Split a host:port string into (host, int(port)) pair."""
Expand Down Expand Up @@ -664,6 +667,15 @@ def validate_datetime_conversion(option: Any, value: Any) -> Optional[DatetimeCo
raise TypeError(f"{option} must be a str or int representing DatetimeConversion")


def validate_server_monitoring_mode(option: str, value: str) -> str:
"""Validate the serverMonitoringMode option."""
if value not in {"auto", "stream", "poll"}:
raise ValueError(
f'{option}={value!r} is invalid. Must be one of "auto", "stream", or "poll"'
)
return value


# Dictionary where keys are the names of public URI options, and values
# are lists of aliases for that option.
URI_OPTIONS_ALIAS_MAP: dict[str, list[str]] = {
Expand Down Expand Up @@ -712,6 +724,7 @@ def validate_datetime_conversion(option: Any, value: Any) -> Optional[DatetimeCo
"srvservicename": validate_string,
"srvmaxhosts": validate_non_negative_integer,
"timeoutms": validate_timeoutms,
"servermonitoringmode": validate_server_monitoring_mode,
}

# Dictionary where keys are the names of URI options specific to pymongo,
Expand Down
6 changes: 6 additions & 0 deletions pymongo/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ def __init__(
- `heartbeatFrequencyMS`: (optional) The number of milliseconds
between periodic server checks, or None to accept the default
frequency of 10 seconds.
- `serverMonitoringMode`: (optional) The server monitoring mode to use.
Valid values are the strings: "auto", "stream", "poll". Defaults to "auto".
- `appname`: (string or None) The name of the application that
created this MongoClient instance. The server will log this value
upon establishing each connection. It is also recorded in the slow
Expand Down Expand Up @@ -585,6 +587,9 @@ def __init__(

.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.

.. versionchanged:: 4.5
Added the ``serverMonitoringMode`` keyword argument.

.. versionchanged:: 4.2
Added the ``timeoutMS`` keyword argument.

Expand Down Expand Up @@ -846,6 +851,7 @@ def __init__(
load_balanced=options.load_balanced,
srv_service_name=srv_service_name,
srv_max_hosts=srv_max_hosts,
server_monitoring_mode=options.server_monitoring_mode,
)

self._init_background()
Expand Down
27 changes: 22 additions & 5 deletions pymongo/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.periodic_executor import _shutdown_executors
from pymongo.pool import _is_faas
from pymongo.read_preferences import MovingAverage
from pymongo.server_description import ServerDescription
from pymongo.srv_resolver import _SrvResolver
Expand Down Expand Up @@ -138,7 +139,12 @@ def __init__(
topology_settings,
topology._create_pool_for_monitor(server_description.address),
)
self.heartbeater = None
if topology_settings.server_monitoring_mode == "stream":
self._stream = True
elif topology_settings.server_monitoring_mode == "poll":
self._stream = False
else:
self._stream = not _is_faas()

def cancel_check(self) -> None:
"""Cancel any concurrent hello check.
Expand Down Expand Up @@ -200,7 +206,7 @@ def _run(self) -> None:
self._server_description, reset_pool=self._server_description.error
)

if (
if self._stream and (
self._server_description.is_server_type_known
and self._server_description.topology_version
):
Expand Down Expand Up @@ -237,7 +243,7 @@ def _check_server(self) -> ServerDescription:
address = sd.address
duration = time.monotonic() - start
if self._publish:
awaited = bool(sd.is_server_type_known and sd.topology_version)
awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version)
assert self._listeners is not None
self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited)
self._reset_connection()
Expand All @@ -255,7 +261,16 @@ def _check_once(self) -> ServerDescription:
address = self._server_description.address
if self._publish:
assert self._listeners is not None
self._listeners.publish_server_heartbeat_started(address)
sd = self._server_description
# XXX: "awaited" could be incorrectly set to True in the rare case
# the pool checkout closes and recreates a connection.
awaited = bool(
self._pool.conns
and self._stream
and sd.is_server_type_known
and sd.topology_version
)
self._listeners.publish_server_heartbeat_started(address, awaited)

if self._cancel_context and self._cancel_context.cancelled:
self._reset_connection()
Expand Down Expand Up @@ -284,7 +299,9 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
response = Hello(conn._next_reply(), awaitable=True)
elif conn.performed_handshake and self._server_description.topology_version:
elif (
self._stream and conn.performed_handshake and self._server_description.topology_version
):
# Initiate streaming hello (MongoDB 4.4+).
response = conn._hello(
cluster_time,
Expand Down
38 changes: 25 additions & 13 deletions pymongo/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,10 +1292,11 @@ class TopologyClosedEvent(TopologyEvent):
class _ServerHeartbeatEvent:
"""Base class for server heartbeat events."""

__slots__ = "__connection_id"
__slots__ = ("__connection_id", "__awaited")

def __init__(self, connection_id: _Address) -> None:
def __init__(self, connection_id: _Address, awaited: bool = False) -> None:
self.__connection_id = connection_id
self.__awaited = awaited

@property
def connection_id(self) -> _Address:
Expand All @@ -1304,8 +1305,16 @@ def connection_id(self) -> _Address:
"""
return self.__connection_id

@property
def awaited(self) -> bool:
"""Whether the heartbeat was issued as an awaitable hello command.

.. versionadded:: 4.6
"""
return self.__awaited

def __repr__(self) -> str:
return f"<{self.__class__.__name__} {self.connection_id}>"
return f"<{self.__class__.__name__} {self.connection_id} awaited: {self.awaited}>"


class ServerHeartbeatStartedEvent(_ServerHeartbeatEvent):
Expand All @@ -1323,15 +1332,14 @@ class ServerHeartbeatSucceededEvent(_ServerHeartbeatEvent):
.. versionadded:: 3.3
"""

__slots__ = ("__duration", "__reply", "__awaited")
__slots__ = ("__duration", "__reply")

def __init__(
self, duration: float, reply: Hello, connection_id: _Address, awaited: bool = False
) -> None:
super().__init__(connection_id)
super().__init__(connection_id, awaited)
self.__duration = duration
self.__reply = reply
self.__awaited = awaited

@property
def duration(self) -> float:
Expand All @@ -1350,8 +1358,10 @@ def awaited(self) -> bool:
If true, then :meth:`duration` reflects the sum of the round trip time
to the server and the time that the server waited before sending a
response.

.. versionadded:: 3.11
"""
return self.__awaited
return super().awaited

def __repr__(self) -> str:
return "<{} {} duration: {}, awaited: {}, reply: {}>".format(
Expand All @@ -1370,15 +1380,14 @@ class ServerHeartbeatFailedEvent(_ServerHeartbeatEvent):
.. versionadded:: 3.3
"""

__slots__ = ("__duration", "__reply", "__awaited")
__slots__ = ("__duration", "__reply")

def __init__(
self, duration: float, reply: Exception, connection_id: _Address, awaited: bool = False
) -> None:
super().__init__(connection_id)
super().__init__(connection_id, awaited)
self.__duration = duration
self.__reply = reply
self.__awaited = awaited

@property
def duration(self) -> float:
Expand All @@ -1397,8 +1406,10 @@ def awaited(self) -> bool:
If true, then :meth:`duration` reflects the sum of the round trip time
to the server and the time that the server waited before sending a
response.

.. versionadded:: 3.11
"""
return self.__awaited
return super().awaited

def __repr__(self) -> str:
return "<{} {} duration: {}, awaited: {}, reply: {!r}>".format(
Expand Down Expand Up @@ -1602,14 +1613,15 @@ def publish_command_failure(
except Exception:
_handle_exception()

def publish_server_heartbeat_started(self, connection_id: _Address) -> None:
def publish_server_heartbeat_started(self, connection_id: _Address, awaited: bool) -> None:
"""Publish a ServerHeartbeatStartedEvent to all server heartbeat
listeners.

:Parameters:
- `connection_id`: The address (host, port) pair of the connection.
- `awaited`: True if this heartbeat is part of an awaitable hello command.
"""
event = ServerHeartbeatStartedEvent(connection_id)
event = ServerHeartbeatStartedEvent(connection_id, awaited)
for subscriber in self.__server_heartbeat_listeners:
try:
subscriber.started(event)
Expand Down
4 changes: 4 additions & 0 deletions pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ def _is_vercel() -> bool:
return bool(os.getenv("VERCEL"))


def _is_faas() -> bool:
return _is_lambda() or _is_azure_func() or _is_gcp_func() or _is_vercel()


def _getenv_int(key: str) -> Optional[int]:
"""Like os.getenv but returns an int, or None if the value is missing/malformed."""
val = os.getenv(key)
Expand Down
7 changes: 7 additions & 0 deletions pymongo/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(
load_balanced: Optional[bool] = None,
srv_service_name: str = common.SRV_SERVICE_NAME,
srv_max_hosts: int = 0,
server_monitoring_mode: str = common.SERVER_MONITORING_MODE,
):
"""Represent MongoClient's configuration.

Expand All @@ -72,6 +73,7 @@ def __init__(
self._load_balanced = load_balanced
self._srv_service_name = srv_service_name
self._srv_max_hosts = srv_max_hosts or 0
self._server_monitoring_mode = server_monitoring_mode

self._topology_id = ObjectId()
# Store the allocation traceback to catch unclosed clients in the
Expand Down Expand Up @@ -146,6 +148,11 @@ def srv_max_hosts(self) -> int:
"""The srvMaxHosts."""
return self._srv_max_hosts

@property
def server_monitoring_mode(self) -> str:
"""The serverMonitoringMode."""
return self._server_monitoring_mode

def get_topology_type(self) -> int:
if self.load_balanced:
return TOPOLOGY_TYPE.LoadBalanced
Expand Down
2 changes: 1 addition & 1 deletion test/discovery_and_monitoring/unified/auth-error.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "auth-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "auth-misc-command-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "auth-network-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "auth-network-timeout-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "auth-shutdown-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "cancel-server-check",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "connectTimeoutMS",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "find-network-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "find-network-timeout-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "find-shutdown-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "hello-command-error",
"schemaVersion": "1.10",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "4.9",
Expand Down
Loading