Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use io_uring to batch handle clients pending writes to reduce SYSCALL count. #13139

Open
wants to merge 6 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,16 @@ ifeq ($(MALLOC),jemalloc)
FINAL_LIBS := ../deps/jemalloc/lib/libjemalloc.a $(FINAL_LIBS)
endif

# only Linux has IO_URING support
ifeq ($(uname_S),Linux)
USE_IO_URING=yes
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
endif

# When USE_IO_URING is "yes", compile with -DUSE_IO_URING and link against
# liburing (-luring).
ifeq ($(USE_IO_URING),yes)
FINAL_CFLAGS+= -DUSE_IO_URING
FINAL_LIBS+= -luring
endif

# LIBSSL & LIBCRYPTO
LIBSSL_LIBS=
LIBSSL_PKGCONFIG := $(shell $(PKG_CONFIG) --exists libssl && echo $$?)
Expand Down Expand Up @@ -348,7 +358,7 @@ endif

REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)
Expand Down
77 changes: 77 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include "server.h"

#ifdef USE_IO_URING
#include <liburing.h>
/* Queue depth requested for the io_uring instance; passed as the entries
 * argument to io_uring_queue_init_params() in initIOUring(). */
static const unsigned int IOUringDepth = 256;
/* Number of send requests that have been prepared but whose completion has
 * not been consumed yet: incremented by ioUringPrepWrite() and decremented
 * by ioUringSubmitAndWait() for every completion it processes. */
static unsigned int uringQueueLen = 0;

void initIOUring(void) {
struct io_uring_params params;
struct io_uring *ring = zmalloc(sizeof(struct io_uring));
memset(&params, 0, sizeof(params));
/* On success, io_uring_queue_init_params(3) returns 0 and ring will
* point to the shared memory containing the io_uring queues.
* On failure -errno is returned. */
int ret = io_uring_queue_init_params(IOUringDepth, ring, &params);
if (ret != 0) {
serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring.");
zfree(ring);
server.io_uring = NULL;
server.io_uring_enabled = 0;
} else {
serverLog(LL_NOTICE, "System support io_uring, enable io_uring.");
server.io_uring = ring;
server.io_uring_enabled = 1;
}
}

/* Queue an asynchronous, non-blocking send of the unsent part of the
 * client's static reply buffer, i.e. the bytes from c->buf + c->sentlen up
 * to c->bufpos, on the server io_uring instance.
 *
 * The request is only prepared here; it is handed to the kernel by a later
 * io_uring_submit(). The client pointer is attached as the request's user
 * data so that the completion can be mapped back to the client.
 *
 * NOTE(review): the send targets c->conn->fd directly instead of going
 * through the connection write API. Confirm that callers never reach this
 * for connection types that must not write raw bytes to the descriptor. */
void ioUringPrepWrite(client *c) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(server.io_uring);
    if (sqe == NULL) {
        /* io_uring_get_sqe() returns NULL when the submission queue is full,
         * which happens once more requests are queued than the ring depth.
         * Hand the queued entries to the kernel to free slots and retry,
         * rather than dereferencing a NULL entry below. */
        io_uring_submit(server.io_uring);
        sqe = io_uring_get_sqe(server.io_uring);
    }
    /* Fail loudly instead of crashing on a NULL pointer if the queue could
     * not be drained. */
    serverAssert(sqe != NULL);

    io_uring_prep_send(sqe, c->conn->fd, c->buf + c->sentlen,
                       c->bufpos - c->sentlen, MSG_DONTWAIT);
    io_uring_sqe_set_data(sqe, c);
    uringQueueLen++;
}

/* Submit every prepared send and block until all of them have completed.
 *
 * For each completion, the raw result (number of bytes sent, or a negated
 * errno) is stored in c->nwritten of the client attached to the request.
 * When a send was only partially performed, the sent prefix is consumed
 * here (c->sentlen is advanced and the bytes are added to
 * server.stat_net_output_bytes) and a new send for the remainder is queued.
 * On return, c->nwritten therefore describes the last send issued for each
 * client and has not yet been applied to c->sentlen. */
