Skip to content

PYTHON-2672 SDAM, CMAP, and server selection changes for load balancers #621

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pymongo/client_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def _parse_pool_options(options):
options.get('compressors', []),
options.get('zlibcompressionlevel', -1))
ssl_context, ssl_match_hostname = _parse_ssl_options(options)
load_balanced = options.get('loadbalanced')
return PoolOptions(max_pool_size,
min_pool_size,
max_idle_time_seconds,
Expand All @@ -140,7 +141,8 @@ def _parse_pool_options(options):
appname,
driver,
compression_settings,
server_api=server_api)
server_api=server_api,
load_balanced=load_balanced)


class ClientOptions(object):
Expand Down
11 changes: 9 additions & 2 deletions pymongo/ismaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def _get_server_type(doc):
if not doc.get('ok'):
return SERVER_TYPE.Unknown

if doc.get('isreplicaset'):
if doc.get('serviceId'):
return SERVER_TYPE.LoadBalancer
elif doc.get('isreplicaset'):
return SERVER_TYPE.RSGhost
elif doc.get('setName'):
if doc.get('hidden'):
Expand Down Expand Up @@ -58,7 +60,8 @@ def __init__(self, doc, awaitable=False):
self._is_writable = self._server_type in (
SERVER_TYPE.RSPrimary,
SERVER_TYPE.Standalone,
SERVER_TYPE.Mongos)
SERVER_TYPE.Mongos,
SERVER_TYPE.LoadBalancer)

self._is_readable = (
self.server_type == SERVER_TYPE.RSSecondary
Expand Down Expand Up @@ -185,3 +188,7 @@ def topology_version(self):
@property
def awaitable(self):
    """Return the ``awaitable`` flag stored on this instance."""
    flag = self._awaitable
    return flag

@property
def service_id(self):
    """Return the ``serviceId`` field of the wrapped document.

    Yields ``None`` when the document has no ``serviceId`` key.
    """
    response_doc = self._doc
    return response_doc.get('serviceId')
3 changes: 2 additions & 1 deletion pymongo/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,8 @@ def address(self):
'Cannot use "address" property when load balancing among'
' mongoses, use "nodes" instead.')
if topology_type not in (TOPOLOGY_TYPE.ReplicaSetWithPrimary,
TOPOLOGY_TYPE.Single):
TOPOLOGY_TYPE.Single,
TOPOLOGY_TYPE.LoadBalanced):
return None
return self._server_property('address')

Expand Down
28 changes: 25 additions & 3 deletions pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class PoolOptions(object):
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
'__event_listeners', '__appname', '__driver', '__metadata',
'__compression_settings', '__max_connecting',
'__pause_enabled', '__server_api')
'__pause_enabled', '__server_api', '__load_balanced')

def __init__(self, max_pool_size=MAX_POOL_SIZE,
min_pool_size=MIN_POOL_SIZE,
Expand All @@ -272,7 +272,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
ssl_match_hostname=True, socket_keepalive=True,
event_listeners=None, appname=None, driver=None,
compression_settings=None, max_connecting=MAX_CONNECTING,
pause_enabled=True, server_api=None):
pause_enabled=True, server_api=None, load_balanced=None):
self.__max_pool_size = max_pool_size
self.__min_pool_size = min_pool_size
self.__max_idle_time_seconds = max_idle_time_seconds
Expand All @@ -290,6 +290,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
self.__max_connecting = max_connecting
self.__pause_enabled = pause_enabled
self.__server_api = server_api
self.__load_balanced = load_balanced
self.__metadata = copy.deepcopy(_METADATA)
if appname:
self.__metadata['application'] = {'name': appname}
Expand Down Expand Up @@ -452,6 +453,12 @@ def server_api(self):
"""
return self.__server_api

@property
def load_balanced(self):
    """Whether this Pool is configured in load balanced mode.

    Returns the stored ``load_balanced`` setting unchanged.
    """
    configured = self.__load_balanced
    return configured


def _negotiate_creds(all_credentials):
"""Return one credential that needs mechanism negotiation, if any.
Expand Down Expand Up @@ -531,6 +538,8 @@ def __init__(self, sock, pool, address, id):
self.cancel_context = _CancellationContext()
self.opts = pool.opts
self.more_to_come = False
# For load balancer support.
self.service_id = None

