Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Lua transaction support #2701

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,11 @@ aof-load-truncated yes
# Set it to 0 or a negative value for unlimited execution without warnings.
lua-time-limit 5000

# Lua scripts can be run transactionally using EVALTXN or EVALSHATXN: if the
# script raises an error or is killed, every write it performed is rolled
# back. Setting lua-all-transactions to 'yes' forces scripts run with EVAL and
# EVALSHA to use the same semantics as EVALTXN and EVALSHATXN.
lua-all-transactions no

################################ REDIS CLUSTER ###############################
#
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Expand Down
104 changes: 104 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -4440,6 +4440,110 @@ void restoreCommand(client *c) {
server.dirty++;
}

void prepareRollback(struct redisCommand *cmd, client *c) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for this being in cluster.c? It does work in normal Redis, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three reasons:

  1. I didn't know where else to put it (so I am happy to move it)
  2. The current implementation is an automatic dump/restore
  3. Most of the guts of dump/restore are defined in cluster.c, implemented earlier in the file

Where should I move it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure where to move. Just wondered if this was a thoughtful decision or just happened to be there for no reason.

/* Body of prepareRollback(cmd, c): for every key the command is about to
* touch, push one rollbackItem onto server.lua_rollback holding the key,
* a serialized copy of its current value (if it exists) and its remaining
* TTL. Keys that already have an entry in the list are skipped. */

/* We need to get the list of keys being used by this command to
* determine which keys we have or have not persisted yet. */
int *keys, numkeys, j, skip;
rollbackItem* rb;
robj *o;

/* Todo: switch to using a dict instead of a singly-linked list. */
keys = getKeysFromCommand(cmd, c->argv, c->argc, &numkeys);
for (j = 0; j < numkeys; j++) {
skip = 0;
/* The list is terminated by the sentinel pointer (rollbackItem*)(1),
* not by NULL, so the walk stops when that value is reached. */
for (rb = server.lua_rollback; rb != (rollbackItem*)(1); rb = rb->next) {
/* NOTE(review): strcmp() stops at the first NUL byte; if key names can
* contain embedded NULs, two distinct keys could compare equal here and
* the second one would never be saved -- confirm. */
if (strcmp(c->argv[keys[j]]->ptr, rb->key->ptr) == 0) {
skip = 1;
break;
}
}
if (skip) continue;

rb = zmalloc(sizeof(rollbackItem));
/* Set the key. */
/* The argv object is shared rather than copied; the extra reference taken
* here is dropped by decrRefCount() in cleanRollbackItem(). */
rb->key = c->argv[keys[j]];
rb->key->refcount++;

if ((o = lookupKeyRead(c->db,c->argv[keys[j]])) != NULL) {
/* Get the dump, data is: rb->dump->io.buffer.ptr */
rb->dump = zmalloc(sizeof(rio));
createDumpPayload(rb->dump, o);

/* Get the ttl. */
/* A getExpire() result of -1 (presumably "no expire set" -- confirm) is
* not adjusted; any other value is converted to time remaining from now.
* Every negative result, including -1, is then clamped to 0. */
/* NOTE(review): restoreFromRollback() only calls setExpire() when ttl is
* nonzero, so a key whose remaining time is <= 0 here appears to be
* restored without an expire -- confirm this is intended. */
rb->ttl = getExpire(c->db, rb->key);
if (rb->ttl != -1) rb->ttl -= mstime();
if (rb->ttl < 0) rb->ttl = 0;
} else {
/* No key, no dump. */
/* A NULL dump makes restoreFromRollback() delete the key without
* re-creating it. */
rb->dump = NULL;
rb->ttl = 0;
}
/* Update the rollback pointer. */
/* New entries are pushed at the head of the list. */
rb->next = server.lua_rollback;
server.lua_rollback = rb;
}
getKeysFreeResult(keys);
}

/* Pop the head entry off server.lua_rollback and release everything it
 * owns: the dump buffer and its rio (when present), the reference held on
 * the key object, and the entry itself.
 *
 * Does nothing when the list pointer is NULL or is the (rollbackItem*)(1)
 * end-of-list sentinel. */
void cleanRollbackItem() {
    rollbackItem *head = server.lua_rollback;

    if (head == NULL || head == (rollbackItem*)(1)) return;

    /* Unlink the entry first; its memory is released below. */
    server.lua_rollback = head->next;

    /* Entries recorded for keys that did not exist carry no dump. */
    if (head->dump != NULL) {
        sdsfree(head->dump->io.buffer.ptr);
        zfree(head->dump);
    }

    /* Drop the reference on the key object, then free the entry. */
    decrRefCount(head->key);
    zfree(head);
}

