Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail XREAD[GROUP] if duplicate keys were given #11705

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -7389,8 +7389,8 @@ struct redisCommand redisCommandTable[] = {
{"xlen","Return the number of entries in a stream","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XLEN_History,XLEN_tips,xlenCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XLEN_Args},
{"xpending","Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.","O(N) with N being the number of elements returned, so asking for a small fixed number of entries per call is O(1). O(M), where M is the total number of entries scanned when used with the IDLE filter. When the command returns just the summary and the list of consumers is small, it runs in O(1) time; otherwise, an additional O(N) time for iterating every consumer.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XPENDING_History,XPENDING_tips,xpendingCommand,-3,CMD_READONLY,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XPENDING_Args},
{"xrange","Return a range of elements in a stream, with IDs matching the specified IDs interval","O(N) with N being the number of elements being returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XRANGE_History,XRANGE_tips,xrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XRANGE_Args},
{"xread","Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.","For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREAD_History,XREAD_tips,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY|CMD_BLOCKING,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",1},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREAD_Args},
{"xreadgroup","Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREADGROUP_History,XREADGROUP_tips,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",4},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREADGROUP_Args},
{"xread","Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.","O(N^2+NM) where N is the number of streams and M is the number of elements returned. When reading from a single stream, if M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when the command blocks, XADD will pay the O(L) time in order to serve the L clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREAD_History,XREAD_tips,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY|CMD_BLOCKING,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",1},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREAD_Args},
{"xreadgroup","Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.","O(N^2+NM) where N is the number of streams and M is the number of elements returned. When reading from a single stream, if M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when the command blocks, XADD will pay the O(L) time in order to serve the L clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREADGROUP_History,XREADGROUP_tips,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",4},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREADGROUP_Args},
{"xrevrange","Return a range of elements in a stream, with IDs matching the specified IDs interval, in reverse order (from greater to smaller IDs) compared to XRANGE","O(N) with N being the number of elements returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREVRANGE_History,XREVRANGE_tips,xrevrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XREVRANGE_Args},
{"xsetid","An internal command for replicating stream values","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XSETID_History,XSETID_tips,xsetidCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XSETID_Args},
{"xtrim","Trims the stream to (approximately if '~' is passed) a certain size","O(N), with N being the number of evicted entries. Constant times are very small however, since entries are organized in macro nodes containing multiple entries that can be released with a single deallocation.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XTRIM_History,XTRIM_tips,xtrimCommand,-4,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XTRIM_Args},
Expand Down
2 changes: 1 addition & 1 deletion src/commands/xread.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"XREAD": {
"summary": "Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.",
"complexity": "For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.",
"complexity": "O(N^2+NM) where N is the number of streams and M is the number of elements returned. When reading from a single stream, if M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when the command blocks, XADD will pay the O(L) time in order to serve the L clients blocked on the stream getting new data.",
"group": "stream",
"since": "5.0.0",
"arity": -4,
Expand Down
2 changes: 1 addition & 1 deletion src/commands/xreadgroup.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"XREADGROUP": {
"summary": "Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.",
"complexity": "For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.",
"complexity": "O(N^2+NM) where N is the number of streams and M is the number of elements returned. When reading from a single stream, if M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when the command blocks, XADD will pay the O(L) time in order to serve the L clients blocked on the stream getting new data.",
"group": "stream",
"since": "5.0.0",
"arity": -7,
Expand Down
11 changes: 11 additions & 0 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,17 @@ void xreadCommand(client *c) {
return;
}

for (int i = streams_arg + streams_count; i < c->argc; i++) {
for (int j = i + 1; j < c->argc; j++) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small question, which might not be super important: does that imply that the command is no longer O(NM), where N is the number of keys and M is the number of messages returned for each key, but rather O(N^2+NM)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll update the JSON files.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

considering N is expected to be rather low, maybe it shouldn't be there?
@itamarhaber?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or if it isn't expected to be always rather low, maybe this change is wrong..

Copy link
Collaborator

@ranshid ranshid Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, with the new blocking mechanism we will repeat this check when reprocessing the command... maybe we can optimize it away when this runs during unblock?
An alternative might be to keep some bit filter on the keys (like a simple Bloom filter), but again, in the worst case it will still be O(N^2).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we can skip that validation in that case, but in order to do that we'll probably need to add a new flag to the client struct, saying we are reprocessing after unblocking... sounds good?

i'm wondering if there are any other expensive validations/calculations we can skip in other commands... can you please have a quick look?

Copy link
Collaborator

@ranshid ranshid Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sure we could spare some checks, but I would argue that every command should have an arity-check function (it could even be generated automatically from the JSON), so that we would have a better way to separate the two calls and fail the command on invalid arguments at a much earlier stage. However, that is a big change, and I guess making this case O(2(N^2)+NM) is not the critical thing.

To your question: the only flag that can currently be used is the CLIENT_UNBLOCKED flag... but that might change in the future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we restrict this PR to just adding validation for duplicate key access in XREAD[GROUP] and probably file a separate issue/PR for avoiding revalidation during reprocessing ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we can.. that way we can maybe handle the re-validation optimization earlier.

robj *curr = c->argv[i-streams_count];
robj *other = c->argv[j-streams_count];
if (!sdscmp(curr->ptr, other->ptr)) {
addReplyError(c,"Duplicate keys are not supported");
return;
}
}
}

/* Parse the IDs and resolve the group name. */
if (streams_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*streams_count);
Expand Down
33 changes: 14 additions & 19 deletions tests/unit/type/stream.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -428,32 +428,21 @@ start_server {
$rd close
}

# Blocks a deferring client on XREAD with the key s2 listed three times
# (each with the `$` ID), adds one new entry to s2, and checks that the
# first element of the reply is stream s2 carrying that new entry.
test {XREAD with same stream name multiple times should work} {
# Seed s2 with an entry that predates the blocking read.
r XADD s2 * old abcd1234
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
# Make sure the reader is actually blocked before adding the new entry.
wait_for_blocked_clients_count 1
r XADD s2 * new abcd1234
set res [$rd read]
# First reply element: key name, then the field/value list of its first entry.
assert {[lindex $res 0 0] eq {s2}}
assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
$rd close
}

test {XREAD + multiple XADD inside transaction} {
r XADD s2 * old abcd1234
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
$rd XREAD BLOCK 20000 STREAMS s1{t} s2{t} s3{t} $ $ $
wait_for_blocked_clients_count 1
r MULTI
r XADD s2 * field one
r XADD s2 * field two
r XADD s2 * field three
r XADD s1{t} * field one
r XADD s2{t} * field two
r XADD s3{t} * field three
r EXEC
set res [$rd read]
assert {[lindex $res 0 0] eq {s2}}
assert {[lindex $res 0 1 0 1] eq {field one}}
assert {[lindex $res 0 1 1 1] eq {field two}}
assert_equal [llength $res] 3
assert_equal [lindex $res 0 0] {s1{t}}
assert_equal [lindex $res 1 0] {s2{t}}
assert_equal [lindex $res 2 0] {s3{t}}
$rd close
}

Expand Down Expand Up @@ -901,6 +890,12 @@ start_server {tags {"stream"}} {
r XADD x 1-18446744073709551615 f1 v1
assert_error {*The ID specified in XADD is equal or smaller*} {r XADD x 1-* f2 v2}
}

# Checks that XREAD with the same key (x) given twice after STREAMS fails
# with an error matching "Duplicate keys are not supported".
test {XREAD dup keys} {
# NOTE(review): this adds an entry to stream `s`, while the XREAD below names
# key `x` - confirm whether the setup key was meant to match.
r XADD s 1-0 f v
# Capture the error raised by the command instead of aborting the test.
catch {r XREAD STREAMS x x 0 0} e
assert_match "*Duplicate keys are not supported*" $e
}
}

start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} {
Expand Down