Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XREADGROUP from PEL should not affect server.dirty #13251

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
Expand Down
28 changes: 17 additions & 11 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
boundaries, just the entries. */
#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi, unsigned long *propCount) {
void *arraylen_ptr = NULL;
size_t arraylen = 0;
streamIterator si;
Expand All @@ -1666,6 +1666,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
int propagate_last_id = 0;
int noack = flags & STREAM_RWR_NOACK;

if (propCount) *propCount = 0;

/* If the client is asking for some history, we serve it using a
* different function, so that we return entries *solely* from its
* own PEL. This ensures each consumer will always and only see
Expand Down Expand Up @@ -1764,15 +1766,18 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
if (propCount) (*propCount)++;
}
}

arraylen++;
if (count && count == arraylen) break;
}

if (spi && propagate_last_id)
if (spi && propagate_last_id) {
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
if (propCount) (*propCount)++;
}

streamIteratorStop(&si);
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
Expand Down Expand Up @@ -1808,7 +1813,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL) == 0)
STREAM_RWR_RAWENTRIES,NULL,NULL) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
* about a message that's no longer here because was removed
Expand Down Expand Up @@ -2124,7 +2129,7 @@ void xrangeGenericCommand(client *c, int rev) {
addReplyNullArray(c);
} else {
if (count == -1) count = 0;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL,NULL);
}
}

Expand Down Expand Up @@ -2386,12 +2391,13 @@ void xreadCommand(client *c) {
addReplyBulk(c,c->argv[streams_arg+i]);

int flags = 0;
unsigned long propCount = 0;
if (noack) flags |= STREAM_RWR_NOACK;
if (serve_history) flags |= STREAM_RWR_HISTORY;
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, flags, &spi);
if (groups) server.dirty++;
consumer, flags, &spi, &propCount);
if (propCount) server.dirty++;
}
}

Expand Down Expand Up @@ -3298,7 +3304,7 @@ void xclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
}
arraylen++;

Expand Down Expand Up @@ -3473,7 +3479,7 @@ void xautoclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL,NULL) == 1);
}
arraylen++;
count--;
Expand Down Expand Up @@ -3697,18 +3703,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
end.ms = end.seq = UINT64_MAX;
addReplyBulkCString(c,"first-entry");
emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
STREAM_RWR_RAWENTRIES,NULL,NULL);
if (!emitted) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
STREAM_RWR_RAWENTRIES,NULL,NULL);
if (!emitted) addReplyNull(c);
} else {
/* XINFO STREAM <key> FULL [COUNT <count>] */

/* Stream entries */
addReplyBulkCString(c,"entries");
streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL);
streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL,NULL);

/* Consumer groups */
addReplyBulkCString(c,"groups");
Expand Down
75 changes: 75 additions & 0 deletions tests/unit/type/stream-cgroups.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,68 @@ start_server {
assert_error "*NOGROUP*" {r XGROUP CREATECONSUMER mystream mygroup consumer}
}

test {XREADGROUP of multiple entries changes dirty by one} {
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data b
r XADD x 3-0 data c
r XADD x 4-0 data d
r XGROUP CREATE x g1 0
r XGROUP CREATECONSUMER x g1 Alice

set dirty [s rdb_changes_since_last_save]
set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x ">"]
assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}}
set dirty2 [s rdb_changes_since_last_save]
assert {$dirty2 == $dirty + 1}

set dirty [s rdb_changes_since_last_save]
set res [r XREADGROUP GROUP g1 Alice NOACK COUNT 2 STREAMS x ">"]
assert_equal $res {{x {{3-0 {data c}} {4-0 {data d}}}}}
set dirty2 [s rdb_changes_since_last_save]
assert {$dirty2 == $dirty + 1}
}

test {XREADGROUP from PEL does not change dirty} {
# Techinally speaking, XREADGROUP from PEL should cause propagation
# because it change the delivery count/time
# It was decided that this metadata changes are too insiginificant
# to justify propagation
# This test covers that.
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data b
r XADD x 3-0 data c
r XADD x 4-0 data d
r XGROUP CREATE x g1 0
r XGROUP CREATECONSUMER x g1 Alice

set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x ">"]
assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}}

set dirty [s rdb_changes_since_last_save]
set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x 0]
assert_equal $res {{x {{1-0 {data a}} {2-0 {data b}}}}}
set dirty2 [s rdb_changes_since_last_save]
assert {$dirty2 == $dirty}

set dirty [s rdb_changes_since_last_save]
set res [r XREADGROUP GROUP g1 Alice COUNT 2 STREAMS x 9000]
assert_equal $res {{x {}}}
set dirty2 [s rdb_changes_since_last_save]
assert {$dirty2 == $dirty}

# The current behavior is that we create the consumer (causes dirty++) even
# if we onlyneed to read from PEL.
# It feels like we shouldn't create the consumer in that case, but I added
# this test just for coverage of current behavior
set dirty [s rdb_changes_since_last_save]
set res [r XREADGROUP GROUP g1 noconsumer COUNT 2 STREAMS x 0]
assert_equal $res {{x {}}}
set dirty2 [s rdb_changes_since_last_save]
assert {$dirty2 == $dirty + 1}
}

start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no appendfsync always}} {
test {XREADGROUP with NOACK creates consumer} {
r del mystream
Expand Down Expand Up @@ -1329,6 +1391,19 @@ start_server {
assert_equal [dict get $group entries-read] 3
assert_equal [dict get $group lag] 0
}

test {XREADGROUP from PEL inside MULTI} {
# This scenario used to cause propagation of EXEC without MULTI in 6.2
$replica config set propagation-error-behavior panic
$master del mystream
$master xadd mystream 1-0 a b c d e f
$master xgroup create mystream mygroup 0
$master xreadgroup group mygroup ryan count 1 streams mystream >
$master multi
$master xreadgroup group mygroup ryan count 1 streams mystream 0
set reply [$master exec]
assert_equal $reply {{{mystream {{1-0 {a b c d e f}}}}}}
}
}

start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} {
Expand Down
Loading