Skip to content

Commit

Permalink
PYTHON-1150 - Add maxStalenessMS to $readPreference
Browse files Browse the repository at this point in the history
  • Loading branch information
behackett committed Sep 19, 2016
1 parent 1a45a0f commit 5f16e33
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 14 deletions.
9 changes: 6 additions & 3 deletions pymongo/message.py
Expand Up @@ -73,13 +73,16 @@ def _maybe_add_read_preference(spec, read_preference):
"""Add $readPreference to spec when appropriate."""
mode = read_preference.mode
tag_sets = read_preference.tag_sets
max_staleness = read_preference.max_staleness
# Only add $readPreference if it's something other than primary to avoid
# problems with mongos versions that don't support read preferences. Also,
# for maximum backwards compatibility, don't add $readPreference for
# secondaryPreferred unless tags are in use (setting the slaveOkay bit
# has the same effect).
# secondaryPreferred unless tags or maxStalenessMS are in use (setting the
# slaveOkay bit has the same effect).
if mode and (
mode != ReadPreference.SECONDARY_PREFERRED.mode or tag_sets != [{}]):
mode != ReadPreference.SECONDARY_PREFERRED.mode
or tag_sets != [{}]
or max_staleness):

if "$query" not in spec:
spec = SON([("$query", spec)])
Expand Down
23 changes: 16 additions & 7 deletions pymongo/read_preferences.py
Expand Up @@ -63,18 +63,22 @@ def _validate_tag_sets(tag_sets):


def _validate_max_staleness(max_staleness):
"""Validate maxStalenessMS."""
"""Validate max_staleness."""
if max_staleness is None:
return 0.0

errmsg = "maxStalenessMS must be an integer or float"
errmsg = "max_staleness must be an integer or float"
try:
max_staleness = float(max_staleness)
except ValueError:
raise ValueError(errmsg)
except TypeError:
raise TypeError(errmsg)

if not 0 < max_staleness < 1e9:
raise ValueError(
"max_staleness must be greater than 0 and less than one billion")

return max_staleness


Expand All @@ -100,9 +104,12 @@ def name(self):
def document(self):
"""Read preference as a document.
"""
if self.__tag_sets in (None, [{}]):
return {'mode': self.__mongos_mode}
return {'mode': self.__mongos_mode, 'tags': self.__tag_sets}
doc = {'mode': self.__mongos_mode}
if self.__tag_sets not in (None, [{}]):
doc['tags'] = self.__tag_sets
if self.__max_staleness:
doc['maxStalenessMS'] = int(self.__max_staleness * 1000)
return doc

@property
def mode(self):
Expand Down Expand Up @@ -130,6 +137,8 @@ def max_staleness(self):
"""The maximum estimated length of time (in seconds) a replica set
secondary can fall behind the primary in replication before it will
no longer be selected for operations."""
if not self.__max_staleness:
return None
return self.__max_staleness

@property
Expand All @@ -146,8 +155,8 @@ def min_wire_version(self):
return 5 if self.__max_staleness else 0

def __repr__(self):
return "%s(tag_sets=%r)" % (
self.name, self.__tag_sets)
return "%s(tag_sets=%r, max_staleness=%r)" % (
self.name, self.__tag_sets, self.max_staleness)

def __eq__(self, other):
if isinstance(other, _ServerMode):
Expand Down
2 changes: 2 additions & 0 deletions test/test_max_staleness.py
Expand Up @@ -159,6 +159,8 @@ def run_scenario(self):
mode_string = mode_string[:1].lower() + mode_string[1:]
mode = read_preferences.read_pref_mode_from_name(mode_string)
max_staleness = pref_def.get('maxStalenessMS', 0) / 1000.0
if not max_staleness:
max_staleness = None
tag_sets = pref_def.get('tag_sets')

if scenario_def.get('error'):
Expand Down
113 changes: 111 additions & 2 deletions test/test_read_preferences.py
Expand Up @@ -24,7 +24,7 @@

