Skip to content

Commit

Permalink
PYTHON-1105 Configurable heartbeatFrequencyMS.
Browse files Browse the repository at this point in the history
  • Loading branch information
ajdavis committed Jul 8, 2016
1 parent 7d81a01 commit a2f97f9
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 6 deletions.
7 changes: 7 additions & 0 deletions pymongo/client_options.py
Expand Up @@ -140,6 +140,8 @@ def __init__(self, username, password, database, options):
self.__write_concern = _parse_write_concern(options)
self.__read_concern = _parse_read_concern(options)
self.__connect = options.get('connect')
self.__heartbeat_frequency = options.get(
'heartbeatfrequencyms', common.HEARTBEAT_FREQUENCY)

@property
def _options(self):
Expand Down Expand Up @@ -171,6 +173,11 @@ def server_selection_timeout(self):
"""The server selection timeout for this instance in seconds."""
return self.__server_selection_timeout

@property
def heartbeat_frequency(self):
"""The monitoring frequency in seconds."""
return self.__heartbeat_frequency

@property
def pool_options(self):
"""A :class:`~pymongo.pool.PoolOptions` instance."""
Expand Down
1 change: 1 addition & 0 deletions pymongo/common.py
Expand Up @@ -458,6 +458,7 @@ def validate_ok_for_update(update):
'sockettimeoutms': validate_timeout_or_none,
'waitqueuetimeoutms': validate_timeout_or_none,
'serverselectiontimeoutms': validate_timeout_or_zero,
'heartbeatfrequencyms': validate_timeout_or_none,
'maxidletimems': validate_timeout_or_none,
}

Expand Down
6 changes: 5 additions & 1 deletion pymongo/mongo_client.py
Expand Up @@ -159,6 +159,9 @@ def __init__(
- `socketKeepAlive`: (boolean) Whether to send periodic keep-alive
packets on connected sockets. Defaults to ``False`` (do not send
keep-alive packets).
- `heartbeatFrequencyMS`: (optional) The number of milliseconds
between periodic server checks, or None to accept the default
frequency of 10 seconds.
- `event_listeners`: a list or tuple of event listeners. See
:mod:`~pymongo.monitoring` for details.
Expand Down Expand Up @@ -389,7 +392,8 @@ def __init__(
monitor_class=monitor_class,
condition_class=condition_class,
local_threshold_ms=options.local_threshold_ms,
server_selection_timeout=options.server_selection_timeout)
server_selection_timeout=options.server_selection_timeout,
heartbeat_frequency=options.heartbeat_frequency)

self._topology = Topology(self._topology_settings)
if connect:
Expand Down
2 changes: 1 addition & 1 deletion pymongo/monitor.py
Expand Up @@ -58,7 +58,7 @@ def target():
return True

executor = periodic_executor.PeriodicExecutor(
interval=common.HEARTBEAT_FREQUENCY,
interval=self._settings.heartbeat_frequency,
min_interval=common.MIN_HEARTBEAT_INTERVAL,
target=target,
name="pymongo_server_monitor_thread")
Expand Down
15 changes: 13 additions & 2 deletions pymongo/settings.py
Expand Up @@ -17,8 +17,9 @@
import threading

from bson.objectid import ObjectId
from pymongo import monitor, pool
from pymongo import common, monitor, pool
from pymongo.common import LOCAL_THRESHOLD_MS, SERVER_SELECTION_TIMEOUT
from pymongo.errors import ConfigurationError
from pymongo.topology_description import TOPOLOGY_TYPE
from pymongo.pool import PoolOptions
from pymongo.server_description import ServerDescription
Expand All @@ -33,11 +34,16 @@ def __init__(self,
monitor_class=None,
condition_class=None,
local_threshold_ms=LOCAL_THRESHOLD_MS,
server_selection_timeout=SERVER_SELECTION_TIMEOUT):
server_selection_timeout=SERVER_SELECTION_TIMEOUT,
heartbeat_frequency=common.HEARTBEAT_FREQUENCY):
"""Represent MongoClient's configuration.
Take a list of (host, port) pairs and optional replica set name.
"""
if heartbeat_frequency < common.MIN_HEARTBEAT_INTERVAL:
raise ConfigurationError("%s cannot be less than %.1f" % (
'heartbeatFrequencyMS', common.MIN_HEARTBEAT_INTERVAL))

