Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/module.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_THREAD_SAFE (1<<4) #define REDISMODULE_CTX_THREAD_SAFE (1<<4)
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5) #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5)
#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6) #define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6)
#define REDISMODULE_CTX_MULTI_EMITTED (1<<7)


/* This represents a Redis key opened with RM_OpenKey(). */ /* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey { struct RedisModuleKey {
Expand Down Expand Up @@ -599,6 +600,10 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {


/* We don't need to do anything here if the context was never used /* We don't need to do anything here if the context was never used
* in order to propagate commands. */ * in order to propagate commands. */
if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;

/* We don't need to do anything here if the server isn't inside
* a transaction. */
if (!server.propagate_in_transaction) return; if (!server.propagate_in_transaction) return;


/* If this command is executed from with Lua or MULTI/EXEC we do noy /* If this command is executed from with Lua or MULTI/EXEC we do noy
Expand All @@ -607,9 +612,9 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {


/* Handle the replication of the final EXEC, since whatever a command /* Handle the replication of the final EXEC, since whatever a command
* emits is always wrapped around MULTI/EXEC. */ * emits is always wrapped around MULTI/EXEC. */
beforePropagateMultiOrExec(0);
alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL); PROPAGATE_AOF|PROPAGATE_REPL);
afterPropagateExec();


/* If this is not a module command context (but is instead a simple /* If this is not a module command context (but is instead a simple
* callback context), we have to handle directly the "also propagate" * callback context), we have to handle directly the "also propagate"
Expand Down Expand Up @@ -1707,10 +1712,12 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
* context, we have to setup the op array for the "also propagate" API * context, we have to setup the op array for the "also propagate" API
* so that RM_Replicate() will work. */ * so that RM_Replicate() will work. */
if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) { if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
serverAssert(ctx->saved_oparray.ops == NULL);
ctx->saved_oparray = server.also_propagate; ctx->saved_oparray = server.also_propagate;
redisOpArrayInit(&server.also_propagate); redisOpArrayInit(&server.also_propagate);
} }
execCommandPropagateMulti(ctx->client->db->id); execCommandPropagateMulti(ctx->client->db->id);
ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
} }


/* Replicate the specified command and arguments to slaves and AOF, as effect /* Replicate the specified command and arguments to slaves and AOF, as effect
Expand Down Expand Up @@ -4067,20 +4074,30 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
} }
} }


/* If we are using single commands replication, we need to wrap what /* We need to use a global replication_allowed flag in order to prevent
* we propagate into a MULTI/EXEC block, so that it will be atomic like * replication of nested RM_Calls. Example:
* a Lua script in the context of AOF and slaves. */ * 1. module1.foo does RM_Call of module2.bar without replication (i.e. no '!')
if (replicate) moduleReplicateMultiIfNeeded(ctx); * 2. module2.bar internally calls RM_Call of INCR with '!'
* 3. at the end of module1.foo we call RM_ReplicateVerbatim
* We want the replica/AOF to see only module1.foo and not the INCR from module2.bar */
int prev_replication_allowed = server.replication_allowed;
server.replication_allowed = replicate && server.replication_allowed;


/* Run the command */ /* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP; int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP;
if (replicate) { if (replicate) {
/* If we are using single commands replication, we need to wrap what
* we propagate into a MULTI/EXEC block, so that it will be atomic like
* a Lua script in the context of AOF and slaves. */
moduleReplicateMultiIfNeeded(ctx);

if (!(flags & REDISMODULE_ARGV_NO_AOF)) if (!(flags & REDISMODULE_ARGV_NO_AOF))
call_flags |= CMD_CALL_PROPAGATE_AOF; call_flags |= CMD_CALL_PROPAGATE_AOF;
if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) if (!(flags & REDISMODULE_ARGV_NO_REPLICAS))
call_flags |= CMD_CALL_PROPAGATE_REPL; call_flags |= CMD_CALL_PROPAGATE_REPL;
} }
call(c,call_flags); call(c,call_flags);
server.replication_allowed = prev_replication_allowed;


serverAssert((c->flags & CLIENT_BLOCKED) == 0); serverAssert((c->flags & CLIENT_BLOCKED) == 0);


Expand Down
14 changes: 7 additions & 7 deletions src/multi.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -113,30 +113,30 @@ void discardCommand(client *c) {
addReply(c,shared.ok); addReply(c,shared.ok);
} }


