Skip to content

Commit

Permalink
Change cluster docker to edge and enable debug command (#2853)
Browse files Browse the repository at this point in the history
* debug in cluster docker, replace for master

* sleep time for cluster to settle...

* fix test_cluster_delslotsrange

* fix tests

---------

Co-authored-by: dvora-h <dvora.heller@redis.com>
  • Loading branch information
chayim and dvora-h committed Jul 26, 2023
1 parent 2732a85 commit 2c2860d
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
pip install hiredis
fi
invoke devenv
sleep 5 # time to settle
sleep 10 # time to settle
invoke ${{matrix.test-type}}-tests
- uses: actions/upload-artifact@v2
Expand Down
2 changes: 1 addition & 1 deletion dockers/Dockerfile.cluster
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM redis/redis-stack-server:latest as rss
FROM redis/redis-stack-server:edge as rss

COPY dockers/create_cluster.sh /create_cluster.sh
RUN ls -R /opt/redis-stack
Expand Down
1 change: 1 addition & 0 deletions dockers/cluster.redis.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
protected-mode no
enable-debug-command yes
loadmodule /opt/redis-stack/lib/redisearch.so
loadmodule /opt/redis-stack/lib/redisgraph.so
loadmodule /opt/redis-stack/lib/redistimeseries.so
Expand Down
171 changes: 98 additions & 73 deletions redis/_parsers/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,53 @@
from redis.asyncio.cluster import ClusterNode


class CommandsParser:
class AbstractCommandsParser:
def _get_pubsub_keys(self, *args):
"""
Get the keys from pubsub command.
Although PubSub commands have predetermined key locations, they are not
supported in the 'COMMAND's output, so the key positions are hardcoded
in this method
"""
if len(args) < 2:
# The command has no keys in it
return None
args = [str_if_bytes(arg) for arg in args]
command = args[0].upper()
keys = None
if command == "PUBSUB":
# the second argument is a part of the command name, e.g.
# ['PUBSUB', 'NUMSUB', 'foo'].
pubsub_type = args[1].upper()
if pubsub_type in ["CHANNELS", "NUMSUB", "SHARDCHANNELS", "SHARDNUMSUB"]:
keys = args[2:]
elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]:
# format example:
# SUBSCRIBE channel [channel ...]
keys = list(args[1:])
elif command in ["PUBLISH", "SPUBLISH"]:
# format example:
# PUBLISH channel message
keys = [args[1]]
return keys

def parse_subcommand(self, command, **options):
cmd_dict = {}
cmd_name = str_if_bytes(command[0])
cmd_dict["name"] = cmd_name
cmd_dict["arity"] = int(command[1])
cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
cmd_dict["first_key_pos"] = command[3]
cmd_dict["last_key_pos"] = command[4]
cmd_dict["step_count"] = command[5]
if len(command) > 7:
cmd_dict["tips"] = command[7]
cmd_dict["key_specifications"] = command[8]
cmd_dict["subcommands"] = command[9]
return cmd_dict


class CommandsParser(AbstractCommandsParser):
"""
Parses Redis commands to get command keys.
COMMAND output is used to determine key locations.
Expand All @@ -30,21 +76,6 @@ def initialize(self, r):
commands[cmd.lower()] = commands.pop(cmd)
self.commands = commands

def parse_subcommand(self, command, **options):
cmd_dict = {}
cmd_name = str_if_bytes(command[0])
cmd_dict["name"] = cmd_name
cmd_dict["arity"] = int(command[1])
cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
cmd_dict["first_key_pos"] = command[3]
cmd_dict["last_key_pos"] = command[4]
cmd_dict["step_count"] = command[5]
if len(command) > 7:
cmd_dict["tips"] = command[7]
cmd_dict["key_specifications"] = command[8]
cmd_dict["subcommands"] = command[9]
return cmd_dict

# As soon as this PR is merged into Redis, we should reimplement
# our logic to use COMMAND INFO changes to determine the key positions
# https://github.com/redis/redis/pull/8324
Expand Down Expand Up @@ -138,37 +169,8 @@ def _get_moveable_keys(self, redis_conn, *args):
raise e
return keys

def _get_pubsub_keys(self, *args):
"""
Get the keys from pubsub command.
Although PubSub commands have predetermined key locations, they are not
supported in the 'COMMAND's output, so the key positions are hardcoded
in this method
"""
if len(args) < 2:
# The command has no keys in it
return None
args = [str_if_bytes(arg) for arg in args]
command = args[0].upper()
keys = None
if command == "PUBSUB":
# the second argument is a part of the command name, e.g.
# ['PUBSUB', 'NUMSUB', 'foo'].
pubsub_type = args[1].upper()
if pubsub_type in ["CHANNELS", "NUMSUB", "SHARDCHANNELS", "SHARDNUMSUB"]:
keys = args[2:]
elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]:
# format example:
# SUBSCRIBE channel [channel ...]
keys = list(args[1:])
elif command in ["PUBLISH", "SPUBLISH"]:
# format example:
# PUBLISH channel message
keys = [args[1]]
return keys


class AsyncCommandsParser:
class AsyncCommandsParser(AbstractCommandsParser):
"""
Parses Redis commands to get command keys.
Expand All @@ -194,52 +196,75 @@ async def initialize(self, node: Optional["ClusterNode"] = None) -> None:
self.node = node