self._seeds = seeds or [('localhost', 27017)]
self._replica_set_name = replica_set_name
self._pool_class = pool_class or pool.Pool
Expand All @@ -46,6 +52,7 @@ def __init__(self,
self._condition_class = condition_class or threading.Condition
self._local_threshold_ms = local_threshold_ms
self._server_selection_timeout = server_selection_timeout
self._heartbeat_frequency = heartbeat_frequency
self._direct = (len(self._seeds) == 1 and not replica_set_name)
self._topology_id = ObjectId()

Expand Down Expand Up @@ -82,6 +89,10 @@ def local_threshold_ms(self):
def server_selection_timeout(self):
return self._server_selection_timeout

@property
def heartbeat_frequency(self):
return self._heartbeat_frequency

@property
def direct(self):
"""Connect directly to a single server, or use a set of servers?
Expand Down
11 changes: 10 additions & 1 deletion test/__init__.py
Expand Up @@ -58,24 +58,31 @@ class client_knobs(object):
def __init__(
self,
heartbeat_frequency=None,
min_heartbeat_interval=None,
kill_cursor_frequency=None,
events_queue_frequency=None):
self.heartbeat_frequency = heartbeat_frequency
self.min_heartbeat_interval = min_heartbeat_interval
self.kill_cursor_frequency = kill_cursor_frequency
self.events_queue_frequency = events_queue_frequency

self.old_heartbeat_frequency = None
self.old_min_heartbeat_interval = None
self.old_kill_cursor_frequency = None
self.old_events_queue_frequency = None

def enable(self):
self.old_heartbeat_frequency = common.HEARTBEAT_FREQUENCY
self.old_min_heartbeat_interval = common.MIN_HEARTBEAT_INTERVAL
self.old_kill_cursor_frequency = common.KILL_CURSOR_FREQUENCY
self.old_events_queue_frequency = common.EVENTS_QUEUE_FREQUENCY

if self.heartbeat_frequency is not None:
common.HEARTBEAT_FREQUENCY = self.heartbeat_frequency

if self.min_heartbeat_interval is not None:
common.MIN_HEARTBEAT_INTERVAL = self.min_heartbeat_interval

if self.kill_cursor_frequency is not None:
common.KILL_CURSOR_FREQUENCY = self.kill_cursor_frequency

Expand All @@ -87,6 +94,7 @@ def __enter__(self):

def disable(self):
common.HEARTBEAT_FREQUENCY = self.old_heartbeat_frequency
common.MIN_HEARTBEAT_INTERVAL = self.old_min_heartbeat_interval
common.KILL_CURSOR_FREQUENCY = self.old_kill_cursor_frequency
common.EVENTS_QUEUE_FREQUENCY = self.old_events_queue_frequency

Expand Down Expand Up @@ -358,7 +366,8 @@ def setUp(self):
super(MockClientTest, self).setUp()

self.client_knobs = client_knobs(
heartbeat_frequency=0.001)
heartbeat_frequency=0.001,
min_heartbeat_interval=0.001)

self.client_knobs.enable()

Expand Down
22 changes: 22 additions & 0 deletions test/test_client.py
Expand Up @@ -40,6 +40,7 @@
OperationFailure,
NetworkTimeout,
InvalidURI)
from pymongo.monitoring import ServerHeartbeatStartedEvent
from pymongo.mongo_client import MongoClient
from pymongo.pool import SocketInfo
from pymongo.read_preferences import ReadPreference
Expand All @@ -61,6 +62,7 @@
from test.pymongo_mocks import MockClient
from test.utils import (assertRaisesExactly,
delay,
HeartbeatEventListener,
remove_all_users,
server_is_master_with_slave,
get_pool,
Expand All @@ -69,6 +71,7 @@
wait_until,
rs_or_single_client,
rs_or_single_client_noauth,
single_client,
lazy_client_trial,
NTHREADS)

Expand Down Expand Up @@ -899,6 +902,25 @@ def test_stale_getmore(self):
101, 1234, client.codec_options),
address=('not-a-member', 27017))

def test_heartbeat_frequency_ms(self):
listener = HeartbeatEventListener()
uri = "mongodb://%s:%d/?heartbeatFrequencyMS=500" % (host, port)
client = single_client(uri, event_listeners=[listener])
time.sleep(3)
started_events = [r for r in listener.results
if isinstance(r, ServerHeartbeatStartedEvent)]

# Frequency is 500ms, expect 5 or 6 events in 3 sec, but be forgiving.
self.assertGreaterEqual(len(started_events), 4)
client.close()

def test_small_heartbeat_frequency_ms(self):
uri = "mongodb://example/?heartbeatFrequencyMS=499"
with self.assertRaises(ConfigurationError) as context:
MongoClient(uri)

self.assertIn('heartbeatFrequencyMS', str(context.exception))


class TestExhaustCursor(IntegrationTest):
"""Test that clients properly handle errors from exhaust cursors."""
Expand Down
4 changes: 3 additions & 1 deletion test/test_heartbeat_monitoring.py
Expand Up @@ -71,7 +71,9 @@ def tearDown(cls):
monitoring._LISTENERS = cls.saved_listeners

def create_mock_monitor(self, responses, uri, expected_results):
with client_knobs(heartbeat_frequency=0.1, events_queue_frequency=0.1):
with client_knobs(heartbeat_frequency=0.1,
min_heartbeat_interval=0.1,
events_queue_frequency=0.1):
class MockMonitor(Monitor):
def _check_with_socket(self, sock_info):
if isinstance(responses[1], Exception):
Expand Down

0 comments on commit a2f97f9

Please sign in to comment.