From e36db8e01cb1d90f01c7eb4322b95bba83b56eac Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Mon, 6 May 2024 07:37:58 +0000 Subject: [PATCH] poll offload to io threads Signed-off-by: Uri Yagelnik --- src/ae.c | 118 ++++++++++++++++++++++++++++++++++++----------- src/ae.h | 9 ++++ src/io_threads.c | 62 +++++++++++++++++++++++++ src/io_threads.h | 1 + src/server.c | 11 +++-- src/server.h | 6 +++ 6 files changed, 177 insertions(+), 30 deletions(-) diff --git a/src/ae.c b/src/ae.c index b6a1ce0b10..e5c849ceb6 100644 --- a/src/ae.c +++ b/src/ae.c @@ -41,7 +41,6 @@ #include #include #include -#include #include #include "zmalloc.h" @@ -63,6 +62,17 @@ #endif #endif +#define AE_LOCK(eventLoop) \ + int __ae_locked = 0; \ + if ((eventLoop)->flags & AE_PROTECT_POLL) { \ + pthread_mutex_lock(&(eventLoop)->poll_mutex); \ + __ae_locked = 1; \ + } + +#define AE_UNLOCK(eventLoop) \ + if (__ae_locked) { \ + pthread_mutex_unlock(&(eventLoop)->poll_mutex); \ + } aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; @@ -81,7 +91,9 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; + eventLoop->custompoll = NULL; eventLoop->flags = 0; + pthread_mutex_init(&eventLoop->poll_mutex, NULL); if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ @@ -122,11 +134,16 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) { * * Otherwise AE_OK is returned and the operation is successful. */ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { + AE_LOCK(eventLoop); + int ret = AE_ERR; int i; - if (setsize == eventLoop->setsize) return AE_OK; - if (eventLoop->maxfd >= setsize) return AE_ERR; - if (aeApiResize(eventLoop, setsize) == -1) return AE_ERR; + if (setsize == eventLoop->setsize) { + ret = AE_OK; + goto done; + } + if (eventLoop->maxfd >= setsize) goto done; + if (aeApiResize(eventLoop, setsize) == -1) goto done; eventLoop->events = zrealloc(eventLoop->events, sizeof(aeFileEvent) * setsize); eventLoop->fired = zrealloc(eventLoop->fired, sizeof(aeFiredEvent) * setsize); @@ -135,10 +152,16 @@ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { /* Make sure that if we created new slots, they are initialized with * an AE_NONE mask. 
*/ for (i = eventLoop->maxfd + 1; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; - return AE_OK; + + ret = AE_OK; + + done: + AE_UNLOCK(eventLoop); + return ret; } void aeDeleteEventLoop(aeEventLoop *eventLoop) { + AE_LOCK(eventLoop); aeApiFree(eventLoop); zfree(eventLoop->events); zfree(eventLoop->fired); @@ -152,6 +175,7 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop) { te = next_te; } zfree(eventLoop); + AE_UNLOCK(eventLoop); } void aeStop(aeEventLoop *eventLoop) { @@ -159,25 +183,35 @@ void aeStop(aeEventLoop *eventLoop) { } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { + AE_LOCK(eventLoop); + int ret = AE_ERR; + if (fd >= eventLoop->setsize) { errno = ERANGE; - return AE_ERR; + goto done; } aeFileEvent *fe = &eventLoop->events[fd]; - if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; + if (aeApiAddEvent(eventLoop, fd, mask) == -1) goto done; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; - return AE_OK; + + ret = AE_OK; + + done: + AE_UNLOCK(eventLoop); + return ret; } void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { - if (fd >= eventLoop->setsize) return; + AE_LOCK(eventLoop); + if (fd >= eventLoop->setsize) goto done; + aeFileEvent *fe = &eventLoop->events[fd]; - if (fe->mask == AE_NONE) return; + if (fe->mask == AE_NONE) goto done; /* We want to always remove AE_BARRIER if set when AE_WRITABLE * is removed. */ @@ -204,6 +238,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { * which is required by evport and epoll */ aeApiDelEvent(eventLoop, fd, mask); } + + done: + AE_UNLOCK(eventLoop); } void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) { @@ -345,6 +382,17 @@ static int processTimeEvents(aeEventLoop *eventLoop) { return processed; } +/* This function provides direct access to the aeApiPoll call. + * It is intended to be called from a custom poll function.*/ +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) { + AE_LOCK(eventLoop); + + int ret = aeApiPoll(eventLoop, tvp); + + AE_UNLOCK(eventLoop); + return ret; +} + /* Process every pending file event, then every pending time event * (that may be registered by file event callbacks just processed). * Without special flags the function sleeps until some file event @@ -377,25 +425,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop); - /* The eventLoop->flags may be changed inside beforesleep. - * So we should check it after beforesleep be called. At the same time, - * the parameter flags always should have the highest priority. - * That is to say, once the parameter flag is set to AE_DONT_WAIT, - * no matter what value eventLoop->flags is set to, we should ignore it. */ - if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { - tv.tv_sec = tv.tv_usec = 0; - tvp = &tv; - } else if (flags & AE_TIME_EVENTS) { - usUntilTimer = usUntilEarliestTimer(eventLoop); - if (usUntilTimer >= 0) { - tv.tv_sec = usUntilTimer / 1000000; - tv.tv_usec = usUntilTimer % 1000000; + if (eventLoop->custompoll != NULL) { + numevents = eventLoop->custompoll(eventLoop); + } else { + /* The eventLoop->flags may be changed inside beforesleep. + * So we should check it after beforesleep be called. 
At the same time, + * the parameter flags always should have the highest priority. + * That is to say, once the parameter flag is set to AE_DONT_WAIT, + * no matter what value eventLoop->flags is set to, we should ignore it. */ + if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) { + tv.tv_sec = tv.tv_usec = 0; tvp = &tv; + } else if (flags & AE_TIME_EVENTS) { + usUntilTimer = usUntilEarliestTimer(eventLoop); + if (usUntilTimer >= 0) { + tv.tv_sec = usUntilTimer / 1000000; + tv.tv_usec = usUntilTimer % 1000000; + tvp = &tv; + } } + /* Call the multiplexing API, will return only on timeout or when + * some event fires. */ + numevents = aeApiPoll(eventLoop, tvp); } - /* Call the multiplexing API, will return only on timeout or when - * some event fires. */ - numevents = aeApiPoll(eventLoop, tvp); /* Don't process file events if not requested. */ if (!(flags & AE_FILE_EVENTS)) { @@ -503,3 +555,17 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) { eventLoop->aftersleep = aftersleep; } + +/* This function allows setting a custom poll procedure to be used by the event loop. + * The custom poll procedure, if set, will be called instead of the default aeApiPoll */ +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) { + eventLoop->custompoll = custompoll; +} + +void aeSetPollProtect(aeEventLoop *eventLoop, int protect) { + if (protect) { + eventLoop->flags |= AE_PROTECT_POLL; + } else { + eventLoop->flags &= ~AE_PROTECT_POLL; + } +} diff --git a/src/ae.h b/src/ae.h index 3b1c96a01d..9926904d1f 100644 --- a/src/ae.h +++ b/src/ae.h @@ -34,6 +34,8 @@ #define __AE_H__ #include "monotonic.h" +#include +#include #define AE_OK 0 #define AE_ERR -1 @@ -54,6 +56,7 @@ #define AE_DONT_WAIT (1 << 2) #define AE_CALL_BEFORE_SLEEP (1 << 3) #define AE_CALL_AFTER_SLEEP (1 << 4) +#define AE_PROTECT_POLL (1 << 5) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 @@ -69,6 +72,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents); +typedef int aeCustomPollProc(struct aeEventLoop *eventLoop); /* File event structure */ typedef struct aeFileEvent { @@ -109,6 +113,8 @@ typedef struct aeEventLoop { void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeAfterSleepProc *aftersleep; + aeCustomPollProc *custompoll; + pthread_mutex_t poll_mutex; int flags; } aeEventLoop; @@ -132,6 +138,9 @@ void aeMain(aeEventLoop *eventLoop); char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep); +void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll); +void aeSetPollProtect(aeEventLoop *eventLoop, int protect); +int aePoll(aeEventLoop *eventLoop, struct timeval *tvp); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); void aeSetDontWait(aeEventLoop *eventLoop, int noWait); diff --git a/src/io_threads.c b/src/io_threads.c index 2577cfee67..325ed4b2af 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -193,6 +193,17 @@ void adjustIOThreadsByEventLoad(int numevents, int increase_only) { } } 
+/* This function performs polling on the given event loop and updates the server's + * IO fired events count and poll state. */ +void IOThreadPoll(void *data) { + aeEventLoop *el = (aeEventLoop *)data; + struct timeval tvp = {0, 0}; + int num_events = aePoll(el, &tvp); + + server.io_ae_fired_events = num_events; + atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release); +} + static void *IOThreadMain(void *myid) { /* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */ long id = (long)myid; @@ -283,6 +294,8 @@ void killIOThreads(void) { /* Initialize the data structures needed for I/O threads. */ void initIOThreads(void) { server.active_io_threads_num = 1; /* We start with threads not active. */ + server.io_poll_state = AE_IO_STATE_NONE; + server.io_ae_fired_events = 0; /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ @@ -485,3 +498,52 @@ int tryOffloadFreeObjToIOThreads(robj *obj) { server.stat_io_freed_objects++; return C_OK; } + +/* This function retrieves the results of the IO Thread poll. + * returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */ +static int getIOThreadPollResults(aeEventLoop *eventLoop) { + int io_state; + io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire); + if (io_state == AE_IO_STATE_POLL) { + /* IO thread is still processing poll events. */ + return 0; + } + + /* IO thread is done processing poll events. */ + serverAssert(io_state == AE_IO_STATE_DONE); + server.stat_poll_processed_by_io_threads++; + server.io_poll_state = AE_IO_STATE_NONE; + + /* Remove the custom poll proc. */ + aeSetCustomPollProc(eventLoop, NULL); + aeSetPollProtect(eventLoop, 0); + return server.io_ae_fired_events; +} + +void trySendPollJobToIOThreads(void) { + if (server.active_io_threads_num <= 1) { + return; + } + + /* If there are no pending jobs, let the main thread do the poll-wait by itself. */ + if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) { + return; + } + + /* If the IO thread is already processing poll events, don't send another job. */ + if (server.io_poll_state != AE_IO_STATE_NONE) { + return; + } + + /* The poll is sent to the last thread. While a random thread could have been selected, + * the last thread has a slightly better chance of being less loaded compared to other threads, + * As we activate the lowest threads first. */ + int tid = server.active_io_threads_num - 1; + IOJobQueue *jq = &io_jobs[tid]; + if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. 
*/ + + server.io_poll_state = AE_IO_STATE_POLL; + aeSetCustomPollProc(server.el, getIOThreadPollResults); + aeSetPollProtect(server.el, 1); + IOJobQueue_push(jq, IOThreadPoll, server.el); +} diff --git a/src/io_threads.h b/src/io_threads.h index a10febd205..f9a9cf762f 100644 --- a/src/io_threads.h +++ b/src/io_threads.h @@ -12,5 +12,6 @@ int tryOffloadFreeObjToIOThreads(robj *o); int tryOffloadFreeArgvToIOThreads(client *c); void adjustIOThreadsByEventLoad(int numevents, int increase_only); void drainIOThreadsQueue(void); +void trySendPollJobToIOThreads(void); #endif /* IO_THREADS_H */ diff --git a/src/server.c b/src/server.c index dfeeeddad9..cd64d5de19 100644 --- a/src/server.c +++ b/src/server.c @@ -1565,6 +1565,9 @@ extern int ProcessingEventsWhileBlocked; void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* When I/O threads are enabled and there are pending I/O jobs, the poll is offloaded to one of the I/O threads. */ + trySendPollJobToIOThreads(); + size_t zmalloc_used = zmalloc_used_memory(); if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; @@ -1596,10 +1599,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */ connTypeProcessPendingData(); - /* If any connection type(typical TLS) still has pending unread data or if there are clients - * with pending IO reads/writes, don't sleep at all. */ - int dont_sleep = connTypeHasPendingData() || listLength(server.clients_pending_io_read) > 0 || - listLength(server.clients_pending_io_write) > 0; + /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */ + int dont_sleep = connTypeHasPendingData(); /* Call the Cluster before sleep function. Note that this function * may change the state of Cluster (from ok to fail or vice versa), @@ -2493,6 +2494,7 @@ void resetServerStats(void) { server.stat_total_reads_processed = 0; server.stat_io_writes_processed = 0; server.stat_io_freed_objects = 0; + server.stat_poll_processed_by_io_threads = 0; server.stat_total_writes_processed = 0; server.stat_client_qbuf_limit_disconnections = 0; server.stat_client_outbuf_limit_disconnections = 0; @@ -5704,6 +5706,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed, "io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects, + "io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads, "client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index ad6457c905..b4c18358e9 100644 --- a/src/server.h +++ b/src/server.h @@ -639,6 +639,9 @@ typedef enum { #define BUSY_MODULE_YIELD_EVENTS (1 << 0) #define BUSY_MODULE_YIELD_CLIENTS (1 << 1) +/* IO poll */ +typedef enum { AE_IO_STATE_NONE, AE_IO_STATE_POLL, AE_IO_STATE_DONE } AeIoState; + /*----------------------------------------------------------------------------- * Data types *----------------------------------------------------------------------------*/ @@ -1597,6 +1600,8 @@ struct valkeyServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. 
*/ aeEventLoop *el; + _Atomic AeIoState io_poll_state; /* Indicates the state of the IO polling. */ + int io_ae_fired_events; /* Number of poll events received by the IO thread. */ rax *errors; /* Errors table */ int errors_enabled; /* If true, errorstats is enabled, and we will add new errors. */ unsigned int lruclock; /* Clock for LRU eviction */ @@ -1752,6 +1757,7 @@ struct valkeyServer { long long stat_io_reads_processed; /* Number of read events processed by IO threads */ long long stat_io_writes_processed; /* Number of write events processed by IO threads */ long long stat_io_freed_objects; /* Number of objects freed by IO threads */ + long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */ long long stat_total_reads_processed; /* Total number of read events processed */ long long stat_total_writes_processed; /* Total number of write events processed */ long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
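
The offload handshake introduced above is small but spread across three files, so here is a standalone sketch (not part of the patch) that condenses it: the main thread publishes the POLL state and hands the poll job to a worker, the worker runs a zero-timeout poll and publishes DONE with release semantics (as IOThreadPoll() does), and the main thread's custom poll proc consumes the result with an acquire load (as getIOThreadPollResults() does). Everything except the three state analogues and the release/acquire pairing is hypothetical stand-in code.

```c
/*
 * Standalone sketch of the poll-offload handshake (illustrative only; all
 * names except the three AE_IO_STATE_* analogues are hypothetical).
 * Build: cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

typedef enum { STATE_NONE, STATE_POLL, STATE_DONE } io_poll_state_t;

static _Atomic io_poll_state_t poll_state = STATE_NONE;
static int fired_events = 0; /* stands in for server.io_ae_fired_events */

/* Stands in for IOThreadPoll(): run a zero-timeout poll, publish the result. */
static void *io_thread_poll(void *arg) {
    (void)arg;
    fired_events = 3; /* pretend aePoll() reported 3 ready file descriptors */
    atomic_store_explicit(&poll_state, STATE_DONE, memory_order_release);
    return NULL;
}

/* Stands in for getIOThreadPollResults(): report 0 while the worker is still
 * polling, otherwise consume the result and reset the state. */
static int get_poll_results(void) {
    if (atomic_load_explicit(&poll_state, memory_order_acquire) != STATE_DONE) return 0;
    int n = fired_events;    /* safe: written before the release store */
    poll_state = STATE_NONE; /* plain store; only the main thread writes it here */
    return n;
}

int main(void) {
    pthread_t tid;

    /* Stands in for trySendPollJobToIOThreads(): offload only when idle. */
    if (atomic_load_explicit(&poll_state, memory_order_relaxed) == STATE_NONE) {
        poll_state = STATE_POLL;
        pthread_create(&tid, NULL, io_thread_poll, NULL);
    }

    /* The real event loop calls the custom poll proc once per iteration and
     * simply sees 0 events until the worker finishes; we loop here instead. */
    int n;
    while ((n = get_poll_results()) == 0) usleep(1000);

    printf("fired events: %d\n", n);
    pthread_join(tid, NULL);
    return 0;
}
```

The release/acquire pairing is also why `server.io_ae_fired_events` needs no separate synchronization in the patch: the I/O thread writes it before the release store of AE_IO_STATE_DONE, and the main thread reads it only after the acquire load observes that state.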