Skip to content

Commit

Permalink
Io thread work offload (#763)
Browse files Browse the repository at this point in the history
### IO-Threads Work Offloading 

This PR is the 2nd of 3 PRs intended to achieve the goal of 1M requests
per second.
(1st PR: #758)

This PR offloads additional work to the I/O threads, beyond the current
read-parse/write operations, to better utilize the I/O threads and
reduce the load on the main thread.

It contains the following 3 commits:

### Poll Offload

Currently, the main thread is responsible for executing the poll-wait
system call, while the IO threads wait for tasks from the main thread.
The poll-wait operation is expensive and can consume up to 30% of the
main thread's time. We could have let the IO threads do the poll-wait by
themselves, with each thread listening to some of the clients and
notifying the main thread when a client's command is ready to execute.

However, the current approach, where the main thread listens for events
from the network, has several benefits. The main thread remains in
charge, allowing it to know the state of each client
(idle/read/write/close) at any given time. Additionally, it makes the
threads flexible, enabling us to drain an IO thread's job queue and stop
a thread when the load is light without modifying the event loop and
moving its clients to a different IO thread. Furthermore, with this
approach, the IO threads don't need to wait for both messages from the
network and from the main thread instead, the threads wait only for
tasks from the main thread.

To enjoy the benefits of both the main thread remaining in charge and
the poll being offloaded, we propose offloading the poll-wait as a
single-time, non-blocking job to one of the IO threads. The IO thread
will perform a poll-wait non-blocking call while the main thread
processes the client commands. Later, in `aeProcessEvents`, instead of
sleeping on the poll, we check for the IO thread's poll-wait results.

The poll-wait will be offloaded in `beforeSleep` only when there are
ready events for the main thread to process. If no events are pending,
the main thread will revert to the current behavior and sleep on the
poll by itself.

**Implementation Details**

A new call back `custompoll` was added to the `aeEventLoop` when not set
to `NULL` the ae will call the `custompoll` callback instead of the
`aeApiPoll`.

When the poll is offloaded we will set the `custompoll` to
`getIOThreadPollResults` and send a poll-job to the thread. the thread
will take a mutex, call a non-blocking (with timeout 0) to `aePoll`
which will populate the fired events array. the IO thread will set the
`server.io_fired_events` to the number of the returning `numevents`,
later the main-thread in `custompoll` will return the
`server.io_fired_events` and will set the `customPoll` back to `NULL`.

To ensure thread safety when accessing server.el, all functions that
modify the eventloop events were wrapped with a mutex to ensure mutual
exclusion when modifying the events.

### Command Lookup Offload

As the IO thread parses the command from the client's Querybuf, it can
perform a command lookup in the commands dictionary, which can consume
up to ~5% of the main-thread runtime.

**Implementation details**
The IO thread will store the looked-up command in the client's new field
`io_parsed_cmd` field. We can't use `c->cmd` for that since we use
`c->cmd `to check if a command was reprocessed or not.

To ensure thread safety when accessing the command dictionary, we make
sure the main thread isn't changing the dictionary while IO threads are
accessing it. This is accomplished by introducing a new flag called
`no_incremental_rehash` for the `dictType` commands. When performing
`dictResize`, we will rehash the entire dictionary in place rather than
deferring the process.

### Free Offload

Since the command arguments are allocated by the I/O thread, it would be
beneficial if they were also freed by the same thread. If the main
thread frees objects allocated by the I/O thread, two issues arise:

1. During the freeing process, the main thread needs to access the SDS
pointed to by the object to get its length.
2. With Jemalloc, each thread manages thread local pool (`tcache`) of
buffers for quick reallocation without accessing the arena. If the main
thread constantly frees objects allocated by other threads, those
threads will have to frequently access the shared arena to obtain new
memory allocations

**Implementation Details**
When freeing the client's argv, we will send the argv array to the
thread that allocated it. The thread will be identified by the client
ID. When freeing an object during `dbOverwrite`, we will offload the
object free as well. We will extend this to offload the free during
`dbDelete` in a future PR, as its effects on defrag/memory evictions
need to be studied.

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
  • Loading branch information
uriyage committed Jul 19, 2024
1 parent 8b48031 commit 94bc15c
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 49 deletions.
116 changes: 91 additions & 25 deletions src/ae.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@
#endif
#endif

#define AE_LOCK(eventLoop) \
if ((eventLoop)->flags & AE_PROTECT_POLL) { \
assert(pthread_mutex_lock(&(eventLoop)->poll_mutex) == 0); \
}

#define AE_UNLOCK(eventLoop) \
if ((eventLoop)->flags & AE_PROTECT_POLL) { \
assert(pthread_mutex_unlock(&(eventLoop)->poll_mutex) == 0); \
}

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
Expand All @@ -81,7 +90,14 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->custompoll = NULL;
eventLoop->flags = 0;
/* Initialize the eventloop mutex with PTHREAD_MUTEX_ERRORCHECK type */
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if (pthread_mutex_init(&eventLoop->poll_mutex, &attr) != 0) goto err;

if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
Expand Down Expand Up @@ -122,11 +138,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
*
* Otherwise AE_OK is returned and the operation is successful. */
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
AE_LOCK(eventLoop);
int ret = AE_OK;
int i;

if (setsize == eventLoop->setsize) return AE_OK;
if (eventLoop->maxfd >= setsize) return AE_ERR;
if (aeApiResize(eventLoop, setsize) == -1) return AE_ERR;
if (setsize == eventLoop->setsize) goto done;
if (eventLoop->maxfd >= setsize) goto err;
if (aeApiResize(eventLoop, setsize) == -1) goto err;

eventLoop->events = zrealloc(eventLoop->events, sizeof(aeFileEvent) * setsize);
eventLoop->fired = zrealloc(eventLoop->fired, sizeof(aeFiredEvent) * setsize);
Expand All @@ -135,7 +153,13 @@ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
/* Make sure that if we created new slots, they are initialized with
* an AE_NONE mask. */
for (i = eventLoop->maxfd + 1; i < setsize; i++) eventLoop->events[i].mask = AE_NONE;
return AE_OK;
goto done;

err:
ret = AE_ERR;
done:
AE_UNLOCK(eventLoop);
return ret;
}

void aeDeleteEventLoop(aeEventLoop *eventLoop) {
Expand All @@ -159,25 +183,35 @@ void aeStop(aeEventLoop *eventLoop) {
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
AE_LOCK(eventLoop);
int ret = AE_ERR;

if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
goto done;
}
aeFileEvent *fe = &eventLoop->events[fd];

if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR;
if (aeApiAddEvent(eventLoop, fd, mask) == -1) goto done;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd) eventLoop->maxfd = fd;
return AE_OK;

ret = AE_OK;

done:
AE_UNLOCK(eventLoop);
return ret;
}

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
if (fd >= eventLoop->setsize) return;
AE_LOCK(eventLoop);
if (fd >= eventLoop->setsize) goto done;

aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
if (fe->mask == AE_NONE) goto done;

/* We want to always remove AE_BARRIER if set when AE_WRITABLE
* is removed. */
Expand All @@ -204,6 +238,9 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
* which is required by evport and epoll */
aeApiDelEvent(eventLoop, fd, mask);
}

done:
AE_UNLOCK(eventLoop);
}

void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) {
Expand Down Expand Up @@ -345,6 +382,17 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
return processed;
}

/* This function provides direct access to the aeApiPoll call.
* It is intended to be called from a custom poll function.*/
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp) {
AE_LOCK(eventLoop);

int ret = aeApiPoll(eventLoop, tvp);

AE_UNLOCK(eventLoop);
return ret;
}

/* Process every pending file event, then every pending time event
* (that may be registered by file event callbacks just processed).
* Without special flags the function sleeps until some file event
Expand Down Expand Up @@ -377,25 +425,29 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {

if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop);

/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
if (eventLoop->custompoll != NULL) {
numevents = eventLoop->custompoll(eventLoop);
} else {
/* The eventLoop->flags may be changed inside beforesleep.
* So we should check it after beforesleep be called. At the same time,
* the parameter flags always should have the highest priority.
* That is to say, once the parameter flag is set to AE_DONT_WAIT,
* no matter what value eventLoop->flags is set to, we should ignore it. */
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
Expand Down Expand Up @@ -503,3 +555,17 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}