/* Revert every key recorded in server.lua_rollback inside c->db.
 *
 * For each entry the current key is deleted; when the entry carries a dump,
 * the saved value is reloaded from it and added back, with an expire of
 * "now + ttl" when the stored ttl is nonzero. Each entry is released as soon
 * as it has been processed.
 *
 * On return server.lua_rolled_back is 1 if a rollback list was active
 * (server.lua_rollback was not NULL on entry) and 0 otherwise, and
 * server.lua_rollback is reset to NULL. */
void restoreFromRollback(client *c) {
    rollbackItem *const end_marker = (rollbackItem*)(1);
    int was_active = (server.lua_rollback != NULL);

    /* Only walk the list when called from a transaction. */
    if (was_active) {
        rollbackItem *item;

        while ((item = server.lua_rollback) != end_marker) {
            /* Remove whatever the script left under this key. */
            dbDelete(c->db, item->key);

            if (item->dump != NULL) {
                int type;
                robj *restored;

                /* Rewind the buffer and decode the saved value. */
                item->dump->io.buffer.pos = 0;
                type = rdbLoadObjectType(item->dump);
                restored = rdbLoadObject(type, item->dump);

                /* Re-create the key and re-arm its expire, if any. */
                dbAdd(c->db, item->key, restored);
                if (item->ttl) {
                    setExpire(c->db, item->key, mstime() + item->ttl);
                }
            }

            /* Release the entry we just rolled back; this advances
             * server.lua_rollback to the next one. */
            cleanRollbackItem();
        }
    }

    /* Done rolling back. */
    server.lua_rolled_back = was_active;
    server.lua_rollback = NULL;
}

void cleanRollbackBuffer() {
if (server.lua_rollback != NULL) {
while (server.lua_rollback != (rollbackItem*)(1)) {
/* Clean up this item. */
cleanRollbackItem();
}
}
/* Done cleaning up rollback buffer. */
server.lua_rollback = NULL;
}


/* MIGRATE socket cache implementation.
*
* We take a map between host:ip and a TCP socket that we used to connect
Expand Down
3 changes: 3 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ typedef struct {
master is up. */

