Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add support to fallback to all results of getaddrinfo() (ipv6/ ipv4 dual-stack, DNS-RR) #6374

Open
wants to merge 11 commits into
base: 4.0
Choose a base branch
from
83 changes: 46 additions & 37 deletions deps/hiredis/net.c
Expand Up @@ -306,7 +306,8 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,

if (redisContextTimeoutMsec(c, &timeout_msec) != REDIS_OK) {
__redisSetError(c, REDIS_ERR_IO, "Invalid timeout specified");
goto error;
freeaddrinfo(servinfo);
return REDIS_ERR;
}

if (source_addr == NULL) {
Expand All @@ -319,44 +320,43 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,

snprintf(_port, 6, "%d", port);
memset(&hints,0,sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;

/* Try with IPv6 if no IPv4 address was found. We do it in this order since
* in a Redis client you can't afford to test if you have IPv6 connectivity
* as this would add latency to every connect. Otherwise a more sensible
* route could be: Use IPv6 if both addresses are available and there is IPv6
* connectivity. */
/* In an earlier version, IPv6 was only tried if no IPv4 address was
* available, to reduce latency for Redis clients. This breaks setups with
* e.g. AAAA records but not IPv6 connectivity. Therefore, we're just using
* the regular addrinfo struct returned by getaddrinfo() instead and loop
* over the the responses in the order the kernel gave us.
* Hopefully, the latency introduced by this isn't too bad.
*/
if ((rv = getaddrinfo(c->tcp.host,_port,&hints,&servinfo)) != 0) {
hints.ai_family = AF_INET6;
if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) {
__redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv));
return REDIS_ERR;
}
__redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv));
return REDIS_ERR;
}

for (p = servinfo; p != NULL; p = p->ai_next) {
addrretry:
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;

c->fd = s;
if (redisSetBlocking(c,0) != REDIS_OK)
goto error;
continue;
if (c->tcp.source_addr) {
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
if ((rv = getaddrinfo(c->tcp.source_addr, NULL, &hints, &bservinfo)) != 0) {
char buf[128];
snprintf(buf,sizeof(buf),"Can't get addr: %s",gai_strerror(rv));
__redisSetError(c,REDIS_ERR_OTHER,buf);
goto error;
continue;
}

if (reuseaddr) {
n = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*) &n,
sizeof(n)) < 0) {
goto error;
continue;
}
}

Expand All @@ -371,48 +371,57 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
char buf[128];
snprintf(buf,sizeof(buf),"Can't bind socket: %s",strerror(errno));
__redisSetError(c,REDIS_ERR_OTHER,buf);
goto error;
continue;
}
}
if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
if (errno == EHOSTUNREACH) {
redisContextCloseFd(c);
continue;
} else if (errno == EINPROGRESS && !blocking) {
/* This is ok. */
} else if (errno == EINPROGRESS) {
/* This breaks nonblocking, but seems to be the only way to
* retry other addresses?
* This was partially backported from
* https://github.com/redis/hiredis/pull/578
*/
goto wait_for_ready;
} else if (errno == EADDRNOTAVAIL && reuseaddr) {
if (++reuses >= REDIS_CONNECT_RETRIES) {
goto error;
break;
} else {
redisContextCloseFd(c);
goto addrretry;
continue;
}
} else {
if (redisContextWaitReady(c,timeout_msec) != REDIS_OK)
goto error;
wait_for_ready:
if (redisContextWaitReady(c,timeout_msec) != REDIS_OK) continue;
}
}
if (blocking && redisSetBlocking(c,1) != REDIS_OK)
goto error;
continue;
if (redisSetTcpNoDelay(c) != REDIS_OK)
goto error;
continue;

c->flags |= REDIS_CONNECTED;
rv = REDIS_OK;
goto end;
}
if (p == NULL) {
char buf[128];
snprintf(buf,sizeof(buf),"Can't create socket: %s",strerror(errno));
__redisSetError(c,REDIS_ERR_OTHER,buf);
goto error;

/* This is needed because c->err is set using __redisSetError on
* connection failures. This can be called by various functions.
* We shall have a clear function that would ideally clean the struct
* of all past errors which have been put into or somehow only set the
* error flag and message after looping through all IP addresses and
* not finding any REDIS_OK.
* Also, we're cleaning c->errstr, analogue to redisReconnect()
*/
c->err = 0;
memset(c->errstr, '\0', strlen(c->errstr));

freeaddrinfo(servinfo);
return REDIS_OK; // Need to return REDIS_OK if alright
}

error:
rv = REDIS_ERR;
end:
// No address could be reached
freeaddrinfo(servinfo);
return rv; // Need to return REDIS_OK if alright
return REDIS_ERR;
}

