Skip to content

Commit

Permalink
Fix WAIT_MORE_DATA during stream (#35)
Browse files Browse the repository at this point in the history
Required to flush cache in the iterations of the loop that read global PEL as well.
  • Loading branch information
moticless committed Dec 7, 2023
1 parent 22630ea commit 22d5276
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 48 deletions.
2 changes: 1 addition & 1 deletion api/librdb-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ typedef void (*RdbLoggerCB) (RdbLogLevel l, const char *msg);
*
* This level of callback registration is adequate mainly when it is required
* to RESTORE from RDB source and play it against live Redis server.
****************************************************************/
****************************************************************/

typedef struct RdbHandlersRawCallbacks {
HANDLERS_COMMON_CALLBACKS
Expand Down
5 changes: 4 additions & 1 deletion src/ext/handlersToJson.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,10 @@ static RdbRes toJsonStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd

static RdbRes toJsonStreamMetadata(RdbParser *p, void *userData, RdbStreamMeta *meta) {
RdbxToJson *ctx = userData;
if (ctx->state != R2J_IN_STREAM_ENTRIES) {

if (ctx->state == R2J_IN_KEY) /* no entries recorded. place empty array */
fprintf(ctx->outfile, "{\n \"entries\":[");
else if (ctx->state != R2J_IN_STREAM_ENTRIES) {
RDB_reportError(p, (RdbRes) RDBX_ERR_R2J_INVALID_STATE,
"toJsonStreamMetadata(): Invalid state value: %d", ctx->state);
return (RdbRes) RDBX_ERR_R2J_INVALID_STATE;
Expand Down
44 changes: 22 additions & 22 deletions src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1662,29 +1662,29 @@ RdbStatus elementHash(RdbParser *p) {
case ST_HASH_NEXT: {
BulkInfo *binfoField, *binfoValue;

if (ctx->hash.visitingField == ctx->hash.numFields)
return nextParsingElement(p, PE_END_KEY);
while(ctx->hash.visitingField < ctx->hash.numFields) {
IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &binfoField));
IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &binfoValue));

IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &binfoField));
IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &binfoValue));
/*** ENTER SAFE STATE ***/

/*** ENTER SAFE STATE ***/
registerAppBulkForNextCb(p, binfoField);
registerAppBulkForNextCb(p, binfoValue);
if (p->elmCtx.key.handleByLevel == RDB_LEVEL_STRUCT) {
CALL_HANDLERS_CB(p, NOP, RDB_LEVEL_STRUCT, rdbStruct.handleHashPlain,
binfoField->ref,
binfoValue->ref);
}
else {
CALL_HANDLERS_CB(p, NOP, RDB_LEVEL_DATA, rdbData.handleHashField,
binfoField->ref,
binfoValue->ref);
}
++ctx->hash.visitingField;

registerAppBulkForNextCb(p, binfoField);
registerAppBulkForNextCb(p, binfoValue);
if (p->elmCtx.key.handleByLevel == RDB_LEVEL_STRUCT) {
CALL_HANDLERS_CB(p, NOP, RDB_LEVEL_STRUCT, rdbStruct.handleHashPlain,
binfoField->ref,
binfoValue->ref);
updateElementState(p, ST_HASH_NEXT, 0);
}
else {
CALL_HANDLERS_CB(p, NOP, RDB_LEVEL_DATA, rdbData.handleHashField,
binfoField->ref,
binfoValue->ref);
}

++ctx->hash.visitingField;
return updateElementState(p, ST_HASH_NEXT, 0);
return nextParsingElement(p, PE_END_KEY);
}

default:
Expand Down Expand Up @@ -1889,10 +1889,10 @@ RdbStatus elementZset(RdbParser *p) {
else
CALL_HANDLERS_CB(p, NOP, RDB_LEVEL_DATA, rdbData.handleZsetMember, binfoItem->ref, score);

if (--ctx->zset.left)
updateElementState(p, ST_ZSET_NEXT_ITEM, 0);
else
if ((--ctx->zset.left) == 0)
return nextParsingElement(p, PE_END_KEY);

updateElementState(p, ST_ZSET_NEXT_ITEM, 0);
}
}
default:
Expand Down
73 changes: 49 additions & 24 deletions src/lib/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,32 +445,32 @@ static inline void registerAppBulkForNextCb(RdbParser *p, BulkInfo *binfo) {
p->appCbCtx.bulks[p->appCbCtx.numBulks++] = binfo;
}