commands = await self.node.execute_command("COMMAND")
for cmd, command in commands.items():
if "movablekeys" in command["flags"]:
commands[cmd] = -1
elif command["first_key_pos"] == 0 and command["last_key_pos"] == 0:
commands[cmd] = 0
elif command["first_key_pos"] == 1 and command["last_key_pos"] == 1:
commands[cmd] = 1
self.commands = {cmd.upper(): command for cmd, command in commands.items()}
self.commands = {cmd.lower(): command for cmd, command in commands.items()}

# As soon as this PR is merged into Redis, we should reimplement
# our logic to use COMMAND INFO changes to determine the key positions
# https://github.com/redis/redis/pull/8324
async def get_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
"""
Get the keys from the passed command.
NOTE: Due to a bug in redis<7.0, this function does not work properly
for EVAL or EVALSHA when the `numkeys` arg is 0.
- issue: https://github.com/redis/redis/issues/9493
- fix: https://github.com/redis/redis/pull/9733
So, don't use this function with EVAL or EVALSHA.
"""
if len(args) < 2:
# The command has no keys in it
return None

try:
command = self.commands[args[0]]
except KeyError:
# try to split the command name and to take only the main command
cmd_name = args[0].lower()
if cmd_name not in self.commands:
# try to split the command name and to take only the main command,
# e.g. 'memory' for 'memory usage'
args = args[0].split() + list(args[1:])
cmd_name = args[0].upper()
if cmd_name not in self.commands:
cmd_name_split = cmd_name.split()
cmd_name = cmd_name_split[0]
if cmd_name in self.commands:
# save the splitted command to args
args = cmd_name_split + list(args[1:])
else:
# We'll try to reinitialize the commands cache, if the engine
# version has changed, the commands may not be current
await self.initialize()
if cmd_name not in self.commands:
raise RedisError(
f"{cmd_name} command doesn't exist in Redis commands"
f"{cmd_name.upper()} command doesn't exist in Redis commands"
)

command = self.commands[cmd_name]
command = self.commands.get(cmd_name)
if "movablekeys" in command["flags"]:
keys = await self._get_moveable_keys(*args)
elif "pubsub" in command["flags"] or command["name"] == "pubsub":
keys = self._get_pubsub_keys(*args)
else:
if (
command["step_count"] == 0
and command["first_key_pos"] == 0
and command["last_key_pos"] == 0
):
is_subcmd = False
if "subcommands" in command:
subcmd_name = f"{cmd_name}|{args[1].lower()}"
for subcmd in command["subcommands"]:
if str_if_bytes(subcmd[0]) == subcmd_name:
command = self.parse_subcommand(subcmd)
is_subcmd = True

if command == 1:
return (args[1],)
if command == 0:
return None
if command == -1:
return await self._get_moveable_keys(*args)
# The command doesn't have keys in it
if not is_subcmd:
return None
last_key_pos = command["last_key_pos"]
if last_key_pos < 0:
last_key_pos = len(args) - abs(last_key_pos)
keys_pos = list(
range(command["first_key_pos"], last_key_pos + 1, command["step_count"])
)
keys = [args[pos] for pos in keys_pos]

last_key_pos = command["last_key_pos"]
if last_key_pos < 0:
last_key_pos = len(args) + last_key_pos
return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]]
return keys

async def _get_moveable_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
try:
Expand Down
1 change: 1 addition & 0 deletions redis/_parsers/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ def string_keys_to_dict(key_string, callback):
"MODULE UNLOAD": bool,
"PING": lambda r: str_if_bytes(r) == "PONG",
"PUBSUB NUMSUB": parse_pubsub_numsub,
"PUBSUB SHARDNUMSUB": parse_pubsub_numsub,
"QUIT": bool_ok,
"SET": parse_set_result,
"SCAN": parse_scan,
Expand Down
2 changes: 2 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def parse_cluster_shards(resp, **options):
"""
Parse CLUSTER SHARDS response.
"""
if isinstance(resp[0], dict):
return resp
shards = []
for x in resp:
shard = {"slots": [], "nodes": []}
Expand Down
41 changes: 28 additions & 13 deletions tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
from redis.utils import str_if_bytes
from tests.conftest import (
assert_resp_response,
is_resp2_connection,
skip_if_redis_enterprise,
skip_if_server_version_gte,
skip_if_server_version_lt,
skip_unless_arch_bits,
)
Expand Down Expand Up @@ -157,7 +159,7 @@ async def execute_command(*_args, **_kwargs):

