Skip to content

Commit

Permalink
Add support for UDP_GRO (#11120)
Browse files Browse the repository at this point in the history
Motivation:

UDP_GRO can improve performance when reading UDP datagrams. This patch adds support for it.

See https://lwn.net/Articles/768995/

Modifications:

- Add recvmsg(...)
- Add support for UDP_GRO in recvmsg(...) and recvmmsg(...)
- Remove usage of recvfrom(...) and just always use recvmsg(...) or recvmmsg(...) to simplify things
- Refactor some code for sharing
- Add EpollChannelOption.UDP_GRO and the getter / setter in EpollDatagramConfig

Result:

UDP_GRO is supported when the underlying system supports it.
  • Loading branch information
normanmaurer committed Mar 29, 2021
1 parent cc2b443 commit b05fdf3
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 121 deletions.
26 changes: 24 additions & 2 deletions transport-native-epoll/src/main/c/netty_epoll_linuxsocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#include <string.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/udp.h> // SOL_UDP
#include <sys/sendfile.h>
#include <linux/tcp.h> // TCP_NOTSENT_LOWAT is a linux specific define

#include "netty_epoll_linuxsocket.h"
#include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h"
Expand Down Expand Up @@ -57,6 +57,11 @@
#define SO_BUSY_POLL 46
#endif

// UDP_GRO is only provided by the headers of Linux 5 and later. Define it here so the code still compiles against older kernel headers.
#ifndef UDP_GRO
#define UDP_GRO 104
#endif

static jclass peerCredentialsClass = NULL;
static jmethodID peerCredentialsMethodId = NULL;

Expand Down Expand Up @@ -610,6 +615,19 @@ static jobject netty_epoll_linuxsocket_getPeerCredentials(JNIEnv *env, jclass cl
return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, credentials.pid, credentials.uid, gids);
}

/**
 * Queries the UDP_GRO option (level SOL_UDP) of the given socket.
 *
 * @param fd the file descriptor of the socket to inspect.
 * @return the raw option value reported by netty_unix_socket_getOption(...), or -1 if that call reported -1.
 */
static jint netty_epoll_linuxsocket_isUdpGro(JNIEnv* env, jclass clazz, jint fd) {
    int gro;
    int rc = netty_unix_socket_getOption(env, fd, SOL_UDP, UDP_GRO, &gro, sizeof(gro));
    // gro is only meaningful (and only read) when the lookup did not fail.
    return rc == -1 ? -1 : gro;
}

/**
 * Sets the UDP_GRO option (level SOL_UDP) of the given socket.
 *
 * @param fd     the file descriptor of the socket to configure.
 * @param optval the value to store for the option.
 *
 * The result of netty_unix_socket_setOption(...) is intentionally not inspected here.
 * NOTE(review): presumably the helper reports failures to Java itself - confirm.
 */
static void netty_epoll_linuxsocket_setUdpGro(JNIEnv* env, jclass clazz, jint fd, jint optval) {
    jint requested = optval;
    (void) netty_unix_socket_setOption(env, fd, SOL_UDP, UDP_GRO, &requested, sizeof(requested));
}