void ioUringSubmitAndWait(void) {
    while (uringQueueLen) {
        /* Push everything prepared so far, including remainders re-queued
         * by a previous iteration. */
        io_uring_submit(server.io_uring);

        struct io_uring_cqe *cqe;
        /* A non-zero return is a negated errno; nothing was consumed, so
         * simply try again. */
        if (io_uring_wait_cqe(server.io_uring, &cqe) != 0) continue;

        client *c = io_uring_cqe_get_data(cqe);
        c->nwritten = cqe->res;
        /* Everything needed from the CQE has been copied out, so release
         * the completion slot before queuing any follow-up request. */
        io_uring_cqe_seen(server.io_uring, cqe);
        uringQueueLen--;

        if (c->nwritten > 0 && (c->bufpos - c->sentlen) > c->nwritten) {
            /* Short send: consume what went out and queue the rest. These
             * bytes must be accounted for here because c->nwritten is about
             * to be overwritten by the follow-up completion. */
            c->sentlen += c->nwritten;
            atomicIncr(server.stat_net_output_bytes, c->nwritten);
            ioUringPrepWrite(c);
        }
    }
}

/* Tear down the io_uring instance if it is enabled: shut the ring down,
 * release its memory and mark the feature as disabled. Does nothing when
 * io_uring is not enabled. */
void freeIOUring(void) {
    if (!server.io_uring_enabled) return;

    struct io_uring *ring = server.io_uring;
    io_uring_queue_exit(ring);
    zfree(ring);
    server.io_uring = NULL;
    server.io_uring_enabled = 0;
}
#else
/* Variant used when the build has no io_uring support (USE_IO_URING is not
 * defined): log that io_uring is unavailable and leave the feature off. */
void initIOUring(void) {
    serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring.");
    server.io_uring_enabled = 0;
    server.io_uring = NULL;
}

/* No-op stand-in used when io_uring support is compiled out; the client
 * argument is intentionally ignored. */
void ioUringPrepWrite(client *c) {
    (void)c;
}

/* No-op stand-in used when io_uring support is compiled out. */
void ioUringSubmitAndWait(void) {
}

/* No-op stand-in used when io_uring support is compiled out: there is no
 * ring to release. */