void beforePropagateMultiOrExec(int multi) { void beforePropagateMulti() {
if (multi) {
/* Propagating MULTI */ /* Propagating MULTI */
serverAssert(!server.propagate_in_transaction); serverAssert(!server.propagate_in_transaction);
server.propagate_in_transaction = 1; server.propagate_in_transaction = 1;
} else { }

void afterPropagateExec() {
/* Propagating EXEC */ /* Propagating EXEC */
serverAssert(server.propagate_in_transaction == 1); serverAssert(server.propagate_in_transaction == 1);
server.propagate_in_transaction = 0; server.propagate_in_transaction = 0;
} }
}


/* Send a MULTI command to all the slaves and AOF file. Check the execCommand /* Send a MULTI command to all the slaves and AOF file. Check the execCommand
* implementation for more information. */ * implementation for more information. */
void execCommandPropagateMulti(int dbid) { void execCommandPropagateMulti(int dbid) {
beforePropagateMultiOrExec(1); beforePropagateMulti();
propagate(server.multiCommand,dbid,&shared.multi,1, propagate(server.multiCommand,dbid,&shared.multi,1,
PROPAGATE_AOF|PROPAGATE_REPL); PROPAGATE_AOF|PROPAGATE_REPL);
} }


void execCommandPropagateExec(int dbid) { void execCommandPropagateExec(int dbid) {
beforePropagateMultiOrExec(0);
propagate(server.execCommand,dbid,&shared.exec,1, propagate(server.execCommand,dbid,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL); PROPAGATE_AOF|PROPAGATE_REPL);
afterPropagateExec();
} }


/* Aborts a transaction, with a specific error message. /* Aborts a transaction, with a specific error message.
Expand Down Expand Up @@ -254,7 +254,6 @@ void execCommand(client *c) {
if (server.propagate_in_transaction) { if (server.propagate_in_transaction) {
int is_master = server.masterhost == NULL; int is_master = server.masterhost == NULL;
server.dirty++; server.dirty++;
beforePropagateMultiOrExec(0);
/* If inside the MULTI/EXEC block this instance was suddenly /* If inside the MULTI/EXEC block this instance was suddenly
* switched from master to slave (using the SLAVEOF command), the * switched from master to slave (using the SLAVEOF command), the
* initial MULTI was propagated into the replication backlog, but the * initial MULTI was propagated into the replication backlog, but the
Expand All @@ -264,6 +263,7 @@ void execCommand(client *c) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd)); feedReplicationBacklog(execcmd,strlen(execcmd));
} }
afterPropagateExec();
} }


server.in_exec = 0; server.in_exec = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3158,6 +3158,7 @@ void initServer(void) {
server.clients_pending_write = listCreate(); server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate(); server.clients_pending_read = listCreate();
server.clients_timeout_table = raxNew(); server.clients_timeout_table = raxNew();
server.replication_allowed = 1;
server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate(); server.unblocked_clients = listCreate();
server.ready_keys = listCreate(); server.ready_keys = listCreate();
Expand Down Expand Up @@ -3502,6 +3503,7 @@ void redisOpArrayFree(redisOpArray *oa) {
zfree(op->argv); zfree(op->argv);
} }
zfree(oa->ops); zfree(oa->ops);
oa->ops = NULL;
} }


/* ====================== Commands lookup and execution ===================== */ /* ====================== Commands lookup and execution ===================== */
Expand Down Expand Up @@ -3552,6 +3554,9 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags) int flags)
{ {
if (!server.replication_allowed)
return;

/* Propagate a MULTI request once we encounter the first command which /* Propagate a MULTI request once we encounter the first command which
* is a write command. * is a write command.
* This way we'll deliver the MULTI/..../EXEC block as a whole and * This way we'll deliver the MULTI/..../EXEC block as a whole and
Expand Down
4 changes: 3 additions & 1 deletion src/server.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1400,6 +1400,7 @@ struct redisServer {
int child_info_nread; /* Num of bytes of the last read from pipe */ int child_info_nread; /* Num of bytes of the last read from pipe */
/* Propagation of commands in AOF / replication */ /* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */ redisOpArray also_propagate; /* Additional command to propagate. */
int replication_allowed; /* Are we allowed to replicate? */
/* Logging */ /* Logging */
char *logfile; /* Path of log file */ char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */ int syslog_enabled; /* Is syslog enabled? */
Expand Down Expand Up @@ -1932,7 +1933,8 @@ void flagTransaction(client *c);
void execCommandAbort(client *c, sds error); void execCommandAbort(client *c, sds error);
void execCommandPropagateMulti(int dbid); void execCommandPropagateMulti(int dbid);
void execCommandPropagateExec(int dbid); void execCommandPropagateExec(int dbid);
void beforePropagateMultiOrExec(int multi); void beforePropagateMulti();
void afterPropagateExec();


/* Redis object implementation */ /* Redis object implementation */
void decrRefCount(robj *o); void decrRefCount(robj *o);
Expand Down
74 changes: 74 additions & 0 deletions tests/modules/propagate.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -70,6 +70,43 @@ int propagateTestTimerCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
return REDISMODULE_OK; return REDISMODULE_OK;
} }


