diff --git a/src/ae.c b/src/ae.c index b6a1ce0b10..36b6131410 100644 --- a/src/ae.c +++ b/src/ae.c @@ -63,6 +63,15 @@ #endif #endif +#define AE_LOCK(eventLoop) \ + if ((eventLoop)->flags & AE_PROTECT_POLL) { \ + assert(pthread_mutex_lock(&(eventLoop)->poll_mutex) == 0); \ + } + +#define AE_UNLOCK(eventLoop) \ + if ((eventLoop)->flags & AE_PROTECT_POLL) { \ + assert(pthread_mutex_unlock(&(eventLoop)->poll_mutex) == 0); \ + } aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; @@ -81,7 +90,14 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; + eventLoop->custompoll = NULL; eventLoop->flags = 0; + /* Initialize the eventloop mutex with PTHREAD_MUTEX_ERRORCHECK type */ + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); + if (pthread_mutex_init(&eventLoop->poll_mutex, &attr) != 0) goto err; + if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ @@ -122,11 +138,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) { * * Otherwise AE_OK is returned and the operation is successful. */ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { + AE_LOCK(eventLoop); + int ret = AE_OK; int i; - if (setsize == eventLoop->setsize) return AE_OK; - if (eventLoop->maxfd >= setsize) return AE_ERR; - if (aeApiResize(eventLoop, setsize) == -1) return AE_ERR; + if (setsize == eventLoop->setsize) goto done; + if (eventLoop->maxfd >= setsize) goto err; + if (aeApiResize(eventLoop, setsize) == -1) goto err; eventLoop->events = zrealloc(eventLoop->events, sizeof(aeFileEvent) * setsize); eventLoop->fired = zrealloc(eventLoop->fired, sizeof(aeFiredEvent) * setsize); @@ -135,7 +153,13 @@ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { /* Make sure that if we created new slots, they are initialized with * an AE_NONE mask. */ for (i = eventLoop->maxfd + 1; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; - return AE_OK; + goto done; + +err: + ret = AE_ERR; +done: + AE_UNLOCK(eventLoop); + return ret; } void aeDeleteEventLoop(aeEventLoop *eventLoop) { @@ -159,25 +183,35 @@ void aeStop(aeEventLoop *eventLoop) { } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { + AE_LOCK(eventLoop); + int ret = AE_ERR; + if (fd >= eventLoop->setsize) { errno = ERANGE; - return AE_ERR; + goto done; } aeFileEvent *fe = &eventLoop->events[fd]; - if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; + if (aeApiAddEvent(eventLoop, fd, mask) == -1) goto done; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; - return AE_OK; + + ret = AE_OK; + +done: + AE_UNLOCK(eventLoop); + return ret; } void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { - if (fd >= eventLoop->setsize) return; + AE_LOCK(eventLoop); + if (fd >= eventLoop->setsize) goto done; + aeFileEvent *fe = &eventLoop->events[fd]; - if (fe->mask == AE_NONE) return; + if (fe->mask == AE_NONE) goto done; /* We want to always remove AE_BARRIER if set when AE_WRITABLE * is removed. */ @@ -204,6 +238,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { * which is required by evport and epoll */ aeApiDelEvent(eventLoop, fd, mask); } + +done: + AE_UNLOCK(eventLoop); } void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) { @@ -345,6 +382,17 @@ static int processTimeEvents(aeEventLoop *eventLoop) { return processed; } +/* This function provides direct access to the aeApiPoll call. + * It is intended to be called from a custom poll function.*/ +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) { + AE_LOCK(eventLoop); + + int ret = aeApiPoll(eventLoop, tvp); + + AE_UNLOCK(eventLoop); + return ret; +} + /* Process every pending file event, then every pending time event * (that may be registered by file event callbacks just processed). * Without special flags the function sleeps until some file event @@ -377,25 +425,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop); - /* The eventLoop->flags may be changed inside beforesleep. - * So we should check it after beforesleep be called. At the same time, - * the parameter flags always should have the highest priority. - * That is to say, once the parameter flag is set to AE_DONT_WAIT, - * no matter what value eventLoop->flags is set to, we should ignore it. */ - if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } else if (flags & AE_TIME_EVENTS) { - usUntilTimer = usUntilEarliestTimer(eventLoop); - if (usUntilTimer >= 0) { - tv.tv_sec = usUntilTimer / 1000000; - tv.tv_usec = usUntilTimer % 1000000; + if (eventLoop->custompoll != NULL) { + numevents = eventLoop->custompoll(eventLoop); + } else { + /* The eventLoop->flags may be changed inside beforesleep. + * So we should check it after beforesleep be called. At the same time, + * the parameter flags always should have the highest priority. + * That is to say, once the parameter flag is set to AE_DONT_WAIT, + * no matter what value eventLoop->flags is set to, we should ignore it. */ + if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { + tv.tv_sec = tv.tv_usec = 0; tvp = &tv; + } else if (flags & AE_TIME_EVENTS) { + usUntilTimer = usUntilEarliestTimer(eventLoop); + if (usUntilTimer >= 0) { + tv.tv_sec = usUntilTimer / 1000000; + tv.tv_usec = usUntilTimer % 1000000; + tvp = &tv; + } } + /* Call the multiplexing API, will return only on timeout or when + * some event fires. */ + numevents = aeApiPoll(eventLoop, tvp); } - /* Call the multiplexing API, will return only on timeout or when - * some event fires. */ - numevents = aeApiPoll(eventLoop, tvp); /* Don't process file events if not requested. */ if (!(flags & AE_FILE_EVENTS)) { @@ -503,3 +555,17 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) { eventLoop->aftersleep = aftersleep; } + +/* This function allows setting a custom poll procedure to be used by the event loop. + * The custom poll procedure, if set, will be called instead of the default aeApiPoll */ +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) { + eventLoop->custompoll = custompoll; +} + +void aeSetPollProtect(aeEventLoop *eventLoop, int protect) { + if (protect) { + eventLoop->flags |= AE_PROTECT_POLL; + } else { + eventLoop->flags &= ~AE_PROTECT_POLL; + } +} diff --git a/src/ae.h b/src/ae.h index 3b1c96a01d..652b42a8f5 100644 --- a/src/ae.h +++ b/src/ae.h @@ -34,6 +34,7 @@ #define __AE_H__ #include "monotonic.h" +#include #define AE_OK 0 #define AE_ERR -1 @@ -54,6 +55,7 @@ #define AE_DONT_WAIT (1 << 2) #define AE_CALL_BEFORE_SLEEP (1 << 3) #define AE_CALL_AFTER_SLEEP (1 << 4) +#define AE_PROTECT_POLL (1 << 5) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 @@ -61,6 +63,7 @@ /* Macros */ #define AE_NOTUSED(V) ((void)V) +struct timeval; /* forward declaration */ struct aeEventLoop; /* Types and data structures */ @@ -69,6 +72,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents); +typedef int aeCustomPollProc(struct aeEventLoop *eventLoop); /* File event structure */ typedef struct aeFileEvent { @@ -109,6 +113,8 @@ typedef struct aeEventLoop { void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeAfterSleepProc *aftersleep; + aeCustomPollProc *custompoll; + pthread_mutex_t poll_mutex; int flags; } aeEventLoop; @@ -132,6 +138,9 @@ void aeMain(aeEventLoop *eventLoop); char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep); +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll); +void aeSetPollProtect(aeEventLoop *eventLoop, int protect); +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); void aeSetDontWait(aeEventLoop *eventLoop, int noWait); diff --git a/src/db.c b/src/db.c index 5a6562a1e2..073bec6ca3 100644 --- a/src/db.c +++ b/src/db.c @@ -32,6 +32,7 @@ #include "latency.h" #include "script.h" #include "functions.h" +#include "io_threads.h" #include #include @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn old = dictGetVal(de); } kvstoreDictSetVal(db->keys, slot, de, val); - if (server.lazyfree_lazy_server_del) { + /* For efficiency, let the I/O thread that allocated an object also deallocate it. */ + if (tryOffloadFreeObjToIOThreads(old) == C_OK) { + /* OK */ + } else if (server.lazyfree_lazy_server_del) { freeObjAsync(key, old, db->id); } else { decrRefCount(old); diff --git a/src/dict.c b/src/dict.c index 280f0b6abc..2eb3dd386f 100644 --- a/src/dict.c +++ b/src/dict.c @@ -329,6 +329,12 @@ int _dictResize(dict *d, unsigned long size, int *malloc_failed) { return DICT_OK; } + if (d->type->no_incremental_rehash) { + /* If the dict type does not support incremental rehashing, we need to + * rehash the whole table immediately. */ + while (dictRehash(d, 1000)); + } + return DICT_OK; } diff --git a/src/dict.h b/src/dict.h index a7c5c71826..97a79910cb 100644 --- a/src/dict.h +++ b/src/dict.h @@ -87,6 +87,8 @@ typedef struct dictType { /* If embedded_entry flag is set, it indicates that a copy of the key is created and the key is embedded * as part of the dict entry. */ unsigned int embedded_entry : 1; + /* Perform rehashing during resizing instead of incrementally rehashing across multiple steps */ + unsigned int no_incremental_rehash : 1; } dictType; #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1 << (exp)) diff --git a/src/io_threads.c b/src/io_threads.c index 6149febabc..95d5895d03 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -124,6 +124,22 @@ int inMainThread(void) { return thread_id == 0; } +int getIOThreadID(void) { + return thread_id; +} + +/* Drains the I/O threads queue by waiting for all jobs to be processed. + * This function must be called from the main thread. */ +void drainIOThreadsQueue(void) { + serverAssert(inMainThread()); + for (int i = 1; i < IO_THREADS_MAX_NUM; i++) { /* No need to drain thread 0, which is the main thread. */ + while (!IOJobQueue_isEmpty(&io_jobs[i])) { + /* memory barrier acquire to get the latest job queue state */ + atomic_thread_fence(memory_order_acquire); + } + } +} + /* Wait until the IO-thread is done with the client */ void waitForClientIO(client *c) { /* No need to wait if the client was not offloaded to the IO thread. */ @@ -177,6 +193,17 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only) { } } +/* This function performs polling on the given event loop and updates the server's + * IO fired events count and poll state. */ +void IOThreadPoll(void *data) { + aeEventLoop *el = (aeEventLoop *)data; + struct timeval tvp = {0, 0}; + int num_events = aePoll(el, &tvp); + + server.io_ae_fired_events = num_events; + atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release); +} + static void *IOThreadMain(void *myid) { /* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */ long id = (long)myid; @@ -267,6 +294,8 @@ void killIOThreads(void) { /* Initialize the data structures needed for I/O threads. */ void initIOThreads(void) { server.active_io_threads_num = 1; /* We start with threads not active. */ + server.io_poll_state = AE_IO_STATE_NONE; + server.io_ae_fired_events = 0; /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ @@ -375,3 +404,146 @@ int trySendWriteToIOThreads(client *c) { IOJobQueue_push(jq, ioThreadWriteToClient, c); return C_OK; } + +/* Internal function to free the client's argv in an IO thread. */ +void IOThreadFreeArgv(void *data) { + robj **argv = (robj **)data; + int last_arg = 0; + for (int i = 0;; i++) { + robj *o = argv[i]; + if (o == NULL) { + continue; + } + + /* The main-thread set the refcount to 0 to indicate that this is the last argument to free */ + if (o->refcount == 0) { + last_arg = 1; + o->refcount = 1; + } + + decrRefCount(o); + + if (last_arg) { + break; + } + } + + zfree(argv); +} + +/* This function attempts to offload the client's argv to an IO thread. + * Returns C_OK if the client's argv were successfully offloaded to an IO thread, + * C_ERR otherwise. */ +int tryOffloadFreeArgvToIOThreads(client *c) { + if (server.active_io_threads_num <= 1 || c->argc == 0) { + return C_ERR; + } + + size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1; + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + int last_arg_to_free = -1; + + /* Prepare the argv */ + for (int j = 0; j < c->argc; j++) { + if (c->argv[j]->refcount > 1) { + decrRefCount(c->argv[j]); + /* Set argv[j] to NULL to avoid double free */ + c->argv[j] = NULL; + } else { + last_arg_to_free = j; + } + } + + /* If no argv to free, free the argv array at the main thread */ + if (last_arg_to_free == -1) { + zfree(c->argv); + return C_OK; + } + + /* We set the refcount of the last arg to free to 0 to indicate that + * this is the last argument to free. With this approach, we don't need to + * send the argc to the IO thread and we can send just the argv ptr. */ + c->argv[last_arg_to_free]->refcount = 0; + + /* Must succeed as we checked the free space before. */ + IOJobQueue_push(jq, IOThreadFreeArgv, c->argv); + + return C_OK; +} + +/* This function attempts to offload the free of an object to an IO thread. + * Returns C_OK if the object was successfully offloaded to an IO thread, + * C_ERR otherwise.*/ +int tryOffloadFreeObjToIOThreads(robj *obj) { + if (server.active_io_threads_num <= 1) { + return C_ERR; + } + + if (obj->refcount > 1) return C_ERR; + + /* We select the thread ID in a round-robin fashion. */ + size_t tid = (server.stat_io_freed_objects % (server.active_io_threads_num - 1)) + 1; + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + IOJobQueue_push(jq, decrRefCountVoid, obj); + server.stat_io_freed_objects++; + return C_OK; +} + +/* This function retrieves the results of the IO Thread poll. + * returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */ +static int getIOThreadPollResults(aeEventLoop *eventLoop) { + int io_state; + io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire); + if (io_state == AE_IO_STATE_POLL) { + /* IO thread is still processing poll events. */ + return 0; + } + + /* IO thread is done processing poll events. */ + serverAssert(io_state == AE_IO_STATE_DONE); + server.stat_poll_processed_by_io_threads++; + server.io_poll_state = AE_IO_STATE_NONE; + + /* Remove the custom poll proc. */ + aeSetCustomPollProc(eventLoop, NULL); + aeSetPollProtect(eventLoop, 0); + return server.io_ae_fired_events; +} + +void trySendPollJobToIOThreads(void) { + if (server.active_io_threads_num <= 1) { + return; + } + + /* If there are no pending jobs, let the main thread do the poll-wait by itself. */ + if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) { + return; + } + + /* If the IO thread is already processing poll events, don't send another job. */ + if (server.io_poll_state != AE_IO_STATE_NONE) { + return; + } + + /* The poll is sent to the last thread. While a random thread could have been selected, + * the last thread has a slightly better chance of being less loaded compared to other threads, + * As we activate the lowest threads first. */ + int tid = server.active_io_threads_num - 1; + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */ + + server.io_poll_state = AE_IO_STATE_POLL; + aeSetCustomPollProc(server.el, getIOThreadPollResults); + aeSetPollProtect(server.el, 1); + IOJobQueue_push(jq, IOThreadPoll, server.el); +} diff --git a/src/io_threads.h b/src/io_threads.h index 30d1cdad79..f9a9cf762f 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -8,6 +8,10 @@ void killIOThreads(void); int inMainThread(void); int trySendReadToIOThreads(client *c); int trySendWriteToIOThreads(client *c); +int tryOffloadFreeObjToIOThreads(robj *o); +int tryOffloadFreeArgvToIOThreads(client *c); void adjustIOThreadsByEventLoad(int numevents, int increase_only); +void drainIOThreadsQueue(void); +void trySendPollJobToIOThreads(void); #endif /* IO_THREADS_H */ diff --git a/src/module.c b/src/module.c index 876b948323..5eb5c3ac84 100644 --- a/src/module.c +++ b/src/module.c @@ -61,6 +61,7 @@ #include "hdr_histogram.h" #include "crc16_slottable.h" #include "valkeymodule.h" +#include "io_threads.h" #include #include #include @@ -684,7 +685,7 @@ void moduleReleaseTempClient(client *c) { c->raw_flag = 0; c->flag.module = 1; c->user = NULL; /* Root user */ - c->cmd = c->lastcmd = c->realcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->io_parsed_cmd = NULL; if (c->bstate.async_rm_call_handle) { ValkeyModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ @@ -1295,7 +1296,8 @@ int VM_CreateCommand(ValkeyModuleCtx *ctx, ValkeyModuleCommand *cp = moduleCreateCommandProxy(ctx->module, declared_name, sdsdup(declared_name), cmdfunc, flags, firstkey, lastkey, keystep); cp->serverCmd->arity = cmdfunc ? -1 : -2; /* Default value, can be changed later via dedicated API */ - + /* Drain IO queue before modifying commands dictionary to prevent concurrent access while modifying it. */ + drainIOThreadsQueue(); serverAssert(dictAdd(server.commands, sdsdup(declared_name), cp->serverCmd) == DICT_OK); serverAssert(dictAdd(server.orig_commands, sdsdup(declared_name), cp->serverCmd) == DICT_OK); cp->serverCmd->id = ACLGetCommandID(declared_name); /* ID used for ACL. */ @@ -6281,7 +6283,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const if (error_as_call_replies) reply = callReplyCreateError(err, ctx); goto cleanup; } - if (!commandCheckArity(c, error_as_call_replies ? &err : NULL)) { + if (!commandCheckArity(c->cmd, c->argc, error_as_call_replies ? &err : NULL)) { errno = EINVAL; if (error_as_call_replies) reply = callReplyCreateError(err, ctx); goto cleanup; @@ -10710,6 +10712,8 @@ void moduleCallCommandFilters(client *c) { ValkeyModuleCommandFilterCtx filter = {.argv = c->argv, .argv_len = c->argv_len, .argc = c->argc, .c = c}; + robj *tmp = c->argv[0]; + incrRefCount(tmp); while ((ln = listNext(&li))) { ValkeyModuleCommandFilter *f = ln->value; @@ -10725,6 +10729,12 @@ void moduleCallCommandFilters(client *c) { c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; + if (tmp != c->argv[0]) { + /* With I/O thread command-lookup offload, we set c->io_parsed_cmd to the command corresponding to c->argv[0]. + * Since the command filter just changed it, we need to reset c->io_parsed_cmd to null. */ + c->io_parsed_cmd = NULL; + } + decrRefCount(tmp); } /* Return the number of arguments a filtered command has. The number of @@ -12072,6 +12082,8 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) { } void moduleUnregisterCommands(struct ValkeyModule *module) { + /* Drain IO queue before modifying commands dictionary to prevent concurrent access while modifying it. */ + drainIOThreadsQueue(); /* Unregister all the commands registered by this module. */ dictIterator *di = dictGetSafeIterator(server.commands); dictEntry *de; diff --git a/src/networking.c b/src/networking.c index 7442a69e5b..501476e35d 100644 --- a/src/networking.c +++ b/src/networking.c @@ -169,7 +169,7 @@ client *createClient(connection *conn) { c->nread = 0; c->read_flags = 0; c->write_flags = 0; - c->cmd = c->lastcmd = c->realcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->io_parsed_cmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -1438,13 +1438,15 @@ void freeClientOriginalArgv(client *c) { } void freeClientArgv(client *c) { - int j; - for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) { + for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + zfree(c->argv); + } c->argc = 0; c->cmd = NULL; + c->io_parsed_cmd = NULL; c->argv_len_sum = 0; c->argv_len = 0; - zfree(c->argv); c->argv = NULL; } @@ -4691,6 +4693,24 @@ void ioThreadReadQueryFromClient(void *data) { parseCommand(c); + /* Parsing was not completed - let the main-thread handle it. */ + if (!(c->read_flags & READ_FLAGS_PARSING_COMPLETED)) { + goto done; + } + + /* Empty command - Multibulk processing could see a <= 0 length. */ + if (c->argc == 0) { + goto done; + } + + /* Lookup command offload */ + c->io_parsed_cmd = lookupCommand(c->argv, c->argc); + if (c->io_parsed_cmd && commandCheckArity(c->io_parsed_cmd, c->argc, NULL) == 0) { + /* The command was found, but the arity is invalid. + * In this case, we reset the parsed_cmd and will let the main thread handle it. */ + c->io_parsed_cmd = NULL; + } + done: trimClientQueryBuffer(c); atomic_thread_fence(memory_order_release); diff --git a/src/server.c b/src/server.c index fe8b3dd6b0..024787acdc 100644 --- a/src/server.c +++ b/src/server.c @@ -492,12 +492,13 @@ dictType dbExpiresDictType = { /* Command table. sds string -> command struct pointer. */ dictType commandTableDictType = { - dictSdsCaseHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCaseCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ + dictSdsCaseHash, /* hash function */ + NULL, /* key dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL, /* allow to expand */ + .no_incremental_rehash = 1, /* no incremental rehash as the command table may be accessed from IO threads. */ }; /* Hash type hash table (note that small hashes are represented with listpacks) */ @@ -1564,6 +1565,9 @@ extern int ProcessingEventsWhileBlocked; void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* When I/O threads are enabled and there are pending I/O jobs, the poll is offloaded to one of the I/O threads. */ + trySendPollJobToIOThreads(); + size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; @@ -1595,10 +1599,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */ connTypeProcessPendingData(); - /* If any connection type(typical TLS) still has pending unread data or if there are clients - * with pending IO reads/writes, don't sleep at all. */ - int dont_sleep = connTypeHasPendingData() || listLength(server.clients_pending_io_read) > 0 || - listLength(server.clients_pending_io_write) > 0; + /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */ + int dont_sleep = connTypeHasPendingData(); /* Call the Cluster before sleep function. Note that this function * may change the state of Cluster (from ok to fail or vice versa), @@ -2494,6 +2496,8 @@ void resetServerStats(void) { server.stat_io_reads_processed = 0; server.stat_total_reads_processed = 0; server.stat_io_writes_processed = 0; + server.stat_io_freed_objects = 0; + server.stat_poll_processed_by_io_threads = 0; server.stat_total_writes_processed = 0; server.stat_client_qbuf_limit_disconnections = 0; server.stat_client_outbuf_limit_disconnections = 0; @@ -3724,11 +3728,11 @@ int commandCheckExistence(client *c, sds *err) { /* Check if c->argc is valid for c->cmd, fills `err` with details in case it isn't. * Return 1 if valid. */ -int commandCheckArity(client *c, sds *err) { - if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { +int commandCheckArity(struct serverCommand *cmd, int argc, sds *err) { + if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) { if (err) { *err = sdsnew(NULL); - *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", c->cmd->fullname); + *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", cmd->fullname); } return 0; } @@ -3799,13 +3803,14 @@ int processCommand(client *c) { * In case we are reprocessing a command after it was blocked, * we do not have to repeat the same checks */ if (!client_reprocessing_command) { - c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv, c->argc); + struct serverCommand *cmd = c->io_parsed_cmd ? c->io_parsed_cmd : lookupCommand(c->argv, c->argc); + c->cmd = c->lastcmd = c->realcmd = cmd; sds err; if (!commandCheckExistence(c, &err)) { rejectCommandSds(c, err); return C_OK; } - if (!commandCheckArity(c, &err)) { + if (!commandCheckArity(c->cmd, c->argc, &err)) { rejectCommandSds(c, err); return C_OK; } @@ -5667,6 +5672,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "total_writes_processed:%lld\r\n", server.stat_total_writes_processed, "io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, + "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, + "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index 9f755150d3..5266dff3c7 100644 --- a/src/server.h +++ b/src/server.h @@ -658,6 +658,9 @@ typedef enum { #define BUSY_MODULE_YIELD_EVENTS (1 << 0) #define BUSY_MODULE_YIELD_CLIENTS (1 << 1) +/* IO poll */ +typedef enum { AE_IO_STATE_NONE, AE_IO_STATE_POLL, AE_IO_STATE_DONE } AeIoState; + /*----------------------------------------------------------------------------- * Data types *----------------------------------------------------------------------------*/ @@ -1257,6 +1260,7 @@ typedef struct client { struct serverCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ + struct serverCommand *io_parsed_cmd; /* The command that was parsed by the IO thread. */ user *user; /* User associated with this connection. If the user is set to NULL the connection can do anything (admin). */ @@ -1648,6 +1652,8 @@ struct valkeyServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; + _Atomic AeIoState io_poll_state; /* Indicates the state of the IO polling. */ + int io_ae_fired_events; /* Number of poll events received by the IO thread. */ rax *errors; /* Errors table */ unsigned int lruclock; /* Clock for LRU eviction */ volatile sig_atomic_t shutdown_asap; /* Shutdown ordered by signal handler. */ @@ -1807,6 +1813,8 @@ struct valkeyServer { long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ long long stat_io_reads_processed; /* Number of read events processed by IO threads */ long long stat_io_writes_processed; /* Number of write events processed by IO threads */ + long long stat_io_freed_objects; /* Number of objects freed by IO threads */ + long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */ long long stat_total_reads_processed; /* Total number of read events processed */ long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ @@ -3239,7 +3247,7 @@ struct serverCommand *lookupCommandByCStringLogic(dict *commands, const char *s) struct serverCommand *lookupCommandByCString(const char *s); struct serverCommand *lookupCommandOrOriginal(robj **argv, int argc); int commandCheckExistence(client *c, sds *err); -int commandCheckArity(client *c, sds *err); +int commandCheckArity(struct serverCommand *cmd, int argc, sds *err); void startCommandExecution(void); int incrCommandStatsOnError(struct serverCommand *cmd, int flags); void call(client *c, int flags);