def cmd_init_mock(self, r: ClusterNode) -> None:
self.commands = {
"GET": {
"get": {
"name": "get",
"arity": 2,
"flags": ["readonly", "fast"],
Expand Down Expand Up @@ -607,7 +609,7 @@ def map_7007(self):

def cmd_init_mock(self, r: ClusterNode) -> None:
self.commands = {
"GET": {
"get": {
"name": "get",
"arity": 2,
"flags": ["readonly", "fast"],
Expand Down Expand Up @@ -818,6 +820,8 @@ async def test_not_require_full_coverage_cluster_down_error(
assert all(await r.cluster_delslots(missing_slot))
with pytest.raises(ClusterDownError):
await r.exists("foo")
except ResponseError as e:
assert "CLUSTERDOWN" in str(e)
finally:
try:
# Add back the missing slot
Expand Down Expand Up @@ -1065,11 +1069,14 @@ async def test_cluster_delslots(self) -> None:

@skip_if_server_version_lt("7.0.0")
@skip_if_redis_enterprise()
async def test_cluster_delslotsrange(self, r: RedisCluster):
async def test_cluster_delslotsrange(self):
r = await get_mocked_redis_client(host=default_host, port=default_port)
mock_all_nodes_resp(r, "OK")
node = r.get_random_node()
mock_node_resp(node, "OK")
await r.cluster_addslots(node, 1, 2, 3, 4, 5)
assert await r.cluster_delslotsrange(1, 5)
assert node._free.pop().read_response.called
await r.close()

@skip_if_redis_enterprise()
async def test_cluster_failover(self, r: RedisCluster) -> None:
Expand Down Expand Up @@ -1255,11 +1262,18 @@ async def test_cluster_replicas(self, r: RedisCluster) -> None:
async def test_cluster_links(self, r: RedisCluster):
node = r.get_random_node()
res = await r.cluster_links(node)
links_to = sum(x.count("to") for x in res)
links_for = sum(x.count("from") for x in res)
assert links_to == links_for
for i in range(0, len(res) - 1, 2):
assert res[i][3] == res[i + 1][3]
if is_resp2_connection(r):
links_to = sum(x.count(b"to") for x in res)
links_for = sum(x.count(b"from") for x in res)
assert links_to == links_for
for i in range(0, len(res) - 1, 2):
assert res[i][3] == res[i + 1][3]
else:
links_to = len(list(filter(lambda x: x[b"direction"] == b"to", res)))
links_for = len(list(filter(lambda x: x[b"direction"] == b"from", res)))
assert links_to == links_for
for i in range(0, len(res) - 1, 2):
assert res[i][b"node"] == res[i + 1][b"node"]

@skip_if_redis_enterprise()
async def test_readonly(self) -> None:
Expand Down Expand Up @@ -1896,25 +1910,25 @@ async def test_cluster_bzpopmin(self, r: RedisCluster) -> None:
r,
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
(b"{foo}b", b"b1", 10),
[b"b", b"b1", 10],
[b"{foo}b", b"b1", 10],
)
assert_resp_response(
r,
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
(b"{foo}b", b"b2", 20),
[b"b", b"b2", 20],
[b"{foo}b", b"b2", 20],
)
assert_resp_response(
r,
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
(b"{foo}a", b"a1", 1),
[b"a", b"a1", 1],
[b"{foo}a", b"a1", 1],
)
assert_resp_response(
r,
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
(b"{foo}a", b"a2", 2),
[b"a", b"a2", 2],
[b"{foo}a", b"a2", 2],
)
assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) is None
await r.zadd("{foo}c", {"c1": 100})
Expand Down Expand Up @@ -2744,6 +2758,7 @@ async def test_asking_error(self, r: RedisCluster) -> None:
assert ask_node._free.pop().read_response.await_count
assert res == ["MOCK_OK"]

@skip_if_server_version_gte("7.0.0")
async def test_moved_redirection_on_slave_with_default(
self, r: RedisCluster
) -> None:
Expand Down

0 comments on commit 2c2860d

Please sign in to comment.