Skip to content

RFC: Socket activation support #2004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/anet.c
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
}

int anetUnixAccept(char *err, int s) {
return anetBasicAccept(err, s);
}

int anetBasicAccept(char *err, int s) {
int fd;
struct sockaddr_un sa;
socklen_t salen = sizeof(sa);
Expand Down
1 change: 1 addition & 0 deletions src/anet.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock);
int anetBasicAccept(char *err, int serversock);
int anetWrite(int fd, char *buf, int count);
int anetNonBlock(char *err, int fd);
int anetEnableTcpNoDelay(char *err, int fd);
Expand Down
24 changes: 24 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,24 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
}

void acceptInheritedHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cfd, max = MAX_ACCEPTS_PER_CALL, nsock = (uintptr_t)privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);

while(max--) {
cfd = anetBasicAccept(server.neterr, fd);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted connection to inherited socket #%d", nsock);
acceptCommonHandler(cfd,REDIS_INHERITED_SOCKET);
}
}

static void freeClientArgv(redisClient *c) {
int j;
for (j = 0; j < c->argc; j++)
Expand Down Expand Up @@ -1233,6 +1251,7 @@ void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) {
* For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234".
* For IPv6 addresses we use [] around the IP part, like in "[::1]:1234".
* For Unix sockets we use path:0, like in "/tmp/redis:0".
* For inherited sockets we use fixed string "inherited:0".
*
* A Peer ID always fits inside a buffer of REDIS_PEER_ID_LEN bytes, including
* the null term.
Expand All @@ -1250,6 +1269,10 @@ int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) {
/* Unix socket client. */
snprintf(peerid,peerid_len,"%s:0",server.unixsocket);
return REDIS_OK;
} else if (client->flags & REDIS_INHERITED_SOCKET) {
/* Inherited socket client. */
snprintf(peerid,peerid_len,"inherited:0");
return REDIS_OK;
} else {
/* TCP client. */
int retval = anetPeerToString(client->fd,ip,sizeof(ip),&port);
Expand Down Expand Up @@ -1293,6 +1316,7 @@ sds catClientInfoString(sds s, redisClient *client) {
if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A';
if (client->flags & REDIS_UNIX_SOCKET) *p++ = 'U';
if (client->flags & REDIS_INHERITED_SOCKET) *p++ = 'I';
if (client->flags & REDIS_READONLY) *p++ = 'r';
if (p == flags) *p++ = 'N';
*p++ = '\0';
Expand Down
43 changes: 38 additions & 5 deletions src/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include <sys/resource.h>
#include <sys/utsname.h>
#include <locale.h>
#include <stdlib.h>

/* Our shared "common" objects */

Expand Down Expand Up @@ -1388,6 +1389,7 @@ void initServerConfig(void) {
server.bindaddr_count = 0;
server.unixsocket = NULL;
server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM;
server.inhfd_count = 0;
server.ipfd_count = 0;
server.sofd = -1;
server.dbnum = REDIS_DEFAULT_DBNUM;
Expand Down Expand Up @@ -1697,6 +1699,25 @@ void resetServerStats(void) {
server.ops_sec_last_sample_ops = 0;
}

void inheritListenFds(pid_t pid, int *fds, int *count) {
const char *e;
int n, fd;

if ((e = getenv("LISTEN_PID")) == NULL || (pid_t)strtoull(e,NULL,10) != pid)
return;

if ((e = getenv("LISTEN_FDS")) == NULL || (n = atoi(e)) <= 0)
return;

/* Inherited FDs start at 3 */

for (fd = 3; fd < 3+n; fd++) {
fcntl(fd, F_SETFD, FD_CLOEXEC);
fds[*count] = fd;
(*count)++;
}
}

void initServer(void) {
int j;

Expand Down Expand Up @@ -1727,6 +1748,8 @@ void initServer(void) {
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum);

inheritListenFds(server.pid,server.inhfd,&server.inhfd_count);

/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
Expand All @@ -1745,7 +1768,7 @@ void initServer(void) {
}

/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
if (server.inhfd_count == 0 && server.ipfd_count == 0 && server.sofd < 0) {
redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
Expand Down Expand Up @@ -1793,8 +1816,16 @@ void initServer(void) {
exit(1);
}

/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
/* Create an event handler for accepting new connections in inherited,
* TCP and Unix domain sockets. */
for (j = 0; j < server.inhfd_count; j++) {
if (aeCreateFileEvent(server.el, server.inhfd[j], AE_READABLE,
acceptInheritedHandler,(void*)(uintptr_t)j) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.inhfd file event.");
}
}
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
Expand Down Expand Up @@ -2274,8 +2305,8 @@ int processCommand(redisClient *c) {

/*================================== Shutdown =============================== */

/* Close listening sockets. Also unlink the unix domain socket if
* unlink_unix_socket is non-zero. */
/* Close listening sockets, except for inherited. Also unlink the unix domain
* socket if unlink_unix_socket is non-zero. */
void closeListeningSockets(int unlink_unix_socket) {
int j;

Expand Down Expand Up @@ -3614,6 +3645,8 @@ int main(int argc, char **argv) {
exit(1);
}
}
if (server.inhfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections to inherited sockets");
if (server.ipfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
if (server.sofd > 0)
Expand Down
4 changes: 4 additions & 0 deletions src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */
#define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
#define REDIS_INHERITED_SOCKET (1<<19) /* Client connected via inherited socket */

/* Client block type (btype field in client structure)
* if REDIS_BLOCKED flag is set. */
Expand Down Expand Up @@ -658,6 +659,8 @@ struct redisServer {
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *unixsocket; /* UNIX socket path */
mode_t unixsocketperm; /* UNIX socket permission */
int inhfd[REDIS_BINDADDR_MAX]; /* Inherited socket file descriptors */
int inhfd_count; /* Used slots in inhfd[] */
int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */
int ipfd_count; /* Used slots in ipfd[] */
int sofd; /* Unix socket file descriptor */
Expand Down Expand Up @@ -1009,6 +1012,7 @@ void processInputBuffer(redisClient *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptInheritedHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
void addReplyBulk(redisClient *c, robj *obj);
void addReplyBulkCString(redisClient *c, char *s);
Expand Down