Skip to content

Commit

Permalink
PING/PONG health checks
Browse files Browse the repository at this point in the history
The `Redis` class and the `ConnectionPool` class now support the
"health_check_interval=N" option. By default N=0, which turns off health
checks. `N` should be an integer, and when greater than 0, ensures that
a health check is performed just before command execution anytime the
underlying connection has been idle for more than N seconds. A health
check is a full PING/PONG round trip to the Redis server.

If a health check encounters a ConnectionError or TimeoutError, the connection
is disconnected and reconnected and the health check is retried exactly once.
Any error during the retry is raised to the caller. Health check retries
are not governed by any other options such as `retry_on_timeout`. In systems
where idle times are common, these health checks are the intended way to
reconnect to the Redis server without harming any user data.

When this option is enabled for PubSub connections, calling `get_message()` or
`listen()` will send a health check anytime a message has not been read on
the PubSub connection for `health_check_interval` seconds. Users should
call `get_message()` or `listen()` at least every `health_check_interval`
seconds in order to keep the connection open.
  • Loading branch information
andymccurdy committed Jul 28, 2019
1 parent 0984b10 commit f60b2b0
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 28 deletions.
7 changes: 6 additions & 1 deletion CHANGES
Expand Up @@ -25,7 +25,12 @@
* Allow for single connection client instances. These instances
are not thread safe but offer other benefits including a subtle
performance increase.

