
Implement read preferences for distributing reads among replica set members PYTHON-367

Replace the 'mongo' dict with a Member object everywhere in ReplicaSetConnection.
A handful of commands obey read preferences; most are always sent to primary.
Track a 5-sample moving average of each replica set member's ping time.
Connection detects whether it's connected to primary, secondary, or mongos.
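The 5-sample moving average of ping times mentioned above can be sketched as follows. This is a simplified illustration only, not the commit's actual `Member` bookkeeping; the class name and window handling here are hypothetical:

```python
from collections import deque

class PingAverager:
    """Track a moving average of a member's recent ping times.

    A sketch of the idea: a fixed-size window (5 samples, matching the
    commit message) where each new sample evicts the oldest.
    """
    def __init__(self, window=5):
        self._samples = deque(maxlen=window)

    def record(self, ping_time_ms):
        # deque with maxlen drops the oldest sample automatically.
        self._samples.append(ping_time_ms)

    @property
    def average(self):
        if not self._samples:
            return None
        return sum(self._samples) / len(self._samples)

averager = PingAverager()
for sample in [10, 20, 30, 40, 50, 60]:  # the first sample (10) falls out
    averager.record(sample)
print(averager.average)  # average of the last 5 samples: 40.0
```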
1 parent 70bf190 · commit f275b2291ab080c111ee99f962cff5f89986bbd5 · @ajdavis committed Jul 24, 2012
2 doc/api/pymongo/collection.rst
@@ -22,6 +22,8 @@
.. autoattribute:: database
.. autoattribute:: slave_okay
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: safe
.. autoattribute:: uuid_subtype
.. automethod:: get_lasterror_options
4 doc/api/pymongo/connection.rst
@@ -17,11 +17,15 @@
.. autoattribute:: host
.. autoattribute:: port
+ .. autoattribute:: is_primary
+ .. autoattribute:: is_mongos
.. autoattribute:: nodes
.. autoattribute:: max_pool_size
.. autoattribute:: document_class
.. autoattribute:: tz_aware
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: slave_okay
.. autoattribute:: safe
.. autoattribute:: is_locked
2 doc/api/pymongo/database.rst
@@ -20,6 +20,8 @@
.. autoattribute:: slave_okay
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: safe
.. automethod:: get_lasterror_options
.. automethod:: set_lasterror_options
2 doc/api/pymongo/index.rst
@@ -13,7 +13,7 @@
Alias for :class:`pymongo.replica_set_connection.ReplicaSetConnection`.
- .. autoclass:: pymongo.ReadPreference
+ .. autoclass:: pymongo.read_preferences.ReadPreference
.. autofunction:: has_c
Sub-modules:
2 doc/api/pymongo/replica_set_connection.rst
@@ -21,6 +21,8 @@
.. autoattribute:: primary
.. autoattribute:: secondaries
.. autoattribute:: read_preference
+ .. autoattribute:: tag_sets
+ .. autoattribute:: secondary_acceptable_latency_ms
.. autoattribute:: max_pool_size
.. autoattribute:: document_class
.. autoattribute:: tz_aware
2 doc/changelog.rst
@@ -100,7 +100,7 @@ Important New Features:
automatic failover handling and periodically checks the state of the
replica set to handle issues like primary stepdown or secondaries
being removed for backup operations. Read preferences are defined through
- :class:`~pymongo.ReadPreference`.
+ :class:`~pymongo.read_preferences.ReadPreference`.
- PyMongo supports the new BSON binary subtype 4 for UUIDs. The default
subtype to use can be set through
:attr:`~pymongo.collection.Collection.uuid_subtype`
5 doc/examples/gevent.rst
@@ -53,8 +53,9 @@ Additionally, it will use a background greenlet instead of a background thread
to monitor the state of the replica set.
Using :meth:`~pymongo.replica_set_connection.ReplicaSetConnection.start_request()`
-with :class:`~pymongo.ReadPreference` PRIMARY ensures that the current greenlet
-uses the same socket for all operations until a call to :meth:`end_request()`.
+with :class:`~pymongo.read_preferences.ReadPreference` PRIMARY ensures that the
+current greenlet uses the same socket for all operations until a call to
+:meth:`end_request()`.
You must `install Gevent <http://gevent.org/>`_ to use
:class:`~pymongo.replica_set_connection.ReplicaSetConnection`
108 doc/examples/replica_set.rst
@@ -155,31 +155,113 @@ the operation will succeed::
ReplicaSetConnection
--------------------
-In Pymongo-2.1 a new ReplicaSetConnection class was added that provides
-some new features not supported in the original Connection class. The most
-important of these is the ability to distribute queries to the secondary
-members of a replica set. To connect using ReplicaSetConnection just
-provide a host:port pair and the name of the replica set::
+Using a :class:`~pymongo.replica_set_connection.ReplicaSetConnection` instead
+of a simple :class:`~pymongo.connection.Connection` offers two key features:
+secondary reads and replica set health monitoring. To connect using
+`ReplicaSetConnection` just provide a host:port pair and the name of the
+replica set::
>>> from pymongo import ReplicaSetConnection
>>> ReplicaSetConnection("morton.local:27017", replicaSet='foo')
ReplicaSetConnection([u'morton.local:27019', u'morton.local:27017', u'morton.local:27018'])
+Secondary Reads
+'''''''''''''''
+
By default an instance of ReplicaSetConnection will only send queries to
-the primary member of the replica set. To use secondary members for queries
-we have to change the read preference::
+the primary member of the replica set. To use secondaries for queries
+we have to change the :class:`~pymongo.read_preferences.ReadPreference`::
>>> db = ReplicaSetConnection("morton.local:27017", replicaSet='foo').test
- >>> from pymongo import ReadPreference
- >>> db.read_preference = ReadPreference.SECONDARY
+ >>> from pymongo.read_preferences import ReadPreference
+ >>> db.read_preference = ReadPreference.SECONDARY_PREFERRED
Now all queries will be sent to the secondary members of the set. If there are
no secondary members the primary will be used as a fallback. If you have
queries you would prefer to never send to the primary you can specify that
-using the SECONDARY_ONLY read preference::
+using the ``SECONDARY`` read preference::
- >>> db.read_preference = ReadPreference.SECONDARY_ONLY
+ >>> db.read_preference = ReadPreference.SECONDARY
Read preference can be set on a connection, database, collection, or on a
-per-query basis.
-
+per-query basis, e.g.::
+
+ >>> db.collection.find_one(read_preference=ReadPreference.PRIMARY)
+
+Reads are configured using three options: **read_preference**, **tag_sets**,
+and **secondary_acceptable_latency_ms**.
+
+**read_preference**:
+
+- ``PRIMARY``: Read from the primary. This is the default, and provides the
+ strongest consistency. If no primary is available, raise
+ :class:`~pymongo.errors.AutoReconnect`.
+
+- ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
+ none, read from a secondary matching your choice of ``tag_sets`` and
+ ``secondary_acceptable_latency_ms``.
+
+- ``SECONDARY``: Read from a secondary matching your choice of ``tag_sets`` and
+ ``secondary_acceptable_latency_ms``. If no matching secondary is available,
+ raise :class:`~pymongo.errors.AutoReconnect`.
+
+- ``SECONDARY_PREFERRED``: Read from a secondary matching your choice of
+ ``tag_sets`` and ``secondary_acceptable_latency_ms`` if available, otherwise
+ from primary (regardless of the primary's tags and latency).
+
+- ``NEAREST``: Read from any member matching your choice of ``tag_sets`` and
+ ``secondary_acceptable_latency_ms``.
+
+**tag_sets**:
+
+Replica-set members can be `tagged
+<http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_ according to any
+criteria you choose. By default, ReplicaSetConnection ignores tags when
+choosing a member to read from, but it can be configured with the ``tag_sets``
+parameter. ``tag_sets`` must be a list of dictionaries, each dict providing tag
+values that the replica set member must match. ReplicaSetConnection tries each
+set of tags in turn until it finds a set of tags with at least one matching
+member. For example, to prefer reads from the New York data center, but fall
+back to the San Francisco data center, tag your replica set members according
+to their location and create a ReplicaSetConnection like so::
+
+ >>> rsc = ReplicaSetConnection(
+ ... "morton.local:27017",
+ ... replicaSet='foo'
+ ... read_preference=ReadPreference.SECONDARY,
+ ... tag_sets=[{'dc': 'ny'}, {'dc': 'sf'}]
+ ... )
+
+ReplicaSetConnection tries to find secondaries in New York, then San Francisco,
+and raises :class:`~pymongo.errors.AutoReconnect` if none are available. As an
+additional fallback, specify a final, empty tag set, ``{}``, which means "read
+from any member that matches the mode, ignoring tags."
+
+**secondary_acceptable_latency_ms**:
+
+If multiple members match the mode and tag sets, ReplicaSetConnection reads
+from among the nearest members, chosen according to ping time. By default,
+only members whose ping times are within 15 milliseconds of the nearest
+are used for queries. You can choose to distribute reads among members with
+higher latencies by setting ``secondary_acceptable_latency_ms`` to a larger
+number. In that case, ReplicaSetConnection distributes reads among matching
+members within ``secondary_acceptable_latency_ms`` of the closest member's
+ping time.
+
+Health Monitoring
+'''''''''''''''''
+
+When ReplicaSetConnection is initialized it launches a background task to
+monitor the replica set for changes in:
+
+* Health: detect when a member goes down or comes up, or if a different member
+ becomes primary
+* Configuration: detect changes in tags
+* Latency: track a moving average of each member's ping time
+
+Replica-set monitoring ensures queries are continually routed to the proper
+members as the state of the replica set changes.
+
+It is critical to call
+:meth:`~pymongo.replica_set_connection.ReplicaSetConnection.close` to terminate
+the monitoring task before your process exits.
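The member-selection rules the section above describes (tag sets tried in priority order, then a latency window around the fastest match) can be sketched in a few lines. This is a simplified standalone model, not pymongo's actual selection code; the member data, hostnames, and tags below are hypothetical:

```python
def select_members(members, tag_sets, latency_ms=15):
    """Sketch: pick readable members by tag set, then latency window."""
    for tags in tag_sets:
        # A member matches when every key/value in the tag set matches;
        # the empty set {} therefore matches every member.
        matching = [m for m in members
                    if all(m['tags'].get(k) == v for k, v in tags.items())]
        if matching:
            fastest = min(m['ping_ms'] for m in matching)
            # Keep only members within latency_ms of the nearest match.
            return [m['host'] for m in matching
                    if m['ping_ms'] <= fastest + latency_ms]
    raise Exception("AutoReconnect: no matching member")

members = [
    {'host': 'ny1:27017', 'tags': {'dc': 'ny'}, 'ping_ms': 5},
    {'host': 'ny2:27017', 'tags': {'dc': 'ny'}, 'ping_ms': 40},
    {'host': 'sf1:27017', 'tags': {'dc': 'sf'}, 'ping_ms': 8},
]

# ny2 is 35ms slower than ny1, outside the default 15ms window:
print(select_members(members, [{'dc': 'ny'}, {'dc': 'sf'}]))  # ['ny1:27017']
```

Adding a final empty set, `[{'dc': 'ny'}, {'dc': 'sf'}, {}]`, reproduces the "fall back to any member" behavior described above.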
9 gridfs/__init__.py
@@ -54,7 +54,8 @@ def __init__(self, database, collection="fs"):
self.__collection = database[collection]
self.__files = self.__collection.files
self.__chunks = self.__collection.chunks
- if not database.slave_okay and not database.read_preference:
+ connection = database.connection
+ if not hasattr(connection, 'is_primary') or connection.is_primary:
self.__chunks.ensure_index([("files_id", ASCENDING),
("n", ASCENDING)],
unique=True)
@@ -158,7 +159,7 @@ def get_version(self, filename=None, version=-1, **kwargs):
:Parameters:
- `filename`: ``"filename"`` of the file to get, or `None`
- - `version` (optional): version of the file to get (defualts
+ - `version` (optional): version of the file to get (defaults
to -1, the most recent version uploaded)
- `**kwargs` (optional): find files by custom metadata.
@@ -168,8 +169,8 @@ def get_version(self, filename=None, version=-1, **kwargs):
Accept keyword arguments to find files by custom metadata.
.. versionadded:: 1.9
"""
- database = self.__database
- if not database.slave_okay and not database.read_preference:
+ connection = self.__database.connection
+ if not hasattr(connection, 'is_primary') or connection.is_primary:
self.__files.ensure_index([("filename", ASCENDING),
("uploadDate", DESCENDING)])
44 pymongo/__init__.py
@@ -48,49 +48,6 @@
ALL = 2
"""Profile all operations."""
-class ReadPreference:
- """An enum that defines the read preferences supported by PyMongo.
-
- +----------------------+--------------------------------------------------+
- | Connection type | Read Preference |
- +======================+================+================+================+
- | |`PRIMARY` |`SECONDARY` |`SECONDARY_ONLY`|
- +----------------------+----------------+----------------+----------------+
- |Connection to a single|Queries are |Queries are |Same as |
- |host. |allowed if the |allowed if the |`SECONDARY` |
- | |connection is to|connection is to| |
- | |the replica set |the replica set | |
- | |primary. |primary or a | |
- | | |secondary. | |
- +----------------------+----------------+----------------+----------------+
- |Connection to a |Queries are sent|Queries are |Same as |
- |mongos. |to the primary |distributed |`SECONDARY` |
- | |of a shard. |among shard | |
- | | |secondaries. | |
- | | |Queries are sent| |
- | | |to the primary | |
- | | |if no | |
- | | |secondaries are | |
- | | |available. | |
- | | | | |
- +----------------------+----------------+----------------+----------------+
- |ReplicaSetConnection |Queries are sent|Queries are |Queries are |
- | |to the primary |distributed |never sent to |
- | |of the replica |among replica |the replica set |
- | |set. |set secondaries.|primary. An |
- | | |Queries are sent|exception is |
- | | |to the primary |raised if no |
- | | |if no |secondary is |
- | | |secondaries are |available. |
- | | |available. | |
- | | | | |
- +----------------------+----------------+----------------+----------------+
- """
-
- PRIMARY = 0
- SECONDARY = 1
- SECONDARY_ONLY = 2
-
version_tuple = (2, 2, 1, '+')
def get_version_string():
@@ -103,6 +60,7 @@ def get_version_string():
from pymongo.connection import Connection
from pymongo.replica_set_connection import ReplicaSetConnection
+from pymongo.read_preferences import ReadPreference
def has_c():
"""Is the C extension installed?
62 pymongo/collection.py
@@ -23,7 +23,7 @@
helpers,
message)
from pymongo.cursor import Cursor
-from pymongo.errors import ConfigurationError, InvalidName, InvalidOperation
+from pymongo.errors import ConfigurationError, InvalidName
def _gen_index_name(keys):
@@ -74,11 +74,14 @@ def __init__(self, database, name, create=False, **kwargs):
.. mongodoc:: collections
"""
- super(Collection,
- self).__init__(slave_okay=database.slave_okay,
- read_preference=database.read_preference,
- safe=database.safe,
- **(database.get_lasterror_options()))
+ super(Collection, self).__init__(
+ slave_okay=database.slave_okay,
+ read_preference=database.read_preference,
+ tag_sets=database.tag_sets,
+ secondary_acceptable_latency_ms=(
+ database.secondary_acceptable_latency_ms),
+ safe=database.safe,
+ **(database.get_lasterror_options()))
if not isinstance(name, basestring):
raise TypeError("name must be an instance "
@@ -590,13 +593,20 @@ def find(self, *args, **kwargs):
:class:`~pymongo.connection.Connection`-level default
- `read_preference` (optional): The read preference for
this query.
+ - `tag_sets` (optional): The tag sets for this query.
+ - `secondary_acceptable_latency_ms` (optional): Any replica-set
+ member whose ping time is within secondary_acceptable_latency_ms of
+ the nearest member may accept reads. Default 15 milliseconds.
.. note:: The `manipulate` parameter may default to False in
a future release.
.. note:: The `max_scan` parameter requires server
version **>= 1.5.1**
-
+
+ .. versionadded:: 2.2.1+
+ The `tag_sets` and `secondary_acceptable_latency_ms` parameters.
+
.. versionadded:: 1.11+
The `await_data`, `partial`, and `manipulate` parameters.
@@ -619,6 +629,11 @@ def find(self, *args, **kwargs):
kwargs['slave_okay'] = self.slave_okay
if not 'read_preference' in kwargs:
kwargs['read_preference'] = self.read_preference
+ if not 'tag_sets' in kwargs:
+ kwargs['tag_sets'] = self.tag_sets
+ if not 'secondary_acceptable_latency_ms' in kwargs:
+ kwargs['secondary_acceptable_latency_ms'] = (
+ self.secondary_acceptable_latency_ms)
return Cursor(self, *args, **kwargs)
def count(self):
@@ -924,7 +939,6 @@ def aggregate(self, pipeline):
return self.__database.command("aggregate", self.__name,
pipeline=pipeline,
- read_preference=self.read_preference,
slave_okay=self.slave_okay,
_use_master=use_master)
@@ -949,9 +963,10 @@ def group(self, key, condition, initial, reduce, finalize=None):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if the `read_preference` attribute of this instance is not set to
- :attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated)
- `slave_okay` attribute of this instance is set to `True` the group
- command will be sent to a secondary or slave.
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
+ the (deprecated) `slave_okay` attribute of this instance is set to
+ `True`, the group command will be sent to a secondary or slave.
:Parameters:
- `key`: fields to group by (see above description)
@@ -989,6 +1004,9 @@ def group(self, key, condition, initial, reduce, finalize=None):
return self.__database.command("group", group,
uuid_subtype=self.__uuid_subtype,
read_preference=self.read_preference,
+ tag_sets=self.tag_sets,
+ secondary_acceptable_latency_ms=(
+ self.secondary_acceptable_latency_ms),
slave_okay=self.slave_okay,
_use_master=use_master)["retval"]
@@ -1089,10 +1107,20 @@ def map_reduce(self, map, reduce, out, full_response=False, **kwargs):
raise TypeError("'out' must be an instance of "
"%s or dict" % (basestring.__name__,))
+ if isinstance(out, dict) and out.get('inline'):
+ must_use_master = False
+ else:
+ must_use_master = True
+
response = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.__uuid_subtype,
map=map, reduce=reduce,
- out=out, **kwargs)
+ read_preference=self.read_preference,
+ tag_sets=self.tag_sets,
+ secondary_acceptable_latency_ms=(
+ self.secondary_acceptable_latency_ms),
+ out=out, _use_master=must_use_master,
+ **kwargs)
if full_response or not response.get('result'):
return response
@@ -1117,9 +1145,10 @@ def inline_map_reduce(self, map, reduce, full_response=False, **kwargs):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
if the `read_preference` attribute of this instance is not set to
- :attr:`pymongo.ReadPreference.PRIMARY` or the (deprecated)
- `slave_okay` attribute of this instance is set to `True` the inline
- map reduce will be run on a secondary or slave.
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
+ the (deprecated) `slave_okay` attribute of this instance is set to
+ `True`, the inline map reduce will be run on a secondary or slave.
:Parameters:
- `map`: map function (as a JavaScript string)
@@ -1142,6 +1171,9 @@ def inline_map_reduce(self, map, reduce, full_response=False, **kwargs):
res = self.__database.command("mapreduce", self.__name,
uuid_subtype=self.__uuid_subtype,
read_preference=self.read_preference,
+ tag_sets=self.tag_sets,
+ secondary_acceptable_latency_ms=(
+ self.secondary_acceptable_latency_ms),
slave_okay=self.slave_okay,
_use_master=use_master,
map=map, reduce=reduce,
115 pymongo/common.py
@@ -15,8 +15,9 @@
"""Functions and classes common to multiple pymongo modules."""
import warnings
+from pymongo import read_preferences
-from pymongo import ReadPreference
+from pymongo.read_preferences import ReadPreference
from pymongo.errors import ConfigurationError
@@ -83,31 +84,60 @@ def validate_int_or_basestring(option, value):
"integer or a string" % (option,))
+def validate_positive_float(option, value):
+ """Validates that 'value' is a float, or can be converted to one, and is
+ positive.
+ """
+ err = ConfigurationError("%s must be a positive int or float" % (option,))
+ try:
+ value = float(value)
+ except (ValueError, TypeError):
+ raise err
+ if value <= 0:
+ raise err
+
+ return value
+
+
def validate_timeout_or_none(option, value):
"""Validates a timeout specified in milliseconds returning
a value in floating point seconds.
"""
if value is None:
return value
- try:
- value = float(value)
- except (ValueError, TypeError):
- raise ConfigurationError("%s must be an "
- "instance of int or float" % (option,))
- if value <= 0:
- raise ConfigurationError("%s must be a positive integer" % (option,))
- return value / 1000.0
+ return validate_positive_float(option, value) / 1000.0
def validate_read_preference(dummy, value):
"""Validate read preference for a ReplicaSetConnection.
"""
- if value not in range(ReadPreference.PRIMARY,
- ReadPreference.SECONDARY_ONLY + 1):
+ if value not in read_preferences.modes:
raise ConfigurationError("Not a valid read preference")
return value
+def validate_tag_sets(dummy, value):
+ """Validate tag sets for a ReplicaSetConnection.
+ """
+ if value is None:
+ return [{}]
+
+ if not isinstance(value, list):
+ raise ConfigurationError((
+ "Tag sets %s invalid, must be a list" ) % repr(value))
+ if len(value) == 0:
+ raise ConfigurationError((
+ "Tag sets %s invalid, must be None or contain at least one set of"
+ " tags") % repr(value))
+
+ for tags in value:
+ if not isinstance(tags, dict):
+ raise ConfigurationError(
+ "Tag set %s invalid, must be a dict" % repr(tags))
+
+ return value
+
+
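The `validate_tag_sets` helper added above is easy to exercise in isolation. Restated here as a standalone sketch, with `ValueError` standing in for pymongo's `ConfigurationError` so the snippet has no dependency on the package:

```python
def validate_tag_sets(value):
    """Standalone restatement of the validator in the diff above."""
    if value is None:
        return [{}]          # None normalizes to "match any member"
    if not isinstance(value, list):
        raise ValueError("Tag sets %r invalid, must be a list" % (value,))
    if len(value) == 0:
        raise ValueError("Tag sets %r invalid, must be None or contain "
                         "at least one set of tags" % (value,))
    for tags in value:
        if not isinstance(tags, dict):
            raise ValueError("Tag set %r invalid, must be a dict" % (tags,))
    return value

print(validate_tag_sets(None))                # [{}]
print(validate_tag_sets([{'dc': 'ny'}, {}]))  # returned unchanged
```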
# jounal is an alias for j,
# wtimeoutms is an alias for wtimeout
VALIDATORS = {
@@ -125,6 +155,9 @@ def validate_read_preference(dummy, value):
'sockettimeoutms': validate_timeout_or_none,
'ssl': validate_boolean,
'read_preference': validate_read_preference,
+ 'tag_sets': validate_tag_sets,
+ 'secondaryacceptablelatencyms': validate_positive_float,
+ 'secondary_acceptable_latency_ms': validate_positive_float,
'auto_start_request': validate_boolean,
'use_greenlets': validate_boolean,
}
@@ -160,9 +193,16 @@ def __init__(self, **options):
self.__slave_okay = False
self.__read_pref = ReadPreference.PRIMARY
+ self.__tag_sets = [{}]
+ self.__secondary_acceptable_latency_ms = 15
self.__safe = False
self.__safe_opts = {}
self.__set_options(options)
+ if (self.__read_pref == ReadPreference.PRIMARY
+ and self.__tag_sets != [{}]
+ ):
+ raise ConfigurationError(
+ "ReadPreference PRIMARY cannot be combined with tags")
def __set_safe_option(self, option, value, check=False):
"""Validates and sets getlasterror options for this
@@ -183,6 +223,14 @@ def __set_options(self, options):
self.__slave_okay = validate_boolean(option, value)
elif option == 'read_preference':
self.__read_pref = validate_read_preference(option, value)
+ elif option == 'tag_sets':
+ self.__tag_sets = validate_tag_sets(option, value)
+ elif option in (
+ 'secondaryAcceptableLatencyMS',
+ 'secondary_acceptable_latency_ms'
+ ):
+ self.__secondary_acceptable_latency_ms = \
+ validate_positive_float(option, value)
elif option == 'safe':
self.__safe = validate_boolean(option, value)
elif option in SAFE_OPTIONS:
@@ -211,9 +259,9 @@ def __set_slave_okay(self, value):
slave_okay = property(__get_slave_okay, __set_slave_okay)
def __get_read_pref(self):
- """The read preference for this instance.
+ """The read preference mode for this instance.
- See :class:`~pymongo.ReadPreference` for available options.
+ See :class:`~pymongo.read_preferences.ReadPreference` for available options.
.. versionadded:: 2.1
"""
@@ -224,6 +272,47 @@ def __set_read_pref(self, value):
self.__read_pref = validate_read_preference('read_preference', value)
read_preference = property(__get_read_pref, __set_read_pref)
+
+ def __get_acceptable_latency(self):
+ """Any replica-set member whose ping time is within
+ secondary_acceptable_latency_ms of the nearest member may accept
+ reads. Defaults to 15 milliseconds.
+
+ See :class:`~pymongo.read_preferences.ReadPreference`.
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__secondary_acceptable_latency_ms
+
+ def __set_acceptable_latency(self, value):
+ """Property setter for secondary_acceptable_latency_ms"""
+ self.__secondary_acceptable_latency_ms = (validate_positive_float(
+ 'secondary_acceptable_latency_ms', value))
+
+ secondary_acceptable_latency_ms = property(
+ __get_acceptable_latency, __set_acceptable_latency)
+
+ def __get_tag_sets(self):
+ """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
+ read only from members whose ``dc`` tag has the value ``"ny"``.
+ To specify a priority-order for tag sets, provide a list of
+ tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
+ set, ``{}``, means "read from any member that matches the mode,
+ ignoring tags." ReplicaSetConnection tries each set of tags in turn
+ until it finds a set of tags with at least one matching member.
+
+ .. seealso:: `Data-Center Awareness
+ <http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__tag_sets
+
+ def __set_tag_sets(self, value):
+ """Property setter for tag_sets"""
+ self.__tag_sets = validate_tag_sets('tag_sets', value)
+
+ tag_sets = property(__get_tag_sets, __set_tag_sets)
def __get_safe(self):
"""Use getlasterror with every write operation?
47 pymongo/connection.py
@@ -130,7 +130,7 @@ def __init__(self, host=None, port=None, max_pool_size=10,
Other optional parameters can be passed as keyword arguments:
- `safe`: Use getlasterror for each write operation?
- - `j` or `journal`: Block until write operations have been commited
+ - `j` or `journal`: Block until write operations have been committed
to the journal. Ignored if the server is running without journaling.
Implies safe=True.
- `w`: (integer or string) If this is a replica set write operations
@@ -154,7 +154,8 @@ def __init__(self, host=None, port=None, max_pool_size=10,
before timing out.
- `ssl`: If True, create the connection to the server using SSL.
- `read_preference`: The read preference for this connection.
- See :class:`~pymongo.ReadPreference` for available options.
+ See :class:`~pymongo.read_preferences.ReadPreference` for available
+ options.
- `auto_start_request`: If True (the default), each thread that
accesses this Connection has a socket allocated to it for the
thread's lifetime. This ensures consistent reads, even if you read
@@ -228,6 +229,8 @@ def __init__(self, host=None, port=None, max_pool_size=10,
self.__nodes = seeds
self.__host = None
self.__port = None
+ self.__is_primary = False
+ self.__is_mongos = False
for option, value in kwargs.iteritems():
option, value = common.validate(option, value)
@@ -426,6 +429,23 @@ def port(self):
return self.__port
@property
+ def is_primary(self):
+ """If this Connection is connected to a standalone, a replica-set
+ primary, or the master of a master-slave set.
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__is_primary
+
+ @property
+ def is_mongos(self):
+ """If this Connection is connected to mongos.
+
+ .. versionadded:: 2.2.1+
+ """
+ return self.__is_mongos
+
+ @property
def max_pool_size(self):
"""The maximum pool size limit set for this connection.
@@ -507,7 +527,7 @@ def __auth(self, sock_info, dbname, user, passwd):
def __try_node(self, node):
"""Try to connect to this node and see if it works
- for our connection type.
+ for our connection type. Returns ((host, port), is_primary, is_mongos).
:Parameters:
- `node`: The (host, port) pair to try.
@@ -539,7 +559,7 @@ def __try_node(self, node):
# TODO: Rework this for PYTHON-368 (mongos high availability).
if not self.__nodes:
self.__nodes = set([node])
- return node
+ return node, True, response.get('msg', '') == 'isdbgrid'
elif "primary" in response:
candidate = _partition_node(response["primary"])
return self.__try_node(candidate)
@@ -550,7 +570,7 @@ def __try_node(self, node):
# Direct connection
if response.get("arbiterOnly", False):
raise ConfigurationError("%s:%d is an arbiter" % node)
- return node
+ return node, response['ismaster'], response.get('msg', '') == 'isdbgrid'
def __find_node(self, seeds=None):
"""Find a host, port pair suitable for our connection type.
@@ -570,24 +590,27 @@ def __find_node(self, seeds=None):
In either case a connection to an arbiter will never succeed.
Sets __host and __port so that :attr:`host` and :attr:`port`
- will return the address of the connected host.
+ will return the address of the connected host. Sets __is_primary to
+ True if this is a primary or master, else False.
"""
errors = []
# self.__nodes may change size as we iterate.
candidates = seeds or self.__nodes.copy()
for candidate in candidates:
try:
- node = self.__try_node(candidate)
- if node:
- return node
+ node, is_primary, is_mongos = self.__try_node(candidate)
+ self.__is_primary = is_primary
+ self.__is_mongos = is_mongos
+ return node
except Exception, why:
errors.append(str(why))
# Try any hosts we discovered that were not in the seed list.
for candidate in self.__nodes - candidates:
try:
- node = self.__try_node(candidate)
- if node:
- return node
+ node, is_primary, is_mongos = self.__try_node(candidate)
+ self.__is_primary = is_primary
+ self.__is_mongos = is_mongos
+ return node
except Exception, why:
errors.append(str(why))
# Couldn't find a suitable host.
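The `is_primary`/`is_mongos` detection in `__try_node` above keys off two fields of the ismaster response: `ismaster` and `msg == 'isdbgrid'`. Roughly (the response dicts below are hypothetical examples, not real server output):

```python
def classify(ismaster_response):
    """Sketch of how __try_node derives (is_primary, is_mongos)
    from an ismaster response, per the diff above."""
    # mongos identifies itself with msg == 'isdbgrid'.
    is_mongos = ismaster_response.get('msg', '') == 'isdbgrid'
    # A standalone, replica-set primary, or master reports ismaster: True.
    is_primary = bool(ismaster_response.get('ismaster'))
    return is_primary, is_mongos

print(classify({'ismaster': True, 'msg': 'isdbgrid'}))   # mongos: (True, True)
print(classify({'ismaster': False, 'secondary': True}))  # secondary: (False, False)
print(classify({'ismaster': True}))                      # primary/standalone: (True, False)
```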
43 pymongo/cursor.py
@@ -16,9 +16,8 @@
from bson.code import Code
from bson.son import SON
-from pymongo import (helpers,
- message,
- ReadPreference)
+from pymongo import helpers, message, read_preferences
+from pymongo.read_preferences import ReadPreference
from pymongo.errors import (InvalidOperation,
AutoReconnect)
@@ -43,7 +42,8 @@ def __init__(self, collection, spec=None, fields=None, skip=0, limit=0,
timeout=True, snapshot=False, tailable=False, sort=None,
max_scan=None, as_class=None, slave_okay=False,
await_data=False, partial=False, manipulate=True,
- read_preference=ReadPreference.PRIMARY,
+ read_preference=ReadPreference.PRIMARY, tag_sets=[{}],
+ secondary_acceptable_latency_ms=None,
_must_use_master=False, _uuid_subtype=None, **kwargs):
"""Create a new cursor.
@@ -113,6 +113,8 @@ def __init__(self, collection, spec=None, fields=None, skip=0, limit=0,
self.__slave_okay = slave_okay
self.__manipulate = manipulate
self.__read_preference = read_preference
+ self.__tag_sets = tag_sets
+ self.__secondary_acceptable_latency_ms = secondary_acceptable_latency_ms
self.__tz_aware = collection.database.connection.tz_aware
self.__must_use_master = _must_use_master
self.__uuid_subtype = _uuid_subtype or collection.uuid_subtype
@@ -179,6 +181,9 @@ def clone(self):
copy.__partial = self.__partial
copy.__manipulate = self.__manipulate
copy.__read_preference = self.__read_preference
+ copy.__tag_sets = self.__tag_sets
+ copy.__secondary_acceptable_latency_ms = (
+ self.__secondary_acceptable_latency_ms)
copy.__must_use_master = self.__must_use_master
copy.__uuid_subtype = self.__uuid_subtype
copy.__query_flags = self.__query_flags
@@ -217,6 +222,14 @@ def __query_spec(self):
operators["$snapshot"] = True
if self.__max_scan:
operators["$maxScan"] = self.__max_scan
+ if self.__collection.database.connection.is_mongos:
+ read_pref = {
+ 'mode': read_preferences.mongos_mode(self.__read_preference)}
+
+ if self.__tag_sets and self.__tag_sets != [{}]:
+ read_pref['tags'] = self.__tag_sets
+
+ operators['$readPreference'] = read_pref
if operators:
# Make a shallow copy so we can cleanly rewind or clone.
@@ -243,7 +256,9 @@ def __query_options(self):
options = self.__query_flags
if self.__tailable:
options |= _QUERY_OPTIONS["tailable_cursor"]
- if self.__slave_okay or self.__read_preference:
+ if (self.__slave_okay
+ or self.__read_preference != ReadPreference.PRIMARY
+ ):
options |= _QUERY_OPTIONS["slave_okay"]
if not self.__timeout:
options |= _QUERY_OPTIONS["no_timeout"]
@@ -474,8 +489,10 @@ def count(self, with_limit_and_skip=False):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
- if `read_preference` is not :attr:`pymongo.ReadPreference.PRIMARY` or
- (deprecated) `slave_okay` is `True` the count command will be sent to
+ if `read_preference` is not
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
+ :attr:`pymongo.read_preferences.ReadPreference.PRIMARY_PREFERRED`, or
+ (deprecated) `slave_okay` is `True`, the count command will be sent to
a secondary or slave.
:Parameters:
@@ -494,6 +511,9 @@ def count(self, with_limit_and_skip=False):
command = {"query": self.__spec, "fields": self.__fields}
command['read_preference'] = self.__read_preference
+ command['tag_sets'] = self.__tag_sets
+ command['secondary_acceptable_latency_ms'] = (
+ self.__secondary_acceptable_latency_ms)
command['slave_okay'] = self.__slave_okay
use_master = not self.__slave_okay and not self.__read_preference
command['_use_master'] = use_master
@@ -522,7 +542,8 @@ def distinct(self, key):
With :class:`~pymongo.replica_set_connection.ReplicaSetConnection`
or :class:`~pymongo.master_slave_connection.MasterSlaveConnection`,
- if `read_preference` is not :attr:`pymongo.ReadPreference.PRIMARY` or
+ if `read_preference` is
+ not :attr:`pymongo.read_preferences.ReadPreference.PRIMARY` or
(deprecated) `slave_okay` is `True` the distinct command will be sent
to a secondary or slave.
@@ -544,6 +565,9 @@ def distinct(self, key):
options["query"] = self.__spec
options['read_preference'] = self.__read_preference
+ options['tag_sets'] = self.__tag_sets
+ options['secondary_acceptable_latency_ms'] = (
+ self.__secondary_acceptable_latency_ms)
options['slave_okay'] = self.__slave_okay
use_master = not self.__slave_okay and not self.__read_preference
options['_use_master'] = use_master
@@ -629,6 +653,9 @@ def __send_message(self, message):
db = self.__collection.database
kwargs = {"_must_use_master": self.__must_use_master}
kwargs["read_preference"] = self.__read_preference
+ kwargs["tag_sets"] = self.__tag_sets
+ kwargs["secondary_acceptable_latency_ms"] = (
+ self.__secondary_acceptable_latency_ms)
if self.__connection_id is not None:
kwargs["_connection_to_use"] = self.__connection_id
kwargs.update(self.__kwargs)
View
44 pymongo/database.py
@@ -26,6 +26,7 @@
InvalidName,
OperationFailure)
from pymongo.son_manipulator import ObjectIdInjector
+from pymongo import read_preferences as rp
def _check_name(name):
@@ -62,6 +63,9 @@ def __init__(self, connection, name):
super(Database,
self).__init__(slave_okay=connection.slave_okay,
read_preference=connection.read_preference,
+ tag_sets=connection.tag_sets,
+ secondary_acceptable_latency_ms=(
+ connection.secondary_acceptable_latency_ms),
safe=connection.safe,
**(connection.get_lasterror_options()))
@@ -313,9 +317,23 @@ def command(self, command, value=1,
in this list will be ignored by error-checking
- `uuid_subtype` (optional): The BSON binary subtype to use
for a UUID used in this command.
+      - `read_preference`: The read preference for this command.
+ See :class:`~pymongo.read_preferences.ReadPreference` for available
+ options.
+ - `tag_sets`: Read from replica-set members with these tags.
+ To specify a priority-order for tag sets, provide a list of
+ tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
+ set, ``{}``, means "read from any member that matches the mode,
+ ignoring tags." ReplicaSetConnection tries each set of tags in turn
+ until it finds a set of tags with at least one matching member.
+ - `secondary_acceptable_latency_ms`: Any replica-set member whose
+ ping time is within secondary_acceptable_latency_ms of the nearest
+ member may accept reads. Default 15 milliseconds.
- `**kwargs` (optional): additional keyword arguments will
be added to the command document before it is sent
+ .. versionchanged:: 2.2.1+
+ Added `tag_sets` and `secondary_acceptable_latency_ms` options.
.. versionchanged:: 2.2
Added support for `as_class` - the class you want to use for
the resulting documents
@@ -332,15 +350,35 @@ def command(self, command, value=1,
if isinstance(command, basestring):
command = SON([(command, value)])
+ command_name = command.keys()[0]
+ must_use_master = kwargs.pop('_use_master', False)
+ if command_name.lower() not in rp.secondary_ok_commands:
+ must_use_master = True
+
+ # Special-case: mapreduce can go to secondaries only if inline
+ if command_name == 'mapreduce':
+ out = command.get('out') or kwargs.get('out')
+ if not isinstance(out, dict) or not out.get('inline'):
+ must_use_master = True
+
extra_opts = {
'as_class': kwargs.pop('as_class', None),
- 'read_preference': kwargs.pop('read_preference',
- self.read_preference),
'slave_okay': kwargs.pop('slave_okay', self.slave_okay),
- '_must_use_master': kwargs.pop('_use_master', True),
+ '_must_use_master': must_use_master,
'_uuid_subtype': uuid_subtype
}
+ if not must_use_master:
+ extra_opts['read_preference'] = kwargs.pop(
+ 'read_preference',
+ self.read_preference)
+ extra_opts['tag_sets'] = kwargs.pop(
+ 'tag_sets',
+ self.tag_sets)
+ extra_opts['secondary_acceptable_latency_ms'] = kwargs.pop(
+ 'secondary_acceptable_latency_ms',
+ self.secondary_acceptable_latency_ms)
+
fields = kwargs.get('fields')
if fields is not None and not isinstance(fields, dict):
kwargs['fields'] = helpers._fields_list_to_dict(fields)
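The tag-set priority order documented in `command` above can be sketched in isolation: try each tag set in turn, where a member matches a set if its tags are a superset of it, so a trailing ``{}`` matches everyone. A minimal sketch; `first_matching` and the tag values are invented for illustration and are not part of the driver:

```python
def first_matching(members, tag_sets):
    # A member (a dict of tags) matches a tag set when its tags are a
    # superset of it; the empty set {} therefore matches every member.
    for tags in tag_sets:
        matches = [m for m in members
                   if all(m.get(k) == v for k, v in tags.items())]
        if matches:
            return matches
    return []

members = [{'dc': 'la', 'rack': '2'}, {'dc': 'ny', 'rack': '1'}]

# Prefer New York, fall back to Los Angeles, finally accept anyone.
print(first_matching(members, [{'dc': 'ny'}, {'dc': 'la'}, {}]))
# -> [{'dc': 'ny', 'rack': '1'}]
```

The driver goes one step further: among the members the winning tag set yields, it applies the latency window and picks one at random.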
View
3 pymongo/errors.py
@@ -39,6 +39,9 @@ class AutoReconnect(ConnectionFailure):
will continue to raise this exception until the first successful
connection is made).
"""
+ def __init__(self, message='', errors=None):
+ self.errors = errors or []
+ ConnectionFailure.__init__(self, message)
class ConfigurationError(PyMongoError):
View
12 pymongo/master_slave_connection.py
@@ -39,8 +39,8 @@ def __init__(self, master, slaves=[], document_class=dict, tz_aware=False):
to create this `MasterSlaveConnection` can themselves make use of
connection pooling, etc. 'Connection' instances used as slaves should
be created with the read_preference option set to
- :attr:`~pymongo.ReadPreference.SECONDARY`. Safe options are
- inherited from `master` and can be changed in this instance.
+ :attr:`~pymongo.read_preferences.ReadPreference.SECONDARY`. Safe
+ options are inherited from `master` and can be changed in this instance.
Raises TypeError if `master` is not an instance of `Connection` or
slaves is not a list of at least one `Connection` instances.
@@ -85,6 +85,14 @@ def master(self):
def slaves(self):
return self.__slaves
+ @property
+ def is_mongos(self):
+        """Whether this MasterSlaveConnection is connected to mongos (always
+        False).
+
+ .. versionadded:: 2.2.1+
+ """
+ return False
+
def get_document_class(self):
return self.__document_class
View
208 pymongo/read_preferences.py
@@ -0,0 +1,208 @@
+# Copyright 2012 10gen, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for choosing which member of a replica set to read from."""
+
+import random
+from collections import deque
+
+from pymongo.errors import ConfigurationError
+
+
+class ReadPreference:
+ """An enum that defines the read preferences supported by PyMongo. Used in
+ three cases:
+
+ :class:`~pymongo.connection.Connection` to a single host:
+
+ * `PRIMARY`: Queries are allowed if the connection is to the replica set
+ primary.
+ * `PRIMARY_PREFERRED`: Queries are allowed if the connection is to the
+ primary or a secondary.
+ * `SECONDARY`: Queries are allowed if the connection is to a secondary.
+ * `SECONDARY_PREFERRED`: Same as `PRIMARY_PREFERRED`.
+ * `NEAREST`: Same as `PRIMARY_PREFERRED`.
+
+ :class:`~pymongo.connection.Connection` to a mongos, with a sharded cluster
+ of replica sets:
+
+ * `PRIMARY`: Queries are sent to the primary of a shard.
+ * `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
+ otherwise a secondary.
+ * `SECONDARY`: Queries are distributed among shard secondaries. An error
+ is raised if no secondaries are available.
+ * `SECONDARY_PREFERRED`: Queries are distributed among shard secondaries,
+ or the primary if no secondary is available.
+ * `NEAREST`: Queries are distributed among all members of a shard.
+
+ :class:`~pymongo.replica_set_connection.ReplicaSetConnection`:
+
+ * `PRIMARY`: Queries are sent to the primary of the replica set.
+ * `PRIMARY_PREFERRED`: Queries are sent to the primary if available,
+ otherwise a secondary.
+ * `SECONDARY`: Queries are distributed among secondaries. An error
+ is raised if no secondaries are available.
+ * `SECONDARY_PREFERRED`: Queries are distributed among secondaries,
+ or the primary if no secondary is available.
+ * `NEAREST`: Queries are distributed among all members.
+ """
+
+ PRIMARY = 0
+ PRIMARY_PREFERRED = 1
+ SECONDARY = 2
+ SECONDARY_ONLY = 2
+ SECONDARY_PREFERRED = 3
+ NEAREST = 4
+
+# For formatting error messages
+modes = {
+ ReadPreference.PRIMARY: 'PRIMARY',
+ ReadPreference.PRIMARY_PREFERRED: 'PRIMARY_PREFERRED',
+ ReadPreference.SECONDARY: 'SECONDARY',
+ ReadPreference.SECONDARY_PREFERRED: 'SECONDARY_PREFERRED',
+ ReadPreference.NEAREST: 'NEAREST',
+}
+
+def select_primary(members):
+ for member in members:
+ if member.is_primary:
+ if member.up:
+ return member
+ else:
+ return None
+
+ return None
+
+
+def select_member_with_tags(members, tags, secondary_only, latency):
+ candidates = []
+
+ for candidate in members:
+ if not candidate.up:
+ continue
+
+ if secondary_only and candidate.is_primary:
+ continue
+
+ if candidate.matches_tags(tags):
+ candidates.append(candidate)
+
+ if not candidates:
+ return None
+
+ # ping_time is in seconds
+ fastest = min([candidate.get_avg_ping_time() for candidate in candidates])
+ near_candidates = [
+ candidate for candidate in candidates
+ if candidate.get_avg_ping_time() - fastest < latency / 1000.]
+
+ return random.choice(near_candidates)
+
+
+def select_member(
+ members,
+ mode=ReadPreference.PRIMARY,
+ tag_sets=None,
+ latency=15
+):
+ """Return a Member or None.
+ """
+ if tag_sets is None:
+ tag_sets = [{}]
+
+ # For brevity
+ PRIMARY = ReadPreference.PRIMARY
+ PRIMARY_PREFERRED = ReadPreference.PRIMARY_PREFERRED
+ SECONDARY = ReadPreference.SECONDARY
+ SECONDARY_PREFERRED = ReadPreference.SECONDARY_PREFERRED
+ NEAREST = ReadPreference.NEAREST
+
+ if mode == PRIMARY:
+ if tag_sets != [{}]:
+ raise ConfigurationError("PRIMARY cannot be combined with tags")
+ return select_primary(members)
+
+ elif mode == PRIMARY_PREFERRED:
+ candidate_primary = select_member(members, PRIMARY, [{}], latency)
+ if candidate_primary:
+ return candidate_primary
+ else:
+ return select_member(members, SECONDARY, tag_sets, latency)
+
+ elif mode == SECONDARY:
+ for tags in tag_sets:
+ candidate = select_member_with_tags(members, tags, True, latency)
+ if candidate:
+ return candidate
+
+ return None
+
+ elif mode == SECONDARY_PREFERRED:
+ candidate_secondary = select_member(
+ members, SECONDARY, tag_sets, latency)
+ if candidate_secondary:
+ return candidate_secondary
+ else:
+ return select_member(members, PRIMARY, [{}], latency)
+
+ elif mode == NEAREST:
+ for tags in tag_sets:
+ candidate = select_member_with_tags(members, tags, False, latency)
+ if candidate:
+ return candidate
+
+ # Ran out of tags.
+ return None
+
+ else:
+ raise ConfigurationError("Invalid mode %s" % repr(mode))
+
+
+"""Commands that may be sent to replica-set secondaries, depending on
+ ReadPreference and tags. All other commands are always run on the primary.
+"""
+secondary_ok_commands = set([
+ "group", "aggregate", "collstats", "dbstats", "count", "distinct",
+ "geonear", "geosearch", "geowalk", "mapreduce",
+])
+
+
+class MovingAverage(object):
+ """Tracks a moving average. Not thread-safe.
+ """
+ def __init__(self, window_sz):
+ self.window_sz = window_sz
+ self.samples = deque()
+ self.total = 0
+
+ def update(self, sample):
+ self.samples.append(sample)
+ self.total += sample
+ if len(self.samples) > self.window_sz:
+ self.total -= self.samples.popleft()
+
+ def get(self):
+ if self.samples:
+ return self.total / float(len(self.samples))
+ else:
+ return None
+
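The class above keeps a running total so `get` is O(1); once the deque exceeds `window_sz`, the oldest sample is evicted from both the deque and the total. A standalone copy with invented ping samples shows the sixth sample pushing the first out of a 5-sample window:

```python
from collections import deque

class MovingAverage(object):
    """Fixed-size moving average: a running total plus a deque window."""
    def __init__(self, window_sz):
        self.window_sz = window_sz
        self.samples = deque()
        self.total = 0

    def update(self, sample):
        self.samples.append(sample)
        self.total += sample
        if len(self.samples) > self.window_sz:
            # Window full: drop the oldest sample from the total too.
            self.total -= self.samples.popleft()

    def get(self):
        # None until at least one sample has been recorded.
        if not self.samples:
            return None
        return self.total / float(len(self.samples))

avg = MovingAverage(5)
for ping in [10, 20, 30, 40, 50, 60]:  # the 60 evicts the 10
    avg.update(ping)
print(avg.get())  # -> 40.0  (average of 20, 30, 40, 50, 60)
```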
+def mongos_mode(mode):
+ return {
+ ReadPreference.PRIMARY: 'primary',
+ ReadPreference.PRIMARY_PREFERRED: 'primaryPreferred',
+ ReadPreference.SECONDARY: 'secondary',
+ ReadPreference.SECONDARY_PREFERRED: 'secondaryPreferred',
+ ReadPreference.NEAREST: 'nearest',
+ }[mode]
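`select_member_with_tags` compares average ping times in seconds against a window given in milliseconds, hence the `latency / 1000.` conversion. The filtering step on its own, with invented ping times and a hypothetical `near_candidates` helper:

```python
import random

def near_candidates(avg_pings, latency_ms):
    # avg_pings maps member -> average ping time in SECONDS; keep the
    # members within latency_ms of the fastest, as the driver does.
    fastest = min(avg_pings.values())
    return [host for host, ping in avg_pings.items()
            if ping - fastest < latency_ms / 1000.]

pings = {'a': 0.010, 'b': 0.020, 'c': 0.040}  # 10 ms, 20 ms, 40 ms
eligible = near_candidates(pings, 15)  # default window is 15 ms
print(sorted(eligible))  # -> ['a', 'b']; 'c' is 30 ms behind 'a'

# The driver then spreads load by choosing one eligible member at random.
chosen = random.choice(eligible)
```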
View
457 pymongo/replica_set_connection.py
@@ -47,8 +47,9 @@
helpers,
message,
pool,
- uri_parser,
- ReadPreference)
+ uri_parser)
+from pymongo.read_preferences import (
+ ReadPreference, select_member, modes, MovingAverage)
from pymongo.errors import (AutoReconnect,
ConfigurationError,
ConnectionFailure,
@@ -58,7 +59,7 @@
EMPTY = b("")
MAX_BSON_SIZE = 4 * 1024 * 1024
-
+MAX_RETRY = 3
def _partition_node(node):
"""Split a host:port string returned from mongod/s into
@@ -77,24 +78,32 @@ def _partition_node(node):
class Monitor(object):
"""Base class for replica set monitors.
"""
- def __init__(self, rsc, interval, event_class):
+ _refresh_interval = 30
+ def __init__(self, rsc, event_class):
self.rsc = weakref.proxy(rsc, self.shutdown)
- self.interval = interval
self.event = event_class()
+ self.stopped = False
def shutdown(self, dummy):
"""Signal the monitor to shutdown.
"""
+ self.stopped = True
+ self.event.set()
+
+ def schedule_refresh(self):
+ """Refresh immediately
+ """
self.event.set()
def monitor(self):
"""Run until the RSC is collected or an
unexpected error occurs.
"""
- while not self.event.isSet():
- self.event.wait(self.interval)
- if self.event.isSet():
+ while True:
+ self.event.wait(Monitor._refresh_interval)
+ if self.stopped:
break
+ self.event.clear()
try:
self.rsc.refresh()
except AutoReconnect:
@@ -108,8 +117,8 @@ def monitor(self):
class MonitorThread(Monitor, threading.Thread):
"""Thread based replica set monitor.
"""
- def __init__(self, rsc, interval=5):
- Monitor.__init__(self, rsc, interval, threading.Event)
+ def __init__(self, rsc):
+ Monitor.__init__(self, rsc, threading.Event)
threading.Thread.__init__(self)
self.setName("ReplicaSetMonitorThread")
@@ -123,13 +132,16 @@ def run(self):
try:
from gevent import Greenlet
from gevent.event import Event
+
+ # Used by ReplicaSetConnection
+ from gevent.local import local as gevent_local
have_gevent = True
class MonitorGreenlet(Monitor, Greenlet):
"""Greenlet based replica set monitor.
"""
- def __init__(self, rsc, interval=5):
- Monitor.__init__(self, rsc, interval, Event)
+ def __init__(self, rsc):
+ Monitor.__init__(self, rsc, Event)
Greenlet.__init__(self)
# Don't override `run` in a Greenlet. Add _run instead.
@@ -144,6 +156,69 @@ def _run(self):
pass
+class Member(object):
+ """Represent one member of a replica set
+ """
+ # For unittesting only. Use under no circumstances!
+ _host_to_ping_time = {}
+
+ def __init__(self, host, ismaster_response, ping_time, connection_pool):
+ self.host = host
+ self.pool = connection_pool
+ self.ping_time = MovingAverage(5)
+ self.update(ismaster_response, ping_time)
+
+ def update(self, ismaster_response, ping_time):
+ self.is_primary = ismaster_response['ismaster']
+ self.max_bson_size = ismaster_response.get(
+ 'maxBsonObjectSize', MAX_BSON_SIZE)
+ self.tags = ismaster_response.get('tags', {})
+ self.record_ping_time(ping_time)
+ self.up = True
+
+ def get_avg_ping_time(self):
+ """Get a moving average of this member's ping times
+ """
+ if self.host in Member._host_to_ping_time:
+ # Simulate ping times for unittesting
+ return Member._host_to_ping_time[self.host]
+
+ return self.ping_time.get()
+
+ def record_ping_time(self, ping_time):
+ self.ping_time.update(ping_time)
+
+ def matches_mode(self, mode):
+ if mode == ReadPreference.PRIMARY and not self.is_primary:
+ return False
+
+ if mode == ReadPreference.SECONDARY and self.is_primary:
+ return False
+
+ return True
+
+ def matches_tags(self, tags):
+ """Return True if this member's tags are a superset of the passed-in
+ tags. E.g., if this member is tagged {'dc': 'ny', 'rack': '1'},
+ then it matches {'dc': 'ny'}.
+ """
+ for key, value in tags.items():
+ if key not in self.tags or self.tags[key] != value:
+ return False
+
+ return True
+
+ def matches_tag_sets(self, tag_sets):
+ """Return True if this member matches any of the tag sets, e.g.
+ [{'dc': 'ny'}, {'dc': 'la'}, {}]
+ """
+ for tags in tag_sets:
+ if self.matches_tags(tags):
+ return True
+
+ return False
+
+
class ReplicaSetConnection(common.BaseObject):
"""Connection to a MongoDB replica set.
"""
@@ -198,7 +273,7 @@ def __init__(self, hosts_or_uri=None, max_pool_size=10,
Other optional parameters can be passed as keyword arguments:
- `safe`: Use getlasterror for each write operation?
- - `j` or `journal`: Block until write operations have been commited
+ - `j` or `journal`: Block until write operations have been committed
to the journal. Ignored if the server is running without
journaling. Implies safe=True.
- `w`: (integer or string) If this is a replica set write operations
@@ -217,16 +292,23 @@ def __init__(self, hosts_or_uri=None, max_pool_size=10,
before timing out.
- `ssl`: If True, create the connection to the servers using SSL.
- `read_preference`: The read preference for this connection.
- See :class:`~pymongo.ReadPreference` for available options.
+        See :class:`~pymongo.read_preferences.ReadPreference` for available
+        options.
+ - `tag_sets`: Read from replica-set members with these tags.
+ To specify a priority-order for tag sets, provide a list of
+ tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
+ set, ``{}``, means "read from any member that matches the mode,
+ ignoring tags." ReplicaSetConnection tries each set of tags in turn
+ until it finds a set of tags with at least one matching member.
+ - `secondary_acceptable_latency_ms`: Any replica-set member whose
+ ping time is within secondary_acceptable_latency_ms of the nearest
+ member may accept reads. Default 15 milliseconds.
- `auto_start_request`: If True (the default), each thread that
accesses this :class:`ReplicaSetConnection` has a socket allocated
to it for the thread's lifetime, for each member of the set. For
- :class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
- ensures consistent reads, even if you read after an unsafe
- write. For read preferences other than PRIMARY, there are no
- consistency guarantees. (The semantics of auto_start_request,
- :class:`~pymongo.ReadPreference`, and :class:`ReplicaSetConnection`
- may change in future releases of PyMongo.)
+ :class:`~pymongo.read_preferences.ReadPreference` PRIMARY,
+ auto_start_request=True ensures consistent reads, even if you read
+ after an unsafe write. For read preferences other than PRIMARY,
+ there are no consistency guarantees.
- `use_greenlets` (optional): if ``True``, use a background Greenlet
instead of a background thread to monitor state of replica set.
:meth:`start_request()` will ensure that the current greenlet uses
@@ -246,6 +328,8 @@ def __init__(self, hosts_or_uri=None, max_pool_size=10,
connection.Connection.
+ .. versionchanged:: 2.2.1+
+ Added `tag_sets` and `secondary_acceptable_latency_ms` options.
.. versionchanged:: 2.2
Added `auto_start_request` and `use_greenlets` options.
Added support for `host`, `port`, and `network_timeout` keyword
@@ -258,14 +342,15 @@ def __init__(self, hosts_or_uri=None, max_pool_size=10,
self.__arbiters = set()
self.__writer = None
self.__readers = []
- self.__pools = {}
+ self.__members = {}
self.__index_cache = {}
self.__auth_credentials = {}
self.__max_pool_size = common.validate_positive_integer(
'max_pool_size', max_pool_size)
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
self.__document_class = document_class
+ self.__monitor = None
# Compatibility with connection.Connection
host = kwargs.pop('host', hosts_or_uri)
@@ -311,6 +396,7 @@ def __init__(self, hosts_or_uri=None, max_pool_size=10,
self.__auto_start_request = self.__opts.get('auto_start_request', True)
self.__in_request = self.__auto_start_request
+ self.__reset_pinned_hosts()
self.__name = self.__opts.get('replicaset')
if not self.__name:
raise ConfigurationError("the replicaSet "
@@ -358,7 +444,6 @@ def __init__(self, hosts_or_uri=None, max_pool_size=10,
self.__monitor.setDaemon(True)
self.__monitor.start()
-
def _cached(self, dbname, coll, index):
"""Test if `index` is cached.
"""
@@ -499,6 +584,14 @@ def arbiters(self):
return self.__arbiters
@property
+ def is_mongos(self):
+        """Whether this ReplicaSetConnection is connected to mongos (always
+        False).
+
+ .. versionadded:: 2.2.1+
+ """
+ return False
+
+ @property
def max_pool_size(self):
"""The maximum pool size limit set for this connection.
"""
@@ -530,7 +623,7 @@ def max_bson_size(self):
0 if no primary is available.
"""
if self.__writer:
- return self.__pools[self.__writer]['max_bson_size']
+ return self.__members[self.__writer].max_bson_size
return 0
@property
@@ -539,14 +632,17 @@ def auto_start_request(self):
def __simple_command(self, sock_info, dbname, spec):
"""Send a command to the server.
+ Returns (response, ping_time in seconds).
"""
rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
+ start = time.time()
sock_info.sock.sendall(msg)
response = self.__recv_msg(1, rqst_id, sock_info)
+ end = time.time()
response = helpers._unpack_response(response)['data'][0]
msg = "command %r failed: %%s" % spec
helpers._check_command_response(response, None, msg)
- return response
+ return response, end - start
def __auth(self, sock_info, dbname, user, passwd):
"""Authenticate socket against database `dbname`.
@@ -563,52 +659,86 @@ def __auth(self, sock_info, dbname, user, passwd):
def __is_master(self, host):
"""Directly call ismaster.
+ Returns (response, connection_pool, ping_time in seconds).
"""
- mpool = self.pool_class(host, self.__max_pool_size,
- self.__net_timeout, self.__conn_timeout,
- self.__use_ssl)
- sock_info = mpool.get_socket()
+ connection_pool = self.pool_class(
+ host, self.__max_pool_size, self.__net_timeout, self.__conn_timeout,
+ self.__use_ssl)
+
+ sock_info = connection_pool.get_socket()
try:
- response = self.__simple_command(
+ response, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
- mpool.maybe_return_socket(sock_info)
- return response, mpool
+ connection_pool.maybe_return_socket(sock_info)
+ return response, connection_pool, ping_time
except (ConnectionFailure, socket.error):
- mpool.discard_socket(sock_info)
+ connection_pool.discard_socket(sock_info)
raise
def __update_pools(self):
"""Update the mapping of (host, port) pairs to connection pools.
"""
+ primary = None
secondaries = []
for host in self.__hosts:
- mongo, sock_info = None, None
+ member, sock_info = None, None
try:
- if host in self.__pools:
- mongo = self.__pools[host]
- sock_info = self.__socket(mongo)
- res = self.__simple_command(sock_info, 'admin', {'ismaster': 1})
- mongo['pool'].maybe_return_socket(sock_info)
+ if host in self.__members:
+ member = self.__members[host]
+ sock_info = self.__socket(member)
+ res, ping_time = self.__simple_command(
+ sock_info, 'admin', {'ismaster': 1})
+ member.pool.maybe_return_socket(sock_info)
+ member.update(res, ping_time)
else:
- res, conn = self.__is_master(host)
- bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
- self.__pools[host] = {'pool': conn,
- 'last_checkout': time.time(),
- 'max_bson_size': bson_max}
+ res, connection_pool, ping_time = self.__is_master(host)
+ self.__members[host] = Member(
+ host=host,
+ ismaster_response=res,
+ ping_time=ping_time,
+ connection_pool=connection_pool)
except (ConnectionFailure, socket.error):
- if mongo:
- mongo['pool'].discard_socket(sock_info)
+ if member:
+ member.pool.discard_socket(sock_info)
+ self.__members.pop(member.host, None)
continue
# Only use hosts that are currently in 'secondary' state
# as readers.
if res['secondary']:
secondaries.append(host)
elif res['ismaster']:
- self.__writer = host
+ primary = host
+
+ if primary != self.__writer:
+ self.__reset_pinned_hosts()
+
+ self.__writer = primary
self.__readers = secondaries
+ def __schedule_refresh(self):
+ self.__monitor.schedule_refresh()
+
+ def __pin_host(self, host):
+ # After first successful read in a request, continue reading from same
+ # member until read preferences change, host goes down, or
+ # end_request(). This offers a small assurance that reads won't jump
+ # around in time.
+ self.__threadlocal.host = host
+
+ def __pinned_host(self):
+ return getattr(self.__threadlocal, 'host', None)
+
+ def __unpin_host(self):
+ self.__threadlocal.host = None
+
+ def __reset_pinned_hosts(self):
+ if self.__opts.get('use_greenlets', False):
+ self.__threadlocal = gevent_local()
+ else:
+ self.__threadlocal = threading.local()
+
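`__pin_host` and `__pinned_host` above lean on thread-local (or greenlet-local) storage, so each thread remembers only the member it last read from. A minimal sketch of that mechanism with invented names; the real driver also unpins when the read preference changes, the host goes down, or `end_request()` is called:

```python
import threading

_local = threading.local()

def pin_host(host):
    # Remember the member this thread last read from successfully.
    _local.host = host

def pinned_host():
    # None until this thread has pinned a host.
    return getattr(_local, 'host', None)

pin_host(('localhost', 27017))
print(pinned_host())  # -> ('localhost', 27017)

# A different thread sees its own, empty slot.
seen = []
worker = threading.Thread(target=lambda: seen.append(pinned_host()))
worker.start()
worker.join()
print(seen)  # -> [None]
```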
def refresh(self):
"""Iterate through the existing host list, or possibly the
seed list, to update the list of hosts and arbiters in this
@@ -619,16 +749,16 @@ def refresh(self):
hosts = set()
for node in nodes:
- mongo, sock_info = None, None
+ member, sock_info = None, None
try:
- if node in self.__pools:
- mongo = self.__pools[node]
- sock_info = self.__socket(mongo)
- response = self.__simple_command(sock_info, 'admin',
- {'ismaster': 1})
- mongo['pool'].maybe_return_socket(sock_info)
+ if node in self.__members:
+ member = self.__members[node]
+ sock_info = self.__socket(member)
+ response, _ = self.__simple_command(
+ sock_info, 'admin', {'ismaster': 1})
+ member.pool.maybe_return_socket(sock_info)
else:
- response, _ = self.__is_master(node)
+ response, _, _ = self.__is_master(node)
# Check that this host is part of the given replica set.
set_name = response.get('setName')
@@ -650,8 +780,8 @@ def refresh(self):
hosts.update([_partition_node(h)
for h in response["passives"]])
except (ConnectionFailure, socket.error), why:
- if mongo:
- mongo['pool'].discard_socket(sock_info)
+ if member:
+ member.pool.discard_socket(sock_info)
errors.append("%s:%d: %s" % (node[0], node[1], str(why)))
if hosts:
self.__hosts = hosts
@@ -666,27 +796,28 @@ def refresh(self):
def __check_is_primary(self, host):
"""Checks if this host is the primary for the replica set.
"""
- mongo, sock_info = None, None
+ member, sock_info = None, None
try:
- if host in self.__pools:
- mongo = self.__pools[host]
- sock_info = self.__socket(mongo)
- res = self.__simple_command(
+ if host in self.__members:
+ member = self.__members[host]
+ sock_info = self.__socket(member)
+ res, ping_time = self.__simple_command(
sock_info, 'admin', {'ismaster': 1}
)
else:
- res, conn = self.__is_master(host)
- bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
- self.__pools[host] = {'pool': conn,
- 'last_checkout': time.time(),
- 'max_bson_size': bson_max}
+ res, connection_pool, ping_time = self.__is_master(host)
+ self.__members[host] = Member(
+ host=host,
+ ismaster_response=res,
+ ping_time=ping_time,
+ connection_pool=connection_pool)
except (ConnectionFailure, socket.error), why:
- if mongo:
- mongo['pool'].discard_socket(sock_info)
+ if member:
+ member.pool.discard_socket(sock_info)
raise ConnectionFailure("%s:%d: %s" % (host[0], host[1], str(why)))
- if mongo and sock_info:
- mongo['pool'].maybe_return_socket(sock_info)
+ if member and sock_info:
+ member.pool.maybe_return_socket(sock_info)
if res["ismaster"]:
return host
@@ -704,7 +835,9 @@ def __find_primary(self):
if one exists.
"""
if self.__writer:
- return self.__pools[self.__writer]
+ primary = self.__members[self.__writer]
+ if primary.up:
+ return primary
# This is either the first connection or we had a failover.
self.refresh()
@@ -713,21 +846,20 @@ def __find_primary(self):
for candidate in self.__hosts:
try:
self.__writer = self.__check_is_primary(candidate)
- return self.__pools[self.__writer]
+ return self.__members[self.__writer]
except (ConnectionFailure, socket.error), why:
errors.append(str(why))
# Couldn't find the primary.
raise AutoReconnect(', '.join(errors))
- def __socket(self, mongo):
+ def __socket(self, member):
"""Get a SocketInfo from the pool.
"""
- mpool = mongo['pool']
if self.__auto_start_request:
# No effect if a request already started
self.start_request()
- sock_info = mpool.get_socket()
+ sock_info = member.pool.get_socket()
if self.__auth_credentials:
self.__check_auth(sock_info)
@@ -736,9 +868,9 @@ def __socket(self, mongo):
def disconnect(self):
"""Disconnect from the replica set primary.
"""
- mongo = self.__pools.get(self.__writer)
- if mongo and 'pool' in mongo:
- mongo['pool'].reset()
+ member = self.__members.get(self.__writer)
+ if member:
+ member.pool.reset()
self.__writer = None
def close(self):
@@ -757,7 +889,7 @@ def close(self):
self.__monitor.join(1.0)
self.__monitor = None
self.__writer = None
- self.__pools = {}
+ self.__members = {}
def __check_response_to_last_error(self, response):
"""Check a response to a lastError message for errors.
@@ -851,15 +983,14 @@ def _send_message(self, msg, safe=False, _connection_to_use=None):
- `safe`: check getLastError status after sending the message
"""
if _connection_to_use in (None, -1):
- mongo = self.__find_primary()
+ member = self.__find_primary()
else:
- mongo = self.__pools[_connection_to_use]
+ member = self.__members[_connection_to_use]
sock_info = None
try:
- sock_info = self.__socket(mongo)
- rqst_id, data = self.__check_bson_size(msg,
- mongo['max_bson_size'])
+ sock_info = self.__socket(member)
+ rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
sock_info.sock.sendall(data)
# Safe mode. We pack the message together with a lastError
# message and send both. We then get the response (to the
@@ -869,101 +1000,166 @@ def _send_message(self, msg, safe=False, _connection_to_use=None):
if safe:
response = self.__recv_msg(1, rqst_id, sock_info)
rv = self.__check_response_to_last_error(response)
- mongo['pool'].maybe_return_socket(sock_info)
+ member.pool.maybe_return_socket(sock_info)
return rv
except(ConnectionFailure, socket.error), why:
- mongo['pool'].discard_socket(sock_info)
+ member.pool.discard_socket(sock_info)
if _connection_to_use in (None, -1):
self.disconnect()
raise AutoReconnect(str(why))
except:
- mongo['pool'].discard_socket(sock_info)
+ member.pool.discard_socket(sock_info)
raise
- def __send_and_receive(self, mongo, msg, **kwargs):
+ def __send_and_receive(self, member, msg, **kwargs):
"""Send a message on the given socket and return the response data.
"""
sock_info = None
try:
- sock_info = self.__socket(mongo)
+ sock_info = self.__socket(member)
if "network_timeout" in kwargs:
sock_info.sock.settimeout(kwargs['network_timeout'])
- rqst_id, data = self.__check_bson_size(msg,
- mongo['max_bson_size'])
+ rqst_id, data = self.__check_bson_size(msg, member.max_bson_size)
sock_info.sock.sendall(data)
response = self.__recv_msg(1, rqst_id, sock_info)
if "network_timeout" in kwargs:
sock_info.sock.settimeout(self.__net_timeout)
- mongo['pool'].maybe_return_socket(sock_info)
+ member.pool.maybe_return_socket(sock_info)
return response
except (ConnectionFailure, socket.error), why:
- host, port = mongo['pool'].pair
- mongo['pool'].discard_socket(sock_info)
+ host, port = member.pool.pair
+ member.pool.discard_socket(sock_info)
raise AutoReconnect("%s:%d: %s" % (host, port, str(why)))
except:
- mongo['pool'].discard_socket(sock_info)
+ member.pool.discard_socket(sock_info)
+ raise
+
+ def __try_read(self, member, msg, **kwargs):
+ """Attempt a read from a member; on failure mark the member "down" and
+ wake up the monitor thread to refresh as soon as possible.
+ """
+ try:
+ return self.__send_and_receive(member, msg, **kwargs)
+ except AutoReconnect:
+ member.up = False
+ self.__schedule_refresh()
raise
def _send_message_with_response(self, msg, _connection_to_use=None,
_must_use_master=False, **kwargs):
"""Send a message to Mongo and return the response.
- Sends the given message and returns the response.
+ Sends the given message and returns (host used, response).
:Parameters:
- `msg`: (request_id, data) pair making up the message to send
"""
- read_pref = kwargs.get('read_preference', ReadPreference.PRIMARY)
- mongo = None
+
+ # If we've disconnected since last read, trigger refresh
+ try:
+ self.__find_primary()
+ except AutoReconnect:
+ # We'll throw an error later
+ pass
+
+ tag_sets = kwargs.get('tag_sets', [{}])
+ mode = kwargs.get('read_preference', ReadPreference.PRIMARY)
+ if _must_use_master:
+ mode = ReadPreference.PRIMARY
+ tag_sets = [{}]
+
+ secondary_acceptable_latency_ms = kwargs.get(
+ 'secondary_acceptable_latency_ms',
+ self.secondary_acceptable_latency_ms)
+
+ member = None
try:
if _connection_to_use is not None:
if _connection_to_use == -1:
- mongo = self.__find_primary()
+ member = self.__find_primary()
else:
- mongo = self.__pools[_connection_to_use]
- return mongo['pool'].pair, self.__send_and_receive(mongo,
- msg,
- **kwargs)
- elif _must_use_master or not read_pref:
- mongo = self.__find_primary()
- return mongo['pool'].pair, self.__send_and_receive(mongo,
- msg,
- **kwargs)
+ member = self.__members[_connection_to_use]
+ return member.pool.pair, self.__try_read(
+ member, msg, **kwargs)
except AutoReconnect:
- if mongo == self.__pools.get(self.__writer):
+ if member == self.__members.get(self.__writer):
self.disconnect()
raise
errors = []
- for host in helpers.shuffled(self.__readers):
+ pinned_member = self.__members.get(self.__pinned_host())
+ if (pinned_member
+ and pinned_member.matches_mode(mode)
+ and pinned_member.matches_tag_sets(tag_sets)
+ and pinned_member.up
+ ):
try:
- mongo = self.__pools[host]
- return host, self.__send_and_receive(mongo, msg, **kwargs)
+ return (
+ pinned_member.host,
+ self.__try_read(pinned_member, msg, **kwargs))
except AutoReconnect, why:
- errors.append(str(why))
- # Fallback to primary
- if read_pref == ReadPreference.SECONDARY:
+ if _must_use_master or mode == ReadPreference.PRIMARY:
+ self.disconnect()
+ raise
+ else:
+ errors.append(str(why))
+
+ # No pinned member, or pinned member down or doesn't match read pref
+ self.__unpin_host()
+
+ members = self.__members.copy().values()
+
+ while len(errors) < MAX_RETRY:
+ member = select_member(
+ members=members,
+ mode=mode,
+ tag_sets=tag_sets,
+ latency=secondary_acceptable_latency_ms)
+
+ if not member:
+ # Ran out of members to try
+ break
+
try:
- mongo = self.__find_primary()
- return mongo['pool'].pair, self.__send_and_receive(mongo,
- msg,
- **kwargs)
+ # Sets member.up False on failure, so select_member won't try
+ # it again.
+ response = self.__try_read(member, msg, **kwargs)
+
+ # Success
+ if self.in_request():
+ # Keep reading from this member in this thread / greenlet
+ self.__pin_host(member.host)
+ return member.host, response
except AutoReconnect, why:
- self.disconnect()
errors.append(str(why))
- raise AutoReconnect(', '.join(errors))
+ members.remove(member)
+
+ # Ran out of tries
+ if mode == ReadPreference.PRIMARY:
+ msg = "No replica set primary available for query"
+ elif mode == ReadPreference.SECONDARY:
+ msg = "No replica set secondary available for query"
+ else:
+ msg = "No replica set members available for query"
+
+ msg += " with ReadPreference %s" % modes[mode]
+
+ if tag_sets != [{}]:
+ msg += " and tags " + repr(tag_sets)
+
+ raise AutoReconnect(msg, errors)
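The retry loop above delegates the actual choice to `select_member`, which (per the commit) filters by mode, tag sets, and acceptable latency. A simplified sketch of that selection logic, under stated assumptions — the `Member` class, mode constants, and field names here are illustrative, not pymongo's real API; pymongo's latency window is measured against a moving average of ping times:

```python
import random

# Illustrative mode constants, not pymongo.read_preferences values
PRIMARY, SECONDARY, NEAREST = 0, 1, 2


class Member(object):
    """Illustrative member: role, averaged ping time, tags, up/down state."""
    def __init__(self, host, is_primary, ping_ms, tags=None, up=True):
        self.host = host
        self.is_primary = is_primary
        self.ping_ms = ping_ms
        self.tags = tags or {}
        self.up = up


def matches_tag_sets(member_tags, tag_sets):
    """A member matches if ANY tag set is a subset of its tags.
    The empty tag set {} matches every member."""
    return any(all(member_tags.get(k) == v for k, v in ts.items())
               for ts in tag_sets)


def select_member(members, mode, tag_sets, latency_ms):
    """Filter by up/role/tags, then pick at random among the members
    within latency_ms of the fastest remaining candidate."""
    if mode == PRIMARY:
        candidates = [m for m in members if m.up and m.is_primary]
    elif mode == SECONDARY:
        candidates = [m for m in members if m.up and not m.is_primary]
    else:  # NEAREST: primary or secondary
        candidates = [m for m in members if m.up]

    candidates = [m for m in candidates
                  if matches_tag_sets(m.tags, tag_sets)]
    if not candidates:
        return None  # caller falls through to the "No replica set..." error

    fastest = min(m.ping_ms for m in candidates)
    window = [m for m in candidates if m.ping_ms <= fastest + latency_ms]
    return random.choice(window)
```

Randomizing within the latency window, rather than always taking the single fastest member, spreads reads across nearby members instead of hammering one.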
def start_request(self):
"""Ensure the current thread or greenlet always uses the same socket
until it calls :meth:`end_request`. For
- :class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
- ensures consistent reads, even if you read after an unsafe write. For
- read preferences other than PRIMARY, there are no consistency
- guarantees.
+ :class:`~pymongo.read_preferences.ReadPreference` PRIMARY,
+ auto_start_request=True ensures consistent reads, even if you read
+ after an unsafe write. For read preferences other than PRIMARY, there
+ are no consistency guarantees.
In Python 2.6 and above, or in Python 2.5 with
"from __future__ import with_statement", :meth:`start_request` can be
@@ -983,9 +1179,8 @@ def start_request(self):
The :class:`~pymongo.pool.Request` return value.
:meth:`start_request` previously returned None
"""
- for mongo in self.__pools.values():
- if 'pool' in mongo:
- mongo['pool'].start_request()
+ for member in self.__members.values():
+ member.pool.start_request()
self.__in_request = True
return
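The `__pin_host` / `__unpin_host` calls in the read path, together with `start_request` here, keep one thread (or greenlet) reading from the same member for the duration of a request. A rough sketch of per-thread pinning using `threading.local` — `HostPinner` is a hypothetical name for illustration; pymongo's private implementation differs:

```python
import threading


class HostPinner(object):
    """Illustrative per-thread host pinning: once a thread reads from a
    member inside a request, keep routing that thread's reads there."""
    def __init__(self):
        self._local = threading.local()  # one slot per thread/greenlet

    def pin(self, host):
        self._local.host = host

    def pinned(self):
        # Threads that never pinned see None
        return getattr(self._local, "host", None)

    def unpin(self):
        self._local.host = None
```

Because the state lives in `threading.local`, a pin set in one thread is invisible to every other thread, which is exactly the isolation the request-scoped consistency guarantee needs.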