/* Timer callback. */
void timerNestedHandler(RedisModuleCtx *ctx, void *data) {
int repl = (long long)data;

/* The goal is the trigger a module command that calls RM_Replicate
* in order to test MULTI/EXEC structre */
RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-start","1");
RedisModule_Call(ctx,"propagate-test.nested", repl? "!" : "");
RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-end","1");
}

int propagateTestTimerNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

RedisModuleTimerID timer_id =
RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)0);
REDISMODULE_NOT_USED(timer_id);

RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int propagateTestTimerNestedReplCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

RedisModuleTimerID timer_id =
RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)1);
REDISMODULE_NOT_USED(timer_id);

RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

/* The thread entry point. */ /* The thread entry point. */
void *threadMain(void *arg) { void *threadMain(void *arg) {
REDISMODULE_NOT_USED(arg); REDISMODULE_NOT_USED(arg);
Expand Down Expand Up @@ -131,6 +168,28 @@ int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
return REDISMODULE_OK; return REDISMODULE_OK;
} }


int propagateTestNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleCallReply *reply;

/* This test mixes multiple propagation systems. */
reply = RedisModule_Call(ctx, "INCR", "c!", "using-call");
RedisModule_FreeCallReply(reply);

RedisModule_Call(ctx,"propagate-test.simple", "!");

RedisModule_Replicate(ctx,"INCR","c","counter-3");
RedisModule_Replicate(ctx,"INCR","c","counter-4");

reply = RedisModule_Call(ctx, "INCR", "c!", "after-call");
RedisModule_FreeCallReply(reply);

RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc); REDISMODULE_NOT_USED(argc);
Expand All @@ -143,6 +202,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"",1,1,1) == REDISMODULE_ERR) "",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR; return REDISMODULE_ERR;


if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested",
propagateTestTimerNestedCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested-repl",
propagateTestTimerNestedReplCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.thread", if (RedisModule_CreateCommand(ctx,"propagate-test.thread",
propagateTestThreadCommand, propagateTestThreadCommand,
"",1,1,1) == REDISMODULE_ERR) "",1,1,1) == REDISMODULE_ERR)
Expand All @@ -158,5 +227,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"",1,1,1) == REDISMODULE_ERR) "",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR; return REDISMODULE_ERR;


if (RedisModule_CreateCommand(ctx,"propagate-test.nested",
propagateTestNestedCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

return REDISMODULE_OK; return REDISMODULE_OK;
} }
53 changes: 53 additions & 0 deletions tests/unit/moduleapi/propagate.tcl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -42,6 +42,59 @@ tags "modules" {
close_replication_stream $repl close_replication_stream $repl
} }


test {module propagates nested ctx case1} {
set repl [attach_to_replication_stream]

$master del timer-nested-start
$master del timer-nested-end
$master propagate-test.timer-nested

wait_for_condition 5000 10 {
[$replica get timer-nested-end] eq "1"
} else {
fail "The two counters don't match the expected value."
}

assert_replication_stream $repl {
{select *}
{multi}
{incrby timer-nested-start 1}
{incrby timer-nested-end 1}
{exec}
}
close_replication_stream $repl
}

test {module propagates nested ctx case2} {
set repl [attach_to_replication_stream]

$master del timer-nested-start
$master del timer-nested-end
$master propagate-test.timer-nested-repl

wait_for_condition 5000 10 {
[$replica get timer-nested-end] eq "1"
} else {
fail "The two counters don't match the expected value."
}

# Note the 'after-call' and 'timer-nested-start' propagation below is out of order (known limitation)
assert_replication_stream $repl {
{select *}
{multi}
{incr using-call}
{incr counter-1}
{incr counter-2}
{incr after-call}
{incr counter-3}
{incr counter-4}
{incrby timer-nested-start 1}
{incrby timer-nested-end 1}
{exec}
}
close_replication_stream $repl
}

test {module propagates from thread} { test {module propagates from thread} {
set repl [attach_to_replication_stream] set repl [attach_to_replication_stream]


Expand Down