* Added extensive health checks that keep the connections lively.
Passing the "health_check_interval=N" option to the Redis client class
or to a ConnectionPool ensures that a round trip PING/PONG is successful
before any command if the underlying connection has been idle for more
than N seconds. ConnectionErrors and TimeoutErrors are automatically
retried once for health checks.
* 3.2.1
* Fix SentinelConnectionPool to work in multiprocess/forked environments.
* 3.2.0
Expand Down
53 changes: 42 additions & 11 deletions redis/client.py
Expand Up @@ -648,7 +648,8 @@ def __init__(self, host='localhost', port=6379,
decode_responses=False, retry_on_timeout=False,
ssl=False, ssl_keyfile=None, ssl_certfile=None,
ssl_cert_reqs='required', ssl_ca_certs=None,
max_connections=None, single_connection_client=False):
max_connections=None, single_connection_client=False,
health_check_interval=0):
if not connection_pool:
if charset is not None:
warnings.warn(DeprecationWarning(
Expand All @@ -667,7 +668,8 @@ def __init__(self, host='localhost', port=6379,
'encoding_errors': encoding_errors,
'decode_responses': decode_responses,
'retry_on_timeout': retry_on_timeout,
'max_connections': max_connections
'max_connections': max_connections,
'health_check_interval': health_check_interval,
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
Expand Down Expand Up @@ -3053,6 +3055,7 @@ class PubSub(object):
"""
PUBLISH_MESSAGE_TYPES = ('message', 'pmessage')
UNSUBSCRIBE_MESSAGE_TYPES = ('unsubscribe', 'punsubscribe')
HEALTH_CHECK_MESSAGE = 'redis-py-health-check'

def __init__(self, connection_pool, shard_hint=None,
ignore_subscribe_messages=False):
Expand All @@ -3063,6 +3066,13 @@ def __init__(self, connection_pool, shard_hint=None,
# we need to know the encoding options for this connection in order
# to lookup channel and pattern names for callback handlers.
self.encoder = self.connection_pool.get_encoder()
if self.encoder.decode_responses:
self.health_check_response = ['pong', self.HEALTH_CHECK_MESSAGE]
else:
self.health_check_response = [
b'pong',
self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
]
self.reset()

def __del__(self):
Expand Down Expand Up @@ -3111,7 +3121,7 @@ def subscribed(self):
"Indicates if there are subscriptions to any channels or patterns"
return bool(self.channels or self.patterns)

def execute_command(self, *args, **kwargs):
def execute_command(self, *args):
"Execute a publish/subscribe command"

# NOTE: don't parse the response in this function -- it could pull a
Expand All @@ -3127,11 +3137,12 @@ def execute_command(self, *args, **kwargs):
# were listening to when we were disconnected
self.connection.register_connect_callback(self.on_connect)
connection = self.connection
self._execute(connection, connection.send_command, *args)
kwargs = {'check_health': not self.subscribed}
self._execute(connection, connection.send_command, *args, **kwargs)

def _execute(self, connection, command, *args):
def _execute(self, connection, command, *args, **kwargs):
try:
return command(*args)
return command(*args, **kwargs)
except (ConnectionError, TimeoutError) as e:
connection.disconnect()
if not (connection.retry_on_timeout and
Expand All @@ -3143,18 +3154,38 @@ def _execute(self, connection, command, *args):
# the ``on_connect`` callback should have been called by the
# connection to resubscribe us to any channels and patterns we were
# previously listening to
return command(*args)
return command(*args, **kwargs)

def parse_response(self, block=True, timeout=0):
"Parse the response from a publish/subscribe command"
connection = self.connection
if connection is None:
conn = self.connection
if conn is None:
raise RuntimeError(
'pubsub connection not set: '
'did you forget to call subscribe() or psubscribe()?')
if not block and not connection.can_read(timeout=timeout):

self.check_health()

if not block and not conn.can_read(timeout=timeout):
return None
response = self._execute(conn, conn.read_response)

if conn.health_check_interval and \
response == self.health_check_response:
# ignore the health check message as user might not expect it
return None
return self._execute(connection, connection.read_response)
return response

def check_health(self):
    """
    Send a health check PING on the pubsub connection if one is due.

    A PING carrying ``HEALTH_CHECK_MESSAGE`` is sent when the connection's
    ``health_check_interval`` is truthy and the current time is later than
    its ``next_health_check``. The reply is not read here.

    Raises RuntimeError if no connection has been set.
    """
    conn = self.connection
    if conn is None:
        raise RuntimeError(
            'pubsub connection not set: '
            'did you forget to call subscribe() or psubscribe()?')

    if conn.health_check_interval and time.time() > conn.next_health_check:
        # check_health=False stops the connection from running its own
        # PING/PONG round trip before this PING is written
        conn.send_command('PING', self.HEALTH_CHECK_MESSAGE,
                          check_health=False)

def _normalize_keys(self, data):
"""
Expand Down
61 changes: 47 additions & 14 deletions redis/connection.py
Expand Up @@ -2,6 +2,7 @@
from distutils.version import StrictVersion
from errno import EWOULDBLOCK
from itertools import chain
from time import time
import io
import os
import socket
Expand All @@ -20,17 +21,17 @@
LifoQueue, Empty, Full, urlparse, parse_qs,
recv, recv_into, unquote, BlockingIOError)
from redis.exceptions import (
DataError,
RedisError,
ConnectionError,
TimeoutError,
AuthenticationError,
BusyLoadingError,
ResponseError,
ConnectionError,
DataError,
ExecAbortError,
InvalidResponse,
AuthenticationError,
NoScriptError,
ExecAbortError,
ReadOnlyError
ReadOnlyError,
RedisError,
ResponseError,
TimeoutError,
)
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
Expand Down Expand Up @@ -460,7 +461,8 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
socket_keepalive=False, socket_keepalive_options=None,
socket_type=0, retry_on_timeout=False, encoding='utf-8',
encoding_errors='strict', decode_responses=False,
parser_class=DefaultParser, socket_read_size=65536):
parser_class=DefaultParser, socket_read_size=65536,
health_check_interval=0):
self.pid = os.getpid()
self.host = host
self.port = int(port)
Expand All @@ -472,6 +474,8 @@ def __init__(self, host='localhost', port=6379, db=0, password=None,
self.socket_keepalive_options = socket_keepalive_options or {}
self.socket_type = socket_type
self.retry_on_timeout = retry_on_timeout
self.health_check_interval = health_check_interval
self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
self._parser = parser_class(socket_read_size=socket_read_size)
Expand Down Expand Up @@ -579,7 +583,9 @@ def on_connect(self):

# if a password is specified, authenticate
if self.password:
self.send_command('AUTH', self.password)
# avoid checking health here -- PING will fail if we try
# to check the health prior to the AUTH
self.send_command('AUTH', self.password, check_health=False)
if nativestr(self.read_response()) != 'OK':
raise AuthenticationError('Invalid Password')

Expand All @@ -602,10 +608,28 @@ def disconnect(self):
pass
self._sock = None

def send_packed_command(self, command):
def check_health(self):
    """
    Check the health of the connection with a PING/PONG round trip.

    Does nothing unless ``health_check_interval`` is truthy and the current
    time is later than ``next_health_check``. Otherwise one PING/PONG
    exchange is attempted. If that attempt raises ConnectionError or
    TimeoutError (including the ConnectionError raised for a reply other
    than PONG), the connection is disconnected and the exchange is retried
    exactly once. Any error raised by the retry propagates to the caller.
    """
    if not (self.health_check_interval and
            time() > self.next_health_check):
        return
    try:
        self._ping_for_health_check()
    except (ConnectionError, TimeoutError):
        # drop the socket so the retry's send has to connect again
        self.disconnect()
        self._ping_for_health_check()

def _ping_for_health_check(self):
    "Send one PING and raise ConnectionError unless the reply is PONG"
    # check_health=False keeps the send from re-entering check_health
    self.send_command('PING', check_health=False)
    if nativestr(self.read_response()) != 'PONG':
        raise ConnectionError('Bad response from PING health check')

def send_packed_command(self, command, check_health=True):
"Send an already packed command to the Redis server"
if not self._sock:
self.connect()
# guard against health check recursion
if check_health:
self.check_health()
try:
if isinstance(command, str):
command = [command]
Expand All @@ -628,9 +652,10 @@ def send_packed_command(self, command):
self.disconnect()
raise

def send_command(self, *args):
def send_command(self, *args, **kwargs):
"Pack and send a command to the Redis server"
self.send_packed_command(self.pack_command(*args))
self.send_packed_command(self.pack_command(*args),
check_health=kwargs.get('check_health', True))

def can_read(self, timeout=0):
"Poll the socket to see if there's data that can be read."
Expand All @@ -656,6 +681,10 @@ def read_response(self):
except: # noqa: E722
self.disconnect()
raise

if self.health_check_interval:
self.next_health_check = time() + self.health_check_interval

if isinstance(response, ResponseError):
raise response
return response
Expand Down Expand Up @@ -777,13 +806,16 @@ def __init__(self, path='', db=0, password=None,
socket_timeout=None, encoding='utf-8',
encoding_errors='strict', decode_responses=False,
retry_on_timeout=False,
parser_class=DefaultParser, socket_read_size=65536):
parser_class=DefaultParser, socket_read_size=65536,
health_check_interval=0):
self.pid = os.getpid()
self.path = path
self.db = db
self.password = password
self.socket_timeout = socket_timeout
self.retry_on_timeout = retry_on_timeout
self.health_check_interval = health_check_interval
self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
self._parser = parser_class(socket_read_size=socket_read_size)
Expand Down Expand Up @@ -829,6 +861,7 @@ def to_bool(value):
'socket_keepalive': to_bool,
'retry_on_timeout': to_bool,
'max_connections': int,
'health_check_interval': int,
}


Expand Down
3 changes: 2 additions & 1 deletion tests/test_commands.py
Expand Up @@ -110,7 +110,8 @@ def test_client_kill(self, r, r2):
clients_by_name = dict([(client.get('name'), client)
for client in clients])

assert r.client_kill(clients_by_name['redis-py-c2'].get('addr')) is True
client_addr = clients_by_name['redis-py-c2'].get('addr')
assert r.client_kill(client_addr) is True

clients = [client for client in r.client_list()
if client.get('name') in ['redis-py-c1', 'redis-py-c2']]
Expand Down

0 comments on commit f60b2b0

Please sign in to comment.