Cherry-pick for 4.3.5 (#2468)
Co-authored-by: pedro.frazao <perl.pf@netcf.org>
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Co-authored-by: Gauthier Imbert <gauthier@PC17>
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: szumka <106675199+szumka@users.noreply.github.com>
Co-authored-by: Mehdi ABAAKOUK <sileht@sileht.net>
Co-authored-by: Tim Gates <tim.gates@iress.com>
Co-authored-by: Utkarsh Gupta <utkarshgupta137@gmail.com>
Co-authored-by: Nial Daly <34862917+nialdaly@users.noreply.github.com>
Co-authored-by: pedrofrazao <603718+pedrofrazao@users.noreply.github.com>
Co-authored-by: Антон Безденежных <gamer392@yandex.ru>
Co-authored-by: Iglesys <g.imbert34@gmail.com>
Co-authored-by: Kristján Valur Jónsson <sweskman@gmail.com>
Co-authored-by: DvirDukhan <dvir@redis.com>
Co-authored-by: Alibi Shalgymbay <a.shalgymbay@mycar.kz>
Co-authored-by: dvora-h <dvora.heller@redis.com>
Co-authored-by: Alibi <aliby.bbb@gmail.com>
Co-authored-by: Aarni Koskela <akx@iki.fi>
18 people committed Nov 21, 2022
1 parent e6cd4fd commit 72f90c2
Showing 35 changed files with 2,461 additions and 921 deletions.
CHANGES: 4 additions, 0 deletions
@@ -14,6 +14,10 @@
    * Added dynamic_startup_nodes configuration to RedisCluster
* Fix reusing the old nodes' connections when cluster topology refresh is being done
* Fix RedisCluster to immediately raise AuthenticationError without a retry
    * Fix ClusterPipeline not handling ConnectionError for dead hosts (#2225)
* Remove compatibility code for old versions of Hiredis, drop Packaging dependency
* The `deprecated` library is no longer a dependency

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
* Add redis5 and redis4 dockers (#1871)
redis/asyncio/cluster.py: 9 additions, 9 deletions
@@ -755,7 +755,6 @@ class ClusterNode:
"""

__slots__ = (
"_command_stack",
"_connections",
"_free",
"connection_class",
@@ -796,7 +795,6 @@ def __init__(

self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
self._command_stack: List["PipelineCommand"] = []

def __repr__(self) -> str:
return (
@@ -887,18 +885,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Release connection
self._free.append(connection)

async def execute_pipeline(self) -> bool:
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()

# Execute command
await connection.send_packed_command(
connection.pack_commands(cmd.args for cmd in self._command_stack), False
connection.pack_commands(cmd.args for cmd in commands), False
)

# Read responses
ret = False
for cmd in self._command_stack:
for cmd in commands:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
@@ -1365,12 +1363,14 @@ async def _execute(

node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = node
node._command_stack = []
node._command_stack.append(cmd)
nodes[node.name] = (node, [])
nodes[node.name][1].append(cmd)

errors = await asyncio.gather(
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
*(
asyncio.ensure_future(node[0].execute_pipeline(node[1]))
for node in nodes.values()
)
)

if any(errors):
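With this change a pipeline's commands are grouped per target node inside `_execute` and passed to `execute_pipeline` as an argument, instead of being stashed on a shared per-node `_command_stack`, so concurrent pipelines no longer overwrite each other's queued commands. A minimal usage sketch of the async cluster pipeline (the host and port are illustrative; any reachable cluster node works):

import asyncio

from redis.asyncio.cluster import RedisCluster


async def main() -> None:
    # Illustrative address; point this at any node of a running cluster.
    rc = RedisCluster(host="localhost", port=16379)

    # Queued commands are bucketed by target node; each bucket is sent to that
    # node's execute_pipeline() as one packed round trip.
    async with rc.pipeline() as pipe:
        pipe.set("foo", 1)
        pipe.incr("foo")
        pipe.get("foo")
        results = await pipe.execute()

    print(results)  # e.g. [True, 2, b"2"]
    await rc.close()


asyncio.run(main())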
redis/asyncio/connection.py: 11 additions, 2 deletions
@@ -685,7 +685,7 @@ def __del__(self):

@property
def is_connected(self):
return self._reader and self._writer
return self._reader is not None and self._writer is not None

def register_connect_callback(self, callback):
self._connect_callbacks.append(weakref.WeakMethod(callback))
@@ -767,7 +767,16 @@ async def _connect(self):
def _error_message(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message"
if len(exception.args) == 1:
if not exception.args:
# asyncio has a bug where on Connection reset by peer, the
# exception is not instantiated, so args is empty. This is the
# workaround.
# See: https://github.com/redis/redis-py/issues/2237
# See: https://github.com/python/cpython/issues/94061
return (
f"Error connecting to {self.host}:{self.port}. Connection reset by peer"
)
elif len(exception.args) == 1:
return f"Error connecting to {self.host}:{self.port}. {exception.args[0]}."
else:
return (
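The new empty-`args` branch matters because asyncio can raise a bare `ConnectionResetError` with no arguments, which previously produced an unhelpful error string. A standalone sketch of the resulting behaviour (not the library's exact code; the multi-argument branch is paraphrased because it is elided from the diff above):

def connect_error_message(host: str, port: int, exc: BaseException) -> str:
    # Mirrors the branching above, for illustration only.
    if not exc.args:
        # Bare ConnectionResetError from asyncio (python/cpython#94061).
        return f"Error connecting to {host}:{port}. Connection reset by peer"
    if len(exc.args) == 1:
        return f"Error connecting to {host}:{port}. {exc.args[0]}."
    # errno-style (errno, "message") exceptions; wording paraphrased.
    return f"Error {exc.args[0]} connecting to {host}:{port}. {exc.args[1]}."


print(connect_error_message("localhost", 6379, ConnectionResetError()))
# Error connecting to localhost:6379. Connection reset by peer
print(connect_error_message("localhost", 6379, OSError(104, "Connection reset by peer")))
# Error 104 connecting to localhost:6379. Connection reset by peer.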
redis/commands/bf/__init__.py: 7 additions, 2 deletions
@@ -165,11 +165,16 @@ def __init__(self, client, **kwargs):
# TDIGEST_RESET: bool_ok,
# TDIGEST_ADD: spaceHolder,
# TDIGEST_MERGE: spaceHolder,
TDIGEST_CDF: float,
TDIGEST_QUANTILE: float,
TDIGEST_CDF: parse_to_list,
TDIGEST_QUANTILE: parse_to_list,
TDIGEST_MIN: float,
TDIGEST_MAX: float,
TDIGEST_TRIMMED_MEAN: float,
TDIGEST_INFO: TDigestInfo,
TDIGEST_RANK: parse_to_list,
TDIGEST_REVRANK: parse_to_list,
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
}

self.client = client
redis/commands/bf/commands.py: 81 additions, 22 deletions
@@ -1,6 +1,6 @@
from redis.client import NEVER_DECODE
from redis.exceptions import ModuleError
from redis.utils import HIREDIS_AVAILABLE
from redis.utils import HIREDIS_AVAILABLE, deprecated_function

BF_RESERVE = "BF.RESERVE"
BF_ADD = "BF.ADD"
@@ -49,6 +49,11 @@
TDIGEST_MIN = "TDIGEST.MIN"
TDIGEST_MAX = "TDIGEST.MAX"
TDIGEST_INFO = "TDIGEST.INFO"
TDIGEST_TRIMMED_MEAN = "TDIGEST.TRIMMED_MEAN"
TDIGEST_RANK = "TDIGEST.RANK"
TDIGEST_REVRANK = "TDIGEST.REVRANK"
TDIGEST_BYRANK = "TDIGEST.BYRANK"
TDIGEST_BYREVRANK = "TDIGEST.BYREVRANK"


class BFCommands:
@@ -67,6 +72,8 @@ def create(self, key, errorRate, capacity, expansion=None, noScale=None):
self.append_no_scale(params, noScale)
return self.execute_command(BF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add to a Bloom Filter `key` an `item`.
@@ -176,6 +183,8 @@ def create(
self.append_max_iterations(params, max_iterations)
return self.execute_command(CF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add an `item` to a Cuckoo Filter `key`.
@@ -316,6 +325,7 @@ def query(self, key, *items):
""" # noqa
return self.execute_command(TOPK_QUERY, key, *items)

@deprecated_function(version="4.4.0", reason="deprecated since redisbloom 2.4.0")
def count(self, key, *items):
"""
Return count for one `item` or more from `key`.
@@ -344,12 +354,12 @@ def info(self, key):


class TDigestCommands:
def create(self, key, compression):
def create(self, key, compression=100):
"""
Allocate the memory and initialize the t-digest.
For more information see `TDIGEST.CREATE <https://redis.io/commands/tdigest.create>`_.
""" # noqa
return self.execute_command(TDIGEST_CREATE, key, compression)
return self.execute_command(TDIGEST_CREATE, key, "COMPRESSION", compression)

def reset(self, key):
"""
@@ -358,26 +368,30 @@ def reset(self, key):
""" # noqa
return self.execute_command(TDIGEST_RESET, key)

def add(self, key, values, weights):
def add(self, key, values):
"""
Add one or more samples (value with weight) to a sketch `key`.
Both `values` and `weights` are lists.
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
Adds one or more observations to a t-digest sketch `key`.
Example:
>>> tdigestadd('A', [1500.0], [1.0])
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
""" # noqa
params = [key]
self.append_values_and_weights(params, values, weights)
return self.execute_command(TDIGEST_ADD, *params)
return self.execute_command(TDIGEST_ADD, key, *values)

def merge(self, toKey, fromKey):
def merge(self, destination_key, num_keys, *keys, compression=None, override=False):
"""
Merge all of the values from 'fromKey' to 'toKey' sketch.
Merges all of the values from `keys` into the `destination_key` sketch.
It is mandatory to provide the `num_keys` before passing the input keys and
the other (optional) arguments.
If `destination_key` already exists its values are merged with the input keys.
If you wish to override the destination key contents use the `OVERRIDE` parameter.
For more information see `TDIGEST.MERGE <https://redis.io/commands/tdigest.merge>`_.
""" # noqa
return self.execute_command(TDIGEST_MERGE, toKey, fromKey)
params = [destination_key, num_keys, *keys]
if compression is not None:
params.extend(["COMPRESSION", compression])
if override:
params.append("OVERRIDE")
return self.execute_command(TDIGEST_MERGE, *params)

def min(self, key):
"""
@@ -393,20 +407,21 @@ def max(self, key):
""" # noqa
return self.execute_command(TDIGEST_MAX, key)

def quantile(self, key, quantile):
def quantile(self, key, quantile, *quantiles):
"""
Return double value estimate of the cutoff such that a specified fraction of the data
added to this TDigest would be less than or equal to the cutoff.
Returns estimates of one or more cutoffs such that a specified fraction of the
observations added to this t-digest would be less than or equal to each of the
specified cutoffs. (Multiple quantiles can be returned with one call)
For more information see `TDIGEST.QUANTILE <https://redis.io/commands/tdigest.quantile>`_.
""" # noqa
return self.execute_command(TDIGEST_QUANTILE, key, quantile)
return self.execute_command(TDIGEST_QUANTILE, key, quantile, *quantiles)

def cdf(self, key, value):
def cdf(self, key, value, *values):
"""
Return double fraction of all points added which are <= value.
For more information see `TDIGEST.CDF <https://redis.io/commands/tdigest.cdf>`_.
""" # noqa
return self.execute_command(TDIGEST_CDF, key, value)
return self.execute_command(TDIGEST_CDF, key, value, *values)

def info(self, key):
"""
@@ -416,6 +431,50 @@ def info(self, key):
""" # noqa
return self.execute_command(TDIGEST_INFO, key)

def trimmed_mean(self, key, low_cut_quantile, high_cut_quantile):
"""
Return mean value from the sketch, excluding observation values outside
the low and high cutoff quantiles.
For more information see `TDIGEST.TRIMMED_MEAN <https://redis.io/commands/tdigest.trimmed_mean>`_.
""" # noqa
return self.execute_command(
TDIGEST_TRIMMED_MEAN, key, low_cut_quantile, high_cut_quantile
)

def rank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are smaller than value + half the number of observations that are equal to value).
For more information see `TDIGEST.RANK <https://redis.io/commands/tdigest.rank>`_.
""" # noqa
return self.execute_command(TDIGEST_RANK, key, value, *values)

def revrank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are larger than value + half the number of observations that are equal to value).
For more information see `TDIGEST.REVRANK <https://redis.io/commands/tdigest.revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_REVRANK, key, value, *values)

def byrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given rank.
For more information see `TDIGEST.BY_RANK <https://redis.io/commands/tdigest.by_rank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYRANK, key, rank, *ranks)

def byrevrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given reverse rank.
For more information see `TDIGEST.BY_REVRANK <https://redis.io/commands/tdigest.by_revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYREVRANK, key, rank, *ranks)


class CMSCommands:
"""Count-Min Sketch Commands"""
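Taken together, the t-digest API now tracks RedisBloom 2.4: `create` defaults its compression and sends an explicit COMPRESSION argument, `add` accepts values only (weights are gone), `merge` takes a key count plus optional COMPRESSION/OVERRIDE, and `quantile`, `cdf`, `rank`, `revrank`, `byrank` and `byrevrank` are variadic. A hedged usage sketch, assuming a Redis Stack or RedisBloom >= 2.4 server on localhost:6379:

import redis

# Illustrative connection; requires the RedisBloom >= 2.4 module to be loaded.
r = redis.Redis(host="localhost", port=6379)
td = r.tdigest()

td.create("latency", compression=100)       # TDIGEST.CREATE latency COMPRESSION 100
td.add("latency", [1.5, 2.5, 3.5, 10.0])    # values only, no weights

print(td.quantile("latency", 0.5, 0.99))    # several quantiles in one call -> list
print(td.cdf("latency", 2.0, 9.0))          # fractions, parsed with parse_to_list
print(td.rank("latency", 2.5, 10.0))        # estimated ranks
print(td.trimmed_mean("latency", 0.1, 0.9))

# num_keys precedes the source keys; OVERRIDE replaces the destination contents.
td.merge("latency_all", 1, "latency", override=True)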
redis/commands/bf/info.py: 12 additions, 10 deletions
@@ -68,18 +68,20 @@ def __init__(self, args):
class TDigestInfo(object):
compression = None
capacity = None
mergedNodes = None
unmergedNodes = None
mergedWeight = None
unmergedWeight = None
totalCompressions = None
merged_nodes = None
unmerged_nodes = None
merged_weight = None
unmerged_weight = None
total_compressions = None
memory_usage = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.compression = response["Compression"]
self.capacity = response["Capacity"]
self.mergedNodes = response["Merged nodes"]
self.unmergedNodes = response["Unmerged nodes"]
self.mergedWeight = response["Merged weight"]
self.unmergedWeight = response["Unmerged weight"]
self.totalCompressions = response["Total compressions"]
self.merged_nodes = response["Merged nodes"]
self.unmerged_nodes = response["Unmerged nodes"]
self.merged_weight = response["Merged weight"]
self.unmerged_weight = response["Unmerged weight"]
self.total_compressions = response["Total compressions"]
self.memory_usage = response["Memory usage"]
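
Since the `TDigestInfo` attributes are now snake_case, callers read `merged_nodes` rather than `mergedNodes`, and the new `memory_usage` field is exposed as well. A short, hedged continuation of the sketch above (same illustrative connection and key):

import redis

td = redis.Redis(host="localhost", port=6379).tdigest()

info = td.info("latency")                          # TDIGEST.INFO latency
print(info.compression, info.capacity)
print(info.merged_nodes, info.unmerged_nodes)      # formerly mergedNodes / unmergedNodes
print(info.merged_weight, info.unmerged_weight)
print(info.total_compressions, info.memory_usage)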
redis/commands/bf/utils.py: 3 additions, 0 deletions
@@ -0,0 +1,3 @@
def parse_tdigest_quantile(response):
"""Parse TDIGEST.QUANTILE response."""
return [float(x) for x in response]
redis/commands/cluster.py: 19 additions, 0 deletions
@@ -316,6 +316,25 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
# Sum up the reply from each command
return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

async def _execute_pipeline_by_slot(
self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
) -> List[Any]:
if self._initialize:
await self.initialize()
read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
pipe = self.pipeline()
[
pipe.execute_command(
command,
*slot_args,
target_nodes=[
self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
],
)
for slot, slot_args in slots_to_args.items()
]
return await pipe.execute()


class ClusterManagementCommands(ManagementCommands):
"""
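From the caller's perspective this helper lets multi-key commands span hash slots transparently: keys are bucketed by slot, one command per slot is queued on a pipeline targeted at that slot's node, and the per-node integer replies are summed. A hedged sketch (addresses illustrative; UNLINK stands in for any multi-key command routed through the helper):

import asyncio

from redis.asyncio.cluster import RedisCluster


async def main() -> None:
    rc = RedisCluster(host="localhost", port=16379)  # any reachable cluster node

    await rc.mset_nonatomic({"user:1": "a", "user:2": "b", "user:3": "c"})

    # These keys likely hash to different slots; the UNLINK is split per slot
    # (via _split_command_across_slots / _execute_pipeline_by_slot above) and
    # the counts returned by each node are summed.
    removed = await rc.unlink("user:1", "user:2", "user:3")
    print(removed)  # 3 if all three keys existed

    await rc.close()


asyncio.run(main())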