/* This function allows setting a custom poll procedure to be used by the event loop.
* The custom poll procedure, if set, will be called instead of the default aeApiPoll */
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll) {
eventLoop->custompoll = custompoll;
}

void aeSetPollProtect(aeEventLoop *eventLoop, int protect) {
if (protect) {
eventLoop->flags |= AE_PROTECT_POLL;
} else {
eventLoop->flags &= ~AE_PROTECT_POLL;
}
}
9 changes: 9 additions & 0 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#define __AE_H__

#include "monotonic.h"
#include <pthread.h>

#define AE_OK 0
#define AE_ERR -1
Expand All @@ -54,13 +55,15 @@
#define AE_DONT_WAIT (1 << 2)
#define AE_CALL_BEFORE_SLEEP (1 << 3)
#define AE_CALL_AFTER_SLEEP (1 << 4)
#define AE_PROTECT_POLL (1 << 5)

#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1

/* Macros */
#define AE_NOTUSED(V) ((void)V)

struct timeval; /* forward declaration */
struct aeEventLoop;

/* Types and data structures */
Expand All @@ -69,6 +72,7 @@ typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *client
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);
typedef int aeCustomPollProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
Expand Down Expand Up @@ -109,6 +113,8 @@ typedef struct aeEventLoop {
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeAfterSleepProc *aftersleep;
aeCustomPollProc *custompoll;
pthread_mutex_t poll_mutex;
int flags;
} aeEventLoop;

Expand All @@ -132,6 +138,9 @@ void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
void aeSetCustomPollProc(aeEventLoop *eventLoop, aeCustomPollProc *custompoll);
void aeSetPollProtect(aeEventLoop *eventLoop, int protect);
int aePoll(aeEventLoop *eventLoop, struct timeval *tvp);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
Expand Down
6 changes: 5 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "latency.h"
#include "script.h"
#include "functions.h"
#include "io_threads.h"

#include <signal.h>
#include <ctype.h>
Expand Down Expand Up @@ -297,7 +298,10 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
if (server.lazyfree_lazy_server_del) {
/* For efficiency, let the I/O thread that allocated an object also deallocate it. */
if (tryOffloadFreeObjToIOThreads(old) == C_OK) {
/* OK */
} else if (server.lazyfree_lazy_server_del) {
freeObjAsync(key, old, db->id);
} else {
decrRefCount(old);
Expand Down
6 changes: 6 additions & 0 deletions src/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ int _dictResize(dict *d, unsigned long size, int *malloc_failed) {
return DICT_OK;
}

if (d->type->no_incremental_rehash) {
/* If the dict type does not support incremental rehashing, we need to
* rehash the whole table immediately. */
while (dictRehash(d, 1000));
}

return DICT_OK;
}

Expand Down
2 changes: 2 additions & 0 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ typedef struct dictType {
/* If embedded_entry flag is set, it indicates that a copy of the key is created and the key is embedded
* as part of the dict entry. */
unsigned int embedded_entry : 1;
/* Perform rehashing during resizing instead of incrementally rehashing across multiple steps */
unsigned int no_incremental_rehash : 1;
} dictType;

#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1 << (exp))
Expand Down
Loading

0 comments on commit 94bc15c

Please sign in to comment.