int redisContextConnectTcp(redisContext *c, const char *addr, int port,
Expand Down
82 changes: 34 additions & 48 deletions src/anet.c
Expand Up @@ -194,45 +194,38 @@ int anetSendTimeout(char *err, int fd, long long ms) {
}

/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
* do the actual work. It resolves the hostname "host" and set the string
* representation of the IP address into the buffer pointed by "ipbuf".
* do the actual work. It resolves the hostname "host" and returns the addrinfo
* struct pointer.
*
* If flags is set to ANET_IP_ONLY the function only resolves hostnames
* that are actually already IPv4 or IPv6 addresses. This turns the function
* into a validating / normalizing function. */
int anetGenericResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len,
int flags)
* into a validating / normalizing function.
*
* NOTE: Caller needs to make sure to use freeaddrinfo(info) */
int anetGenericResolve(char *err, char *host, struct addrinfo **info, int flags)
{
struct addrinfo hints, *info;
struct addrinfo hints;
int rv;

memset(&hints,0,sizeof(hints));
if (flags & ANET_IP_ONLY) hints.ai_flags = AI_NUMERICHOST;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM; /* specify socktype to avoid dups */

if ((rv = getaddrinfo(host, NULL, &hints, &info)) != 0) {
if ((rv = getaddrinfo(host, NULL, &hints, info)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
return ANET_ERR;
}
if (info->ai_family == AF_INET) {
struct sockaddr_in *sa = (struct sockaddr_in *)info->ai_addr;
inet_ntop(AF_INET, &(sa->sin_addr), ipbuf, ipbuf_len);
} else {
struct sockaddr_in6 *sa = (struct sockaddr_in6 *)info->ai_addr;
inet_ntop(AF_INET6, &(sa->sin6_addr), ipbuf, ipbuf_len);
}

freeaddrinfo(info);
return ANET_OK;
}

int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len) {
return anetGenericResolve(err,host,ipbuf,ipbuf_len,ANET_NONE);
int anetResolve(char *err, char *host, struct addrinfo **info) {
return anetGenericResolve(err,host,info,ANET_NONE);
}

int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len) {
return anetGenericResolve(err,host,ipbuf,ipbuf_len,ANET_IP_ONLY);
int anetResolveIP(char *err, char *host, struct addrinfo **info) {
return anetGenericResolve(err,host,info,ANET_IP_ONLY);
}

static int anetSetReuseAddr(char *err, int fd) {
Expand Down Expand Up @@ -287,16 +280,25 @@ static int anetTcpGenericConnect(char *err, char *addr, int port,
* the next entry in servinfo. */
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
goto error;

if (anetSetReuseAddr(err,s) == ANET_ERR) {
close(s);
continue;
}

if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK) {
close(s);
continue;
}

if (source_addr) {
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0)
{
anetSetError(err, "%s", gai_strerror(rv));
goto error;
close(s);
continue;
}
for (b = bservinfo; b != NULL; b = b->ai_next) {
if (bind(s,b->ai_addr,b->ai_addrlen) != -1) {
Expand All @@ -307,42 +309,26 @@ static int anetTcpGenericConnect(char *err, char *addr, int port,
freeaddrinfo(bservinfo);
if (!bound) {
anetSetError(err, "bind: %s", strerror(errno));
goto error;
close(s);
continue;
}
}

if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
/* If the socket is non-blocking, it is ok for connect() to
* return an EINPROGRESS error here. */
if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK)
goto end;
if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK) {
freeaddrinfo(servinfo);
return s;
}

close(s);
s = ANET_ERR;
continue;
}

/* If we ended an iteration of the for loop without errors, we
* have a connected socket. Let's return to the caller. */
goto end;
}
if (p == NULL)
anetSetError(err, "creating socket: %s", strerror(errno));

error:
if (s != ANET_ERR) {
close(s);
s = ANET_ERR;
}

end:
freeaddrinfo(servinfo);

/* Handle best effort binding: if a binding address was used, but it is
* not possible to create a socket, try again without a binding address. */
if (s == ANET_ERR && source_addr && (flags & ANET_CONNECT_BE_BINDING)) {
return anetTcpGenericConnect(err,addr,port,NULL,flags);
} else {
return s;
}
return -1;
}

int anetTcpConnect(char *err, char *addr, int port)
Expand Down
5 changes: 3 additions & 2 deletions src/anet.h
Expand Up @@ -32,6 +32,7 @@
#define ANET_H

#include <sys/types.h>
#include <netdb.h>

#define ANET_OK 0
#define ANET_ERR -1
Expand All @@ -56,8 +57,8 @@ int anetTcpNonBlockBestEffortBindConnect(char *err, char *addr, int port, char *
int anetUnixConnect(char *err, char *path);
int anetUnixNonBlockConnect(char *err, char *path);
int anetRead(int fd, char *buf, int count);
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len);
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len);
int anetResolve(char *err, char *host, struct addrinfo **info);
int anetResolveIP(char *err, char *host, struct addrinfo **info);
int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Expand Up @@ -125,7 +125,7 @@ client *createClient(int fd) {
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
c->slave_ip[0] = '\0';
c->slave_hostname[0] = '\0';
c->slave_capa = SLAVE_CAPA_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/rdb.c
Expand Up @@ -1724,7 +1724,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
}

/* A background saving child (BGSAVE) terminated its work. Handle this.
* This function covers the case of RDB -> Salves socket transfers for
* This function covers the case of RDB -> Slaves socket transfers for
* diskless replication. */
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
uint64_t *ok_slaves;
Expand Down