void freeIOUring(void) {
}
#endif
137 changes: 107 additions & 30 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1822,10 +1822,8 @@ client *lookupClientByID(uint64_t id) {

/* This function should be called from _writeToClient when the reply list is not empty,
* it gathers the scattered buffers from reply list and sends them away with connWritev.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned,
* and 'nwritten' is an output parameter, it means how many bytes server write
* to client. */
static int _writevToClient(client *c, ssize_t *nwritten) {
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned. */
static int _writevToClient(client *c) {
int iovcnt = 0;
int iovmax = min(IOV_MAX, c->conn->iovcnt);
struct iovec iov[iovmax];
Expand Down Expand Up @@ -1859,12 +1857,12 @@ static int _writevToClient(client *c, ssize_t *nwritten) {
offset = 0;
}
if (iovcnt == 0) return C_OK;
*nwritten = connWritev(c->conn, iov, iovcnt);
if (*nwritten <= 0) return C_ERR;
c->nwritten = connWritev(c->conn, iov, iovcnt);
if (c->nwritten <= 0) return C_ERR;

/* Locate the new node which has leftover data and
* release all nodes in front of it. */
ssize_t remaining = *nwritten;
ssize_t remaining = c->nwritten;
if (c->bufpos > 0) { /* deal with static reply buffer first. */
int buf_len = c->bufpos - c->sentlen;
c->sentlen += remaining;
Expand Down Expand Up @@ -1895,22 +1893,19 @@ static int _writevToClient(client *c, ssize_t *nwritten) {

/* This function does actual writing output buffers to different types of
* clients, it is called by writeToClient.
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned,
* and 'nwritten' is an output parameter, it means how many bytes server write
* to client. */
int _writeToClient(client *c, ssize_t *nwritten) {
*nwritten = 0;
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned. */
int _writeToClient(client *c) {
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);

replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);
/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
*nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
c->nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
o->used-c->ref_block_pos);
if (*nwritten <= 0) return C_ERR;
c->ref_block_pos += *nwritten;
if (c->nwritten <= 0) return C_ERR;
c->ref_block_pos += c->nwritten;
}

/* If we fully sent the object on head, go to the next one. */
Expand All @@ -1928,25 +1923,30 @@ int _writeToClient(client *c, ssize_t *nwritten) {
/* When the reply list is not empty, it's better to use writev to save us some
* system calls and TCP packets. */
if (listLength(c->reply) > 0) {
int ret = _writevToClient(c, nwritten);
int ret = _writevToClient(c);
if (ret != C_OK) return ret;

/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
} else if (c->bufpos > 0) {
*nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (*nwritten <= 0) return C_ERR;
c->sentlen += *nwritten;
if (server.io_uring_enabled) {
c->flags |= CLIENT_PENDING_WRITE_ASYNC;
ioUringPrepWrite(c);
return C_OK;
}
c->nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (c->nwritten <= 0) return C_ERR;
c->sentlen += c->nwritten;

/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
}
}

return C_OK;
}
Expand All @@ -1963,12 +1963,14 @@ int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
atomicIncr(server.stat_total_writes_processed, 1);

ssize_t nwritten = 0, totwritten = 0;

ssize_t totwritten = 0;
c->nwritten = 0;
while(clientHasPendingReplies(c)) {
int ret = _writeToClient(c, &nwritten);
int ret = _writeToClient(c);
/* If use io_uring to write, just return. */
if (c->flags & CLIENT_PENDING_WRITE_ASYNC) return C_OK;
if (ret == C_ERR) break;
totwritten += nwritten;
totwritten += c->nwritten;
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
Expand All @@ -1993,7 +1995,7 @@ int writeToClient(client *c, int handler_installed) {
atomicIncr(server.stat_net_output_bytes, totwritten);
}

if (nwritten == -1) {
if (c->nwritten == -1) {
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE,
"Error writing to client: %s", connGetLastError(c->conn));
Expand Down Expand Up @@ -2039,6 +2041,44 @@ void sendReplyToClient(connection *conn) {
writeToClient(c,1);
}

/* Apply the result of an io_uring send, stored in c->nwritten, to client 'c'.
 *
 * Where synchronous system calls return -1 on failure and set errno,
 * io_uring never uses errno: it returns the negated errno directly in the
 * CQE res field. c->nwritten is therefore the number of bytes sent (> 0),
 * zero, or -errno.
 *
 * Returns C_OK when bytes were sent: c->sentlen is advanced (and the static
 * buffer reset once fully sent), the output byte counter is updated and,
 * for non-master clients, the last interaction time is refreshed.
 * Returns C_ERR when nothing was sent; in that case the client is scheduled
 * to be freed if its connection is not in the connected state.
 *
 * NOTE(review): for -EAGAIN, -EINTR or a zero-byte result the client keeps
 * its unsent data and C_ERR is returned. Confirm that the caller arranges
 * for the write to be retried in those cases. */
int checkPendingWriteAsync(client *c) {
    if (c->nwritten <= 0) {
        /* Only negative values carry an errno. A zero-byte completion is not
         * an error and must not flip the connection into the error state
         * with an errno of 0. */
        if (c->nwritten < 0 && c->nwritten != -EAGAIN) {
            c->conn->last_errno = -(c->nwritten);
            /* Don't overwrite the state of a connection that is not already
             * connected, not to mess with handler callbacks. */
            if (c->nwritten != -EINTR && c->conn->state == CONN_STATE_CONNECTED)
                c->conn->state = CONN_STATE_ERROR;
        }
        if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", connGetLastError(c->conn));
            freeClientAsync(c);
        }
        return C_ERR;
    }

    c->sentlen += c->nwritten;
    /* If the buffer was sent, set bufpos to zero to continue with
     * the remainder of the reply. */
    if ((int)c->sentlen == c->bufpos) {
        c->bufpos = 0;
        c->sentlen = 0;
    }
    atomicIncr(server.stat_net_output_bytes, c->nwritten);
    /* For clients representing masters we don't count sending data
     * as an interaction, since we always send REPLCONF ACK commands
     * that take some time to just fill the socket output buffer.
     * We just rely on data / pings received for timeout detection. */
    if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;

    return C_OK;
}

/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
Expand All @@ -2052,24 +2092,61 @@ int handleClientsWithPendingWrites(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listUnlinkNode(server.clients_pending_write,ln);

/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;
if (c->flags & CLIENT_PROTECTED) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;
if (c->flags & CLIENT_CLOSE_ASAP) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* Try to write buffers to the client socket. */
if (writeToClient(c,0) == C_ERR) continue;
if (writeToClient(c,0) == C_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

if (!(c->flags & CLIENT_PENDING_WRITE_ASYNC)) {
listUnlinkNode(server.clients_pending_write, ln);
}
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
if (clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE_ASYNC)) {
installClientWriteHandler(c);
}
}

/* An optimization for connWrite: batch submit the write(3). */
if (server.io_uring_enabled) {
ioUringSubmitAndWait();
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE_ASYNC;
listUnlinkNode(server.clients_pending_write, ln);

if (checkPendingWriteAsync(c) == C_ERR) continue;
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClientAsync(c);
continue;
}
}
/* Update client's memory usage after writing.
* Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
* handleClientsWithPendingWritesUsingThreads(). */
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsageAndBucket(c);
}
}
return processed;
}

Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2865,6 +2865,7 @@ void initListeners(void) {
/* Last-stage initialization: calls bioInit(), initThreadedIO() and
 * initIOUring(), passes server.jemalloc_bg_thread to
 * set_jemalloc_bg_thread(), and finally stores the value returned by
 * zmalloc_used_memory() in server.initial_memory_usage. */
void InitServerLast(void) {
bioInit();
initThreadedIO();
/* Sets server.io_uring_enabled to 1 or 0 depending on whether the ring
 * could be created. */
initIOUring();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
/* Sampled after the initialization calls above have run. */
server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -7180,6 +7181,7 @@ int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
freeIOUring();
return 0;
}

Expand Down
12 changes: 12 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */

#define CLIENT_PENDING_WRITE_ASYNC (1ULL<<51) /* Client has output queued for sending through io_uring (io_uring_prep_send). */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
typedef enum blocking_type {
Expand Down Expand Up @@ -1279,6 +1281,7 @@ typedef struct client {
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
char *buf;
ssize_t nwritten; /* How many bytes server write to client. */
#ifdef LOG_REQ_RES
clientReqResInfo reqres;
#endif
Expand Down Expand Up @@ -2067,6 +2070,9 @@ struct redisServer {
int reply_buffer_resizing_enabled; /* Is reply buffer resizing enabled (1 by default) */
/* Local environment */
char *locale_collate;
/* io_uring */
int io_uring_enabled; /* If io_uring enabled (0 by default) */
struct io_uring *io_uring;
};

#define MAX_KEYS_BUFFER 256
Expand Down Expand Up @@ -3725,6 +3731,12 @@ void quitCommand(client *c);
void resetCommand(client *c);
void failoverCommand(client *c);

/* io_uring.c -- io_uring related operations */
/* Create the io_uring instance; sets server.io_uring and
 * server.io_uring_enabled according to the outcome. */
void initIOUring(void);
/* Release the io_uring instance if it is enabled. */
void freeIOUring(void);
/* Queue a send of the unsent part of the client's static reply buffer. */
void ioUringPrepWrite(client *c);
/* Submit all queued sends and wait until every one has completed; each
 * result is stored in the corresponding client's nwritten field. */
void ioUringSubmitAndWait(void);

#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
void free(void *ptr) __attribute__ ((deprecated));
Expand Down