/* ---------------------- API exported outside cluster.c -------------------- */
void prepareRollback(struct redisCommand *cmd, client *c);
void restoreFromRollback(client *c);
void cleanRollbackBuffer();
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
int clusterRedirectBlockedClientIfNeeded(client *c);
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
Expand Down
8 changes: 8 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ void loadServerConfigFromString(char *config) {
}
} else if (!strcasecmp(argv[0],"lua-time-limit") && argc == 2) {
server.lua_time_limit = strtoll(argv[1],NULL,10);
} else if (!strcasecmp(argv[0],"lua-all-transactions") && argc == 2) {
if ((server.lua_all_transactions = yesnotoi(argv[1])) == -1)
{
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"slowlog-log-slower-than") &&
argc == 2)
{
Expand Down Expand Up @@ -875,6 +880,9 @@ void configSetCommand(client *c) {
"activerehashing",server.activerehashing) {
} config_set_bool_field(
"stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err) {
} config_set_bool_field(
"lua-all-transactions",server.lua_all_transactions) {


/* Numerical fields.
* config_set_numerical_field(name,var,min,max) */
Expand Down
92 changes: 75 additions & 17 deletions src/scripting.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
}

if (cmd->flags & CMD_RANDOM) server.lua_random_dirty = 1;
if (cmd->flags & CMD_WRITE) server.lua_write_dirty = 1;
if (cmd->flags & CMD_WRITE) {
server.lua_write_dirty = 1;
if (server.lua_rollback) prepareRollback(cmd, c);
}


/* If this is a Redis Cluster node, we need to make sure Lua is not
* trying to access non-local keys, with the exception of commands
Expand Down Expand Up @@ -434,6 +438,8 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
argv_size = 0;
}

/* Handle rollback. */
raise_error = raise_error || server.lua_kill;
if (raise_error) {
/* If we are here we should have an error in the stack, in the
* form of a table with an "err" field. Extract the string to
Expand Down Expand Up @@ -537,6 +543,11 @@ int luaLogCommand(lua_State *lua) {
return 0;
}

/* Lua C function that lets a script abort itself: it pushes a fixed error
 * message on the Lua stack and raises it as a Lua error. */
int luaRollbackCommand(lua_State *lua) {
    const char *reason = "Rolling back due to script request.";

    lua_pushstring(lua, reason);
    return lua_error(lua);
}

void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
long long elapsed;
UNUSED(ar);
Expand All @@ -551,12 +562,21 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
* we need to mask the client executing the script from the event loop.
* If we don't do that the client may disconnect and could no longer be
* here when the EVAL command will return. */
aeDeleteFileEvent(server.el, server.lua_caller->fd, AE_READABLE);
if (!server.lua_rollback) {
/* If server.lua_rollback is nonzero here, then we've already
* handled this part, so we can skip it. */
aeDeleteFileEvent(server.el, server.lua_caller->fd, AE_READABLE);
}
}
if (server.lua_timedout) processEventsWhileBlocked();
if (server.lua_timedout || server.lua_rollback) processEventsWhileBlocked();
if (server.lua_kill) {
serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");
lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");
if (server.lua_write_dirty) {
serverLog(LL_WARNING,"Lua script killed by user with SCRIPT ROLLBACK.");
lua_pushstring(lua,"Script killed by user with SCRIPT ROLLBACK...");
} else {
serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");
lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");
}
lua_error(lua);
}
}
Expand Down Expand Up @@ -684,6 +704,11 @@ void scriptingInit(void) {
lua_pushnumber(lua,LL_WARNING);
lua_settable(lua,-3);

/* internal rollback support */
lua_pushstring(lua,"ROLLBACK");
lua_pushcfunction(lua,luaRollbackCommand);
lua_settable(lua,-3);

/* redis.sha1hex */
lua_pushstring(lua, "sha1hex");
lua_pushcfunction(lua, luaRedisSha1hexCommand);
Expand Down Expand Up @@ -924,7 +949,10 @@ void evalGenericCommand(client *c, int evalsha) {
lua_State *lua = server.lua;
char funcname[43];
long long numkeys;
int delhook = 0, err;
int delhook = 0, err, txn = server.lua_all_transactions;
if (evalsha) txn = txn || strcasecmp(c->argv[0]->ptr,"evalshatxn") == 0;
else txn = txn || strcasecmp(c->argv[0]->ptr,"evaltxn") == 0;


/* We want the same PRNG sequence at every call so that our PRNG is
* not affected by external state. */
Expand All @@ -940,6 +968,7 @@ void evalGenericCommand(client *c, int evalsha) {
* is called after a random command was used. */
server.lua_random_dirty = 0;
server.lua_write_dirty = 0;
server.lua_rollback = (rollbackItem*)(long long)txn;

/* Get the number of arguments that are keys */
if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK)
Expand Down Expand Up @@ -1009,14 +1038,21 @@ void evalGenericCommand(client *c, int evalsha) {

/* Set a hook in order to be able to stop the script execution if it
* is running for too much time.
* We set the hook only if the time limit is enabled as the hook will
* make the Lua script execution slower. */
* We set the hook only if the time limit is enabled or the script runs as
* a transaction, since the hook makes Lua script execution slower. */
server.lua_caller = c;
server.lua_time_start = mstime();
server.lua_kill = 0;
if (server.lua_time_limit > 0 && server.masterhost == NULL) {
if ((txn || server.lua_time_limit > 0) && server.masterhost == NULL) {
lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
delhook = 1;
if (txn) {
/* Disable the read handler here for the same reason luaMaskCountHook
* disables it when a script times out. We do it up front because a
* transaction can be killed at any time, not only after the time
* limit has been reached. */
aeDeleteFileEvent(server.el, server.lua_caller->fd, AE_READABLE);
}
}

/* At this point whether this script was never seen before or if it was
Expand All @@ -1026,7 +1062,7 @@ void evalGenericCommand(client *c, int evalsha) {

/* Perform some cleanup that we need to do both on error and success. */
if (delhook) lua_sethook(lua,luaMaskCountHook,0,0); /* Disable hook */
if (server.lua_timedout) {
if (server.lua_timedout || txn) {
server.lua_timedout = 0;
/* Restore the readable handler that was unregistered when the
* script timeout was detected. */
Expand All @@ -1053,15 +1089,28 @@ void evalGenericCommand(client *c, int evalsha) {
}

if (err) {
addReplyErrorFormat(c,"Error running script (call to %s): %s\n",
funcname, lua_tostring(lua,-1));
if (server.lua_rollback != NULL) {
/* When running the script as a transaction, we revert any writes
* that occurred if there was an error during script execution. This
* also allows us to kill a script as it is executing, even if the
* script has already written to Redis. */
restoreFromRollback(c);
addReplyErrorFormat(c,"Error running script, all writes rolled back (call to %s): %s\n",
funcname, lua_tostring(lua,-1));
} else {
addReplyErrorFormat(c,"Error running script (call to %s): %s\n",
funcname, lua_tostring(lua,-1));
}
lua_pop(lua,2); /* Consume the Lua reply and remove error handler. */
} else {
/* On success convert the Lua return value into Redis protocol, and
* send it to the client. */
luaReplyToRedisReply(c,lua); /* Convert and consume the reply. */
lua_pop(lua,1); /* Remove the error handler. */
}
/* Clean out the Lua rollback buffer as necessary. */
cleanRollbackBuffer();

/* On success convert the Lua return value into Redis protocol, and
* send it to the client. */
luaReplyToRedisReply(c,lua); /* Convert and consume the reply. */
lua_pop(lua,1); /* Remove the error handler. */
}

/* EVALSHA should be propagated to Slave and AOF file as full EVAL, unless
* we are sure that the script was already in the context of all the
Expand Down Expand Up @@ -1192,6 +1241,15 @@ void scriptCommand(client *c) {
server.lua_kill = 1;
addReply(c,shared.ok);
}
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"rollback")) {
if (server.lua_caller == NULL) {
addReplySds(c,sdsnew("-NOTBUSY No scripts in execution right now.\r\n"));
} else if (server.lua_write_dirty && !server.lua_rollback) {
addReplySds(c,sdsnew("-UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.\r\n"));
} else {
server.lua_kill = 1;
addReply(c,shared.ok);
}
} else {
addReplyError(c, "Unknown SCRIPT subcommand or wrong # of args.");
}
Expand Down
22 changes: 19 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct redisServer server; /* server global state */
* is deterministic.
* l: Allow command while loading the database.
* t: Allow command while a slave has stale data but is not allowed to
* server this data. Normally no command is accepted in this condition
* serve this data. Normally no command is accepted in this condition
* but just a few.
* M: Do not automatically propagate the command on MONITOR.
* k: Perform an implicit ASKING for this command, so the command will be
Expand Down Expand Up @@ -273,6 +273,8 @@ struct redisCommand redisCommandTable[] = {
{"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
{"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
{"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
{"evaltxn",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
{"evalshatxn",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
{"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
{"script",scriptCommand,-2,"rs",0,NULL,0,0,0,0,0},
{"time",timeCommand,1,"rRF",0,NULL,0,0,0,0,0},
Expand Down Expand Up @@ -1352,7 +1354,7 @@ void createSharedObjects(void) {
shared.loadingerr = createObject(OBJ_STRING,sdsnew(
"-LOADING Redis is loading the dataset in memory\r\n"));
shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL, SCRIPT ROLLBACK, or SHUTDOWN NOSAVE.\r\n"));
shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
"-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n"));
shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
Expand Down Expand Up @@ -1495,6 +1497,9 @@ void initServerConfig(void) {
server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
server.lua_client = NULL;
server.lua_timedout = 0;
server.lua_rollback = NULL;
server.lua_all_transactions = 0;
server.lua_rolled_back = 0;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2);
Expand Down Expand Up @@ -2135,6 +2140,16 @@ void call(client *c, int flags) {
c->cmd->calls++;
}

/* This server rolled back the script and the changes weren't applied. This
* flag is only set at the end of change rollback, so we can be sure that if
* it is set here, then we shouldn't replicate the call. */
if (server.lua_rolled_back &&
(c->cmd->proc == evalCommand ||
c->cmd->proc == evalShaCommand)) {
flags = PROPAGATE_NONE;
server.lua_rolled_back = 0;
}

/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) == 0) {
int flags = PROPAGATE_NONE;
Expand Down Expand Up @@ -2337,7 +2352,8 @@ int processCommand(client *c) {
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
(tolower(((char*)c->argv[1]->ptr)[0]) == 'k' ||
tolower(((char*)c->argv[1]->ptr)[0]) == 'r')))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
Expand Down
Loading