Skip to content

Commit

Permalink
Open UDP ports on traffic_manager if ports are configured for QUIC (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
maskit committed Jul 13, 2020
1 parent 588c49c commit f1ef5ee
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 21 deletions.
3 changes: 2 additions & 1 deletion iocore/net/I_UDPNet.h
Expand Up @@ -63,6 +63,7 @@ class UDPNetProcessor : public Processor
@param c Continuation that is called back with newly created
socket.
@param addr Address to bind (includes port)
@param fd File descriptor to use (if exists)
@param send_bufsize (optional) Socket buffer size for sending.
Limits how much outstanding data to OS before it is able to send
to the NIC.
Expand All @@ -71,7 +72,7 @@ class UDPNetProcessor : public Processor
@return Action* Always returns ACTION_RESULT_DONE if socket was
created successfully, or ACTION_IO_ERROR if not.
*/
inkcoreapi Action *UDPBind(Continuation *c, sockaddr const *addr, int send_bufsize = 0, int recv_bufsize = 0);
inkcoreapi Action *UDPBind(Continuation *c, sockaddr const *addr, int fd = -1, int send_bufsize = 0, int recv_bufsize = 0);

// Regarding sendto_re, sendmsg_re, recvfrom_re:
// * You may be called back on 'c' with completion or error status.
Expand Down
2 changes: 1 addition & 1 deletion iocore/net/QUICNetProcessor.cc
Expand Up @@ -215,7 +215,7 @@ QUICNetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const
na->init_accept();

SCOPED_MUTEX_LOCK(lock, na->mutex, this_ethread());
udpNet.UDPBind((Continuation *)na, &na->server.accept_addr.sa, 1048576, 1048576);
udpNet.UDPBind((Continuation *)na, &na->server.accept_addr.sa, fd, 1048576, 1048576);

return na->action_.get();
}
19 changes: 12 additions & 7 deletions iocore/net/UnixUDPNet.cc
Expand Up @@ -735,20 +735,24 @@ UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const *remote_addr, Action
}

Action *
UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int send_bufsize, int recv_bufsize)
UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int fd, int send_bufsize, int recv_bufsize)
{
int res = 0;
int fd = -1;
UnixUDPConnection *n = nullptr;
IpEndpoint myaddr;
int myaddr_len = sizeof(myaddr);
PollCont *pc = nullptr;
PollDescriptor *pd = nullptr;
bool need_bind = true;

if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) {
goto Lerror;
if (fd == -1) {
if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) {
goto Lerror;
}
fd = res;
} else {
need_bind = false;
}
fd = res;
if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
goto Lerror;
}
Expand Down Expand Up @@ -798,11 +802,12 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int send_bufs
}
}