/****************************************************************
* Update element state & cache flushing
*
* On each completion of element state, beside of updating the next state (or
* next element) we need to flush the cache as well. By flushing the cache we
* ensure that it won't hold outdated data from previous state so if the state
* gets interrupted in the middle of read, the parser can later resume the same
* interrupted element state with a relevant cache that reflects only the
* interrupted state and not before (more description at bulkAlloc.h).
*
* Some of the flows are optimized to make short transition to the next state
* without return back to the main loop. In that kind of on transition, it is
* still required to update the state, and in turn flush the cache. See for example
* state ST_LIST_HEADER which makes direct "fall-thru" to state ST_LIST_NEXT_NODE.
*
* Another pattern of short transition, is a state that iterates multiple times
* without returning to the main loop. At the end of each iteration it is required
* to update the state, i.e., flush the cache. See for example state
* ST_LIST_NEXT_NODE which takes care at the end of iteration to update the state,
* and flush the cache, even though at first glance it might look redundant.
****************************************************************/
static inline RdbStatus updateElementState(RdbParser *p, int newState, int noFlush);
static inline RdbStatus nextParsingElementState(RdbParser *p, ParsingElementType next, int st);
static inline RdbStatus nextParsingElement(RdbParser *p, ParsingElementType next);
extern void bulkPoolFlush(RdbParser *p); /* avoid cyclic headers inclusion */

static inline RdbStatus updateElementState(RdbParser *p, int newState, int noFlush) {

/* if state completed without allocating anything then
* we can save few cpu cycles by avoid flushing the pool */
if (!noFlush) bulkPoolFlush(p);

p->elmCtx.state = newState;
return RDB_STATUS_OK;
}

static inline RdbStatus nextParsingElementState(RdbParser *p, ParsingElementType next, int st) {
bulkPoolFlush(p);
p->elmCtx.state = st;
p->parsingElement = next;
return RDB_STATUS_OK;
}

static inline RdbStatus nextParsingElement(RdbParser *p, ParsingElementType next) {
bulkPoolFlush(p);
p->elmCtx.state = 0;
p->parsingElement = next;
return RDB_STATUS_OK;
}

/*** sub-element parsing ***/
RdbStatus subElementCall(RdbParser *p, ParsingElementType next, int returnState);
RdbStatus subElementReturn(RdbParser *p, BulkInfo *bulkResult);
Expand Down Expand Up @@ -557,4 +557,29 @@ RdbStatus elementRawZsetZL(RdbParser *p);
RdbStatus elementRawModule(RdbParser *p);
RdbStatus elementRawStreamLP(RdbParser *p);

/*** inline functions ***/

static inline RdbStatus updateElementState(RdbParser *p, int newState, int noFlush) {
/* if state completed without allocating anything then
* we can save few cpu cycles by avoid flushing the pool */
if (!noFlush) bulkPoolFlush(p);

p->elmCtx.state = newState;
return RDB_STATUS_OK;
}

static inline RdbStatus nextParsingElementState(RdbParser *p, ParsingElementType next, int st) {
bulkPoolFlush(p);
p->elmCtx.state = st;
p->parsingElement = next;
return RDB_STATUS_OK;
}

static inline RdbStatus nextParsingElement(RdbParser *p, ParsingElementType next) {
bulkPoolFlush(p);
p->elmCtx.state = 0;
p->parsingElement = next;
return RDB_STATUS_OK;
}

#endif /*LIBRDB_PARSER_H*/
1 change: 1 addition & 0 deletions src/lib/parserRaw.c
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,7 @@ RdbStatus elementRawStreamLP(RdbParser *p) {
IF_NOT_OK_RETURN(aggUpdateWritten(p, pelLen));

streamCtx->globPelLeft--;
updateElementState(p, ST_LOAD_GLOBAL_PEL, 0);
}
updateElementState(p, ST_LOAD_NUM_CONSUMERS, 0); /* fall-thru */

Expand Down
Binary file added test/dumps/misc_with_stream.rdb
Binary file not shown.
6 changes: 6 additions & 0 deletions test/test_rdb_to_resp.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ static void test_r2r_stream(void **state) {
runWithAndWithoutRestore("stream_v11.rdb");
}

static void test_r2r_misc_with_stream(void **state) {
UNUSED(state);
runWithAndWithoutRestore("misc_with_stream.rdb");
}

static void test_r2r_module(void **state) {
UNUSED(state);
unsigned char expRespData[] = {
Expand Down Expand Up @@ -335,6 +340,7 @@ int group_rdb_to_resp(void) {
cmocka_unit_test(test_r2r_stream),
cmocka_unit_test(test_r2r_stream_with_target_62_and_72),
/* misc */
cmocka_unit_test(test_r2r_misc_with_stream),
cmocka_unit_test(test_r2r_multiple_lists_and_strings),
cmocka_unit_test(test_r2r_del_before_write_restore_replace),
};
Expand Down

0 comments on commit 22d5276

Please sign in to comment.