From 94bc15cb7147773d3b45e5a2f9c7e8d7de6f97c7 Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Fri, 19 Jul 2024 05:21:45 +0300 Subject: [PATCH] Io thread work offload (#763) ### IO-Threads Work Offloading This PR is the 2nd of 3 PRs intended to achieve the goal of 1M requests per second. (1st PR: https://github.com/valkey-io/valkey/pull/758) This PR offloads additional work to the I/O threads, beyond the current read-parse/write operations, to better utilize the I/O threads and reduce the load on the main thread. It contains the following 3 commits: ### Poll Offload Currently, the main thread is responsible for executing the poll-wait system call, while the IO threads wait for tasks from the main thread. The poll-wait operation is expensive and can consume up to 30% of the main thread's time. We could have let the IO threads do the poll-wait by themselves, with each thread listening to some of the clients and notifying the main thread when a client's command is ready to execute. However, the current approach, where the main thread listens for events from the network, has several benefits. The main thread remains in charge, allowing it to know the state of each client (idle/read/write/close) at any given time. Additionally, it makes the threads flexible, enabling us to drain an IO thread's job queue and stop a thread when the load is light without modifying the event loop and moving its clients to a different IO thread. Furthermore, with this approach, the IO threads don't need to wait for both messages from the network and from the main thread instead, the threads wait only for tasks from the main thread. To enjoy the benefits of both the main thread remaining in charge and the poll being offloaded, we propose offloading the poll-wait as a single-time, non-blocking job to one of the IO threads. The IO thread will perform a poll-wait non-blocking call while the main thread processes the client commands. Later, in `aeProcessEvents`, instead of sleeping on the poll, we check for the IO thread's poll-wait results. The poll-wait will be offloaded in `beforeSleep` only when there are ready events for the main thread to process. If no events are pending, the main thread will revert to the current behavior and sleep on the poll by itself. **Implementation Details** A new call back `custompoll` was added to the `aeEventLoop` when not set to `NULL` the ae will call the `custompoll` callback instead of the `aeApiPoll`. When the poll is offloaded we will set the `custompoll` to `getIOThreadPollResults` and send a poll-job to the thread. the thread will take a mutex, call a non-blocking (with timeout 0) to `aePoll` which will populate the fired events array. the IO thread will set the `server.io_fired_events` to the number of the returning `numevents`, later the main-thread in `custompoll` will return the `server.io_fired_events` and will set the `customPoll` back to `NULL`. To ensure thread safety when accessing server.el, all functions that modify the eventloop events were wrapped with a mutex to ensure mutual exclusion when modifying the events. ### Command Lookup Offload As the IO thread parses the command from the client's Querybuf, it can perform a command lookup in the commands dictionary, which can consume up to ~5% of the main-thread runtime. **Implementation details** The IO thread will store the looked-up command in the client's new field `io_parsed_cmd` field. We can't use `c->cmd` for that since we use `c->cmd `to check if a command was reprocessed or not. To ensure thread safety when accessing the command dictionary, we make sure the main thread isn't changing the dictionary while IO threads are accessing it. This is accomplished by introducing a new flag called `no_incremental_rehash` for the `dictType` commands. When performing `dictResize`, we will rehash the entire dictionary in place rather than deferring the process. ### Free Offload Since the command arguments are allocated by the I/O thread, it would be beneficial if they were also freed by the same thread. If the main thread frees objects allocated by the I/O thread, two issues arise: 1. During the freeing process, the main thread needs to access the SDS pointed to by the object to get its length. 2. With Jemalloc, each thread manages thread local pool (`tcache`) of buffers for quick reallocation without accessing the arena. If the main thread constantly frees objects allocated by other threads, those threads will have to frequently access the shared arena to obtain new memory allocations **Implementation Details** When freeing the client's argv, we will send the argv array to the thread that allocated it. The thread will be identified by the client ID. When freeing an object during `dbOverwrite`, we will offload the object free as well. We will extend this to offload the free during `dbDelete` in a future PR, as its effects on defrag/memory evictions need to be studied. --------- Signed-off-by: Uri Yagelnik --- src/ae.c | 116 +++++++++++++++++++++++++------- src/ae.h | 9 +++ src/db.c | 6 +- src/dict.c | 6 ++ src/dict.h | 2 + src/io_threads.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++ src/io_threads.h | 4 ++ src/module.c | 18 ++++- src/networking.c | 28 ++++++-- src/server.c | 37 +++++----- src/server.h | 10 ++- 11 files changed, 359 insertions(+), 49 deletions(-) diff --git a/src/ae.c b/src/ae.c index b6a1ce0b10..36b6131410 100644 --- a/src/ae.c +++ b/src/ae.c @@ -63,6 +63,15 @@ #endif #endif +#define AE_LOCK(eventLoop) \ + if ((eventLoop)->flags & AE_PROTECT_POLL) { \ + assert(pthread_mutex_lock(&(eventLoop)->poll_mutex) == 0); \ + } + +#define AE_UNLOCK(eventLoop) \ + if ((eventLoop)->flags & AE_PROTECT_POLL) { \ + assert(pthread_mutex_unlock(&(eventLoop)->poll_mutex) == 0); \ + } aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; @@ -81,7 +90,14 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; + eventLoop->custompoll = NULL; eventLoop->flags = 0; + /* Initialize the eventloop mutex with PTHREAD_MUTEX_ERRORCHECK type */ + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); + if (pthread_mutex_init(&eventLoop->poll_mutex, &attr) != 0) goto err; + if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ @@ -122,11 +138,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) { * * Otherwise AE_OK is returned and the operation is successful. */ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { + AE_LOCK(eventLoop); + int ret = AE_OK; int i; - if (setsize == eventLoop->setsize) return AE_OK; - if (eventLoop->maxfd >= setsize) return AE_ERR; - if (aeApiResize(eventLoop, setsize) == -1) return AE_ERR; + if (setsize == eventLoop->setsize) goto done; + if (eventLoop->maxfd >= setsize) goto err; + if (aeApiResize(eventLoop, setsize) == -1) goto err; eventLoop->events = zrealloc(eventLoop->events, sizeof(aeFileEvent) * setsize); eventLoop->fired = zrealloc(eventLoop->fired, sizeof(aeFiredEvent) * setsize); @@ -135,7 +153,13 @@ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { /* Make sure that if we created new slots, they are initialized with * an AE_NONE mask. */ for (i = eventLoop->maxfd + 1; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; - return AE_OK; + goto done; + +err: + ret = AE_ERR; +done: + AE_UNLOCK(eventLoop); + return ret; } void aeDeleteEventLoop(aeEventLoop *eventLoop) { @@ -159,25 +183,35 @@ void aeStop(aeEventLoop *eventLoop) { } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { + AE_LOCK(eventLoop); + int ret = AE_ERR; + if (fd >= eventLoop->setsize) { errno = ERANGE; - return AE_ERR; + goto done; } aeFileEvent *fe = &eventLoop->events[fd]; - if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; + if (aeApiAddEvent(eventLoop, fd, mask) == -1) goto done; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; - return AE_OK; + + ret = AE_OK; + +done: + AE_UNLOCK(eventLoop); + return ret; } void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { - if (fd >= eventLoop->setsize) return; + AE_LOCK(eventLoop); + if (fd >= eventLoop->setsize) goto done; + aeFileEvent *fe = &eventLoop->events[fd]; - if (fe->mask == AE_NONE) return; + if (fe->mask == AE_NONE) goto done; /* We want to always remove AE_BARRIER if set when AE_WRITABLE * is removed. */ @@ -204,6 +238,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { * which is required by evport and epoll */ aeApiDelEvent(eventLoop, fd, mask); } + +done: + AE_UNLOCK(eventLoop); } void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) { @@ -345,6 +382,17 @@ static int processTimeEvents(aeEventLoop *eventLoop) { return processed; } +/* This function provides direct access to the aeApiPoll call. + * It is intended to be called from a custom poll function.*/ +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) { + AE_LOCK(eventLoop); + + int ret = aeApiPoll(eventLoop, tvp); + + AE_UNLOCK(eventLoop); + return ret; +} + /* Process every pending file event, then every pending time event * (that may be registered by file event callbacks just processed). * Without special flags the function sleeps until some file event @@ -377,25 +425,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop); - /* The eventLoop->flags may be changed inside beforesleep. - * So we should check it after beforesleep be called. At the same time, - * the parameter flags always should have the highest priority. - * That is to say, once the parameter flag is set to AE_DONT_WAIT, - * no matter what value eventLoop->flags is set to, we should ignore it. */ - if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } else if (flags & AE_TIME_EVENTS) { - usUntilTimer = usUntilEarliestTimer(eventLoop); - if (usUntilTimer >= 0) { - tv.tv_sec = usUntilTimer / 1000000; - tv.tv_usec = usUntilTimer % 1000000; + if (eventLoop->custompoll != NULL) { + numevents = eventLoop->custompoll(eventLoop); + } else { + /* The eventLoop->flags may be changed inside beforesleep. + * So we should check it after beforesleep be called. At the same time, + * the parameter flags always should have the highest priority. + * That is to say, once the parameter flag is set to AE_DONT_WAIT, + * no matter what value eventLoop->flags is set to, we should ignore it. */ + if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { + tv.tv_sec = tv.tv_usec = 0; tvp = &tv; + } else if (flags & AE_TIME_EVENTS) { + usUntilTimer = usUntilEarliestTimer(eventLoop); + if (usUntilTimer >= 0) { + tv.tv_sec = usUntilTimer / 1000000; + tv.tv_usec = usUntilTimer % 1000000; + tvp = &tv; + } } + /* Call the multiplexing API, will return only on timeout or when + * some event fires. */ + numevents = aeApiPoll(eventLoop, tvp); } - /* Call the multiplexing API, will return only on timeout or when - * some event fires. */ - numevents = aeApiPoll(eventLoop, tvp); /* Don't process file events if not requested. */ if (!(flags & AE_FILE_EVENTS)) { @@ -503,3 +555,17 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) { eventLoop->aftersleep = aftersleep; } + +/* This function allows setting a custom poll procedure to be used by the event loop. + * The custom poll procedure, if set, will be called instead of the default aeApiPoll */ +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) { + eventLoop->custompoll = custompoll; +} + +void aeSetPollProtect(aeEventLoop *eventLoop, int protect) { + if (protect) { + eventLoop->flags |= AE_PROTECT_POLL; + } else { + eventLoop->flags &= ~AE_PROTECT_POLL; + } +} diff --git a/src/ae.h b/src/ae.h index 3b1c96a01d..652b42a8f5 100644 --- a/src/ae.h +++ b/src/ae.h @@ -34,6 +34,7 @@ #define __AE_H__ #include "monotonic.h" +#include #define AE_OK 0 #define AE_ERR -1 @@ -54,6 +55,7 @@ #define AE_DONT_WAIT (1 << 2) #define AE_CALL_BEFORE_SLEEP (1 << 3) #define AE_CALL_AFTER_SLEEP (1 << 4) +#define AE_PROTECT_POLL (1 << 5) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 @@ -61,6 +63,7 @@ /* Macros */ #define AE_NOTUSED(V) ((void)V) +struct timeval; /* forward declaration */ struct aeEventLoop; /* Types and data structures */ @@ -69,6 +72,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents); +typedef int aeCustomPollProc(struct aeEventLoop *eventLoop); /* File event structure */ typedef struct aeFileEvent { @@ -109,6 +113,8 @@ typedef struct aeEventLoop { void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeAfterSleepProc *aftersleep; + aeCustomPollProc *custompoll; + pthread_mutex_t poll_mutex; int flags; } aeEventLoop; @@ -132,6 +138,9 @@ void aeMain(aeEventLoop *eventLoop); char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep); +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll); +void aeSetPollProtect(aeEventLoop *eventLoop, int protect); +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); void aeSetDontWait(aeEventLoop *eventLoop, int noWait); diff --git a/src/db.c b/src/db.c index 5a6562a1e2..073bec6ca3 100644 --- a/src/db.c +++ b/src/db.c @@ -32,6 +32,7 @@ #include "latency.h" #include "script.h" #include "functions.h" +#include "io_threads.h" #include #include @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn old = dictGetVal(de); } kvstoreDictSetVal(db->keys, slot, de, val); - if (server.lazyfree_lazy_server_del) { + /* For efficiency, let the I/O thread that allocated an object also deallocate it. */ + if (tryOffloadFreeObjToIOThreads(old) == C_OK) { + /* OK */ + } else if (server.lazyfree_lazy_server_del) { freeObjAsync(key, old, db->id); } else { decrRefCount(old); diff --git a/src/dict.c b/src/dict.c index 280f0b6abc..2eb3dd386f 100644 --- a/src/dict.c +++ b/src/dict.c @@ -329,6 +329,12 @@ int _dictResize(dict *d, unsigned long size, int *malloc_failed) { return DICT_OK; } + if (d->type->no_incremental_rehash) { + /* If the dict type does not support incremental rehashing, we need to + * rehash the whole table immediately. */ + while (dictRehash(d, 1000)); + } + return DICT_OK; } diff --git a/src/dict.h b/src/dict.h index a7c5c71826..97a79910cb 100644 --- a/src/dict.h +++ b/src/dict.h @@ -87,6 +87,8 @@ typedef struct dictType { /* If embedded_entry flag is set, it indicates that a copy of the key is created and the key is embedded * as part of the dict entry. */ unsigned int embedded_entry : 1; + /* Perform rehashing during resizing instead of incrementally rehashing across multiple steps */ + unsigned int no_incremental_rehash : 1; } dictType; #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1 << (exp)) diff --git a/src/io_threads.c b/src/io_threads.c index 6149febabc..95d5895d03 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -124,6 +124,22 @@ int inMainThread(void) { return thread_id == 0; } +int getIOThreadID(void) { + return thread_id; +} + +/* Drains the I/O threads queue by waiting for all jobs to be processed. + * This function must be called from the main thread. */ +void drainIOThreadsQueue(void) { + serverAssert(inMainThread()); + for (int i = 1; i < IO_THREADS_MAX_NUM; i++) { /* No need to drain thread 0, which is the main thread. */ + while (!IOJobQueue_isEmpty(&io_jobs[i])) { + /* memory barrier acquire to get the latest job queue state */ + atomic_thread_fence(memory_order_acquire); + } + } +} + /* Wait until the IO-thread is done with the client */ void waitForClientIO(client *c) { /* No need to wait if the client was not offloaded to the IO thread. */ @@ -177,6 +193,17 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only) { } } +/* This function performs polling on the given event loop and updates the server's + * IO fired events count and poll state. */ +void IOThreadPoll(void *data) { + aeEventLoop *el = (aeEventLoop *)data; + struct timeval tvp = {0, 0}; + int num_events = aePoll(el, &tvp); + + server.io_ae_fired_events = num_events; + atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release); +} + static void *IOThreadMain(void *myid) { /* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */ long id = (long)myid; @@ -267,6 +294,8 @@ void killIOThreads(void) { /* Initialize the data structures needed for I/O threads. */ void initIOThreads(void) { server.active_io_threads_num = 1; /* We start with threads not active. */ + server.io_poll_state = AE_IO_STATE_NONE; + server.io_ae_fired_events = 0; /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ @@ -375,3 +404,146 @@ int trySendWriteToIOThreads(client *c) { IOJobQueue_push(jq, ioThreadWriteToClient, c); return C_OK; } + +/* Internal function to free the client's argv in an IO thread. */ +void IOThreadFreeArgv(void *data) { + robj **argv = (robj **)data; + int last_arg = 0; + for (int i = 0;; i++) { + robj *o = argv[i]; + if (o == NULL) { + continue; + } + + /* The main-thread set the refcount to 0 to indicate that this is the last argument to free */ + if (o->refcount == 0) { + last_arg = 1; + o->refcount = 1; + } + + decrRefCount(o); + + if (last_arg) { + break; + } + } + + zfree(argv); +} + +/* This function attempts to offload the client's argv to an IO thread. + * Returns C_OK if the client's argv were successfully offloaded to an IO thread, + * C_ERR otherwise. */ +int tryOffloadFreeArgvToIOThreads(client *c) { + if (server.active_io_threads_num <= 1 || c->argc == 0) { + return C_ERR; + } + + size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1; + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + int last_arg_to_free = -1; + + /* Prepare the argv */ + for (int j = 0; j < c->argc; j++) { + if (c->argv[j]->refcount > 1) { + decrRefCount(c->argv[j]); + /* Set argv[j] to NULL to avoid double free */ + c->argv[j] = NULL; + } else { + last_arg_to_free = j; + } + } + + /* If no argv to free, free the argv array at the main thread */ + if (last_arg_to_free == -1) { + zfree(c->argv); + return C_OK; + } + + /* We set the refcount of the last arg to free to 0 to indicate that + * this is the last argument to free. With this approach, we don't need to + * send the argc to the IO thread and we can send just the argv ptr. */ + c->argv[last_arg_to_free]->refcount = 0; + + /* Must succeed as we checked the free space before. */ + IOJobQueue_push(jq, IOThreadFreeArgv, c->argv); + + return C_OK; +} + +/* This function attempts to offload the free of an object to an IO thread. + * Returns C_OK if the object was successfully offloaded to an IO thread, + * C_ERR otherwise.*/ +int tryOffloadFreeObjToIOThreads(robj *obj) { + if (server.active_io_threads_num <= 1) { + return C_ERR; + } + + if (obj->refcount > 1) return C_ERR; + + /* We select the thread ID in a round-robin fashion. */ + size_t tid = (server.stat_io_freed_objects % (server.active_io_threads_num - 1)) + 1; + + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) { + return C_ERR; + } + + IOJobQueue_push(jq, decrRefCountVoid, obj); + server.stat_io_freed_objects++; + return C_OK; +} + +/* This function retrieves the results of the IO Thread poll. + * returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */ +static int getIOThreadPollResults(aeEventLoop *eventLoop) { + int io_state; + io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire); + if (io_state == AE_IO_STATE_POLL) { + /* IO thread is still processing poll events. */ + return 0; + } + + /* IO thread is done processing poll events. */ + serverAssert(io_state == AE_IO_STATE_DONE); + server.stat_poll_processed_by_io_threads++; + server.io_poll_state = AE_IO_STATE_NONE; + + /* Remove the custom poll proc. */ + aeSetCustomPollProc(eventLoop, NULL); + aeSetPollProtect(eventLoop, 0); + return server.io_ae_fired_events; +} + +void trySendPollJobToIOThreads(void) { + if (server.active_io_threads_num <= 1) { + return; + } + + /* If there are no pending jobs, let the main thread do the poll-wait by itself. */ + if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) { + return; + } + + /* If the IO thread is already processing poll events, don't send another job. */ + if (server.io_poll_state != AE_IO_STATE_NONE) { + return; + } + + /* The poll is sent to the last thread. While a random thread could have been selected, + * the last thread has a slightly better chance of being less loaded compared to other threads, + * As we activate the lowest threads first. */ + int tid = server.active_io_threads_num - 1; + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */ + + server.io_poll_state = AE_IO_STATE_POLL; + aeSetCustomPollProc(server.el, getIOThreadPollResults); + aeSetPollProtect(server.el, 1); + IOJobQueue_push(jq, IOThreadPoll, server.el); +} diff --git a/src/io_threads.h b/src/io_threads.h index 30d1cdad79..f9a9cf762f 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -8,6 +8,10 @@ void killIOThreads(void); int inMainThread(void); int trySendReadToIOThreads(client *c); int trySendWriteToIOThreads(client *c); +int tryOffloadFreeObjToIOThreads(robj *o); +int tryOffloadFreeArgvToIOThreads(client *c); void adjustIOThreadsByEventLoad(int numevents, int increase_only); +void drainIOThreadsQueue(void); +void trySendPollJobToIOThreads(void); #endif /* IO_THREADS_H */ diff --git a/src/module.c b/src/module.c index 876b948323..5eb5c3ac84 100644 --- a/src/module.c +++ b/src/module.c @@ -61,6 +61,7 @@ #include "hdr_histogram.h" #include "crc16_slottable.h" #include "valkeymodule.h" +#include "io_threads.h" #include #include #include @@ -684,7 +685,7 @@ void moduleReleaseTempClient(client *c) { c->raw_flag = 0; c->flag.module = 1; c->user = NULL; /* Root user */ - c->cmd = c->lastcmd = c->realcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->io_parsed_cmd = NULL; if (c->bstate.async_rm_call_handle) { ValkeyModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ @@ -1295,7 +1296,8 @@ int VM_CreateCommand(ValkeyModuleCtx *ctx, ValkeyModuleCommand *cp = moduleCreateCommandProxy(ctx->module, declared_name, sdsdup(declared_name), cmdfunc, flags, firstkey, lastkey, keystep); cp->serverCmd->arity = cmdfunc ? -1 : -2; /* Default value, can be changed later via dedicated API */ - + /* Drain IO queue before modifying commands dictionary to prevent concurrent access while modifying it. */ + drainIOThreadsQueue(); serverAssert(dictAdd(server.commands, sdsdup(declared_name), cp->serverCmd) == DICT_OK); serverAssert(dictAdd(server.orig_commands, sdsdup(declared_name), cp->serverCmd) == DICT_OK); cp->serverCmd->id = ACLGetCommandID(declared_name); /* ID used for ACL. */ @@ -6281,7 +6283,7 @@ ValkeyModuleCallReply *VM_Call(ValkeyModuleCtx *ctx, const char *cmdname, const if (error_as_call_replies) reply = callReplyCreateError(err, ctx); goto cleanup; } - if (!commandCheckArity(c, error_as_call_replies ? &err : NULL)) { + if (!commandCheckArity(c->cmd, c->argc, error_as_call_replies ? &err : NULL)) { errno = EINVAL; if (error_as_call_replies) reply = callReplyCreateError(err, ctx); goto cleanup; @@ -10710,6 +10712,8 @@ void moduleCallCommandFilters(client *c) { ValkeyModuleCommandFilterCtx filter = {.argv = c->argv, .argv_len = c->argv_len, .argc = c->argc, .c = c}; + robj *tmp = c->argv[0]; + incrRefCount(tmp); while ((ln = listNext(&li))) { ValkeyModuleCommandFilter *f = ln->value; @@ -10725,6 +10729,12 @@ void moduleCallCommandFilters(client *c) { c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; + if (tmp != c->argv[0]) { + /* With I/O thread command-lookup offload, we set c->io_parsed_cmd to the command corresponding to c->argv[0]. + * Since the command filter just changed it, we need to reset c->io_parsed_cmd to null. */ + c->io_parsed_cmd = NULL; + } + decrRefCount(tmp); } /* Return the number of arguments a filtered command has. The number of @@ -12072,6 +12082,8 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) { } void moduleUnregisterCommands(struct ValkeyModule *module) { + /* Drain IO queue before modifying commands dictionary to prevent concurrent access while modifying it. */ + drainIOThreadsQueue(); /* Unregister all the commands registered by this module. */ dictIterator *di = dictGetSafeIterator(server.commands); dictEntry *de; diff --git a/src/networking.c b/src/networking.c index 7442a69e5b..501476e35d 100644 --- a/src/networking.c +++ b/src/networking.c @@ -169,7 +169,7 @@ client *createClient(connection *conn) { c->nread = 0; c->read_flags = 0; c->write_flags = 0; - c->cmd = c->lastcmd = c->realcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = c->io_parsed_cmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -1438,13 +1438,15 @@ void freeClientOriginalArgv(client *c) { } void freeClientArgv(client *c) { - int j; - for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) { + for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); + zfree(c->argv); + } c->argc = 0; c->cmd = NULL; + c->io_parsed_cmd = NULL; c->argv_len_sum = 0; c->argv_len = 0; - zfree(c->argv); c->argv = NULL; } @@ -4691,6 +4693,24 @@ void ioThreadReadQueryFromClient(void *data) { parseCommand(c); + /* Parsing was not completed - let the main-thread handle it. */ + if (!(c->read_flags & READ_FLAGS_PARSING_COMPLETED)) { + goto done; + } + + /* Empty command - Multibulk processing could see a <= 0 length. */ + if (c->argc == 0) { + goto done; + } + + /* Lookup command offload */ + c->io_parsed_cmd = lookupCommand(c->argv, c->argc); + if (c->io_parsed_cmd && commandCheckArity(c->io_parsed_cmd, c->argc, NULL) == 0) { + /* The command was found, but the arity is invalid. + * In this case, we reset the parsed_cmd and will let the main thread handle it. */ + c->io_parsed_cmd = NULL; + } + done: trimClientQueryBuffer(c); atomic_thread_fence(memory_order_release); diff --git a/src/server.c b/src/server.c index fe8b3dd6b0..024787acdc 100644 --- a/src/server.c +++ b/src/server.c @@ -492,12 +492,13 @@ dictType dbExpiresDictType = { /* Command table. sds string -> command struct pointer. */ dictType commandTableDictType = { - dictSdsCaseHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCaseCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ + dictSdsCaseHash, /* hash function */ + NULL, /* key dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL, /* allow to expand */ + .no_incremental_rehash = 1, /* no incremental rehash as the command table may be accessed from IO threads. */ }; /* Hash type hash table (note that small hashes are represented with listpacks) */ @@ -1564,6 +1565,9 @@ extern int ProcessingEventsWhileBlocked; void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* When I/O threads are enabled and there are pending I/O jobs, the poll is offloaded to one of the I/O threads. */ + trySendPollJobToIOThreads(); + size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; @@ -1595,10 +1599,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */ connTypeProcessPendingData(); - /* If any connection type(typical TLS) still has pending unread data or if there are clients - * with pending IO reads/writes, don't sleep at all. */ - int dont_sleep = connTypeHasPendingData() || listLength(server.clients_pending_io_read) > 0 || - listLength(server.clients_pending_io_write) > 0; + /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */ + int dont_sleep = connTypeHasPendingData(); /* Call the Cluster before sleep function. Note that this function * may change the state of Cluster (from ok to fail or vice versa), @@ -2494,6 +2496,8 @@ void resetServerStats(void) { server.stat_io_reads_processed = 0; server.stat_total_reads_processed = 0; server.stat_io_writes_processed = 0; + server.stat_io_freed_objects = 0; + server.stat_poll_processed_by_io_threads = 0; server.stat_total_writes_processed = 0; server.stat_client_qbuf_limit_disconnections = 0; server.stat_client_outbuf_limit_disconnections = 0; @@ -3724,11 +3728,11 @@ int commandCheckExistence(client *c, sds *err) { /* Check if c->argc is valid for c->cmd, fills `err` with details in case it isn't. * Return 1 if valid. */ -int commandCheckArity(client *c, sds *err) { - if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { +int commandCheckArity(struct serverCommand *cmd, int argc, sds *err) { + if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) { if (err) { *err = sdsnew(NULL); - *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", c->cmd->fullname); + *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", cmd->fullname); } return 0; } @@ -3799,13 +3803,14 @@ int processCommand(client *c) { * In case we are reprocessing a command after it was blocked, * we do not have to repeat the same checks */ if (!client_reprocessing_command) { - c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv, c->argc); + struct serverCommand *cmd = c->io_parsed_cmd ? c->io_parsed_cmd : lookupCommand(c->argv, c->argc); + c->cmd = c->lastcmd = c->realcmd = cmd; sds err; if (!commandCheckExistence(c, &err)) { rejectCommandSds(c, err); return C_OK; } - if (!commandCheckArity(c, &err)) { + if (!commandCheckArity(c->cmd, c->argc, &err)) { rejectCommandSds(c, err); return C_OK; } @@ -5667,6 +5672,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "total_writes_processed:%lld\r\n", server.stat_total_writes_processed, "io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, + "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, + "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index 9f755150d3..5266dff3c7 100644 --- a/src/server.h +++ b/src/server.h @@ -658,6 +658,9 @@ typedef enum { #define BUSY_MODULE_YIELD_EVENTS (1 << 0) #define BUSY_MODULE_YIELD_CLIENTS (1 << 1) +/* IO poll */ +typedef enum { AE_IO_STATE_NONE, AE_IO_STATE_POLL, AE_IO_STATE_DONE } AeIoState; + /*----------------------------------------------------------------------------- * Data types *----------------------------------------------------------------------------*/ @@ -1257,6 +1260,7 @@ typedef struct client { struct serverCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ + struct serverCommand *io_parsed_cmd; /* The command that was parsed by the IO thread. */ user *user; /* User associated with this connection. If the user is set to NULL the connection can do anything (admin). */ @@ -1648,6 +1652,8 @@ struct valkeyServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; + _Atomic AeIoState io_poll_state; /* Indicates the state of the IO polling. */ + int io_ae_fired_events; /* Number of poll events received by the IO thread. */ rax *errors; /* Errors table */ unsigned int lruclock; /* Clock for LRU eviction */ volatile sig_atomic_t shutdown_asap; /* Shutdown ordered by signal handler. */ @@ -1807,6 +1813,8 @@ struct valkeyServer { long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ long long stat_io_reads_processed; /* Number of read events processed by IO threads */ long long stat_io_writes_processed; /* Number of write events processed by IO threads */ + long long stat_io_freed_objects; /* Number of objects freed by IO threads */ + long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */ long long stat_total_reads_processed; /* Total number of read events processed */ long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ @@ -3239,7 +3247,7 @@ struct serverCommand *lookupCommandByCStringLogic(dict *commands, const char *s) struct serverCommand *lookupCommandByCString(const char *s); struct serverCommand *lookupCommandOrOriginal(robj **argv, int argc); int commandCheckExistence(client *c, sds *err); -int commandCheckArity(client *c, sds *err); +int commandCheckArity(struct serverCommand *cmd, int argc, sds *err); void startCommandExecution(void); int incrCommandStatsOnError(struct serverCommand *cmd, int flags); void call(client *c, int flags);