Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cli/rdb-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ static RdbRes formatRedis(RdbParser *parser, int argc, char **argv) {
if (getOptArg(argc, argv, &at, "-1", "--single-db", &(conf.singleDb), NULL)) continue;
if (getOptArg(argc, argv, &at, "-t", "--target-redis-ver", NULL, &(conf.dstRedisVersion))) continue;
if (getOptArgVal(argc, argv, &at, "-l", "--pipeline-depth", NULL, &pipeDepthVal, 1, 1000)) continue;
if (getOptArgVal(argc, argv, &at, "-n", "--start-cmd-num", NULL, &startCmdNum, 1, INT_MAX)) continue;
if (getOptArgVal(argc, argv, &at, "-s", "--start-cmd-num", NULL, &startCmdNum, 1, INT_MAX)) continue;
if (getOptArg(argc, argv, &at, "-u", "--user", NULL, &auth.user)) continue;
if (getOptArg(argc, argv, &at, "-P", "--password", NULL, &auth.pwd)) continue;
if (getOptArg(argc, argv, &at, "-e", "--enum-commands", &commandEnum, NULL)) continue;
Expand Down Expand Up @@ -313,7 +313,7 @@ static RdbRes formatResp(RdbParser *parser, int argc, char **argv) {
if (getOptArg(argc, argv, &at, "-1", "--single-db", &(conf.singleDb), NULL)) continue;
if (getOptArg(argc, argv, &at, "-t", "--target-redis-ver", NULL, &(conf.dstRedisVersion))) continue;
if (getOptArg(argc, argv, &at, "-e", "--enum-commands", &commandEnum, NULL)) continue;
if (getOptArgVal(argc, argv, &at, "-n", "--start-cmd-num", NULL, &startCmdNum, 1, INT_MAX)) continue;
if (getOptArgVal(argc, argv, &at, "-s", "--start-cmd-num", NULL, &startCmdNum, 1, INT_MAX)) continue;

loggerWrap(RDB_LOG_ERR, "Invalid 'resp' [FORMAT_OPTIONS] argument: %s\n", opt);
printUsage(1);
Expand Down
59 changes: 34 additions & 25 deletions src/ext/handlersToResp.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) {
/* Write only commands starting from given command number */
if ((ctx->debug.flags & RFLAG_WRITE_FROM_CMD_ID) &&
(currCmdNum < ctx->debug.writeFromCmdNum))
return RDB_OK;
return RDB_OK_DONT_PROPAGATE;

/* enumerate and trace cmd-id by preceding each cmd with "SET _RDB_CLI_CMD_ID_ <CMD-ID>" */
if (ctx->debug.flags & RFLAG_ENUM_CMD_ID) {
Expand Down Expand Up @@ -256,8 +256,10 @@ static inline RdbRes writevWrap(RdbxToResp *ctx, struct iovec *iov, int cnt,
RdbxRespWriter *writer = &ctx->respWriter;

if (unlikely(ctx->debug.flags && startCmd)) {
if ((res = onWriteNewCmdDbg(ctx)) != RDB_OK)
if ((res = onWriteNewCmdDbg(ctx)) != RDB_OK) {
if (res == RDB_OK_DONT_PROPAGATE) return RDB_OK;
return RDB_getErrorCode(ctx->parser);
}
}

if (unlikely(writer->writev(writer->ctx, iov, cnt, startCmd, endCmd))) {
Expand Down Expand Up @@ -790,7 +792,7 @@ static RdbRes toRespStreamNewConsumer(RdbParser *p, void *userData, RdbBulk cons
UNUSED(meta);
RdbxToResp *ctx = userData;

/* (re)allocate mem to keep consumer name */
/* (re)allocate mem to keep consumer name for potential XCLAIM commands */
RDB_bulkCopyFree(p, ctx->streamCtx.consName);

ctx->streamCtx.consNameLen = RDB_bulkLen(p, consName);
Expand All @@ -800,7 +802,14 @@ static RdbRes toRespStreamNewConsumer(RdbParser *p, void *userData, RdbBulk cons
return RDB_OK;
}

/* Callback to handle a pending entry within a consumer */
/* Callback to handle a pending entry within a consumer
*
* Note that for a given consumer, if it doesn't have any pending entries, then
* this callback will not be called at all and the consumer will not be created.
* This is intentional since consumers without pending entries are not needed.
* This behavior is different from the `supportRestore` flow, which preserves ALL
* consumers blindly, including those without pending entries.
*/
static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, RdbStreamID *streamId) {
RdbStreamPendingEntry *pe;
char cmdTrailer[256], idStr[100], keyLenStr[32], gNameLenStr[32], cNameLenStr[32], sentTime[32], sentCount[32];
Expand Down Expand Up @@ -1011,27 +1020,27 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *p, RdbxToRespConf *
ctx->streamCtx.groupPel = NULL;

static RdbHandlersDataCallbacks dataCb = {
toRespStartRdb,
toRespEndRdb,
toRespNewDb,
NULL, /*db-size*/
NULL, /*slot-info*/
NULL, /*aux-field*/
toRespNewKey,
toRespEndKey,
toRespString,
toRespList,
toRespHash,
toRespSet,
toRespZset,
toRespFunction,
NULL, /*module*/
toRespStreamMetaData,
toRespStreamItem,
toRespStreamNewCGroup,
toRespStreamCGroupPendingEntry,
toRespStreamNewConsumer,
toRespStreamConsumerPendingEntry
toRespStartRdb, /*handleStartRdb*/
toRespEndRdb, /*handleEndRdb*/
toRespNewDb, /*handleNewDb*/
NULL, /*handleDbSize*/
NULL, /*handleSlotInfo*/
NULL, /*handleAuxField*/
toRespNewKey, /*handleNewKey*/
toRespEndKey, /*handleEndKey*/
toRespString, /*handleStringValue*/
toRespList, /*handleListItem*/
toRespHash, /*handleHashField*/
toRespSet, /*handleSetMember*/
toRespZset, /*handleZsetMember*/
toRespFunction, /*handleFunction*/
NULL, /*handleModule*/
toRespStreamMetaData, /*handleStreamMetadata*/
toRespStreamItem, /*handleStreamItem*/
toRespStreamNewCGroup, /*handleStreamNewCGroup*/
toRespStreamCGroupPendingEntry, /*handleStreamCGroupPendingEntry*/
toRespStreamNewConsumer, /*handleStreamNewConsumer*/
toRespStreamConsumerPendingEntry /*handleStreamConsumerPendingEntry*/
};
RDB_createHandlersData(p, &dataCb, ctx, deleteRdbToRespCtx);

Expand Down
15 changes: 12 additions & 3 deletions src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -846,15 +846,24 @@ static void resolveMultipleLevelsRegistration(RdbParser *p) {
}

static void attachDebugHandlers(RdbParser *p) {
int registered = 0;
RDB_setLogLevel(p, RDB_LOG_DBG);
/* find the lowest level that handlers are registered and register DBG handlers accordingly */

/* Register debug handlers at all levels where there are existing handlers */
if (p->numHandlers[RDB_LEVEL_RAW]) {
registered = 1;
RdbHandlersRawCallbacks cb = {.handleNewKey = handleNewKeyPrintDbg, .handleEndKey = handleEndKeyPrintDbg};
RDB_createHandlersRaw(p, &cb, NULL, NULL);
} else if (p->numHandlers[RDB_LEVEL_STRUCT]) {
}

if (p->numHandlers[RDB_LEVEL_STRUCT]) {
registered = 1;
RdbHandlersStructCallbacks cb = {.handleNewKey = handleNewKeyPrintDbg, .handleEndKey = handleEndKeyPrintDbg};
RDB_createHandlersStruct(p, &cb, NULL, NULL);
} else {
}

/* If registerd at DATA level or if no handlers are registered at any level */
if ((p->numHandlers[RDB_LEVEL_DATA]) || (!registered)) {
RdbHandlersDataCallbacks cb = {.handleNewKey = handleNewKeyPrintDbg, .handleEndKey = handleEndKeyPrintDbg};
RDB_createHandlersData(p, &cb, NULL, NULL);
}
Expand Down