from bson.py3compat import MAXSIZE
from bson.son import SON
from pymongo.errors import ConfigurationError
from pymongo.errors import ConfigurationError, OperationFailure
from pymongo.message import _maybe_add_read_preference
from pymongo.mongo_client import MongoClient
from pymongo.read_preferences import (ReadPreference, MovingAverage,
Expand Down Expand Up @@ -446,6 +446,77 @@ def test_moving_average(self):

class TestMongosAndReadPreference(unittest.TestCase):

def test_read_preference_document(self):

pref = Primary()
self.assertEqual(
pref.document,
{'mode': 'primary'})

pref = PrimaryPreferred()
self.assertEqual(
pref.document,
{'mode': 'primaryPreferred'})
pref = PrimaryPreferred(tag_sets=[{'dc': 'sf'}])
self.assertEqual(
pref.document,
{'mode': 'primaryPreferred', 'tags': [{'dc': 'sf'}]})
pref = PrimaryPreferred(
tag_sets=[{'dc': 'sf'}], max_staleness=30)
self.assertEqual(
pref.document,
{'mode': 'primaryPreferred',
'tags': [{'dc': 'sf'}],
'maxStalenessMS': 30000})

pref = Secondary()
self.assertEqual(
pref.document,
{'mode': 'secondary'})
pref = Secondary(tag_sets=[{'dc': 'sf'}])
self.assertEqual(
pref.document,
{'mode': 'secondary', 'tags': [{'dc': 'sf'}]})
pref = Secondary(
tag_sets=[{'dc': 'sf'}], max_staleness=30)
self.assertEqual(
pref.document,
{'mode': 'secondary',
'tags': [{'dc': 'sf'}],
'maxStalenessMS': 30000})

pref = SecondaryPreferred()
self.assertEqual(
pref.document,
{'mode': 'secondaryPreferred'})
pref = SecondaryPreferred(tag_sets=[{'dc': 'sf'}])
self.assertEqual(
pref.document,
{'mode': 'secondaryPreferred', 'tags': [{'dc': 'sf'}]})
pref = SecondaryPreferred(
tag_sets=[{'dc': 'sf'}], max_staleness=30)
self.assertEqual(
pref.document,
{'mode': 'secondaryPreferred',
'tags': [{'dc': 'sf'}],
'maxStalenessMS': 30000})

pref = Nearest()
self.assertEqual(
pref.document,
{'mode': 'nearest'})
pref = Nearest(tag_sets=[{'dc': 'sf'}])
self.assertEqual(
pref.document,
{'mode': 'nearest', 'tags': [{'dc': 'sf'}]})
pref = Nearest(
tag_sets=[{'dc': 'sf'}], max_staleness=30)
self.assertEqual(
pref.document,
{'mode': 'nearest',
'tags': [{'dc': 'sf'}],
'maxStalenessMS': 30000})

def test_maybe_add_read_preference(self):

# Primary doesn't add $readPreference
Expand All @@ -470,12 +541,17 @@ def test_maybe_add_read_preference(self):
self.assertEqual(
out, SON([("$query", {}), ("$readPreference", pref.document)]))

# SecondaryPreferred without tag_sets doesn't add $readPreference
# SecondaryPreferred without tag_sets or max_staleness doesn't add
# $readPreference
pref = SecondaryPreferred()
out = _maybe_add_read_preference({}, pref)
self.assertEqual(out, {})
pref = SecondaryPreferred(tag_sets=[{'dc': 'nyc'}])
out = _maybe_add_read_preference({}, pref)
self.assertEqual(
out, SON([("$query", {}), ("$readPreference", pref.document)]))
pref = SecondaryPreferred(max_staleness=120)
out = _maybe_add_read_preference({}, pref)
self.assertEqual(
out, SON([("$query", {}), ("$readPreference", pref.document)]))

Expand Down Expand Up @@ -533,6 +609,39 @@ def test_mongos(self):
self.assertEqual(first_id, results[-1]["_id"])
self.assertEqual(last_id, results[0]["_id"])

@client_context.require_mongos
@client_context.require_version_min(3, 3, 12)
def test_mongos_max_staleness(self):
# Sanity check that we're sending maxStalenessMS
coll = client_context.client.pymongo_test.get_collection(
"test", read_preference=SecondaryPreferred(max_staleness=120))
# No error
coll.find_one()

coll = client_context.client.pymongo_test.get_collection(
"test", read_preference=SecondaryPreferred(max_staleness=10))
try:
coll.find_one()
except OperationFailure as exc:
self.assertEqual(160, exc.code)
else:
self.fail("mongos accepted invalid staleness")

coll = single_client(
readPreference='secondaryPreferred',
maxStalenessMS=120000).pymongo_test.test
# No error
coll.find_one()

coll = single_client(
readPreference='secondaryPreferred',
maxStalenessMS=10000).pymongo_test.test
try:
coll.find_one()
except OperationFailure as exc:
self.assertEqual(160, exc.code)
else:
self.fail("mongos accepted invalid staleness")

if __name__ == "__main__":
unittest.main()
4 changes: 2 additions & 2 deletions test/test_topology.py
Expand Up @@ -710,12 +710,12 @@ def test_no_secondary(self):

self.assertMessage(
'No replica set members match selector'
' "Secondary(tag_sets=None)"',
' "Secondary(tag_sets=None, max_staleness=None)"',
t, ReadPreference.SECONDARY)

self.assertMessage(
"No replica set members match selector"
" \"Secondary(tag_sets=[{'dc': 'ny'}])\"",
" \"Secondary(tag_sets=[{'dc': 'ny'}], max_staleness=None)\"",
t, Secondary(tag_sets=[{'dc': 'ny'}]))

def test_bad_replica_set_name(self):
Expand Down

0 comments on commit 5f16e33

Please sign in to comment.