if (ats_is_ip6(addr) && safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
if (need_bind && ats_is_ip6(addr) && safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
goto Lerror;
}

if (socketManager.ink_bind(fd, addr, ats_ip_size(addr)) < 0) {
if (need_bind && (socketManager.ink_bind(fd, addr, ats_ip_size(addr)) < 0)) {
Debug("udpnet", "ink_bind failed");
goto Lerror;
}

Expand Down
2 changes: 1 addition & 1 deletion iocore/net/test_I_UDPNet.cc
Expand Up @@ -61,7 +61,7 @@ EchoServer::start()
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = 0;

udpNet.UDPBind(static_cast<Continuation *>(this), reinterpret_cast<sockaddr const *>(&addr), 1048576, 1048576);
udpNet.UDPBind(static_cast<Continuation *>(this), reinterpret_cast<sockaddr const *>(&addr), -1, 1048576, 1048576);

return true;
}
Expand Down
80 changes: 71 additions & 9 deletions mgmt/LocalManager.cc
Expand Up @@ -941,29 +941,91 @@ LocalManager::listenForProxy()
// We are not already bound, bind the port
for (auto &p : lmgmt->m_proxy_ports) {
if (ts::NO_FD == p.m_fd) {
this->bindProxyPort(p);
// Check the protocol (TCP or UDP) and create an appropriate socket
if (p.isQUIC()) {
this->bindUdpProxyPort(p);
} else {
this->bindTcpProxyPort(p);
}
}

// read backlog configuration value and overwrite the default value if found
bool found;
std::string_view fam{ats_ip_family_name(p.m_family)};
RecInt backlog = REC_readInteger("proxy.config.net.listen_backlog", &found);
backlog = (found && backlog >= 0) ? backlog : ats_tcp_somaxconn();
if (p.isQUIC()) {
// Can we do something like listen backlog for QUIC(UDP) ??
// Do nothing for now
} else {
// read backlog configuration value and overwrite the default value if found
bool found;
RecInt backlog = REC_readInteger("proxy.config.net.listen_backlog", &found);
backlog = (found && backlog >= 0) ? backlog : ats_tcp_somaxconn();

if ((listen(p.m_fd, backlog)) < 0) {
mgmt_fatal(errno, "[LocalManager::listenForProxy] Unable to listen on port: %d (%.*s)\n", p.m_port, fam.size(), fam.data());
if ((listen(p.m_fd, backlog)) < 0) {
mgmt_fatal(errno, "[LocalManager::listenForProxy] Unable to listen on port: %d (%.*s)\n", p.m_port, fam.size(), fam.data());
}
}

mgmt_log("[LocalManager::listenForProxy] Listening on port: %d (%.*s)\n", p.m_port, fam.size(), fam.data());
}
return;
}

/*
* bindProxyPort()
* bindUdpProxyPort()
* Function binds the accept port of the proxy
*/
void
LocalManager::bindUdpProxyPort(HttpProxyPort &port)
{
int one = 1;
int priv = (port.m_port < 1024 && 0 != geteuid()) ? ElevateAccess::LOW_PORT_PRIVILEGE : 0;

ElevateAccess access(priv);

if ((port.m_fd = socket(port.m_family, SOCK_DGRAM, 0)) < 0) {
mgmt_fatal(0, "[bindProxyPort] Unable to create socket : %s\n", strerror(errno));
}

if (port.m_family == AF_INET6) {
if (setsockopt(port.m_fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) {
mgmt_log("[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
}
}
if (setsockopt(port.m_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&one), sizeof(int)) < 0) {
mgmt_fatal(0, "[bindProxyPort] Unable to set socket options: %d : %s\n", port.m_port, strerror(errno));
}

IpEndpoint ip;
if (port.m_inbound_ip.isValid()) {
ip.assign(port.m_inbound_ip);
} else if (AF_INET6 == port.m_family) {
if (m_inbound_ip6.isValid()) {
ip.assign(m_inbound_ip6);
} else {
ip.setToAnyAddr(AF_INET6);
}
} else if (AF_INET == port.m_family) {
if (m_inbound_ip4.isValid()) {
ip.assign(m_inbound_ip4);
} else {
ip.setToAnyAddr(AF_INET);
}
} else {
mgmt_fatal(0, "[bindProxyPort] Proxy port with invalid address type %d\n", port.m_family);
}
ip.port() = htons(port.m_port);
if (bind(port.m_fd, &ip.sa, ats_ip_size(&ip)) < 0) {
mgmt_fatal(0, "[bindProxyPort] Unable to bind socket: %d : %s\n", port.m_port, strerror(errno));
}

Debug("lm", "[bindProxyPort] Successfully bound proxy port %d", port.m_port);
}

/*
* bindTcpProxyPort()
* Function binds the accept port of the proxy
*/
void
LocalManager::bindProxyPort(HttpProxyPort &port)
LocalManager::bindTcpProxyPort(HttpProxyPort &port)
{
int one = 1;
int priv = (port.m_port < 1024 && 0 != geteuid()) ? ElevateAccess::LOW_PORT_PRIVILEGE : 0;
Expand Down
3 changes: 2 additions & 1 deletion mgmt/LocalManager.h
Expand Up @@ -76,7 +76,8 @@ class LocalManager : public BaseManager
void processEventQueue();
bool startProxy(const char *onetime_options);
void listenForProxy();
void bindProxyPort(HttpProxyPort &);
void bindUdpProxyPort(HttpProxyPort &);
void bindTcpProxyPort(HttpProxyPort &);
void closeProxyPorts();

void mgmtCleanup();
Expand Down
2 changes: 1 addition & 1 deletion src/traffic_server/InkIOCoreAPI.cc
Expand Up @@ -443,7 +443,7 @@ INKUDPBind(TSCont contp, unsigned int ip, int port)
ats_ip4_set(&addr, ip, htons(port));

return reinterpret_cast<TSAction>(
udpNet.UDPBind((Continuation *)contp, ats_ip_sa_cast(&addr), INK_ETHERNET_MTU_SIZE, INK_ETHERNET_MTU_SIZE));
udpNet.UDPBind((Continuation *)contp, ats_ip_sa_cast(&addr), -1, INK_ETHERNET_MTU_SIZE, INK_ETHERNET_MTU_SIZE));
}

TSAction
Expand Down

0 comments on commit f1ef5ee

Please sign in to comment.