Skip to content

Commit

Permalink
Use hiredis::pack_command to serialized the commands. (#2570)
Browse files Browse the repository at this point in the history
* Implemented pack command and pack bytes

* 1) refactored the command packer construction process
2) now hiredis.pack_bytes is the default choice. Though it's still possible to run redisrs-py (fix the flag in utils.py) or hiredis.pack_command (flag in connection.py)

* Switch to hiredis.pack_command

* Remove the rust extension module.

* 1) Introduce HIREDIS_PACK_AVAILABLE environment variable.
2) Extract serialization functionality out of Connection class.

* 1) Fix typo.
2) Add change log entry.
3) Revert the benchmark changes

* Ditch the hiredis version check for pack_command.

* Fix linter errors

* Revert version changes

* Fix linter issues

* Looks like the current redis-py version is 4.4.1

---------

Co-authored-by: Sergey Prokazov <sergey.prokazov@redis.com>
  • Loading branch information
prokazov and prokazov-redis committed Feb 6, 2023
1 parent 31a1c0b commit ffbe879
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Use hiredis-py pack_command if available.
* Support `.unlink()` in ClusterPipeline
* Simplify synchronous SocketBuffer state management
* Fix string cleanse in Redis Graph
Expand Down
135 changes: 90 additions & 45 deletions redis/connection.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io
import os
import socket
import sys
import threading
import weakref
from io import SEEK_END
Expand Down Expand Up @@ -32,7 +33,12 @@
TimeoutError,
)
from redis.retry import Retry
from redis.utils import CRYPTOGRAPHY_AVAILABLE, HIREDIS_AVAILABLE, str_if_bytes
from redis.utils import (
CRYPTOGRAPHY_AVAILABLE,
HIREDIS_AVAILABLE,
HIREDIS_PACK_AVAILABLE,
str_if_bytes,
)

try:
import ssl
Expand Down Expand Up @@ -509,6 +515,75 @@ def read_response(self, disable_decoding=False):
DefaultParser = PythonParser


class HiredisRespSerializer:
def pack(self, *args):
"""Pack a series of arguments into the Redis protocol"""
output = []

if isinstance(args[0], str):
args = tuple(args[0].encode().split()) + args[1:]
elif b" " in args[0]:
args = tuple(args[0].split()) + args[1:]
try:
output.append(hiredis.pack_command(args))
except TypeError:
_, value, traceback = sys.exc_info()
raise DataError(value).with_traceback(traceback)

return output


class PythonRespSerializer:
def __init__(self, buffer_cutoff, encode) -> None:
self._buffer_cutoff = buffer_cutoff
self.encode = encode

def pack(self, *args):
"""Pack a series of arguments into the Redis protocol"""
output = []
# the client might have included 1 or more literal arguments in
# the command name, e.g., 'CONFIG GET'. The Redis server expects these
# arguments to be sent separately, so split the first argument
# manually. These arguments should be bytestrings so that they are
# not encoded.
if isinstance(args[0], str):
args = tuple(args[0].encode().split()) + args[1:]
elif b" " in args[0]:
args = tuple(args[0].split()) + args[1:]

buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))

buffer_cutoff = self._buffer_cutoff
for arg in map(self.encode, args):
# to avoid large string mallocs, chunk the command into the
# output list if we're sending large values or memoryviews
arg_length = len(arg)
if (
len(buff) > buffer_cutoff
or arg_length > buffer_cutoff
or isinstance(arg, memoryview)
):
buff = SYM_EMPTY.join(
(buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF)
)
output.append(buff)
output.append(arg)
buff = SYM_CRLF
else:
buff = SYM_EMPTY.join(
(
buff,
SYM_DOLLAR,
str(arg_length).encode(),
SYM_CRLF,
arg,
SYM_CRLF,
)
)
output.append(buff)
return output


class Connection:
"Manages TCP communication to and from a Redis server"

