Skip to content

Commit

Permalink
Add user/client name to MONITOR and SLOWLOG
Browse files Browse the repository at this point in the history
  • Loading branch information
guybe7 committed Feb 5, 2020
1 parent bf53f92 commit ba601f2
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 19 deletions.
28 changes: 17 additions & 11 deletions src/module.c
Expand Up @@ -3212,6 +3212,19 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int
return NULL;
}

/* Create a connection-less client for various module
* operations (RM_Call, blocked client, etc.) */
client *moduleCreateClient(client *caller) {
client *c = createClient(NULL);
c->user = NULL; /* Root user. */
c->flags |= CLIENT_MODULE;
if (caller && caller->name) {
c->name = caller->name;
incrRefCount(caller->name);
}
return c;
}

/* Exported API to call any Redis command from modules.
* On success a RedisModuleCallReply object is returned, otherwise
* NULL is returned and errno is set to the following values:
Expand All @@ -3237,14 +3250,12 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch

/* Create the client and dispatch the command. */
va_start(ap, fmt);
c = createClient(NULL);
c->user = NULL; /* Root user. */
c = moduleCreateClient(ctx->client);
argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
replicate = flags & REDISMODULE_ARGV_REPLICATE;
va_end(ap);

/* Setup our fake client for command execution. */
c->flags |= CLIENT_MODULE;
c->db = ctx->client->db;
c->argv = argv;
c->argc = argc;
Expand Down Expand Up @@ -4334,8 +4345,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata;
bc->privdata = privdata;
bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE;
bc->reply_client = moduleCreateClient(c);
bc->dbid = c->db->id;
bc->blocked_on_keys = keys != NULL;
bc->unblocked = 0;
Expand Down Expand Up @@ -7024,9 +7034,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
if (ModulesInHooks == 0) {
ctx.client = moduleFreeContextReusedClient;
} else {
ctx.client = createClient(NULL);
ctx.client->flags |= CLIENT_MODULE;
ctx.client->user = NULL; /* Root user. */
ctx.client = moduleCreateClient(NULL);
}

void *moduledata = NULL;
Expand Down Expand Up @@ -7158,9 +7166,7 @@ void moduleInitModulesSystem(void) {

/* Set up the keyspace notification susbscriber list and static client */
moduleKeyspaceSubscribers = listCreate();
moduleFreeContextReusedClient = createClient(NULL);
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
moduleFreeContextReusedClient->user = NULL; /* root user. */
moduleFreeContextReusedClient = moduleCreateClient(NULL);

/* Set up filter list */
moduleCommandFilters = listCreate();
Expand Down
16 changes: 11 additions & 5 deletions src/replication.c
Expand Up @@ -302,22 +302,28 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
}
}

void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
void replicationFeedMonitors(client *caller, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
struct timeval tv;
client *c = (caller->flags & CLIENT_LUA) ? server.lua_caller : caller;

gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
if (c->flags & CLIENT_LUA) {
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld %s %s ",
(long)tv.tv_sec,(long)tv.tv_usec,
c->user ? c->user->name : "(root)",
c->name ? (char*)c->name->ptr : "(nil)");
if (caller->flags & CLIENT_LUA) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & CLIENT_UNIX_SOCKET) {
} else if (caller->flags & CLIENT_MODULE) {
cmdrepr = sdscatprintf(cmdrepr,"[%d module] ",dictid);
} else if (caller->flags & CLIENT_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(caller));
}

for (j = 0; j < argc; j++) {
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Expand Up @@ -1740,7 +1740,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void replicationFeedMonitors(client *caller, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
void replicationHandleMasterDisconnection(void);
Expand Down
5 changes: 4 additions & 1 deletion src/slowlog.c
Expand Up @@ -90,6 +90,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur
se->id = server.slowlog_entry_id++;
se->peerid = sdsnew(getClientPeerId(c));
se->cname = c->name ? sdsnew(c->name->ptr) : sdsempty();
se->username = c->user ? sdsnew(c->user->name) : sdsempty();
return se;
}

Expand All @@ -106,6 +107,7 @@ void slowlogFreeEntry(void *septr) {
zfree(se->argv);
sdsfree(se->peerid);
sdsfree(se->cname);
sdsfree(se->username);
zfree(se);
}

Expand Down Expand Up @@ -174,7 +176,7 @@ NULL
int j;

se = ln->value;
addReplyArrayLen(c,6);
addReplyArrayLen(c,7);
addReplyLongLong(c,se->id);
addReplyLongLong(c,se->time);
addReplyLongLong(c,se->duration);
Expand All @@ -183,6 +185,7 @@ NULL
addReplyBulk(c,se->argv[j]);
addReplyBulkCBuffer(c,se->peerid,sdslen(se->peerid));
addReplyBulkCBuffer(c,se->cname,sdslen(se->cname));
addReplyBulkCBuffer(c,se->username,sdslen(se->username));
sent++;
}
setDeferredArrayLen(c,totentries,sent);
Expand Down
1 change: 1 addition & 0 deletions src/slowlog.h
Expand Up @@ -41,6 +41,7 @@ typedef struct slowlogEntry {
long long duration; /* Time spent by the query, in microseconds. */
time_t time; /* Unix time at which the query was executed. */
sds cname; /* Client name. */
sds username; /* User name. */
sds peerid; /* Client network address. */
} slowlogEntry;

Expand Down
2 changes: 2 additions & 0 deletions tests/modules/blockonkeys.c
Expand Up @@ -113,6 +113,8 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (fsl->length >= 2)
RedisModule_SignalKeyAsReady(ctx, argv[1]);

RedisModule_Call(ctx, "incr", "c", "asdf");

return RedisModule_ReplyWithSimpleString(ctx, "OK");
}

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/slowlog.tcl
Expand Up @@ -34,7 +34,7 @@ start_server {tags {"slowlog"} overrides {slowlog-log-slower-than 1000000}} {
r client setname foobar
r debug sleep 0.2
set e [lindex [r slowlog get] 0]
assert_equal [llength $e] 6
assert_equal [llength $e] 7
assert_equal [lindex $e 0] 105
assert_equal [expr {[lindex $e 2] > 100000}] 1
assert_equal [lindex $e 3] {debug sleep 0.2}
Expand Down

0 comments on commit ba601f2

Please sign in to comment.