static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd, jobject fileRegion, jlong base_off, jlong off, jlong len) {
jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId);
if (fileChannel == NULL) {
Expand Down Expand Up @@ -642,6 +660,7 @@ static jlong netty_epoll_linuxsocket_sendFile(JNIEnv* env, jclass clazz, jint fd

return res;
}

// JNI Registered Methods End

// JNI Method Registration Table Begin
Expand Down Expand Up @@ -682,7 +701,10 @@ static const JNINativeMethod fixed_method_table[] = {
{ "joinGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_joinGroup },
{ "joinSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_joinSsmGroup },
{ "leaveGroup", "(IZ[B[BII)V", (void *) netty_epoll_linuxsocket_leaveGroup },
{ "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup }
{ "leaveSsmGroup", "(IZ[B[BII[B)V", (void *) netty_epoll_linuxsocket_leaveSsmGroup },
{ "isUdpGro", "(I)I", (void *) netty_epoll_linuxsocket_isUdpGro },
{ "setUdpGro", "(II)V", (void *) netty_epoll_linuxsocket_setUdpGro }

// "sendFile" has a dynamic signature
};

Expand Down
114 changes: 83 additions & 31 deletions transport-native-epoll/src/main/c/netty_epoll_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@
#define UDP_SEGMENT 103
#endif

// UDP_GRO is only provided by the headers of Linux 5 and later. Define it here so the code still compiles against older kernel headers.
#ifndef UDP_GRO
#define UDP_GRO 104
#endif


// optional
extern int epoll_create1(int flags) __attribute__((weak));

Expand Down Expand Up @@ -379,6 +385,73 @@ static jint netty_epoll_native_sendmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo
return (jint) res;
}

/**
 * Copies the outcome of one received message from a filled-in msghdr into the Java packet object.
 *
 * The following packet fields are written: the received byte count, the sender address bytes, the
 * address length, the scope id, the port and the GRO segment size (0 when no UDP_GRO control message
 * is attached to the message).
 *
 * @param env    the JNI environment.
 * @param packet the Java packet object to populate.
 * @param msg    the message header filled in by recvmsg(...) / recvmmsg(...); msg_name must point to
 *               the sender address and msg_control / msg_controllen must describe the control buffer.
 * @param len    the number of bytes received for this message.
 */
static void init_packet(JNIEnv* env, jobject packet, struct msghdr* msg, int len) {
    jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);

    (*env)->SetIntField(env, packet, packetCountFieldId, len);

    struct sockaddr_storage* addr = (struct sockaddr_storage*) msg->msg_name;

    if (addr->ss_family == AF_INET) {
        struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;

        (*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
        (*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
        (*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
        (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
    } else {
        // Everything that is not AF_INET is handled as an IPv6 socket address.
        int addrLen = netty_unix_socket_ipAddressLength(addr);
        struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;
        jbyte* rawAddr = (jbyte*) &ip6addr->sin6_addr.s6_addr;

        if (addrLen == 4) {
            // IPV4 mapped IPV6 address: the IPv4 part lives in the last 4 of the 16 bytes.
            (*env)->SetByteArrayRegion(env, address, 0, 4, rawAddr + 12);
        } else {
            (*env)->SetByteArrayRegion(env, address, 0, 16, rawAddr);
        }
        (*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
        (*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
        (*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
    }

    // Look for the UDP_GRO control message which carries the size of the coalesced segments.
    jint segmentSize = 0;
    struct cmsghdr* cmsg = NULL;
    for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
        if (cmsg->cmsg_level != SOL_UDP || cmsg->cmsg_type != UDP_GRO) {
            continue;
        }
        // The kernel emits the segment size as an int (see udp_cmsg_recv(...) in the kernel sources).
        // Reading it through a uint16_t* only yields the right value on little-endian machines, so
        // pick the width based on the payload length. memcpy(...) is used as CMSG_DATA(...) is not
        // guaranteed to be suitably aligned for a direct typed load.
        if (cmsg->cmsg_len >= CMSG_LEN(sizeof(int))) {
            int value = 0;
            memcpy(&value, CMSG_DATA(cmsg), sizeof(value));
            segmentSize = (jint) value;
        } else if (cmsg->cmsg_len >= CMSG_LEN(sizeof(uint16_t))) {
            uint16_t value = 0;
            memcpy(&value, CMSG_DATA(cmsg), sizeof(value));
            segmentSize = (jint) value;
        }
        break;
    }
    (*env)->SetIntField(env, packet, packetSegmentSizeFieldId, segmentSize);
}

/**
 * Receives a single message via recvmsg(...) and stores the result in the given Java packet object.
 *
 * The iovec array address and the number of iovec entries are read from the packet; after a successful
 * read the packet is populated through init_packet(...).
 *
 * @param env    the JNI environment.
 * @param clazz  unused.
 * @param fd     the file descriptor of the socket to read from.
 * @param ipv6   unused by this function.
 * @param packet the Java packet object that describes the receive buffers and takes the result.
 * @return the number of bytes received, or the negated errno value if recvmsg(...) failed.
 */
static jint netty_epoll_native_recvmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobject packet) {
    struct msghdr msg = { 0 };
    struct sockaddr_storage sock_address;
    // Enough space for the UDP_GRO control message, whose payload the kernel writes as an int.
    // The union guarantees that the buffer is suitably aligned for struct cmsghdr, see cmsg(3).
    union {
        char buf[CMSG_SPACE(sizeof(int))];
        struct cmsghdr align;
    } control;
    memset(&control, 0, sizeof(control));

    msg.msg_name = &sock_address;
    msg.msg_namelen = (socklen_t) sizeof(sock_address);
    msg.msg_iov = (struct iovec*) (intptr_t) (*env)->GetLongField(env, packet, packetMemoryAddressFieldId);
    // On entry the count field holds the number of iovec entries; init_packet(...) later overwrites it
    // with the number of bytes received.
    msg.msg_iovlen = (*env)->GetIntField(env, packet, packetCountFieldId);
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof(control.buf);

    ssize_t res;
    int err = 0;
    do {
        res = recvmsg(fd, &msg, 0);
        // keep on reading if it was interrupted
    } while (res == -1 && ((err = errno) == EINTR));

    if (res < 0) {
        return -err;
    }
    init_packet(env, packet, &msg, (int) res);
    return (jint) res;
}

static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jboolean ipv6, jobjectArray packets, jint offset, jint len) {
struct mmsghdr msg[len];
memset(msg, 0, sizeof(msg));
Expand Down Expand Up @@ -412,36 +485,7 @@ static jint netty_epoll_native_recvmmsg0(JNIEnv* env, jclass clazz, jint fd, jbo

for (i = 0; i < res; i++) {
jobject packet = (*env)->GetObjectArrayElement(env, packets, i + offset);
jbyteArray address = (jbyteArray) (*env)->GetObjectField(env, packet, packetAddrFieldId);

(*env)->SetIntField(env, packet, packetCountFieldId, msg[i].msg_len);

struct sockaddr_storage* addr = (struct sockaddr_storage*) msg[i].msg_hdr.msg_name;

if (addr->ss_family == AF_INET) {
struct sockaddr_in* ipaddr = (struct sockaddr_in*) addr;

(*env)->SetByteArrayRegion(env, address, 0, 4, (jbyte*) &ipaddr->sin_addr.s_addr);
(*env)->SetIntField(env, packet, packetAddrLenFieldId, 4);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, 0);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ipaddr->sin_port));
} else {
int addrLen = netty_unix_socket_ipAddressLength(addr);
struct sockaddr_in6* ip6addr = (struct sockaddr_in6*) addr;

if (addrLen == 4) {
// IPV4 mapped IPV6 address
jbyte* addr = (jbyte*) &ip6addr->sin6_addr.s6_addr;
(*env)->SetByteArrayRegion(env, address, 0, 4, addr + 12);
} else {
(*env)->SetByteArrayRegion(env, address, 0, 16, (jbyte*) &ip6addr->sin6_addr.s6_addr);
}
(*env)->SetIntField(env, packet, packetAddrLenFieldId, addrLen);
(*env)->SetIntField(env, packet, packetScopeIdFieldId, ip6addr->sin6_scope_id);
(*env)->SetIntField(env, packet, packetPortFieldId, ntohs(ip6addr->sin6_port));
}
// TODO: Support this also for recvmmsg
(*env)->SetIntField(env, packet, packetSegmentSizeFieldId, 0);
init_packet(env, packet, &msg[i].msg_hdr, msg[i].msg_len);
}

return (jint) res;
Expand All @@ -457,6 +501,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) {
netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno);
return NULL;
}

static jboolean netty_epoll_native_isSupportingSendmmsg(JNIEnv* env, jclass clazz) {
if (SYS_sendmmsg == -1) {
return JNI_FALSE;
Expand Down Expand Up @@ -603,7 +648,7 @@ static const JNINativeMethod fixed_method_table[] = {
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

// Returns the number of entries of the complete JNI method table: all fixed entries plus the
// entries whose signatures are built at runtime. This must match the number of dynamic methods
// that createDynamicMethodsTable(...) fills in.
static jint dynamicMethodsTableSize() {
    return fixed_method_table_size + 3; // 3 is for the dynamic method signatures.
}

static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
Expand All @@ -630,6 +675,13 @@ static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmmsg0;
netty_jni_util_free_dynamic_name(&dynamicTypeName);

++dynamicMethod;
NETTY_JNI_UTIL_PREPEND(packagePrefix, "io/netty/channel/epoll/NativeDatagramPacketArray$NativeDatagramPacket;)I", dynamicTypeName, error);
NETTY_JNI_UTIL_PREPEND("(IZL", dynamicTypeName, dynamicMethod->signature, error);
dynamicMethod->name = "recvmsg0";
dynamicMethod->fnPtr = (void *) netty_epoll_native_recvmsg0;
netty_jni_util_free_dynamic_name(&dynamicTypeName);

return dynamicMethods;
error:
free(dynamicTypeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
public static final ChannelOption<Map<InetAddress, byte[]>> TCP_MD5SIG = valueOf("TCP_MD5SIG");

public static final ChannelOption<Integer> MAX_DATAGRAM_PAYLOAD_SIZE = valueOf("MAX_DATAGRAM_PAYLOAD_SIZE");
public static final ChannelOption<Boolean> UDP_GRO = valueOf("UDP_GRO");

@SuppressWarnings({ "unused", "deprecation" })
private EpollChannelOption() {
Expand Down

0 comments on commit b05fdf3

Please sign in to comment.