Change FLUSHALL/FLUSHDB SYNC to run as blocking ASYNC #13167

Merged
merged 2 commits on Apr 2, 2024
135 changes: 125 additions & 10 deletions src/bio.c
@@ -1,16 +1,16 @@
/* Background I/O service for Redis.
*
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background close(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the server.
* Currently there are 3 operations:
* 1) a background close(2) system call. This is needed because when the process
* is the last owner of a reference to a file, closing it means unlinking it,
* and the deletion of the file is slow, blocking the server.
* 2) AOF fsync
* 3) lazyfree of memory
*
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term uses for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
* file as we may want to put here Redis specific background tasks.
*
* DESIGN
* ------
@@ -26,8 +26,13 @@
* least-recently-inserted to the most-recently-inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
* To let the creator of a job be notified about the completion of the
* operation, it needs to submit an additional dummy job, coined a
* completion job request, that will eventually be written back by the
* background thread into a completion job response queue. This notification
* layout simplifies flows that submit more than one job, such as
* FLUSHALL, which submits multiple jobs for a single command. It
* is also correct because jobs are processed in FIFO fashion.
*
* ----------------------------------------------------------------------------
*
@@ -38,9 +43,9 @@
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*/


#include "server.h"
#include "bio.h"
#include <fcntl.h>

static char* bio_worker_title[] = {
"bio_close_file",
@@ -55,6 +60,9 @@ static unsigned int bio_job_to_worker[] = {
[BIO_AOF_FSYNC] = 1,
[BIO_CLOSE_AOF] = 1,
[BIO_LAZY_FREE] = 2,
[BIO_COMP_RQ_CLOSE_FILE] = 0,
[BIO_COMP_RQ_AOF_FSYNC] = 1,
[BIO_COMP_RQ_LAZY_FREE] = 2
};

static pthread_t bio_threads[BIO_WORKER_NUM];
@@ -63,6 +71,18 @@ static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM];
static list *bio_jobs[BIO_WORKER_NUM];
static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0};

/* The bio_comp_list is used to hold completion job responses and to hand
 * them over to the main thread, which invokes the callbacks as notification of
 * job completion. The main thread is woken to read the list by a write to a pipe. */
static list *bio_comp_list;
static pthread_mutex_t bio_mutex_comp;
static int job_comp_pipe[2]; /* Pipe used to wake up the event loop */

typedef struct bio_comp_item {
comp_fn *func; /* callback to invoke once the completion job is processed */
uint64_t arg; /* user data to be passed to the function */
} bio_comp_item;

/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
typedef union bio_job {
@@ -86,9 +106,15 @@ typedef union bio_job {
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
} free_args;
struct {
int type; /* header */
comp_fn *fn; /* callback handed over to the main thread to notify about job completion */
uint64_t arg; /* user data passed to the callback */
} comp_rq;
} bio_job;

void *bioProcessBackgroundJobs(void *arg);
void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask);

/* Make sure we have enough stack to perform all the things we do in the
* main thread. */
@@ -108,6 +134,27 @@ void bioInit(void) {
bio_jobs[j] = listCreate();
}

/* Init the list of completion job responses */
bio_comp_list = listCreate();
pthread_mutex_init(&bio_mutex_comp, NULL);

/* Create a pipe so the background threads can wake up the Redis main thread.
* Make the pipe non-blocking: this is just a best-effort mechanism
* and we do not want to block on either the read or the write half.
* Enable the close-on-exec flag on the pipe in case of fork-exec system calls in
* sentinels or redis servers. */
if (anetPipe(job_comp_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for bio thread: %s", strerror(errno));
exit(1);
}

/* Register a readable event for the pipe used to wake the event loop on job completion */
if (aeCreateFileEvent(server.el, job_comp_pipe[0], AE_READABLE,
bioPipeReadJobCompList, NULL) == AE_ERR) {
serverPanic("Error registering the readable event for the bio pipe.");
}

/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
@@ -153,6 +200,28 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
bioSubmitJob(BIO_LAZY_FREE, job);
}

void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data) {
int type;
switch (assigned_worker) {
case BIO_WORKER_CLOSE_FILE:
type = BIO_COMP_RQ_CLOSE_FILE;
break;
case BIO_WORKER_AOF_FSYNC:
type = BIO_COMP_RQ_AOF_FSYNC;
break;
case BIO_WORKER_LAZY_FREE:
type = BIO_COMP_RQ_LAZY_FREE;
break;
default:
serverPanic("Invalid worker type in bioCreateCompRq().");
}

bio_job *job = zmalloc(sizeof(*job));
job->comp_rq.fn = func;
job->comp_rq.arg = user_data;
bioSubmitJob(type, job);
}

void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) {
bio_job *job = zmalloc(sizeof(*job));
job->fd_args.fd = fd;
@@ -264,6 +333,21 @@ void *bioProcessBackgroundJobs(void *arg) {
close(job->fd_args.fd);
} else if (job_type == BIO_LAZY_FREE) {
job->free_args.free_fn(job->free_args.free_args);
} else if ((job_type == BIO_COMP_RQ_CLOSE_FILE) ||
(job_type == BIO_COMP_RQ_AOF_FSYNC) ||
(job_type == BIO_COMP_RQ_LAZY_FREE)) {
bio_comp_item *comp_rsp = zmalloc(sizeof(bio_comp_item));
comp_rsp->func = job->comp_rq.fn;
comp_rsp->arg = job->comp_rq.arg;

/* Hand it over to the completion job response list */
pthread_mutex_lock(&bio_mutex_comp);
listAddNodeTail(bio_comp_list, comp_rsp);
pthread_mutex_unlock(&bio_mutex_comp);

if (write(job_comp_pipe[1],"A",1) != 1) {
/* Pipe is non-blocking, write() may fail if it's full. */
}
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
@@ -322,3 +406,34 @@ void bioKillThreads(void) {
}
}
}

void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);

char buf[128];
list *tmp_list = NULL;

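/* Drain the wakeup pipe: multiple 1-byte signals may have accumulated */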
while (read(fd, buf, sizeof(buf)) == sizeof(buf));

/* Swap out the completion list under the lock so the callbacks can run without holding it */
pthread_mutex_lock(&bio_mutex_comp);
if (listLength(bio_comp_list)) {
tmp_list = bio_comp_list;
bio_comp_list = listCreate();
}
pthread_mutex_unlock(&bio_mutex_comp);

if (!tmp_list) return;

/* Invoke the completion callbacks for all finished jobs */
while (listLength(tmp_list)) {
listNode *ln = listFirst(tmp_list);
bio_comp_item *rsp = ln->value;
listDelNode(tmp_list, ln);
rsp->func(rsp->arg);
zfree(rsp);
}
listRelease(tmp_list);
}
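For context, the pattern bio.c implements above can be reduced to a small self-contained program: a background thread appends a response to a mutex-protected list and writes a single byte to a pipe, while the main loop drains the pipe, detaches the list under the lock, and runs the callbacks with the lock released. The sketch below is illustrative only and uses none of the Redis internals (a plain blocking pipe and a singly linked list stand in for the non-blocking pipe, the event loop, and adlist):

/* Minimal sketch of the completion-handover pattern. Illustrative only. */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef void comp_fn(uint64_t user_data);

typedef struct comp_item {
    comp_fn *func;
    uint64_t arg;
    struct comp_item *next;
} comp_item;

static comp_item *comp_list = NULL;                 /* completion responses */
static pthread_mutex_t comp_mutex = PTHREAD_MUTEX_INITIALIZER;
static int comp_pipe[2];                            /* wakes the main loop */

static void done(uint64_t arg) {
    printf("job %llu done\n", (unsigned long long)arg);
}

/* Background thread: finish the "work", queue a response, poke the pipe. */
static void *worker(void *unused) {
    (void)unused;
    comp_item *rsp = malloc(sizeof(*rsp));
    rsp->func = done;
    rsp->arg = 42;
    pthread_mutex_lock(&comp_mutex);
    rsp->next = comp_list;
    comp_list = rsp;
    pthread_mutex_unlock(&comp_mutex);
    if (write(comp_pipe[1], "A", 1) != 1) { /* best effort, as in bio.c */ }
    return NULL;
}

int main(void) {
    pthread_t tid;
    char buf[128];
    if (pipe(comp_pipe) == -1) return 1;
    pthread_create(&tid, NULL, worker, NULL);

    /* "Event loop": wait for the wakeup byte, detach the list under the
     * lock, then run the callbacks outside of it, mirroring
     * bioPipeReadJobCompList(). */
    read(comp_pipe[0], buf, sizeof(buf));
    pthread_mutex_lock(&comp_mutex);
    comp_item *head = comp_list;
    comp_list = NULL;
    pthread_mutex_unlock(&comp_mutex);

    while (head) {
        comp_item *next = head->next;
        head->func(head->arg);
        free(head);
        head = next;
    }
    pthread_join(tid, NULL);
    return 0;
}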
29 changes: 21 additions & 8 deletions src/bio.h
@@ -10,6 +10,26 @@
#define __BIO_H

typedef void lazy_free_fn(void *args[]);
typedef void comp_fn(uint64_t user_data);

typedef enum bio_worker_t {
BIO_WORKER_CLOSE_FILE = 0,
BIO_WORKER_AOF_FSYNC,
BIO_WORKER_LAZY_FREE,
BIO_WORKER_NUM
} bio_worker_t;

/* Background job opcodes */
typedef enum bio_job_type_t {
BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
BIO_AOF_FSYNC, /* Deferred AOF fsync. */
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
BIO_COMP_RQ_CLOSE_FILE, /* Job completion request, registered on close-file worker's queue */
BIO_COMP_RQ_AOF_FSYNC, /* Job completion request, registered on aof-fsync worker's queue */
BIO_COMP_RQ_LAZY_FREE, /* Job completion request, registered on lazy-free worker's queue */
BIO_NUM_OPS
} bio_job_type_t;

/* Exported API */
void bioInit(void);
@@ -20,14 +40,7 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data);

/* Background job opcodes */
enum {
BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
BIO_AOF_FSYNC, /* Deferred AOF fsync. */
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
BIO_NUM_OPS
};

#endif
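A hedged usage sketch of this API (free_fn_a, free_fn_b, on_done, ptr_a, ptr_b and tag are illustrative names, not part of this PR): because each worker consumes its queue in FIFO order, a single completion request submitted after N jobs is enough to learn that all N have finished.

/* Sketch only: two lazy-free jobs, then one completion request on the
 * same worker queue. FIFO processing guarantees on_done() is handed to
 * the main thread only after both jobs have completed. */
bioCreateLazyFreeJob(free_fn_a, 1, ptr_a);           /* job 1 */
bioCreateLazyFreeJob(free_fn_b, 1, ptr_b);           /* job 2 */
bioCreateCompRq(BIO_WORKER_LAZY_FREE, on_done, tag); /* after jobs 1 and 2 */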
20 changes: 16 additions & 4 deletions src/blocked.c
@@ -68,6 +68,7 @@ void blockClient(client *c, int btype) {
/* Master client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_MASTER &&
btype != BLOCKED_MODULE &&
btype != BLOCKED_LAZYFREE &&
btype != BLOCKED_POSTPONE));

c->flags |= CLIENT_BLOCKED;
@@ -175,6 +176,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->postponed_list_node = NULL;
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else if (c->bstate.btype == BLOCKED_LAZYFREE) {
/* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
}
@@ -206,7 +209,9 @@
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LIST ||
if (c->bstate.btype == BLOCKED_LAZYFREE) {
addReply(c, shared.ok); /* No reason for lazy-free to fail */
} else if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
addReplyNullArray(c);
@@ -263,9 +268,16 @@ void disconnectAllBlockedClients(void) {
if (c->bstate.btype == BLOCKED_POSTPONE)
continue;

unblockClientOnError(c,
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
if (c->bstate.btype == BLOCKED_LAZYFREE) {
addReply(c, shared.ok); /* No reason for lazy-free to fail */
c->flags &= ~CLIENT_PENDING_COMMAND;
unblockClient(c, 1);
} else {
unblockClientOnError(c,
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
}
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
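Putting the three files together, a hedged sketch of how a FLUSHALL-style command could use this machinery (flushDoneCallback and flushallBlockingSketch are illustrative names, and the lazy-free arguments are simplified; the PR's actual command wiring lives in files not shown here):

/* Runs on the main thread via bioPipeReadJobCompList(), after every
 * lazy-free job submitted before the completion request has finished. */
static void flushDoneCallback(uint64_t client_id) {
    client *c = lookupClientByID(client_id);
    if (c) {
        addReply(c, shared.ok);
        c->flags &= ~CLIENT_PENDING_COMMAND;
        unblockClient(c, 1);
    }
}

static void flushallBlockingSketch(client *c, dict *keys, dict *expires) {
    /* Queue the actual freeing on the lazy-free worker (illustrative args). */
    bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, keys, expires);
    /* Block the caller with the new BLOCKED_LAZYFREE state... */
    blockClient(c, BLOCKED_LAZYFREE);
    /* ...and request a callback once the worker's queue drains past our job. */
    bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushDoneCallback, (uint64_t)c->id);
}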