def hello_cmd(self):
if self.opts.server_api:
Expand All @@ -551,6 +560,8 @@ def _ismaster(self, cluster_time, topology_version,
cmd['client'] = self.opts.metadata
if self.compression_settings:
cmd['compression'] = self.compression_settings.compressors
if self.opts.load_balanced:
cmd['loadBalanced'] = True
elif topology_version is not None:
cmd['topologyVersion'] = topology_version
cmd['maxAwaitTimeMS'] = int(heartbeat_frequency*1000)
Expand All @@ -574,6 +585,10 @@ def _ismaster(self, cluster_time, topology_version,

doc = self.command('admin', cmd, publish_events=False,
exhaust_allowed=awaitable)
# PYTHON-2712 will remove this topologyVersion fallback logic.
if self.opts.load_balanced:
process_id = doc.get('topologyVersion', {}).get('processId')
doc.setdefault('serviceId', process_id)
ismaster = IsMaster(doc, awaitable=awaitable)
self.is_writable = ismaster.is_writable
self.max_wire_version = ismaster.max_wire_version
Expand All @@ -595,6 +610,12 @@ def _ismaster(self, cluster_time, topology_version,
auth_ctx.parse_response(ismaster)
if auth_ctx.speculate_succeeded():
self.auth_ctx[auth_ctx.credentials] = auth_ctx
if self.opts.load_balanced:
if not ismaster.service_id:
raise ConfigurationError(
'Driver attempted to initialize in load balancing mode'
' but the server does not support this mode')
self.service_id = ismaster.service_id
return ismaster

def _next_reply(self):
Expand Down Expand Up @@ -1116,7 +1137,8 @@ def _reset(self, close, pause=True):
with self.size_cond:
if self.closed:
return
if self.opts.pause_enabled and pause:
if (self.opts.pause_enabled and pause and
not self.opts.load_balanced):
old_state, self.state = self.state, PoolState.PAUSED
self.generation += 1
newpid = os.getpid()
Expand Down
3 changes: 2 additions & 1 deletion pymongo/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def open(self):

Multiple calls have no effect.
"""
self._monitor.open()
if not self._pool.opts.load_balanced:
self._monitor.open()

def reset(self):
"""Clear the connection pool."""
Expand Down
3 changes: 2 additions & 1 deletion pymongo/server_description.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ def retryable_writes_supported(self):
"""Checks if this server supports retryable writes."""
return (
self._ls_timeout_minutes is not None and
self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary))
self._server_type in (SERVER_TYPE.Mongos, SERVER_TYPE.RSPrimary,
SERVER_TYPE.LoadBalancer))

@property
def retryable_reads_supported(self):
Expand Down
2 changes: 1 addition & 1 deletion pymongo/server_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
SERVER_TYPE = namedtuple('ServerType',
['Unknown', 'Mongos', 'RSPrimary', 'RSSecondary',
'RSArbiter', 'RSOther', 'RSGhost',
'Standalone'])(*range(8))
'Standalone', 'LoadBalancer'])(*range(9))
4 changes: 3 additions & 1 deletion pymongo/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ def load_balanced(self):
return self._load_balanced

def get_topology_type(self):
if self.direct:
if self.load_balanced:
return TOPOLOGY_TYPE.LoadBalanced
elif self.direct:
return TOPOLOGY_TYPE.Single
elif self.replica_set_name is not None:
return TOPOLOGY_TYPE.ReplicaSetNoPrimary
Expand Down
56 changes: 38 additions & 18 deletions pymongo/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
PyMongoError,
ServerSelectionTimeoutError,
WriteError)
from pymongo.ismaster import IsMaster
from pymongo.monitor import SrvMonitor
from pymongo.pool import PoolOptions
from pymongo.server import Server
Expand Down Expand Up @@ -136,7 +137,8 @@ def target():
executor.open()

self._srv_monitor = None
if self._settings.fqdn is not None:
if (self._settings.fqdn is not None and
not self._settings.load_balanced):
self._srv_monitor = SrvMonitor(self, self._settings)

def open(self):
Expand Down Expand Up @@ -489,29 +491,38 @@ def pop_all_sessions(self):
with self._lock:
return self._session_pool.pop_all()

def get_server_session(self):
"""Start or resume a server session, or raise ConfigurationError."""
with self._lock:
session_timeout = self._description.logical_session_timeout_minutes
if session_timeout is None:
# Maybe we need an initial scan? Can raise ServerSelectionError.
if self._description.topology_type == TOPOLOGY_TYPE.Single:
if not self._description.has_known_servers:
self._select_servers_loop(
any_server_selector,
self._settings.server_selection_timeout,
None)
elif not self._description.readable_servers:
def _check_session_support(self):
"""Internal check for session support on non-load balanced clusters."""
session_timeout = self._description.logical_session_timeout_minutes
if session_timeout is None:
# Maybe we need an initial scan? Can raise ServerSelectionError.
if self._description.topology_type == TOPOLOGY_TYPE.Single:
if not self._description.has_known_servers:
self._select_servers_loop(
readable_server_selector,
any_server_selector,
self._settings.server_selection_timeout,
None)
elif not self._description.readable_servers:
self._select_servers_loop(
readable_server_selector,
self._settings.server_selection_timeout,
None)

session_timeout = self._description.logical_session_timeout_minutes
if session_timeout is None:
raise ConfigurationError(
"Sessions are not supported by this MongoDB deployment")
return session_timeout

def get_server_session(self):
    """Start or resume a server session, or raise ConfigurationError.

    The session timeout handed to the session pool depends on the mode:
    in load balanced mode it is infinite, otherwise it is whatever
    ``_check_session_support()`` returns.
    """
    with self._lock:
        if self._settings.load_balanced:
            # Sessions are always supported and never time out in load
            # balanced mode, so no support check is needed.
            timeout = float('inf')
        else:
            timeout = self._check_session_support()
        return self._session_pool.get_server_session(timeout)

def return_server_session(self, server_session, lock):
Expand Down Expand Up @@ -551,6 +562,12 @@ def _ensure_opened(self):
SRV_POLLING_TOPOLOGIES):
self._srv_monitor.open()

if self._settings.load_balanced:
# Emit initial SDAM events for load balancer mode.
self._process_change(ServerDescription(
self._seed_addresses[0],
IsMaster({'ok': 1, 'serviceId': self._topology_id})))

# Ensure that the monitors are open.
for server in self._servers.values():
server.open()
Expand Down Expand Up @@ -608,20 +625,23 @@ def _handle_error(self, address, err_ctx):
if err_code in helpers._NOT_MASTER_CODES:
is_shutting_down = err_code in helpers._SHUTDOWN_CODES
# Mark server Unknown, clear the pool, and request check.
self._process_change(ServerDescription(address, error=error))
if not self._settings.load_balanced:
self._process_change(ServerDescription(address, error=error))
if is_shutting_down or (err_ctx.max_wire_version <= 7):
# Clear the pool.
server.reset()
server.request_check()
elif not err_ctx.completed_handshake:
# Unknown command error during the connection handshake.
self._process_change(ServerDescription(address, error=error))
if not self._settings.load_balanced:
self._process_change(ServerDescription(address, error=error))
# Clear the pool.
server.reset()
elif issubclass(exc_type, ConnectionFailure):
# "Client MUST replace the server's description with type Unknown
# ... MUST NOT request an immediate check of the server."
self._process_change(ServerDescription(address, error=error))
if not self._settings.load_balanced:
self._process_change(ServerDescription(address, error=error))
# Clear the pool.
server.reset()
# "When a client marks a server Unknown from `Network error when
Expand Down
50 changes: 28 additions & 22 deletions pymongo/topology_description.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@


# Enumeration for various kinds of MongoDB cluster topologies.
TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
'ReplicaSetWithPrimary', 'Sharded',
'Unknown'])(*range(5))
TOPOLOGY_TYPE = namedtuple('TopologyType', [
'Single', 'ReplicaSetNoPrimary', 'ReplicaSetWithPrimary', 'Sharded',
'Unknown', 'LoadBalanced'])(*range(6))

# Topologies compatible with SRV record polling.
SRV_POLLING_TOPOLOGIES = (TOPOLOGY_TYPE.Unknown, TOPOLOGY_TYPE.Sharded)
Expand Down Expand Up @@ -63,7 +63,28 @@ def __init__(self,

# Is PyMongo compatible with all servers' wire protocols?
self._incompatible_err = None
if self._topology_type != TOPOLOGY_TYPE.LoadBalanced:
self._init_incompatible_err()

# Server Discovery And Monitoring Spec: Whenever a client updates the
# TopologyDescription from an ismaster response, it MUST set
# TopologyDescription.logicalSessionTimeoutMinutes to the smallest
# logicalSessionTimeoutMinutes value among ServerDescriptions of all
# data-bearing server types. If any have a null
# logicalSessionTimeoutMinutes, then
# TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
readable_servers = self.readable_servers
if not readable_servers:
self._ls_timeout_minutes = None
elif any(s.logical_session_timeout_minutes is None
for s in readable_servers):
self._ls_timeout_minutes = None
else:
self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
for s in readable_servers)

def _init_incompatible_err(self):
"""Internal compatibility check for non-load balanced topologies."""
for s in self._server_descriptions.values():
if not s.is_server_type_known:
continue
Expand Down Expand Up @@ -98,23 +119,6 @@ def __init__(self,

break

# Server Discovery And Monitoring Spec: Whenever a client updates the
# TopologyDescription from an ismaster response, it MUST set
# TopologyDescription.logicalSessionTimeoutMinutes to the smallest
# logicalSessionTimeoutMinutes value among ServerDescriptions of all
# data-bearing server types. If any have a null
# logicalSessionTimeoutMinutes, then
# TopologyDescription.logicalSessionTimeoutMinutes MUST be set to null.
readable_servers = self.readable_servers
if not readable_servers:
self._ls_timeout_minutes = None
elif any(s.logical_session_timeout_minutes is None
for s in readable_servers):
self._ls_timeout_minutes = None
else:
self._ls_timeout_minutes = min(s.logical_session_timeout_minutes
for s in readable_servers)

def check_compatible(self):
"""Raise ConfigurationError if any server is incompatible.
Expand Down Expand Up @@ -243,8 +247,9 @@ def apply_local_threshold(selection):
selector.min_wire_version,
common_wv))

if self.topology_type == TOPOLOGY_TYPE.Single:
# Ignore selectors for standalone.
if self.topology_type in (TOPOLOGY_TYPE.Single,
TOPOLOGY_TYPE.LoadBalanced):
# Ignore selectors for standalone and load balancer mode.
return self.known_servers
elif address:
# Ignore selectors when explicit address is requested.
Expand Down Expand Up @@ -306,6 +311,7 @@ def __repr__(self):
SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
# Note: SERVER_TYPE.LoadBalancer and Unknown are intentionally left out.
}


Expand Down