Expand Down Expand Up @@ -536,6 +611,7 @@ def __init__(
retry=None,
redis_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
command_packer=None,
):
"""
Initialize a new Connection.
Expand Down Expand Up @@ -590,6 +666,7 @@ def __init__(
self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000
self._command_packer = self._construct_command_packer(command_packer)

def __repr__(self):
repr_args = ",".join([f"{k}={v}" for k, v in self.repr_pieces()])
Expand All @@ -607,6 +684,14 @@ def __del__(self):
except Exception:
pass

def _construct_command_packer(self, packer):
if packer is not None:
return packer
elif HIREDIS_PACK_AVAILABLE:
return HiredisRespSerializer()
else:
return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode)

def register_connect_callback(self, callback):
self._connect_callbacks.append(weakref.WeakMethod(callback))

Expand Down Expand Up @@ -827,7 +912,8 @@ def send_packed_command(self, command, check_health=True):
def send_command(self, *args, **kwargs):
"""Pack and send a command to the Redis server"""
self.send_packed_command(
self.pack_command(*args), check_health=kwargs.get("check_health", True)
self._command_packer.pack(*args),
check_health=kwargs.get("check_health", True),
)

def can_read(self, timeout=0):
Expand Down Expand Up @@ -872,48 +958,7 @@ def read_response(self, disable_decoding=False):

def pack_command(self, *args):
"""Pack a series of arguments into the Redis protocol"""
output = []
# the client might have included 1 or more literal arguments in
# the command name, e.g., 'CONFIG GET'. The Redis server expects these
# arguments to be sent separately, so split the first argument
# manually. These arguments should be bytestrings so that they are
# not encoded.
if isinstance(args[0], str):
args = tuple(args[0].encode().split()) + args[1:]
elif b" " in args[0]:
args = tuple(args[0].split()) + args[1:]

buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))

buffer_cutoff = self._buffer_cutoff
for arg in map(self.encoder.encode, args):
# to avoid large string mallocs, chunk the command into the
# output list if we're sending large values or memoryviews
arg_length = len(arg)
if (
len(buff) > buffer_cutoff
or arg_length > buffer_cutoff
or isinstance(arg, memoryview)
):
buff = SYM_EMPTY.join(
(buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF)
)
output.append(buff)
output.append(arg)
buff = SYM_CRLF
else:
buff = SYM_EMPTY.join(
(
buff,
SYM_DOLLAR,
str(arg_length).encode(),
SYM_CRLF,
arg,
SYM_CRLF,
)
)
output.append(buff)
return output
return self._command_packer.pack(*args)

def pack_commands(self, commands):
"""Pack multiple commands into the Redis protocol"""
Expand All @@ -923,7 +968,7 @@ def pack_commands(self, commands):
buffer_cutoff = self._buffer_cutoff

for cmd in commands:
for chunk in self.pack_command(*cmd):
for chunk in self._command_packer.pack(*cmd):
chunklen = len(chunk)
if (
buffer_length > buffer_cutoff
Expand Down
2 changes: 2 additions & 0 deletions redis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

# Only support Hiredis >= 1.0:
HIREDIS_AVAILABLE = not hiredis.__version__.startswith("0.")
HIREDIS_PACK_AVAILABLE = hasattr(hiredis, "pack_command")
except ImportError:
HIREDIS_AVAILABLE = False
HIREDIS_PACK_AVAILABLE = False

try:
import cryptography # noqa
Expand Down
5 changes: 5 additions & 0 deletions tests/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import redis
from redis.connection import Connection
from redis.utils import HIREDIS_PACK_AVAILABLE

from .conftest import _get_client

Expand Down Expand Up @@ -75,6 +76,10 @@ def test_replace(self, request):
assert r.get("a") == "foo\ufffd"


@pytest.mark.skipif(
HIREDIS_PACK_AVAILABLE,
reason="Packing via hiredis does not preserve memoryviews",
)
class TestMemoryviewsAreNotPacked:
def test_memoryviews_are_not_packed(self):
c = Connection()
Expand Down

0 comments on commit ffbe879

Please sign in to comment.